# Introduction to Spark

## What is Spark?

* Spark is a fast and general engine for large-scale distributed data processing.
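* The Spark project has advertised that Spark can run programs up to 100x faster than Hadoop MapReduce in memory, or 10x faster on disk.
* Spark improves on MapReduce's computation model with an advanced DAG (Directed Acyclic Graph) execution engine that supports acyclic data flow and in-memory computing: a job can chain many stages instead of following MapReduce's fixed map-then-reduce pattern, and intermediate results can stay in memory instead of being written to HDFS between jobs.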
* Spark runs on Hadoop, Mesos, standalone, or in the cloud. It can access diverse data sources including HDFS, Cassandra, HBase, and S3.
* Spark is written in Scala, but has bindings for Java, Python (PySpark), and R.

## Spark modules

Spark has several modules:

* **Spark SQL**: Interact with Spark via a SQL interface (like Hive for Hadoop)
* **Spark Streaming**: Handle streaming data for real-time workflows
* **Spark MLlib**: Machine Learning in Spark (think sklearn for big data)
* **Spark GraphX**: Graph processing in Spark

<img src='image/spark_stack.png'/>

## Spark concepts

There are four concepts that are key to understanding Spark:

* it uses Resilient Distributed Datasets (RDDs)
* it processes data on a cluster
* it performs lazy evaluation
* it provides several interactive shells

## What are RDDs?

The main abstraction Spark provides is a Resilient Distributed Dataset (RDD), which is a fault-tolerant collection of elements partitioned across the nodes of the cluster that can be operated on in parallel.

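An example of an RDD created from a local Python list:

```python
from pyspark import SparkConf, SparkContext

# In the pyspark shell `sc` already exists and getOrCreate reuses it; otherwise this runs locally on 4 cores.
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[4]").setAppName("rdd-example"))
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)  # distribute the list as an RDD
```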

<img src='image/RDD.jpg'/>


Our RDD holds an array of references to partition objects. These partition objects are serializable, so they can be spread across the nodes of the cluster. For example, on a four-node cluster we would expect each node to be assigned one partition. Each partition object in turn references memory: the data is kept in RAM on the various nodes, and this is where our operations run by default. Spark is extremely efficient at iterative applications because RDDs can be cached in memory and reused across iterations instead of being recomputed or reread from disk each time.
    
Note: if we had a 1,000-node cluster but only 4 partitions, we would only be using 4 nodes of our cluster. The number of partitions deserves attention!
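
To make the partition-to-node picture concrete, here is a plain-Python toy model (not Spark's implementation) that splits a list into contiguous partitions, in the spirit of `sc.parallelize(data, numSlices)`, and hands them out to nodes round-robin. The helpers `split_into_partitions` and `assign_partitions` are hypothetical names, and the exact boundaries and placement Spark chooses may differ.

```python
def split_into_partitions(data, num_partitions):
    """Split `data` into `num_partitions` contiguous slices of near-equal size."""
    n = len(data)
    return [
        data[i * n // num_partitions:(i + 1) * n // num_partitions]
        for i in range(num_partitions)
    ]


def assign_partitions(partitions, num_nodes):
    """Toy scheduler (hypothetical): give partition i to node i % num_nodes."""
    assignment = {}
    for i, part in enumerate(partitions):
        assignment.setdefault(i % num_nodes, []).append(part)
    return assignment


data = list(range(1, 21))
parts = split_into_partitions(data, 4)
print(parts)

# Four nodes: every node receives exactly one partition.
print(len(assign_partitions(parts, 4)))     # 4

# A 1,000-node cluster still has only 4 partitions of work to hand out.
print(len(assign_partitions(parts, 1000)))  # 4
```

The second call shows the point of the note: extra nodes sit idle unless the data is split into at least as many partitions.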

Also note that RDDs are immutable collections of objects. Why? Immutability eliminates the problems caused by multiple threads updating the same data. Immutable data is safe to share across processes.
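
One way to picture immutability in plain Python (a toy model, not Spark's API): if partitions are stored as tuples inside a frozen dataclass, a "transformation" cannot modify the existing dataset and must build a new one, so readers of the original never see it change. `ToyDataset` is a hypothetical name used only for this sketch.

```python
from dataclasses import dataclass, FrozenInstanceError


@dataclass(frozen=True)
class ToyDataset:
    partitions: tuple  # a tuple of tuples, so neither level can be mutated in place

    def map(self, func):
        # Build and return a brand-new dataset; `self` is left untouched.
        return ToyDataset(tuple(tuple(func(x) for x in part) for part in self.partitions))


original = ToyDataset(((1, 2), (3, 4, 5)))
doubled = original.map(lambda x: x * 2)
print(original.partitions)  # ((1, 2), (3, 4, 5))
print(doubled.partitions)   # ((2, 4), (6, 8, 10))

try:
    original.partitions = ()  # attempt an in-place update
except FrozenInstanceError:
    print("cannot modify an existing dataset")
```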

### RDD operations

There are two main types of operations on RDDs:

1. **transformations**: operations that build a new RDD from an existing one and are evaluated later on (*lazy*)
2. **actions**: operations that trigger execution of the pending transformations and return a result to the driver or write data out (*executing*)
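
The split between lazy transformations and executing actions can be mimicked with a small plain-Python class. This is a hypothetical toy (`LazyPipeline` is not part of Spark): `map` and `filter` only append a step to a plan, and nothing runs until the action `collect` pushes the data through that plan. Unlike Spark, which pipelines steps per partition on the executors, this toy materializes a whole list per step. The `calls` list exists only to show when work actually happens.

```python
class LazyPipeline:
    """Toy model of lazy evaluation: transformations are recorded, actions execute."""

    def __init__(self, data, steps=()):
        self._data = data
        self._steps = steps  # recorded transformations, in order

    # Transformations: return a new pipeline with one more recorded step.
    def map(self, func):
        return LazyPipeline(self._data, self._steps + (("map", func),))

    def filter(self, pred):
        return LazyPipeline(self._data, self._steps + (("filter", pred),))

    # Action: actually run every recorded step and return the result.
    def collect(self):
        result = list(self._data)
        for kind, func in self._steps:
            if kind == "map":
                result = [func(x) for x in result]
            else:
                result = [x for x in result if func(x)]
        return result


calls = []


def square(x):
    calls.append(x)  # record each time the function really runs
    return x * x


pipeline = LazyPipeline([1, 2, 3, 4, 5]).map(square).filter(lambda x: x % 2 == 1)
print(len(calls))          # 0: defining transformations computes nothing
print(pipeline.collect())  # [1, 9, 25]
print(len(calls))          # 5: the action triggered the work
```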

Some examples of each type of operation:

Transformations: *lazy* | Actions: *executing*
--- | --- 
**map(func):** pass each element of source through func, return new RDD | **reduce(func):** aggregate elements with func
**filter(func):** select elements of the source for which func returns true, return new distributed RDD | **take(n):** copy the first n elements to the driver
**distinct():** return duplicate-free RDD | **collect():** copy all elements to the driver
**sample(withReplacement, fraction, [seed]):** sample RDD, with or without replacement | **foreach(func):** apply provided func to each element of RDD
[more examples](http://spark.apache.org/docs/latest/programming-guide.html#transformations) | [more examples](http://spark.apache.org/docs/latest/programming-guide.html#actions)
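
For intuition about what each operation in the table computes, here is a single-machine equivalent using plain Python lists. This is only an analogy that assumes the whole dataset fits on one machine: the real RDD methods run across partitions, and the seeded sample below will not reproduce Spark's output for the same seed. The sampling line mimics sampling without replacement by keeping each element independently with probability `fraction`.

```python
import random
from functools import reduce

data = [5, 3, 8, 3, 1, 8, 2]

# Transformations: each produces a new collection
mapped = [x * 10 for x in data]        # like rdd.map(lambda x: x * 10)
filtered = [x for x in data if x > 2]  # like rdd.filter(lambda x: x > 2)
distinct = list(dict.fromkeys(data))   # like rdd.distinct(); Spark does not guarantee order
rng = random.Random(42)
sampled = [x for x in data if rng.random() < 0.5]  # like rdd.sample(False, 0.5, 42)

# Actions: each produces a result for the driver
total = reduce(lambda a, b: a + b, data)  # like rdd.reduce(lambda a, b: a + b)
first_three = data[:3]                    # like rdd.take(3)
everything = list(data)                   # like rdd.collect()
for x in data:                            # like rdd.foreach(print), but Spark runs func on the executors
    print(x)
```

One design consequence worth noting: because Spark combines partial results from different partitions, the function passed to `reduce` should be commutative and associative (addition is; subtraction is not).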

Below is a command from PySpark with the operation type annotated. Notice the syntax resemblance to Pandas?

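```python
from pyspark.sql import functions as F

# Assumes pagecountsEnAllDF is an existing DataFrame with "project" and "requests" columns
(pagecountsEnAllDF
    .select("project", "requests")           # transformation
    .groupBy("project")                      # transformation
    .sum()                                   # transformation
    .orderBy(F.col("sum(requests)").desc())  # transformation
    .show())                                 # action
```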

If you remove the `show()` call, nothing is executed: without an action, Spark only builds up a plan for the query.
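
To see what that query computes, here is the same group, sum, and sort over a small, hypothetical list of `(project, requests)` rows in plain Python. The rows are made up for illustration; on the real DataFrame the work is distributed across the cluster and only starts once `show()` is called.

```python
from collections import defaultdict

# Hypothetical sample rows: (project, requests)
rows = [("en", 120), ("fr", 40), ("en", 30), ("de", 55), ("fr", 10)]

# groupBy("project").sum(): total requests per project
totals = defaultdict(int)
for project, requests in rows:
    totals[project] += requests

# orderBy(... .desc()): largest total first
ordered = sorted(totals.items(), key=lambda kv: kv[1], reverse=True)

# show(): print the result table
for project, total in ordered:
    print(f"{project}\t{total}")
```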


## Clusters (and their components)

Spark applications run as independent sets of processes on a cluster. These processes are coordinated by the SparkContext (SC) object in your main program (the *driver program*).

1. To run on a cluster, the SparkContext can connect to several types of cluster managers (Spark's own standalone manager, Mesos, or YARN), which allocate resources across applications.
2. Spark then acquires executors on nodes in the cluster (executors are processes that run computations and store data for your application).
3. Next, it sends your application code (defined by JAR or Python files passed to the SparkContext) to the executors. Each application gets its own executor processes!
4. Finally, the SparkContext sends tasks to the executors to run. Tasks from different applications run in different JVMs!
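
The driver/executor split can be loosely imitated on one machine. In this toy sketch (an analogy only; `run_task` is a hypothetical name and nothing here is a Spark API), a `ThreadPoolExecutor` stands in for the executors, each task processes one partition, and the "driver" code combines the per-task results. In real Spark, executors are separate JVM processes on cluster nodes: the cluster manager allocates them, and the SparkContext's scheduler sends them tasks.

```python
from concurrent.futures import ThreadPoolExecutor


def run_task(partition):
    """One 'task': the work an executor does on a single partition."""
    return sum(x * x for x in partition)


# The "driver" splits the data into partitions, one task per partition.
data = list(range(1, 101))
num_partitions = 4
partitions = [data[i::num_partitions] for i in range(num_partitions)]

# Two worker threads play the role of executors; each picks up tasks as it frees up.
with ThreadPoolExecutor(max_workers=2) as executors:
    partial_results = list(executors.map(run_task, partitions))

# The driver combines the per-task results into the final answer.
total = sum(partial_results)
print(total)  # 338350
```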

http://spark.apache.org/docs/latest/cluster-overview.html

<img src='image/spark_cluster.png'/>

## Lazy transformations

Transformations in Spark are *lazy*, so their results are not computed right away.

Spark remembers that transformations were applied to a given dataset. The transformations are only computed when an action requires a result to be returned to the driver program. 
This design enables Spark to run more efficiently!  

For example, Spark can recognize that a dataset created through `map` will be used in a `reduce`, and return only the result of the `reduce` to the driver rather than the larger mapped dataset.
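
The map-then-reduce case can be illustrated in plain Python with a generator, which, like a Spark transformation, describes the mapped values without computing them up front. This is only an analogy (Spark pipelines the work per partition on the executors): the point is that the reduce consumes the mapped values one at a time, so the full mapped dataset is never materialized and only a single number comes back.

```python
from functools import reduce

data = range(1, 1_000_001)

# "Transformation": a generator expression; nothing is computed yet.
lengths = (len(str(x)) for x in data)

# "Action": reduce pulls the mapped values through one at a time and
# returns only the final number, never a million-element list.
total_digits = reduce(lambda a, b: a + b, lengths)
print(total_digits)
```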


## Interactive shells

Spark contains a number of interactive shells. We will be covering PySpark today.

* `spark-shell` for the Scala shell
* `pyspark` for the Python shell