
# Spark Exercises

Apache Spark&trade; allows you to use DataFrames to query large data files.

## In this lesson you:
* Create RDDs and apply transformations
* Create DataFrames and operate over them.
* Read data from multiple files
* Write data to multiple destinations using different file formats.


## Audience
* Primary Audience: Data Engineers and Data Scientists
* Secondary Audience: Data Analysts

## Prerequisites
* Web browser: Chrome or Firefox
* Spark installed
* Git 


### Introducing RDDs and DataFrames

Under the covers, DataFrames are derived from data structures known as Resilient Distributed Datasets (RDDs). RDDs and DataFrames are immutable distributed collections of data. Let's take a closer look at what some of these terms mean before we understand how they relate to DataFrames:

* **Resilient**: They are fault tolerant, so if part of your operation fails, Spark quickly recovers the lost computation by recomputing it from the data's lineage (the recorded sequence of transformations).
* **Distributed**: RDDs are distributed across networked machines known as a cluster.
* **DataFrame**: A data structure where data is organized into named columns, like a table in a relational database, but with richer optimizations under the hood. 

Without the named columns and declared types provided by a schema, Spark wouldn't know how to optimize the execution of a computation. Because DataFrames have a schema, Spark can use the Catalyst Optimizer to determine the most efficient way to execute your code.

DataFrames were designed to feel familiar to people who already work with tables in a relational database, Pandas or R DataFrames, or Excel worksheets. A Spark DataFrame is conceptually equivalent to these, with richer optimizations under the hood and the benefit of being distributed across a cluster.

### Interacting with DataFrames

Once created (instantiated), a DataFrame object has methods attached to it. Methods are operations you can perform on a DataFrame, such as filtering, counting, aggregating, and many others.

> <b>Example</b>: To create (instantiate) a DataFrame, use this syntax: `df = ...`

To display the contents of the DataFrame, apply a `show` operation (method) on it using the syntax `df.show()`. 

The `.` indicates you are *applying a method on the object*.

In working with DataFrames, it is common to chain operations together, such as: `df.select().filter().orderBy()`.  

By chaining operations together, you don't need to store intermediate DataFrames in local variables. (Each method still returns a new DataFrame; chaining simply avoids having to name each one.)

Also note that you usually don't have to worry about the order of operations: the Catalyst Optimizer determines an efficient execution order for you, so the following two chains typically end up with the same optimized plan:

`df.select(...).orderBy(...).filter(...)`

versus

`df.filter(...).select(...).orderBy(...)`


### DataFrames and SQL

DataFrame syntax is more flexible than SQL syntax. Here we illustrate general usage patterns of SQL and DataFrames.

Suppose we have a data set we loaded as a table called `myTable` and an equivalent DataFrame, called `df`.
We have three fields/columns called `col_1` (numeric type), `col_2` (string type), and `col_3` (timestamp type).
Here are basic SQL operations and their DataFrame equivalents. 

Notice that columns in DataFrames are referenced by `col("<columnName>")`, where `col` (like `year` in the table below) is imported from `pyspark.sql.functions`.

| SQL                                         | DataFrame (Python)                    |
| ------------------------------------------- | ------------------------------------- | 
| `SELECT col_1 FROM myTable`                 | `df.select(col("col_1"))`             | 
| `DESCRIBE myTable`                          | `df.printSchema()`                    | 
| `SELECT * FROM myTable WHERE col_1 > 0`     | `df.filter(col("col_1") > 0)`         | 
| `..GROUP BY col_2`                          | `..groupBy(col("col_2"))`             | 
| `..ORDER BY col_2`                          | `..orderBy(col("col_2"))`             | 
| `..WHERE year(col_3) > 1990`                | `..filter(year(col("col_3")) > 1990)` | 
| `SELECT * FROM myTable LIMIT 10`            | `df.limit(10)`                        |
| `display(myTable)` (text format)            | `df.show()`                           | 
| `display(myTable)` (html format)            | `display(df)`                         |

**Hint:** You can also run SQL queries with the special syntax `spark.sql("SELECT * FROM myTable")`

You will see many other uses of DataFrames in this course. Working out their SQL equivalents is left to you (in some cases as an exercise).

### Creating RDDs and DataFrames

There are three ways to create RDDs or DataFrames:
* From a file or set of files
* From data in memory
* From another RDD/DataFrame

### Create SparkSession
Before Spark 2.0, each API had its own entry point:
* SparkContext for Spark Core
* SQLContext for Spark SQL
* StreamingContext for DStreams
* ... 

Since Spark 2.0, `SparkSession` is the unified entry point for the DataFrame and SQL APIs. It wraps the older contexts; for example, the underlying `SparkContext` is available as `spark.sparkContext`.

In [1]:
# findspark locates the local Spark installation and adds PySpark to sys.path
import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

#### The NBA Dataset 
This lesson uses the `2017_2018_NBA_Salaries.csv` data set, which is in CSV format.
The NBA dataset contains the following data:
* Player Name
* Player Country
* Player Salary
* Player Age


#### Reading NBA Dataset into a DataFrame

In [2]:
# Default CSV read: no header handling, so every column is a string named _c0 to _c3
nba_no_headers = spark.read.csv("./2017_2018_NBA_Salaries.csv")

In [8]:
# Print Schema
nba_no_headers.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)



In [5]:
nba_headers = (spark.read
                   .option("inferSchema","true")              # Option to tell Spark to infer the schema
                   .option("header","true")                   # Option telling Spark that the file has a header
                   .csv("./2017_2018_NBA_Salaries.csv"))      # Path of the file to read

In [7]:
# Print Schema
nba_headers.printSchema()

root
 |-- Player: string (nullable = true)
 |-- NBA_Country: string (nullable = true)
 |-- Salary: integer (nullable = true)
 |-- Age: integer (nullable = true)



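Note: `spark.read.csv` is the shortcut for reading CSV files. `spark.read` (a `DataFrameReader`) offers similar shortcuts for other formats, such as `spark.read.parquet` and `spark.read.json`. Formats without a Python shortcut, such as Avro, are read with `spark.read.format(...).load(...)`, as in the Avro cell below.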

Take a look at the schema with the `printSchema` method. This tells you the field name, field type, and whether the column is nullable or not (default is true).

In [13]:
nba_parquet = spark \
                .read \
                .parquet("2017_2018_NBA_Salaries_Parquet")

In [16]:
# Reading Avro needs the spark-avro package; on Spark 2.4+ the format name "avro" also works
nba_avro = spark \
            .read \
            .format("com.databricks.spark.avro") \
            .load("2017_2018_NBA_Salaries_Avro") 

Now that the DataFrames are created, view their contents by invoking the `show` method.

By default, `show()` (without any parameters) prints the first 20 rows and truncates strings longer than 20 characters.

Print the first n rows by invoking `show(n)`.


#### Showing DataFrames

In [18]:
nba_no_headers.show()
nba_headers.show(1)
nba_parquet.show(2)
nba_avro.show(3)

+-------------------+-----------+--------+---+
|                _c0|        _c1|     _c2|_c3|
+-------------------+-----------+--------+---+
|             Player|NBA_Country|  Salary|Age|
|            Zhou Qi|      China|  815615| 22|
|      Zaza Pachulia|    Georgia| 3477600| 33|
|      Zach Randolph|        USA|12307692| 36|
|        Zach LaVine|        USA| 3202217| 22|
|       Zach Collins|        USA| 3057240| 20|
|       Yogi Ferrell|        USA| 1312611| 24|
|       Xavier Silas|        USA|   74159| 30|
|Xavier Rathan-Mayes|     Canada|   46080| 23|
|     Xavier Munford|        USA|       0| 25|
|    Wilson Chandler|        USA|12016854| 30|
|  Willy Hernangomez|      Spain| 1435750| 23|
|        Willie Reed|        USA| 1577230| 27|
|Willie Cauley-Stein|        USA| 3704160| 24|
|        Will Barton|        USA| 3533333| 27|
|    Wesley Matthews|        USA|17884176| 31|
|     Wesley Johnson|        USA| 5881260| 30|
|      Wesley Iwundu|        USA| 1050000| 23|
|       Wayne

Answer the following question:
> According to our data, which players are from the USA?

Use the DataFrame `select` and `filter` methods.

In [21]:
from pyspark.sql.functions import *

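# select and filter are lazy transformations: the cell output is only the resulting schema.
# Append .show() to actually print the matching rows.
nba_parquet.select("Player", "NBA_Country", "Salary", "Age").filter(col("NBA_Country") == 'USA')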

DataFrame[Player: string, NBA_Country: string, Salary: int, Age: int]

### Built-In Functions

Spark provides a number of <a href="https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions$" target="_blank">built-in functions</a>, many of which can be used directly with DataFrames.  Use these functions in the `filter` expressions to filter data and in `select` expressions to create derived columns.

Examples of such functions are:
* split()
* trim()
* substring()
* collect_set()
* regexp_extract()
* regexp_replace()
* lpad()

For more, see the documentation:
* Scala -> https://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.sql.functions$
* python -> https://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html#module-pyspark.sql.functions



Answer the following question:
> How many players in our data are from the USA?

Use the DataFrame `count` method.

In [47]:
from pyspark.sql.functions import *

nba_parquet.filter(col("NBA_Country") == 'USA').count()

418



Answer the following question:
> Who is the youngest player from the USA?

Use the DataFrame `limit` or `first` function

In [62]:
nba_parquet.select("Player", "NBA_Country", "Salary", "Age").filter(col("NBA_Country") == 'USA') \
           .orderBy("Age", ascending=True) \
           .limit(1) \
           .show()

nba_parquet.select("Player", "NBA_Country", "Salary", "Age").filter(col("NBA_Country") == 'USA') \
           .orderBy("Age", ascending=True) \
           .head()[0]


+-----------------+-----------+-------+---+
|           Player|NBA_Country| Salary|Age|
+-----------------+-----------+-------+---+
|Terrance Ferguson|        USA|1785000| 19|
+-----------------+-----------+-------+---+



'Terrance Ferguson'

Answer the following question:
> Who is the USA player with the highest salary?

Use the DataFrame `limit` or `first` function

In [77]:
nba_parquet.select("Player", "NBA_Country", "Salary", "Age").filter(col("NBA_Country") == 'USA') \
           .orderBy("Salary", ascending=False) \
           .limit(1) \
           .show()

print(nba_parquet.select("Player", "NBA_Country", "Salary", "Age").filter("NBA_Country == 'USA'") \
           .orderBy("Salary", ascending=False) \
           .head()[0])


+-------------+-----------+--------+---+
|       Player|NBA_Country|  Salary|Age|
+-------------+-----------+--------+---+
|Stephen Curry|        USA|34682550| 29|
+-------------+-----------+--------+---+

Stephen Curry


Can we do the same using the `nba_no_headers` DataFrame?

In [68]:
nba_no_headers.filter(col("_c1") == 'USA') \
           .orderBy("_c2", ascending=False) \
           .limit(1) \
           .show()

print(nba_no_headers.filter(col("_c1") == 'USA') \
           .orderBy("_c2", ascending=False) \
           .head()[0])


+--------------+---+-------+---+
|           _c0|_c1|    _c2|_c3|
+--------------+---+-------+---+
|Meyers Leonard|USA|9904495| 25|
+--------------+---+-------+---+

Meyers Leonard


Something seems to be wrong with the previous result. What is it? Remember that we did not infer the schema for this DataFrame, so every column, including `_c2`, is a string!
> What would you do to get the highest-paid player using the `nba_no_headers` DataFrame?

Think about renaming the columns and casting them, too!

In [69]:
nba_no_headers.select(col("_c0").alias("Player"), \
                      col("_c1").alias("NBA_Country"), \
                      col("_c2").cast('integer').alias("Salary"), \
                      col("_c3").cast('integer').alias("Age")) \
              .orderBy("Salary", ascending=False) \
              .limit(1) \
              .show()

+-------------+-----------+--------+---+
|       Player|NBA_Country|  Salary|Age|
+-------------+-----------+--------+---+
|Stephen Curry|        USA|34682550| 29|
+-------------+-----------+--------+---+



It seems we are mostly interested in USA players.
> Can you think of a performance improvement to our data processing?

Think about the `cache()` and `persist()` methods. What is the difference between them?

In [78]:
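import pyspark

# Keep a reference to the filtered DataFrame so later queries can reuse the cached data
usa_players = nba_parquet.filter(nba_parquet.NBA_Country == 'USA')

# cache() is persist() with the default storage level (memory, spilling to disk for DataFrames)
usa_players.cache()

# Alternatively, choose the storage level explicitly with persist() (use one or the other):
# usa_players.persist(pyspark.StorageLevel.MEMORY_ONLY)       # keep partitions in memory only
# usa_players.persist(pyspark.StorageLevel.MEMORY_AND_DISK)   # spill to disk when data doesn't fit in memory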

DataFrame[Player: string, NBA_Country: string, Salary: int, Age: int]

To load the NBA data into our database, we need separate `first name` and `last name` fields.
> Think of a way to split the `Player` column into two columns: `FirstName` and `LastName`

Use the `split`, `withColumn`, and `drop` functions.

In [119]:
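# Split on spaces: FirstName is the first token, LastName the second
# (any tokens after the second, such as a suffix, are dropped)
nba_first_last_name = nba_parquet \
                        .withColumn("FirstName", trim(split(col("Player"), " ").getItem(0))) \
                        .withColumn("LastName", trim(split(col("Player"), " ").getItem(1))) \
                        .drop("Player")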

In [120]:
nba_first_last_name.show()

+-----------+--------+---+---------+------------+
|NBA_Country|  Salary|Age|FirstName|    LastName|
+-----------+--------+---+---------+------------+
|      China|  815615| 22|     Zhou|          Qi|
|    Georgia| 3477600| 33|     Zaza|    Pachulia|
|        USA|12307692| 36|     Zach|    Randolph|
|        USA| 3202217| 22|     Zach|      LaVine|
|        USA| 3057240| 20|     Zach|     Collins|
|        USA| 1312611| 24|     Yogi|     Ferrell|
|        USA|   74159| 30|   Xavier|       Silas|
|     Canada|   46080| 23|   Xavier|Rathan-Mayes|
|        USA|       0| 25|   Xavier|     Munford|
|        USA|12016854| 30|   Wilson|    Chandler|
|      Spain| 1435750| 23|    Willy| Hernangomez|
|        USA| 1577230| 27|   Willie|        Reed|
|        USA| 3704160| 24|   Willie|Cauley-Stein|
|        USA| 3533333| 27|     Will|      Barton|
|        USA|17884176| 31|   Wesley|    Matthews|
|        USA| 5881260| 30|   Wesley|     Johnson|
|        USA| 1050000| 23|   Wesley|      Iwundu|


### Temporary Views

A <b>temporary view</b> registers a DataFrame under a name so that you can query it with SQL syntax.

A temporary view gives you a name to query from SQL, but unlike a table it exists only for the duration of your SparkSession. As a result, the temporary view will not carry over when you restart the cluster or switch to a new notebook. It also won't show up under the Data button in the left-hand menu of a Databricks notebook, which provides easy access to databases and tables.

The statement in the following cell creates a temporary view named `nbaPlayers` containing the same data as `nba_parquet`.

In [91]:
from pyspark.sql.functions import *

nba_parquet.createOrReplaceTempView("nbaPlayers")

To view the contents of the temporary view, query it with a SQL `SELECT` statement through `spark.sql`.

In [98]:
from pyspark.sql.functions import *

spark.sql("SELECT * FROM  nbaPlayers WHERE NBA_Country = 'USA' ").show()

+-------------------+-----------+--------+---+
|             Player|NBA_Country|  Salary|Age|
+-------------------+-----------+--------+---+
|      Zach Randolph|        USA|12307692| 36|
|        Zach LaVine|        USA| 3202217| 22|
|       Zach Collins|        USA| 3057240| 20|
|       Yogi Ferrell|        USA| 1312611| 24|
|       Xavier Silas|        USA|   74159| 30|
|     Xavier Munford|        USA|       0| 25|
|    Wilson Chandler|        USA|12016854| 30|
|        Willie Reed|        USA| 1577230| 27|
|Willie Cauley-Stein|        USA| 3704160| 24|
|        Will Barton|        USA| 3533333| 27|
|    Wesley Matthews|        USA|17884176| 31|
|     Wesley Johnson|        USA| 5881260| 30|
|      Wesley Iwundu|        USA| 1050000| 23|
|       Wayne Selden|        USA| 1312611| 23|
|    Wayne Ellington|        USA| 6270000| 30|
|       Wade Baldwin|        USA| 1874400| 21|
|       Vince Hunter|        USA|   50000| 23|
|       Vince Carter|        USA| 8000000| 41|
|     Victor 

In [99]:
from pyspark.sql.functions import *

spark.sql("SELECT COUNT(*) FROM  nbaPlayers WHERE NBA_Country = 'USA' ").show()

+--------+
|count(1)|
+--------+
|     418|
+--------+



In [101]:
from pyspark.sql.functions import *

spark.sql("SELECT NBA_Country, COUNT(*) FROM nbaPlayers GROUP BY NBA_Country").show()

+----------------+--------+
|     NBA_Country|count(1)|
+----------------+--------+
|          Russia|       1|
|         Senegal|       1|
|          Sweden|       1|
|          Turkey|       5|
|Democratic Re...|       2|
|         Germany|       5|
|          France|       9|
|          Greece|       2|
|       Argentina|       2|
|         Finland|       1|
|           China|       1|
|         Bahamas|       1|
|          Bosnia|       1|
|     Puerto Rico|       2|
|         Croatia|       6|
|United Kingdo...|       2|
|           Italy|       2|
|       Lithuania|       3|
|           Spain|       7|
|Bosnia & Herz...|       1|
+----------------+--------+
only showing top 20 rows



In [102]:
from pyspark.sql.functions import *

nba_parquet.groupBy("NBA_Country").count()

DataFrame[NBA_Country: string, count: bigint]

## Column Alias

In [104]:
from pyspark.sql.functions import *

nba_parquet.select("Player", "NBA_Country", "Salary", col("Age").alias("Player_Age")).filter("NBA_Country = 'USA'")

DataFrame[Player: string, NBA_Country: string, Salary: int, Player_Age: int]

## Basic Aggregations

Using built-in Spark functions, you can aggregate data in various ways.

Run the cell below to compute the average salary of USA players.

By default, you get a floating point value.

In [108]:
from pyspark.sql.functions import *

nba_parquet.filter("NBA_Country = 'USA'") \
           .select(avg("Salary").alias("averageSalary")) \
           .show()

+-----------------+
|    averageSalary|
+-----------------+
|5822781.588516747|
+-----------------+



In [113]:
from pyspark.sql.functions import *

nba_parquet.filter("NBA_Country = 'USA'") \
           .select( \
                max("Salary").alias("max"), \
                min("Salary").alias("min"), \
                round(avg("Salary")).alias("averageSalary")) \
           .show()

+--------+---+-------------+
|     max|min|averageSalary|
+--------+---+-------------+
|34682550|  0|    5822782.0|
+--------+---+-------------+



### Joining Two Data Sets

In [121]:
usa_df = nba_first_last_name.filter("NBA_Country = 'USA'")

canada_df = nba_first_last_name.filter("NBA_Country = 'Canada'")

In [123]:
usa_canada_df = usa_df.join(canada_df, usa_df.LastName == canada_df.LastName)

In [124]:
usa_canada_df.show()

+-----------+--------+---+---------+--------+-----------+--------+---+---------+--------+
|NBA_Country|  Salary|Age|FirstName|LastName|NBA_Country|  Salary|Age|FirstName|LastName|
+-----------+--------+---+---------+--------+-----------+--------+---+---------+--------+
|        USA| 1471382| 24|   Norman|  Powell|     Canada| 9003125| 26|   Dwight|  Powell|
|        USA|   86119| 29|  MarShon|  Brooks|     Canada|  815615| 22|   Dillon|  Brooks|
|        USA|17826150| 27|     Klay|Thompson|     Canada|16400000| 26|  Tristan|Thompson|
|        USA| 3028410| 27|    James|   Ennis|     Canada| 1524305| 23|    Tyler|   Ennis|
|        USA| 1312611| 21| Dejounte|  Murray|     Canada| 3355320| 20|    Jamal|  Murray|
|        USA| 2116955| 33|    Aaron|  Brooks|     Canada|  815615| 22|   Dillon|  Brooks|
+-----------+--------+---+---------+--------+-----------+--------+---+---------+--------+



## Review Questions

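**Q:** What Scala keyword is used to *create* a DataFrame object?  
**A:** Strictly, `val` does not create the object; it declares an immutable variable that holds it, as in `val myDataFrameDF = spark.read.csv(...)`. The DataFrame itself is created by the reader or transformation on the right-hand side.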

**Q:** What methods (operations) can you perform on a DataFrame object?  
**A:** The full list is here: <a href="http://spark.apache.org/docs/2.0.0/api/python/pyspark.sql.html" target="_blank"> pyspark.sql module</a>

**Q:** Why do you chain methods (operations) `myDataFrameDF.select().filter().groupBy()`?  
**A:** To avoid the creation of temporary DataFrames as local variables. 

For example, you could have written the above as 
`val tempDF1 = myDataFrameDF.select()`,  `val tempDF2 = tempDF1.filter()` and
then `tempDF2.groupBy()`. 

This is functionally equivalent, but notice how you now have extra local variables.

**Q:** What is the DataFrame syntax to create a temporary view?    
**A:** ```myDF.createOrReplaceTempView("MyTempView")```

**Q:** What is the DataFrame equivalent of the SQL statement `SELECT count(*) AS total FROM myTable`?  
**A:** `df.agg(count("*").alias("total"))`

## Additional Topics & Resources

* <a href="https://spark.apache.org/docs/latest/sql-programming-guide.html" target="_blank">Spark SQL, DataFrames and Datasets Guide</a>