In [1]:
%%spark

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,,pyspark,idle,,,✔


SparkContext available as 'sc'.
HiveContext available as 'sqlContext'.


In [2]:
import os
os.environ['PYSPARK_PYTHON'] = '/usr/bin/python3'

**Note 1**: In the following exercises, keep the amount of data returned by Spark to the local notebook session to the minimum needed for that exercise. In other words, all the work should be done via distributed computing and not by returning a large collection that is then processed in regular Python.

**Note 2**: To minimize waiting times, do the exercises using the C. elegans genome. The human genome is at `/data/human/*fa` but takes a long time to process, so you don't have to analyze it unless you want to see how long it takes.

In [3]:
# Change path when debugging is complete to work on human genome

# fasta_path = '/data/human/*fa'
fasta_path = '/data/c_elegans/*fa'

**Exercise 1 (50 points)**

Write a program using `spark` to find the 5 most common k-mers (shifting windows of length k) in the human genome. Ignore case when processing k-mers. You can work one line at a time - we will ignore k-mers that wrap around lines. You should write a function that takes a path to FASTA files and a value for k, and returns a key-value RDD of k-mer counts. Remember to strip comment lines that begin with '>' from the analysis.

**Note**: The textFile method takes an optional second argument for controlling the number of partitions of the file. By default, Spark creates one partition for each block of the file (blocks being 128MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value. Please set this parameter to 60 - it will speed up processing.

**Check**: Use the C. elegans genome at `/data/c_elegans/*fa`. You should get 

```
[
(u'ATATATATATATATATATAT', 2168), 
(u'TATATATATATATATATATA', 2142), 
(u'CTCTCTCTCTCTCTCTCTCT', 1337), 
(u'TCTCTCTCTCTCTCTCTCTC', 1327), 
(u'AGAGAGAGAGAGAGAGAGAG', 1007)
]
```

In [4]:
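def count_kmers(path, k):
    """Returns RDD of counts of k-mers for FASTA files in path."""
    
    # Read from the `path` argument (not the global fasta_path), using 60 partitions
    counts = (sc.textFile(path, 60)
              .map(lambda line: line.upper())
              # Drop empty lines and FASTA comment lines starting with '>'
              .filter(lambda line: line.strip() and not line.startswith('>') )
              # Emit every window of length k within a line (no wrap-around)
              .flatMap(lambda read: [read[i:i+k] for i in range(len(read) - k + 1)])
              .map(lambda kmer: (kmer, 1))
              .reduceByKey(lambda a, b: a + b))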
    
    return counts

In [5]:
k = 20
counts = count_kmers(fasta_path, k)
counts.takeOrdered(5, key=lambda x: -x[1])

[(u'ATATATATATATATATATAT', 2217), (u'TATATATATATATATATATA', 2184), (u'CTCTCTCTCTCTCTCTCTCT', 1373), (u'TCTCTCTCTCTCTCTCTCTC', 1361), (u'AGAGAGAGAGAGAGAGAGAG', 1033)]
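
The per-line windowing can be sanity-checked on a tiny input without a Spark session. The following is a minimal sketch in plain Python that roughly mirrors the pipeline steps (uppercase, drop comment lines, slide a window of length k, sum counts). The helper names `kmers_in_line` and `count_kmers_local` and the toy lines are hypothetical and not part of the notebook.

```python
from collections import Counter

def kmers_in_line(line, k):
    """Return all length-k windows of a single line (no wrap-around)."""
    return [line[i:i + k] for i in range(len(line) - k + 1)]

def count_kmers_local(lines, k):
    """Count k-mers over an iterable of FASTA lines (local stand-in for the RDD version)."""
    counts = Counter()
    for line in lines:
        line = line.strip().upper()
        # Skip empty lines and FASTA comment lines starting with '>'
        if not line or line.startswith('>'):
            continue
        counts.update(kmers_in_line(line, k))
    return counts

# Made-up toy input: one comment line and two short sequence lines
toy = ['>chr_toy', 'acgtac', 'GTAC']
local_counts = count_kmers_local(toy, 2)
print(local_counts['AC'], local_counts['CG'])
```

A line shorter than k yields no windows, which is why very short trailing lines in a FASTA file contribute nothing to the counts.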

**Exercise 2 (10 points)**

Find all k-mers that are palindromes. How many are there?

In [6]:
def is_palindrome(seq):
    return seq == seq[::-1]

Counting unique palindromes

In [7]:
palindromes = counts.filter(lambda x: is_palindrome(x[0]))
palindromes.count()

951

Counting all palindromes including repeats.

In [10]:
palindromes.map(lambda x: x[1]).sum()

2816
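
The difference between the two numbers above (distinct palindromic k-mers versus total occurrences) can be illustrated on a small dictionary of counts. This is a sketch in plain Python with made-up toy counts; the variable names are hypothetical and the logic stands in for the `filter`/`count` and `map`/`sum` calls on the RDD.

```python
def is_palindrome(seq):
    """True if the sequence reads the same forwards and backwards."""
    return seq == seq[::-1]

# Made-up k-mer counts for illustration (k = 4)
toy_counts = {'ATTA': 3, 'ACGT': 5, 'GAAG': 1}

# Distinct palindromic k-mers: each key is counted once
unique_palindromes = sum(1 for kmer in toy_counts if is_palindrome(kmer))

# Total occurrences: each palindromic k-mer is weighted by its count
total_palindromes = sum(n for kmer, n in toy_counts.items() if is_palindrome(kmer))

print(unique_palindromes, total_palindromes)
```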

**Exercise 3 (10 points)** 

As a simple QC measure, we can assume that the k-mers that have a count of only 1 are due to sequencing errors. Put all the k-mers with a count of 2 or more in a Spark DataFrame with two columns (sequence, count). Count how many rows in the DataFrame have counts between 5 and 10 (inclusive of both 5 and 10).

In [8]:
df = counts.filter(lambda x: x[1] > 1).toDF(['sequence', 'count'])

In [9]:
df.filter((df['count'] >= 5) & (df['count'] <= 10)).count()

174486

**Exercise 4 (30 points)**

Make a Markov transition matrix for any nucleotide ('A', 'C', 'T', 'G') to any other nucleotide. The (i,j) entry should indicate the probability of finding the jth nucleotide appearing immediately after the ith nucleotide in the genome. For example, the entry (0, 2) shows the probability of finding a T immediately following an A. The matrix should have shape (4,4). Ignore any letter not in 'ACTG'.
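
Written as a formula, with $c_{ij}$ denoting the number of 2-mers in which the jth nucleotide immediately follows the ith (notation introduced here for illustration, not taken from the exercise), each entry is a row-normalized count:

$$
P_{ij} = \frac{c_{ij}}{\sum_{k} c_{ik}}, \qquad \sum_{j} P_{ij} = 1 .
$$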

In [10]:
pairs = count_kmers(fasta_path, 2).collectAsMap()

In [11]:
import numpy as np

In [12]:
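def mapper(s):
    """Map a 2-mer to (row, column) indices in 'ACTG' order; None if it has other letters."""
    nuc = 'ACTG'
    if s[0] not in nuc or s[1] not in nuc:
        return None
    i, j = nuc.index(s[0]), nuc.index(s[1])
    return i, j

# M[i, j] holds the count of nucleotide j immediately following nucleotide i
M = np.zeros((4,4))
for key in pairs:
    idx = mapper(key)
    if idx is None:
        continue
    i, j = idx
    M[i, j] = pairs[key]
# Divide each row by its sum so that rows are probabilities
M/M.sum(axis=1)[:, np.newaxis]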

array([[ 0.41930811,  0.14972777,  0.27417737,  0.15678674],
       [ 0.34893751,  0.18929553,  0.28566181,  0.17610515],
       [ 0.196519  ,  0.19243038,  0.41931672,  0.19173389],
       [ 0.35093673,  0.18814073,  0.27257127,  0.18835127]])
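
The row normalization can be checked by hand on a toy set of pair counts. The sketch below uses plain Python lists so it runs without NumPy. The function name `transition_matrix`, the toy counts, and the guard for all-zero rows are assumptions added for illustration and are not part of the solution above.

```python
def transition_matrix(pair_counts, alphabet='ACTG'):
    """Row-normalize 2-mer counts into transition probabilities."""
    n = len(alphabet)
    counts = [[0.0] * n for _ in range(n)]
    for pair, c in pair_counts.items():
        # Ignore 2-mers containing letters outside the alphabet (e.g. 'N')
        if len(pair) != 2 or pair[0] not in alphabet or pair[1] not in alphabet:
            continue
        counts[alphabet.index(pair[0])][alphabet.index(pair[1])] += c
    matrix = []
    for row in counts:
        total = sum(row)
        # Assumption: leave an all-zero row as zeros instead of dividing by zero
        matrix.append([x / total if total else 0.0 for x in row])
    return matrix

# Made-up 2-mer counts; 'GN' and 'NA' should be ignored
toy_pairs = {'AA': 2, 'AC': 1, 'AT': 1, 'CA': 3, 'CG': 1,
             'TT': 4, 'GA': 1, 'GN': 7, 'NA': 2}
P = transition_matrix(toy_pairs)
for row in P:
    print(row)
```

With rows and columns in 'ACTG' order, the first row reads as the probabilities of A, C, T, and G following an A.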