# Spark Operation
 
The first step is to initialize Spark using `SparkContext` and `SparkConf`. `SparkConf` holds the configuration parameters of the job. Three settings are commonly provided:

* the master URL (for example `local` to run on the local machine)
* the application name
* JVM settings (such as the memory allocated to executors)
 

In [1]:
from pyspark.conf import SparkConf
from pyspark.context import SparkContext

conf = SparkConf().setMaster("local").setAppName("Spark Operation")
sc = SparkContext(conf=conf)

Loading Data
--
To load data, `SparkContext` provides the following methods:

* `textFile(pathToFile)`
* `parallelize(collection)`

`parallelize` is mostly used for testing and debugging. `textFile` can load files from the local file system, HDFS and S3.

In this notebook, data will be loaded using `parallelize`.
In the snippet below, the action `first()` returns the first element of the RDD.

In [2]:
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
rdd.first()

1

Transformation
--

Transformations build a new RDD from an existing one. Actions compute a result from an RDD and return it to the driver program.

Transformations are _lazy_: calling a transformation only records it, and nothing is computed until an action is called.
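Python generators behave in a similar way, which makes them a handy mental model for laziness. The sketch below is plain Python, not Spark: `lazy_filter` is a hypothetical helper, and the `calls` list only records when the predicate actually runs.

```python
calls = []

def is_even(x):
    calls.append(x)  # record every evaluation of the predicate
    return x % 2 == 0

def lazy_filter(predicate, data):
    # Like an RDD transformation: describe the computation, run nothing yet
    return (x for x in data if predicate(x))

evens = lazy_filter(is_even, range(1, 11))
print(calls)       # [] : the predicate has not been called yet

result = list(evens)  # like an action: forces the computation
print(result)      # [2, 4, 6, 8, 10]
print(len(calls))  # 10
```

One difference: a generator can only be consumed once, whereas Spark recomputes an RDD from its lineage for every action unless it is cached with `cache()` or `persist()`.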

In [None]:
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
rdd.filter(lambda x: x % 2 == 0) # Nothing actually happens

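Nothing was computed. To see the result of a transformation, the following examples use the action `collect`, which returns all the elements of the RDD to the driver as a Python list (so it should only be used on small RDDs).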

Filter
--

`filter` takes a predicate and returns a new RDD containing only the elements for which the predicate is true.

In [3]:
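rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
rdd.collect()  # returns all 10 elements; a notebook only displays the last expression of a cell

# Keep only the even numbers
rdd.filter(lambda x: x % 2 == 0).collect()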

[2, 4, 6, 8, 10]

Map
--
`map` applies a function to each element of an RDD and returns a new RDD of the results.

In [5]:
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
rdd.map(lambda n: "%s element" % str(n)).collect()

['1 element',
 '2 element',
 '3 element',
 '4 element',
 '5 element',
 '6 element',
 '7 element',
 '8 element',
 '9 element',
 '10 element']

In [6]:
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
rdd.map(lambda n: n * 2).collect()

[2, 4, 6, 8, 10, 12, 14, 16, 18, 20]

FlatMap
--
`flatMap` applies a function that returns a sequence for each element, then flattens all those sequences into a single RDD.
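The difference from `map` is easiest to see side by side. This plain-Python sketch (not Spark code) mimics both with a list comprehension and `itertools.chain.from_iterable`; `expand` is a hypothetical helper playing the role of the lambda in the cell below.

```python
from itertools import chain

def expand(n):
    return [n, n * 2, n * 3]

data = [1, 2, 3]

# map: exactly one output element per input element (here, one list each)
mapped = [expand(n) for n in data]
print(mapped)       # [[1, 2, 3], [2, 4, 6], [3, 6, 9]]

# flatMap: the returned sequences are concatenated into one flat sequence
flat_mapped = list(chain.from_iterable(expand(n) for n in data))
print(flat_mapped)  # [1, 2, 3, 2, 4, 6, 3, 6, 9]
```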

In [8]:
rdd = sc.parallelize([1,2,3])
rdd.flatMap(lambda n: [n, n * 2, n * 3]).collect()

[1, 2, 3, 2, 4, 6, 3, 6, 9]

Distinct
---
`distinct` returns a new RDD with duplicate elements removed. The order of the result is not guaranteed.
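Why the order can change: `distinct` needs a shuffle, in which elements are redistributed across partitions by hash so that equal elements end up in the same partition, where duplicates can be dropped. The plain-Python model below illustrates that idea; `distinct_model` and its two-bucket layout are assumptions for illustration, not Spark's actual code.

```python
def distinct_model(data, num_partitions=2):
    # Shuffle step: route each element to a bucket chosen by its hash,
    # so equal elements always land in the same bucket
    buckets = [[] for _ in range(num_partitions)]
    for x in data:
        buckets[hash(x) % num_partitions].append(x)
    # Each bucket then removes its own duplicates independently
    result = []
    for bucket in buckets:
        seen = set()
        for x in bucket:
            if x not in seen:
                seen.add(x)
                result.append(x)
    return result

words = ["bien", "ou", "bien", "?"]
unique = distinct_model(words)
print(unique)  # the 3 distinct words, in an order that depends on their hashes
```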

In [10]:
rdd = sc.parallelize(["bien", "ou", "bien", "?"])
rdd.distinct().collect()

['bien', 'ou', '?']

Sample
---
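`sample` returns a random sample of the RDD. It takes 2 arguments:

* `withReplacement`: whether an element can be selected more than once
* `fraction`: the expected fraction of the RDD to keep; the actual size varies from run to run

An optional third argument, `seed`, makes the sample reproducible.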
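Without replacement, `fraction` is the probability that each element is kept, so the sample size is only right on average: that is why the cell below returned 4 elements instead of 5. The plain-Python model below sketches this Bernoulli sampling; `sample_model` is a hypothetical helper, not Spark's sampler.

```python
import random

def sample_model(data, fraction, seed=None):
    # Bernoulli sampling: keep each element independently with probability `fraction`
    rng = random.Random(seed)
    return [x for x in data if rng.random() < fraction]

data = list(range(1, 11))
for seed in (1, 2, 3):
    # The size changes with the seed; it is 5 only on average
    print(sample_model(data, 0.5, seed))
```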

In [13]:
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
rdd.sample(False, 0.5).collect()

[3, 4, 9, 10]

Transformations between RDDs
---
These transformations implement set-like operations. Each of them takes another RDD as its parameter.

Union
---
`union` returns an RDD containing the elements of both RDDs. Duplicates are kept, as the repeated `3` in the output shows.
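In plain Python terms, `union` behaves like list concatenation rather than a mathematical set union. This local analogy (not Spark code) contrasts the two; a true set union corresponds to calling `distinct` on the result.

```python
a = [1, 2, 3]
b = [3, 4, 5, 6]

union = a + b                # like rdd_a.union(rdd_b): duplicates are kept
set_union = sorted(set(union))  # like union followed by distinct (sorted only for display)
print(union)      # [1, 2, 3, 3, 4, 5, 6]
print(set_union)  # [1, 2, 3, 4, 5, 6]
```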


In [14]:
rdd_a = sc.parallelize([1,2,3])
rdd_b = sc.parallelize([3,4,5,6])
rdd_a.union(rdd_b).collect()

[1, 2, 3, 3, 4, 5, 6]

Intersection
---
`intersection` returns an RDD containing the elements found in both RDDs, without duplicates.
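A plain-Python analogy with duplicated inputs: even though `3` appears twice on each side, it appears only once in the result. Sorting is only there for a stable printout, since Spark does not guarantee the order.

```python
a = [1, 2, 3, 3]
b = [3, 3, 4, 5, 6]

# Keep the elements present in both inputs, each only once
intersection = sorted(set(a) & set(b))
print(intersection)  # [3]
```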


In [15]:
rdd_a = sc.parallelize([1,2,3])
rdd_b = sc.parallelize([3,4,5,6])
rdd_a.intersection(rdd_b).collect()

[3]

Subtract
---
`subtract` returns an RDD containing the elements of the first RDD that are not in the second one. The order of the result is not guaranteed.
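Unlike `intersection`, which removes duplicates, `subtract` keeps every element of the first RDD that is absent from the second, duplicates included. This plain-Python analogy with a duplicated `2` shows the expected elements (Spark may return them in another order).

```python
a = [1, 2, 2, 3, 4]
b = [3, 4, 5, 6]

exclude = set(b)
# Keep each element of `a` (duplicates included) that does not occur in `b`
difference = [x for x in a if x not in exclude]
print(difference)  # [1, 2, 2]
```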

In [16]:
rdd_a = sc.parallelize([1,2,3,4])
rdd_b = sc.parallelize([3,4,5,6])
rdd_a.subtract(rdd_b).collect()

[2, 1]

Cartesian
---
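`cartesian` returns the Cartesian product with another RDD: all pairs `(a, b)` where `a` is an element of the first RDD and `b` an element of the second.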
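Locally, `itertools.product` computes the same pairs. The analogy (plain Python, not Spark) also makes the cost explicit: the output size is the product of the input sizes, so `cartesian` grows quickly on large RDDs.

```python
from itertools import product

a = [1, 2, 3]
b = [3, 4, 5]

pairs = list(product(a, b))  # every (x, y) with x from a and y from b
print(pairs)
print(len(pairs) == len(a) * len(b))  # True
```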

In [18]:
rdd_a = sc.parallelize([1,2,3])
rdd_b = sc.parallelize([3,4,5])
rdd_a.cartesian(rdd_b).collect()

[(1, 3), (1, 4), (1, 5), (2, 3), (2, 4), (2, 5), (3, 3), (3, 4), (3, 5)]

Action
---
So far, the only actions used have been `first` and `collect`.

Reduce
---
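`reduce` is one of the most common actions. It takes a function that combines 2 elements of the RDD into a single value of the same type, and applies it until only one value remains. The function should be commutative and associative, because each partition is reduced separately before the partial results are combined.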
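Why the function must be commutative and associative: the result is built from per-partition partial results. The plain-Python model below assumes a simple contiguous split into non-empty partitions (`split` and `reduce_model` are hypothetical helpers, not Spark APIs). Addition gives the same answer for any partitioning; subtraction does not.

```python
from functools import reduce

def split(data, num_partitions):
    # Simplified contiguous split (an assumption; Spark's own split may differ)
    size, extra = divmod(len(data), num_partitions)
    parts, start = [], 0
    for i in range(num_partitions):
        end = start + size + (1 if i < extra else 0)
        parts.append(data[start:end])
        start = end
    return parts

def reduce_model(data, op, num_partitions):
    # Reduce each partition locally, then reduce the partial results
    partials = [reduce(op, part) for part in split(data, num_partitions)]
    return reduce(op, partials)

data = [1, 2, 3, 4]
add = lambda a, b: a + b
sub = lambda a, b: a - b
print(reduce_model(data, add, 1), reduce_model(data, add, 2))  # 10 10
print(reduce_model(data, sub, 1), reduce_model(data, sub, 2))  # -8 0
```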


In [19]:
rdd = sc.parallelize([1,2,3,4])
rdd.reduce(lambda a, b: a + b)

10

Fold
---
`fold` is similar to `reduce`, but it also takes a _zero value_, which should be the identity element of the operation (the value that leaves any element unchanged).

For example:

* `+`: identity element 0
* `×`: identity element 1
* collection concatenation: identity element the empty collection

NB: The zero value is applied once per partition (in parallel) and once more when the partition results are merged.
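The NB matters when the zero value is not a true identity. This plain-Python model mirrors Spark's behaviour: the zero value starts each partition's accumulator and also the final merge. `fold_model` is a hypothetical helper, and the 2 partitions are given explicitly as an assumption.

```python
from functools import reduce

def fold_model(partitions, zero, op):
    # The zero value seeds the accumulator of every partition...
    partials = [reduce(op, part, zero) for part in partitions]
    # ...and seeds the final merge of the partition results once more
    return reduce(op, partials, zero)

add = lambda a, b: a + b
mul = lambda a, b: a * b
two_parts = [[1, 2], [3, 4]]

print(fold_model(two_parts, 0, add))  # 10: 0 is the identity of +
print(fold_model(two_parts, 1, mul))  # 24: 1 is the identity of ×
print(fold_model(two_parts, 1, add))  # 13: 1 is not the identity of +, it is added 3 times
```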


In [20]:
rdd = sc.parallelize([1,2,3,4])
rdd.fold(0, lambda a, b: a + b)

10

In [22]:
rdd = sc.parallelize([1,2,3,4])
rdd.fold(1, lambda a, b: a * b)

24

Aggregate
---
`fold` and `reduce` always return a value of the same type as the elements of the RDD. `aggregate` can return a different type, because it separates combining elements from merging results.
The arguments of `aggregate` are:

1. the zero value, used as the initial accumulator
2. the operation applied to each element, which updates the local accumulator of a partition
3. the combine function, which merges the local results of all partitions into the global result

NB: The second argument of `parallelize` is the number of partitions.
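The two-level flow can be modeled in plain Python. Here `[1, 2, 3, 4]` is assumed to be split into the partitions `[1, 2]` and `[3, 4]`, and the accumulator is the same `(sum, count)` tuple as in the cell below, which is enough to compute a mean in a single pass. `aggregate_model` is a hypothetical helper, not a Spark API.

```python
from functools import reduce

def aggregate_model(partitions, zero, seq_op, comb_op):
    # seq_op folds each element into its partition's local accumulator
    local_results = [reduce(seq_op, part, zero) for part in partitions]
    # comb_op merges the local accumulators into the global result
    return reduce(comb_op, local_results, zero)

seq_op = lambda acc, value: (acc[0] + value, acc[1] + 1)  # (running sum, running count)
comb_op = lambda a, b: (a[0] + b[0], a[1] + b[1])         # add sums and counts

total, count = aggregate_model([[1, 2], [3, 4]], (0, 0), seq_op, comb_op)
print((total, count))  # (10, 4)
print(total / count)   # 2.5, the mean
```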

In [10]:
rdd = sc.parallelize([1,2,3,4], 2)
# Compute (sum, count) in a single pass: the accumulator is a (sum, count) tuple
rdd.aggregate((0, 0),
              lambda local_result, current_value: (local_result[0] + current_value, local_result[1] + 1),
              lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))

(10, 4)

First
---
`first` returns the first element of an RDD.

In [11]:
rdd = sc.parallelize([4,3,2,1])
rdd.first()

4

Count
---
`count` returns the number of elements in the RDD.

In [12]:
rdd = sc.parallelize([1,2,3,4,5,6])
rdd.count()

6

Take & Take Ordered
---
`take(n)` returns the first `n` elements of an RDD.
`takeOrdered(n)` returns the `n` smallest elements of an RDD in ascending order, or ordered by the optional `key` function.
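PySpark's `takeOrdered` keeps the `num` smallest elements (according to `key`) of each partition with `heapq.nsmallest`, then merges those candidate lists. The plain-Python model below follows that approach; `take_ordered_model` and the 2-partition split are assumptions for illustration. It also shows that a key such as `lambda x: -x` reverses the order, so the call returns the largest elements.

```python
import heapq
from functools import reduce

def take_ordered_model(partitions, num, key=None):
    # Each partition keeps only its `num` smallest elements (by `key`)...
    local = [heapq.nsmallest(num, part, key=key) for part in partitions]
    # ...then the candidate lists are merged, keeping `num` elements overall
    return reduce(lambda a, b: heapq.nsmallest(num, a + b, key=key), local)

parts = [[4, 2, 1], [5, 6, 3, 7]]
print(take_ordered_model(parts, 3))                    # [1, 2, 3]
print(take_ordered_model(parts, 3, key=lambda x: -x))  # [7, 6, 5]
```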

In [13]:
rdd = sc.parallelize([1,2,3,4,5,6])
rdd.take(3)

[1, 2, 3]

In [14]:
rdd = sc.parallelize([4,2,1,5,6,3,7])
rdd.takeOrdered(3)

[1, 2, 3]

In [16]:
rdd = sc.parallelize([4,2,1,5,6,3,7])
rdd.takeOrdered(3, key = lambda x: -x)

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 14.0 failed 1 times, most recent failure: Lost task 0.0 in stage 14.0 (TID 16, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/cyril/Documents/Dev/anaconda3/envs/spark/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 229, in main
    process()
  File "/home/cyril/Documents/Dev/anaconda3/envs/spark/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 224, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/cyril/Documents/Dev/anaconda3/envs/spark/lib/python3.6/site-packages/pyspark/rdd.py", line 2438, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/home/cyril/Documents/Dev/anaconda3/envs/spark/lib/python3.6/site-packages/pyspark/rdd.py", line 362, in func
    return f(iterator)
  File "/home/cyril/Documents/Dev/anaconda3/envs/spark/lib/python3.6/site-packages/pyspark/rdd.py", line 1305, in <lambda>
    return self.mapPartitions(lambda it: [heapq.nsmallest(num, it, key)]).reduce(merge)
  File "/home/cyril/Documents/Dev/anaconda3/envs/spark/lib/python3.6/heapq.py", line 508, in nsmallest
    result = [(key(elem), i, elem) for i, elem in zip(range(n), it)]
  File "/home/cyril/Documents/Dev/anaconda3/envs/spark/lib/python3.6/heapq.py", line 508, in <listcomp>
    result = [(key(elem), i, elem) for i, elem in zip(range(n), it)]
TypeError: 'int' object is not callable

