# Apache Spark

![Logo](images/apache_spark_logo.png)

- [Apache Spark](https://spark.apache.org) was first released in 2014. 
- It was originally developed by [Matei Zaharia](http://people.csail.mit.edu/matei) as a class project, and later a PhD dissertation, at the University of California, Berkeley.
- Spark is written in [Scala](https://www.scala-lang.org).
- All images come from [Databricks](https://databricks.com/product/getting-started-guide).

- Apache Spark is a fast and general-purpose cluster computing system. 
- It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs.
- Spark can manage "big data" collections with a small set of high-level primitives like `map`, `filter`, `groupby`, and `join`.  With these common patterns we can often handle computations that are more complex than map, but are still structured.
- It also supports a rich set of higher-level tools including [Spark SQL](https://spark.apache.org/docs/latest/sql-programming-guide.html) for SQL and structured data processing, [MLlib](https://spark.apache.org/docs/latest/ml-guide.html) for machine learning, [GraphX](https://spark.apache.org/docs/latest/graphx-programming-guide.html) for graph processing, and Spark Streaming.

# Resilient distributed datasets

- The fundamental abstraction of Apache Spark is a read-only, parallel, distributed, fault-tolerant collection called a resilient distributed dataset (RDD).
- RDDs behave a bit like Python collections (e.g. lists).
- When working with Apache Spark we iteratively apply functions to every item of these collections in parallel to produce *new* RDDs.
- The data is distributed across nodes in a cluster of computers.
- Functions implemented in Spark can work in parallel across elements of the collection.
- The Spark framework allocates data and processing to different nodes, without any intervention from the programmer.
- RDDs are automatically rebuilt on machine failure.


# Lifecycle of a Spark Program
1. Create some input RDDs from external data or parallelize a collection in your driver program.
2. Lazily transform them to define new RDDs using transformations like `filter()` or `map()`.
3. Ask Spark to `cache()` any intermediate RDDs that will need to be reused.
4. Launch actions such as `count()` and `collect()` to kick off a parallel computation, which is then optimized and executed by Spark.

# Operations on Distributed Data
- Two types of operations: **transformations** and **actions**
- Transformations are *lazy* (not computed immediately) 
- Transformations are executed when an action is run
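
Lazy evaluation can be illustrated without Spark, because Python's built-in `map` and `filter` also return lazy iterators. The sketch below is only an analogy: the `square` helper and the `calls` list are illustrative names, and no Spark API is involved. It shows that nothing is computed until a final step, which plays the role of an action, consumes the pipeline.

```python
calls = []  # records every evaluation so that laziness is observable


def square(x):
    calls.append(x)
    return x ** 2


data = range(8)

# "Transformations": these only build a pipeline, nothing is evaluated yet.
squared = map(square, data)
evens = filter(lambda v: v % 2 == 0, squared)

calls_before_action = list(calls)  # still empty at this point

# "Action": consuming the iterator triggers the whole computation.
result = list(evens)

print(calls_before_action)  # no call to square() happened before the action
print(result)
```

Spark goes further than this analogy: it also distributes the work across partitions and can optimize the whole chain of transformations before running it.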


# [Transformations](https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations) (lazy)
```spark
map() flatMap()
filter() 
mapPartitions() mapPartitionsWithIndex() 
sample()
union() intersection() distinct()
groupBy() groupByKey()
reduceByKey()
sortBy() sortByKey()
join()
cogroup()
cartesian()
pipe()
coalesce()
repartition()
partitionBy()
...
```

# [Actions](https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions)

```
reduce()
collect()
count()
first()
take()
takeSample()
saveToCassandra()
takeOrdered()
saveAsTextFile()
saveAsSequenceFile()
saveAsObjectFile()
countByKey()
foreach()
```

# PySpark


PySpark uses Py4J, which enables Python programs to dynamically access Java objects.

![PySpark Internals](http://i.imgur.com/YlI8AqEl.png)

## The `SparkContext` class

- When working with Apache Spark we invoke methods on an object which is an instance of the `pyspark.SparkContext` class.

- Typically, an instance of this object will be created automatically for you and assigned to the variable `sc`.

- The `parallelize` method in `SparkContext` can be used to turn any ordinary Python collection into an RDD;
    - normally we would create an RDD from a large file or an HBase table. 

## First example

PySpark isn't on sys.path by default, but that doesn't mean it can't be used as a regular library. You can address this by either symlinking pyspark into your site-packages, or adding pyspark to sys.path at runtime. [findspark](https://github.com/minrk/findspark) does the latter.

In [None]:
import findspark, pyspark

findspark.init(spark_home="/export/spark-2.3.1-bin-hadoop2.7/")

sc = pyspark.SparkContext(master="local[4]", appName="FirstExample")

In [None]:
# If you get an error, uncomment the command below, run this cell,
# and fix the path to Spark and/or Python in the cell above
# sc.stop()

We have a Spark context `sc` that runs Spark locally with 4 worker threads (`local[4]`), which works just fine on a multicore machine.

In [None]:
print(sc) # it plays a role similar to a process pool executor

# Create your first RDD

In [None]:
rdd = sc.parallelize(list(range(8))) # create collection

In [None]:
rdd

### Collect

Action / To Driver: Return all items in the RDD to the driver in a single list

![](http://i.imgur.com/DUO6ygB.png)

In [None]:
rdd.collect()  # Gather results back to local process

### Map

Transformation / Narrow: Return a new RDD by applying a function to each element of this RDD

![](http://i.imgur.com/PxNJf0U.png)

In [None]:
rdd.map(lambda x: x ** 2).collect() # Square each element

### Filter

Transformation / Narrow: Return a new RDD containing only the elements that satisfy a predicate

![](http://i.imgur.com/GFyji4U.png)

In [None]:
# Select only the even elements
rdd.filter(lambda x: x % 2 == 0).collect()

### FlatMap

Transformation / Narrow: Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results

![](http://i.imgur.com/TsSUex8.png)

In [None]:
rdd = sc.parallelize([1,2,3])
rdd.flatMap(lambda x: (x, x*100, 42)).collect()

### GroupBy

Transformation / Wide: Group the data in the original RDD. Create pairs where the key is the output of a user function, and the value is all items for which the function yields this key.

![](http://i.imgur.com/gdj0Ey8.png)

In [None]:
rdd = sc.parallelize(['John', 'Fred', 'Anna', 'James'])
rdd = rdd.groupBy(lambda w: w[0])
[(k, list(v)) for (k, v) in rdd.collect()]

### GroupByKey

Transformation / Wide: Group the values for each key in the original RDD. Create a new pair where the original key corresponds to this collected group of values.

![](http://i.imgur.com/TlWRGr2.png)

In [None]:
rdd = sc.parallelize([('B',5),('B',4),('A',3),('A',2),('A',1)])
rdd = rdd.groupByKey()
[(j[0], list(j[1])) for j in rdd.collect()]

### Join

Transformation / Wide: Return a new RDD containing all pairs of elements having the same key in the original RDDs

![](http://i.imgur.com/YXL42Nl.png)

In [None]:
x = sc.parallelize([("a", 1), ("b", 2)])
y = sc.parallelize([("a", 3), ("a", 4), ("b", 5)])
x.join(y).collect()

### Distinct

Transformation / Wide: Return a new RDD containing distinct items from the original RDD (omitting all duplicates)

![](http://i.imgur.com/Vqgy2a4.png)

In [None]:
rdd = sc.parallelize([1,2,3,3,4])
rdd.distinct().collect()

### KeyBy

Transformation / Narrow: Create a Pair RDD, forming one pair for each item in the original RDD. The pair’s key is calculated from the value via a user-supplied function.

![](http://i.imgur.com/nqYhDW5.png)

In [None]:
rdd = sc.parallelize(['John', 'Fred', 'Anna', 'James'])
rdd.keyBy(lambda w: w[0]).collect()

## Actions

### Map-Reduce operation 

Action / To Driver: Aggregate all the elements of the RDD by applying a user function pairwise to elements and partial results, and return a result to the driver

![](http://i.imgur.com/R72uzwX.png)

In [None]:
from operator import add
rdd = sc.parallelize(list(range(8)))
rdd.map(lambda x: x ** 2).reduce(add) # reduce is an action!

### Max, Min, Sum, Mean, Variance, Stdev

Action / To Driver: Compute the respective function (maximum value, minimum value, sum, mean, variance, or standard deviation) from a numeric RDD

![](http://i.imgur.com/HUCtib1.png)
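
PySpark RDDs expose these as the actions `max()`, `min()`, `sum()`, `mean()`, `variance()` and `stdev()`. As a Spark-free sketch, the values one would expect for the RDD built from `range(8)` can be computed with the standard library. It assumes, to the best of our understanding of the PySpark API, that `variance()` and `stdev()` are the population versions (the sample versions being `sampleVariance()` and `sampleStdev()`); the `summary` dictionary is just an illustrative container.

```python
import statistics

values = list(range(8))  # same data as sc.parallelize(list(range(8)))

summary = {
    "max": max(values),
    "min": min(values),
    "sum": sum(values),
    "mean": statistics.mean(values),
    # Population variance and standard deviation (divide by N).
    "variance": statistics.pvariance(values),
    "stdev": statistics.pstdev(values),
    # Sample variance (divide by N - 1), shown for comparison.
    "sample_variance": statistics.variance(values),
}

for name, value in summary.items():
    print(name, value)
```

These reference values are handy for checking the results returned by the corresponding RDD actions.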

### CountByKey

Action / To Driver: Return a map of keys and counts of their occurrences in the RDD

![](http://i.imgur.com/jvQTGv6.png)

In [None]:
rdd = sc.parallelize([('J', 'James'), ('F','Fred'), 
                    ('A','Anna'), ('J','John')])

rdd.countByKey()

In [None]:
# Stop the local spark cluster
sc.stop()

### Exercise 9.1 Word-count in Apache Spark

- Write the sample text file

In [None]:
from lorem import text
with open('sample.txt','w') as f:
    f.write(text())


- Create the RDD with the `SparkContext.textFile` method
- Convert to lowercase, remove periods and split into words using `rdd.flatMap`
- Use `rdd.map` to create the list of key/value pairs (word, 1)
- Use `rdd.reduceByKey` to count all occurrences of each word
- Use `rdd.takeOrdered` to get the sorted word frequencies

All documentation is available [here](https://spark.apache.org/docs/2.1.0/api/python/pyspark.html?highlight=textfile#pyspark.SparkContext) for textFile and [here](https://spark.apache.org/docs/2.1.0/api/python/pyspark.html?highlight=textfile#pyspark.RDD) for RDD. 

For a global overview see the Transformations section of the [programming guide](https://spark.apache.org/docs/latest/rdd-programming-guide.html)
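
Before writing the Spark version, it can help to see the same steps in plain Python. The sketch below is not the Spark solution: it uses a short hard-coded text instead of `sample.txt`, and the variable names are illustrative. Each step is annotated with the RDD operation it stands in for.

```python
text = "Lorem ipsum dolor. Ipsum lorem sit amet.\nDolor sit lorem."
lines = text.splitlines()  # textFile yields one element per line

# flatMap: lowercase, replace periods, split each line, flatten into one list.
words = [w for line in lines for w in line.lower().replace(".", " ").split()]

# map: build (word, 1) pairs.
pairs = [(w, 1) for w in words]

# reduceByKey: add up the values that share the same key.
counts = {}
for word, one in pairs:
    counts[word] = counts.get(word, 0) + one

# takeOrdered: keep the 3 most frequent words (ties broken alphabetically).
top = sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))[:3]
print(top)
```

In the Spark version, each list or dictionary above becomes an RDD, and only the final step returns data to the driver.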

In [None]:
import pyspark

sc = pyspark.SparkContext(master="local[4]", appName="wordcount")

In [None]:
# ... code here ...
#
#

In [None]:
sc.stop()

# SparkSession

Since Spark 2.0.0, `SparkSession` provides a single point
of entry to interact with Spark functionality and
allows programming Spark with the DataFrame and Dataset APIs.

### $\pi$ computation example

- We can estimate an approximate value for $\pi$ using the following Monte-Carlo method:

1.    Inscribe a circle in a square
2.    Randomly generate points in the square
3.    Determine the number of points in the square that are also in the circle
4.    Let $r$ be the number of points in the circle divided by the number of points in the square, then $\pi \approx 4 r$.
    
- Note that the more points generated, the better the approximation

See [this tutorial](https://computing.llnl.gov/tutorials/parallel_comp/#ExamplesPI).

In [None]:
import sys
from random import random
from operator import add

from pyspark.sql import SparkSession

spark = (SparkSession.builder.master("local[4]")
         .appName("PythonPi")
         .config("spark.executor.instances", "4")
         .getOrCreate())

partitions = 8
n = 100000 * partitions

def f(_):
    x = random() * 2 - 1
    y = random() * 2 - 1
    return 1 if x ** 2 + y ** 2 <= 1 else 0

count = spark.sparkContext.parallelize(range(1, n+1), partitions).map(f).reduce(add)
print("Pi is roughly %f" % (4.0 * count / n))

spark.stop()

### Exercise 9.2

Using the same method as in the $\pi$ computation example, compute the integral
$$
I = \int_0^1 \exp(-x^2) dx
$$
You can check your result with numpy

In [None]:
# numpy evaluates solution using numeric computation. 
# It uses discrete values of the function
import numpy as np
x = np.linspace(0,1,1000)
np.trapz(np.exp(-x*x),x)

In [None]:
# numpy and scipy evaluate the solution using numeric computation.
# They use discrete values of the function
import numpy as np
from scipy.integrate import quad
quad(lambda x: np.exp(-x*x), 0, 1)
# note: quad returns a tuple (value, estimated absolute error)
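
The hit-or-miss idea carries over from the $\pi$ example: draw points uniformly in the unit square and count the fraction that falls under the curve $y = \exp(-x^2)$. The sketch below is sequential, plain Python (no Spark); the function name `estimate_integral`, the seed and the sample size are illustrative choices. The closed form $\frac{\sqrt{\pi}}{2}\,\mathrm{erf}(1)$ is used only as a check.

```python
import math
import random


def estimate_integral(n, seed=42):
    """Hit-or-miss Monte Carlo estimate of the integral of exp(-x^2) on [0, 1]."""
    rng = random.Random(seed)
    hits = 0
    for _ in range(n):
        x = rng.random()
        y = rng.random()
        if y <= math.exp(-x * x):  # the point lies under the curve
            hits += 1
    # The unit square has area 1, so the hit fraction estimates the integral.
    return hits / n


estimate = estimate_integral(200_000)
exact = math.sqrt(math.pi) / 2 * math.erf(1.0)
print(estimate, exact)
```

In a Spark version, the loop body would become the function passed to `map`, and the sum of hits would be obtained with `reduce`, as in the $\pi$ example.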

### Correlation between daily stock prices

- Data preparation

In [2]:
import os  # library to get directory and file paths
import tarfile # this module makes possible to read and write tar archives

def extract_data(name, where):
    datadir = os.path.join(where, name)
    if not os.path.exists(datadir):
        print("Extracting data...")
        tar_path = os.path.join(where, name + '.tgz')
        with tarfile.open(tar_path, mode='r:gz') as data:
            data.extractall(where)

extract_data('daily-stock', '../data')  # this function call will extract json files

In [3]:
import json
import pandas as pd
import os, glob

here = os.getcwd()
datadir = os.path.join(here,'..','data','daily-stock')
filenames = sorted(glob.glob(os.path.join(datadir, '*.json')))
filenames

['/home/navaro_p/big-data/notebooks/../data/daily-stock/aet.json',
 '/home/navaro_p/big-data/notebooks/../data/daily-stock/afl.json',
 '/home/navaro_p/big-data/notebooks/../data/daily-stock/aig.json',
 '/home/navaro_p/big-data/notebooks/../data/daily-stock/al.json',
 '/home/navaro_p/big-data/notebooks/../data/daily-stock/amgn.json',
 '/home/navaro_p/big-data/notebooks/../data/daily-stock/avy.json',
 '/home/navaro_p/big-data/notebooks/../data/daily-stock/b.json',
 '/home/navaro_p/big-data/notebooks/../data/daily-stock/bwa.json',
 '/home/navaro_p/big-data/notebooks/../data/daily-stock/ge.json',
 '/home/navaro_p/big-data/notebooks/../data/daily-stock/hal.json',
 '/home/navaro_p/big-data/notebooks/../data/daily-stock/hp.json',
 '/home/navaro_p/big-data/notebooks/../data/daily-stock/hpq.json',
 '/home/navaro_p/big-data/notebooks/../data/daily-stock/ibm.json',
 '/home/navaro_p/big-data/notebooks/../data/daily-stock/jbl.json',
 '/home/navaro_p/big-data/notebooks/../data/daily-stock/jpm.json',

In [4]:
from glob import glob
import os, json
import pandas as pd

for fn in filenames:
    with open(fn) as f:
        data = [json.loads(line) for line in f]
        
    df = pd.DataFrame(data)
    
    out_filename = fn[:-5] + '.h5'
    df.to_hdf(out_filename, key='/data')
    print("Finished : %s" % out_filename.split(os.path.sep)[-1])

filenames = sorted(glob(os.path.join('..','data', 'daily-stock', '*.h5')))  # ../data/daily-stock/*.h5

Finished : aet.h5
Finished : afl.h5
Finished : aig.h5
Finished : al.h5
Finished : amgn.h5
Finished : avy.h5
Finished : b.h5
Finished : bwa.h5
Finished : ge.h5
Finished : hal.h5
Finished : hp.h5
Finished : hpq.h5
Finished : ibm.h5
Finished : jbl.h5
Finished : jpm.h5
Finished : luv.h5
Finished : met.h5
Finished : pcg.h5
Finished : tgt.h5
Finished : usb.h5
Finished : xom.h5


### Sequential code

In [None]:
%%time

series = []
for fn in filenames:   # Simple map over filenames
    series.append(pd.read_hdf(fn)['close'])

results = []

for a in series:    # Doubly nested loop over the same collection
    for b in series:  
        if not (a == b).all():     # Filter out comparisons of the same series 
            results.append(a.corr(b))  # Apply function

result = max(results)

### Exercise 9.3

Parallelize the code above with Apache Spark.

- Change the filenames because of the Hadoop environment.

In [1]:
import os, glob

here = os.getcwd()

filenames = sorted(glob.glob(os.path.join(here,'..','data', 'daily-stock', '*.h5')))
filenames = [ "file://"+filename for filename in filenames]
filenames

['file:///home/navaro_p/big-data/notebooks/../data/daily-stock/aet.h5',
 'file:///home/navaro_p/big-data/notebooks/../data/daily-stock/afl.h5',
 'file:///home/navaro_p/big-data/notebooks/../data/daily-stock/aig.h5',
 'file:///home/navaro_p/big-data/notebooks/../data/daily-stock/al.h5',
 'file:///home/navaro_p/big-data/notebooks/../data/daily-stock/amgn.h5',
 'file:///home/navaro_p/big-data/notebooks/../data/daily-stock/avy.h5',
 'file:///home/navaro_p/big-data/notebooks/../data/daily-stock/b.h5',
 'file:///home/navaro_p/big-data/notebooks/../data/daily-stock/bwa.h5',
 'file:///home/navaro_p/big-data/notebooks/../data/daily-stock/ge.h5',
 'file:///home/navaro_p/big-data/notebooks/../data/daily-stock/hal.h5',
 'file:///home/navaro_p/big-data/notebooks/../data/daily-stock/hp.h5',
 'file:///home/navaro_p/big-data/notebooks/../data/daily-stock/hpq.h5',
 'file:///home/navaro_p/big-data/notebooks/../data/daily-stock/ibm.h5',
 'file:///home/navaro_p/big-data/notebooks/../data/daily-stock/jbl.h

Start the PySpark context

In [2]:
import findspark, pyspark

findspark.init(spark_home="/export/spark-2.3.1-bin-hadoop2.7/")

sc = pyspark.SparkContext(master="local[4]", appName="DailyStock")

In [4]:
### Parallel code
import pandas as pd

rdd = sc.parallelize(filenames)
series = rdd.map(lambda fn: pd.read_hdf(fn)['close'])

corr = (series.cartesian(series)
              .filter(lambda ab: not (ab[0] == ab[1]).all())
              .map(lambda ab: ab[0].corr(ab[1]))
              .max())

corr

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 1.0 failed 1 times, most recent failure: Lost task 1.0 in stage 1.0 (TID 9, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/export/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 230, in main
    process()
  File "/export/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 225, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/export/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 372, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/export/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/util.py", line 55, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-4-96802513f48c>", line 5, in <lambda>
  File "/usr/local/anaconda/envs/big-data/lib/python3.6/site-packages/pandas/io/pytables.py", line 371, in read_hdf
    'File %s does not exist' % path_or_buf)
FileNotFoundError: File file:///home/navaro_p/big-data/notebooks/../data/daily-stock/aet.h5 does not exist

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:213)
	at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:407)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:215)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1991)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:170)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1602)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1590)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1589)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1589)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1823)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1772)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1761)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:939)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:938)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:162)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/export/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 230, in main
    process()
  File "/export/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 225, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/export/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 372, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/export/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/util.py", line 55, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-4-96802513f48c>", line 5, in <lambda>
  File "/usr/local/anaconda/envs/big-data/lib/python3.6/site-packages/pandas/io/pytables.py", line 371, in read_hdf
    'File %s does not exist' % path_or_buf)
FileNotFoundError: File file:///home/navaro_p/big-data/notebooks/../data/daily-stock/aet.h5 does not exist

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:213)
	at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:407)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:215)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1991)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:170)


In [15]:
sc.stop()  # call the method; a bare `sc.stop` only displays the bound method

The Spark version is slower than the sequential code: there is a lot of setup, workers have to be created, there is a lot of communication, and the correlation computation is too small to offset this overhead.
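
The `FileNotFoundError` in the traceback above comes from `pd.read_hdf`, which expects a plain local path and treats the `file://` URI as a literal file name. One possible fix, sketched here with a hypothetical helper `to_local_path`, is to strip the scheme before calling pandas (or simply not to add the prefix when the files are read by pandas rather than by Spark itself). The example path is made up for illustration.

```python
from urllib.parse import urlparse


def to_local_path(uri):
    """Return the filesystem path of a file:// URI; leave plain paths unchanged."""
    parsed = urlparse(uri)
    return parsed.path if parsed.scheme == "file" else uri


# Hypothetical path, used only to show the conversion.
uri = "file:///home/user/data/daily-stock/aet.h5"
print(to_local_path(uri))
```

With such a helper, the mapping step could read `pd.read_hdf(to_local_path(fn))['close']` instead of passing the URI directly.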

### Exercise 9.4 Fasta file example

Use an RDD to calculate the GC content of the FASTA file `nucleotide-sample.txt`:

$$\cfrac{G+C}{A+T+G+C}\times 100\%$$

Create an RDD from the FASTA file `nucleotide-sample.txt` in the data directory, count the 'G' and 'C' bases, then divide by the total number of bases.
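
As a reference for checking the Spark result, the formula can be sketched in plain Python. The function name `gc_content` and the tiny `sample` input are illustrative. Skipping header lines that start with `>` is an assumption based on the usual FASTA layout, not on the contents of `nucleotide-sample.txt`.

```python
def gc_content(lines):
    """Return the GC percentage over all sequence lines."""
    gc = 0
    total = 0
    for line in lines:
        line = line.strip().upper()
        if not line or line.startswith(">"):
            continue  # assumed FASTA header or empty line
        gc += line.count("G") + line.count("C")
        total += sum(line.count(base) for base in "ATGC")
    return 100.0 * gc / total if total else 0.0


# Made-up input: one header line and two short sequence lines.
sample = [">seq1 hypothetical header", "ATGC", "GGCC"]
percentage = gc_content(sample)
print(percentage)
```

With an RDD, the per-line counts would be produced by a `map` and combined with a `reduce`, so that only two totals travel back to the driver.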