# Apache Spark to Spark

### What is Spark?

Apache Spark is an in-memory distributed computing system with high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools, including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming. Unlike Hadoop MapReduce, which writes intermediate results to disk between steps, Spark keeps data in memory wherever possible. It is therefore typically much faster for data engineering, analysis, and modeling tasks on large datasets in Big Data environments.

### Spark Basics and Spark SQL

First we will initialize the [SparkContext and SparkSession](https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html#initializing-spark).

Once the session is initialized, we can view running jobs and other related information in the [Spark Web UI](https://spark.apache.org/docs/latest/monitoring.html#web-interfaces). 

By default, the UI is served on port 4040. You can reach it in either of two ways:

- navigate to the Web UI URL on that port (default: http://localhost:4040);
- use the links in the SparkSession details shown below.

(Note: On EMR, viewing the Spark UI and Driver log links below requires SSH tunneling with port forwarding.)

In [None]:
spark

Starting Spark application


| ID | YARN Application ID | Kind | State | Spark UI | Driver log | Current session? |
|---|---|---|---|---|---|---|
| 2 | application_1657656595047_0003 | pyspark | idle | Link | Link | ✔ |


SparkSession available as 'spark'.


<pyspark.sql.session.SparkSession object at 0x7fb71b46ca90>

In [None]:
sc

<SparkContext master=yarn appName=livy-session-2>

### Working with Spark RDDs

The main abstraction Spark uses is the *resilient distributed dataset* (RDD). An RDD partitions data and distributes it across the nodes of the cluster so that it can be operated on in parallel.

There are two ways to create an RDD:

- parallelize existing data in memory;
- read in (reference) a dataset from external storage.

For the former, suppose we have regular data in Python, _e.g._ in a list or NumPy array. We can distribute it across the memory of the nodes using `sc.parallelize`, which returns an object of class RDD:

In [None]:
import numpy as np
data = np.random.rand(1000)
distdata = sc.parallelize(data)
print(distdata.__class__)

<class 'pyspark.rdd.RDD'>
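To build some intuition for what "partitioning" means, the sketch below splits a local list into `n` contiguous slices in plain Python. It only illustrates the idea and is not Spark's actual implementation; the helper name `partition` is hypothetical. In Spark, each slice would become one partition that an executor can process independently of the others.

```python
def partition(items, n):
    """Split items into n contiguous slices of (almost) equal size.

    Hypothetical helper for illustration only; not part of the Spark API.
    """
    size = len(items)
    return [items[i * size // n:(i + 1) * size // n] for i in range(n)]


parts = partition(list(range(10)), 3)
print(parts)  # [[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]

# Apply a "map" to each partition independently, as executors would in parallel
doubled = [[x * 2 for x in part] for part in parts]
```

On a real RDD, `distdata.getNumPartitions()` reports how many partitions Spark actually created.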

We can also read data into an RDD directly from a datasource. Here we will work with the [Open Air Quality dataset](https://registry.opendata.aws/openaq/) from OpenAQ, which is publicly readable from an S3 bucket:

In [None]:
data = sc.textFile('s3://brainstation-dsft/sparklab/openaq-2017-09-05.csv')

### Inspecting Data

To examine the first few rows of an RDD, we can call the `take` function:

In [None]:
data.take(5)

['"Wynyard","1","µg/m³","pm25","AU","Tasmania Region","Australia - Tasmania","2017-09-04T23:54:19.000Z","2017-09-05T09:54:19+10:00","government","false","-40.9918","145.719","0.25","hours"', '"Wynyard","2","µg/m³","pm10","AU","Tasmania Region","Australia - Tasmania","2017-09-04T23:54:19.000Z","2017-09-05T09:54:19+10:00","government","false","-40.9918","145.719","0.25","hours"', '"Emu River","0","µg/m³","pm25","AU","Tasmania Region","Australia - Tasmania","2017-09-04T23:54:17.000Z","2017-09-05T09:54:17+10:00","government","false","-41.1081","145.883","0.25","hours"', '"Emu River","1","µg/m³","pm10","AU","Tasmania Region","Australia - Tasmania","2017-09-04T23:54:17.000Z","2017-09-05T09:54:17+10:00","government","false","-41.1081","145.883","0.25","hours"', '"West Ulverstone","0","µg/m³","pm25","AU","Tasmania Region","Australia - Tasmania","2017-09-04T23:54:22.000Z","2017-09-05T09:54:22+10:00","government","false","-41.1586","146.146","0.25","hours"']

You can see that the data is stored as raw text: `take` returns a list of strings, one per line of the file. We can also count the number of rows in the RDD to see how much data we're working with:

In [None]:
data.count()

553383

We can get information about how the data is partitioned:

In [None]:
data.getNumPartitions()

2

and change the number of partitions with `repartition` to control how the data are parallelized. Note that `repartition` shuffles the data across the cluster:

In [None]:
data = data.repartition(3)

In [None]:
data.getNumPartitions()

3

In [None]:
data.count()

553383

### Transformations and Actions

In Spark, _transformations_ create new datasets from existing data, whereas _actions_ return a value after performing a computation. The very important distinction is this: transformations only add computations to the plan without executing them. The computations are performed only when an action is called. This is known as _lazy evaluation_.
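Python's built-in `map` offers a rough analogy. This is only an analogy: Spark builds an execution plan across a cluster, not a Python iterator. The sketch records every call of a made-up function `to_upper`. It shows that creating the lazy `map` object runs nothing, and that work happens only when the result is consumed.

```python
calls = []


def to_upper(s):
    calls.append(s)  # record every time the function actually runs
    return s.upper()


rows = ["a", "b", "c"]

# "Transformation": the built-in map returns a lazy iterator, so nothing runs yet
plan = map(to_upper, rows)
calls_before_action = len(calls)

# "Action": consuming the iterator forces all pending computation to run
result = list(plan)

print(calls_before_action)  # 0
print(result)  # ['A', 'B', 'C']
```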


#### RDD Transformations

Spark RDD functions rely heavily on functional programming concepts, using lambda functions to apply operations across all elements of an RDD. The `map` function applies a lambda function to each row of an RDD and returns a new RDD containing the transformed data:

In [None]:
# Map function
print("Raw data:")
print(data.take(5))
print("")

print("Mapped data:")
# Split each row (string) of the RDD by commas and take the ninth field (index 8, the local timestamp)
print(data.map(lambda x: x.split(",")[8]).take(10))
print("")

Raw data:
['"George Town","1","µg/m³","pm25","AU","Tasmania Region","Australia - Tasmania","2017-09-04T23:52:22.000Z","2017-09-05T09:52:22+10:00","government","false","-41.1109","146.836","0.25","hours"', '"George Town","3","µg/m³","pm10","AU","Tasmania Region","Australia - Tasmania","2017-09-04T23:52:22.000Z","2017-09-05T09:52:22+10:00","government","false","-41.1109","146.836","0.25","hours"', '"Exeter","0","µg/m³","pm25","AU","Tasmania Region","Australia - Tasmania","2017-09-04T23:52:17.000Z","2017-09-05T09:52:17+10:00","government","false","-41.3056","146.951","0.25","hours"', '"Exeter","1","µg/m³","pm10","AU","Tasmania Region","Australia - Tasmania","2017-09-04T23:52:17.000Z","2017-09-05T09:52:17+10:00","government","false","-41.3056","146.951","0.25","hours"', '"Ti Tree Bend","0","µg/m³","pm25","AU","Tasmania Region","Australia - Tasmania","2017-09-04T23:52:06.000Z","2017-09-05T09:52:06+10:00","government","false","-41.4174","147.125","0.25","hours"']

Mapped data:
['"2017-09-05T

The `filter` function keeps only the rows for which the lambda function returns `True`, and returns them as a new RDD. First, let's look at the field we will filter on:

In [None]:
# Get the source name field (index 6) using map and split
print(data.map(lambda x: x.split(",")[6]).take(1))

['"Australia - Tasmania"']

In [None]:
# Get the source name field (index 6) using map and split
print(data.map(lambda x: x.split(",")[6]).take(1))

# Keep only rows whose source name (index 6) is "Turkey" (the field still includes its quotes)
print(data.filter(lambda x: x.split(",")[6] == '"Turkey"').take(1))

['"Australia - Tasmania"']
['"Doğubeyazıt","35","µg/m³","pm10","TR","Ağrı","Turkey","2017-09-05T00:00:00.000Z","2017-09-05T03:00:00+03:00","government","false","39.5476","44.083595","1","hours"']
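Splitting on `","` only works as long as no quoted field contains a comma itself. A made-up location such as "Foo, Bar" would shift every later index by one. A more robust option is to parse each line as CSV, sketched here in plain Python with the standard `csv` module. `parse_line` is a hypothetical helper, and the sample row is a shortened excerpt of the Turkey row shown above.

```python
import csv


def parse_line(line):
    """Parse one CSV line, respecting double-quoted fields (hypothetical helper)."""
    return next(csv.reader([line]))


row = '"Doğubeyazıt","35","µg/m³","pm10","TR","Ağrı","Turkey","2017-09-05T00:00:00.000Z"'
print(parse_line(row)[6])  # Turkey  (surrounding quotes removed)

tricky = '"Foo, Bar","1"'
print(tricky.split(","))   # ['"Foo', ' Bar"', '"1"']  (the quoted comma splits the field)
print(parse_line(tricky))  # ['Foo, Bar', '1']
```

On the RDD, this could be applied as `data.filter(lambda x: parse_line(x)[6] == 'Turkey')`. The parsed field no longer carries the surrounding double quotes, so the comparison string changes accordingly.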

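`groupBy` is a transformation that groups rows by the value the lambda function returns (here, the source name). Calling `.collect()` is an action that returns the result to the head node, so this is the point at which the plan is actually executed: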

In [None]:
#GroupBy
grouped = data.groupBy(lambda x: x.split(",")[6])
for (k,v) in grouped.collect():
    print(k, len(v))

"Australia - Tasmania" 8352
"Bosnia" 685
"CPCB" 9840
"Thiruvananthapuram" 270
"Chandrapur" 192
"StateAir_Kathmandu" 48
"AirNow" 55489
"EEA Denmark" 574
"StateAir_Chennai" 23
"StateAir_Hyderabad" 24
"StateAir_Kolkata" 24
"StateAir_Mumbai" 24
"Kolkata" 183
"Yala" 85
"Nan" 215
"Shanghai" 23
"StateAir_Dhaka" 22
"Slovenian Environment Agency" 141
"Muscatine" 40
"Thailand" 40
"Samut Prakan" 113
"Bergen" 72
"Nonthaburi" 12
"StateAir_Addis_Ababa_School" 22
"Nashik" 199
"Phayao" 111
"Nakhon Sawan" 110
"Andalucia" 20952
"Iowa City" 20
"Durgapur" 104
"Chachoengsao" 4
"Sao Paulo" 4356
"Chile - SINCA" 4548
"Israel" 401836
"Vijayawara" 234
"Ludhiana" 438
"Netherlands" 6118
"Sweden" 494
"Lampang" 383
"Australia - Queensland" 1760
"Chillán" 44
"StateAir_NewDelhi" 22
"Thane" 135
"Amritsar" 396
"StateAir_KuwaitCity" 19
"Hamar" 24
"Saraburi" 176
"Ratchaburi" 111
"Mae Hong Son" 21
"Prachin Buri" 84
"Beijing US Embassy" 23
"Chengdu" 23
"Shenyang" 23
"Kanchanaburi" 95
"StateAir_Manama" 44
"StateAir_JakartaC

### RDD Actions

Unlike RDD transformations, RDD actions perform the computation and actually "do work" on the cluster. Chaining transformations builds up a sequence of steps. Spark translates this sequence into the appropriate computation and data movement (the "plan"), but executes it only when an action is called.

One of the most common actions is `collect()`, which we saw above. It performs the computations on the cluster and then brings the entire result into memory on the head (driver) node. Use it with care: if the result is larger than the memory available on the head node, the call will fail or overload that node!

In the example below, no computation is done until the final action (`collect`) is called:

In [None]:
# Transformations - no computation is done
timestrings = data.map(lambda x: x.split(",")[8])
# x[1:5] skips the leading quote character and keeps the four-digit year
years = timestrings.map(lambda x: x[1:5])
grouped = years.groupBy(lambda x: x)

In [None]:
# Work is performed when collect is called
for (k,v) in grouped.collect():
    print(k, len(v))

2017 553383

We can also apply standard mathematical aggregation functions to numerical data stored in RDDs. Because they scan all of the data and return a single value, these are also actions.

In [None]:
# Mathematical functions on RDDs
# Extract the length of the seventh field (index 6, the source name)
field_length = data.map(lambda x: x.split(",")[6]).map(lambda x: len(x))

# Max
max_length = field_length.max()
print(max_length)

35

In [None]:
# Sum
field_length.sum()

4800293

In [None]:
# Mean
field_length.mean()

8.674449703008598

In [None]:
# Stdev
field_length.stdev()

2.7332966182718477
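The two standard deviation methods use different divisors:

- `stdev()` on an RDD returns the *population* standard deviation (dividing by N);
- `sampleStdev()` divides by N - 1.

The sketch below shows the difference in plain Python with the standard `statistics` module, on a small made-up list rather than the OpenAQ data. With over half a million rows here, the two values would differ only negligibly.

```python
import statistics

values = [2, 4, 4, 4, 5, 5, 7, 9]  # made-up sample, not from the dataset

population_sd = statistics.pstdev(values)  # divides by N, like RDD.stdev()
sample_sd = statistics.stdev(values)       # divides by N - 1, like RDD.sampleStdev()

print(population_sd)  # 2.0
print(sample_sd)
```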

We can also apply generalized aggregation functions (as in the reduce step of MapReduce) using `reduce` and `reduceByKey`. The lambda function takes two arguments, x and y: x is the running result of the reduce operation so far, and y is the next value. `reduceByKey` works on an RDD of (key, value) pairs and performs this reduction separately for each key:

In [None]:
keyed = field_length.map(lambda x: (x, 1))
counted = keyed.reduceByKey(lambda x, y: x + y)

# Action - work is done
reduced = counted.collect()

# Print output
for i in sorted(reduced):
    print(i[0], i[1])

5 215
6 9997
7 6999
8 472145
9 3960
10 1753
11 29417
12 1054
13 6807
14 1920
15 4572
16 45
17 110
18 764
19 44
20 365
21 19
22 8478
23 22
24 1760
25 21
29 2617
30 259
35 40
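To make the semantics of `reduceByKey` concrete, here is a plain-Python sketch of what it computes on a small local list. `reduce_by_key` is a hypothetical helper, the input lengths are made up, and partitioning is ignored. Spark actually applies the function within each partition first and then merges the partial results, which is why the function passed to `reduceByKey` should be associative and commutative.

```python
from collections import Counter
from functools import reduce


def reduce_by_key(pairs, func):
    """Combine the values of each key with func (hypothetical local stand-in for reduceByKey)."""
    grouped = {}
    for key, value in pairs:
        grouped.setdefault(key, []).append(value)
    # reduce folds each key's values pairwise: func(func(v1, v2), v3), ...
    return {key: reduce(func, values) for key, values in grouped.items()}


field_lengths = [8, 8, 6, 8, 11, 6]  # made-up sample of field lengths
keyed = [(n, 1) for n in field_lengths]
counts = reduce_by_key(keyed, lambda x, y: x + y)
print(counts)  # {8: 3, 6: 2, 11: 1}

# Summing 1s per key is just counting occurrences, as collections.Counter does
print(counts == dict(Counter(field_lengths)))  # True
```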

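The same counts can be obtained with the `countByKey` function. Unlike `reduceByKey`, it is an action that returns the counts to the head node as a Python dictionary: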

In [None]:
keyed = field_length.map(lambda x: (x, 1))

# countByKey is an action - work is done
counted = keyed.countByKey()

In [None]:
print(counted)

defaultdict(<class 'int'>, {22: 8478, 8: 472145, 11: 29417, 15: 4572, 6: 9997, 20: 365, 12: 1054, 9: 3960, 13: 6807, 18: 764, 10: 1753, 29: 2617, 24: 1760, 7: 6999, 19: 44, 21: 19, 17: 110, 5: 215, 14: 1920, 16: 45, 23: 22, 25: 21, 30: 259, 35: 40})

### Working with Spark DataFrames

In practice, working with RDDs is cumbersome and as a data scientist (or even data engineer) it is usually easier to use [DataFrames and Spark SQL](https://spark.apache.org/docs/latest/sql-programming-guide.html) which are higher level APIs built on top of RDDs.

The DataFrames API closely resembles pandas in functionality and syntax. This makes it easy for a data scientist or developer to move from smaller datasets in a centralized environment (_e.g._ your laptop) to large datasets in a distributed Big Data environment. Spark can read data from local files, databases, HDFS, or even S3 buckets! There is a full list of compatible data sources [here](https://spark.apache.org/docs/latest/sql-data-sources.html).

In [None]:
# Read in the data from S3, using the first line as the header
df = spark.read.csv('s3://brainstation-dsft/sparklab/uber_nyc_data.csv', header=True)

`spark.read.csv` also accepts an `inferSchema` parameter. If set to `True`, Spark infers the data type of each column, which requires an extra pass through the data and can be expensive for large datasets. If left as `None` or `False` (the default), all columns are read in as strings.
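Why does inference cost an extra pass? A column's type can only be decided after every value in it has been checked. The toy function below illustrates that idea in plain Python. Its rules (try `int`, then `float`, otherwise string) are a simplification chosen for illustration, not Spark's actual inference logic, and `infer_type` is a hypothetical name. In Spark itself, inference is requested with `spark.read.csv(path, header=True, inferSchema=True)`.

```python
def infer_type(values):
    """Guess a column type by scanning its values (simplified, hypothetical rules)."""
    for cast, name in ((int, "int"), (float, "float")):
        try:
            for v in values:  # every value must be checked before a type can be chosen
                cast(v)
            return name
        except ValueError:
            continue  # at least one value failed, so try the next, more general type
    return "string"


print(infer_type(["252581", "252582"]))    # int
print(infer_type(["4.25", "10.17"]))       # float
print(infer_type(["0:15:11", "0:34:05"]))  # string
```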

We can print out the schema and data types in human-readable format using `printSchema()`:

In [None]:
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- origin_taz: string (nullable = true)
 |-- destination_taz: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- trip_duration: string (nullable = true)

You can also get a list of the columns using `.columns`:

In [None]:
df.columns

['id', 'origin_taz', 'destination_taz', 'pickup_datetime', 'trip_distance', 'trip_duration']

Let's take a look at our data. There is a `head` command for Spark DataFrames just like for those in pandas.

In [None]:
# As with RDDs, we can show the first few rows using head (or equivalently, df.take(n_rows))
df.head(5)

[Row(id='252581', origin_taz='7C', destination_taz='6A', pickup_datetime='2014-09-01 09:00:00', trip_distance='4.25', trip_duration='0:15:11'), Row(id='252582', origin_taz='7B', destination_taz='15', pickup_datetime='2014-09-01 18:00:00', trip_distance='10.17', trip_duration='0:34:05'), Row(id='252583', origin_taz='11', destination_taz='2A', pickup_datetime='2014-09-01 17:00:00', trip_distance='4.02', trip_duration='0:17:06'), Row(id='252584', origin_taz='3B', destination_taz='4A', pickup_datetime='2014-09-01 13:00:00', trip_distance='1.46', trip_duration='0:06:32'), Row(id='252585', origin_taz='2A', destination_taz='10', pickup_datetime='2014-09-01 14:00:00', trip_distance='8.31', trip_duration='0:26:17')]

Each record of a Spark DataFrame is a special Spark `Row` object, so `take` and `head` return a Python list of `Row` objects:

In [None]:
df.take(1)[0].__class__

<class 'pyspark.sql.types.Row'>

In [None]:
df.take(1)[0]

Row(id='252581', origin_taz='7C', destination_taz='6A', pickup_datetime='2014-09-01 09:00:00', trip_distance='4.25', trip_duration='0:15:11')

If we use `take` or `head`, we get back lists of Row objects that are not very easy to read. Instead we can call `show` to get nicely formatted tabular data:

In [None]:
df.show(10)

+------+----------+---------------+-------------------+-------------+-------------+
|    id|origin_taz|destination_taz|    pickup_datetime|trip_distance|trip_duration|
+------+----------+---------------+-------------------+-------------+-------------+
|252581|        7C|             6A|2014-09-01 09:00:00|         4.25|      0:15:11|
|252582|        7B|             15|2014-09-01 18:00:00|        10.17|      0:34:05|
|252583|        11|             2A|2014-09-01 17:00:00|         4.02|      0:17:06|
|252584|        3B|             4A|2014-09-01 13:00:00|         1.46|      0:06:32|
|252585|        2A|             10|2014-09-01 14:00:00|         8.31|      0:26:17|
|252586|        5B|             4C|2014-09-01 12:00:00|         1.04|      0:08:35|
|252587|        10|             10|2014-09-01 14:00:00|         1.44|      0:19:36|
|252588|        2A|             7A|2014-09-01 03:00:00|         5.08|      0:18:17|
|252589|        2B|             3C|2014-09-01 11:00:00|         1.64|      0

We can also get the total count of rows for the whole dataframe using `.count()`, just like for an RDD:

In [None]:
df.count()

30925738

For large datasets, even counting the number of rows can be an expensive operation! We can see that there are almost 31 million Uber rides in the dataset!

DataFrames in Spark are _immutable_: data in individual columns cannot be updated or modified in place, as it can be in pandas or R. Instead, each of the following returns a new DataFrame:

- `withColumn` creates a new column from existing ones, or overwrites an existing column entirely by reusing its name;
- `drop` removes unneeded columns.

While `drop` simply takes one or more column names as separate arguments, `withColumn` takes the name of the column to create or replace plus a column expression. Many Spark functions take column expressions in this way:

In [None]:
# Correct the data type of the trip_distance column and datetime columns and replace the entire columns
df = df.withColumn("trip_distance", df["trip_distance"].cast('float'))
df = df.withColumn("pickup_datetime", df["pickup_datetime"].cast('timestamp'))

# Check
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- origin_taz: string (nullable = true)
 |-- destination_taz: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- trip_distance: float (nullable = true)
 |-- trip_duration: string (nullable = true)

We can create boolean columns using comparison expressions, just as we would with a pandas Series or NumPy array:

In [None]:
# Create a new boolean column for trips with distance greater than 5 miles
df = df.withColumn('morethan5mi', df['trip_distance'] > 5)
df.show()

+------+----------+---------------+-------------------+-------------+-------------+-----------+
|    id|origin_taz|destination_taz|    pickup_datetime|trip_distance|trip_duration|morethan5mi|
+------+----------+---------------+-------------------+-------------+-------------+-----------+
|252581|        7C|             6A|2014-09-01 09:00:00|         4.25|      0:15:11|      false|
|252582|        7B|             15|2014-09-01 18:00:00|        10.17|      0:34:05|       true|
|252583|        11|             2A|2014-09-01 17:00:00|         4.02|      0:17:06|      false|
|252584|        3B|             4A|2014-09-01 13:00:00|         1.46|      0:06:32|      false|
|252585|        2A|             10|2014-09-01 14:00:00|         8.31|      0:26:17|       true|
|252586|        5B|             4C|2014-09-01 12:00:00|         1.04|      0:08:35|      false|
|252587|        10|             10|2014-09-01 14:00:00|         1.44|      0:19:36|      false|
|252588|        2A|             7A|2014-

In [None]:
# Drop the id and trip_duration columns and display result
df.drop('id', 'trip_duration').show(10)

+----------+---------------+-------------------+-------------+-----------+
|origin_taz|destination_taz|    pickup_datetime|trip_distance|morethan5mi|
+----------+---------------+-------------------+-------------+-----------+
|        7C|             6A|2014-09-01 09:00:00|         4.25|      false|
|        7B|             15|2014-09-01 18:00:00|        10.17|       true|
|        11|             2A|2014-09-01 17:00:00|         4.02|      false|
|        3B|             4A|2014-09-01 13:00:00|         1.46|      false|
|        2A|             10|2014-09-01 14:00:00|         8.31|       true|
|        5B|             4C|2014-09-01 12:00:00|         1.04|      false|
|        10|             10|2014-09-01 14:00:00|         1.44|      false|
|        2A|             7A|2014-09-01 03:00:00|         5.08|       true|
|        2B|             3C|2014-09-01 11:00:00|         1.64|      false|
|         9|             5B|2014-09-01 20:00:00|        16.63|       true|
+----------+-------------

Finally, we can split the `trip_duration` string (formatted as `H:MM:SS`) into its components and combine them into a new column holding the trip duration in seconds:

In [None]:
from pyspark.sql.functions import split
df = df.withColumn("split_col", split(df['trip_duration'], ':'))
df = df.withColumn('hours', df['split_col'].getItem(0).cast('int'))  
df = df.withColumn('minutes', df['split_col'].getItem(1).cast('int'))
df = df.withColumn('seconds', df['split_col'].getItem(2).cast('int'))
df = df.withColumn('duration_secs', df['hours']*3600.0 + df['minutes']*60.0 + df['seconds'])
df = df.drop('split_col', 'hours', 'minutes','seconds')
df.show()

+------+----------+---------------+-------------------+-------------+-------------+-----------+-------------+
|    id|origin_taz|destination_taz|    pickup_datetime|trip_distance|trip_duration|morethan5mi|duration_secs|
+------+----------+---------------+-------------------+-------------+-------------+-----------+-------------+
|252581|        7C|             6A|2014-09-01 09:00:00|         4.25|      0:15:11|      false|        911.0|
|252582|        7B|             15|2014-09-01 18:00:00|        10.17|      0:34:05|       true|       2045.0|
|252583|        11|             2A|2014-09-01 17:00:00|         4.02|      0:17:06|      false|       1026.0|
|252584|        3B|             4A|2014-09-01 13:00:00|         1.46|      0:06:32|      false|        392.0|
|252585|        2A|             10|2014-09-01 14:00:00|         8.31|      0:26:17|       true|       1577.0|
|252586|        5B|             4C|2014-09-01 12:00:00|         1.04|      0:08:35|      false|        515.0|
|252587|  
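The same arithmetic can be checked in plain Python against the durations shown in the output above. This is a sketch, not the Spark code path, and `duration_to_seconds` is a hypothetical helper. Returning `None` for malformed strings is an assumption, meant to loosely mirror how Spark's `cast` yields null for values it cannot convert.

```python
def duration_to_seconds(duration):
    """Convert an 'H:MM:SS' string to seconds, or None if it cannot be parsed."""
    parts = duration.split(":")
    if len(parts) != 3:
        return None
    try:
        hours, minutes, seconds = (int(p) for p in parts)
    except ValueError:
        return None
    # Same formula as the Spark version: hours*3600.0 + minutes*60.0 + seconds
    return hours * 3600.0 + minutes * 60.0 + seconds


print(duration_to_seconds("0:15:11"))  # 911.0
print(duration_to_seconds("0:34:05"))  # 2045.0
print(duration_to_seconds("n/a"))      # None
```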

And then we can summarize the duration_secs column to see the distribution of trip lengths:

In [None]:
df.describe('duration_secs').show()

+-------+------------------+
|summary|     duration_secs|
+-------+------------------+
|  count|          30925700|
|   mean|1340.0212437551938|
| stddev|222733.92597508588|
|    min|               0.0|
|    max|      6.19317543E8|
+-------+------------------+

In a working Big Data environment, large datasets would typically be read into Spark and processed in the distributed (cluster) environment. They would be converted to pandas only after filtering, aggregation, and other processing have reduced them in size.

If pandas is installed on the head node, the `toPandas` method can even convert a Spark DataFrame into a pandas one! However, pandas is not part of the default setup for clusters on EMR, so we will not do this here. Where it is available, use `toPandas` with care: it brings all of the data onto the head node, so you must be sure the result will fit in memory there as a pandas DataFrame.

A more common approach would be to write reduced data out to either HDFS or another filesystem (S3, Google Cloud Storage, Azure Blob Storage, etc.) or even a database as part of a Big Data analysis workflow.

### Spark SQL

Spark SQL refers to two things:

- the DataFrame functions used to select and aggregate data, much as we did with RDDs;
- the ability to query and aggregate data using ordinary SQL queries via `spark.sql`.

Spark SQL can also be used to query data stored in Hive tables directly.

It should be emphasized that Spark is not a database (!). Rather, its engine takes SQL queries and converts them into transformations, which are then optimized and executed by the Spark engine.

Operations expressed with Spark SQL (DataFrame) functions have equivalent SQL queries that can be executed with `spark.sql`, so either style may be used.

In [None]:
df.select('origin_taz', 'trip_distance').show(10)

+----------+-------------+
|origin_taz|trip_distance|
+----------+-------------+
|        7C|         4.25|
|        7B|        10.17|
|        11|         4.02|
|        3B|         1.46|
|        2A|         8.31|
|        5B|         1.04|
|        10|         1.44|
|        2A|         5.08|
|        2B|         1.64|
|         9|        16.63|
+----------+-------------+
only showing top 10 rows

Using SQL first requires registering the DataFrame with `createOrReplaceTempView`, which creates a temporary view in the Spark session. We can then query that view with SQL, using the view name as the table name:

In [None]:
df.createOrReplaceTempView("uber_nyc")
spark.sql("SELECT origin_taz, trip_distance FROM uber_nyc").show(10)

+----------+-------------+
|origin_taz|trip_distance|
+----------+-------------+
|        7C|         4.25|
|        7B|        10.17|
|        11|         4.02|
|        3B|         1.46|
|        2A|         8.31|
|        5B|         1.04|
|        10|         1.44|
|        2A|         5.08|
|        2B|         1.64|
|         9|        16.63|
+----------+-------------+
only showing top 10 rows

Let's do a simple `GROUP BY` to count the number of trips present in the data set per year:

In [None]:
spark.sql("SELECT YEAR(pickup_datetime), COUNT(*) FROM uber_nyc GROUP BY YEAR(pickup_datetime)").show(10);

+-----------------------------------+--------+
|year(CAST(pickup_datetime AS DATE))|count(1)|
+-----------------------------------+--------+
|                               2015|23499302|
|                               2014| 7426436|
+-----------------------------------+--------+

We can use Spark SQL functions to calculate the average trip distance for each origin zone (`origin_taz`):

In [None]:
# Perform a transformation using Spark SQL
# Group by origin_taz and average trip_distance
df.groupby('origin_taz').avg('trip_distance').show(10)

+----------+------------------+
|origin_taz|avg(trip_distance)|
+----------+------------------+
|        15| 5.186272880957412|
|        11| 4.543250304394352|
|         8| 11.48813942374257|
|        3B|4.0397300822100926|
|        4B| 4.923632563383967|
|        7A| 5.260484663656681|
|        16| 6.783649924989385|
|        7B| 5.255862307165336|
|        7C| 6.210172574180387|
|        17| 9.869550069758546|
+----------+------------------+
only showing top 10 rows

This can be written equivalently using `spark.sql` and a SQL query:

In [None]:
# Equivalently, we may use a SQL query and spark.sql to do the same
spark.sql('SELECT origin_taz, AVG(trip_distance) AS avg_distance FROM uber_nyc GROUP BY origin_taz').show(10)

+----------+------------------+
|origin_taz|      avg_distance|
+----------+------------------+
|        15| 5.186272880957412|
|        11| 4.543250304394352|
|         8| 11.48813942374257|
|        3B|4.0397300822100926|
|        4B| 4.923632563383967|
|        7A| 5.260484663656681|
|        16| 6.783649924989385|
|        7B| 5.255862307165336|
|        7C| 6.210172574180387|
|        17| 9.869550069758546|
+----------+------------------+
only showing top 10 rows
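The equivalence between "group, then aggregate" and SQL's `GROUP BY` can be illustrated without a cluster. The sketch below uses the ten `(origin_taz, trip_distance)` rows displayed earlier as a tiny local sample. It runs the same SQL query in an in-memory SQLite database, swapped in here purely as a stand-in for `spark.sql`, and compares the result with a hand-rolled group-and-average. The averages apply only to this sample, not to the full dataset.

```python
import math
import sqlite3
from collections import defaultdict

# The ten (origin_taz, trip_distance) rows shown earlier, as a small local sample
rows = [("7C", 4.25), ("7B", 10.17), ("11", 4.02), ("3B", 1.46), ("2A", 8.31),
        ("5B", 1.04), ("10", 1.44), ("2A", 5.08), ("2B", 1.64), ("9", 16.63)]

# SQL version, using SQLite (not Spark) as a stand-in SQL engine
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE uber_nyc (origin_taz TEXT, trip_distance REAL)")
conn.executemany("INSERT INTO uber_nyc VALUES (?, ?)", rows)
sql_avg = dict(conn.execute(
    "SELECT origin_taz, AVG(trip_distance) AS avg_distance FROM uber_nyc GROUP BY origin_taz"
))
conn.close()

# Function-style version: group rows by key, then average each group
groups = defaultdict(list)
for origin, distance in rows:
    groups[origin].append(distance)
py_avg = {origin: sum(d) / len(d) for origin, d in groups.items()}

# Both approaches agree (up to floating-point rounding)
print(all(math.isclose(sql_avg[k], py_avg[k]) for k in py_avg))  # True
```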

We can also use the `withColumnRenamed` function to rename columns (useful for aggregated columns):

In [None]:
df.groupby('origin_taz').avg('trip_distance').withColumnRenamed('avg(trip_distance)', 'avg_distance').show(10)

+----------+------------------+
|origin_taz|      avg_distance|
+----------+------------------+
|        15| 5.186272880957412|
|        11| 4.543250304394352|
|         8| 11.48813942374257|
|        4B| 4.923632563383967|
|        3B|4.0397300822100926|
|        7A| 5.260484663656681|
|        16| 6.783649924989385|
|        7B| 5.255862307165336|
|        7C| 6.210172574180387|
|        17| 9.869550069758546|
+----------+------------------+
only showing top 10 rows

Finally, when we are finished, a Spark session can be stopped with `spark.stop()`.

In [None]:
# Do not run! (we're going to keep working)
#spark.stop()

### Exercise

1. Read in the [NYC Yellow Cab trip data](https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page) from the following S3 location:
`s3://brainstation-dsft/sparklab/yellow_tripdata_2018-10.csv`
2. Print the schema of the DataFrame. What data type does Spark assign to the columns by default?
3. Display the first row of the data and the first 10 rows of data.
4. What is the total number of rows in the dataset? How many yellow cab trips were there in NYC in total in October 2018?
5. Using `withColumn` and `cast`, convert the following columns to the correct data types:
    - **tpep_pickup_datetime**: `timestamp`
    - **passenger_count**: `int`
    - **trip_distance**: `float`
    - **fare_amount**: `float`
    - **tip_amount**: `float`
    - **extra**: `float`
6. Calculate a new column, `tip_pct`, holding the tip percentage relative to the base fare (`tip_amount / fare_amount`), and show the first 10 records of this new column.
7. Calculate the average tip percentage across all rides.
8. Register a view to allow the use of `spark.sql`, and calculate the number of rides per hour of day across all days in the dataset. Which are the most popular times of day?

#### Solutions

When we read in the data and look at the schema, we see that every column is a string: unless `inferSchema=True` is passed, `spark.read.csv` does not try to detect column types.

In [None]:
#q1 read in the data

yellow_df = spark.read.csv('s3://brainstation-dsft/sparklab/yellow_tripdata_2018-10.csv', header=True)

In [None]:
# q2 print schema
yellow_df.printSchema()

root
 |-- VendorID: string (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- RatecodeID: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- extra: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- improvement_surcharge: string (nullable = true)
 |-- total_amount: string (nullable = true)
 |-- congestion_surcharge: string (nullable = true)
 |-- airport_fee: string (nullable = true)
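
Type detection is opt-in because it costs work: according to the Spark documentation, `inferSchema` requires an extra pass over the data. As a rough plain-Python illustration of what inference involves (not Spark's actual rules), the sketch below guesses a type per column by trying `int`, then `float`, and falling back to `str`, ignoring empty values. The sample CSV text is hypothetical, shaped like the first record above plus one made-up row.

```python
import csv
import io

# Hypothetical sample shaped like the yellow cab CSV (first record above, plus one made-up row).
SAMPLE = """VendorID,passenger_count,trip_distance,store_and_fwd_flag,congestion_surcharge
1,1.0,6.2,N,
2,2.0,0.9,N,
"""


def guess_type(values):
    """Return 'int', 'float' or 'str' for a column, ignoring empty strings."""
    non_empty = [v for v in values if v != ""]
    if not non_empty:
        return "str"  # nothing to go on: keep the default string type
    for name, parse in (("int", int), ("float", float)):
        try:
            for v in non_empty:
                parse(v)
            return name  # every value parsed with this type
        except ValueError:
            continue  # at least one value failed; try the next, looser type
    return "str"


def infer_schema(text):
    reader = csv.DictReader(io.StringIO(text))
    rows = list(reader)  # like Spark's default, csv.DictReader yields every field as a string
    return {col: guess_type([row[col] for row in rows]) for col in reader.fieldnames}


print(infer_schema(SAMPLE))
# {'VendorID': 'int', 'passenger_count': 'float', 'trip_distance': 'float',
#  'store_and_fwd_flag': 'str', 'congestion_surcharge': 'str'}
```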

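We can use the `.take(n)` method, which returns the first `n` rows as a list of `Row` objects, or the `.show(n)` method, which prints them as a table (one block per record with `vertical=True`):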

In [None]:
# q3

# display first row and display first 10 rows

yellow_df.take(1)

[Row(VendorID='1', tpep_pickup_datetime='2018-10-01 00:23:34', tpep_dropoff_datetime='2018-10-01 00:44:50', passenger_count='1.0', trip_distance='6.2', RatecodeID='1.0', store_and_fwd_flag='N', PULocationID='68', DOLocationID='7', payment_type='2', fare_amount='20.5', extra='0.5', mta_tax='0.5', tip_amount='0.0', tolls_amount='0.0', improvement_surcharge='0.3', total_amount='21.8', congestion_surcharge=None, airport_fee=None)]

In [None]:
yellow_df.show(10, vertical=True)

-RECORD 0------------------------------------
 VendorID              | 1                   
 tpep_pickup_datetime  | 2018-10-01 00:23:34 
 tpep_dropoff_datetime | 2018-10-01 00:44:50 
 passenger_count       | 1.0                 
 trip_distance         | 6.2                 
 RatecodeID            | 1.0                 
 store_and_fwd_flag    | N                   
 PULocationID          | 68                  
 DOLocationID          | 7                   
 payment_type          | 2                   
 fare_amount           | 20.5                
 extra                 | 0.5                 
 mta_tax               | 0.5                 
 tip_amount            | 0.0                 
 tolls_amount          | 0.0                 
 improvement_surcharge | 0.3                 
 total_amount          | 21.8                
 congestion_surcharge  | null                
 airport_fee           | null                
-RECORD 1------------------------------------
 VendorID              | 1        

Next, the total row count:

In [None]:
# q4

# Number of rows
yellow_df.count()


8834520

The dataset contains 8,834,520 rows (about 8.8M). Assuming each row is one trip and the file is complete, there were roughly 8.8 million yellow cab trips in NYC in October 2018.

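Using `withColumn` and `cast`, we convert columns to the data types requested in the exercise:
- **tpep_pickup_datetime**: `timestamp`
- **passenger_count**: `int`
- **trip_distance**: `float`
- **fare_amount**: `float`
- **tip_amount**: `float`
- **extra**: `float`

The cell below casts `fare_amount` and `tip_amount` to `double` (Spark's 64-bit floating-point type, more precise than the 32-bit `float`) and also converts `tpep_dropoff_datetime` to a timestamp.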

In [None]:
# q5: cast columns from string to the correct types
yellow_df = yellow_df.withColumn("tpep_pickup_datetime", yellow_df["tpep_pickup_datetime"].cast('timestamp'))
yellow_df = yellow_df.withColumn("tpep_dropoff_datetime", yellow_df["tpep_dropoff_datetime"].cast('timestamp'))
yellow_df = yellow_df.withColumn("passenger_count", yellow_df["passenger_count"].cast('int'))
yellow_df = yellow_df.withColumn("tip_amount", yellow_df["tip_amount"].cast('double'))
yellow_df = yellow_df.withColumn("fare_amount", yellow_df["fare_amount"].cast('double'))

In [None]:
# Check
yellow_df.printSchema()

root
 |-- VendorID: string (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- RatecodeID: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- improvement_surcharge: string (nullable = true)
 |-- total_amount: string (nullable = true)
 |-- congestion_surcharge: string (nullable = true)
 |-- airport_fee: string (nullable = true)

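Inspecting the result, we can see that the pickup and dropoff timestamps, `passenger_count`, `fare_amount` and `tip_amount` now have the correct types. `trip_distance` and `extra` are still strings; they are not used in the rest of this solution, but they can be cast in exactly the same way.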
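
Rather than writing one `withColumn` line per column, the casts can be driven by a column-to-type mapping, for example `for name, dtype in {'trip_distance': 'double', 'extra': 'double'}.items(): yellow_df = yellow_df.withColumn(name, yellow_df[name].cast(dtype))`. The plain-Python sketch below applies the same mapping idea to a single CSV row held as a dict. The `CASTS` converters, the `cast_row` helper and the sample row (including the deliberately bad `extra` value) are hypothetical; failed conversions become `None`, loosely analogous to the nulls Spark's cast produces for malformed strings when ANSI mode is off.

```python
from datetime import datetime

# Hypothetical column -> converter map, mirroring the casts asked for in the exercise.
CASTS = {
    "tpep_pickup_datetime": lambda s: datetime.strptime(s, "%Y-%m-%d %H:%M:%S"),
    "passenger_count": lambda s: int(float(s)),  # values look like '1.0' in this file
    "trip_distance": float,
    "fare_amount": float,
    "tip_amount": float,
    "extra": float,
}


def cast_row(row, casts=CASTS):
    """Return a copy of row with each mapped column converted; missing or unparseable values become None."""
    out = dict(row)
    for name, convert in casts.items():
        value = row.get(name)
        try:
            out[name] = convert(value) if value not in (None, "") else None
        except ValueError:
            out[name] = None  # analogous to a null from a failed cast
    return out


# Hypothetical row based on the first record above; 'oops' is a made-up malformed value.
row = {"VendorID": "1", "tpep_pickup_datetime": "2018-10-01 00:23:34", "passenger_count": "1.0",
       "trip_distance": "6.2", "fare_amount": "20.5", "tip_amount": "0.0", "extra": "oops"}
print(cast_row(row))
```

Columns not listed in the mapping (here `VendorID`) are passed through unchanged as strings.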

Next, we calculate the tip percentage, stored as a fraction of the base fare (`tip_amount / fare_amount`):

In [None]:
# q6

yellow_df = yellow_df.withColumn("tip_pct", yellow_df['tip_amount']/yellow_df['fare_amount'])

In [None]:
# q7
yellow_df.groupby().avg('tip_pct').show(10)

+-------------------+
|       avg(tip_pct)|
+-------------------+
|0.16957932834850672|
+-------------------+

As we can see, the average tip is about 16.96% of the base fare, pretty close to 15%!
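
This average quietly depends on how rides with a zero `fare_amount` are handled, if any exist. With ANSI mode off (the default in Spark 2.x and 3.x), dividing by zero yields null rather than an error, and `avg` skips nulls. The plain-Python sketch below mimics that behaviour on made-up rows so the effect is visible; the helper names and values are hypothetical.

```python
# Made-up (tip_amount, fare_amount) pairs; the third ride has a zero fare.
rides = [(5.0, 20.0), (0.0, 10.0), (2.0, 0.0), (5.0, 10.0)]


def tip_pct(tip, fare):
    """Tip as a fraction of the base fare; None for a zero fare, like Spark's null."""
    return None if fare == 0 else tip / fare


def avg_ignoring_none(values):
    """Mean of the non-None values, as Spark's avg() skips nulls; None if nothing is left."""
    kept = [v for v in values if v is not None]
    return sum(kept) / len(kept) if kept else None


pcts = [tip_pct(tip, fare) for tip, fare in rides]
print(pcts)                     # [0.25, 0.0, None, 0.5]
print(avg_ignoring_none(pcts))  # 0.25
```

Had the zero-fare ride been counted as 0 instead of skipped, the mean would fall to 0.75 / 4 = 0.1875, so the treatment of such rows (and of any negative fares) can shift the headline number.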

Finally, we register a temporary view so that we can use `spark.sql` to count the number of rides per hour of day across all days in the dataset.

In [None]:
# Register the view
yellow_df.createOrReplaceTempView('yellowtrip')

result_df = spark.sql('SELECT HOUR(tpep_pickup_datetime), COUNT(*) FROM yellowtrip GROUP BY HOUR(tpep_pickup_datetime) \
    ORDER BY HOUR(tpep_pickup_datetime) ASC')




VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
result_df  # displays only the schema; the query has not been executed yet (lazy evaluation)

DataFrame[hour(tpep_pickup_datetime): int, count(1): bigint]

In [None]:
result_df.show()

+--------------------------+--------+
|hour(tpep_pickup_datetime)|count(1)|
+--------------------------+--------+
|                         0|  264136|
|                         1|  182713|
|                         2|  129745|
|                         3|   91696|
|                         4|   71749|
|                         5|   87663|
|                         6|  204261|
|                         7|  352318|
|                         8|  414895|
|                         9|  416338|
|                        10|  416463|
|                        11|  431679|
|                        12|  450033|
|                        13|  446059|
|                        14|  473948|
|                        15|  473042|
|                        16|  440553|
|                        17|  509599|
|                        18|  566149|
|                        19|  556830|
+--------------------------+--------+
only showing top 20 rows
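
The query groups pickups by `HOUR(tpep_pickup_datetime)` and counts each group. A plain-Python sketch of the same computation on a few made-up timestamps, using `collections.Counter`, also shows how to answer "which hours are busiest" by ranking the counts; in Spark SQL, sorting with `ORDER BY COUNT(*) DESC` would play that role. The timestamps and the `rides_per_hour` helper are hypothetical.

```python
from collections import Counter
from datetime import datetime

# Made-up pickup timestamps in the same format as tpep_pickup_datetime.
pickups = [
    "2018-10-01 18:05:00",
    "2018-10-01 18:40:12",
    "2018-10-02 18:15:30",
    "2018-10-02 07:55:01",
    "2018-10-03 19:20:45",
    "2018-10-03 19:59:59",
    "2018-10-04 04:10:00",
]


def rides_per_hour(timestamps):
    """Count pickups per hour of day (0-23), like GROUP BY HOUR(...) with COUNT(*)."""
    return Counter(datetime.strptime(ts, "%Y-%m-%d %H:%M:%S").hour for ts in timestamps)


counts = rides_per_hour(pickups)
print(sorted(counts.items()))  # [(4, 1), (7, 1), (18, 3), (19, 2)]
print(counts.most_common(2))   # [(18, 3), (19, 2)]
```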

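Because `show()` displays only the first 20 rows by default, hours 20 to 23 are cut off here; `result_df.show(24)` would list all of them. Among the hours shown, pickups peak in the early evening (18:00 with 566,149 rides and 19:00 with 556,830), stay above 400,000 per hour from 08:00 to 19:00, and bottom out around 04:00 (71,749).

We can then save the results to an S3 bucket: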

In [None]:
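# Replace 'mybucket' with your own S3 bucket.
# Spark writes a directory of part files at this path; header=True writes the column names as a header row.
result_df.write.csv('s3://mybucket/yellow_cab_results.csv', header=True)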

<div id="container" style="position:relative;">
<div style="position:relative; float:right"><img style="height:25px; width:50px" src="https://drive.google.com/uc?export=view&id=14VoXUJftgptWtdNhtNYVm6cjVmEWpki1" />
</div>
</div>