<img src='https://raw.githubusercontent.com/afo/dataXprague/master/imgs/dx_logo.png' width=600px></img>

**Inspiration and sources:** Databricks intro tutorial and [Using Apache Spark 2.0 to Analyze the City of San Francisco's Open Data](https://www.youtube.com/watch?v=K14plpZgy_c) by Sameer Farooqui

# Introduction to Spark and Big Data

Databricks is a platform for running Spark without complex cluster management or tedious maintenance tasks. Spark is a distributed computation framework for executing code in parallel across many different machines. Databricks, the Spark team's enterprise solution, makes big data simple by providing Spark as a hosted service.

## Databricks Terminology

-   **Workspaces**: Where you store **notebooks** and **libraries**.
-   **Notebooks**: Like Jupyter notebooks; cells can run `Scala`, `Python`, `R`, `SQL`, or `Markdown`. Set a cell's language with `%[language name]` at the top of the cell. A notebook must be attached to a cluster to run.
-   **Dashboards**: Can be created from **notebooks** to display the output of cells without the code.
-   **Libraries**: Packages / modules. You can install them from PyPI.
-   **Tables**: Structured data that can be stored in a data lake / cloud storage, stored on the cluster, or cached in memory.
-   **Clusters**: Groups of computers that you treat as a single computer to perform operations on big sets of data.
-   **Jobs**: Scheduled execution of **notebooks** or Python scripts. They can be created either manually or via the REST API.
-   **Apps**: Third-party integrations with the Databricks platform, such as Tableau.

### Spark's history

Spark was developed by the founders of Databricks at UC Berkeley's AMPLab. The project started in 2009 and was donated to the Apache Software Foundation in 2013.

### The Contexts/Environments

Before Spark 2.x, most code used the `SparkContext`, made available as `sc`, and the `SQLContext`, made available as `sqlContext`. The `sqlContext` provides much of the DataFrame functionality, while the `SparkContext` focuses more on the Apache Spark engine itself.

In Spark 2.x there is a single entry point, the `SparkSession`, which wraps both contexts.

### The Data Interfaces

The key interfaces are:

-   **The DataFrame**: A distributed collection of `Row` objects (note: there is no index for lookups). Similar to a pandas or R data frame.
-   **The RDD (Resilient Distributed Dataset)**: An interface to a sequence of data objects, of one or more types, that are distributed across the machines of a cluster. Focus on DataFrames, as they are becoming a superset of the current RDD functionality.

See the speed difference between RDDs and DataFrames:

<img src='https://databricks.com/wp-content/uploads/2015/02/Screen-Shot-2015-02-16-at-9.46.39-AM.png' width=600px></img>

Spark is a unified processing engine that can analyze big data using SQL, machine learning, graph processing (e.g. PageRank), or real-time stream analysis (where a stream is treated as an infinite DataFrame).

![Spark's unified engines](https://camo.githubusercontent.com/ed6aceb55bbc8761830b6effe52e8aa8ef146a99/687474703a2f2f637572726963756c756d2d72656c656173652e73332d776562736974652d75732d776573742d322e616d617a6f6e6177732e636f6d2f77696b692d626f6f6b2f626f6f6b5f696e74726f2f737061726b5f34656e67696e65732e706e67)

Spark can read from many different data sources, and it runs on every major environment. Here we use Amazon EC2, read CSV data, and stick with DataFrames and SQL.

![Spark's goal](https://camo.githubusercontent.com/165b53e995510bb3c5f77fc837d90faa9f222de6/687474703a2f2f637572726963756c756d2d72656c656173652e73332d776562736974652d75732d776573742d322e616d617a6f6e6177732e636f6d2f77696b692d626f6f6b2f626f6f6b5f696e74726f2f737061726b5f676f616c2e706e67)

# Let's Start

Before you start running code, you need to make sure that the notebook is attached to a cluster.

### To create a Cluster

Click the Clusters button that you'll notice on the left side of the page. On the Clusters page, click on ![img](https://training.databricks.com/databricks_guide/create_cluster.png) in the upper left corner.

Then, in the Create Cluster dialog, enter the configuration for the new cluster:

-   Select a unique name for the cluster.
-   Select the most recent stable Runtime Version.
-   Enter the number of workers to bring up; at least 1 is required to run Spark commands.

**Go back to the notebook, click Detached in the top right corner, and attach the notebook to your cluster.**

*Note: Databricks Community Edition clusters only run for an hour.*

First, let's explore the `SparkSession` mentioned earlier, which holds information about the current Spark session. We can access it via the `spark` variable.

```python
spark
```

We can use the `SparkSession` to create a `DataFrame` from a range of numbers, similar to a small Python `range`.

```python
firstDataFrame = spark.range(10000)

print(firstDataFrame)  # this only runs a transformation, so no Spark job is executed
```

```python
# or create an RDD through the SparkContext (sc)
spark.sparkContext.parallelize(range(1000))
```

One might expect this to print the parallelized values, but that's not how Spark works.

Spark allows two distinct kinds of operations, **transformations** and **actions**.

![ta2](https://camo.githubusercontent.com/f04cc9974c24d16f9425c4b86af1537cc9257dd0/687474703a2f2f637572726963756c756d2d72656c656173652e73332d776562736974652d75732d776573742d322e616d617a6f6e6177732e636f6d2f77696b692d626f6f6b2f67656e6572616c2f737061726b5f74612e706e67)

### Transformations

Transformations are only executed once you call an **action**; this is called *lazy evaluation*. Examples of transformations are converting an integer into a float or filtering a set of values.

### Actions

Actions trigger computation: Spark runs all of the previous transformations in order to return an actual result. An action launches one or more jobs, each made up of tasks that the workers execute in parallel where possible.
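To make the transformation/action split concrete, here is a toy model in plain Python, not the Spark API. The hypothetical `LazyDataset` class only records `map` and `filter` steps, and nothing runs until an action such as `collect` or `count` is called.

```python
class LazyDataset:
    """Toy model of lazy evaluation (hypothetical, not the Spark API)."""

    def __init__(self, source, steps=None):
        self.source = source          # the original data
        self.steps = steps or []      # recorded transformations: the "plan"

    # Transformations: return a new dataset with one more step; compute nothing.
    def map(self, fn):
        return LazyDataset(self.source, self.steps + [("map", fn)])

    def filter(self, pred):
        return LazyDataset(self.source, self.steps + [("filter", pred)])

    # Actions: run the whole plan and return a concrete result.
    def collect(self):
        rows = list(self.source)
        for kind, fn in self.steps:
            if kind == "map":
                rows = [fn(r) for r in rows]
            else:
                rows = [r for r in rows if fn(r)]
        return rows

    def count(self):
        return len(self.collect())


calls = []

def double(x):
    calls.append(x)  # record every time the function really runs
    return x * 2

lazy_plan = LazyDataset(range(10)).map(double).filter(lambda x: x % 4 == 0)
print(len(calls))          # 0: only transformations so far
print(lazy_plan.collect())  # [0, 4, 8, 12, 16]: the action runs the plan
print(len(calls))          # 10
```

Defining `lazy_plan` costs nothing; the work happens only when `collect()` is called, which mirrors how `firstDataFrame.show(3)` below triggers the computation.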

A short sample of actions and transformations:

![transformations and actions](https://training.databricks.com/databricks_guide/gentle_introduction/trans_and_actions.png)

```python
firstDataFrame.show(3)  # example of an action: the DataFrame is now evaluated
```

```python
# An example of a transformation:
# select the id column values and multiply them by 2 using a SQL expression
secondDataFrame = firstDataFrame.selectExpr("(id * 2) as value")
```

```python
secondDataFrame.show(5)
```

```python
from pyspark.sql.functions import col  # to select columns

firstDataFrame.withColumn('id2', col('id') * 2).show(3)
```

```python
# Or with the RDD API, common before Spark 2.x
firstDataFrame.rdd.map(lambda x: x[0] * 2).take(3)
```

```python
# or
firstDataFrame.take(5)
```

```python
# or
display(firstDataFrame)
```

Transformations are evaluated lazily because this makes it easy to optimize the entire pipeline of computations. Computations can be parallelized and executed on many different nodes at once (e.g. a map followed by a filter).
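Spark pipelines consecutive narrow transformations such as a map followed by a filter within a single stage, so each record flows through the whole chain without intermediate results being materialized. The sketch below imitates that idea with a Python generator; `pipeline` is a hypothetical helper, and the trace shows each element being mapped and then filtered before the next element is read.

```python
def pipeline(source, steps):
    """Hypothetical helper: push each element through all map/filter steps
    before reading the next one (one pass, no intermediate lists)."""
    for item in source:
        keep = True
        for kind, fn in steps:
            if kind == "map":
                item = fn(item)
            elif not fn(item):   # a filter step that rejects the item
                keep = False
                break
        if keep:
            yield item


trace = []

def inc(x):
    trace.append(f"map {x}")
    return x + 1

def is_even(x):
    trace.append(f"filter {x}")
    return x % 2 == 0

result = list(pipeline([1, 2, 3], [("map", inc), ("filter", is_even)]))
print(result)  # [2, 4]
print(trace)   # ['map 1', 'filter 2', 'map 2', 'filter 3', 'map 3', 'filter 4']
```

The interleaved trace is the point: no full "mapped" list is ever built before filtering starts.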

![transformations and actions](https://training.databricks.com/databricks_guide/gentle_introduction/pipeline.png)

Spark also keeps intermediate results in memory, as opposed to other frameworks (e.g. Hadoop MapReduce) that write them to disk.

## Spark Architecture

Spark allows you to treat many machines as one via a master-worker architecture.

There is a `driver` (master) node in the cluster, accompanied by `worker` nodes. The driver sends work to the workers and instructs them to pull data from memory, from disk, or from another data source.

A Spark cluster has a driver node that communicates with executors running on the worker nodes. Executors provide slots that are logically like execution cores.

![spark-architecture](https://training.databricks.com/databricks_guide/gentle_introduction/videoss_logo.png)

The Driver sends Tasks to the empty slots on the Executors when work has to be done:

![spark-architecture](https://training.databricks.com/databricks_guide/gentle_introduction/spark_cluster_tasks.png)
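A rough single-machine analogy of the driver handing tasks to executor slots, assuming threads stand in for slots. This is a simplification: real executors are separate JVM processes on worker nodes. The helpers are hypothetical. The data is split into partitions, each partition becomes one task, at most `slots` tasks run at a time, and the driver combines the partial results.

```python
from concurrent.futures import ThreadPoolExecutor

def split_into_partitions(data, num_partitions):
    """Split a list into roughly equal contiguous partitions."""
    size, rem = divmod(len(data), num_partitions)
    parts, start = [], 0
    for i in range(num_partitions):
        end = start + size + (1 if i < rem else 0)
        parts.append(data[start:end])
        start = end
    return parts

def task(partition):
    """Work done by one task on one partition: a partial sum."""
    return sum(partition)

def driver_sum(data, num_partitions=6, slots=3):
    partitions = split_into_partitions(data, num_partitions)
    # The pool's workers play the role of executor slots:
    # at most `slots` tasks run at once; the rest wait for a free slot.
    with ThreadPoolExecutor(max_workers=slots) as executors:
        partial_results = list(executors.map(task, partitions))
    # The driver combines the per-partition results.
    return sum(partial_results)

print(driver_sum(list(range(10000))))  # 49995000
```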

Note: On Community Edition there are no workers; the driver executes all of the code. However, the same code works on any cluster (although frameworks that depend on CPU- or GPU-specific hardware may behave differently).

![spark-architecture](https://docs.databricks.com/_static/images/notebooks/notebook-microcluster-agnostic.png)

Access details in the web UI by clicking at the top left of this notebook.

# Working example with data

To illustrate **transformations** and **actions**, let's go through an example using `DataFrames` and a CSV file from a public dataset that Databricks makes available through the Databricks File System (DBFS). We'll load the popular diamonds dataset as a Spark `DataFrame` and explore it.

Use `%fs` to interact with the Databricks file system:

```
%fs ls /databricks-datasets/Rdatasets/data-001/datasets.csv
```

```python
# inferSchema makes Spark figure out the column types automatically,
# at the cost of reading the data more than once
dataPath = "/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv"
diamonds = (spark.read.format("csv")
            .option("header", "true")
            .option("inferSchema", "true")
            .load(dataPath))
```

Show the DataFrame with the Databricks `display` function or the `show` method.

```python
display(diamonds)
```

```python
display(diamonds.limit(5))  # for a subset
```

```python
diamonds.printSchema()  # check that the column types and schema were inferred correctly
```

```python
diamonds.rdd.getNumPartitions()
# Only one partition here: the file is small, and the DataFrame is not held in memory yet.
# Big datasets are split into many partitions.
# Tune the number of partitions to your cluster: make it divisible by the cluster size.
# On Community Edition, any multiple of 3 is OK.
# Use the repartition() method to change it.
```
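As a rough sketch of that sizing advice, the hypothetical helper below picks a partition count that keeps partitions near a target size and rounds it up to a multiple of the number of cores. The 64 MB default is an assumption (a common rule of thumb is roughly 50-100 MB per partition); adjust it for your data.

```python
import math

def suggest_num_partitions(data_size_mb, total_cores, target_partition_mb=64):
    """Hypothetical helper: a partition count near the target size,
    rounded up to a multiple of the number of cores."""
    by_size = max(1, math.ceil(data_size_mb / target_partition_mb))
    # Round up so the tasks divide evenly across the cores.
    return math.ceil(by_size / total_cores) * total_cores

print(suggest_num_partitions(10_240, total_cores=24))  # 168 (160 by size, rounded up to 7 * 24)
print(suggest_num_partitions(3, total_cores=3))        # 3
```

The result could then be passed to `repartition`, e.g. `diamonds.repartition(n)`.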

```python
diamonds.count()  # reads through the whole dataset
```

```python
display(diamonds.summary())
```

```python
diamonds.select('cut').distinct().show()  # show the unique entries in the cut column
```

What makes `display` exceptional is the fact that we can very easily create some more sophisticated graphs by clicking the graphing icon that you can see below. Here's a plot that allows us to compare price, color, and cut.

```python
display(diamonds)
```

```python
# most common cut, ordered: a first interesting insight
display(diamonds.select('cut').groupBy('cut').count().orderBy('count', ascending=False))
```

```python
display(diamonds.select('price', 'cut').groupBy('cut').avg('price'))  # show as a graph; prepares 5 jobs
```

Now that we've explored the data, let's return to understanding **transformations** and **actions**. First transformations, then actions.

First we group by two variables, cut and color, and compute the average price. Then we inner join that to the original dataset on the column `color`, and finally select the average price and the carat.

```python
df1 = diamonds.groupBy("cut", "color").avg("price")  # a simple grouping

# a simple join, then select some columns
df2 = (df1
       .join(diamonds, on='color', how='inner')
       .select("`avg(price)`", "carat"))
```
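To see what this grouping and join compute, here is the same logic on a few made-up rows in plain Python. It is a toy illustration, not how Spark executes the query; `toy_diamonds`, `toy_df1` and `toy_df2` are hypothetical names chosen so they don't overwrite the real DataFrames.

```python
from collections import defaultdict

# A few made-up rows standing in for the diamonds dataset.
toy_diamonds = [
    {"cut": "Ideal",   "color": "E", "price": 400, "carat": 0.3},
    {"cut": "Premium", "color": "E", "price": 600, "carat": 0.4},
    {"cut": "Ideal",   "color": "E", "price": 500, "carat": 0.5},
    {"cut": "Ideal",   "color": "J", "price": 900, "carat": 1.0},
]

# Like groupBy("cut", "color").avg("price")
sums = defaultdict(lambda: [0, 0])  # (cut, color) -> [price total, row count]
for d in toy_diamonds:
    acc = sums[(d["cut"], d["color"])]
    acc[0] += d["price"]
    acc[1] += 1
toy_df1 = [
    {"cut": cut, "color": color, "avg(price)": total / n}
    for (cut, color), (total, n) in sums.items()
]

# Like .join(diamonds, on="color", how="inner").select("avg(price)", "carat")
toy_df2 = [
    {"avg(price)": g["avg(price)"], "carat": d["carat"]}
    for g in toy_df1
    for d in toy_diamonds
    if g["color"] == d["color"]   # inner join: keep only matching colors
]

print(len(toy_df1), len(toy_df2))  # 3 7
```

Because the join key is only `color`, each diamond is paired with every (cut, color) group of its color, so the result has more rows than the input: 7 rows from 4 here.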

These transformations are now defined, but nothing has been computed yet.

The reason is that these computations are *lazy*: Spark first builds up the entire flow of data, from start to finish, that the user requires. This is an intelligent optimization for two key reasons. First, any calculation can be recomputed from the original source data, which allows Apache Spark to handle failures that occur along the way and to deal with stragglers. Second, Spark can optimize the computation so that data and computation can be *pipelined*.
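Recovery works because Spark remembers each DataFrame's *lineage*, the transformations that produced it, rather than only the data. The toy below (plain Python, hypothetical names) replays a recorded list of map steps on one source partition to rebuild a partition that was lost, without recomputing the others. The same idea lets a slow task be re-run on another node.

```python
def compute_partition(source_partitions, lineage, index):
    """Rebuild one partition by replaying its lineage (the recorded
    transformations) on the matching source partition."""
    rows = source_partitions[index]
    for fn in lineage:
        rows = [fn(r) for r in rows]
    return rows


source = [[1, 2], [3, 4], [5, 6]]              # source data in 3 partitions
lineage = [lambda x: x * 10, lambda x: x + 1]  # recorded map steps

results = [compute_partition(source, lineage, i) for i in range(len(source))]
print(results)  # [[11, 21], [31, 41], [51, 61]]

results[1] = None                                   # simulate losing partition 1 (e.g. a failed worker)
results[1] = compute_partition(source, lineage, 1)  # recompute only that partition
print(results)  # [[11, 21], [31, 41], [51, 61]]
```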

To get a sense for what this plan consists of, we can use the `explain` method.

```python
df2.explain()
```

Explaining this output in detail is outside the scope of this introductory tutorial; it is Spark's plan for how it intends to execute the given query.

```python
df2.count()
```

This will execute the plan that Apache Spark built up previously. Click the little arrow next to where it says `(X) Spark Jobs` after that cell finishes executing and then click the `View` link. This brings up the Apache Spark Web UI right inside of your notebook.

![img](https://training.databricks.com/databricks_guide/gentle_introduction/spark-dag-ui.png)

These visualizations are Directed Acyclic Graphs (DAGs) of all the computations that have to be performed to get to that result.

Because transformations are *lazy*, Spark can optimize many things while generating this series of steps. This is one of the core reasons to use DataFrames and Datasets instead of the legacy RDD API: with DataFrames and Datasets, Apache Spark works under the hood to optimize the entire query plan and pipeline steps together.

# SQL view

```python
# repartition into 3 partitions and register a temporary view for SQL queries
diamonds.repartition(3).createOrReplaceTempView("diamondsView")
```

```python
diamonds.count()
```

```
%sql SELECT carat, cut, color FROM diamondsView ORDER BY carat DESC;
```

```python
# outside Databricks (e.g. in Jupyter), run SQL through spark.sql
spark.sql('SELECT * FROM diamondsView').show()
```

# To pandas DataFrame

```python
import pandas as pd

# toPandas() collects every row to the driver, so only use it on data that fits in memory
pd_df = diamonds.toPandas()
```

```python
pd_df.head(5)
```

```python
type(pd_df)
```

### Caching

Spark can store data in memory during computation. This can speed up access to frequently queried tables or pieces of data, and it is also great for iterative algorithms that work over the same data again and again.

To cache a DataFrame or RDD, simply use the cache method.

```python
df2.cache()  # look in the Storage tab of the Spark UI
```

Like a transformation, caching is performed lazily: Spark won't store the data in memory until you call an action on that dataset.

Here's a simple example. We've created our df2 DataFrame which is essentially a logical plan that tells us how to compute that exact DataFrame. We've told Apache Spark to cache that data after we compute it for the first time. So let's call a full scan of the data with a count twice. The first time, this will create the DataFrame, cache it in memory, then return the result. The second time, rather than recomputing that whole DataFrame, it will just hit the version that it has in memory.
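The lazy behavior of `cache()` can be mimicked with a small plain-Python class. This is a toy model; `CachedPlan` is a hypothetical name, not a Spark API. Calling `cache()` only marks the plan, the first action computes and stores the rows, and the second action is served from memory. The `computations` counter records how often the plan actually ran.

```python
class CachedPlan:
    """Toy model of lazy caching (hypothetical, not the Spark API)."""

    def __init__(self, compute):
        self._compute = compute        # function that runs the full plan
        self._cache_requested = False
        self._cached = None
        self.computations = 0          # how many times the plan actually ran

    def cache(self):
        self._cache_requested = True   # only marks the plan; nothing runs yet
        return self

    def count(self):                   # an action
        if self._cached is not None:
            return len(self._cached)   # served from memory
        self.computations += 1
        rows = self._compute()
        if self._cache_requested:
            self._cached = rows        # materialized by the first action
        return len(rows)


cached = CachedPlan(lambda: [x * 2 for x in range(1000)]).cache()
print(cached.computations)                  # 0
print(cached.count(), cached.computations)  # 1000 1
print(cached.count(), cached.computations)  # 1000 1 (second count hits the cache)
```

Without the `cache()` call, every `count()` would rerun the plan and increment `computations`.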

Let's take a look at how we can discover this.

```python
df2.count()  # reads all the data, materializes df2 and caches it in memory

# Cached DataFrames are stored in Tungsten's compact in-memory format, which makes them smaller.
# Repartitioning according to your cluster also helps;
# partition sizes of 50-100 MB work well.
```

Now that we've counted the data once, it is cached, so the plan is quite different: `df2.explain()` would now show a scan of the in-memory data.

```python
df2.count()
```

In the above example, we can see that this cuts down on the time needed to generate this data immensely - often by at least an order of magnitude. With much larger and more complex data analysis, the gains that we get from caching can be even greater!

```
%fs ls /tmp/
```

```python
# save the DataFrame as Parquet files to preserve your work
diamonds.write.format('parquet').save('/tmp/diamonds/')
```

```
%fs ls /tmp/diamonds/
```

```python
# easily continue working after the cluster shuts down by reading the saved folder
diamonds2 = spark.read.parquet('/tmp/diamonds/')
```

```python
diamonds2.show()  # read back from the Parquet files; caching is not saved, so call cache() again if needed
```

Parquet files are very efficient to read from. A good practice is to take CSV or JSON input, do the ETL, and then write the result to Parquet.

## Conclusion

In this notebook we've covered a ton of material! But you're now well on your way to understanding Spark and Databricks!