# Lab11B: Spark

**Background**

You will use Spark and Python to process genomic data. The human genome consists of about 3 billion nucleotides, and the genome of the flatworm C. elegans is much smaller. The genome sequences are stored as FASTA files. For the purposes of this exercise, treat lower and upper case as the same. Recall that FASTA files have comment (header) lines starting with '>' that must be excluded from the analysis. For the exercises below, assume that k=20 for the k-mers.
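The two preprocessing rules above (drop lines starting with '>' and ignore case) can be tried out in plain Python on a few toy lines before wiring them into Spark. This is a minimal local sketch; the helper name `clean_fasta_lines` is hypothetical and not part of the lab.

```python
def clean_fasta_lines(lines):
    """Drop FASTA header lines and upper-case the remaining sequence lines.

    Empty lines are skipped as well, since they carry no sequence.
    """
    cleaned = []
    for line in lines:
        line = line.strip()
        # Header (comment) lines start with '>' and are not sequence data
        if not line or line.startswith('>'):
            continue
        # Treat lower and upper case as the same nucleotide
        cleaned.append(line.upper())
    return cleaned


# Toy input, not taken from the real data files
sample = ['>toy header', 'gcctaagc', '', 'AAGCct']
print(clean_fasta_lines(sample))  # ['GCCTAAGC', 'AAGCCT']
```

Using `str.startswith('>')` rather than indexing `line[0]` also keeps empty lines from raising an `IndexError`.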

In [1]:
%%spark

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
341,application_1522938745830_0496,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


In [2]:
hadoop = sc._jvm.org.apache.hadoop

# List the C. elegans FASTA files through the Hadoop FileSystem API (via the JVM gateway)
fs = hadoop.fs.FileSystem
conf = hadoop.conf.Configuration()
path = hadoop.fs.Path('/data/c_elegans')

for f in fs.get(conf).listStatus(path):
    print(f.getPath())

hdfs://vcm-2167.oit.duke.edu:8020/data/c_elegans/Caenorhabditis_elegans.WBcel235.dna.chromosome.I.fa
hdfs://vcm-2167.oit.duke.edu:8020/data/c_elegans/Caenorhabditis_elegans.WBcel235.dna.chromosome.II.fa
hdfs://vcm-2167.oit.duke.edu:8020/data/c_elegans/Caenorhabditis_elegans.WBcel235.dna.chromosome.III.fa
hdfs://vcm-2167.oit.duke.edu:8020/data/c_elegans/Caenorhabditis_elegans.WBcel235.dna.chromosome.IV.fa
hdfs://vcm-2167.oit.duke.edu:8020/data/c_elegans/Caenorhabditis_elegans.WBcel235.dna.chromosome.V.fa
hdfs://vcm-2167.oit.duke.edu:8020/data/c_elegans/Caenorhabditis_elegans.WBcel235.dna.chromosome.X.fa

**Exercise 2 (50 points)**

Write a program using `spark` to find the 5 most common k-mers (sliding windows of length k) in the human genome. Ignore case when processing k-mers. You can work one line at a time: we will ignore k-mers that wrap around lines. You should write a function that takes a path to FASTA files and a value for k, and returns a key-value RDD of k-mer counts. Remember to strip comment lines that begin with '>' from the analysis.

Use k=20
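A small, purely local reference for what the Spark job should compute can help when checking results on a few lines. The sketch below uses `collections.Counter` instead of an RDD; the helper `count_kmers` is hypothetical and assumes its input lines are already header-free and upper-cased.

```python
from collections import Counter


def count_kmers(seq_lines, k):
    """Count k-mers line by line; windows never wrap across lines.

    Assumes headers are already removed and case already normalized.
    """
    counts = Counter()
    for line in seq_lines:
        # Every window of length k that fits entirely within this line
        for i in range(len(line) - k + 1):
            counts[line[i:i + k]] += 1
    return counts


print(count_kmers(['ATATATAT', 'ATATAT'], 4).most_common(2))
# [('ATAT', 5), ('TATA', 3)]
```

The same per-line windowing is what `flatMap` followed by `reduceByKey` distributes across partitions in a Spark version; lines shorter than k contribute nothing.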

**Note**: The `textFile` method takes an optional second argument (`minPartitions`) that controls the number of partitions of the file. By default, Spark creates one partition for each block of the file (blocks are 128MB by default in HDFS), but you can ask for a higher number of partitions by passing a larger value. Please set this parameter to 60; it will speed up processing.
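The reasoning in the note can be made concrete with a back-of-the-envelope estimate. The sketch below (hypothetical helper `hdfs_block_count`) assumes about one byte per nucleotide, a single uncompressed file, and the 128MB default block size, and it ignores any minimum that Spark itself applies to the split count.

```python
def hdfs_block_count(file_size_bytes, block_size_bytes=128 * 1024 ** 2):
    """Rough number of HDFS blocks for one file (ceiling division, at least 1)."""
    return max(1, (file_size_bytes + block_size_bytes - 1) // block_size_bytes)


# Assumption: roughly one byte per nucleotide, stored as a single uncompressed file
print(hdfs_block_count(3 * 10 ** 9))  # 23
```

Roughly 23 blocks for the whole human genome is well under 60, and each C. elegans chromosome file is smaller than a single block, so with the default split many executor cores could sit idle.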

**Check**: Use the C. elegans genome at `/data/c_elegans`. You should get:

```
[
(u'ATATATATATATATATATAT', 2217), 
(u'TATATATATATATATATATA', 2184), 
(u'CTCTCTCTCTCTCTCTCTCT', 1373), 
(u'TCTCTCTCTCTCTCTCTCTC', 1361), 
(u'AGAGAGAGAGAGAGAGAGAG', 1033)
]
```

In [3]:
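def k_mers(g_string, k):
    # Emit a (k-mer, 1) pair for every window of length k on this line;
    # windows that would run past the end of the line are skipped
    return [(g_string[i:i + k], 1) for i in range(len(g_string) - k + 1)]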

In [4]:
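def k_mers_all(path, k):
    # 60 partitions, as requested in the note above
    rdd = sc.textFile(path, 60)
    # Drop FASTA header lines (startswith also tolerates empty lines,
    # where x[0] would raise IndexError) and ignore case
    rdd = rdd.filter(lambda x: not x.startswith('>')).map(lambda x: x.upper())
    rdd = rdd.flatMap(lambda g_string: k_mers(g_string, k))
    return rdd.reduceByKey(lambda x, y: x + y)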

In [5]:
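from functools import reduce
path_pattern = '/data/c_elegans/Caenorhabditis_elegans.WBcel235.dna.chromosome.%s.fa'
# Note: chromosome 'X' (listed above) is not included, so the counts below cover I-V only
name_list = ['I', 'II', 'III', 'IV', 'V']
# One k-mer count RDD per chromosome, each turned into a DataFrame with its own count column
rdd_list = [k_mers_all(path_pattern % name, 20) for name in name_list]
df_list = [spark.createDataFrame(rdd, ['sequence', 'count%i' % i]) for i, rdd in enumerate(rdd_list)]
# Outer join on sequence keeps k-mers seen in only some chromosomes (missing counts become null)
df = reduce(lambda x, y: x.join(y, on='sequence', how='outer'), df_list)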
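The outer join, `fillna(0)` and row-wise sum in the cells that follow compute, for each k-mer, its total count across chromosomes. The toy sketch below (plain Python with made-up counts, not Spark) shows that this equals summing the per-file counts directly, which is what a single `reduceByKey` over all files would produce. Since `sc.textFile` also accepts a directory, one call such as `k_mers_all('/data/c_elegans', 20)` could be an alternative to building and joining one DataFrame per chromosome.

```python
from collections import Counter

# Toy per-chromosome k-mer counts (made-up numbers, for illustration only)
per_chrom = [
    Counter({'ATAT': 2, 'TATA': 1}),
    Counter({'ATAT': 1, 'GCGC': 4}),
]

# Outer join + fillna(0): one count column per chromosome, 0 where absent
keys = sorted(set().union(*per_chrom))
table = {key: [c.get(key, 0) for c in per_chrom] for key in keys}

# Row-wise sum of the count columns, like the withColumn('count', ...) step
joined_total = {key: sum(cols) for key, cols in table.items()}

# Same totals from adding the Counters, as one reduceByKey over all files would give
summed = sum(per_chrom, Counter())
print(joined_total == dict(summed))   # True
print(sorted(joined_total.items()))   # [('ATAT', 3), ('GCGC', 4), ('TATA', 1)]
```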

In [8]:
df.show(5)

+--------------------+------+------+------+------+------+
|            sequence|count0|count1|count2|count3|count4|
+--------------------+------+------+------+------+------+
|AAAAAAAAAAAAAAATATTT|     1|  null|     1|  null|  null|
|AAAAAAAAAAAAACAAACCG|     1|  null|  null|  null|  null|
|AAAAAAAAAAAAAGTCAACT|  null|  null|  null|     1|  null|
|AAAAAAAAAAAACGTCTAAA|  null|     1|  null|  null|  null|
|AAAAAAAAAAAACTCAATTA|  null|     1|  null|  null|  null|
+--------------------+------+------+------+------+------+
only showing top 5 rows

In [9]:
df = df.fillna(0)

In [10]:
df.show(5)

+--------------------+------+------+------+------+------+
|            sequence|count0|count1|count2|count3|count4|
+--------------------+------+------+------+------+------+
|AAAAAAAAAAAAAAATATTT|     1|     0|     1|     0|     0|
|AAAAAAAAAAAAACAAACCG|     1|     0|     0|     0|     0|
|AAAAAAAAAAAAAGTCAACT|     0|     0|     0|     1|     0|
|AAAAAAAAAAAACGTCTAAA|     0|     1|     0|     0|     0|
|AAAAAAAAAAAACTCAATTA|     0|     1|     0|     0|     0|
+--------------------+------+------+------+------+------+
only showing top 5 rows

In [11]:
# Total count per k-mer across the per-chromosome columns (nulls were replaced by 0 above)
df = df.withColumn('count', df['count0']+df['count1']+df['count2']+df['count3']+df['count4'])
df.show(5)

+--------------------+------+------+------+------+------+-----+
|            sequence|count0|count1|count2|count3|count4|count|
+--------------------+------+------+------+------+------+-----+
|AAAAAAAAAAAAAAATATTT|     1|     0|     1|     0|     0|    2|
|AAAAAAAAAAAAACAAACCG|     1|     0|     0|     0|     0|    1|
|AAAAAAAAAAAAAGTCAACT|     0|     0|     0|     1|     0|    1|
|AAAAAAAAAAAACGTCTAAA|     0|     1|     0|     0|     0|    1|
|AAAAAAAAAAAACTCAATTA|     0|     1|     0|     0|     0|    1|
+--------------------+------+------+------+------+------+-----+
only showing top 5 rows

In [12]:
sorted_df = df.select(['sequence','count']).sort(df['count'].desc())
sorted_df.show(5)

+--------------------+-----+
|            sequence|count|
+--------------------+-----+
|ATATATATATATATATATAT| 1757|
|TATATATATATATATATATA| 1739|
|CTCTCTCTCTCTCTCTCTCT|  990|
|TCTCTCTCTCTCTCTCTCTC|  976|
|AGAGAGAGAGAGAGAGAGAG|  861|
+--------------------+-----+
only showing top 5 rows
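When only the top five k-mers are needed, sorting every distinct k-mer is more work than necessary. On the RDD returned by `k_mers_all`, PySpark's `takeOrdered(5, key=lambda kv: -kv[1])` returns just the five largest pairs. A local analogue uses `heapq.nlargest`, sketched below on toy counts; the helper name `top_kmers` is hypothetical.

```python
import heapq


def top_kmers(counts, n=5):
    """Return the n (k-mer, count) pairs with the highest counts."""
    return heapq.nlargest(n, counts.items(), key=lambda kv: kv[1])


toy = {'ATAT': 5, 'TATA': 3, 'GCGC': 7, 'AAAA': 1}
print(top_kmers(toy, 2))  # [('GCGC', 7), ('ATAT', 5)]
```

`heapq.nlargest` keeps only n candidates in a heap, so it avoids fully sorting the input, which is the same idea `takeOrdered` applies per partition.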

**Exercise 3 (10 points)** 

As a simple QC measure, we can assume that the k-mers that have a count of only 1 are due to sequencing errors. Put all the k-mers with a count of 2 or more in a Spark DataFrame with two columns (sequence, count). 
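The QC rule is a threshold on the count. A local sketch on toy counts, with a hypothetical `filter_by_count` helper whose `min_count` parameter defaults to 2 as in the exercise:

```python
def filter_by_count(counts, min_count=2):
    """Keep k-mers seen at least min_count times (singletons assumed to be errors)."""
    return {kmer: c for kmer, c in counts.items() if c >= min_count}


toy = {'ATAT': 5, 'ACGT': 1, 'TATA': 2}
kept = filter_by_count(toy)
print(sorted(kept.items()))                       # [('ATAT', 5), ('TATA', 2)]
print('%d discarded' % (len(toy) - len(kept)))    # 1 discarded
```

For integer counts, `count >= 2` and `count > 1` select exactly the same k-mers.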

In [13]:
valid_df = sorted_df.filter(sorted_df['count'] > 1)
valid_df.show(5)

+--------------------+-----+
|            sequence|count|
+--------------------+-----+
|ATATATATATATATATATAT| 1757|
|TATATATATATATATATATA| 1739|
|CTCTCTCTCTCTCTCTCTCT|  990|
|TCTCTCTCTCTCTCTCTCTC|  976|
|AGAGAGAGAGAGAGAGAGAG|  861|
+--------------------+-----+
only showing top 5 rows

**Exercise 4 (10 points)**

Find all k-mers with count greater than 1 that are palindromes.

In [33]:
import pyspark.sql.functions as F

# Python UDF returning the reversed string (default return type: string)
@F.udf
def reverse_seq(seq):
    return seq[::-1]

# Keep k-mers that read the same forwards and backwards
palindromes = valid_df.withColumn('reverse_seq', reverse_seq(valid_df['sequence']))
palindromes = palindromes.filter(palindromes['sequence'] == palindromes['reverse_seq'])
palindromes = palindromes.select(['sequence', 'count'])

In [34]:
palindromes.show(5)

+--------------------+-----+
|            sequence|count|
+--------------------+-----+
|TTTTTTTTTTTTTTTTTTTT|  185|
|AAAAAAAAAAAAAAAAAAAA|  135|
|GGGGGGGGGGGGGGGGGGGG|  135|
|CCCCCCCCCCCCCCCCCCCC|  111|
|AATAATAATAATAATAATAA|   71|
+--------------------+-----+
only showing top 5 rows
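The palindrome check above treats a palindrome as a string that equals its own reversal. In molecular biology, a palindromic DNA sequence usually means one that equals its own reverse complement (for example the EcoRI site `GAATTC`). If that reading were intended, the results would differ; the plain-Python sketch below contrasts the two definitions. All helper names are hypothetical.

```python
# Complement of each nucleotide; other symbols (e.g. 'N') are left unchanged
COMPLEMENT = {'A': 'T', 'T': 'A', 'C': 'G', 'G': 'C'}


def reverse_complement(seq):
    """Reverse complement of an upper-case DNA string."""
    return ''.join(COMPLEMENT.get(base, base) for base in reversed(seq))


def is_string_palindrome(seq):
    """The definition used in the lab: the sequence reads the same reversed."""
    return seq == seq[::-1]


def is_rc_palindrome(seq):
    """Biological palindrome: the sequence equals its own reverse complement."""
    return seq == reverse_complement(seq)


for s in ['AATAATAATAATAATAATAA', 'GAATTC', 'TTTTTTTTTTTTTTTTTTTT']:
    print('%s %s %s' % (s, is_string_palindrome(s), is_rc_palindrome(s)))
# AATAATAATAATAATAATAA True False
# GAATTC False True
# TTTTTTTTTTTTTTTTTTTT True False
```

In Spark itself, the built-in `pyspark.sql.functions.reverse` could stand in for the Python UDF and avoid its serialization overhead, and combining it with `F.translate(col, 'ACGT', 'TGCA')` would give a reverse complement column.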