# Welcome to Apache Spark

![](images/spark-logo-trademark.png)

# Architecture

A Spark program consists of a <span class="text-primary">driver application</span> and <span class="text-success">worker programs</span>.

![](images/cluster-overview.png)

* Worker nodes run on different machines in a cluster, or in local threads.
* Data is distributed among workers.
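
The driver/worker split can be sketched in plain Python, without Spark. In this minimal sketch, a hypothetical `run_job` helper plays the driver: it slices the data into partitions, hands each one to a local thread (loosely similar to what `local[*]` mode does), and combines the partial results. The names `partition`, `worker` and `run_job` are illustrative assumptions, not part of the Spark API.

```python
from concurrent.futures import ThreadPoolExecutor


def partition(data, num_partitions):
    """Split a list into num_partitions contiguous slices (illustrative only)."""
    size, extra = divmod(len(data), num_partitions)
    slices, start = [], 0
    for i in range(num_partitions):
        end = start + size + (1 if i < extra else 0)
        slices.append(data[start:end])
        start = end
    return slices


def worker(chunk):
    """Work done on one partition: here, sum the squares of its elements."""
    return sum(x * x for x in chunk)


def run_job(data, num_partitions=4):
    """Driver side: send partitions to worker threads, then combine the partial results."""
    chunks = partition(data, num_partitions)
    with ThreadPoolExecutor(max_workers=num_partitions) as pool:
        partials = list(pool.map(worker, chunks))
    return chunks, sum(partials)


chunks, total = run_job(list(range(12)), num_partitions=4)
print(chunks)  # the four partitions
print(total)   # combined result computed from the partial sums
```

Each worker only ever sees its own slice; the driver sees only the small partial results.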

## Spark Context

The `SparkContext` holds all the information about the cluster that is needed to run Spark code.

In [2]:
from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName('lecture-lyon2').setMaster('local[*]')
sc = SparkContext.getOrCreate(conf=conf)

sc

# Resilient Distributed Dataset

A partitioned collection of objects spread across a cluster, stored in memory or on disk.

![](images/spark-rdd.png)

There are three ways to create an RDD:

* by parallelizing an existing collection

In [6]:
rdd = sc.parallelize(range(10))
rdd

PythonRDD[10] at RDD at PythonRDD.scala:48

* from files in a storage system

In [12]:
titanic = sc.textFile('data/titanic.csv')
titanic

data/titanic.csv MapPartitionsRDD[15] at textFile at <unknown>:0

* by transforming another RDD

In [13]:
rdd.map(lambda number: number * 2)

PythonRDD[16] at RDD at PythonRDD.scala:48

## Working with RDDs

Let's create an RDD from a list of numbers and play with it.

In [11]:
rdd = sc.parallelize(range(12), 4)
rdd.cache()

PythonRDD[1] at RDD at PythonRDD.scala:48

<h2><span class="text-danger">Remember !</span></h2>

* An RDD is immutable.
* An RDD is evaluated lazily.
* An RDD only tracks its lineage, so it can reconstruct itself if a partition is lost.

In [14]:
print(rdd)              # prints only info on RDD, no evaluation
print(rdd.take(3))      # specific methods to gather data back to driver
print(rdd.map(lambda num: num + 1).toDebugString())  # check RDD lineage

PythonRDD[3] at RDD at PythonRDD.scala:48
[0, 1, 2]
b'(4) PythonRDD[11] at RDD at PythonRDD.scala:48 []\n |  PythonRDD[3] at RDD at PythonRDD.scala:48 []\n |      CachedPartitions: 1; MemorySize: 134.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B\n |  ParallelCollectionRDD[2] at parallelize at PythonRDD.scala:480 []'
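
The three properties above can be illustrated with a toy class in plain Python. This is only a sketch under simplifying assumptions: `LazyDataset` is a hypothetical name, it runs on a single machine, and it is not how Spark implements RDDs. It does show the idea that transformations merely record steps, and that an action replays the recorded lineage.

```python
class LazyDataset:
    """Toy stand-in for an RDD: immutable, lazy, and aware of its lineage."""

    def __init__(self, source, steps=()):
        self._source = source        # original data, never modified
        self._steps = tuple(steps)   # recorded (name, function) pairs

    def map(self, func):
        # Returns a NEW dataset; nothing is computed yet.
        return LazyDataset(self._source, self._steps + (("map", func),))

    def filter(self, predicate):
        return LazyDataset(self._source, self._steps + (("filter", predicate),))

    def lineage(self):
        return ["source"] + [name for name, _ in self._steps]

    def collect(self):
        # The "action": replay every recorded step over the source data.
        items = iter(self._source)
        for name, func in self._steps:
            items = map(func, items) if name == "map" else filter(func, items)
        return list(items)


calls = []


def add_one(num):
    calls.append(num)  # records when the function really runs
    return num + 1


base = LazyDataset(range(12))
evens = base.map(add_one).filter(lambda num: num % 2 == 0)

calls_before = list(calls)   # snapshot taken before any action
print(calls_before)          # empty: no evaluation so far
print(evens.lineage())
print(evens.collect())       # triggers add_one on every element
print(base.collect())        # the original dataset is unchanged
```

Because a dataset is fully described by its source and its recorded steps, any lost result can be recomputed by replaying them.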


## Spark operations

Spark operations come in two types: transformations and actions.

* Transformations are lazy _(not computed immediately)_.
* Only an action on an RDD triggers the execution of the transformations that precede it.

![](images/spark-operations.png)

## Transformations

Transformations shape your dataset: each one returns a new RDD.

### Filter

Return a new RDD containing only the elements that satisfy a predicate.

In [12]:
rdd.filter(lambda num: num % 2 == 0).collect()

[0, 2, 4, 6, 8, 10]

### Map

Return a new RDD by applying a function to each element of this RDD.

In [16]:
rdd.map(lambda num: num * 2).collect()

[0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22]

### FlatMap

Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results.

In [25]:
rdd.map(lambda num: range(num)).take(3)

[range(0, 0), range(0, 1), range(0, 2)]
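
The cell above uses `map`, so every element becomes exactly one `range` object. The flattening step that `flatMap` adds can be sketched in plain Python with `itertools.chain`; this is an illustration of the semantics only, not Spark code.

```python
from itertools import chain

numbers = range(4)

# map: exactly one output element per input element (here, one range each)
mapped = [range(num) for num in numbers]

# flatMap: apply the same function, then flatten the results into one sequence
flat_mapped = list(chain.from_iterable(range(num) for num in numbers))

print(mapped)
print(flat_mapped)
```

With the RDD defined earlier, the Spark counterpart is `rdd.flatMap(lambda num: range(num))`.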

### Distinct

Return a new RDD containing the distinct elements in this RDD.

In [21]:
rdd.map(lambda num: 0 if num % 2 == 0 else 1).distinct().collect()

[0, 1]

## Actions

Actions trigger the job: they run the pending transformations and return a result to the driver or write it to storage.

### Collect / take

`collect()` returns a list that contains all of the elements in this RDD; `take(n)` returns only the first `n` elements.

Note: `collect()` should only be used if the resulting list is expected to be small, as all the data is loaded into the driver's memory.

In [27]:
rdd.collect()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]

### Count

Return the number of elements in this RDD.

In [3]:
rdd.count()

12

### Reduce

Reduces the elements of this RDD using the specified commutative and associative binary operator. Currently reduces partitions locally.

In [4]:
rdd.reduce(lambda x,y : x + y)

66
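
Why must the operator be commutative and associative? Since each partition is reduced locally before the partial results are merged, the grouping of the operands depends on how the data is partitioned. The sketch below mimics this in plain Python with a hypothetical `partitioned_reduce` helper (an assumption for illustration, not a Spark function): addition gives the same answer for any partitioning, subtraction does not.

```python
from functools import reduce


def partitioned_reduce(partitions, op):
    """Reduce each non-empty partition locally, then reduce the partial results."""
    partials = [reduce(op, part) for part in partitions if part]
    return reduce(op, partials)


data = list(range(12))
one_part = [data]
four_parts = [data[0:3], data[3:6], data[6:9], data[9:12]]


def add(x, y):
    return x + y


def sub(x, y):
    return x - y


add_results = (partitioned_reduce(one_part, add), partitioned_reduce(four_parts, add))
sub_results = (partitioned_reduce(one_part, sub), partitioned_reduce(four_parts, sub))

print(add_results)  # identical for both partitionings
print(sub_results)  # differs: subtraction is neither associative nor commutative
```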

## Key-value transformations

* Key/value RDDs are commonly used to perform aggregations, and often we will do some initial ETL (extract, transform, and load) to get our data into a key/value format. 

* Key/value RDDs expose new operations (e.g., counting up reviews for each product, grouping together data with the same key, and grouping together two different RDDs).

### ReduceByKey

Merge the values for each key using an associative and commutative reduce function.

In [6]:
rdd = sc.parallelize([('a', 1), ('b', 0), ('b', 2), ('a', 5)], 4)
rdd.reduceByKey(lambda x,y : x + y).collect()

[('b', 2), ('a', 6)]
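
The merge logic can be sketched in plain Python: values are first combined per key inside each partition, then the partial results are merged across partitions. The helper name `reduce_by_key` and the one-pair-per-partition layout are assumptions made for this illustration; real Spark also shuffles data between machines, which the sketch ignores.

```python
def reduce_by_key(partitions, op):
    """Merge values per key inside each partition, then merge the partial results."""
    merged = {}
    for part in partitions:
        local = {}
        for key, value in part:               # local combine within one partition
            local[key] = op(local[key], value) if key in local else value
        for key, value in local.items():      # merge across partitions
            merged[key] = op(merged[key], value) if key in merged else value
    return merged


pairs = [('a', 1), ('b', 0), ('b', 2), ('a', 5)]
partitions = [[pair] for pair in pairs]       # assumed layout: one pair per partition
result = reduce_by_key(partitions, lambda x, y: x + y)
print(sorted(result.items()))
```

The order of keys in a real `collect()` is not guaranteed, which is why the sketch sorts before printing.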

### Join

Return an RDD containing all pairs of elements with matching keys in self and other.

In [9]:
countLetter = sc.parallelize([('a', 1), ('b', 0), ('c', 2), ('a', 5)], 4)
defLetter = sc.parallelize([('a', 'vowel'), ('b', 'consonant'), ('c', 'consonant'), ('d', 'consonant')], 4)
countLetter.join(defLetter).collect()

[('a', (1, 'vowel')),
 ('a', (5, 'vowel')),
 ('c', (2, 'consonant')),
 ('b', (0, 'consonant'))]
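
The join semantics (an inner join on keys) can be sketched in plain Python. The `inner_join` helper is a hypothetical name used only for illustration; it indexes one side by key and emits one output pair per matching combination, so keys present on only one side, such as `'d'`, are dropped.

```python
from collections import defaultdict


def inner_join(left, right):
    """Return (key, (left_value, right_value)) for every pair of matching keys."""
    right_by_key = defaultdict(list)
    for key, value in right:
        right_by_key[key].append(value)
    return [(key, (lv, rv)) for key, lv in left for rv in right_by_key.get(key, [])]


count_letter = [('a', 1), ('b', 0), ('c', 2), ('a', 5)]
def_letter = [('a', 'vowel'), ('b', 'consonant'), ('c', 'consonant'), ('d', 'consonant')]
joined = inner_join(count_letter, def_letter)
print(joined)
```

Unlike this sketch, Spark gives no guarantee about the order of the joined pairs.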

## Wordcount!

In [13]:
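rdd = sc.textFile('data/lorem.txt')
words = rdd.flatMap(lambda line: line.split(' '))  # one element per word
counts = words.map(lambda word: (word, 1)).reduceByKey(lambda x, y: x + y)  # sum per word
counts.take(5)  # output depends on the contents of data/lorem.txt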

# RDD conclusion

Resilient Distributed Datasets (RDDs) are distributed collections of immutable JVM objects that can be processed in parallel across a cluster. They are the backbone of Apache Spark.

In [14]:
sc.stop()

# Combine SQL, streaming, and complex analytics.

Spark powers a stack of libraries including SQL and DataFrames, MLlib for machine learning, GraphX, and Spark Streaming. You can combine these libraries seamlessly in the same application.

<img src="images/spark-stack.png" class="img-responsive center-block">

# SparkSQL

This chapter introduces Spark SQL, Spark’s interface for working with structured and semistructured data.

## SparkSession

The entry point to programming Spark with the Dataset and DataFrame API.

In [1]:
from pyspark import SparkConf
from pyspark.sql import SparkSession

conf = SparkConf().setAppName('lecture-lyon2').setMaster('local[*]')
spark = SparkSession.builder.config(conf=conf).getOrCreate()
spark

## Dataframes

Under the hood, a DataFrame is an RDD composed of Row objects with additional schema information of the types in each column. Row objects are just wrappers around arrays of basic types.

In [2]:
titanic = spark.read.option('header', 'true').option('inferSchema', 'true').csv('data/titanic.csv')
titanic.createOrReplaceTempView('titanic')
titanic.show(8)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

### Two ways of interacting

* DataFrames provide a domain-specific language for structured data manipulation
* The sql function on a SparkSession enables applications to run SQL queries programmatically and returns the result as a DataFrame.

In [7]:
titanic.filter(titanic.Sex == 'male').select(['Name', 'Sex', 'Survived']).show(3)

spark.sql('SELECT Name, Sex, Survived FROM titanic WHERE Sex = "male"').show(3)

+--------------------+----+--------+
|                Name| Sex|Survived|
+--------------------+----+--------+
|Braund, Mr. Owen ...|male|       0|
|Allen, Mr. Willia...|male|       0|
|    Moran, Mr. James|male|       0|
+--------------------+----+--------+
only showing top 3 rows

+--------------------+----+--------+
|                Name| Sex|Survived|
+--------------------+----+--------+
|Braund, Mr. Owen ...|male|       0|
|Allen, Mr. Willia...|male|       0|
|    Moran, Mr. James|male|       0|
+--------------------+----+--------+
only showing top 3 rows



## Unified data source interaction

Spark provides a single, unified interface for reading and saving data, which is implemented for multiple data storage formats: _json, parquet, jdbc, orc, libsvm, csv, text_.

In [17]:
titanic = spark.read.option('header', 'true').option('inferSchema', 'true').csv('data/titanic.csv')
titanic.show(8)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

## Catalyst optimization

Catalyst is an extensible query optimizer used internally by SparkSQL for planning and defining the execution of SparkSQL queries.

![](images/catalyst.png)

In [3]:
titanic[titanic['Sex'] == 'male'].select(['Name', 'Sex']).explain()

== Physical Plan ==
*Project [Name#15, Sex#16]
+- *Filter (isnotnull(Sex#16) && (Sex#16 = male))
   +- *FileScan csv [Name#15,Sex#16] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/C:/workspaceperso/pyspark-interactive-lecture/notebooks/data/titanic.csv], PartitionFilters: [], PushedFilters: [IsNotNull(Sex), EqualTo(Sex,male)], ReadSchema: struct<Name:string,Sex:string>


# Machine Learning

MLlib is Spark’s machine learning (ML) library. It has an <span class="text-danger">RDD-based API in maintenance mode</span> and a <span class="text-primary">DataFrame-based API</span>.

* The DataFrame-based API benefits from Spark data sources, SQL/DataFrame queries, Tungsten and Catalyst optimizations, and uniform APIs across languages.
* ML Pipelines are a set of high-level APIs on top of DataFrames that help users create and tune practical machine learning pipelines.

### Transformers

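A Transformer implements a method `transform()`, which converts one DataFrame into another. Note that `StringIndexer` is itself an Estimator: `fit()` returns a fitted model, and that model is the Transformer applied below.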

In [18]:
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="Sex", outputCol="SexIndex")
titanic_indexed = indexer.fit(titanic).transform(titanic)
titanic_indexed.show(8)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|SexIndex|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|     0.0|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|     1.0|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|     1.0|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|     1.0|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|     0.0|
|       

### Estimators

An Estimator implements a method fit(), which accepts a DataFrame and produces a Model, which is a Transformer.

In [19]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=["SexIndex", "Fare"], outputCol="features")
titanic_train = assembler.transform(titanic_indexed)

rf = RandomForestClassifier(labelCol="Survived", featuresCol="features", numTrees=10)
model = rf.fit(titanic_train)
model.transform(titanic_train).select(["Survived", "prediction", "probability"]).show(8)

+--------+----------+--------------------+
|Survived|prediction|         probability|
+--------+----------+--------------------+
|       0|       0.0|[0.93248794942417...|
|       1|       1.0|[0.13920141262246...|
|       1|       0.0|[0.50999575454479...|
|       1|       1.0|[0.00227272727272...|
|       0|       0.0|[0.84184749682173...|
|       0|       0.0|[0.86187536477318...|
|       0|       0.0|[0.75740407559426...|
|       0|       0.0|[0.77982677472910...|
+--------+----------+--------------------+
only showing top 8 rows
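
The Estimator/Transformer contract can be sketched in plain Python with a toy string indexer. The class names `ToyStringIndexer` and `ToyStringIndexerModel` are hypothetical, rows are plain dictionaries rather than DataFrame rows, and the alphabetical tie-breaking is an assumption of this sketch. The point is only the shape of the API: `fit()` learns something from the data and returns a model, and the model's `transform()` returns new rows without modifying its input.

```python
from collections import Counter


class ToyStringIndexerModel:
    """Transformer: transform() returns new rows with an extra index column."""

    def __init__(self, input_col, output_col, mapping):
        self.input_col = input_col
        self.output_col = output_col
        self.mapping = mapping

    def transform(self, rows):
        return [{**row, self.output_col: self.mapping[row[self.input_col]]} for row in rows]


class ToyStringIndexer:
    """Estimator: fit() learns a label -> index mapping and returns a model."""

    def __init__(self, input_col, output_col):
        self.input_col = input_col
        self.output_col = output_col

    def fit(self, rows):
        counts = Counter(row[self.input_col] for row in rows)
        # Most frequent label gets index 0.0; ties broken alphabetically (sketch assumption).
        ordered = sorted(counts, key=lambda label: (-counts[label], label))
        mapping = {label: float(i) for i, label in enumerate(ordered)}
        return ToyStringIndexerModel(self.input_col, self.output_col, mapping)


rows = [{"Sex": "male"}, {"Sex": "female"}, {"Sex": "female"}, {"Sex": "male"}, {"Sex": "male"}]
model = ToyStringIndexer(input_col="Sex", output_col="SexIndex").fit(rows)
indexed = model.transform(rows)
print(model.mapping)
print([row["SexIndex"] for row in indexed])
```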



### Pipelines

MLlib standardizes APIs for machine learning algorithms to make it easier to combine multiple algorithms into a single pipeline, or workflow.

In [20]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[indexer, assembler, rf])
model = pipeline.fit(titanic)
model.transform(titanic).select(["Survived", "prediction", "probability"]).show(8)

+--------+----------+--------------------+
|Survived|prediction|         probability|
+--------+----------+--------------------+
|       0|       0.0|[0.93248794942417...|
|       1|       1.0|[0.13920141262246...|
|       1|       0.0|[0.50999575454479...|
|       1|       1.0|[0.00227272727272...|
|       0|       0.0|[0.84184749682173...|
|       0|       0.0|[0.86187536477318...|
|       0|       0.0|[0.75740407559426...|
|       0|       0.0|[0.77982677472910...|
+--------+----------+--------------------+
only showing top 8 rows
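
What a pipeline does when it is fitted can be sketched in plain Python. All class names here (`AddOne`, `MeanCenterer`, `ToyPipeline`, and so on) are hypothetical and operate on plain lists of numbers; the sketch only shows the general idea that stages run in order, that estimators are fitted on the output of the previous stages, and that the result is a single model made of fitted stages.

```python
class AddOne:
    """Transformer stage: only has transform()."""

    def transform(self, values):
        return [v + 1 for v in values]


class MeanCentererModel:
    """Fitted model produced by MeanCenterer."""

    def __init__(self, mean):
        self.mean = mean

    def transform(self, values):
        return [v - self.mean for v in values]


class MeanCenterer:
    """Estimator stage: fit() learns the mean and returns a fitted model."""

    def fit(self, values):
        return MeanCentererModel(sum(values) / len(values))


class ToyPipelineModel:
    def __init__(self, stages):
        self.stages = stages

    def transform(self, data):
        for stage in self.stages:
            data = stage.transform(data)
        return data


class ToyPipeline:
    def __init__(self, stages):
        self.stages = stages

    def fit(self, data):
        fitted = []
        for stage in self.stages:
            # Estimators are fitted on the data produced so far; transformers are used as-is.
            model = stage.fit(data) if hasattr(stage, "fit") else stage
            data = model.transform(data)
            fitted.append(model)
        return ToyPipelineModel(fitted)


model = ToyPipeline([AddOne(), MeanCenterer()]).fit([1, 2, 3])
centered_train = model.transform([1, 2, 3])
centered_new = model.transform([10])
print(centered_train)
print(centered_new)   # new data reuses the mean learned during fit()
```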



# Spark Streaming

Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. 

![](images/streaming-flow.png)

In [2]:
from pyspark.streaming import StreamingContext

ssc = StreamingContext(spark.sparkContext, 1)
lines = ssc.socketTextStream('localhost', 9999)
words = lines.flatMap(lambda line: line.split(' '))
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)
wordCounts.pprint()

ssc.start()
ssc.awaitTermination()

Py4JJavaError: An error occurred while calling o36.awaitTermination.
: org.apache.spark.SparkException: An exception was raised by Python:
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 1, localhost, executor driver): org.apache.spark.SparkException: Python worker did not connect back in time
Caused by: java.net.SocketTimeoutException: Accept timed out
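
The streaming cell above needs a text server listening on `localhost:9999` and a working Python worker, so it is hard to reproduce in isolation. The micro-batch idea behind it can be sketched in plain Python: incoming lines are grouped into batches (one per batch interval), and the same word-count logic runs independently on each batch. The `batches` input below is made-up sample data, and `word_counts` is a hypothetical helper, not part of the Spark Streaming API.

```python
from collections import Counter


def word_counts(lines):
    """Count words in one batch: split each line on spaces and sum per word."""
    counts = Counter()
    for line in lines:
        for word in line.split(' '):
            counts[word] += 1
    return dict(counts)


# Made-up input: each inner list holds the lines received during one batch interval.
batches = [
    ['hello spark', 'hello world'],
    ['spark streaming'],
    [],
]

results = [word_counts(batch) for batch in batches]
for i, counts in enumerate(results):
    print(f'batch {i}: {sorted(counts.items())}')
```

Counts are computed per batch and are not accumulated across batches, which matches the behaviour of `reduceByKey` on a DStream.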


# GraphX

To support graph computation, GraphX extends the Spark RDD by introducing a new Graph abstraction: a directed multigraph with properties attached to each vertex and edge. 

# Unified engine

Spark's main contribution is to enable previously disparate cluster workloads to be composed within a single engine.

# Conclusion

In [2]:
spark.stop()

## That's it folks