## Spark Introduction
![](spark-logo-trademark.png)
>Apache Spark™ is a fast and general engine for large-scale data processing.

> Apache Spark has an advanced DAG execution engine that supports acyclic data flow and in-memory computing.
Spark is written in Scala (which is still arguably the best language to use it from) and runs on the JVM, but it has rich APIs in Python, R and Java. We will be using the Python API for this module, but will likely touch on the others in the future.

Spark combines a rich and powerful DataFrame API with built-in SQL, streaming, machine learning and graph computation capabilities. We can even combine these in the same analysis.

We will be using it on top of Hadoop and the YARN resource manager.

We are going to follow the training material from the creators of Spark.

## ___Resources___
[Spark Documentation](http://spark.apache.org/docs/latest/)

## Spark Session
The Spark session is essentially the entry point for interacting with Spark; it is how we connect to our cluster. In this notebook it is stored in the `spark` variable. We use `spark` to access the DataFrame and SQL APIs (the typed Dataset API is only available in Scala and Java). Through `spark.catalog` we can also access metadata about databases and tables, including which ones are cached.
```python
spark.catalog.listTables()
```
The spark session is also how we will read/write data.
```python
spark.read.csv('file.csv')
```

```python
spark  # the Spark session is stored in this object
```

```
<pyspark.sql.session.SparkSession at 0x105721a90>
```

```python
spark.read  # a DataFrameReader; typing `spark.read.` and pressing Tab lists its methods
```

### Our first dataframe

```python
df = spark.range(100000)
df
```

```
DataFrame[id: bigint]
```

And that's it: we have a fully parallelized DataFrame! Coming from pandas, we might have expected the cell above to print the data (or, to be accurate, a subset of it). So what's going on here? The answer lies in how Spark actually computes.

## Transformations & Actions
>Nature does not know extinction; all it knows is transformation.

Spark is lazy. Or, I should say, _lazily_ evaluated. In Spark there are exactly two types of operations: transformations and actions.
### Transformations
Transformations are operations that are not executed when the cell or code block runs; instead they are accumulated, to be executed ___only___ when an action is triggered. This means that, until an action is run, Spark is essentially building up a series of transformations in a DAG (directed acyclic graph). This lets Spark optimize the whole series of transformations together, and it means a dataset can always be reconstructed by replaying the transformations that produced it (its lineage).
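The idea of "record now, run later" can be sketched with a toy model in plain Python. This is only an analogy under simplifying assumptions (a list as the data source, `map`/`filter` as the only transformations); it is not how Spark is implemented internally.

```python
class LazyDataset:
    """Toy model of lazy evaluation; NOT Spark's actual implementation."""

    def __init__(self, source, steps=()):
        self.source = source       # where the data comes from (a Python iterable here)
        self.steps = tuple(steps)  # recorded transformations: the "lineage"

    # Transformations: return a NEW dataset with one more recorded step; no work is done
    def map(self, fn):
        return LazyDataset(self.source, self.steps + (("map", fn),))

    def filter(self, pred):
        return LazyDataset(self.source, self.steps + (("filter", pred),))

    # Actions: replay every recorded step from the source and return a result
    def collect(self):
        rows = list(self.source)
        for kind, fn in self.steps:
            if kind == "map":
                rows = [fn(r) for r in rows]
            else:
                rows = [r for r in rows if fn(r)]
        return rows

    def count(self):
        return len(self.collect())


calls = []

def square(x):
    calls.append(x)  # record each time the function actually runs
    return x * x

ds = LazyDataset(range(10)).map(square).filter(lambda x: x % 2 == 0)
print(len(calls))    # 0: only the plan has been built
print(ds.collect())  # [0, 4, 16, 36, 64]: the action executes the plan
print(len(calls))    # 10
```

Because the dataset is just a source plus recorded steps, calling `collect()` again rebuilds it from scratch, which is the toy analogue of reconstructing a dataset from its lineage.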

#### Example DAG
Here is an example DAG, taken from the web UI (port 4040).
![](dag.png)
### Actions
Actions are what actually give us a result back. When a piece of code containing an action runs, it is executed immediately, together with all the pending transformations it depends on.

This will all make sense once we get comfortable with the basic API, but you can think about it like this: transformations are the blueprints (the DAG) for a particular dataset (which might be a subset or combination of other datasets), and actions are the actual building, i.e. the execution of those blueprints.

As an example, the `.describe()` method is a transformation: it returns the blueprint for a summary dataset. Calling `.show()` on it actually executes the DAG and displays the result.


```python
df.describe()  # Transformation
```

```
DataFrame[summary: string, id: string]
```

```python
df.describe().show()  # Action
```

```
+-------+-----------------+
|summary|               id|
+-------+-----------------+
|  count|           100000|
|   mean|          49999.5|
| stddev|28867.65779668774|
|    min|                0|
|    max|            99999|
+-------+-----------------+
```



## Example
OK, let's see some of this in practice and start to get familiar with the API. We are going to use the diamonds dataset and perform some simple wrangling on it to understand these operations.

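```python
s3 = "s3://databricks-datasets-oregon/Rdatasets/data-001/csv/ggplot2/diamonds.csv"
# "csv" is Spark 2+'s built-in reader (formerly the com.databricks.spark.csv package)
df = (spark.read.format("csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load(s3))  # use the S3 path when running on AWS
diamonds = df  # later cells refer to this DataFrame as `diamonds`
```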


```python
print(df)
```

```
DataFrame[carat: double, cut: string, color: string, clarity: string, depth: double, table: double, price: int, x: double, y: double, z: double]
```


We used `inferSchema` above, but we could also have specified the schema manually. The cost of inferring it is that Spark has to make an extra pass over the data just to determine the column types. Let's use a transformation, `.sample()`, and then call an action afterwards. We will see that the transformation isn't executed until we call the action.

```python
sample = df.sample(withReplacement=True, fraction=0.01, seed=0)  # Transformation
```

Let's use the `.show()` action. You can think of this action as similar to pandas or R's `head()` methods.

```python
sample.show(5)
```

```
+-----+-------+-----+-------+-----+-----+-----+----+----+----+
|carat|    cut|color|clarity|depth|table|price|   x|   y|   z|
+-----+-------+-----+-------+-----+-----+-----+----+----+----+
| 0.23|  Ideal|    E|    SI2| 61.5| 55.0|  326|3.95|3.98|2.43|
| 0.21|Premium|    E|    SI1| 59.8| 61.0|  326|3.89|3.84|2.31|
| 0.23|   Good|    E|    VS1| 56.9| 65.0|  327|4.05|4.07|2.31|
| 0.29|Premium|    I|    VS2| 62.4| 58.0|  334| 4.2|4.23|2.63|
| 0.31|   Good|    J|    SI2| 63.3| 58.0|  335|4.34|4.35|2.75|
+-----+-------+-----+-------+-----+-----+-----+----+----+----+
only showing top 5 rows
```



We can inspect the plan (the DAG) at any time with `.explain()`. Let's see what it shows for this one.

```python
sample.explain()
```

```
== Physical Plan ==
*Sample 0.0, 0.01, true, 0
+- InMemoryTableScan [carat#520, cut#521, color#522, clarity#523, depth#524, table#525, price#526, x#527, y#528, z#529]
      +- InMemoryRelation [carat#520, cut#521, color#522, clarity#523, depth#524, table#525, price#526, x#527, y#528, z#529], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *FileScan csv [carat#49,cut#50,color#51,clarity#52,depth#53,table#54,price#55,x#56,y#57,z#58] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/acb/Documents/DataSci/sparkcourse/diamonds.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<carat:double,cut:string,color:string,clarity:string,depth:double,table:double,price:int,x:...
```


This can be a bit confusing at first, but the built-in web UI (port 4040) makes it a whole lot easier to understand. Now let's build a slightly more complex example.

```python
df1 = diamonds.groupBy("clarity", "color").avg("price")  # Transformation
```

```python
df1.show()  # Action
```

```
+-------+-----+------------------+
|clarity|color|        avg(price)|
+-------+-----+------------------+
|     IF|    F| 2750.836363636364|
|    VS2|    G| 4416.256497656583|
|   VVS2|    D|3351.1283905967452|
|     IF|    D| 8307.369863013699|
|    VS2|    D|  2587.22569239835|
|    VS2|    H| 4722.414485696896|
|    SI2|    D|3931.1014598540146|
|    VS1|    E|2856.2943013270883|
|    SI1|    H| 5032.414945054945|
|   VVS1|    D|2947.9126984126983|
|   VVS2|    F|3475.5128205128203|
|     I1|    G| 3545.693333333333|
|     I1|    F| 3342.181818181818|
|    VS1|    F| 3796.717741935484|
|    VS1|    J| 4884.461254612546|
|    SI2|    H| 6099.895073576456|
|   VVS1|    E|2219.8201219512193|
|     I1|    H| 4453.413580246914|
|   VVS1|    H|1845.6581196581196|
|     I1|    D|3863.0238095238096|
+-------+-----+------------------+
only showing top 20 rows
```



```python
df2 = df1.join(diamonds, on='clarity', how='inner').select("`avg(price)`", "carat")  # Don't worry about the actual join
```

```python
df2.explain()
```

```
== Physical Plan ==
*Project [avg(price)#1038, carat#1102]
+- *BroadcastHashJoin [clarity#3], [clarity#1105], Inner, BuildRight
   :- *HashAggregate(keys=[clarity#3, color#2], functions=[avg(cast(price#6 as bigint))])
   :  +- Exchange hashpartitioning(clarity#3, color#2, 200)
   :     +- *HashAggregate(keys=[clarity#3, color#2], functions=[partial_avg(cast(price#6 as bigint))])
   :        +- *Filter isnotnull(clarity#3)
   :           +- InMemoryTableScan [color#2, clarity#3, price#6], [isnotnull(clarity#3)]
   :                 +- InMemoryRelation [carat#0, cut#1, color#2, clarity#3, depth#4, table#5, price#6, x#7, y#8, z#9], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
   :                       +- *FileScan csv [carat#49,cut#50,color#51,clarity#52,depth#53,table#54,price#55,x#56,y#57,z#58] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/acb/Documents/DataSci/sparkcourse/diamonds.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: st
```

Wow, looks like we already have quite the DAG built up! 

```python
df2.show(5)
```

```
+-----------------+-----+
|       avg(price)|carat|
+-----------------+-----+
|2750.836363636364| 0.57|
|2750.836363636364| 0.52|
|2750.836363636364| 0.52|
|2750.836363636364| 0.51|
|2750.836363636364| 0.54|
+-----------------+-----+
only showing top 5 rows
```



```python
df2.count()
```

```
377580
```

## Caching
In data science, most of our work is interactive. We are constantly querying and wrangling our data, and once it's ready for analysis we might build predictive models or use unsupervised learning for something like anomaly detection. It can be frustrating to think that each of these steps has to re-read the data from disk and recompute every transformation in its DAG. Spark's solution to this problem is caching: it can keep a computed dataset in memory (spilling to disk if needed) so that later actions reuse it instead of recomputing it from the source, which can significantly increase performance. Let's see an example below.

```python
%timeit df2.count()
```

```
1 loop, best of 3: 718 ms per loop
```


```python
df2.cache()
```

```
DataFrame[avg(price): double, carat: double]
```

```python
df2.count()  # first action after .cache(): reads from the source again and fills the cache
```

```
377580
```

Now run on the cached dataset.

```python
%timeit df2.count()
```

```
1 loop, best of 3: 429 ms per loop
```


This is still a small dataset; the payoff from caching is usually much larger on bigger datasets or on DAGs with expensive transformations.

```python
spark.sparkContext.uiWebUrl  # address of this session's web UI
```

```
'http://192.168.2.7:4040'
```