# Lecture 3: Spark

## Overview

1. Using Apache Spark with Python


## Map-Reduce on a cluster of computers

- The code we have written so far will *not* allow us to exploit parallelism from multiple computers in a [cluster](https://en.wikipedia.org/wiki/Computer_cluster).

- Developing such a framework would be a very large software engineering project.

- There are existing frameworks we can use:
    - [Apache Hadoop](https://hadoop.apache.org/)
    - [Apache Spark](https://spark.apache.org/)
    
- In this lecture we will cover Apache Spark.

## Apache Spark

- Apache Spark provides an object-oriented library for processing data on the cluster.

- It provides objects which represent resilient distributed datasets (RDDs).

- RDDs behave a bit like Python collections (e.g. lists).

- However:
    - the underlying data is distributed across the nodes in the cluster, and
    - the collections are *immutable*.

## Apache Spark and Map-Reduce

- We process the data by using higher-order functions to map RDDs onto *new* RDDs. 

- Each instance of an RDD has at least two *methods* corresponding to the Map-Reduce workflow:
    - `map`
    - `reduceByKey`
    
- These methods work in the same way as the corresponding functions we defined earlier to work with the standard Python collections.  

- There are also additional RDD methods in the Apache Spark API;
    - Apache Spark is a *super-set* of Map-Reduce.
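
As a reminder of the semantics, the following is a minimal plain-Python sketch of mapping followed by reducing by key. The helper `reduce_by_key` is a hypothetical stand-in written for this illustration; it is neither the function from the earlier lecture nor part of the Spark API, and it runs on a single machine.

```python
from collections import defaultdict
from functools import reduce

def reduce_by_key(func, pairs):
    """Group (key, value) pairs by key, then fold each group's values with func."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return [(key, reduce(func, values)) for key, values in groups.items()]

words = "to be or not to be".split()

# Map step: turn every word into a (word, 1) pair.
pairs = list(map(lambda w: (w, 1), words))

# Reduce step: sum the values that share the same key.
word_counts = reduce_by_key(lambda x, y: x + y, pairs)
print(word_counts)  # [('to', 2), ('be', 2), ('or', 1), ('not', 1)]
```

The RDD methods `map` and `reduceByKey` play the same two roles, except that the pairs are spread across the nodes of the cluster.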
   

## Word-count in Apache Spark



In [122]:
words = "to be or not to be".split()
words

['to', 'be', 'or', 'not', 'to', 'be']

### The `SparkContext` class

- When working with Apache Spark we invoke methods on an object which is an instance of the `pyspark.context.SparkContext` class.

- Typically, an instance of this class will be created automatically for you and assigned to the variable `sc`.

- The `parallelize` method in `SparkContext` can be used to turn any ordinary Python collection into an RDD;
    - normally we would create an RDD from a large file or an HBase table. 

In [None]:
from pyspark.sql import SparkSession

# Start (or reuse) a local Spark session and obtain its SparkContext.
spark = SparkSession.builder.master("local").getOrCreate()
sc = spark.sparkContext
words_rdd = sc.parallelize(words)
words_rdd

### Mapping an RDD

- Now when we invoke the `map` or `reduceByKey` methods on an RDD such as `words_rdd`, we set up a parallel computation across the cluster.

In [None]:
word_tuples_rdd = words_rdd.map(lambda x: (x, 1))
word_tuples_rdd

PythonRDD[1] at RDD at PythonRDD.scala:53

- Notice that we do not have a result yet.

- The computation is not performed until we request the final result to be *collected*.

- We do this by invoking the `collect()` method:

In [None]:
word_tuples_rdd.collect()

[('to', 1), ('be', 1), ('or', 1), ('not', 1), ('to', 1), ('be', 1)]

### Reducing an RDD

- However, each word still appears once per occurrence; to obtain the counts we must sum the values for each key:

In [None]:
word_counts_rdd = word_tuples_rdd.reduceByKey(lambda x, y: x + y)
word_counts_rdd

PythonRDD[6] at RDD at PythonRDD.scala:53

- Now we request the final result:

In [None]:
word_counts = word_counts_rdd.collect()
word_counts

[('to', 2), ('be', 2), ('or', 1), ('not', 1)]

### Lazy evaluation 

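- Transformations such as `map` and `reduceByKey` are *lazy*: they only describe the computation.

- It is only when we invoke an action such as `collect()` that the processing is performed on the cluster.

- Invoking `collect()` here will cause both the `map` and `reduceByKey` operations to be performed.

- `collect()` copies every element of the result back to the driver program, so if the resulting collection is very large this can be an expensive operation.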
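
The same idea can be seen on a single machine with Python's built-in lazy `map`. This is only an analogy, not a description of how Spark schedules work internally; the names `to_pair` and `calls` are invented for the illustration.

```python
calls = []  # records each time the mapping function actually runs

def to_pair(word):
    calls.append(word)
    return (word, 1)

words = "to be or not to be".split()

# Building the pipeline: map() returns a lazy iterator, so nothing runs yet.
lazy_pairs = map(to_pair, words)
calls_before = len(calls)

# Forcing the result, analogous to invoking collect() on an RDD.
pairs = list(lazy_pairs)
calls_after = len(calls)

print(calls_before, calls_after)  # 0 6
```

The mapping function is not called at all until the result is requested, which mirrors the way an RDD defers its work until `collect()` is invoked.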


### The head of an RDD

- The `take` method is similar to `collect`, but only returns the first $n$ elements.

- This can be very useful for testing.


In [None]:
word_counts_rdd.take(2)

[('to', 2), ('be', 2)]

### The complete word-count example

In [None]:
text = "to be or not to be".split()
rdd = sc.parallelize(text)
counts = rdd.map(lambda word: (word, 1)) \
             .reduceByKey(lambda x, y: x + y)
counts.collect()

[('to', 2), ('be', 2), ('or', 1), ('not', 1)]

## Additional RDD transformations

- Apache Spark offers many more RDD methods over and above the standard Map-Reduce framework. Most are transformations that return a new RDD, while `count` and `takeOrdered` are actions that return a result to the driver:

    - Sorting: `sortByKey`, `sortBy`, `takeOrdered`
    - Mapping: `flatMap`
    - Filtering: `filter`
    - Counting: `count`
    - Set-theoretic: `intersection`, `union`
    - Many others: [see the Transformations section of the programming guide](https://spark.apache.org/docs/latest/programming-guide.html#transformations)
    

## Creating an RDD from a text file

- In the previous example, we created an RDD from a Python collection.

- This is *not* typically how we would work with big data.

- More commonly we would create an RDD corresponding to data in an HBase table, or an HDFS file.

- The following example creates an RDD from a text file on the native filesystem (ext4);
    - with bigger data, you would use an HDFS file, but the principle is the same.

- Each element of the RDD corresponds to a single *line* of text.

In [None]:
genome = sc.textFile('../../../data/genome.txt')
genome.take(5)

['TTGGCCATGCTGCCCACTCACCTAGAGCGCACAGCTGACACTGAGTCCTCTTCTGAACCTCATCCATGAA',
 'CATATTTATGAAATCTTTCCTGGCCCCAAGTGGAAATGCCCCCTCATTTGGGTCCTCACTGAACCCCAGT',
 'ACACAACTCTTTTGTACTACTCTATTATGCTGGGGTGTTTTTTTATTGTCTCACCTGATAAACCGTAAGC',
 'CCCTTGAAGACAGCAACTCGTTTTTAAGCTCTTTATAACCCCAGAGCCTCGCACAGTACCTGGACCAGAT',
 'TAAGGGGTACTTAACAGATGCTTAGTGAAGGAAGGAATGGATTTCTCACCTGGTTGCTTATCTTCTAGAC']

## Genome example

- We will use this RDD to calculate the frequencies of sequences of five bases, and then sort the sequences into descending order ranked by their frequency.

- First we will define some functions to split the bases into sequences of a certain size:

In [None]:
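def group_characters(line, n=5):
    """Yield consecutive groups of n characters from line.

    Any trailing characters that do not fill a complete group are dropped.
    """
    result = ''
    i = 0
    for ch in line:
        result = result + ch
        i = i + 1
        if (i % n) == 0:
            yield result
            result = ''

def group_and_split(line):
    """Return the groups of five characters in line as a list."""
    return list(group_characters(line))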

In [None]:
group_and_split('abcdefghijklmno')

['abcde', 'fghij', 'klmno']

- Now we will transform the original text RDD into an RDD containing key-value pairs where the key is the sequence and the value is 1, as per the word-count example.

- Notice that if we simply map each line of text, we will obtain nested data, with one list of sequences per line:

In [None]:
genome.map(group_and_split).take(2)

[['TTGGC',
  'CATGC',
  'TGCCC',
  'ACTCA',
  'CCTAG',
  'AGCGC',
  'ACAGC',
  'TGACA',
  'CTGAG',
  'TCCTC',
  'TTCTG',
  'AACCT',
  'CATCC',
  'ATGAA'],
 ['CATAT',
  'TTATG',
  'AAATC',
  'TTTCC',
  'TGGCC',
  'CCAAG',
  'TGGAA',
  'ATGCC',
  'CCCTC',
  'ATTTG',
  'GGTCC',
  'TCACT',
  'GAACC',
  'CCAGT']]

### Flattening an RDD using `flatMap`

- We will need to flatten this data in order to turn it into a list of base-sequences.

- We can use the `flatMap` method:

In [None]:
sequences = genome.flatMap(group_and_split)
sequences.take(3)

['TTGGC', 'CATGC', 'TGCCC']
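
The difference between `map` and `flatMap` can be sketched in plain Python with `itertools.chain.from_iterable`. The slicing-based `group_and_split` below is a compact stand-in written for this illustration (it takes the group size as a parameter and, like the version above, drops an incomplete tail), and the two input lines are made-up sample data.

```python
from itertools import chain

def group_and_split(line, n=5):
    """Split a line into consecutive chunks of exactly n characters."""
    return [line[i:i + n] for i in range(0, len(line) - n + 1, n)]

lines = ['abcdefghij', 'klmnopqrst']

# map: one output element per input element, so we get a list of lists.
mapped = list(map(group_and_split, lines))

# flatMap: the per-line lists are concatenated into a single flat sequence.
flat_mapped = list(chain.from_iterable(map(group_and_split, lines)))

print(mapped)       # [['abcde', 'fghij'], ['klmno', 'pqrst']]
print(flat_mapped)  # ['abcde', 'fghij', 'klmno', 'pqrst']
```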

In [None]:
counts = \
    sequences.map(
        lambda w: (w, 1)).reduceByKey(lambda x, y: x + y)
counts.take(10)

[('TTGGC', 587),
 ('CATGC', 647),
 ('TGCCC', 599),
 ('ACTCA', 775),
 ('TGACA', 831),
 ('TTCTG', 1257),
 ('AACCT', 726),
 ('TTATG', 819),
 ('AAATC', 996),
 ('TGGCC', 718)]

- We want to rank each sequence according to its count.

- `sortByKey` orders tuples by their key (first element), so the count must become the key.

- Therefore we need to reverse the tuples.

In [None]:
def reverse_tuple(key_value_pair):
    return (key_value_pair[1], key_value_pair[0])

In [None]:
sequences = counts.map(reverse_tuple)
sequences.take(10)

[(587, 'TTGGC'),
 (647, 'CATGC'),
 (599, 'TGCCC'),
 (775, 'ACTCA'),
 (831, 'TGACA'),
 (1257, 'TTCTG'),
 (726, 'AACCT'),
 (819, 'TTATG'),
 (996, 'AAATC'),
 (718, 'TGGCC')]

### Sorting an RDD

- Now we can sort the RDD in descending order of key:

In [None]:
sequences_sorted = sequences.sortByKey(ascending=False)
top_ten_sequences = sequences_sorted.take(10)
top_ten_sequences

[(37137, 'NNNNN'),
 (4653, 'AAAAA'),
 (4223, 'TTTTT'),
 (2788, 'AAAAT'),
 (2658, 'ATTTT'),
 (2283, 'AAATA'),
 (2276, 'TAAAA'),
 (2197, 'TTTTA'),
 (2196, 'TATTT'),
 (2185, 'AGAAA')]
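
Reversing the tuples and sorting is one route to the top ten. Ranking directly on the count is another: in PySpark, `counts.takeOrdered(10, key=lambda kv: -kv[1])` or `counts.sortBy(lambda kv: kv[1], ascending=False).take(10)` should give the same ranking without the reversal step, although neither call has been run against the lecture's data here. The plain-Python sketch below shows the same "top $n$ by value" idea locally with `heapq.nlargest`, using a few of the counts shown earlier as sample input.

```python
import heapq

# A few (sequence, count) pairs taken from the output shown earlier.
counts = [('TTGGC', 587), ('CATGC', 647), ('TGCCC', 599),
          ('ACTCA', 775), ('TTCTG', 1257)]

# Keep the three pairs with the largest count, in descending order of count,
# without reversing the tuples.
top_three = heapq.nlargest(3, counts, key=lambda kv: kv[1])
print(top_three)  # [('TTCTG', 1257), ('ACTCA', 775), ('CATGC', 647)]
```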

## Calculating $\pi$ using Spark

- We can estimate $\pi$ using the following Monte-Carlo method:


1. Inscribe a circle in a square.
2. Randomly generate points in the square.
3. Determine the number of points in the square that are also in the circle.
4. Let $r$ be the number of points in the circle divided by the number of points in the square; then $\pi \approx 4 r$.
    
- Note that the more points generated, the better the approximation.

See [this tutorial](https://computing.llnl.gov/tutorials/parallel_comp/#ExamplesPI).
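
A short justification of step 4, writing $R$ for the radius of the circle (a symbol introduced here for the derivation): a point drawn uniformly from the square lands in the circle with probability equal to the ratio of the two areas, so

$$
r \approx \frac{\text{area of circle}}{\text{area of square}} = \frac{\pi R^2}{(2R)^2} = \frac{\pi}{4}, \qquad \text{and hence} \qquad \pi \approx 4r .
$$

Sampling only the unit square $0 \le x, y < 1$ and testing $x^2 + y^2 < 1$ covers one quarter of the circle and one quarter of the square, so the ratio is unchanged. Each point is an independent trial with success probability $p = \pi/4$, so under that assumption the standard error of the estimate $4r$ from $N$ points is

$$
4\sqrt{\frac{p(1-p)}{N}} \approx \frac{1.64}{\sqrt{N}},
$$

which shrinks only in proportion to $1/\sqrt{N}$.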

In [None]:
import numpy as np

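def sample(_):
    # Draw a point uniformly from the unit square and test whether it falls
    # inside the quarter circle of radius 1 centred on the origin.
    x, y = np.random.random(), np.random.random()
    return 1 if x*x + y*y < 1 else 0

NUM_SAMPLES = 5000000

# The elements of the range are ignored; each one simply triggers one sample.
count = sc.parallelize(range(NUM_SAMPLES)).map(sample) \
             .reduce(lambda a, b: a + b)
r = count / NUM_SAMPLES
print("Pi is approximately %.3f" % (4.0 * r))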

Pi is approximately 3.141
