In [1]:
# Apache Spark – a unified computing engine and set of libraries for big data

# Spark is designed to support a wide range of data analytics tasks, ranging from simple data loading and SQL queries to machine learning and streaming computation, over the same computing engine and with a consistent set of APIs. For example, data scientists benefit from a unified set of libraries (e.g., Python or R) when doing modeling. Spark’s unified nature makes these tasks both easier and more efficient to write. 

# APIs allow applications to communicate with one another. Web-based APIs return data in response to a request made by a client.

# Spark carefully limits its scope to a computing engine. By this, we mean that Spark handles loading data from storage systems and performing computation on it, but not permanent storage as an end in itself. Spark can be used with a wide variety of persistent storage systems, including cloud storage systems such as Azure Storage and Amazon S3 and distributed file systems such as Apache Hadoop’s HDFS.

# Spark’s focus on computation makes it different from earlier big data software platforms such as Apache Hadoop. Hadoop included both a storage system (the Hadoop file system, designed for low-cost storage over clusters of commodity servers) and a computing system (MapReduce), which were closely integrated. However, this choice made it hard to run one of the systems without the other or, even more importantly, to write applications that access data stored anywhere else.

# Spark includes libraries for SQL and structured data (Spark SQL), machine learning (MLlib), stream processing (Spark Streaming and the newer Structured Streaming), and graph analytics (GraphX). Beyond these libraries, there are hundreds of open source external libraries ranging from connectors for various storage systems to machine learning algorithms. One index of external libraries is available at spark-packages.org.

In [2]:
# Spark’s Basic Architecture

# Single machines do not have enough power and resources to perform computations on huge amounts of information. A cluster, or group of machines, pools the resources of many machines together, allowing us to use all the cumulative resources as if they were one. A group of machines alone is not powerful, however; you need a framework to coordinate work across them. Spark is a tool for just that: managing and coordinating the execution of tasks on data across a cluster of computers. The cluster of machines that Spark uses to execute tasks is managed by a cluster manager such as Spark’s Standalone cluster manager, YARN, or Mesos.

In [3]:
# Spark Applications

# Spark Applications consist of a driver process and a set of executor processes. The driver process runs your main() function, sits on a node in the cluster, and is responsible for three things: maintaining information about the Spark Application; responding to a user’s program or input; and analyzing, distributing, and scheduling work across the executors. The executors are responsible for actually executing the work that the driver assigns them.

In [4]:
# Spark’s Language APIs

# Spark’s language APIs allow you to run Spark code from other languages. For the most part, Spark presents some core “concepts” in every language, and these concepts are translated into Spark code that runs on the cluster of machines. When using Spark from Python or R, the user never writes explicit JVM instructions, but instead writes Python or R code that Spark translates into code that it can then run on the executor JVMs.

In [5]:
# The SparkSession

# The SparkSession instance is the way Spark executes user-defined manipulations across the cluster. Let’s go ahead and look at the SparkSession in Python:

myRange = spark.range(1000).toDF("number")

In [6]:
# You just ran your first Spark code! We created a DataFrame with one column containing 1000 rows with values from 0 to 999. This range of numbers represents a distributed collection. When run on a cluster, each part of this range of numbers exists on a different executor. This is a Spark DataFrame.

In [7]:
# DataFrames

# A DataFrame is the most common Structured API and simply represents a table of data with rows and columns. The DataFrame concept is not unique to Spark. R and Python both have similar concepts. However, Python/R DataFrames (with some exceptions) exist on one machine rather than multiple machines. This limits what you can do with a given DataFrame in Python and R to the resources that exist on that specific machine. However, since Spark has language interfaces for both Python and R, it’s quite easy to convert Pandas (Python) DataFrames to Spark DataFrames and R DataFrames to Spark DataFrames (in R).

In [8]:
# Partitions

# In order to allow every executor to perform work in parallel, Spark breaks up the data into chunks called partitions. A partition is a collection of rows that sit on one physical machine in our cluster. An important thing to note is that with DataFrames, we do not (for the most part) manipulate partitions manually (on an individual basis). We simply specify high-level transformations of data in the physical partitions, and Spark determines how this work will actually execute on the cluster.

In [9]:
# Transformations

# In Spark, the core data structures are immutable, meaning they cannot be changed once created. In order to “change” a DataFrame, you instruct Spark how you would like to modify the DataFrame you have into the one that you want. These instructions are called transformations. Let’s perform a simple transformation to find all even numbers in our current DataFrame.

divisBy2 = myRange.where("number % 2 = 0")

# You will notice that this returns no output. That’s because we only specified an abstract transformation, and Spark will not act on transformations until we call an action. There are two types of transformations: those that specify narrow dependencies and those that specify wide dependencies.

# Transformations consisting of narrow dependencies (we’ll call them narrow transformations) are those where each input partition will contribute to only one output partition. 

# A transformation with wide dependencies (a wide transformation) has input partitions contributing to many output partitions. You will often hear this referred to as a shuffle, where Spark exchanges partitions across the cluster.
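
# To make the distinction concrete, here is a toy model in plain Python. It is not how Spark is implemented: each inner list simply stands in for one partition, and the helper names (narrow_filter, wide_repartition_by_key) are hypothetical.

```python
# Toy model: each inner list stands in for one partition of a DataFrame.
partitions = [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11]]


def narrow_filter(parts, predicate):
    """Narrow: each input partition produces exactly one output partition."""
    return [[x for x in part if predicate(x)] for part in parts]


def wide_repartition_by_key(parts, key, num_output):
    """Wide: rows from every input partition may land in any output partition."""
    out = [[] for _ in range(num_output)]
    for part in parts:
        for x in part:
            out[key(x) % num_output].append(x)
    return out


evens = narrow_filter(partitions, lambda x: x % 2 == 0)
shuffled = wide_repartition_by_key(partitions, key=lambda x: x % 3, num_output=3)

print(evens)     # [[0, 2], [4, 6], [8, 10]]
print(shuffled)  # [[0, 3, 6, 9], [1, 4, 7, 10], [2, 5, 8, 11]]
```

# In the filter, no row leaves its partition. In the repartitioning step, every output partition receives rows from all three inputs, which is what makes a shuffle expensive.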

In [10]:
# Lazy Evaluation

# Lazy evaluation means that Spark will wait until the very last moment to execute the graph of computation instructions. In Spark, instead of modifying the data immediately when we express some operation, we build up a plan of transformations that we would like to apply to our source data. By waiting until the last minute to execute the code, Spark can compile this plan from your raw DataFrame transformations into a physical plan that runs as efficiently as possible across the cluster.
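
# The idea can be sketched in a few lines of plain Python. The LazyPlan class below is a hypothetical toy, not Spark's API: it only records transformations and touches the source data when the count action is called.

```python
class LazyPlan:
    """Records transformations and runs them only when an action is called."""

    def __init__(self, source, steps=()):
        self._source = source
        self._steps = tuple(steps)

    def where(self, predicate):
        # Transformation: returns a new plan; nothing is computed here.
        return LazyPlan(self._source, self._steps + (("filter", predicate),))

    def select(self, func):
        # Transformation: also just extends the recorded plan.
        return LazyPlan(self._source, self._steps + (("map", func),))

    def explain(self):
        # Describe the recorded plan without executing it.
        return " -> ".join(["source"] + [name for name, _ in self._steps])

    def count(self):
        # Action: only now is the source iterated and the plan applied.
        rows = iter(self._source)
        for name, fn in self._steps:
            rows = filter(fn, rows) if name == "filter" else map(fn, rows)
        return sum(1 for _ in rows)


plan = LazyPlan(range(1000)).where(lambda n: n % 2 == 0)
print(plan.explain())  # source -> filter
print(plan.count())    # 500
```

# Because the whole plan is known before anything runs, a real engine such as Spark has the chance to reorder or combine steps before execution; this toy simply replays them in order.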

In [11]:
# Actions

# Transformations allow us to build up our logical transformation plan. To trigger the computation, we run an action. An action instructs Spark to compute a result from a series of transformations. 

divisBy2.count()

# In specifying our action, we started a Spark job that runs our filter transformation (a narrow transformation), then an aggregation (a wide transformation) that performs the counts on a per-partition basis, and then a collect, which brings our result to a native object in the respective language, in this case Python.
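
# As a rough illustration of those three stages, the plain-Python sketch below filters each partition, counts per partition, and then combines the partial counts in one place. The four equal partitions are an assumption made for the example; Spark decides the real layout.

```python
# Assumption: 1000 numbers split into four equal partitions of 250 rows each.
partitions = [list(range(start, start + 250)) for start in range(0, 1000, 250)]

# Stage 1 (narrow): filter each partition independently.
filtered = [[n for n in part if n % 2 == 0] for part in partitions]

# Stage 2: each partition computes its own partial count.
partial_counts = [len(part) for part in filtered]

# Stage 3: the partial counts are brought together and summed into one native value.
total = sum(partial_counts)

print(partial_counts)  # [125, 125, 125, 125]
print(total)           # 500
```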

In [12]:
# Spark UI

# During Spark’s execution of the previous code block, users can monitor the progress of their job through the Spark UI. The Spark UI maintains information on the state of our Spark jobs, environment, and cluster state. You can see the Spark UI in the clusters tab in Databricks. 

In [13]:
# An End to End Example

# Spark includes the ability to read from and write to a large number of data sources. In order to read this data in, we will use a DataFrameReader that is associated with our SparkSession. In doing so, we will specify the file format as well as any options we want. You can get the data from here: https://github.com/databricks/Spark-The-Definitive-Guide

# After downloading the data you can add data using the Data tab on Databricks and copy and paste the file path below.

flightData2015 = spark\
.read\
.option("inferSchema", "true")\
.option("header", "true")\
.csv("/FileStore/tables/2015_summary-ebaee.csv")

# Each of these DataFrames (in Scala and Python) has a set of columns with an unspecified number of rows. The number of rows is “unspecified” because reading data is a transformation, and is therefore a lazy operation. Spark only inspected the data to guess what type each column should be.

In [14]:
# If we perform the take action on the DataFrame, we will be able to see results

flightData2015.take(3)

In [15]:
# Nothing happens to the data when we call sort because it’s just a transformation. However, we can see that Spark is building up a plan for how it will execute this across the cluster by looking at the explain plan. We can call explain on any DataFrame object to see the DataFrame’s lineage (or how Spark will execute this query).

flightData2015.sort("count").explain()

In [16]:
# By default, when we perform a shuffle, Spark outputs two hundred shuffle partitions. We will set this value to five in order to reduce the number of output partitions from the shuffle from two hundred to five.

spark.conf.set("spark.sql.shuffle.partitions", "5")
flightData2015.sort("count").take(2)

#  Go ahead and experiment with different values and see the number of partitions yourself. In experimenting with different values, you should see drastically different run times. 

In [17]:
# DataFrames and SQL

# You can express your business logic in SQL or DataFrames (either in R, Python, Scala, or Java) and Spark will compile that logic down to an underlying plan (that we see in the explain plan) before actually executing your code. Spark SQL allows you as a user to register any DataFrame as a table or view (a temporary table) and query it using pure SQL.

flightData2015.createOrReplaceTempView("flight_data_2015")

In [18]:
sqlWay = spark.sql("""
SELECT DEST_COUNTRY_NAME, count(1)
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
""")

dataFrameWay = flightData2015\
.groupBy("DEST_COUNTRY_NAME")\
.count()

sqlWay.explain()
dataFrameWay.explain()

# We can see that these plans compile to the exact same underlying plan!

In [19]:
spark.sql("SELECT max(count) from flight_data_2015").take(1)

In [20]:
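# Import the functions module under an alias so that Spark's max does not shadow Python's built-in max.
from pyspark.sql import functions as F
flightData2015.select(F.max("count")).take(1)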

In [21]:
maxSql = spark.sql("""
SELECT DEST_COUNTRY_NAME, sum(count) as destination_total
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
ORDER BY sum(count) DESC
LIMIT 5
""")
maxSql.collect()

In [22]:
# Now let’s move to the DataFrame syntax that is semantically similar but slightly different in implementation and ordering. But, as we mentioned, the underlying plans for both of them are the same. Let’s execute the queries and see their results as a sanity check.

from pyspark.sql.functions import desc
flightData2015\
.groupBy("DEST_COUNTRY_NAME")\
.sum("count")\
.withColumnRenamed("sum(count)", "destination_total")\
.sort(desc("destination_total"))\
.limit(5)\
.collect()

# This execution plan is a directed acyclic graph (DAG) of transformations, each resulting in a new immutable DataFrame, on which we call an action to generate a result.

# The first step is to read in the data. We defined the DataFrame previously but, as a reminder, Spark does not actually read it in until an action is called on that DataFrame or one derived from the original DataFrame.

# The second step is our grouping. Technically, when we call groupBy we end up with a RelationalGroupedDataset (exposed in Python as GroupedData), which is a fancy name for a DataFrame that has a grouping specified but needs the user to specify an aggregation before it can be queried further.

# Therefore the third step is to specify the aggregation. Let’s use the sum aggregation method. In PySpark this takes one or more column names as input. The result of the sum method call is a new DataFrame.

# The fourth step is a simple renaming. We use the withColumnRenamed method, which takes two arguments: the original column name and the new column name. Of course, this doesn’t perform computation; it is just another transformation!

# The fifth step sorts the data such that if we were to take results off of the top of the DataFrame, they would be the largest values found in the destination_total column.

# Penultimately, we’ll specify a limit.

# The last step is our action! Now we actually begin the process of collecting the results of our DataFrame above, and Spark gives us back a native collection in the language we are using (in Python, a list of Row objects).