# PySpark

Using PySpark from a Jupyter notebook is quite straightforward when using a local Spark instance. PySpark can be installed using conda:
```
conda install pyspark
```

Once PySpark is installed, a local Spark instance can be launched easily from within the notebook.

In [1]:
from pyspark import SparkContext

In [2]:
# 'local': run Spark in-process with a single worker thread; 'test': application name
sc = SparkContext('local', 'test')

## Example: counting characters

As an example, we read a file that contains a DNA sequence (unrealistically long). We first check some properties of the file and show its first few lines. We want to count the nucleotides, i.e., the number of occurrences of each of `A`, `C`, `G`, and `T`.

In [5]:
!wc Data/large_dna.txt

  12500   12500 1012500 Data/large_dna.txt


In [6]:
!head -3 Data/large_dna.txt

TAACATTCCAAGGTTTAATTGTTACAAATAAGTGCACCGTGCGACGAAACTCCACGTCTAAGTAATACAATTTGGCCGTG
CCGGGGACTTCACTGATGCCGGCTGATCTGATGCTGTTAATCAAGCACCGCTCATACACTTTAGTTTCACTTTTCAAAGT
GATAAGAGAAGATGGACCGATACATTATGGATCTTTTGTCTAGGTCTTAATAAGAAAAAAAGCCCTAGCTCCACTCAGGG
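To make the goal concrete, the sketch below counts the nucleotides in the three lines printed by `head`, in plain Python (no Spark) with `collections.Counter`. It only illustrates what the Spark job will compute for the whole file; the variable names are our own.

```python
from collections import Counter

# The first three lines of Data/large_dna.txt, as printed by head above
lines = [
    'TAACATTCCAAGGTTTAATTGTTACAAATAAGTGCACCGTGCGACGAAACTCCACGTCTAAGTAATACAATTTGGCCGTG',
    'CCGGGGACTTCACTGATGCCGGCTGATCTGATGCTGTTAATCAAGCACCGCTCATACACTTTAGTTTCACTTTTCAAAGT',
    'GATAAGAGAAGATGGACCGATACATTATGGATCTTTTGTCTAGGTCTTAATAAGAAAAAAAGCCCTAGCTCCACTCAGGG',
]

# Accumulate character counts over all lines in a single Counter
counts = Counter()
for line in lines:
    counts.update(line)

# Report the counts in the A, C, G, T order used later on
for nucl in 'ACGT':
    print(f'{nucl}: {counts[nucl]}')
```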


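According to `wc`, the file has 12,500 lines and 1,012,500 bytes, i.e., 81 bytes per line, which matches the 80 nucleotides per line shown by `head` plus a newline character.

Next, we read the data from the text file; the result is stored in an RDD (resilient distributed dataset).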

In [3]:
data = sc.textFile('Data/large_dna.txt')

The RDD has as many elements as the data file has lines.  The order of the elements is the same as that of the lines in the file.

In [4]:
data.count()

12500

In [7]:
data.take(3)

['TAACATTCCAAGGTTTAATTGTTACAAATAAGTGCACCGTGCGACGAAACTCCACGTCTAAGTAATACAATTTGGCCGTG',
 'CCGGGGACTTCACTGATGCCGGCTGATCTGATGCTGTTAATCAAGCACCGCTCATACACTTTAGTTTCACTTTTCAAAGT',
 'GATAAGAGAAGATGGACCGATACATTATGGATCTTTTGTCTAGGTCTTAATAAGAAAAAAAGCCCTAGCTCCACTCAGGG']
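Internally, the elements of an RDD are split into partitions, and `count` and `take` operate on those partitions. The following plain-Python sketch mimics this on a hand-made list of partitions; the helpers `rdd_count` and `rdd_take` are hypothetical and only approximate Spark's behavior (Spark's `take` likewise starts with the first partition and scans further partitions until it has enough elements).

```python
# Ten hypothetical lines, split by hand into three partitions
partitions = [
    ['line 0', 'line 1', 'line 2', 'line 3'],
    ['line 4', 'line 5', 'line 6'],
    ['line 7', 'line 8', 'line 9'],
]

def rdd_count(partitions):
    # each partition is counted independently; the counts are then summed
    return sum(len(part) for part in partitions)

def rdd_take(partitions, n):
    # scan the partitions in order until n elements have been collected
    result = []
    for part in partitions:
        if len(result) >= n:
            break
        result.extend(part[:n - len(result)])
    return result

print(rdd_count(partitions))    # 10
print(rdd_take(partitions, 5))  # ['line 0', 'line 1', 'line 2', 'line 3', 'line 4']
```

Because partitions are scanned in order, `take` returns the first elements of the file, consistent with the order-preservation noted above.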

Next, we define a function that counts the occurrences of each nucleotide in a string and returns the counts as a tuple, in the order `A`, `C`, `G`, `T`. This is not the optimal implementation, since it scans the string once per nucleotide, but it is straightforward.

In [8]:
def count_nucl(seq):
    return tuple(seq.count(nucl) for nucl in 'ACGT')
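As a quick check of the return format, the sketch below applies `count_nucl` to the arbitrary test string `'GATTACA'` and compares it with a single-pass variant based on `collections.Counter`. The variant `count_nucl_single_pass` is our own illustration and not necessarily faster: each `str.count` call runs in C, so both would have to be timed on real data before preferring one.

```python
from collections import Counter

def count_nucl(seq):
    # as defined above: one str.count scan per nucleotide
    return tuple(seq.count(nucl) for nucl in 'ACGT')

def count_nucl_single_pass(seq):
    # illustrative variant: one scan with Counter; absent nucleotides give 0
    counts = Counter(seq)
    return tuple(counts[nucl] for nucl in 'ACGT')

print(count_nucl('GATTACA'))              # (3, 1, 1, 2)
print(count_nucl_single_pass('GATTACA'))  # (3, 1, 1, 2)
```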

This function can be applied to each element of the RDD independently; in Spark terminology, `map` is a transformation. Transformations are lazy: the result is only computed when it is required, e.g., by an action such as `count`, `take`, or `reduce`.

In [9]:
counts = data.map(count_nucl)
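Python's built-in `map` is also lazy, which gives a rough analogy. The sketch below counts how often `count_nucl` is called before and after the results are consumed; the `calls` counter is added for illustration only.

```python
calls = 0

def count_nucl(seq):
    global calls
    calls += 1  # record each invocation
    return tuple(seq.count(nucl) for nucl in 'ACGT')

lines = ['ACGT', 'AACC', 'GGTT']

# map returns a lazy iterator: nothing is computed yet, much like a transformation
counts = map(count_nucl, lines)
print(calls)  # 0

# consuming the iterator, loosely analogous to an action, triggers the work
result = list(counts)
print(calls)   # 3
print(result)  # [(1, 1, 1, 1), (2, 2, 0, 0), (0, 0, 2, 2)]
```

The analogy has limits: a Python iterator can be consumed only once, whereas Spark recomputes a transformation for every action unless the RDD is cached.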

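Next, we define a function that computes the element-wise sum of two tuples and returns a new tuple. It is passed to `reduce`, an action that combines the elements of the RDD pairwise into a single value, which is returned to the driver. Since Spark applies the function within and across partitions in no particular order, the function should be commutative and associative; element-wise addition is both.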

In [10]:
def sum_nucl(t1, t2):
    return tuple(x + y for x, y in zip(t1, t2))
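Why `sum_nucl` must be associative and commutative can be illustrated in plain Python with `functools.reduce`: reduce each (hypothetical) partition separately, then combine the partial results, and compare with a sequential reduction in either order. The per-line tuples are made up.

```python
from functools import reduce

def sum_nucl(t1, t2):
    return tuple(x + y for x, y in zip(t1, t2))

# Made-up per-line counts, split over two hypothetical partitions
partitions = [
    [(1, 1, 1, 1), (2, 2, 0, 0)],
    [(0, 0, 2, 2), (3, 1, 1, 2)],
]

# Stage 1: reduce each partition locally
partials = [reduce(sum_nucl, part) for part in partitions]
# Stage 2: combine the partial results
total = reduce(sum_nucl, partials)

# Grouping and order do not matter because sum_nucl is associative and commutative
flat = [t for part in partitions for t in part]
assert total == reduce(sum_nucl, flat) == reduce(sum_nucl, reversed(flat))
print(total)  # (6, 4, 4, 5)
```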

In [11]:
total_count = counts.reduce(sum_nucl)

In [12]:
total_count

(249077, 250033, 250584, 250306)
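As a sanity check, the four counts should add up to the number of nucleotides in the file. Assuming every line holds 80 nucleotides, as the `wc` and `head` output suggests, that is 12,500 × 80 = 1,000,000. The sketch below also attaches labels to the counts with `zip`, relying on the `A`, `C`, `G`, `T` order used in `count_nucl`.

```python
# Result of counts.reduce(sum_nucl) above, in A, C, G, T order
total_count = (249077, 250033, 250584, 250306)

# Attach labels to the counts
by_nucl = dict(zip('ACGT', total_count))
print(by_nucl)  # {'A': 249077, 'C': 250033, 'G': 250584, 'T': 250306}

# 12,500 lines of (assumed) 80 nucleotides each
n_lines, per_line = 12500, 80
assert sum(total_count) == n_lines * per_line  # 1,000,000
```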

### Alternative approach

An alternative approach is to construct an RDD with key/value pairs.

In [43]:
data = sc.textFile('Data/large_dna.txt')

First, we create a list of nucleotides for each element in the RDD.

In [54]:
nucleotides = data.map(list)

Next, for each nucleotide in each element of the RDD, we create a key/value pair: the key is the nucleotide, the value is 1. Using `flatMap` rather than `map` flattens the result, so that the new RDD contains one key/value pair per nucleotide rather than one collection of pairs per line.

In [55]:
nucl_counts = nucleotides.flatMap(lambda x: ((n, 1) for n in x))

In [57]:
nucl_counts.take(5)

[('T', 1), ('A', 1), ('A', 1), ('C', 1), ('A', 1)]
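The difference between `map` and `flatMap` can be sketched with plain Python lists: a `map`-like step yields one collection of pairs per line, while a `flatMap`-like step concatenates them, here with `itertools.chain.from_iterable`. The helper `to_pairs` is our own name for the lambda used above; iterating over a string yields the same characters as iterating over the list produced by `data.map(list)`.

```python
from itertools import chain

lines = ['TAAC', 'CCGG']

def to_pairs(line):
    # one (nucleotide, 1) pair per character, like the lambda passed to flatMap
    return ((n, 1) for n in line)

# map-like: one list of pairs per line (nested)
nested = [list(to_pairs(line)) for line in lines]
# flatMap-like: the per-line pairs are concatenated into a single sequence
flat = list(chain.from_iterable(to_pairs(line) for line in lines))

print(len(nested), len(flat))  # 2 8
print(flat[:5])  # [('T', 1), ('A', 1), ('A', 1), ('C', 1), ('C', 1)]
```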

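The `countByKey` action counts the number of elements for each key and returns the result to the driver as a dictionary; the values themselves (here all 1) are not used. Since the entire result is loaded into the driver's memory, this is only suitable when the number of distinct keys is small, as it is here.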

In [56]:
for key, value in nucl_counts.countByKey().items():
    print(f'{key}: {value}')

T: 250306
A: 249077
C: 250033
G: 250584
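Since `countByKey` only looks at the keys, its effect on the five pairs returned by `take(5)` can be reproduced locally with `collections.Counter`. This is an illustration of the semantics, not of how Spark implements it.

```python
from collections import Counter

# The first five key/value pairs returned by take(5) above
pairs = [('T', 1), ('A', 1), ('A', 1), ('C', 1), ('A', 1)]

# Count elements per key; like countByKey, the values are ignored
counts = Counter(key for key, _ in pairs)
for key, value in counts.items():
    print(f'{key}: {value}')
```

Because every value is 1 here, summing the values per key, e.g., with `reduceByKey(lambda x, y: x + y)`, would give the same counts.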


## Example: counting signs

In [30]:
import numpy as np

RDDs can also be constructed from local collections such as numpy arrays, using the `parallelize` method.

In [31]:
data = sc.parallelize(np.random.uniform(-1.0, 1.0, (1000,)))
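`parallelize` splits the local collection into partitions; their number can be set with the `numSlices` argument, and `glom()` can be used to inspect the partitions of the resulting RDD. The sketch below assumes a contiguous, nearly even split in which slice `i` starts at index `i * length // num_slices`. We believe this is close to what Spark does for local collections, but the helpers `positions` and `parallelize_like` are hypothetical.

```python
def positions(length, num_slices):
    # contiguous, nearly equal slices; slice i starts at i * length // num_slices
    return [(i * length // num_slices, (i + 1) * length // num_slices)
            for i in range(num_slices)]

def parallelize_like(values, num_slices):
    # hypothetical stand-in for sc.parallelize: returns a list of partitions
    return [list(values[start:end])
            for start, end in positions(len(values), num_slices)]

values = [0.5, -0.25, 0.75, -1.0, 0.125, 0.0, -0.5]
parts = parallelize_like(values, 3)
print(parts)  # [[0.5, -0.25], [0.75, -1.0], [0.125, 0.0, -0.5]]
```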

We want to count the positive and the negative values, and compute the sum of the positive and of the negative numbers in the RDD. The first step is to transform the RDD into key/value pairs where the key is `'pos'` for numbers that are strictly positive and `'neg'` otherwise (so zero also gets the key `'neg'`). The corresponding values are the original numbers.

In [36]:
signs = data.map(lambda x: ('pos', x) if x > 0 else ('neg', x))
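The lambda can equally be written as a named key function. PySpark's `keyBy` method builds `(f(x), x)` pairs from such a function, so `data.keyBy(sign)` should give the same pairs as the `map` above. The sketch below mimics that pairing in plain Python on made-up values; `sign` is our own name.

```python
def sign(x):
    # key function: 'pos' for strictly positive numbers, 'neg' otherwise (including 0.0)
    return 'pos' if x > 0 else 'neg'

values = [0.25, -0.5, 0.0, 0.75]

# keyBy-style pairing: (key(x), x) for every element
pairs = [(sign(x), x) for x in values]
print(pairs)  # [('pos', 0.25), ('neg', -0.5), ('neg', 0.0), ('pos', 0.75)]
```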

In [37]:
signs.take(5)

[('pos', 0.2536918594726303),
 ('pos', 0.8974359992515826),
 ('pos', 0.6258528836919743),
 ('pos', 0.686967322651217),
 ('pos', 0.1812645225445868)]

As in the previous example, counting can be done by key.

In [39]:
counts = signs.countByKey()

In [42]:
for key, value in counts.items():
    print(f'{key}: {value}')

pos: 503
neg: 497


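To compute the sums, we perform a reduction by key, using a lambda function to compute the pairwise sum. Unlike `countByKey`, `reduceByKey` is a transformation: it returns a new RDD of key/value pairs, so an action such as `take` or `collect` is needed to obtain the results.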

In [58]:
sums = signs.reduceByKey(lambda x, y: x + y)

In [60]:
sums.take(2)

[('pos', 240.8051861456065), ('neg', -244.63638676192005)]
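`reduceByKey` combines the values for each key locally within every partition before the partial results are merged, so only one value per key and partition has to be moved. The sketch below mimics this in plain Python; the helper `reduce_by_key` and the sample partitions are hypothetical.

```python
def reduce_by_key(partitions, func):
    # Hypothetical stand-in for rdd.reduceByKey(func)
    def combine(pairs):
        # combine values per key within a single partition
        acc = {}
        for key, value in pairs:
            acc[key] = func(acc[key], value) if key in acc else value
        return acc

    # merge the per-partition results key by key
    merged = {}
    for partial in map(combine, partitions):
        for key, value in partial.items():
            merged[key] = func(merged[key], value) if key in merged else value
    return list(merged.items())

partitions = [
    [('pos', 0.5), ('neg', -0.25), ('pos', 0.25)],
    [('neg', -0.5), ('pos', 0.125)],
]
print(reduce_by_key(partitions, lambda x, y: x + y))
# [('pos', 0.875), ('neg', -0.75)]
```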

In [62]:
for key, value in sums.collect():
    print(f'{key}: {value}')

pos: 240.8051861456065
neg: -244.63638676192005
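Counting and summing can also be done in a single reduction by mapping each value `x` to a pair `(1, x)` and adding the pairs element-wise. In PySpark this should read `signs.mapValues(lambda x: (1, x)).reduceByKey(add_pairs)`; we have not run that here, so the block below only mimics the two steps in plain Python on made-up data, with `add_pairs` being our own helper.

```python
def add_pairs(a, b):
    # element-wise sum of (count, total) pairs; associative and commutative
    return (a[0] + b[0], a[1] + b[1])

# Made-up key/value pairs in the format of the signs RDD
signs = [('pos', 0.5), ('neg', -0.25), ('pos', 0.25), ('neg', -0.5)]

# mapValues-like step: each value x becomes (1, x)
counted = [(key, (1, x)) for key, x in signs]

# reduceByKey-like step (sequential here): combine the pairs per key
stats = {}
for key, pair in counted:
    stats[key] = add_pairs(stats[key], pair) if key in stats else pair

for key, (count, total) in stats.items():
    print(f'{key}: count={count}, sum={total}')
```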
