# <div align=center>Introduction to Machine Learning and Data Analysis with Spark</div>

## Spark

Spark is an open-source framework for fast, scalable data processing. It has built-in modules and libraries for machine learning, SQL and graph processing, and it offers high-level APIs in Python, Scala, Java and R. A typical Spark deployment spans multiple nodes. A Spark cluster can be set up using cluster managers such as:

1. Spark standalone
2. YARN (used by the Hadoop ecosystem)
3. Mesos

This example uses a Spark standalone cluster running 1 master and 6 worker nodes. The cluster provides 24 cores and 17.2 GB of usable memory for processing.

![](cluster-overview.png)

---

## HDFS (Hadoop Distributed File System)

For storage, this Spark cluster uses HDFS, a scalable and fault-tolerant distributed file system that's used extensively in Hadoop applications. HDFS partitions files into blocks of fixed size (usually 128 or 256 MB) and replicates them across the cluster for high availability. Files can be put in HDFS through the command line or code. To read files, we can use the HDFS URI for that cluster, followed by the file path: *hdfs://hdfs-master-ip:port/path-to-file/*
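
As a rough illustration of the block arithmetic and URI layout described above, the following sketch computes how many blocks a file would occupy and builds an HDFS URI. The helper names `hdfs_uri` and `block_count`, the 700 MB file size, and the replication factor of 3 (the usual HDFS default) are assumptions made for this example; the actual values depend on the cluster configuration.

```python
import math

def hdfs_uri(host, port, path):
    """Build an hdfs:// URI from the NameNode address and a file path (hypothetical helper)."""
    return "hdfs://{}:{}/{}".format(host, port, path.lstrip("/"))

def block_count(file_size_bytes, block_size_bytes=128 * 1024 * 1024):
    """Number of fixed-size blocks needed for a file; the last block may be partly filled."""
    return math.ceil(file_size_bytes / block_size_bytes)

# Assumed example: a 700 MB file, 128 MB blocks, replication factor 3
size = 700 * 1024 * 1024
blocks = block_count(size)
replicas = blocks * 3  # total block copies stored across the DataNodes

print(hdfs_uri("hdfs-master-ip", 9000, "/path-to-file"))
print(blocks, replicas)
```

With these assumed numbers the file needs 6 blocks, so the cluster stores 18 block copies in total.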

The HDFS cluster in this example is running 1 master *(NameNode)* and 3 slave nodes *(DataNodes)*.

![](hdfsarchitecture.gif)

---

## Communicating with the Spark cluster

A Spark job is coordinated by a *SparkContext*, which connects the driver process (client) to the Spark executors (workers). The SparkContext is configured with the URI of the Spark cluster and with parameters such as the maximum number of cores for the job, and it requests those resources from the cluster. It can be created as follows:

In [18]:
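import os, sys

# SPARK_HOME must point at the Spark installation so its bundled PySpark can be imported
spark_home = os.environ.get('SPARK_HOME')
if spark_home is None:
    raise EnvironmentError("SPARK_HOME is not set")
sys.path.insert(0, os.path.join(spark_home, 'python'))

# The py4j version in this path must match the one shipped with the Spark installation
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.8.2.1-src.zip'))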

import pyspark
from pyspark import SparkContext, SparkConf

# Stop the SparkContext if one is already running in this session
try:
    sc
except NameError:
    pass
else:
    sc.stop()

conf = (SparkConf()
        .setMaster("spark://10.0.3.70:7077")
        .setAppName("Intro to Spark")
        .set("spark.driver.port", "8200")
        .set("spark.cores.max", "10"))
sc = SparkContext(conf=conf)
sc

<pyspark.context.SparkContext at 0x7f1091a8ba50>

---
## Working with data

A *Resilient Distributed Dataset (RDD)* is the basic data abstraction in Spark. It represents an immutable collection of elements, partitioned across the cluster, that can be operated on in parallel. We can parallelize a Python collection to form an RDD:

In [19]:
test_list = list(range(100))  # list() so the elements also print on Python 3
print(test_list)

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99]


---
The *parallelize()* method distributes the collection and returns an RDD. Its optional second argument sets the number of partitions (slices) the data is split into.

In [20]:
num_slices = 40
test_RDD = sc.parallelize(test_list, num_slices)
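
To give a feel for what splitting 100 elements into 40 slices means, here is a plain-Python sketch that cuts a list into contiguous chunks of nearly equal size. The function `split_into_slices` is a hypothetical helper written for this illustration; it is not Spark's internal partitioning code, and the exact layout Spark produces may differ.

```python
def split_into_slices(items, num_slices):
    """Split items into num_slices contiguous chunks whose sizes differ by at most one."""
    n = len(items)
    # Boundary i marks where slice i starts; integer division spreads the remainder evenly
    bounds = [(i * n) // num_slices for i in range(num_slices + 1)]
    return [items[bounds[i]:bounds[i + 1]] for i in range(num_slices)]

slices = split_into_slices(list(range(100)), 40)
print(len(slices))                          # number of slices
print(sorted(set(len(s) for s in slices)))  # distinct slice sizes
```

On the cluster itself, `test_RDD.glom().collect()` returns the elements grouped by partition, which shows the layout Spark actually chose.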

---
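We can apply transformations such as *map()* to this RDD. Transformations are lazy: nothing is computed until an action such as *collect()* returns the results to the driver.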

In [21]:
new_RDD = test_RDD.map(lambda x: x*5)
print(new_RDD.collect())

[0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, 65, 70, 75, 80, 85, 90, 95, 100, 105, 110, 115, 120, 125, 130, 135, 140, 145, 150, 155, 160, 165, 170, 175, 180, 185, 190, 195, 200, 205, 210, 215, 220, 225, 230, 235, 240, 245, 250, 255, 260, 265, 270, 275, 280, 285, 290, 295, 300, 305, 310, 315, 320, 325, 330, 335, 340, 345, 350, 355, 360, 365, 370, 375, 380, 385, 390, 395, 400, 405, 410, 415, 420, 425, 430, 435, 440, 445, 450, 455, 460, 465, 470, 475, 480, 485, 490, 495]
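
In Spark, a transformation such as `map()` only records what should be done, and the work happens when an action such as `collect()` is called. The sketch below imitates that behavior locally with Python generators; the partition contents and the `times_five` function are made up for the illustration and no Spark API is involved.

```python
calls = []

def times_five(x):
    calls.append(x)  # record each evaluation so the laziness is observable
    return x * 5

# Made-up partitions of a small dataset
partitions = [[0, 1, 2], [3, 4], [5, 6, 7]]

# "Transformation": one lazy generator per partition; times_five has not run yet
mapped = [(times_five(x) for x in part) for part in partitions]
evaluated_before = len(calls)

# "Action": pull the results of every partition into one list, like collect()
collected = [y for part in mapped for y in part]

print(evaluated_before, len(calls))
print(collected)
```

The first printed pair shows that no element was evaluated before the action and all eight were evaluated after it.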


---
We can read a file from HDFS with the *textFile()* method by supplying the HDFS URI of the file. The textFile() method returns the file as an RDD of strings.

In [23]:
# Flight data from http://stat-computing.org/dataexpo/2009/
text_file = sc.textFile('hdfs://10.0.3.113:9000/home/ubuntu/data/2008.csv')

# Year, Month, DayofMonth, DayOfWeek, DepartureTime, ArrivalDelay, DepartureDelay, Origin, Dest, Distance
columns = text_file.map(lambda l: l.split(",")) \
            .map(lambda p: (p[0], p[1], p[2], p[3], p[4], p[14], p[15], p[16], p[17], p[18])) \
            .filter(lambda line: 'NA' not in line) \
            .filter(lambda line: 'Year' not in line)

arrival_delays = columns.map(lambda p: int(p[5]))
depart_delays = columns.map(lambda p: int(p[6]))
distances = columns.map(lambda p: int(p[9]))

arrival_delays.count()

6855029
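
The same split, select and filter steps can be traced on a few lines without a cluster. The rows below are made-up stand-ins, not real flight records, and `make_row` is a hypothetical helper that only places the ten fields of interest at indices 0-4 and 14-18, padding the unused fields with filler values.

```python
def make_row(year, month, day, dow, dep, arr_delay, dep_delay, origin, dest, dist):
    """Build a CSV line with the ten fields of interest at indices 0-4 and 14-18."""
    filler = ["x"] * 9  # stand-ins for the unused fields at indices 5-13
    fields = [year, month, day, dow, dep] + filler + [arr_delay, dep_delay, origin, dest, dist]
    return ",".join(str(f) for f in fields)

lines = [
    make_row("Year", "Month", "DayofMonth", "DayOfWeek", "DepTime",
             "ArrDelay", "DepDelay", "Origin", "Dest", "Distance"),  # header row
    make_row(2008, 1, 3, 4, 2003, -14, 8, "IAD", "TPA", 810),
    make_row(2008, 1, 3, 4, "NA", "NA", "NA", "IND", "BWI", 515),    # row with missing values
    make_row(2008, 1, 3, 4, 754, 2, 19, "IAD", "TPA", 810),
]

rows = [l.split(",") for l in lines]
columns = [(p[0], p[1], p[2], p[3], p[4], p[14], p[15], p[16], p[17], p[18]) for p in rows]
columns = [c for c in columns if "NA" not in c]    # drop records with a missing value
columns = [c for c in columns if "Year" not in c]  # drop the header record

arrival_delays = [int(c[5]) for c in columns]
depart_delays = [int(c[6]) for c in columns]
distances = [int(c[9]) for c in columns]
print(arrival_delays, depart_delays, distances)
```

Because each record is a tuple at the point of filtering, `'NA' not in c` tests whether any field equals `'NA'` exactly; it is not a substring search over the original line.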

---

We can calculate correlations between variables using the *Statistics* module in *MLlib* (Spark's machine learning library).

In [24]:
from pyspark.mllib.stat import Statistics
corr1 = Statistics.corr(arrival_delays, depart_delays)
corr1

0.931390780111016
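
`Statistics.corr` computes the Pearson correlation coefficient unless another method is requested. As a minimal sketch of what that number means, the function below evaluates the same formula on small in-memory lists; the `pearson` helper and the sample delay values are illustrative assumptions, not part of the dataset or of MLlib.

```python
import math

def pearson(xs, ys):
    """Pearson correlation: covariance divided by the product of the standard deviations."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)

# Made-up departure and arrival delays in minutes
dep = [0, 5, 10, 20, 45]
arr = [-3, 4, 12, 18, 50]
r = pearson(dep, arr)
print(r)
```

A value close to 1, as in the result above for the flight data, indicates that arrival delay rises almost linearly with departure delay.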

---
## Spark Streaming

Spark Streaming is an extension of the Spark platform that allows stream processing of live data. Data can be ingested from sources such as HDFS, Kafka and Twitter, and can be processed using Spark's libraries. The output can be written to filesystems (such as HDFS) and databases (such as HBase).

![](streaming-arch.png)

---
## Spark Streaming Example

This example counts the words in text received from a server listening on a TCP socket.

In [None]:
# Create a StreamingContext from the SparkContext with a 1-second batch interval
from pyspark.streaming import StreamingContext
ssc = StreamingContext(sc, 1)

# Create a DStream that will connect to hostname:port, like localhost:9999
lines = ssc.socketTextStream("localhost", 9999)

# Split each line into words
words = lines.flatMap(lambda line: line.split(" "))

# Count each word in each batch
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

# Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.pprint()

ssc.start()             # Start the computation
ssc.awaitTermination()  # Wait for the computation to terminate
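
The per-batch logic of the streaming job can be followed in plain Python. The sketch below applies the same three steps (split into words, pair each word with 1, sum per word) to two made-up batches of lines; `count_words` is a hypothetical helper, and no socket or Spark API is involved.

```python
def count_words(batch_lines):
    """Count words in one batch, mirroring flatMap, map and reduceByKey."""
    words = [w for line in batch_lines for w in line.split(" ")]  # flatMap: lines to words
    pairs = [(w, 1) for w in words]                                # map: word to (word, 1)
    counts = {}
    for word, n in pairs:                                          # reduceByKey: sum per word
        counts[word] = counts.get(word, 0) + n
    return counts

# Made-up input: each inner list stands for the lines received during one batch interval
batches = [["hello world", "hello spark"], ["spark streaming"]]
results = [count_words(batch) for batch in batches]
for i, counts in enumerate(results):
    print(i, sorted(counts.items()))
```

Each batch is counted independently, so a word seen in the first batch does not add to its count in the second.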

---
To release the Spark cluster resources when we're done, we should stop the SparkContext:

In [25]:
sc.stop()

---
## References

1. Professor Brunner's notebook: https://github.com/ProfessorBrunner/rp-pdm15/blob/master/Week2/intro2spark.ipynb
2. Spark: http://spark.apache.org/
3. HDFS: https://hadoop.apache.org/docs/r1.2.1/hdfs_design.html
4. Spark Streaming: http://spark.apache.org/docs/latest/streaming-programming-guide.html#basic-concepts