# ![](https://ga-dash.s3.amazonaws.com/production/assets/logo-9f88ae6c9c3871690e33280fcf557f33.png) Introduction to Spark
Week 11 | Lesson 4.1

### LEARNING OBJECTIVES
*After this lesson, you will be able to:*
- Describe Spark, how it works and its role in a Hadoop ecosystem
- Identify Spark use cases
- Run simple queries in Spark from the command line

### STUDENT PRE-WORK
*Before this lesson, you should already be able to:*
- Perform MapReduce jobs with Hadoop

### LESSON GUIDE
| TIMING  | TYPE  | TOPIC  |
|:-:|---|---|
| 5 min | [Opening](#opening) | Opening |
| 20 min | [Introduction](#introduction) | Intro to Spark |
| 10 min | [Introduction2](#introduction2) | Spark Stack and API |
| 15 min | [Demo](#demo) | Demo: Spark Map Reduce |
| 10 min | [Guided-practice](#guided-practice) | Guided Practice: Spark Map Reduce |
| 15 min | [Ind-practice](#ind-practice) | Independent Practice: Explore the Spark Shell |
| 10 min | [Conclusion](#conclusion) | Conclusion |

<a name="opening"></a>
## Opening (5 min)

Spark, a cluster computing framework, has gained tremendous ground in industry since its wide-scale release several years ago. It can run on Hadoop as a replacement for MapReduce.

![](./assets/images/indeed-hadoop-spark.png)

**Check:** what's MapReduce?


> It's a framework -- and a specific implementation on Hadoop -- to solve problems that involve parallel calculation. It's composed of 2 steps: a mapper step, in which multiple workers solve the same task on different parts of the dataset, and a reducer step, where the results of each worker's work are combined to give a final result.
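
The two steps can be sketched in plain Python, without Hadoop. This is only an illustration: the `chunks` data and the `mapper` and `reducer` names are made up for the example, and the "workers" run one after another rather than in parallel.

```python
from collections import Counter
from functools import reduce


def mapper(chunk):
    """Map step: one worker counts the words in its own chunk of lines."""
    counts = Counter()
    for line in chunk:
        counts.update(line.split())
    return counts


def reducer(left, right):
    """Reduce step: merge two partial results into one."""
    return left + right


# Hypothetical dataset, already split into two chunks (one per worker).
chunks = [
    ["the cat sat", "the dog ran"],
    ["the cat ran"],
]

# On a real cluster each mapper call would run on a different machine.
partial_counts = [mapper(chunk) for chunk in chunks]
total_counts = reduce(reducer, partial_counts, Counter())

print(total_counts["the"], total_counts["cat"], total_counts["dog"])
```

Each mapper only ever sees its own chunk, which is what lets the map step scale out across machines.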



**Check:** what limitations have you encountered when processing data with Hadoop and MapReduce?


> Mainly performance. It takes a long time to perform (and sometimes to write) a MapReduce job, which makes it really hard to experiment and iterate.

### K-means in Java on Hadoop

```java
/*
 * @author Himank Chaudhary
 */

import java.io.IOException;
import java.util.*;
import java.io.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.Reducer;

@SuppressWarnings("deprecation")
public class KMeans {
	public static String OUT = "outfile";
	public static String IN = "inputlarger";
	public static String CENTROID_FILE_NAME = "/centroid.txt";
	public static String OUTPUT_FILE_NAME = "/part-00000";
	public static String DATA_FILE_NAME = "/data.txt";
	public static String JOB_NAME = "KMeans";
	public static String SPLITTER = "\t| ";
	public static List<Double> mCenters = new ArrayList<Double>();

	/*
	 * In Mapper class we are overriding configure function. In this we are
	 * reading file from Distributed Cache and then storing that into instance
	 * variable "mCenters"
	 */
	public static class Map extends MapReduceBase implements
			Mapper<LongWritable, Text, DoubleWritable, DoubleWritable> {
		@Override
		public void configure(JobConf job) {
			try {
				// Fetch the file from Distributed Cache Read it and store the
				// centroid in the ArrayList
				Path[] cacheFiles = DistributedCache.getLocalCacheFiles(job);
				if (cacheFiles != null && cacheFiles.length > 0) {
					String line;
					mCenters.clear();
					BufferedReader cacheReader = new BufferedReader(
							new FileReader(cacheFiles[0].toString()));
					try {
						// Read the file split by the splitter and store it in
						// the list
						while ((line = cacheReader.readLine()) != null) {
							String[] temp = line.split(SPLITTER);
							mCenters.add(Double.parseDouble(temp[0]));
						}
					} finally {
						cacheReader.close();
					}
				}
			} catch (IOException e) {
				System.err.println("Exception reading DistributedCache: " + e);
			}
		}

		/*
		 * Map function will find the minimum center of the point and emit it to
		 * the reducer
		 */
		@Override
		public void map(LongWritable key, Text value,
				OutputCollector<DoubleWritable, DoubleWritable> output,
				Reporter reporter) throws IOException {
			String line = value.toString();
			double point = Double.parseDouble(line);
			double min1, min2 = Double.MAX_VALUE, nearest_center = mCenters
					.get(0);
			// Find the minimum center from a point
			for (double c : mCenters) {
				min1 = c - point;
				if (Math.abs(min1) < Math.abs(min2)) {
					nearest_center = c;
					min2 = min1;
				}
			}
			// Emit the nearest center and the point
			output.collect(new DoubleWritable(nearest_center),
					new DoubleWritable(point));
		}
	}

	public static class Reduce extends MapReduceBase implements
			Reducer<DoubleWritable, DoubleWritable, DoubleWritable, Text> {

		/*
		 * Reduce function will emit all the points to that center and calculate
		 * the next center for these points
		 */
		@Override
		public void reduce(DoubleWritable key, Iterator<DoubleWritable> values,
				OutputCollector<DoubleWritable, Text> output, Reporter reporter)
				throws IOException {
			double newCenter;
			double sum = 0;
			int no_elements = 0;
			String points = "";
			while (values.hasNext()) {
				double d = values.next().get();
				points = points + " " + Double.toString(d);
				sum = sum + d;
				++no_elements;
			}

			// We have new center now
			newCenter = sum / no_elements;

			// Emit new center and point
			output.collect(new DoubleWritable(newCenter), new Text(points));
		}
	}

	public static void main(String[] args) throws Exception {
		run(args);
	}

	public static void run(String[] args) throws Exception {
		IN = args[0];
		OUT = args[1];
		String input = IN;
		String output = OUT + System.nanoTime();
		String again_input = output;

		// Reiterating till the convergence
		int iteration = 0;
		boolean isdone = false;
		while (isdone == false) {
			JobConf conf = new JobConf(KMeans.class);
			if (iteration == 0) {
				Path hdfsPath = new Path(input + CENTROID_FILE_NAME);
				// upload the file to hdfs. Overwrite any existing copy.
				DistributedCache.addCacheFile(hdfsPath.toUri(), conf);
			} else {
				Path hdfsPath = new Path(again_input + OUTPUT_FILE_NAME);
				// upload the file to hdfs. Overwrite any existing copy.
				DistributedCache.addCacheFile(hdfsPath.toUri(), conf);
			}

			conf.setJobName(JOB_NAME);
			conf.setMapOutputKeyClass(DoubleWritable.class);
			conf.setMapOutputValueClass(DoubleWritable.class);
			conf.setOutputKeyClass(DoubleWritable.class);
			conf.setOutputValueClass(Text.class);
			conf.setMapperClass(Map.class);
			conf.setReducerClass(Reduce.class);
			conf.setInputFormat(TextInputFormat.class);
			conf.setOutputFormat(TextOutputFormat.class);

			FileInputFormat.setInputPaths(conf,
					new Path(input + DATA_FILE_NAME));
			FileOutputFormat.setOutputPath(conf, new Path(output));

			JobClient.runJob(conf);

			Path ofile = new Path(output + OUTPUT_FILE_NAME);
			FileSystem fs = FileSystem.get(new Configuration());
			BufferedReader br = new BufferedReader(new InputStreamReader(
					fs.open(ofile)));
			List<Double> centers_next = new ArrayList<Double>();
			String line = br.readLine();
			while (line != null) {
				String[] sp = line.split("\t| ");
				double c = Double.parseDouble(sp[0]);
				centers_next.add(c);
				line = br.readLine();
			}
			br.close();

			String prev;
			if (iteration == 0) {
				prev = input + CENTROID_FILE_NAME;
			} else {
				prev = again_input + OUTPUT_FILE_NAME;
			}
			Path prevfile = new Path(prev);
			FileSystem fs1 = FileSystem.get(new Configuration());
			BufferedReader br1 = new BufferedReader(new InputStreamReader(
					fs1.open(prevfile)));
			List<Double> centers_prev = new ArrayList<Double>();
			String l = br1.readLine();
			while (l != null) {
				String[] sp1 = l.split(SPLITTER);
				double d = Double.parseDouble(sp1[0]);
				centers_prev.add(d);
				l = br1.readLine();
			}
			br1.close();

			// Sort the old centroid and new centroid and check for convergence
			// condition
			Collections.sort(centers_next);
			Collections.sort(centers_prev);

			Iterator<Double> it = centers_prev.iterator();
			for (double d : centers_next) {
				double temp = it.next();
				if (Math.abs(temp - d) <= 0.1) {
					isdone = true;
				} else {
					isdone = false;
					break;
				}
			}
			++iteration;
			again_input = output;
			output = OUT + System.nanoTime();
		}
	}
}
```

### In Python with MRJob

```python
from mrjob.job import MRJob
import mrjob.protocol
# MRJob is a Python class that we subclass, overriding its hooks
from math import sqrt


class MRKMeans(MRJob):

    SORT_VALUES = True
    OUTPUT_PROTOCOL = mrjob.protocol.RawProtocol

    def dist_vec(self, v1, v2):
        # calculate the distance between two vectors (in two dimensions)
        return sqrt((v2[0] - v1[0]) * (v2[0] - v1[0]) + (v2[1] - v1[1]) * (v2[1] - v1[1]))

    def configure_options(self):
        super(MRKMeans, self).configure_options()
        # the line below declares that the file following the --c option
        # holds the centroids and must be made available to the tasks
        self.add_file_option('--c')

    def get_centroids(self):
        """
        Definition : extracts centroids from the centroids file defined after the --c flag
        Out : returns the list of centroids
        """
        # self.options.c is the name of the file following the --c option
        f = open(self.options.c, 'r')
        centroids = []
        for line in f.read().split('\n'):
            if line:
                x, y = line.split(',')
                centroids.append([float(x), float(y)])
        f.close()
        return centroids

    def mapper(self, _, lines):
        """
        Definition : the mapper takes the centroids from get_centroids() and the point cloud and, for each point, calculates the distance to each centroid and finds the minimum
        Out : yields the point with its class
        """
        centroids = self.get_centroids()
        for l in lines.split('\n'):
            x, y = l.split(',')
            point = [float(x), float(y)]
            min_dist = 100000000.0
            classe = 0
            # iterate over the centroids (here we know that we are doing a 3-means)
            for i in range(3):
                dist = self.dist_vec(point, centroids[i])
                if dist < min_dist:
                    min_dist = dist
                    classe = i
            yield classe, point

    def combiner(self, k, v):
        """
        Definition : calculates for each class, at the end of the mapper and before the reducer, the mean point of that class
        Out : yields, for each class, the partial centroid seen by this mapper
        """
        count = 0
        moy_x = moy_y = 0.0
        for t in v:
            count += 1
            moy_x += t[0]
            moy_y += t[1]
        yield k, (moy_x / count, moy_y / count)

    def reducer(self, k, v):
        """
        Definition : for each class, gets all the temporary centroids from each combiner and calculates the new centroid.
        """
        # k is the class and v holds the mean points linked to the class
        count = 0
        moy_x = moy_y = 0.0
        for t in v:
            count += 1
            moy_x += t[0]
            moy_y += t[1]
        print(str(k) + "," + str(moy_x / count) + "," + str(moy_y / count))


if __name__ == '__main__':
    # just run mapreduce!
    MRKMeans.run()
```

### In Python with the Spark API

```python
from math import sqrt

from numpy import array
from pyspark.mllib.clustering import KMeans

# Load and parse the data (`sc` is the SparkContext provided by the pyspark shell)
data = sc.textFile("kmeans_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))

# Build the model (cluster the data)
# `runs` belongs to the Spark 1.x API used in this lesson; drop it if your
# Spark version rejects it
clusters = KMeans.train(parsedData, 2, maxIterations=10,
                        runs=10, initializationMode="random")

print(clusters.clusterCenters)

# Evaluate clustering by computing Within Set Sum of Squared Errors
def error(point):
    center = clusters.centers[clusters.predict(point)]
    return sqrt(sum([x**2 for x in (point - center)]))

WSSSE = parsedData.map(lambda point: error(point)).reduce(lambda x, y: x + y)
print("Within Set Sum of Squared Error = " + str(WSSSE))
```

<a name="introduction"></a>
## Intro to Spark (20 min)

Spark was developed in response to limitations in the MapReduce cluster computing paradigm. In MapReduce, data is read from disk and then a function is mapped across the data. Then a reducer reduces the results of the map and stores the results back to HDFS. Spark relaxes the constraints of MapReduce by doing the following:

- Generalizes computation from MapReduce-only graphs to arbitrary Directed Acyclic Graphs (DAGs)
- Removes a lot of boilerplate code present in Hadoop
- Allows tweaks to otherwise inaccessible components in Hadoop, such as the sort algorithm
- Keeps data in cluster memory between steps, rather than rereading it from disk, which greatly speeds up I/O

This makes Spark vastly preferable for certain use cases, especially ingesting streaming data, doing real-time interactive analytics and running machine learning algorithms.

The two pillars on which Spark is based are RDDs and DAGs.

### Resilient Distributed Datasets (RDDs)

Spark is based on a data structure called the _resilient distributed dataset (RDD)_, an object that represents data placed into the system. The critical improvement here is that data can reside in memory. The two key RDD concepts are:

- Transformations: doing something to RDDs to produce new RDDs (e.g. filtering)
- Actions: asking Spark to process an RDD to output some result (e.g. counting things)



![](./assets/images/spark-rdd.png)
(Image from https://dzone.com/refcardz/apache-spark)

### Directed Acyclic Graph

The DAG (directed acyclic graph) model is not unlike a Git commit history. 

Each node in the graph represents a particular operation on the data. The graph is _directed_, meaning the information only flows in one direction along the edges and cannot flow backwards (or in circles, hence _acyclic_). This makes the identification of inputs and outputs easy and unique. This is good for fault tolerance: when a system failure wrecks an RDD, the Spark engine can trace its lineage to recreate it.

![DAG](./assets/images/dag.png)
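
The lineage idea can be sketched in a few lines of plain Python. This is a toy model, not Spark's implementation: the `Node` class and its `cached` attribute are hypothetical names invented for the example. Each node remembers the function that produced it and its parents, so a lost result can be rebuilt by replaying the graph.

```python
class Node:
    """One step in a toy lineage graph: a function plus its parent nodes."""

    def __init__(self, name, func, parents=()):
        self.name = name
        self.func = func
        self.parents = tuple(parents)
        self.cached = None  # materialized result, if any

    def compute(self):
        """Return the result, rebuilding it from the parents when missing."""
        if self.cached is None:
            inputs = [parent.compute() for parent in self.parents]
            self.cached = self.func(*inputs)
        return self.cached


source = Node("source", lambda: [1, 2, 3, 4, 5, 6])
evens = Node("evens", lambda xs: [x for x in xs if x % 2 == 0], [source])
squares = Node("squares", lambda xs: [x * x for x in xs], [evens])

first = squares.compute()

# Simulate a failure that wipes out two materialized results.
evens.cached = None
squares.cached = None

# Replays the lineage: source -> evens -> squares.
recovered = squares.compute()
print(first == recovered)
```

Because edges only point from inputs to outputs and never form a cycle, the recursive rebuild is guaranteed to terminate.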



<a name="introduction2"></a>
## Spark Stack and API (10 min)

The Spark Core is the foundation of the overall ecosystem. It provides distributed task dispatching, scheduling, and basic I/O functionalities, exposed through an application programming interface (for Java, Python, Scala, and R) centered on the RDD abstraction. 
![](./assets/images/spark-stack.png)

Here are some of the operations offered by the Spark API:
- map, filter, reduce, reduceByKey
- groupBy, sortBy
- union, join, zip
- count, fold, cogroup, cartesian, sample, take
- ....

> **Check:** In pairs: choose one of the operations listed above and read about how it works in the documentation. Take 2 minutes to explain to each other what you've learned.

Spark is built in Scala, a language that runs on the Java Virtual Machine and combines the functional programming and OOP paradigms. Spark builds computation by chaining functions in the DAG.

![](./assets/images/spark-flow.png)



#### Spark Variables
Spark provides two forms of shared variables:
- broadcast variables: they reference read-only data that needs to be available on all nodes
- accumulators: they can be used to program reductions in an imperative style




#### Spark Operations
Spark provides two types of operations:
- Transformations: these are "lazy" operations that return a new RDD; nothing is computed until an action asks for a result
- Actions: these are "non-lazy" operations that trigger the computation and return a result (e.g. `collect` or `count`)

Using lazy operations, we can build a computation graph that only gets executed when an action such as `collect` requests the result. This allows Spark to optimize the requested calculation by optimizing the underlying DAG of operations.
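
To make the lazy behaviour concrete, here is a minimal plain-Python sketch. `LazyDataset` is a hypothetical toy class written for this lesson, not part of Spark: its `map` and `filter` only record steps, and `collect` is the single action that runs them.

```python
class LazyDataset:
    """Toy stand-in for an RDD: transformations are recorded, not executed."""

    def __init__(self, source, steps=()):
        self._source = source
        self._steps = tuple(steps)

    # Transformations: return a new LazyDataset and do no work.
    def map(self, func):
        return LazyDataset(self._source, self._steps + (("map", func),))

    def filter(self, pred):
        return LazyDataset(self._source, self._steps + (("filter", pred),))

    # Action: run every recorded step and return a concrete result.
    def collect(self):
        items = iter(self._source)
        for kind, func in self._steps:
            items = map(func, items) if kind == "map" else filter(func, items)
        return list(items)


calls = []


def double(x):
    calls.append(x)  # record that real work happened
    return 2 * x


pipeline = LazyDataset(range(5)).map(double).filter(lambda x: x > 2)
calls_before_action = len(calls)  # nothing has run yet
result = pipeline.collect()       # the action triggers the computation
print(calls_before_action, result)
```

Since the whole chain of steps is known before anything runs, an engine is free to reorder or fuse them, which is the optimization opportunity described above.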



Spark runs on a cluster manager and uses a distributed storage system: these can be Hadoop and HDFS, though there are other options.

**Option 1**

We can also play with Spark in a pseudo-distributed local mode, where the local file system is actually used. Let's do that.

**Option 2**

If you're having trouble running the 'dsi-bigdata' vagrant box, you can spin up a cluster running Spark on AWS by following this guide: https://docs.aws.amazon.com/ElasticMapReduce/latest/ReleaseGuide/emr-spark-launch.html. Once your cluster is up, ssh into the master like we did yesterday, and run the `pyspark` shell.

NB: the sample dataset in that tutorial is no longer available. Try reusing the cloudfront log: `sc.textFile("s3://us-west-2.elasticmapreduce.samples/cloudfront/data/")`. Or really go wild with one of these: https://aws.amazon.com/public-data-sets/.

<a name="demo"></a>
## Demo: Spark Map Reduce (15 min)
(adapted from: http://spark.apache.org/docs/latest/quick-start.html)

For the next part we will use our virtual machine:

- In bash, go to the directory with the dsi-bigdata-vm.box and accompanying Vagrantfile
- `vagrant up`
- `vagrant ssh`
- If it requires a password, try `vagrant`
- `bigdata_start.sh`



Open a PySpark shell by typing: `pyspark`.

![](./assets/images/pysparkshell.png)

You should also be able to see an active spark context here: http://10.211.55.101:4040/jobs/

![](./assets/images/sparkweb.png)

Let's load a text file and perform a few operations:

```python
textFile = sc.textFile('file:///home/vagrant/data/project_gutenberg/pg11.txt')
```

We have just created an RDD called `textFile`. As you know, RDDs have actions, which return values, and transformations, which return pointers to new RDDs. Let’s start with a few actions:

```python
>>> textFile.count() # Number of items in this RDD
3735

>>> textFile.first() # First item in this RDD
u"Project Gutenberg's Alice's Adventures in Wonderland, by Lewis Carroll"
```

Now let’s use a transformation. We will use the filter transformation to return a new RDD with a subset of the items in the file.

```python
>>> linesWithAlice = textFile.filter(lambda line: "Alice" in line)
```

Notice that the shell returned immediately, because transformations are lazy. If you type `linesWithAlice`, you should see something like:

`PythonRDD[10] at RDD at PythonRDD.scala:43`

We can chain together transformations and actions:

```python
>>> textFile.filter(lambda line: "Alice" in line).count() # How many lines contain "Alice"?
396
```

RDD actions and transformations can be used for more complex computations. Let’s say we want to find the largest number of words on any single line:

```python
>>> textFile.map(lambda line: len(line.split())).reduce(lambda a, b: a if (a > b) else b)
18
```

Let's stop here for a second and review the last line.

We started with the `textFile` RDD and we first applied the following map: `lambda line: len(line.split())`

**Check:** what does this function do?


> Answer: it splits each line into words and counts how many words per line there are



Then we chained the following reduce function to the result of the map: `lambda a, b: a if (a > b) else b`

**Check:** what does this function do?


> Answer: it takes two values and returns the larger of the two



We used Python anonymous functions (lambdas), but we can also pass any top-level Python function we want. For example, we’ll define a max function to make this code easier to understand:

```python
>>> def max(a, b):
...     if a > b:
...         return a
...     else:
...         return b
...

>>> textFile.map(lambda line: len(line.split())).reduce(max)
18
```
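
If the chain is hard to follow, it may help to run the same two functions on an ordinary Python list. The sketch below uses three hand-picked lines rather than the Gutenberg file, and Python's built-in `map` and `functools.reduce` in place of the RDD methods, so it runs without Spark.

```python
from functools import reduce

# A tiny stand-in for the textFile RDD.
lines = [
    "Alice was beginning to get very tired",
    "of sitting by her sister on the bank",
    "and of having nothing to do",
]

# Same map function as above: number of words on each line.
word_counts = list(map(lambda line: len(line.split()), lines))

# Same reduce function as above: keep the larger of two values.
longest = reduce(lambda a, b: a if a > b else b, word_counts)

print(word_counts, longest)
```

The difference in Spark is that the map runs in parallel over the partitions of the RDD and the reduce combines the per-partition results.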

<a name="guided-practice"></a>
## Guided Practice: Spark Map Reduce (10 min)
### Word count

**Check:** how did we implement the word count in Hadoop?


> - step 1: split into words
> - step 2: map each word to a pair of (word, 1)
> - step 3: reduce by key and sum the counts

Here is how you would implement it in Spark:

```python
>>> wordCounts = textFile.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)
```

Notice that this is a lazy operation. To collect the word counts in our shell, we use the collect action:

```python
>>> wordCounts.collect()
[(u"figure!'", 1), (u'four', 6), (u'hanging', 3), (u'ringlets', 1), (u"story!'", 2), (u'Foundation', 14), ...]
```

To do this we used 3 different Spark operations:

- map: Return a new distributed dataset formed by passing each element of the source through a function.
- flatMap: Similar to map, but each input item can be mapped to 0 or more output items. Using this we are given a single list of all the words in the file, instead of a list of lists where each item is the list of words in a given line.
- reduceByKey: When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
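
The difference between map and flatMap, and the behaviour of reduceByKey, can be imitated on a plain Python list. This is only a sketch: `reduce_by_key` is a hypothetical helper written for the example, and it processes pairs sequentially instead of across a cluster.

```python
from itertools import chain

lines = ["the cat sat", "the dog"]

# map: exactly one output item per input line, so we get a list of lists.
mapped = [line.split() for line in lines]

# flatMap: each line may yield zero or more items, flattened into one list.
flat_mapped = list(chain.from_iterable(line.split() for line in lines))


def reduce_by_key(pairs, func):
    """Combine the values of each key with func, in the spirit of reduceByKey."""
    result = {}
    for key, value in pairs:
        result[key] = func(result[key], value) if key in result else value
    return result


pairs = [(word, 1) for word in flat_mapped]
word_counts = reduce_by_key(pairs, lambda a, b: a + b)

print(mapped)
print(flat_mapped)
print(word_counts)
```

With `map` alone, the next step would receive whole lists of words instead of individual words, which is why the word count starts with `flatMap`.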



**Check:** Try modifying the code you have just written.
- Try loading a different file
- Try sorting the words by count to find the most common words

### Caching

Spark also supports pulling data sets into a cluster-wide in-memory cache. This is very useful when data is accessed repeatedly, such as when querying a small “hot” dataset or when running an iterative algorithm like [PageRank](https://en.wikipedia.org/wiki/PageRank). As a simple example, let’s mark our `linesWithAlice` dataset to be cached:

```python
>>> linesWithAlice.cache()
```

Now try running `linesWithAlice.count()` twice. What happens?



> Answer: the first time takes longer than the second time, because the first count computes the filtered lines and stores them in memory, and the second count reuses them
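
The effect can be imitated without Spark. In the sketch below, `CachedDataset` is a hypothetical toy class, not a Spark API: it runs its computation on the first action only and counts how many times the real work was done.

```python
class CachedDataset:
    """Toy stand-in for a cached RDD: compute once, then reuse the result."""

    def __init__(self, compute):
        self._compute = compute
        self._data = None
        self.computations = 0

    def count(self):
        if self._data is None:      # first action: do the real work
            self._data = self._compute()
            self.computations += 1
        return len(self._data)      # later actions: read from memory


lines = ["Alice fell", "down the hole", "Alice woke"]
lines_with_alice = CachedDataset(lambda: [l for l in lines if "Alice" in l])

first = lines_with_alice.count()
second = lines_with_alice.count()
print(first, second, lines_with_alice.computations)
```

Note that, as in Spark, marking the dataset as cached does no work by itself; the data is only stored when the first action runs.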

<a name="ind-practice"></a>
## Independent Practice: Explore the Spark Shell  (15 min)

### Pi Estimation
Let's estimate the value of π by "throwing darts" at a circle. We pick random points in the unit square ((0, 0) to (1,1)) and see how many fall in the unit circle. The fraction should be π / 4. Try changing the value of NUM_SAMPLES and see what happens.

```python
# Run this inside the pyspark shell, where `sc` is already defined
from random import random

def sample(p):
    x, y = random(), random()
    return 1 if x*x + y*y < 1 else 0

NUM_SAMPLES = 100000
count = sc.parallelize(range(0, NUM_SAMPLES)).map(sample).reduce(lambda a, b: a + b)
print("Pi is roughly %f" % (4.0 * count / NUM_SAMPLES))
```

> Pi is roughly 3.137640

**Check:** What's going on? Why did we just do that?
> Answer: We parallelized the generation of NUM_SAMPLES random numbers to estimate the value of Pi. Cool!
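
To see how the sample size affects the estimate without a Spark shell, the same experiment can be run sequentially in plain Python. This is a sketch under a few assumptions: `estimate_pi` and its `seed` parameter are names chosen for the example, and the fixed seed is there only to make runs repeatable.

```python
import math
import random


def estimate_pi(num_samples, seed=0):
    """Monte Carlo estimate of pi from random points in the unit square."""
    rng = random.Random(seed)  # fixed seed so runs are repeatable
    inside = 0
    for _ in range(num_samples):
        x, y = rng.random(), rng.random()
        if x * x + y * y < 1:
            inside += 1
    return 4.0 * inside / num_samples


# Print the estimate and its absolute error for growing sample sizes.
for n in (100, 10000, 200000):
    estimate = estimate_pi(n)
    print(n, estimate, abs(estimate - math.pi))
```

The error shrinks roughly with the square root of the number of samples, which is why spreading many samples across a cluster is attractive.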

<a name="conclusion"></a>
## Conclusion (10 min)

- Spark is a cluster computing framework, which runs on a cluster manager and uses a distributed storage system (often Hadoop and HDFS).
- Spark is rapidly growing in popularity thanks to its increased speed and ease of iteration over MapReduce in some use cases.
- It is built with Scala, and includes APIs for Scala, Java, Python and now R.



The evolving Spark ecosystem includes:

- **Spark Core:** Contains the basic functionality of Spark; in particular the APIs that define RDDs and the operations and actions that can be undertaken upon them. The rest of Spark's libraries are built on top of the RDD and Spark Core.

- **Spark SQL:** Provides APIs for interacting with Spark via the Apache Hive variant of SQL called Hive Query Language (HiveQL). Every database table is represented as an RDD and Spark SQL queries are transformed into Spark operations. For those that are familiar with Hive and HiveQL, Spark can act as a drop-in replacement.



- **Spark Streaming:** Enables the processing and manipulation of live streams of data in real time. Many streaming data libraries (such as Apache Storm) exist for handling real-time data. Spark Streaming lets programs work with this data as it flows in, much as they would with a normal RDD.

- **MLlib:** A library of common machine learning algorithms implemented as Spark operations on RDDs. This library contains scalable learning algorithms, such as classification and regression, that require iterative operations across large data sets. The Mahout library, formerly the Big Data machine learning library of choice, will move to Spark for its implementations in the future.

- **GraphX:** A collection of algorithms and tools for manipulating graphs and performing parallel graph operations and computations. GraphX extends the RDD API to include operations for manipulating graphs, creating subgraphs, or accessing all vertices in a path.

### ADDITIONAL RESOURCES

- [Spark Slideshare Presentation](http://www.slideshare.net/WillDu1/ten-tools-for-ten-big-data-areas-03apache-spark)
- [Quora: what is Apache Spark](https://www.quora.com/What-exactly-is-Apache-Spark-and-how-does-it-work)
- [Qubole: Apache Spark Use Cases](https://www.qubole.com/blog/big-data/apache-spark-use-cases)
- [Spark Examples](http://spark.apache.org/examples.html)
- [Spark getting started](https://districtdatalabs.silvrback.com/getting-started-with-spark-in-python)