In [1]:
from pylab import * 
import seaborn as sns
%matplotlib inline

# Big Data Analysis with Apache Spark

![](spark-logo.png)

Advantages:
* distributed, in-memory
* (almost) arbitrary sources (flatfile, json, ODBC/JDBC data source, ...)
* use from python/R/Java/...
* Use SQL or write complex map-reduce tasks in the programming language of choice (-> machine learning) 


## 1. Setup the cluster
see http://hpcportal.roche.com/documentation/hpc-resources/show/7224119760665747911

Essentially: 
```bash
ssh hpclogin.bas.roche.com
ml load Spark-pREDi
ml load Spark/2.1.0
sbatch spark-launch.sh
```

starts a 5-node spark cluster with 1.2TB of memory. 

Inspect the `slurm-*.out` file to get the URL. 

## 2. Access the cluster from Jupyter

In [2]:
import findspark
findspark.init("/pstore/apps/Spark/2.1.0/")
import pyspark_csv

In [3]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from utils import *

In [4]:
conf = SparkConf()
conf.setMaster("spark://rkalbhpc024.cm.cluster:45425")

<pyspark.conf.SparkConf at 0x7fffe52aae48>

In [5]:
sc = SparkContext(conf=conf, pyFiles=['pyspark_csv.py'])
sqlContext = SQLContext(sc)

In [6]:
sc.master

'spark://rkalbhpc024.cm.cluster:45425'

## 3. Example: How does the median rank of liver genes differ between platforms? 
Input: 
* liver signature (~100 genes)
* gene expression data from GEO (54 GB)
* metadata from GEO (19GB)

### Get the liver genes from a `gmt` file

In [7]:
from pygenesig.file_formats import read_gmt

In [8]:
signatures = read_gmt("/pstore/data/bioinfo/users/zhangj83/sturmg_data_migration_201704/gtex-signatures/results/gtex_v6_solid_gini_0.8_1/signatures.gmt")

In [9]:
genes = signatures["Kidney"]
tissue = 'kidney'

### Load the gene expression data

In [10]:
!ls -lh /pstore/data/bioinfo/users/zhangj83/sturmg_data_migration_201704/geo_bigdata/

total 311G
-rw-rw---- 1 sturmg bioinfo  53G Apr 20 17:52 geo_all_82k.uniq.tsv
-rw-rw---- 1 sturmg bioinfo 240G Apr 21 08:58 geo_all.tsv
-rw-rw---- 1 sturmg bioinfo  19G Apr 21 17:03 geo_meta_denormalized.csv
drwxrws--- 2 sturmg bioinfo 4.0K Apr 26 16:24 pathway_bioqc


In [11]:
df_signals = sqlContext.read.format('com.databricks.spark.csv').options(
    header='false', delimiter='\t', nullValue='NA'
).load("/pstore/data/bioinfo/users/zhangj83/sturmg_data_migration_201704/geo_bigdata/geo_all_82k.uniq.tsv", schema=schema_signals)

In [12]:
rdd_signals = sc.textFile('/pstore/data/bioinfo/users/zhangj83/sturmg_data_migration_201704/geo_bigdata/geo_all_82k.uniq.tsv')

In [13]:
def nafloat(x):
    if x == 'NA': 
        return float('nan')
    else:
        return float(x)

In [14]:
rdd2 = rdd_signals.map(lambda x: x.split('\t')).map(lambda r: (r[0], r[1], nafloat(r[2]), nafloat(r[3])))

In [15]:
df_signals = sqlContext.createDataFrame(rdd2, schema_signals)

In [16]:
#df_signals = pyspark_csv.csvToDataFrame(sqlContext, rdd_signals, sep='\t')

### Two ways to achieve the same thing: Using SQL...

In [17]:
df_genes = sqlContext.createDataFrame([(x,) for x in genes], ['hgnc'])

In [18]:
df_signals.createOrReplaceTempView("signals")
df_genes.createOrReplaceTempView("genes")

In [19]:
!head "demodata/bioqc_selected_samples.tsv"

GSM	GPL	ORGANISM	TISSUE_ORIG	TISSUE	YEAR	COUNTRY
GSM1136332	GPL10558	Homo sapiens	whole blood	blood	2013	USA
GSM1136428	GPL10558	Homo sapiens	whole blood	blood	2013	USA
GSM1136334	GPL10558	Homo sapiens	whole blood	blood	2013	USA
GSM1136340	GPL10558	Homo sapiens	whole blood	blood	2013	USA
GSM1136344	GPL10558	Homo sapiens	whole blood	blood	2013	USA
GSM1136617	GPL10558	Homo sapiens	whole blood	blood	2013	USA
GSM1136527	GPL10558	Homo sapiens	whole blood	blood	2013	USA
GSM1136575	GPL10558	Homo sapiens	whole blood	blood	2013	USA
GSM1136622	GPL10558	Homo sapiens	whole blood	blood	2013	USA


In [20]:
meta_schema = StructType([
    StructField("gsm", StringType(), True),
    StructField("gpl", StringType(), True),
    StructField("tissue", StringType(), True)
])

In [21]:
rdd_meta = sc.textFile("demodata/bioqc_selected_samples.tsv")
rdd3 = rdd_meta.map(lambda x: x.split('\t')).map(lambda r: (r[0], r[1], r[4]))
df_meta = sqlContext.createDataFrame(rdd3, meta_schema)
df_meta.createOrReplaceTempView("meta")

In [22]:
# df_meta = sqlContext.read.format("com.databricks.spark.csv").options(
#     header='true', delimiter='\t'
# ).load("demodata/bioqc_selected_samples.tsv")
# df_meta.createOrReplaceTempView('meta')

In [23]:
df_result = sqlContext.sql("""
select signals.gsm
     , meta.gpl
     , percentile_approx(rk, .5) as median_rank
from signals
join genes
  on genes.hgnc = signals.hgnc
join meta
  on meta.gsm = signals.gsm
where meta.tissue = '{}'
group by signals.gsm, meta.gpl
""".format(tissue))
df_result.createOrReplaceTempView("result")

In [24]:
df_meta.show(5)

+----------+--------+------+
|       gsm|     gpl|tissue|
+----------+--------+------+
|       GSM|     GPL|TISSUE|
|GSM1136332|GPL10558| blood|
|GSM1136428|GPL10558| blood|
|GSM1136334|GPL10558| blood|
|GSM1136340|GPL10558| blood|
+----------+--------+------+
only showing top 5 rows



## Get aggregated result in Python 
Now we switch from 'big' to 'small' data. 

In [25]:
res = df_result.toPandas()

Py4JJavaError: An error occurred while calling o68.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 30 in stage 1.1 failed 4 times, most recent failure: Lost task 30.3 in stage 1.1 (TID 1922, 10.115.83.27, executor 0): java.io.FileNotFoundException: /tmp/spark-d708b900-2686-4335-85e0-bd6d2a329d2e/executor-7b521bd5-1a56-4f5f-95fb-df07e3143919/blockmgr-0accf657-1468-4c95-97c9-9d1adba1b4a9/0f/temp_shuffle_1bfc9bc2-a6b9-4d56-95a8-940e017fa6b3 (No such file or directory)
	at java.io.FileOutputStream.open0(Native Method)
	at java.io.FileOutputStream.open(FileOutputStream.java:270)
	at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
	at org.apache.spark.storage.DiskBlockObjectWriter.initialize(DiskBlockObjectWriter.scala:102)
	at org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:115)
	at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:229)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:152)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1958)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:935)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:934)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:275)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply$mcI$sp(Dataset.scala:2745)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2742)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2742)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
	at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2765)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:2742)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.FileNotFoundException: /tmp/spark-d708b900-2686-4335-85e0-bd6d2a329d2e/executor-7b521bd5-1a56-4f5f-95fb-df07e3143919/blockmgr-0accf657-1468-4c95-97c9-9d1adba1b4a9/0f/temp_shuffle_1bfc9bc2-a6b9-4d56-95a8-940e017fa6b3 (No such file or directory)
	at java.io.FileOutputStream.open0(Native Method)
	at java.io.FileOutputStream.open(FileOutputStream.java:270)
	at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
	at org.apache.spark.storage.DiskBlockObjectWriter.initialize(DiskBlockObjectWriter.scala:102)
	at org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:115)
	at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:229)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:152)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more


In [None]:
res.head()

In [None]:
fig, ax = subplots(figsize=(15, 5))
sns.boxplot(data=res, x='gpl', y='median_expression', ax=ax)
ax.set_xticklabels(ax.get_xticklabels(), rotation='vertical');