<DIV ALIGN=CENTER>

# Introduction to Spark
## Professor Robert J. Brunner
  
</DIV>  
-----
-----

## Introduction

Previously in this course, we have discussed doing data science at the
Unix command line, and with Python, primarily by using Pandas. We also
have discussed other Python libraries that bring new functionalities to
the Python data science stack. Other _big data_ technologies, however,
also exist and can be relevant to particular data science
investigations, depending on the scale of data. Of these other
technologies, one of the most promising is [**Spark**][sp].

Spark is a cluster computing system that leverages [Hadoop][sh] technologies like [HDFS][shdfs] for high-performance storage and [YARN][sy] for cluster management. While some see Spark as a replacement for Hadoop, one can also argue that [Spark is simply another compute engine][sce] for Hadoop, alongside MapReduce.

In this IPython Notebook, we explore using Spark to perform data
processing in a manner similar to our previous efforts with Pandas. For
this, we use the airline data, which is stored in HDFS and is accessible
from within our Spark cluster.

For a complementary hands-on introduction, see the [spark-workshop tutorial](https://github.com/deanwampler/spark-workshop/tree/master/tutorial).

-----
[sp]: http://spark.apache.org
[sh]: http://hadoop.apache.org
[sy]: https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html
[shdfs]: https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/HdfsUserGuide.html
[sce]: http://techcrunch.com/2015/07/12/spark-and-hadoop-are-friends-not-foes/

### Initialization

In this class, we have a dedicated Spark cluster that allows students
to explore Spark from within our IPython Notebook environment. Because
this cluster has limited resources, we need to manage them carefully.
In particular, we need to ensure that any SparkContext previously used
by this Jupyter server is properly released before starting a new one.
We then initialize a new SparkContext so that this Dockerized IPython
Notebook can communicate with the Spark cluster.

----- 

In [1]:
# Release the SparkContext if one already exists.
try:
    sc
except NameError:
    # No SparkContext has been created in this session yet.
    pass
else:
    sc.stop()

# Now handle initial import statements

from os import environ
from pyspark import SparkConf, SparkContext

# Obtain initial environment variables.

port_usage = '''
To use our Spark cluster, we must manage access by nodes in the
JupyterHub Server to the Spark cluster. This may require modification to
the port numbers used by the SparkContext to communicate with the Spark
cluster. We first try to automatically set the port numbers, and display
the allowed range for this particular Notebook in order to allow you to
modify them manually as necessary. The two port values to set are 
spark.driver.port and spark.blockManager.port.

Allowed port range: {0}-{1}
'''

bsp = environ['SPARK_PORT_BEGIN']
esp = environ['SPARK_PORT_END']
print(port_usage.format(bsp, esp))

slip = environ['HOSTNAME'] # Spark Local IP
sdh = "rpds{}".format(slip.split('.')[-1]) # Spark Driver Hostname

# Create new Spark Configuration (port numbers might need to be adjusted from defaults.)
myconf = SparkConf()
myconf.set("spark.driver.port", int(bsp))
myconf.set("spark.blockManager.port", int(esp))
myconf.set("spark.driver.host", sdh)
myconf.set("spark.local.ip", slip)
myconf.set("spark.cores.max",2)

myconf.setAppName("Practical Data Mining, UI RP: Brunner")

# Create and initialize a new Spark Context
sc = SparkContext(conf=myconf)

# Display Spark version information, which also verifies SparkContext is active
print("\nSpark version: {0}".format(sc.version))


To use our Spark cluster, we must manage access by nodes in the
JupyterHub Server to the Spark cluster. This may require modification to
the port numbers used by the SparkContext to communicate with the Spark
cluster. We first try to automatically set the port numbers, and display
the allowed range for this particular Notebook in order to allow you to
modify them manually as necessary. The two port values to set are 
spark.driver.port and spark.blockManager.port.

Allowed port range: 8250-8299


Spark version: 1.4.1
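The port usage message asks us to keep `spark.driver.port` and `spark.blockManager.port` inside the allowed range. When a port is busy, Spark retries successive ports (port + 1, port + 2, ...) up to `spark.port.maxRetries` times, 16 by default, so a port set at the very end of the range, as the block manager port is above, can spill past it. The helper `choose_ports` below is a hypothetical sketch, assuming the printed range is inclusive, that reserves a separate retry window for each of the two services.

```python
def choose_ports(begin, end, max_retries=16):
    """Return (driver_port, block_manager_port) inside [begin, end].

    Hypothetical helper: each service gets its own window of
    max_retries + 1 consecutive ports, because Spark tries port,
    port + 1, ..., port + max_retries before giving up.
    """
    begin, end = int(begin), int(end)
    window = max_retries + 1
    if 2 * window > end - begin + 1:
        raise ValueError("port range too small for two retry windows")
    driver_port = begin
    block_manager_port = begin + window
    return driver_port, block_manager_port


# Example with the range printed above (8250-8299), passed as strings
# just like the SPARK_PORT_BEGIN / SPARK_PORT_END environment values.
driver_port, block_manager_port = choose_ports("8250", "8299")
print(driver_port, block_manager_port)  # 8250 8267
```

These two values could then be passed to `myconf.set(...)` in place of `int(bsp)` and `int(esp)`.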


-----

### Using Spark

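Spark can read data stored in a variety of formats, but its basic
abstraction is the Resilient Distributed Dataset (RDD): an immutable,
partitioned collection of elements that can be operated on in parallel.
We can create an RDD from a Python collection with `parallelize`, or
from a text file, such as one stored in HDFS, with `textFile`. We start
with a small Python list and then apply some basic processing.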


In [71]:
data = list(range(50))
print(data)

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49]


In [72]:
myRDD = sc.parallelize(data, 8)
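The second argument to `parallelize`, here 8, is the number of partitions (slices) the data is split into; each partition can be processed by a separate task. Spark's own slicing logic is internal and may place boundaries differently, but a minimal pure-Python sketch of splitting a list into contiguous, nearly equal partitions looks like this. The function `split_into_partitions` is hypothetical.

```python
def split_into_partitions(data, num_partitions):
    """Split a sequence into num_partitions contiguous, nearly equal slices.

    Hypothetical illustration of partitioning; not Spark's implementation.
    """
    items = list(data)
    n = len(items)
    return [items[i * n // num_partitions:(i + 1) * n // num_partitions]
            for i in range(num_partitions)]


partitions = split_into_partitions(range(50), 8)
print([len(p) for p in partitions])  # [6, 6, 6, 7, 6, 6, 6, 7]
```

On the real RDD, `myRDD.getNumPartitions()` reports the partition count and `myRDD.glom().collect()` returns the actual contents of each partition.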

In [73]:
print("Initial RDD id: {0}".format(myRDD.id()))


Initial RDD id: 45


In [74]:
myRDD.setName("Brunner's RDD")

Brunner's RDD ParallelCollectionRDD[45] at parallelize at PythonRDD.scala:396

In [75]:
print(myRDD.toDebugString())

(8) Brunner's RDD ParallelCollectionRDD[45] at parallelize at PythonRDD.scala:396 []


In [76]:
myaddRDD = myRDD.map(lambda a: a + 1)

In [77]:
print(myaddRDD.toDebugString())

(8) PythonRDD[46] at RDD at PythonRDD.scala:43 []
 |  Brunner's RDD ParallelCollectionRDD[45] at parallelize at PythonRDD.scala:396 []


In [78]:
print(myaddRDD.collect())

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50]
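Transformations such as `map` return a new RDD immediately without touching the data; the new RDD only records its parent and the function to apply, which is the lineage that `toDebugString` prints. The work happens only when an action such as `collect` runs. The toy class `LazyRDD` below is a hypothetical, single-machine model of that idea, not PySpark's `RDD` class.

```python
class LazyRDD:
    """Toy, single-machine model of a lazily evaluated RDD (hypothetical)."""

    def __init__(self, source=None, parent=None, func=None, name="source"):
        self.source = source  # data for a root RDD
        self.parent = parent  # upstream LazyRDD, if any
        self.func = func      # iterator -> iterator transformation
        self.name = name

    def map(self, f):
        # Record the step; nothing is computed yet.
        return LazyRDD(parent=self, func=lambda it: (f(x) for x in it), name="map")

    def filter(self, pred):
        return LazyRDD(parent=self, func=lambda it: (x for x in it if pred(x)),
                       name="filter")

    def _iterate(self):
        if self.parent is None:
            return iter(self.source)
        return self.func(self.parent._iterate())

    def collect(self):
        # The action: walk back up the lineage and compute the result.
        return list(self._iterate())

    def lineage(self):
        node, steps = self, []
        while node is not None:
            steps.append(node.name)
            node = node.parent
        return steps


calls = []


def add_one(a):
    calls.append(a)  # track how many elements were actually processed
    return a + 1


base = LazyRDD(source=range(50))
added = base.map(add_one)
print(len(calls))           # 0: map only recorded the step
print(added.lineage())      # ['map', 'source']
print(added.collect()[:5])  # [1, 2, 3, 4, 5]
print(len(calls))           # 50
```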


In [79]:
myfilterRDD = myaddRDD.filter(lambda x: (x % 5) == 0)

In [80]:
myfilterRDD.collect()

[5, 10, 15, 20, 25, 30, 35, 40, 45, 50]

In [81]:
print(myfilterRDD.toDebugString())

(8) PythonRDD[47] at collect at <ipython-input-80-4166111fb352>:1 []
 |  Brunner's RDD ParallelCollectionRDD[45] at parallelize at PythonRDD.scala:396 []
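Notice that the lineage of `myfilterRDD` shows `PythonRDD[47]` sitting directly on `ParallelCollectionRDD[45]`, with no separate entry for the `map` step. PySpark fuses consecutive Python transformations such as `map` and `filter` into a single function that runs once per partition (its `PipelinedRDD`). A minimal sketch of that fusion follows; the helper names `map_step`, `filter_step`, and `pipeline` are hypothetical.

```python
def map_step(f):
    """Turn an element function into a partition (iterator -> iterator) function."""
    return lambda iterator: (f(x) for x in iterator)


def filter_step(pred):
    """Turn a predicate into a partition (iterator -> iterator) function."""
    return lambda iterator: (x for x in iterator if pred(x))


def pipeline(*steps):
    """Compose partition functions, left to right, into one function."""
    def run(iterator):
        for step in steps:
            iterator = step(iterator)
        return iterator
    return run


# One fused function replaces the separate map and filter stages.
fused = pipeline(map_step(lambda a: a + 1), filter_step(lambda x: (x % 5) == 0))

# Apply it to each partition independently, as a task would.
partitions = [list(range(0, 25)), list(range(25, 50))]
results = [list(fused(iter(p))) for p in partitions]
print(results)  # [[5, 10, 15, 20, 25], [30, 35, 40, 45, 50]]
```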


In [82]:
(sc
 .parallelize(data)
 .map(lambda x: x + 1)
 .filter(lambda x: (x % 5) == 0)
 .collect())

[5, 10, 15, 20, 25, 30, 35, 40, 45, 50]

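The chained form above, with one transformation per line inside
parentheses, keeps a Spark pipeline readable. For further comments on
style, see the [readability and code style](http://nbviewer.ipython.org/github/jdwittenauer/ipython-notebooks/blob/master/Spark-Lab0-Tutorial.ipynb#-(8d)-Readability-and-code-style-)
section of the Spark Lab 0 tutorial notebook.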

In [108]:
text_file = sc.textFile("hdfs://10.0.3.113:9000/home/ubuntu/data/2001.csv", use_unicode=False)

In [109]:
text_file.count()

5967781

In [110]:
text_file.take(5)

['Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,CRSElapsedTime,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,CancellationCode,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay',
 '2001,1,17,3,1806,1810,1931,1934,US,375,N700\xef\xbf\xbd\xef\xbf\xbd,85,84,60,-3,-4,BWI,CLT,361,5,20,0,NA,0,NA,NA,NA,NA,NA',
 '2001,1,18,4,1805,1810,1938,1934,US,375,N713\xef\xbf\xbd\xef\xbf\xbd,93,84,64,4,-5,BWI,CLT,361,9,20,0,NA,0,NA,NA,NA,NA,NA',
 '2001,1,19,5,1821,1810,1957,1934,US,375,N702\xef\xbf\xbd\xef\xbf\xbd,96,84,80,23,11,BWI,CLT,361,6,10,0,NA,0,NA,NA,NA,NA,NA',
 '2001,1,20,6,1807,1810,1944,1934,US,375,N701\xef\xbf\xbd\xef\xbf\xbd,97,84,66,10,-3,BWI,CLT,361,4,27,0,NA,0,NA,NA,NA,NA,NA']

In [22]:
help(text_file)

Help on RDD in module pyspark.rdd object:

class RDD(__builtin__.object)
 |  A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
 |  Represents an immutable, partitioned collection of elements that can be
 |  operated on in parallel.
 |  
 |  Methods defined here:
 |  
 |  __add__(self, other)
 |      Return the union of this RDD and another one.
 |      
 |      >>> rdd = sc.parallelize([1, 1, 2, 3])
 |      >>> (rdd + rdd).collect()
 |      [1, 1, 2, 3, 1, 1, 2, 3]
 |  
 |  __getnewargs__(self)
 |  
 |  __init__(self, jrdd, ctx, jrdd_deserializer=AutoBatchedSerializer(PickleSerializer()))
 |  
 |  __repr__(self)
 |  
 |  aggregate(self, zeroValue, seqOp, combOp)
 |      Aggregate the elements of each partition, and then the results for all
 |      the partitions, using a given combine functions and a neutral "zero
 |      value."
 |      
 |      The functions C{op(t1, t2)} is allowed to modify C{t1} and return it
 |      as its result value to avoid object allocat

In [141]:
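col_data = (text_file
            # Split each CSV line into its fields.
            .map(lambda l: l.split(","))
            # Keep Year, Month, DayofMonth, DepTime, ArrDelay, DepDelay, Origin, Dest, Distance.
            .map(lambda p: (p[0], p[1], p[2], p[4], p[14], p[15], p[16], p[17], p[18]))
            # Drop the header row, the only one with a field equal to 'Year'.
            .filter(lambda line: 'Year' not in line)
            )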

In [142]:
col_data.first()

('2001', '1', '17', '1806', '-3', '-4', 'BWI', 'CLT', '361')
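To check the column selection without the cluster, the same steps can be run in plain Python on the header and first two records returned by `take(5)` above (with the unusual tail-number bytes trimmed). Indices 0, 1, 2, 4, 14, 15, 16, 17, and 18 pick out Year, Month, DayofMonth, DepTime, ArrDelay, DepDelay, Origin, Dest, and Distance. This is a sketch; `parse_and_select` is a hypothetical helper name.

```python
# Header and first two records from text_file.take(5); TailNum bytes trimmed.
lines = [
    "Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,"
    "UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,CRSElapsedTime,AirTime,"
    "ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,"
    "CancellationCode,Diverted,CarrierDelay,WeatherDelay,NASDelay,"
    "SecurityDelay,LateAircraftDelay",
    "2001,1,17,3,1806,1810,1931,1934,US,375,N700,85,84,60,-3,-4,BWI,CLT,361,"
    "5,20,0,NA,0,NA,NA,NA,NA,NA",
    "2001,1,18,4,1805,1810,1938,1934,US,375,N713,93,84,64,4,-5,BWI,CLT,361,"
    "9,20,0,NA,0,NA,NA,NA,NA,NA",
]

# Column positions kept by the Spark map above.
KEEP = (0, 1, 2, 4, 14, 15, 16, 17, 18)


def parse_and_select(line):
    parts = line.split(",")
    return tuple(parts[i] for i in KEEP)


rows = [parse_and_select(line) for line in lines]
# Tuple membership compares whole fields, so only the header row contains 'Year'.
rows = [r for r in rows if "Year" not in r]
print(rows[0])  # ('2001', '1', '17', '1806', '-3', '-4', 'BWI', 'CLT', '361')
```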

In [143]:
cols = col_data.filter(lambda line: 'NA' not in line)

In [146]:
cols.count()

5723673

In [147]:
fields = cols.map(lambda p: (int(p[0]), int(p[1]), int(p[2]), int(p[3]),
                          int(p[4]), int(p[5]), p[6], p[7], int(p[8])))

In [148]:
fields.count()

5723673
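The two steps above drop every record in which any of the nine selected fields is the literal string `'NA'`, and then convert all fields except Origin and Destination to integers. A plain-Python sketch of the same logic on two tuples: the first is the record shown by `col_data.first()`, the second is a hypothetical record with missing values, made up for illustration. The helper `to_typed` is likewise hypothetical.

```python
records = [
    ("2001", "1", "17", "1806", "-3", "-4", "BWI", "CLT", "361"),
    # Hypothetical record with missing times and delays, for illustration only.
    ("2001", "1", "17", "NA", "NA", "NA", "BWI", "CLT", "361"),
]

# Positions 6 and 7 (Origin, Destination) stay strings; the rest become ints.
STRING_COLUMNS = {6, 7}


def to_typed(record):
    return tuple(v if i in STRING_COLUMNS else int(v) for i, v in enumerate(record))


complete = [r for r in records if "NA" not in r]
typed = [to_typed(r) for r in complete]
print(typed)  # [(2001, 1, 17, 1806, -3, -4, 'BWI', 'CLT', 361)]
```

Filtering before converting matters: `int('NA')` raises a `ValueError`, which inside Spark would fail the task.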

In [165]:
from pyspark.sql import SQLContext
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# sc is an existing SparkContext.
sqlContext = SQLContext(sc)

schemaString = "Year Month DayOfMonth DepTime ArrDelay DepDelay Origin Destination Distance"

# Origin and Destination hold strings; every other field was converted to int.
# Use a new name so the `fields` RDD created above is not overwritten.
schema_fields = [StructField(name, StringType() if name in ("Origin", "Destination") else IntegerType(), True)
                 for name in schemaString.split()]
schema = StructType(schema_fields)

In [166]:
# Apply the explicit schema to the RDD of typed tuples.
named_df = sqlContext.createDataFrame(fields, schema)

# Alternatively, let Spark infer the schema; the columns are then named
# _1 through _9, as in the cells below.
df = sqlContext.createDataFrame(fields)
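Supplying an explicit schema makes PySpark check rows against it, and a mismatch raises a `TypeError`: integer values under a `StringType` column are rejected, as is a list of `StructField` objects passed in place of the data. The function `check_row` below is a hypothetical, simplified model of such a check, not PySpark's internal verifier; each schema entry simply pairs a column name with the Python type it accepts.

```python
# Hypothetical, simplified model of checking rows against a declared schema.
flight_schema = [
    ("Year", int), ("Month", int), ("DayOfMonth", int), ("DepTime", int),
    ("ArrDelay", int), ("DepDelay", int), ("Origin", str),
    ("Destination", str), ("Distance", int),
]


def check_row(row, schema):
    """Raise TypeError if row does not match schema; return row otherwise."""
    if not isinstance(row, tuple) or len(row) != len(schema):
        raise TypeError("schema can not accept object %r" % (row,))
    for value, (name, expected) in zip(row, schema):
        if not isinstance(value, expected):
            raise TypeError("%s expects %s, got %s"
                            % (name, expected.__name__, type(value).__name__))
    return row


good = (2001, 1, 17, 1806, -3, -4, "BWI", "CLT", 361)
check_row(good, flight_schema)  # passes

# An all-string schema, like the original schemaString version, rejects the ints.
all_strings = [(name, str) for name, _ in flight_schema]
try:
    check_row(good, all_strings)
except TypeError as err:
    print(err)  # Year expects str, got int
```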

In [151]:
df.show(5)

+----+--+--+----+--+--+---+---+---+
|  _1|_2|_3|  _4|_5|_6| _7| _8| _9|
+----+--+--+----+--+--+---+---+---+
|2001| 1|17|1806|-3|-4|BWI|CLT|361|
|2001| 1|18|1805| 4|-5|BWI|CLT|361|
|2001| 1|19|1821|23|11|BWI|CLT|361|
|2001| 1|20|1807|10|-3|BWI|CLT|361|
|2001| 1|21|1810|20| 0|BWI|CLT|361|
+----+--+--+----+--+--+---+---+---+



In [152]:
df.head(4)

[Row(_1=2001, _2=1, _3=17, _4=1806, _5=-3, _6=-4, _7=u'BWI', _8=u'CLT', _9=361),
 Row(_1=2001, _2=1, _3=18, _4=1805, _5=4, _6=-5, _7=u'BWI', _8=u'CLT', _9=361),
 Row(_1=2001, _2=1, _3=19, _4=1821, _5=23, _6=11, _7=u'BWI', _8=u'CLT', _9=361),
 Row(_1=2001, _2=1, _3=20, _4=1807, _5=10, _6=-3, _7=u'BWI', _8=u'CLT', _9=361)]

In [155]:
df.describe().show()

+-------+-------+-----------------+-----------------+------------------+------------------+------------------+-----------------+
|summary|     _1|               _2|               _3|                _4|                _5|                _6|               _9|
+-------+-------+-----------------+-----------------+------------------+------------------+------------------+-----------------+
|  count|5723673|          5723673|          5723673|           5723673|           5723673|           5723673|          5723673|
|   mean| 2001.0|6.291580773394986|15.71320251873229|1348.6880443729053| 5.528248731190619| 8.115271609681406| 735.173682004545|
| stddev|    0.0|3.381754330822876|8.827993155975875|   482.63871515896|31.429288422846703|28.234080794004345|574.8151318384248|
|    min|   2001|                1|                1|                 1|             -1116|              -204|               21|
|    max|   2001|               12|               31|              2400|              1688|      

In [156]:
df.schema

StructType(List(StructField(_1,LongType,true),StructField(_2,LongType,true),StructField(_3,LongType,true),StructField(_4,LongType,true),StructField(_5,LongType,true),StructField(_6,LongType,true),StructField(_7,StringType,true),StructField(_8,StringType,true),StructField(_9,LongType,true)))

In [159]:
df._1

Column<_1>

-----

### Bulk File Processing

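Next, we try bulk file processing by reading all of the data files in the HDFS directory at once with `wholeTextFiles`. On our cluster, both attempts below failed with an `ExecutorLostFailure`.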


In [17]:
filename = 'hdfs://10.0.3.113:9000/home/ubuntu/data/*'

flight_files = sc.wholeTextFiles(filename)

print(flight_files.count())

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 7.0 failed 4 times, most recent failure: Lost task 1.3 in stage 7.0 (TID 346, 10.0.3.105): ExecutorLostFailure (executor 28 lost)
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)


In [14]:
count = len(flight_files.collect())

print("Number of flight data files: {0}".format(count))

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 5.0 failed 4 times, most recent failure: Lost task 1.3 in stage 5.0 (TID 330, 10.0.3.116): ExecutorLostFailure (executor 13 lost)
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

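`wholeTextFiles` returns one (path, content) record per file, so each record holds an entire file, whereas `textFile` with the same glob pattern returns one record per line. The pure-Python sketch below models both views over a temporary directory; the helper names `whole_text_files` and `text_file_lines` and the two small files are hypothetical.

```python
import glob
import os
import tempfile


def whole_text_files(pattern):
    """Model of wholeTextFiles: one (path, full contents) pair per file."""
    pairs = []
    for path in sorted(glob.glob(pattern)):
        with open(path) as handle:
            pairs.append((path, handle.read()))
    return pairs


def text_file_lines(pattern):
    """Model of textFile: one record per line, across all matching files."""
    lines = []
    for path in sorted(glob.glob(pattern)):
        with open(path) as handle:
            lines.extend(line.rstrip("\n") for line in handle)
    return lines


with tempfile.TemporaryDirectory() as tmp:
    # Two small, made-up files standing in for yearly flight data.
    for name, rows in [("2000.csv", ["Year", "2000", "2000"]),
                       ("2001.csv", ["Year", "2001"])]:
        with open(os.path.join(tmp, name), "w") as handle:
            handle.write("\n".join(rows) + "\n")

    pattern = os.path.join(tmp, "*")
    files = whole_text_files(pattern)
    lines = text_file_lines(pattern)
    print(len(files), len(lines))  # 2 5
```

Because each `wholeTextFiles` record holds a whole file, a very large file must fit in a single executor's memory, which is one plausible reason the cells above lost executors. `sc.textFile('hdfs://10.0.3.113:9000/home/ubuntu/data/*')` would read the same files line by line instead.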

-----
## Breakout Session

During this breakout, you should work with the previous Spark examples
in order to better learn how Spark works, and how it differs from
pure Python approaches like Pandas. Specific problems you can attempt
include the following:

1. Change the `myRDD` example to start with all integers from 0 to 199.
Use an appropriate lambda function to convert this RDD to a new RDD that
has all odd integers from 1 to 399.

2. Filter the previous RDD to contain only entries that are divisible by
9.

3. Convert this RDD to a Spark DataFrame, specifying the column name as
`Numbers`.

4. Add an index column to this Spark DataFrame, which sequentially
increases.

Additional, more advanced problems:

1. Create an RDD containing the 'Year', 'Month', 'DayofMonth', 'DepDelay',
and 'Origin' columns for the airline data for all years 1990-2005.

2. Filter this RDD to contain only flight data for flights leaving O'Hare
airport.

3. Implement a linear fit to the airline flight data in this RDD.

-----

-----

### Ending the Spark Session

We must stop the `SparkContext` in order to release resources on the
instructional cluster before exiting this Notebook.

-----

In [9]:
sc.stop()
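To make sure the context is released even if a cell raises an exception, start-up and shutdown can be wrapped in `try`/`finally`. The small helper `stopping` below is a hypothetical sketch that works with any object that has a `stop()` method, such as a `SparkContext`; the stand-in class `FakeContext` is used only so the example runs without a cluster.

```python
from contextlib import contextmanager


@contextmanager
def stopping(resource):
    """Yield resource and call its stop() method on exit, even after an error."""
    try:
        yield resource
    finally:
        resource.stop()


class FakeContext:
    """Stand-in for SparkContext, used only for this illustration."""

    def __init__(self):
        self.stopped = False

    def stop(self):
        self.stopped = True


ctx = FakeContext()
try:
    with stopping(ctx) as sc:
        raise RuntimeError("simulated failure in a Spark job")
except RuntimeError:
    pass
print(ctx.stopped)  # True
```

With PySpark, the same pattern would read `with stopping(SparkContext(conf=myconf)) as sc:`.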

### Additional References

1. [Official Spark Documentation][osd].
2. [Spark][sn] for Data Science Notebook.
3. [Pandas and Spark][psd1] Comparison.
4. Another [Pandas & Spark][psd2] Comparison.
5. [IPython Spark][ipys] Docker image to simplify learning.
-----
[osd]: https://spark.apache.org/docs/latest/index.html
[sn]: https://github.com/donnemartin/data-science-ipython-notebooks/blob/master/spark/spark.ipynb
[psd1]: https://github.com/christophebourguignat/notebooks/blob/master/Spark-Pandas-Differences.ipynb
[psd2]: https://lab.getbase.com/pandarize-spark-dataframes/
[ipys]: https://github.com/Lab41/ipython-spark-docker

### Return to the [Week Two](index.ipynb) index.

-----