![Spark Logo](http://spark-mooc.github.io/web-assets/images/ta_Spark-logo-small.png)  ![Python Logo](http://spark-mooc.github.io/web-assets/images/python-logo-master-v3-TM-flattened_small.png)
# **Spark Tutorial: Learning Apache Spark**
#### This tutorial will teach you how to use [Apache Spark](http://spark.apache.org/), a framework for large-scale data processing, within a notebook. Many traditional frameworks were designed to be run on a single computer.  However, many datasets today are too large to be stored on a single computer, and even when a dataset can be stored on one computer (such as the datasets in this tutorial), the dataset can often be processed much more quickly using multiple computers.  Spark has efficient implementations of a number of transformations and actions that can be composed together to perform data processing and analysis.  Spark excels at distributing these operations across a cluster while abstracting away many of the underlying implementation details.  Spark has been designed with a focus on scalability and efficiency.  *With Spark you can begin developing your solution on your laptop, using a small dataset, and then use that same code to process terabytes or even petabytes across a distributed cluster.*
#### **During this tutorial we will cover:**
#### * *Part 1:* Basic notebook usage and [Python](https://docs.python.org/2/) integration
#### * *Part 2:* An introduction to using [Apache Spark](https://spark.apache.org/) with the [PySpark SQL API](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark-sql-module) running in a notebook
#### * *Part 3:* Using DataFrames and chaining together transformations and actions
#### * *Part 4*: Python Lambda functions and User Defined Functions
#### * *Part 5:* Additional DataFrame actions
#### * *Part 6:* Additional DataFrame transformations
#### * *Part 7:* Unioning and joining DataFrames
#### * *Part 8:* Caching DataFrames and storage options


#### The following methods will be covered:
* `createDataFrame()`, `read()`, `write()`

####  The following transformations will be covered:
* `select()`, `filter()`, `distinct()`, `dropDuplicates()`, `orderBy()`, `groupBy()`, `withColumn()`, `withColumnRenamed()`, `sample()`, `union()`, `join()`

#### The following actions will be covered:
* `first()`, `take()`, `count()`, `collect()`, `show()`

####  Also covered:
* `cache()`, `unpersist()`

#### Note that, for reference, you can look up the details of these methods in [Spark's PySpark SQL API](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark-sql-module).

### **Part 1: Basic notebook usage and [Python](https://docs.python.org/2/) integration**

#### **(1a) Notebook usage**
#### A notebook is comprised of a linear sequence of cells.  These cells can contain either markdown or code, but we won't mix both in one cell.  When a markdown cell is executed it renders formatted text, images, and links just like HTML in a normal webpage.  The text you are reading right now is part of a markdown cell.  Python code cells allow you to execute arbitrary Python commands just like in any Python shell. Place your cursor inside the cell below, and press "Shift" + "Enter" to execute the code and advance to the next cell.  You can also press "Ctrl" + "Enter" to execute the code and remain in the cell.  These commands work the same in both markdown and code cells.

In [1]:
# This is a Python cell. You can run normal Python code here...
print('The sum of 1 and 1 is ' + str(1+1))

The sum of 1 and 1 is 2


In [2]:
# Here is another Python cell, this time with a variable (x) declaration and an if statement:
x = 42
if x > 40:
    print('The value of x is ' + str(x))

The value of x is 42


#### **(1b) Notebook state**
#### As you work through a notebook it is important that you run all of the code cells.  The notebook is stateful, which means that variables and their values are retained until the kernel is restarted.  If you do not run all of the code cells as you proceed through the notebook, your variables will not be properly initialized and later code might fail.  You will also need to rerun any cells that you have modified in order for the changes to be available to other cells.

In [3]:
# This cell relies on x being defined already.
# If we didn't run the cells from part (1a) this code would fail.
print(x * 2)

84


#### **(1c) Library imports**
#### We can import standard Python libraries ([modules](https://docs.python.org/2/tutorial/modules.html)) the usual way.  An `import` statement will import the specified module.  In this tutorial and future labs, we will provide any imports that are necessary.

In [4]:
# Import the regular expression library
import re
m = re.search('(?<=abc)def', 'abcdef')
m.group(0)

'def'

In [5]:
# Import the datetime library
import datetime
print('This was last run on: ' + str(datetime.datetime.now()))

This was last run on: 2019-05-02 06:53:02.196911


### **Part 2: An introduction to using [Apache Spark](https://spark.apache.org/) with the [PySpark SQL API](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark-sql-module) running in a notebook**

### Spark Context

#### In Spark, communication occurs between a driver and executors.  The driver has Spark jobs that it needs to run and these jobs are split into tasks that are submitted to the executors for completion.  The results from these tasks are delivered back to the driver.
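
#### The driver/executor flow described above can be mimicked in plain Python. The following is only an analogy, not Spark code: a thread pool stands in for the executors, and the names `task` and `run_job` are hypothetical helpers introduced for illustration.

```python
from concurrent.futures import ThreadPoolExecutor


def task(chunk):
    """Work done by one 'executor' on its share of the data."""
    return sum(chunk)


def run_job(data, num_tasks):
    """'Driver' logic: split the job into tasks, submit them, combine the results."""
    # Split the data into num_tasks chunks (round-robin slicing).
    chunks = [data[i::num_tasks] for i in range(num_tasks)]
    # The thread pool plays the role of the executors.
    with ThreadPoolExecutor(max_workers=num_tasks) as pool:
        partial_results = list(pool.map(task, chunks))
    # Partial results are delivered back to the driver, which combines them.
    return sum(partial_results)


total = run_job(list(range(1, 101)), num_tasks=4)
print(total)  # 5050
```

#### In real Spark the executors are separate JVM processes, possibly on other machines, and the scheduling is handled for you. The sketch only shows the split, run, and combine shape of a job.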

#### In part 1, we saw that normal Python code can be executed via cells. When using a Jupyter notebook, this code is executed within the kernel associated with the notebook. Since no Spark functionality is actually being used, no tasks are launched on the executors.

#### In order to use Spark and its DataFrame API we will need to use a `SQLContext`.  When running Spark, you start a new Spark application by creating a [SparkContext](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.SparkContext). You can then create a [SQLContext](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.SQLContext) from the `SparkContext`. When the `SparkContext` is created, it asks the master for some cores to use to do work.  The master sets these cores aside just for you; they won't be used for other applications. When using a notebook, both a `SparkContext` and a `SQLContext` are created for you automatically. `sc` is your `SparkContext`, and `sqlContext` is your `SQLContext`.

#### Starting from Spark 2.0, the separate creation of your `SparkContext` and `SQLContext` is no longer necessary. By creating a [`SparkSession`](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.SparkSession.newSession), both are automatically created within the SparkSession. A session created with `newSession()` has its own SQLConf, registered temporary views and UDFs, but shares the SparkContext and table cache.

#### In Notebooks (Jupyter, Zeppelin, etc.) and CLIs (pyspark, spark-shell, etc.) the contexts and sessions are automatically created at the start of the session. 

#### If you run an application the following lines of code should be included to create the contexts and sessions:

##### # Import necessary pyspark and pyspark.sql packages
from pyspark import SparkContext 
<br /> 
from pyspark.sql import SparkSession, SQLContext

##### # create Spark Context as sc
sc = SparkContext(appName = "First steps in Spark")

##### # create SQL Context as sqlContext
sqlContext = SQLContext(sc)

##### # create Spark Session as spark
spark = SparkSession.builder.master("local").appName("First steps in Spark").getOrCreate()

### (2a) Example Cluster
#### The diagram shows an example cluster, where the slots allocated for an application are outlined in purple. (Note: We're using the term _slots_ here to indicate threads available to perform parallel work for Spark. Spark documentation often refers to these threads as _cores_, which is a confusing term, as the number of slots available on a particular machine does not necessarily have any relationship to the number of physical CPU cores on that machine.)

<img src="http://spark-mooc.github.io/web-assets/images/cs105x/diagram-2a.png" style="height: 800px"/>
 
####  You can view the details of your Spark application in the Spark web UI on *IP*:4040.  In the web UI, under the "Jobs" tab, you can see a list of jobs that have been scheduled or run.  It's likely there isn't anything interesting here yet because we haven't run any jobs, but we'll return to this page later.
####  At a high level, every Spark application consists of a driver program that launches various parallel operations on executor Java Virtual Machines (JVMs) running either in a cluster or locally on the same machine.  When running locally, `pyspark` is the driver program. In all cases, this driver program contains the main loop for the program and creates distributed datasets on the cluster, then applies operations (transformations & actions) to those datasets.
####  Driver programs access Spark through a SparkContext object, which represents a connection to a computing cluster. A Spark SQL context object (`sqlContext`) is the main entry point for Spark DataFrame and SQL functionality. A `SQLContext` can be used to create DataFrames, which allows you to direct the operations on your data.
 
####  Try printing out `sqlContext` to see its type.

In [6]:
# Display the type of the Spark sqlContext
type(sqlContext)

pyspark.sql.context.SQLContext

#### Note that the type reported here is `SQLContext`. In Spark builds that expose Hive support through a `HiveContext`, the type would be `HiveContext` instead. Compiling Spark with Hive support is a good idea, even if you don't have a Hive metastore. As the
#### [Spark Programming Guide](http://spark.apache.org/docs/latest/sql-programming-guide.html#starting-point-sqlcontext) states, a `HiveContext` "provides a superset of the functionality provided by the basic `SQLContext`. Additional features include the ability to write queries using the more complete HiveQL parser, access to Hive UDFs [user-defined functions], and the ability to read data from Hive tables. To use a `HiveContext`, you do not need to have an existing Hive setup, and all of the data sources available to a `SQLContext` are still available."

### (2b) SparkContext attributes
####  You can use Python's [dir()](https://docs.python.org/2/library/functions.html?highlight=dir#dir) function to get a list of all the attributes (including methods) accessible through the `sqlContext` object.

In [7]:
dir(sqlContext)

['__class__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__eq__',
 '__format__',
 '__ge__',
 '__getattribute__',
 '__gt__',
 '__hash__',
 '__init__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_conf',
 '_inferSchema',
 '_instantiatedContext',
 '_jsc',
 '_jsqlContext',
 '_jvm',
 '_sc',
 '_ssql_ctx',
 'cacheTable',
 'clearCache',
 'createDataFrame',
 'createExternalTable',
 'dropTempTable',
 'getConf',
 'getOrCreate',
 'newSession',
 'range',
 'read',
 'readStream',
 'registerDataFrameAsTable',
 'registerFunction',
 'registerJavaFunction',
 'setConf',
 'sparkSession',
 'sql',
 'streams',
 'table',
 'tableNames',
 'tables',
 'udf',
 'uncacheTable']

#### Outside of `pyspark` or a notebook, `SQLContext` is created from the lower-level `SparkContext`, which is usually used to create Resilient Distributed Datasets (RDDs). An RDD is the way Spark actually represents data internally; DataFrames are actually implemented in terms of RDDs.

#### While you can interact directly with RDDs, DataFrames are preferred. They're generally faster, and they perform the same no matter what language (Python, R, Scala or Java) you use with Spark.

#### In this course, we'll be using DataFrames, so we won't be interacting directly with the Spark Context object very much. However, it's worth knowing that inside `pyspark` or a notebook, you already have an existing `SparkContext` in the `sc` variable. One simple thing we can do with `sc` is check the version of Spark we're using:

In [8]:
# After reading the help we've decided we want to use sc.version to see what version of Spark we are running
sc.version

'2.4.0'

In [10]:
sc

In [11]:
spark

In [12]:
# Help can be used on any Python object
help(list)

Help on class list in module builtins:

class list(object)
 |  list() -> new empty list
 |  list(iterable) -> new list initialized from iterable's items
 |  
 |  Methods defined here:
 |  
 |  __add__(self, value, /)
 |      Return self+value.
 |  
 |  __contains__(self, key, /)
 |      Return key in self.
 |  
 |  __delitem__(self, key, /)
 |      Delete self[key].
 |  
 |  __eq__(self, value, /)
 |      Return self==value.
 |  
 |  __ge__(self, value, /)
 |      Return self>=value.
 |  
 |  __getattribute__(self, name, /)
 |      Return getattr(self, name).
 |  
 |  __getitem__(...)
 |      x.__getitem__(y) <==> x[y]
 |  
 |  __gt__(self, value, /)
 |      Return self>value.
 |  
 |  __iadd__(self, value, /)
 |      Implement self+=value.
 |  
 |  __imul__(self, value, /)
 |      Implement self*=value.
 |  
 |  __init__(self, /, *args, **kwargs)
 |      Initialize self.  See help(type(self)) for accurate signature.
 |  
 |  __iter__(self, /)
 |      Implement iter(self).
 |  
 |  __l

### **Part 3: Using DataFrames and chaining together transformations and actions**

### Working with your first DataFrames
 
#### In Spark, we first create a base [DataFrame](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame). We can then apply one or more transformations to that base DataFrame. *A DataFrame is immutable, so once it is created, it cannot be changed.* As a result, each transformation creates a new DataFrame. Finally, we can apply one or more actions to the DataFrames.
#### > Note that Spark uses lazy evaluation, so transformations are not actually executed until an action occurs.
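
#### Lazy evaluation can be illustrated without Spark. The sketch below is a toy, plain-Python stand-in (the `LazyPipeline` class and `minus_one` function are hypothetical, not part of any Spark API): transformations only record what should happen and return a new object, and the recorded work runs when an action (`collect`) is called.

```python
class LazyPipeline:
    """Toy stand-in for a DataFrame: records transformations, runs them on an action."""

    def __init__(self, source, steps=()):
        self._source = source        # base data, never modified
        self._steps = tuple(steps)   # recorded transformations, not yet executed

    def map(self, func):
        # Transformation: return a NEW pipeline with one more recorded step.
        return LazyPipeline(self._source, self._steps + (("map", func),))

    def filter(self, predicate):
        # Transformation: also just recorded, not executed.
        return LazyPipeline(self._source, self._steps + (("filter", predicate),))

    def collect(self):
        # Action: only now are the recorded steps applied to the data.
        rows = iter(self._source)
        for kind, func in self._steps:
            rows = map(func, rows) if kind == "map" else filter(func, rows)
        return list(rows)


calls = []


def minus_one(x):
    calls.append(x)  # record that the function really ran
    return x - 1


base = LazyPipeline(range(5))
shifted = base.map(minus_one)        # transformation: nothing is executed yet
calls_before_action = list(calls)
result = shifted.collect()           # action: the recorded steps run now
print(calls_before_action)  # []
print(result)               # [-1, 0, 1, 2, 3]
```

#### Note that `base` is unchanged after `map()`: each transformation yields a new object, mirroring the immutability of DataFrames described above.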

#### We will perform several exercises to obtain a better understanding of DataFrames:
#### * Create a Python collection of 10,000 people
#### * Create a Spark DataFrame from that collection
#### * Create a Spark DataFrame from a source
#### * Subtract one from each value using `select`
#### * Perform action `collect` to view results
#### * Perform action `count` to view counts
#### * Apply transformation `filter` and view results with `collect`
#### * Learn about lambda functions
#### * Explore how lazy evaluation works and the debugging challenges that it introduces
#### * Write to a file

#### A DataFrame consists of a series of `Row` objects; each `Row` object has a set of named columns. You can think of a DataFrame as modeling a table, though the data source being processed does not have to be a table.
 
#### More formally, a DataFrame must have a _schema_, which means it must consist of columns, each of which has a _name_ and a _type_. Some data sources have schemas built into them. Examples include RDBMS databases, Parquet files, and NoSQL databases like Cassandra. Other data sources don't have computer-readable schemas, but you can often apply a schema programmatically.

### (3a) Create a Python collection of 10,000 people

#### We will use a third-party Python testing library called [fake-factory](https://pypi.python.org/pypi/fake-factory/0.5.3), which is now published on PyPI as `faker`, to create a collection of fake person records.

In [12]:
#%%sh
#pip3 install faker

In [13]:
from faker import Factory
fake = Factory.create()
fake.seed(4321)

#### We're going to use this factory to create a collection of randomly generated people records. In the next section, we'll turn that collection into a DataFrame. We'll use a Python tuple to help us define the Spark DataFrame schema. There are other ways to define schemas, though; see the Spark Programming Guide's discussion of [schema inference](http://spark.apache.org/docs/latest/sql-programming-guide.html#inferring-the-schema-using-reflection) for more information. (For instance, we could also use a Python `namedtuple` or a Spark `Row` object.)
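
#### As a small illustration of the `namedtuple` alternative, the sketch below defines a record type whose field names travel with each value. The `Person` type and the sample values are made up for illustration; the field names simply mirror the columns used in this tutorial.

```python
from collections import namedtuple

# Hypothetical record type; field names mirror this tutorial's columns.
Person = namedtuple("Person", ["last_name", "first_name", "ssn", "occupation", "age"])

# Made-up sample record (not generated by the faker library).
record = Person("Doe", "Jane", "000-00-0000", "Engineer", 30)

print(record._fields)     # the field names carried by the type
print(record.age)         # access by name instead of by position
print(record._asdict())   # name -> value mapping
```

#### Because the names are attached to the data, a list of such records carries column names without a separate list of names, which is what makes name inference possible.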

In [14]:
# Each entry consists of last_name, first_name, ssn, job, and age (at least 1)
from pyspark.sql import Row
def fake_entry():
    # Split the generated full name; the first two words are used as first and last name
    name = fake.name().split()
    return (name[1], name[0], fake.ssn(), fake.job(), abs(2016 - fake.date_time().year) + 1)

In [15]:
# Create a helper function to call a function repeatedly
def repeat(times, func, *args, **kwargs):
    for _ in range(times):
        yield func(*args, **kwargs)

In [16]:
data = list(repeat(10000, fake_entry))

#### `data` is just a normal Python list, containing Python tuple objects. Let's look at the first item in the list:

10000

### (3b) Distributed data and using a collection to create a DataFrame

#### In Spark, datasets are represented as a list of entries, where the list is broken up into many different partitions that are each stored on a different machine.  Each partition holds a unique subset of the entries in the list.  Spark calls datasets that it stores "Resilient Distributed Datasets" (RDDs). Even DataFrames are ultimately represented as RDDs, with additional meta-data.
 
#### <img src="http://spark-mooc.github.io/web-assets/images/cs105x/diagram-3b.png" style="width: 900px; margin: 5px"/>

#### One of the defining features of Spark, compared to other data analytics frameworks (e.g., Hadoop MapReduce), is that it can keep data in memory rather than on disk.  This allows Spark applications to run much more quickly, because they are not slowed down by needing to repeatedly read data from disk.
#### The figure above illustrates how Spark breaks a list of data entries into partitions that are each stored in memory on a worker.
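
#### To make the idea of partitions concrete, here is a plain-Python sketch that splits a list into contiguous chunks so that every entry lands in exactly one chunk. The helper `split_into_partitions` is hypothetical, and the even, contiguous split is an assumption for illustration; Spark's actual partitioning strategy may differ.

```python
def split_into_partitions(entries, num_partitions):
    """Split a list into num_partitions contiguous, nearly equal chunks."""
    size, extra = divmod(len(entries), num_partitions)
    partitions, start = [], 0
    for i in range(num_partitions):
        # The first `extra` partitions receive one additional entry.
        end = start + size + (1 if i < extra else 0)
        partitions.append(entries[start:end])
        start = end
    return partitions


partitions = split_into_partitions(list(range(10)), 4)
print(partitions)  # [[0, 1, 2], [3, 4, 5], [6, 7], [8, 9]]
```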
 
 
####  To create the DataFrame, we'll use `spark.createDataFrame()`, and we'll pass our array of data in as an argument to that function. Spark will create a new set of input data based on data that is passed in.  A DataFrame requires a _schema_, which is a list of columns, where each column has a name and a type. Our list of data has elements with types (mostly strings, but one integer). We'll supply the rest of the schema and the column names as the second argument to `createDataFrame()`.
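
#### The pairing of column names with value types can be sketched in plain Python. This is only an illustration of the idea of a schema as a list of (name, type) pairs: the helper `describe_schema` and the sample row are hypothetical, and Spark performs its own mapping to Spark SQL types, which this sketch does not attempt to reproduce.

```python
# Column names, as supplied to createDataFrame() in this tutorial.
columns = ("last_name", "first_name", "ssn", "occupation", "age")

# Made-up sample row with the same shape as the generated records.
row = ("Doe", "Jane", "000-00-0000", "Engineer", 30)


def describe_schema(column_names, sample_row):
    """Pair each column name with the Python type name of the sample value."""
    return [(name, type(value).__name__) for name, value in zip(column_names, sample_row)]


schema = describe_schema(columns, row)
for name, type_name in schema:
    print(f"{name}: {type_name}")
```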

#### The [`createDataFrame()` method](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.SparkSession.createDataFrame) can take an RDD, a Python list or a pandas DataFrame as input.

#### Let's view the help for `createDataFrame()`.

In [18]:
help(spark.createDataFrame)

Help on method createDataFrame in module pyspark.sql.session:

createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True) method of pyspark.sql.session.SparkSession instance
    Creates a :class:`DataFrame` from an :class:`RDD`, a list or a :class:`pandas.DataFrame`.
    
    When ``schema`` is a list of column names, the type of each column
    will be inferred from ``data``.
    
    When ``schema`` is ``None``, it will try to infer the schema (column names and types)
    from ``data``, which should be an RDD of :class:`Row`,
    or :class:`namedtuple`, or :class:`dict`.
    
    When ``schema`` is :class:`pyspark.sql.types.DataType` or a datatype string, it must match
    the real data, or an exception will be thrown at runtime. If the given schema is not
    :class:`pyspark.sql.types.StructType`, it will be wrapped into a
    :class:`pyspark.sql.types.StructType` as its only field, and the field name will be "value",
    each record will also be wrapped into a tuple,

In [20]:
dataDF = spark.createDataFrame(data, ('last_name', 'first_name', 'ssn', 'occupation', 'age'))

#### Let's see what type `spark.createDataFrame()` returned.

In [21]:
type(dataDF)

pyspark.sql.dataframe.DataFrame

#### Let's take a look at the DataFrame's schema and some of its rows.

In [22]:
dataDF.printSchema()

root
 |-- last_name: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- ssn: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- age: long (nullable = true)



In [29]:
dataDF.show()

+----------+----------+-----------+--------------------+---+
| last_name|first_name|        ssn|          occupation|age|
+----------+----------+-----------+--------------------+---+
|     Brown|     Jason|182-83-5988|Building services...|  2|
|     Brown|      Cody|298-53-9877|Dance movement ps...|  3|
| Hendricks|   Jessica|461-01-1867|Learning disabili...| 18|
|     Baker|     Scott|603-81-3455|Maintenance engineer| 18|
|   Alvarez|     Sarah|304-30-5738|          Geochemist| 40|
|Montgomery|    George|200-45-4002|      Phytotherapist| 27|
|    Brooks|     Jesse|674-90-7121|  Secretary, company| 27|
|     Davis|   Valerie|093-81-5058|          Aid worker| 34|
|  Johnston|    Andrea|405-71-6928|Engineer, production| 40|
| Zimmerman|      Mark|285-09-1368|  Personal assistant|  7|
|     Drake|     Sheri|432-75-9180|Biomedical scientist|  6|
|  Campbell| Stephanie|287-31-8774|Chartered certifi...| 15|
|    Murphy|    Steven|885-84-0932|Commercial/reside...| 25|
|  Andersen|   Lindsay|2

### (3d) Create a Spark DataFrame from a source

#### Another way to create Spark DataFrames is the `read` interface, which lets you read in data from sources like HDFS and local file storage, but also from external databases like MongoDB, Cassandra, PostgreSQL, etc. Spark natively supports file formats such as Parquet, ORC, JSON, CSV and plain text; other formats, such as Avro or XML, may require an additional package depending on your Spark version.

#### As noted above, Spark can also read from external storage systems like MongoDB, Cassandra, PostgreSQL, etc. Reading from these sources requires a source-specific connector. Most connectors can be found, and are well documented, on the [_Spark Packages site_](https://spark-packages.org/), which is the Spark equivalent of the [_R Packages site_](https://cran.r-project.org/).

In [30]:
# Reading from local file

irisDF = spark.read.option("header", True).option("sep", ",").csv("/data/iris.csv")
irisDF.show()

+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|species|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|        0.2| setosa|
|         4.9|        3.0|         1.4|        0.2| setosa|
|         4.7|        3.2|         1.3|        0.2| setosa|
|         4.6|        3.1|         1.5|        0.2| setosa|
|         5.0|        3.6|         1.4|        0.2| setosa|
|         5.4|        3.9|         1.7|        0.4| setosa|
|         4.6|        3.4|         1.4|        0.3| setosa|
|         5.0|        3.4|         1.5|        0.2| setosa|
|         4.4|        2.9|         1.4|        0.2| setosa|
|         4.9|        3.1|         1.5|        0.1| setosa|
|         5.4|        3.7|         1.5|        0.2| setosa|
|         4.8|        3.4|         1.6|        0.2| setosa|
|         4.8|        3.0|         1.4|        0.1| setosa|
|         4.3|        3.0|         1.1| 

### (3e) Convert the DataFrame to an RDD and find out how many partitions it is split into

In [31]:
dataDF.rdd.getNumPartitions()

4

###### A note about DataFrames and queries

When you use DataFrames or Spark SQL, you are building up a _query plan_. Each transformation you apply to a DataFrame adds some information to the query plan. When you finally call an action, which triggers execution of your Spark job, several things happen:

1. Spark's Catalyst optimizer analyzes the query plan (called an _unoptimized logical query plan_) and attempts to optimize it. Optimizations include (but aren't limited to) rearranging and combining `filter()` operations for efficiency, converting `Decimal` operations to more efficient long integer operations, and pushing some operations down into the data source (e.g., a `filter()` operation might be translated to a SQL `WHERE` clause, if the data source is a traditional SQL RDBMS). The result of this optimization phase is an _optimized logical plan_.
2. Once Catalyst has an optimized logical plan, it then constructs multiple _physical_ plans from it. Specifically, it implements the query in terms of lower level Spark RDD operations.
3. Catalyst chooses which physical plan to use via _cost optimization_. That is, it determines which physical plan is the most efficient (or least expensive), and uses that one.
4. Finally, once the physical RDD execution plan is established, Spark actually executes the job.

You can examine the query plan using the `explain()` function on a DataFrame. By default, `explain()` only shows you the final physical plan; however, if you pass it an argument of `True`, it will show you all phases.

(If you want to take a deeper dive into how Catalyst optimizes DataFrame queries, this blog post, while a little old, is an excellent overview: [Deep Dive into Spark SQL's Catalyst Optimizer](https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html).)

Let's add a couple transformations to our DataFrame and look at the query plan on the resulting transformed DataFrame. Don't be too concerned if it looks like gibberish. As you gain more experience with Apache Spark, you'll begin to be able to use `explain()` to help you understand more about your DataFrame operations.

In [37]:
# Transform dataDF through a select transformation and rename the newly created '(age - 1)' column to 'age'
# Because select is a transformation and Spark uses lazy evaluation, the select itself launches
# no jobs, stages, or tasks. The show() call below is an action, so it does trigger execution.
subDF = dataDF.select('last_name', 'first_name', 'ssn', 'occupation',(dataDF.age - 1).alias('age'))
subDF.show()

+----------+----------+-----------+--------------------+---+
| last_name|first_name|        ssn|          occupation|age|
+----------+----------+-----------+--------------------+---+
|     Brown|     Jason|182-83-5988|Building services...|  1|
|     Brown|      Cody|298-53-9877|Dance movement ps...|  2|
| Hendricks|   Jessica|461-01-1867|Learning disabili...| 17|
|     Baker|     Scott|603-81-3455|Maintenance engineer| 17|
|   Alvarez|     Sarah|304-30-5738|          Geochemist| 39|
|Montgomery|    George|200-45-4002|      Phytotherapist| 26|
|    Brooks|     Jesse|674-90-7121|  Secretary, company| 26|
|     Davis|   Valerie|093-81-5058|          Aid worker| 33|
|  Johnston|    Andrea|405-71-6928|Engineer, production| 39|
| Zimmerman|      Mark|285-09-1368|  Personal assistant|  6|
|     Drake|     Sheri|432-75-9180|Biomedical scientist|  5|
|  Campbell| Stephanie|287-31-8774|Chartered certifi...| 14|
|    Murphy|    Steven|885-84-0932|Commercial/reside...| 24|
|  Andersen|   Lindsay|2

#### Let's take a look at the query plan.

In [38]:
subDF.explain(True)

== Parsed Logical Plan ==
'Project [unresolvedalias('last_name, None), unresolvedalias('first_name, None), unresolvedalias('ssn, None), unresolvedalias('occupation, None), (age#4L - 1) AS age#253]
+- LogicalRDD [last_name#0, first_name#1, ssn#2, occupation#3, age#4L], false

== Analyzed Logical Plan ==
last_name: string, first_name: string, ssn: string, occupation: string, age: bigint
Project [last_name#0, first_name#1, ssn#2, occupation#3, (age#4L - cast(1 as bigint)) AS age#253L]
+- LogicalRDD [last_name#0, first_name#1, ssn#2, occupation#3, age#4L], false

== Optimized Logical Plan ==
Project [last_name#0, first_name#1, ssn#2, occupation#3, (age#4L - 1) AS age#253L]
+- LogicalRDD [last_name#0, first_name#1, ssn#2, occupation#3, age#4L], false

== Physical Plan ==
*(1) Project [last_name#0, first_name#1, ssn#2, occupation#3, (age#4L - 1) AS age#253L]
+- Scan ExistingRDD[last_name#0,first_name#1,ssn#2,occupation#3,age#4L]


### (3f) Use _collect_ to view results

<img src="http://spark-mooc.github.io/web-assets/images/cs105x/diagram-3d.png" style="height:700px"/>

#### To see a list of elements decremented by one, we need to create a new list on the driver from the data distributed in the executor nodes.  To do this we can call the `collect()` method on our DataFrame.  `collect()` is often used after transformations to ensure that we are only returning a *small* amount of data to the driver.  This is done because the data returned to the driver must fit into the driver's available memory.  If not, the driver will crash.

#### The `collect()` method is the first action operation that we have encountered.  Action operations cause Spark to perform the (lazy) transformation operations that are required to compute the values returned by the action.  In our example, this means that tasks will now be launched to perform the `createDataFrame`, `select`, and `collect` operations.

#### In the diagram, the dataset is broken into four partitions, so four `collect()` tasks are launched. Each task collects the entries in its partition and sends the result to the driver, which creates a list of the values, as shown in the figure above.
 
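#### Now let's retrieve some rows from `subDF`. Rather than calling `collect()`, which would return all 10,000 rows to the driver, we use `take(2)` to return only the first two.

In [40]:
# Let's fetch the first two rows with take()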
results = subDF.take(2)
print(results)

[Row(last_name='Brown', first_name='Jason', ssn='182-83-5988', occupation='Building services engineer', age=1), Row(last_name='Brown', first_name='Cody', ssn='298-53-9877', occupation='Dance movement psychotherapist', age=2)]


#### A better way to visualize the data is to use the `show()` method. If you don't tell `show()` how many rows to display, it displays 20 rows.

In [45]:
subDF.show()

+----------+----------+-----------+--------------------+---+
| last_name|first_name|        ssn|          occupation|age|
+----------+----------+-----------+--------------------+---+
|     Brown|     Jason|182-83-5988|Building services...|  1|
|     Brown|      Cody|298-53-9877|Dance movement ps...|  2|
| Hendricks|   Jessica|461-01-1867|Learning disabili...| 17|
|     Baker|     Scott|603-81-3455|Maintenance engineer| 17|
|   Alvarez|     Sarah|304-30-5738|          Geochemist| 39|
|Montgomery|    George|200-45-4002|      Phytotherapist| 26|
|    Brooks|     Jesse|674-90-7121|  Secretary, company| 26|
|     Davis|   Valerie|093-81-5058|          Aid worker| 33|
|  Johnston|    Andrea|405-71-6928|Engineer, production| 39|
| Zimmerman|      Mark|285-09-1368|  Personal assistant|  6|
|     Drake|     Sheri|432-75-9180|Biomedical scientist|  5|
|  Campbell| Stephanie|287-31-8774|Chartered certifi...| 14|
|    Murphy|    Steven|885-84-0932|Commercial/reside...| 24|
|  Andersen|   Lindsay|2

#### If you'd prefer that `show()` not truncate the data, you can tell it not to:

In [46]:
subDF.show(n=30, truncate=False)

+----------+-----------+-----------+-----------------------------------------------------+---+
|last_name |first_name |ssn        |occupation                                           |age|
+----------+-----------+-----------+-----------------------------------------------------+---+
|Brown     |Jason      |182-83-5988|Building services engineer                           |1  |
|Brown     |Cody       |298-53-9877|Dance movement psychotherapist                       |2  |
|Hendricks |Jessica    |461-01-1867|Learning disability nurse                            |17 |
|Baker     |Scott      |603-81-3455|Maintenance engineer                                 |17 |
|Alvarez   |Sarah      |304-30-5738|Geochemist                                           |39 |
|Montgomery|George     |200-45-4002|Phytotherapist                                       |26 |
|Brooks    |Jesse      |674-90-7121|Secretary, company                                   |26 |
|Davis     |Valerie    |093-81-5058|Aid worker    

### (3g) Use _count_ to get total

#### One of the most basic jobs that we can run is a count: the `count()` action returns the number of elements in a DataFrame. Since `select()` creates a new DataFrame with the same number of elements as the starting DataFrame, we expect that applying `count()` to each DataFrame will return the same result.

<img src="http://spark-mooc.github.io/web-assets/images/cs105x/diagram-3e.png" style="height:700px"/>

#### Note that because `count()` is an action operation, if we had not already performed an action with `collect()`, then Spark would now perform the transformation operations when we executed `count()`.

#### Each task counts the entries in its partition and sends the result to your SparkContext, which adds up all of the counts. The figure above shows what would happen if we ran `count()` on a small example dataset with just four partitions.
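
#### The per-partition counting described above can be mimicked in plain Python. This is only an analogy, not Spark code: the `partitions` list and its rows are made up for illustration, and a real cluster would run the per-partition step as parallel tasks.

```python
# Plain-Python analogy of count(); this is not Spark code.
# Hypothetical data: four partitions holding (last_name, age) rows.
partitions = [
    [("Brown", 1), ("Brown", 2)],
    [("Hendricks", 17), ("Baker", 17), ("Alvarez", 39)],
    [("Montgomery", 26)],
    [("Brooks", 26), ("Davis", 33)],
]

# Each "task" counts only the rows in its own partition.
partial_counts = [len(partition) for partition in partitions]

# The "driver" adds up the partial counts.
total = sum(partial_counts)

print(partial_counts)  # [2, 3, 1, 2]
print(total)           # 8
```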

In [47]:
print(dataDF.count())
print(subDF.count())

10000
10000


### (3f) Apply transformation _filter_ and view results with _show_

#### Next, we'll create a new DataFrame that only contains the people whose ages are less than 10. To do this, we'll use the `filter()` transformation. (You can also use `where()`, an alias for `filter()`, if you prefer something more SQL-like.) The `filter()` method is a transformation operation that creates a new DataFrame from the input DataFrame, keeping only the rows that match the filter expression.

#### The figure shows how this might work on the small four-partition dataset.

<img src="http://spark-mooc.github.io/web-assets/images/cs105x/diagram-3f.png" style="height:700px"/>

In [51]:
filteredDF = subDF.filter(subDF.age < 10)
filteredDF.show(truncate=False)
filteredDF.count()

+----------+----------+-----------+-------------------------------------------------+---+
|last_name |first_name|ssn        |occupation                                       |age|
+----------+----------+-----------+-------------------------------------------------+---+
|Brown     |Jason     |182-83-5988|Building services engineer                       |1  |
|Brown     |Cody      |298-53-9877|Dance movement psychotherapist                   |2  |
|Zimmerman |Mark      |285-09-1368|Personal assistant                               |6  |
|Drake     |Sheri     |432-75-9180|Biomedical scientist                             |5  |
|Graves    |Samuel    |191-92-9664|Administrator, local government                  |2  |
|Hickman   |Scott     |282-67-0568|Architectural technologist                       |2  |
|King      |Matthew   |263-65-2624|Psychologist, prison and probation services      |7  |
|Lee       |Christina |072-60-9564|Music tutor                                      |9  |
|Jones    

2472

#### Alternatively, you can pass the filter condition as a SQL expression string:

In [49]:
filteredDF = subDF.filter("age < 10")
print(filteredDF.count())

2472


### (3h) Write results to file

#### Part (3d) showed how a file is read in. Similarly, you can write a DataFrame to a local file, to a distributed file system (like Hadoop's HDFS), or to an external storage framework.

#### By default Spark expects a connection to HDFS, but a local file location can also be specified. Also by default, Spark writes Parquet files, but other popular file formats like Avro, csv, xml, or json can be specified instead.

#### As mentioned, Spark can also write to external storage frameworks like MongoDB, Cassandra, PostgreSQL, etc. Writing to these data sources requires specific connectors. Most of them can be found, and are well documented, on the [_Spark Packages site_](https://spark-packages.org/), which is for Spark what the [_R Packages site_](https://cran.r-project.org/) is for R.

#### A Spark DataFrame is always written to disk in a distributed fashion. This means that a directory is created (on each machine in the cluster), and within this directory the partitions assigned to that node are stored.
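
#### The "one directory, one file per partition" layout can be illustrated with a small plain-Python sketch. This is not Spark code: `write_partitions` is a hypothetical helper, the rows are made up, and the `part-0000N.csv` naming is a simplification of the names Spark generates.

```python
import os
import tempfile

def write_partitions(partitions, directory):
    """Write each partition to its own part file inside `directory`."""
    os.makedirs(directory, exist_ok=True)
    for index, rows in enumerate(partitions):
        path = os.path.join(directory, "part-%05d.csv" % index)
        with open(path, "w") as handle:
            for row in rows:
                handle.write(",".join(str(value) for value in row) + "\n")
    return sorted(os.listdir(directory))

# Hypothetical data: four partitions with one row each.
partitions = [[("Brown", 1)], [("Brown", 2)], [("Zimmerman", 6)], [("Drake", 5)]]

base = tempfile.mkdtemp()
four_files = write_partitions(partitions, os.path.join(base, "out_csv"))

# Merging all rows into a single partition first yields a single part file.
merged = [[row for rows in partitions for row in rows]]
one_file = write_partitions(merged, os.path.join(base, "out_csv_one"))

print(four_files)  # ['part-00000.csv', 'part-00001.csv', 'part-00002.csv', 'part-00003.csv']
print(one_file)    # ['part-00000.csv']
```

#### The output "file" is therefore really a directory, and the number of part files follows the number of partitions at the time of writing.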

In [52]:
### Write to local file (Parquet)

fileName = "file:///data/output/iris_out"

filteredDF.write.save(fileName)

In [53]:
### Write to local file (csv)
filteredDF.write.csv(fileName + "_csv")

> **Note:** You can see that 4 partitions are created, which might not be desired if you only need a single file. To save just one csv file, you can repartition the DataFrame before saving.

In [54]:
### Write to local file (csv): 1 partition
filteredDF.repartition(1).write.csv(fileName + "_csv_one")

In [57]:
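# Alternative for small results: collect the DataFrame to the driver as a
# pandas DataFrame and let pandas write a single csv file.
pandasDF = filteredDF.toPandas()

pandasDF.to_csv("/data/output/pandasDF.csv")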

## Part 4: Python Lambda functions and User Defined Functions

#### Python supports the use of small one-line anonymous functions that are not bound to a name at runtime.

#### `lambda` functions, borrowed from LISP, can be used wherever function objects are required. They are syntactically restricted to a single expression. Remember that `lambda` functions are a matter of style and using them is never required - semantically, they are just syntactic sugar for a normal function definition. You can always define a separate normal function instead, but using a `lambda` function is an equivalent and more compact form of coding. Ideally you should consider using `lambda` functions where you want to encapsulate non-reusable code without littering your code with one-line functions.
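
#### As a small plain-Python illustration of the point above, the named function and the `lambda` below are interchangeable. The names `less_than_ten`, `less_than_ten_lambda`, and the `ages` list are made up for this example.

```python
# A named function and an equivalent lambda.
def less_than_ten(value):
    return value < 10

less_than_ten_lambda = lambda value: value < 10

ages = [1, 2, 17, 39, 6, 5]

# Both can be passed wherever a function object is expected.
with_def = list(filter(less_than_ten, ages))
with_lambda = list(filter(less_than_ten_lambda, ages))

# The lambda can also be written inline, without binding it to a name.
inline = list(filter(lambda value: value < 10, ages))

print(with_def)                           # [1, 2, 6, 5]
print(with_def == with_lambda == inline)  # True
```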

#### Here, instead of defining a separate function for the `filter()` transformation, we will use an inline `lambda` function and register that lambda as a Spark _User Defined Function_ (UDF). A UDF is a special wrapper around a function, allowing the function to be used in a DataFrame query.

In [None]:
input_values = [1, 5, 12, 30]  # illustrative input for plain-Python filter()
output = list(filter(lambda a: a < 10, input_values))

In [61]:
from pyspark.sql.types import BooleanType
from pyspark.sql.functions import udf

less_ten = udf(lambda s: s < 10, BooleanType())
lambdaDF = dataDF.filter(less_ten(dataDF.age))
lambdaDF.show()
lambdaDF.count()

+----------+----------+-----------+--------------------+---+
| last_name|first_name|        ssn|          occupation|age|
+----------+----------+-----------+--------------------+---+
|     Brown|     Jason|182-83-5988|Building services...|  2|
|     Brown|      Cody|298-53-9877|Dance movement ps...|  3|
| Zimmerman|      Mark|285-09-1368|  Personal assistant|  7|
|     Drake|     Sheri|432-75-9180|Biomedical scientist|  6|
|    Graves|    Samuel|191-92-9664|Administrator, lo...|  3|
|   Hickman|     Scott|282-67-0568|Architectural tec...|  3|
|      King|   Matthew|263-65-2624|Psychologist, pri...|  8|
|     Jones|   Timothy|215-04-0471| Dispensing optician|  2|
|     Burns|       Roy|200-09-8924|Commercial art ga...|  1|
|       Gay|      John|166-25-3954|        Risk analyst|  3|
|  Mckinney|     Jared|070-71-1782| Early years teacher|  2|
|   Russell|    Sandra|717-41-8980|Radio broadcast a...|  2|
|    Branch|      Jose|552-48-7277|Production assist...|  2|
|      Hall|   Belinda|2

2262

In [62]:
# Let's show the even values less than 10
even = udf(lambda s: s % 2 == 0, BooleanType())
evenDF = lambdaDF.filter(even(lambdaDF.age))
evenDF.show()
evenDF.count()

+----------+----------+-----------+--------------------+---+
| last_name|first_name|        ssn|          occupation|age|
+----------+----------+-----------+--------------------+---+
|     Brown|     Jason|182-83-5988|Building services...|  2|
|     Drake|     Sheri|432-75-9180|Biomedical scientist|  6|
|      King|   Matthew|263-65-2624|Psychologist, pri...|  8|
|     Jones|   Timothy|215-04-0471| Dispensing optician|  2|
|  Mckinney|     Jared|070-71-1782| Early years teacher|  2|
|   Russell|    Sandra|717-41-8980|Radio broadcast a...|  2|
|    Branch|      Jose|552-48-7277|Production assist...|  2|
|      Hall|   Belinda|278-41-1849|Surveyor, buildin...|  4|
|      Hill|   Chelsea|695-10-4686|Research scientis...|  6|
|Harrington|     James|475-20-7259|Historic building...|  4|
|  Mitchell|   Michael|179-13-8207|International aid...|  6|
|    Miller|     James|008-77-3552|Scientist, resear...|  2|
|   Alvarez|   Matthew|266-76-2718|Engineer, broadca...|  4|
|  Sullivan|     Diana|2

1090

In [64]:
def brokenTen(value):
    """
    Args:
        value (int): A number.

    Returns:
        bool: Whether `value` is less than ten.
    """
    if (value < 10):
        return True
    else:
        return False

# Declare the return type explicitly; without it, udf() defaults to StringType.
btUDF = udf(brokenTen, BooleanType())
brokenDF = subDF.filter(btUDF(subDF.age))
brokenDF.show()

+----------+----------+-----------+--------------------+---+
| last_name|first_name|        ssn|          occupation|age|
+----------+----------+-----------+--------------------+---+
|     Brown|     Jason|182-83-5988|Building services...|  1|
|     Brown|      Cody|298-53-9877|Dance movement ps...|  2|
| Zimmerman|      Mark|285-09-1368|  Personal assistant|  6|
|     Drake|     Sheri|432-75-9180|Biomedical scientist|  5|
|    Graves|    Samuel|191-92-9664|Administrator, lo...|  2|
|   Hickman|     Scott|282-67-0568|Architectural tec...|  2|
|      King|   Matthew|263-65-2624|Psychologist, pri...|  7|
|       Lee| Christina|072-60-9564|         Music tutor|  9|
|     Jones|   Timothy|215-04-0471| Dispensing optician|  1|
|     Burns|       Roy|200-09-8924|Commercial art ga...|  0|
|       Gay|      John|166-25-3954|        Risk analyst|  2|
|  Mckinney|     Jared|070-71-1782| Early years teacher|  1|
|   Russell|    Sandra|717-41-8980|Radio broadcast a...|  1|
|    Branch|      Jose|5

## Part 5: Additional DataFrame actions

#### Let's investigate some additional actions:

* [first()](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.first)
* [take()](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.take)

#### One useful thing to do when we have a new dataset is to look at the first few entries to obtain a rough idea of what information is available.  In Spark, we can do that using actions like `first()`, `take()`, and `show()`. Note that for the `first()` and `take()` actions, the elements that are returned depend on how the DataFrame is *partitioned*.

#### Instead of using the `collect()` action, we can use the `take(n)` action to return the first _n_ elements of the DataFrame. The `first()` action returns the first element of a DataFrame, and is equivalent to `take(1)[0]`.
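
#### To see why the result of `first()` and `take()` depends on partitioning, here is a plain-Python sketch. It is not how Spark is implemented: the `take` and `first` helpers and the two layouts are hypothetical, and the sketch simply assumes that partitions are read in order until enough rows are found.

```python
def take(partitions, n):
    """Return the first n rows, reading partitions in order."""
    result = []
    for partition in partitions:
        for row in partition:
            if len(result) == n:
                return result
            result.append(row)
    return result

def first(partitions):
    """Equivalent to take(partitions, 1)[0]."""
    return take(partitions, 1)[0]

# The same four rows, distributed over partitions in two different ways.
layout_a = [["Jason", "Cody"], ["Mark", "Sheri"]]
layout_b = [["Mark", "Sheri"], ["Jason", "Cody"]]

print(take(layout_a, 3))  # ['Jason', 'Cody', 'Mark']
print(first(layout_a))    # Jason
print(first(layout_b))    # Mark
```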

In [69]:
print("first: " + str(filteredDF.first()))

first: Row(last_name='Brown', first_name='Jason', ssn='182-83-5988', occupation='Building services engineer', age=1)


In [72]:
print("Four of them: " + str(filteredDF.take(4)))

Four of them: [Row(last_name='Brown', first_name='Jason', ssn='182-83-5988', occupation='Building services engineer', age=1), Row(last_name='Brown', first_name='Cody', ssn='298-53-9877', occupation='Dance movement psychotherapist', age=2), Row(last_name='Zimmerman', first_name='Mark', ssn='285-09-1368', occupation='Personal assistant', age=6), Row(last_name='Drake', first_name='Sheri', ssn='432-75-9180', occupation='Biomedical scientist', age=5)]


## Part 6: Additional DataFrame transformations

### (6a) _orderBy_

#### [`orderBy()`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.orderBy) allows you to sort a DataFrame by one or more columns, producing a new DataFrame.

#### For example, let's get the five oldest people in the original (unfiltered) DataFrame. We can use the `orderBy()` transformation. `orderBy()` takes one or more columns, either as _names_ (strings) or as `Column` objects. To get a `Column` object, we use one of two notations on the DataFrame:

#### * Pandas-style notation: `filteredDF.age`
#### * Subscript notation: `filteredDF['age']`

#### Both of those syntaxes return a `Column`, which has additional methods like `desc()` (for sorting in descending order) or `asc()` (for sorting in ascending order, which is the default).

#### Here are some examples:

```
dataDF.orderBy(dataDF['age'])  # sort by age in ascending order; returns a new DataFrame
dataDF.orderBy(dataDF.last_name.desc()) # sort by last name in descending order
 ```

In [81]:
# Get the five oldest people in the list. To do that, sort by age in descending order.
dataDF.orderBy(dataDF.age.desc()).show(5)

+---------+----------+-----------+--------------------+---+
|last_name|first_name|        ssn|          occupation|age|
+---------+----------+-----------+--------------------+---+
|  Ramirez|       Roy|391-25-9789|Conservation offi...| 47|
|    Foley|    Gerald|337-87-0606|             Curator| 47|
|  Johnson|   Vincent|809-40-1297|Intelligence analyst| 47|
|    Hobbs|     Jaime|210-13-4378|         Ship broker| 47|
|    Smith|   Kristen|205-33-2744|          Contractor| 47|
+---------+----------+-----------+--------------------+---+
only showing top 5 rows



#### Let's reverse the sort order. Since ascending sort is the default, we can pass either a `Column` expression or a simple string in this case. The `desc()` and `asc()` methods are only defined on `Column`, so something like `orderBy('age'.desc())` would not work: there's no `desc()` method on Python string objects. That's why we needed the column expression above. But if we're just using the defaults, we can pass a string column name into `orderBy()`, which is sometimes easier to read.

In [83]:
dataDF.orderBy('age').show(5)

+---------+----------+-----------+--------------------+---+
|last_name|first_name|        ssn|          occupation|age|
+---------+----------+-----------+--------------------+---+
|   Newman|   Michael|845-68-5707|Clinical psycholo...|  1|
|Mccormick|     David|139-63-5401|Logistics and dis...|  1|
|   Morris| Stephanie|143-50-7887|Regulatory affair...|  1|
|     Hall|     Renee|765-37-8660| Mental health nurse|  1|
|   Burton|     Holly|574-81-9974|Glass blower/desi...|  1|
+---------+----------+-----------+--------------------+---+
only showing top 5 rows



### (6b) _distinct_ and _dropDuplicates_

#### [`distinct()`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.distinct) filters out duplicate rows, and it considers all columns. Since our data is completely randomly generated (by `fake-factory`), it's extremely unlikely that there are any duplicate rows. A single column is a different matter: if we select only `age`, the 10,000 rows collapse to a few dozen distinct values:

In [88]:
print(dataDF.count())
dataDF.select("age").distinct().orderBy('age').show(50)

10000
+---+
|age|
+---+
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
| 10|
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 18|
| 19|
| 20|
| 21|
| 22|
| 23|
| 24|
| 25|
| 26|
| 27|
| 28|
| 29|
| 30|
| 31|
| 32|
| 33|
| 34|
| 35|
| 36|
| 37|
| 38|
| 39|
| 40|
| 41|
| 42|
| 43|
| 44|
| 45|
| 46|
| 47|
+---+



#### To demonstrate `distinct()`, let's create a quick throwaway dataset.

In [89]:
tempDF = spark.createDataFrame([("Joe", 1), ("Joe", 1), ("Anna", 15), ("Anna", 12), ("Ravi", 5)], ('name', 'score'))

In [90]:
tempDF.show()

+----+-----+
|name|score|
+----+-----+
| Joe|    1|
| Joe|    1|
|Anna|   15|
|Anna|   12|
|Ravi|    5|
+----+-----+



In [91]:
tempDF.distinct().show()

+----+-----+
|name|score|
+----+-----+
| Joe|    1|
|Ravi|    5|
|Anna|   12|
|Anna|   15|
+----+-----+



#### Note that one of the ("Joe", 1) rows was deleted, but both rows with name "Anna" were kept, because all columns in a row must match another row for it to be considered a duplicate.

#### [`dropDuplicates()`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.dropDuplicates) is like `distinct()`, except that it allows us to specify the columns to compare. For instance, we can use it to drop all rows where the first name and last name are duplicated (ignoring the ssn, occupation, and age columns).

In [92]:
print(dataDF.count())
print(dataDF.dropDuplicates(['first_name', 'last_name']).count())

10000
9321
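
#### The difference between the two operations can be sketched in plain Python using the rows of the throwaway `tempDF` dataset. This is an analogy only: the helpers `distinct` and `drop_duplicates` are hypothetical, and they keep the first row they see for each key, whereas Spark makes no promise about which of the duplicate rows survives.

```python
rows = [("Joe", 1), ("Joe", 1), ("Anna", 15), ("Anna", 12), ("Ravi", 5)]

def distinct(rows):
    """Keep one copy of each row, comparing every column."""
    seen = set()
    result = []
    for row in rows:
        if row not in seen:
            seen.add(row)
            result.append(row)
    return result

def drop_duplicates(rows, column_indexes):
    """Keep the first row seen for each combination of the chosen columns."""
    seen = set()
    result = []
    for row in rows:
        key = tuple(row[i] for i in column_indexes)
        if key not in seen:
            seen.add(key)
            result.append(row)
    return result

print(distinct(rows))
# [('Joe', 1), ('Anna', 15), ('Anna', 12), ('Ravi', 5)]

# Compare on the name column only (index 0): one row per name remains.
print(drop_duplicates(rows, [0]))
# [('Joe', 1), ('Anna', 15), ('Ravi', 5)]
```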


### (6c) _drop_
 
#### [`drop()`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.drop) is like the opposite of `select()`: Instead of selecting specific columns from a DataFrame, it drops a specified column from a DataFrame.

#### Here's a simple use case: Suppose you're reading from a 1,000-column CSV file, and you have to get rid of five of the columns. Instead of selecting 995 of the columns, it's easier just to drop the five you don't want.

In [94]:
dataDF.drop('occupation').drop('age').show()

+----------+----------+-----------+
| last_name|first_name|        ssn|
+----------+----------+-----------+
|     Brown|     Jason|182-83-5988|
|     Brown|      Cody|298-53-9877|
| Hendricks|   Jessica|461-01-1867|
|     Baker|     Scott|603-81-3455|
|   Alvarez|     Sarah|304-30-5738|
|Montgomery|    George|200-45-4002|
|    Brooks|     Jesse|674-90-7121|
|     Davis|   Valerie|093-81-5058|
|  Johnston|    Andrea|405-71-6928|
| Zimmerman|      Mark|285-09-1368|
|     Drake|     Sheri|432-75-9180|
|  Campbell| Stephanie|287-31-8774|
|    Murphy|    Steven|885-84-0932|
|  Andersen|   Lindsay|299-52-4282|
|     Clark|   Claudia|572-15-9413|
|  Anderson|   Michael|592-79-1854|
|    Carter|   Bradley|750-85-4885|
|     Lyons|     Chris|795-90-8059|
|   Gregory| Stephanie|800-68-1097|
|    Garcia|      Anna|528-14-5681|
+----------+----------+-----------+
only showing top 20 rows



### (6d) _groupBy_

#### [`groupBy()`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.groupBy) is one of the most powerful transformations. It allows you to perform aggregations on a DataFrame.

#### Unlike other DataFrame transformations, `groupBy()` does _not_ return a DataFrame. Instead, it returns a special [GroupedData](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.GroupedData) object that contains various aggregation functions.

#### The most commonly used aggregation function is [count()](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.GroupedData.count), but there are others (like [sum()](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.GroupedData.sum), [max()](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.GroupedData.max), and [avg()](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.GroupedData.avg)).

#### These aggregation functions typically create a new column and return a new DataFrame.
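
#### Conceptually, a grouped aggregation first collects the rows that share a key and then reduces each group to a single value. The plain-Python sketch below illustrates that idea with made-up `(occupation, age)` rows; it is an analogy, not a description of how Spark executes `groupBy()`.

```python
from collections import defaultdict

# Hypothetical (occupation, age) rows.
rows = [
    ("Curator", 47),
    ("Geochemist", 39),
    ("Curator", 21),
    ("Music tutor", 9),
    ("Geochemist", 25),
]

# Step 1: group the ages by occupation.
groups = defaultdict(list)
for occupation, age in rows:
    groups[occupation].append(age)

# Step 2: reduce each group to one value (a new "column" per aggregation).
counts = {occupation: len(ages) for occupation, ages in groups.items()}
averages = {occupation: sum(ages) / len(ages) for occupation, ages in groups.items()}

print(counts)    # {'Curator': 2, 'Geochemist': 2, 'Music tutor': 1}
print(averages)  # {'Curator': 34.0, 'Geochemist': 32.0, 'Music tutor': 9.0}
```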

In [96]:
dataDF.groupBy('occupation').count().orderBy("count", ascending=False).show(truncate=False)

+-------------------------------------------+-----+
|occupation                                 |count|
+-------------------------------------------+-----+
|Surveyor, hydrographic                     |28   |
|Surveyor, minerals                         |28   |
|Psychologist, clinical                     |28   |
|Tourist information centre manager         |27   |
|Architectural technologist                 |26   |
|Librarian, public                          |26   |
|Air traffic controller                     |26   |
|Product manager                            |26   |
|Trading standards officer                  |25   |
|Media planner                              |25   |
|Company secretary                          |25   |
|Insurance claims handler                   |25   |
|Charity officer                            |25   |
|Video editor                               |25   |
|Armed forces training and education officer|24   |
|Herbalist                                  |24   |
|Environment

In [101]:
dataDF.groupBy('occupation').avg('age').show(truncate=False)

+---------------------------------------+------------------+
|occupation                             |avg(age)          |
+---------------------------------------+------------------+
|Retail merchandiser                    |27.095238095238095|
|Engineer, aeronautical                 |29.0              |
|Diplomatic Services operational officer|21.714285714285715|
|Designer, ceramics/pottery             |15.642857142857142|
|Librarian, academic                    |18.615384615384617|
|Catering manager                       |25.875            |
|Early years teacher                    |19.235294117647058|
|English as a second language teacher   |16.142857142857142|
|Primary school teacher                 |20.09090909090909 |
|Occupational hygienist                 |22.4              |
|Patent examiner                        |21.41176470588235 |
|Control and instrumentation engineer   |22.157894736842106|
|Clinical molecular geneticist          |24.692307692307693|
|Estate agent           

#### We can also use `groupBy()` without any grouping columns to aggregate over the whole DataFrame:

In [102]:
print("Maximum age: " + str(dataDF.groupBy().max('age').first()[0]))
print("Minimum age: " + str(dataDF.groupBy().min('age').first()[0]))

Maximum age: 47
Minimum age: 1


### (6e) _sample_ (optional)

#### When analyzing data, the [`sample()`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.sample) transformation is often quite useful. It returns a new DataFrame with a random sample of elements from the dataset. It takes a `withReplacement` argument, which specifies whether it is okay to randomly pick the same item multiple times from the parent DataFrame (so when `withReplacement=True`, you can get the same item back multiple times). It takes a `fraction` parameter, which specifies the fraction of elements in the dataset you want to return. (So a `fraction` value of `0.20` returns roughly 20% of the elements in the DataFrame; the exact number of rows varies from run to run.) It also takes an optional `seed` parameter that allows you to specify a seed value for the random number generator, so that reproducible results can be obtained.
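
#### The roles of `fraction` and `seed` can be illustrated with a plain-Python sketch. It assumes that sampling without replacement keeps each row independently with probability `fraction`; that is an assumption made for illustration, and `bernoulli_sample` is a hypothetical helper, not part of any Spark API.

```python
import random

def bernoulli_sample(rows, fraction, seed=None):
    """Keep each row independently with probability `fraction`."""
    rng = random.Random(seed)
    return [row for row in rows if rng.random() < fraction]

rows = list(range(10000))

sample_a = bernoulli_sample(rows, 0.10, seed=42)
sample_b = bernoulli_sample(rows, 0.10, seed=42)

print(len(sample_a))         # close to 1000, but usually not exactly 1000
print(sample_a == sample_b)  # True: the same seed reproduces the same sample
```

#### Under this assumption, `fraction` is a per-row probability rather than an exact quota, so the sample size is only approximately `fraction` times the number of rows.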

In [107]:
sampledDF = dataDF.sample(withReplacement=False, fraction=0.10)
print(sampledDF.count())
sampledDF.show()

957
+---------+-----------+-----------+--------------------+---+
|last_name| first_name|        ssn|          occupation|age|
+---------+-----------+-----------+--------------------+---+
|   Brooks|      Jesse|674-90-7121|  Secretary, company| 27|
| Campbell|  Stephanie|287-31-8774|Chartered certifi...| 15|
|  Gregory|  Stephanie|800-68-1097|          Orthoptist| 36|
|    Kelly|    Cynthia|406-47-0397|Education adminis...| 42|
|     King|    Matthew|263-65-2624|Psychologist, pri...|  8|
| Matthews|       Ryan|117-36-4219|                Make| 41|
|Hernandez|   Benjamin|730-95-2184|    Industrial buyer| 41|
|   Garner|      Louis|844-93-4799|  Wellsite geologist| 39|
| Johnston|    William|200-54-7515| Animal technologist| 37|
|   Taylor|       Tina|283-17-5670|Biomedical scientist| 23|
| Johnston|      Steve|547-49-1078|        Bonds trader| 44|
|     Diaz|Christopher|083-24-1465|      Pilot, airline| 28|
| Richards|      Sarah|138-14-9343|Trading standards...| 30|
|   Mendez|    Bradl

In [110]:
print(dataDF.sample(withReplacement=False, fraction=0.05).count())

513


### (6f) _withColumn_ and _withColumnRenamed_

#### [withColumn()](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.withColumn) is a transformation that is used to create a new column. This can be a constant column with a fixed value or a column based on one or more other columns. The new column is added as the last column of the DataFrame (if a column with that name already exists, it is replaced instead). The `withColumn()` transformation takes two arguments:

#### * a string giving the new column name
#### * a `Column` expression describing how the column should be computed (for example a built-in function or a UDF applied to existing columns)

#### Let's first create a new column that combines _last name_ and _first name_ into a new variable _name_:

In [118]:
# Define a function to combine two strings
from pyspark.sql.types import StringType

def combineStrings(string1, string2):
    return string1 + " " + string2

udfCombineStrings = udf(combineStrings, StringType())

In [119]:
# Use withColumn
nameDF = dataDF.withColumn("name", udfCombineStrings("last_name", "first_name"))
nameDF.show(5)

+---------+----------+-----------+--------------------+---+-----------------+
|last_name|first_name|        ssn|          occupation|age|             name|
+---------+----------+-----------+--------------------+---+-----------------+
|    Brown|     Jason|182-83-5988|Building services...|  2|      Brown Jason|
|    Brown|      Cody|298-53-9877|Dance movement ps...|  3|       Brown Cody|
|Hendricks|   Jessica|461-01-1867|Learning disabili...| 18|Hendricks Jessica|
|    Baker|     Scott|603-81-3455|Maintenance engineer| 18|      Baker Scott|
|  Alvarez|     Sarah|304-30-5738|          Geochemist| 40|    Alvarez Sarah|
+---------+----------+-----------+--------------------+---+-----------------+
only showing top 5 rows



#### [withColumnRenamed()](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.withColumnRenamed) returns a new DataFrame by renaming an existing column. This is a no-op if the schema doesn't contain the given column name. The transformation takes two string arguments: the first is the name of an existing column in the DataFrame, and the second is the new name for that column.

#### The variable _name_ might create confusion with *last_name*, so let's change it to *full_name*:

In [120]:
nameDF = nameDF.withColumnRenamed("name", "full_name")
nameDF.show(5)

+---------+----------+-----------+--------------------+---+-----------------+
|last_name|first_name|        ssn|          occupation|age|        full_name|
+---------+----------+-----------+--------------------+---+-----------------+
|    Brown|     Jason|182-83-5988|Building services...|  2|      Brown Jason|
|    Brown|      Cody|298-53-9877|Dance movement ps...|  3|       Brown Cody|
|Hendricks|   Jessica|461-01-1867|Learning disabili...| 18|Hendricks Jessica|
|    Baker|     Scott|603-81-3455|Maintenance engineer| 18|      Baker Scott|
|  Alvarez|     Sarah|304-30-5738|          Geochemist| 40|    Alvarez Sarah|
+---------+----------+-----------+--------------------+---+-----------------+
only showing top 5 rows



## Part 7: Beyond 1 Dataframe: _union_ and _joins_

### (7a) Union
#### Imagine we want to stack two DataFrames. To do this, we can use [union()](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.union). `union()` returns a new DataFrame containing the union of the rows in this DataFrame and another one.

#### Note that the two DataFrames need to have the same schema for the union to work.

#### Let's create two sample datasets from dataDF and union them.

In [121]:
# creation of samples
sampleDF1 = dataDF.sample(False, 0.01)
print(sampleDF1.count())
sampleDF2 = dataDF.sample(False, 0.01)
print(sampleDF2.count())

125
103


In [122]:
# Union
unionedDF = sampleDF1.union(sampleDF2)
unionedDF.count()

228
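
#### The count of 228 is simply 125 + 103: `union()` stacks the rows and does not remove rows that appear in both inputs (it behaves like `UNION ALL` in SQL). The plain-Python sketch below mirrors this with two small, made-up lists; to drop the repeats in Spark you would follow `union()` with `distinct()`.

```python
# Hypothetical (ssn, last_name) rows; one row appears in both samples.
sample_1 = [("182-83-5988", "Brown"), ("285-09-1368", "Zimmerman")]
sample_2 = [("285-09-1368", "Zimmerman"), ("432-75-9180", "Drake")]

# Stacking keeps every row from both inputs, including repeats.
unioned = sample_1 + sample_2
print(len(unioned))  # 4

# Removing duplicates is a separate step (order-preserving here).
deduplicated = list(dict.fromkeys(unioned))
print(len(deduplicated))  # 3
```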

### (7b) Join
#### Like most other data processing frameworks, Spark lets you join DataFrames. Nevertheless, some caution is advised. Spark is a distributed framework, and a DataFrame is divided into multiple partitions. When joining DataFrames, the framework therefore needs to find matching records across multiple partitions and nodes within the cluster. In other words, a lot of shuffling needs to be done. As network bandwidth is limited, joins can be expensive in a distributed framework like Spark.

#### Spark already does a fairly good job of constructing an optimized query plan for joins. Nevertheless, the advice is to think twice before executing a join. Some tips:
* Try to keep the traffic across the network as limited as possible by filtering and sampling before the join operation.
* Try to use inner joins instead of outer joins, as the latter are far more resource intensive.
* Try to put the smallest DataFrame on the left of the join operation, as this is more optimal in the query plan.

#### Within Spark, most [join()](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.join) options like inner, left, right, and outer are available. The transformation takes up to three arguments:
* A DataFrame to join with the first DataFrame
* A column name or expression used as the key to match both DataFrames
* A string saying how the join needs to be done (the default is an inner join). Possible join types can be found [here](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.join).
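
#### The difference between an inner join and a right join can be sketched in plain Python with two small dictionaries keyed by `ssn`. This is an analogy only: the data is made up, keys are assumed to be unique on both sides, and no shuffling takes place.

```python
# Right side: every person has a name.
names = {
    "182-83-5988": "Brown Jason",
    "298-53-9877": "Brown Cody",
    "674-90-7121": "Brooks Jesse",
}
# Left side: only some people have an occupation.
occupations = {"674-90-7121": "Secretary, company"}

# Inner join: keep only keys present on both sides.
inner = [(ssn, occupations[ssn], names[ssn]) for ssn in occupations if ssn in names]

# Right join: keep every row of the right side and fill missing
# left-side values with None (shown as null in Spark output).
right = [(ssn, occupations.get(ssn), name) for ssn, name in names.items()]

print(inner)
# [('674-90-7121', 'Secretary, company', 'Brooks Jesse')]
print(right)
# [('182-83-5988', None, 'Brown Jason'), ('298-53-9877', None, 'Brown Cody'), ('674-90-7121', 'Secretary, company', 'Brooks Jesse')]
```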

#### Let's get started by creating two dataframes. One only containing _ssn_ and _name_ information and a second containing _ssn_ and _occupation_.

In [123]:
# Dataframe based on dataDF, so 10,000 records
ssnNameDF = dataDF.select("ssn", "last_name", "first_name")
ssnNameDF.show()
ssnNameDF.count()

+-----------+----------+----------+
|        ssn| last_name|first_name|
+-----------+----------+----------+
|182-83-5988|     Brown|     Jason|
|298-53-9877|     Brown|      Cody|
|461-01-1867| Hendricks|   Jessica|
|603-81-3455|     Baker|     Scott|
|304-30-5738|   Alvarez|     Sarah|
|200-45-4002|Montgomery|    George|
|674-90-7121|    Brooks|     Jesse|
|093-81-5058|     Davis|   Valerie|
|405-71-6928|  Johnston|    Andrea|
|285-09-1368| Zimmerman|      Mark|
|432-75-9180|     Drake|     Sheri|
|287-31-8774|  Campbell| Stephanie|
|885-84-0932|    Murphy|    Steven|
|299-52-4282|  Andersen|   Lindsay|
|572-15-9413|     Clark|   Claudia|
|592-79-1854|  Anderson|   Michael|
|750-85-4885|    Carter|   Bradley|
|795-90-8059|     Lyons|     Chris|
|800-68-1097|   Gregory| Stephanie|
|528-14-5681|    Garcia|      Anna|
+-----------+----------+----------+
only showing top 20 rows



10000

In [124]:
# DataFrame based on sampledDF, so only about 10% of the records. We pretend that only these 10% of the people have a job.
ssnOccupationDF = sampledDF.select("ssn", "occupation")
ssnOccupationDF.show()
ssnOccupationDF.count()

+-----------+--------------------+
|        ssn|          occupation|
+-----------+--------------------+
|674-90-7121|  Secretary, company|
|287-31-8774|Chartered certifi...|
|800-68-1097|          Orthoptist|
|406-47-0397|Education adminis...|
|263-65-2624|Psychologist, pri...|
|117-36-4219|                Make|
|730-95-2184|    Industrial buyer|
|844-93-4799|  Wellsite geologist|
|200-54-7515| Animal technologist|
|283-17-5670|Biomedical scientist|
|547-49-1078|        Bonds trader|
|083-24-1465|      Pilot, airline|
|138-14-9343|Trading standards...|
|688-49-6147|Arts development ...|
|415-06-3785|English as a seco...|
|381-66-0713|           Ecologist|
|553-33-6292|   Financial adviser|
|310-62-9455|Chartered legal e...|
|170-80-5933|Horticultural the...|
|513-51-8267|     Publishing copy|
+-----------+--------------------+
only showing top 20 rows



957

#### Let us perform a join in which we are only interested in the people with an occupation, that is, an inner join:

In [125]:
joinedDF = ssnOccupationDF.join(ssnNameDF, "ssn", "inner")
joinedDF.show(5)
joinedDF.count()

+-----------+--------------------+---------+----------+
|        ssn|          occupation|last_name|first_name|
+-----------+--------------------+---------+----------+
|641-37-0830|Clinical research...|   Wilson| Stephanie|
|651-50-0290|Adult guidance wo...|  Trevino|   Anthony|
|027-31-3289|Travel agency man...|   Wright|    Amanda|
|383-53-3448|Fast food restaur...| Johnston|    Sheila|
|741-85-6749|             Barista| Campbell|     Sarah|
+-----------+--------------------+---------+----------+
only showing top 5 rows



957

#### If we perform a right outer join instead, we keep all 10,000 rows of `ssnNameDF`, and occupation is missing (null) for about 90% of the people:

In [128]:
joinedDF = ssnOccupationDF.join(ssnNameDF, "ssn", "right")
joinedDF.show(5, truncate=False)
joinedDF.count()

+-----------+----------+---------+----------+
|ssn        |occupation|last_name|first_name|
+-----------+----------+---------+----------+
|062-38-3809|null      |Parks    |Jacob     |
|078-99-4683|null      |Johnson  |Daniel    |
|091-06-2600|null      |Miller   |Roberto   |
|092-19-4852|null      |Rodriguez|Denise    |
|123-40-6247|null      |Gonzalez |Ronald    |
+-----------+----------+---------+----------+
only showing top 5 rows



10000

## Part 8: Caching DataFrames and storage options

### (8a) Caching DataFrames

#### For efficiency Spark keeps your DataFrames in memory. (More formally, it keeps the _RDDs_ that implement your DataFrames in memory.) By keeping the contents in memory, Spark can quickly access the data. However, memory is limited, so if you try to keep too many partitions in memory, Spark will automatically delete partitions from memory to make space for new ones. If you later refer to one of the deleted partitions, Spark will automatically recreate it for you, but that takes time.

#### So, if you plan to use a DataFrame more than once, then you should tell Spark to cache it. You can use the `cache()` operation to keep the DataFrame in memory. However, you must still trigger an action on the DataFrame, such as `collect()` or `count()` before the caching will occur. In other words, `cache()` is lazy: It merely tells Spark that the DataFrame should be cached _when the data is materialized_. You have to run an action to materialize the data; the DataFrame will be cached as a side effect. The next time you use the DataFrame, Spark will use the cached data, rather than recomputing the DataFrame from the original data.
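
#### The lazy behavior described above can be illustrated with a toy model in plain Python. `LazyDataset` is a hypothetical stand-in written for this example; it is not part of PySpark and says nothing about Spark's internals. It only shows the ordering: `cache()` records a request, the first action does the work and stores the result, and later actions reuse it.

```python
class LazyDataset:
    """Toy stand-in for a DataFrame: a recipe that only runs when an action is called."""

    def __init__(self, compute):
        self._compute = compute          # function that rebuilds the rows from scratch
        self._cache_requested = False
        self._cached_rows = None
        self.compute_calls = 0           # how many times the recipe actually ran

    def cache(self):
        # Lazy: only records the request; nothing is computed or stored here.
        self._cache_requested = True
        return self

    def count(self):
        # An action: reuse cached rows if present, otherwise run the recipe.
        if self._cached_rows is not None:
            return len(self._cached_rows)
        self.compute_calls += 1
        rows = self._compute()
        if self._cache_requested:
            self._cached_rows = rows     # caching happens as a side effect of the action
        return len(rows)


dataset = LazyDataset(lambda: [n for n in range(1000) if n % 7 == 0]).cache()
calls_after_cache = dataset.compute_calls    # still 0: cache() did no work

first = dataset.count()     # runs the recipe and fills the cache
second = dataset.count()    # served from the cached rows

print(calls_after_cache, dataset.compute_calls, first == second)
```

#### Without the call to `cache()`, the second `count()` in this model would run the recipe again, which is the recomputation cost that caching avoids.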

#### You can see your cached DataFrame in the "Storage" section of the Spark web UI. If you click on the name value, you can see more information about where the DataFrame is stored.

In [132]:
# Mark the DataFrame for caching with cache(), as described above (lazy: nothing is stored yet)
filteredDF.cache()
# Trigger an action, which materializes the data and fills the cache
print(filteredDF.count())
# Check if it is cached
print(filteredDF.is_cached)

### (8b) Unpersist and storage options

#### Spark automatically manages the partitions cached in memory. If it has more partitions than fit in the available memory, by default it evicts older partitions to make room for new ones. For efficiency, once you are finished using a cached DataFrame, you can call the DataFrame's `unpersist()` method to tell Spark that you no longer need the cached data.
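
#### As a rough mental model of eviction and `unpersist()`, the sketch below keeps a fixed number of partitions and drops the least recently used one when it runs out of room. `PartitionCache`, its `capacity`, and the `(dataset name, partition index)` keys are assumptions made for this example; Spark's real memory manager is considerably more involved.

```python
from collections import OrderedDict


class PartitionCache:
    """Toy cache holding at most `capacity` partitions, evicting the least recently used."""

    def __init__(self, capacity):
        self.capacity = capacity
        self._partitions = OrderedDict()    # oldest entry first, newest last

    def put(self, key, data):
        self._partitions[key] = data
        self._partitions.move_to_end(key)
        while len(self._partitions) > self.capacity:
            self._partitions.popitem(last=False)    # evict the least recently used

    def get(self, key):
        if key not in self._partitions:
            return None                     # evicted: the caller has to recompute it
        self._partitions.move_to_end(key)   # mark as recently used
        return self._partitions[key]

    def unpersist(self, dataset_name):
        # Drop every cached partition that belongs to one dataset.
        for key in [k for k in self._partitions if k[0] == dataset_name]:
            del self._partitions[key]

    def keys(self):
        return list(self._partitions)


cache = PartitionCache(capacity=3)
for index in range(3):
    cache.put(("filtered", index), [index])

cache.get(("filtered", 0))          # touch partition 0 so it is no longer the oldest
cache.put(("joined", 0), ["x"])     # cache is full, so ("filtered", 1) is evicted
keys_after_eviction = cache.keys()

cache.unpersist("filtered")         # explicitly release the remaining "filtered" partitions
keys_after_unpersist = cache.keys()

print(keys_after_eviction)
print(keys_after_unpersist)
```

#### Calling `unpersist` in this model frees space immediately instead of waiting for eviction, which is the reason to unpersist a DataFrame once you are done with it.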

#### **Advanced:** Spark provides many more options for managing how DataFrames are cached. For instance, you can tell Spark to spill cached partitions to disk when it runs out of memory, instead of simply throwing old ones away. You can explore the API for DataFrame's [persist()](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.persist) operation using Python's [help()](https://docs.python.org/2/library/functions.html?highlight=help#help) command. The `persist()` operation optionally takes a PySpark [StorageLevel](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.StorageLevel) object.

In [130]:
# If we are done with the DataFrame we can unpersist it so that its memory can be reclaimed
filteredDF.unpersist()
# Check if it is cached
print(filteredDF.is_cached)

False
