# DataFrames as RDD
In the first chapter we learned about Pandas DataFrames and how great they are to work with for data science. Let's see how we can make similar objects with Spark.

## Spark SQL
In the previous sheet, we used a basic SparkContext to access our data. But what if we wanted a more robust Spark interface that could interact with SQL, Hive, and other data storage systems? That's where SparkSession comes into the picture. Part of the `pyspark.sql` package, this constructor can be connected to a wide variaty of data sources and returns a SparkSession which can be used to create RDDs. Luckily, one of the data sources is your local machine so we can access the flight csv similarly to how we did before:

In [1]:
import pyspark
sparkql = pyspark.sql.SparkSession.builder.master('local').getOrCreate()

- `SparkSession.builder` is creating our session
- `.master('local')` tells the Session
- `getOrCreate()` creates a new Session since none has been created already. The builder can retrieve existing session with this function as well.

### Read flights
Now that we've got a session, let's read in the same flight data as before!

In [2]:
flight_file = '../data/flights.csv'
flight_df = sparkql.read.csv(flight_file, header=True)
print(flight_df.columns)
print(flight_df.schema)
flight_df.show(3)

['flight_date', 'airline', 'tailnumber', 'flight_number', 'src', 'dest', 'departure_time', 'arrival_time', 'flight_time', 'distance']
StructType(List(StructField(flight_date,StringType,true),StructField(airline,StringType,true),StructField(tailnumber,StringType,true),StructField(flight_number,StringType,true),StructField(src,StringType,true),StructField(dest,StringType,true),StructField(departure_time,StringType,true),StructField(arrival_time,StringType,true),StructField(flight_time,StringType,true),StructField(distance,StringType,true)))
+-----------+-------+----------+-------------+---+----+--------------+------------+-----------+--------+
|flight_date|airline|tailnumber|flight_number|src|dest|departure_time|arrival_time|flight_time|distance|
+-----------+-------+----------+-------------+---+----+--------------+------------+-----------+--------+
| 2019-11-28|     9E|    N8974C|         3280|CHA| DTW|          1300|        1455|      115.0|   505.0|
| 2019-11-28|     9E|    N901XJ|   

As we can see, this RDD has a lot of the same properties as a Pandas DataFrame! We get columns and a sechmea and can use `show(n)` to look at `n` rows of the data. We also still have RDD properties like `.count()`

In [3]:
print(flight_df.count())

12715


## DataFrame operation with an RDD
This since this `.read.csv()` has created an RDD with DataFrame properties let's try some of the DataFrame operations! They are similar to Pandas options but Pyspark does have some differences
### Change column data type
As we can see from the `flight_df.schema` since no schema information was supplied, everything was read in as `StringType`. Let's change the date columns to the correct type. First we have to import the desired data type, in the case `DateType`

In [4]:
from pyspark.sql.types import DateType
flight_df = flight_df.withColumn('flight_date', flight_df['flight_date'].cast(DateType()))
print(flight_df.schema)

StructType(List(StructField(flight_date,DateType,true),StructField(airline,StringType,true),StructField(tailnumber,StringType,true),StructField(flight_number,StringType,true),StructField(src,StringType,true),StructField(dest,StringType,true),StructField(departure_time,StringType,true),StructField(arrival_time,StringType,true),StructField(flight_time,StringType,true),StructField(distance,StringType,true)))


These also allows us to see the power of the `withColumn` function. The first argument is the 
output column name. If the output column name is an existing column in the RDD this allows us to update columns. If it is a new column name, a new column is created:

In [5]:
from pyspark.sql.types import DoubleType
flight_df = flight_df.withColumn('num_dist', flight_df['distance'].cast(DoubleType()))
flight_df.show(3)

+-----------+-------+----------+-------------+---+----+--------------+------------+-----------+--------+--------+
|flight_date|airline|tailnumber|flight_number|src|dest|departure_time|arrival_time|flight_time|distance|num_dist|
+-----------+-------+----------+-------------+---+----+--------------+------------+-----------+--------+--------+
| 2019-11-28|     9E|    N8974C|         3280|CHA| DTW|          1300|        1455|      115.0|   505.0|   505.0|
| 2019-11-28|     9E|    N901XJ|         3281|JAX| RDU|           700|         824|       84.0|   407.0|   407.0|
| 2019-11-28|     9E|    N901XJ|         3282|RDU| LGA|           900|        1039|       99.0|   431.0|   431.0|
+-----------+-------+----------+-------------+---+----+--------------+------------+-----------+--------+--------+
only showing top 3 rows



## Transform single column
In addition to changing data type, we might want to tranform the data within a column. For example, what if we wanted to remove the leading `N` from each tailnumber? We can do this using the `withColumn` function as well. We just need to pass a function that can do this tranformation to an RDD column as the second arguement. To do this, we will use `pyspark.sql.fucntions.regexp_replace`, a prebuilt function for applying a regex pattern to an RDD. This function takes 3 arguments, the name of the column being transformed, the regex pattern to find, and finally what to replace that pattern with. For our case, we will look at the `'tailnumber'` column and replace leading N (`r'^N'`) with the empty string.

In [6]:
from pyspark.sql.functions import regexp_replace
flight_df = flight_df.withColumn('tailnumber', 
                                  regexp_replace('tailnumber', r'^N', r''))
flight_df.show(3)

+-----------+-------+----------+-------------+---+----+--------------+------------+-----------+--------+--------+
|flight_date|airline|tailnumber|flight_number|src|dest|departure_time|arrival_time|flight_time|distance|num_dist|
+-----------+-------+----------+-------------+---+----+--------------+------------+-----------+--------+--------+
| 2019-11-28|     9E|     8974C|         3280|CHA| DTW|          1300|        1455|      115.0|   505.0|   505.0|
| 2019-11-28|     9E|     901XJ|         3281|JAX| RDU|           700|         824|       84.0|   407.0|   407.0|
| 2019-11-28|     9E|     901XJ|         3282|RDU| LGA|           900|        1039|       99.0|   431.0|   431.0|
+-----------+-------+----------+-------------+---+----+--------------+------------+-----------+--------+--------+
only showing top 3 rows



## Multiple column value
What if we want to compute a new value based of multiple columns? For example the average speed of a flight would be `distance/flight_time`. As before, we simply need to pass this function as the second argument of `withColumn`:

In [7]:
flight_df = flight_df.withColumn('flight_time', flight_df['flight_time'].cast(DoubleType()))
flight_df = flight_df.withColumn('av_speed',
                                flight_df.flight_time/flight_df.num_dist)
flight_df.show(3)

+-----------+-------+----------+-------------+---+----+--------------+------------+-----------+--------+--------+-------------------+
|flight_date|airline|tailnumber|flight_number|src|dest|departure_time|arrival_time|flight_time|distance|num_dist|           av_speed|
+-----------+-------+----------+-------------+---+----+--------------+------------+-----------+--------+--------+-------------------+
| 2019-11-28|     9E|     8974C|         3280|CHA| DTW|          1300|        1455|      115.0|   505.0|   505.0|0.22772277227722773|
| 2019-11-28|     9E|     901XJ|         3281|JAX| RDU|           700|         824|       84.0|   407.0|   407.0|0.20638820638820637|
| 2019-11-28|     9E|     901XJ|         3282|RDU| LGA|           900|        1039|       99.0|   431.0|   431.0| 0.2296983758700696|
+-----------+-------+----------+-------------+---+----+--------------+------------+-----------+--------+--------+-------------------+
only showing top 3 rows



## Drop columns
Similarly, it is easy to drop columns as well. Simply supply the `drop()` function with the names of the columns to remove.

In [8]:
flight_df = flight_df.drop("distance", 'av_speed')
flight_df.show(3)

+-----------+-------+----------+-------------+---+----+--------------+------------+-----------+--------+
|flight_date|airline|tailnumber|flight_number|src|dest|departure_time|arrival_time|flight_time|num_dist|
+-----------+-------+----------+-------------+---+----+--------------+------------+-----------+--------+
| 2019-11-28|     9E|     8974C|         3280|CHA| DTW|          1300|        1455|      115.0|   505.0|
| 2019-11-28|     9E|     901XJ|         3281|JAX| RDU|           700|         824|       84.0|   407.0|
| 2019-11-28|     9E|     901XJ|         3282|RDU| LGA|           900|        1039|       99.0|   431.0|
+-----------+-------+----------+-------------+---+----+--------------+------------+-----------+--------+
only showing top 3 rows



If you only want to rename a column with no changes, we can use `withColumnRenamed` instead of a full `withColumn`:

In [9]:
flight_df = flight_df.withColumnRenamed("num_dist", "dist")
flight_df.show(3)

+-----------+-------+----------+-------------+---+----+--------------+------------+-----------+-----+
|flight_date|airline|tailnumber|flight_number|src|dest|departure_time|arrival_time|flight_time| dist|
+-----------+-------+----------+-------------+---+----+--------------+------------+-----------+-----+
| 2019-11-28|     9E|     8974C|         3280|CHA| DTW|          1300|        1455|      115.0|505.0|
| 2019-11-28|     9E|     901XJ|         3281|JAX| RDU|           700|         824|       84.0|407.0|
| 2019-11-28|     9E|     901XJ|         3282|RDU| LGA|           900|        1039|       99.0|431.0|
+-----------+-------+----------+-------------+---+----+--------------+------------+-----------+-----+
only showing top 3 rows



### Selection
We can also select specific columns to return or fitler rows similar to how we filtered the simple RDDs. If we supply the `select()` the columns we want, a RDD with just those is returned. For `filter()` we can leverage the structure of these RDDs for filtering similar to how we would use `==` to create a Boolean mask in a Pandas DataFrame. For simple comparisons this is more straightforward than creating a lambda function.


In [10]:
flight_df.select("flight_date", "airline", "dest").show(3)
flight_df.filter(flight_df.src == "PDX").show(3)
flight_df.filter((flight_df.src == "PDX") | (flight_df.dest == "PDX")).show(3)

+-----------+-------+----+
|flight_date|airline|dest|
+-----------+-------+----+
| 2019-11-28|     9E| DTW|
| 2019-11-28|     9E| RDU|
| 2019-11-28|     9E| LGA|
+-----------+-------+----+
only showing top 3 rows

+-----------+-------+----------+-------------+---+----+--------------+------------+-----------+------+
|flight_date|airline|tailnumber|flight_number|src|dest|departure_time|arrival_time|flight_time|  dist|
+-----------+-------+----------+-------------+---+----+--------------+------------+-----------+------+
| 2019-11-28|     AA|     832NN|         1402|PDX| PHX|           820|        1153|      153.0|1009.0|
| 2019-11-28|     AA|     939AN|         2298|PDX| ORD|           627|        1229|      242.0|1739.0|
| 2019-11-28|     AA|     992AU|         2577|PDX| DFW|           600|        1141|      221.0|1616.0|
+-----------+-------+----------+-------------+---+----+--------------+------------+-----------+------+
only showing top 3 rows

+-----------+-------+----------+--------

### Aggregation
We can also group our data by a column or a set of columns as showin in the two examples bellow. We use the `count()` option to find the number of rows within each grouping. The grouped RDD does not have a view unless we apply some sort of function to each group; here we chose to count the number of rows.

In [11]:
flight_df.groupBy("dest").count().show(3)
flight_df.groupBy("dest", "airline").count().show(3)

+----+-----+
|dest|count|
+----+-----+
| BGM|    1|
| PSE|    3|
| INL|    1|
+----+-----+
only showing top 3 rows

+----+-------+-----+
|dest|airline|count|
+----+-------+-----+
| MDT|     9E|    1|
| KTN|     AS|    4|
| JAX|     EV|    2|
+----+-------+-----+
only showing top 3 rows



If instead of just counting the number of rows, we wanted to know the average flight time for each airline and look at the longest average flight time? We can do a similar `groupBy` but with a different aggregation function `avg()` which is given the name of the column to average for each group. We can then use `orderBy()` with the option `ascending=False` to see the airlines with the largest average flight time:

In [12]:
flight_df.groupBy('airline')\
.avg('flight_time')\
.orderBy('avg(flight_time)', ascending=False)\
.show(3)

+-------+------------------+
|airline|  avg(flight_time)|
+-------+------------------+
|     AS|212.85046728971963|
|     UA|187.08619173262971|
|     B6|182.17331670822944|
+-------+------------------+
only showing top 3 rows



What if we want to compute a cumstom aggregation? That's where `agg` comes in. Let's say we regret dropping the average speed per flight and now was to find the flight with the highest average speed per airline? We can use the following custom `agg` to first compute the average speed per flight then return the maximum value per airline. To do with we use two helper functions from `pyspark.sql.functions`: `col` and `max`. `col` allows us to reference a specific column within our function and `max` is one of the premade aggergator functions.

In [13]:
from pyspark.sql.functions import col, max
flight_df.groupBy('airline')\
.agg(max(col('dist')/col('flight_time')).alias('max_av_speed'))\
.orderBy('max_av_speed', ascending=False)\
.show(3)

+-------+-----------------+
|airline|     max_av_speed|
+-------+-----------------+
|     UA|8.943529411764706|
|     DL|8.810176125244618|
|     HA| 8.78448275862069|
+-------+-----------------+
only showing top 3 rows



# Joining data
Up till now, we've been working with an RDD created from just a single csv. As with most tabular data, one of the most powerful things we can do is join different datasets on common attributes. For example, let's load in a new csv of airport data and find the departure timezone for each flight:

In [14]:
airport_file = '../data/airports.csv'
airport_df = sparkql.read.csv(airport_file, header=True)
airport_df.show(3)

+----+--------------------+----------+-----+-------+-----------+-------------------+---+-------------------+----------+
|iata|                name|      city|state|country|        lat|               long|dst|                 tz|utc_offset|
+----+--------------------+----------+-----+-------+-----------+-------------------+---+-------------------+----------+
| CRQ|   Caravelas Airport|  Carlsbad|   CA|    USA|33.12822222|-117.28022220000001|  S|America/Los_Angeles|        -7|
| CMA|  Cunnamulla Airport| Camarillo|   CA|    USA|34.21375472|       -119.0943264|  O|America/Los_Angeles|        -7|
| IZA|Zona da Mata Regi...|Santa Ynez|   CA|    USA|34.60682028|-120.07556170000001|  S|America/Los_Angeles|        -7|
+----+--------------------+----------+-----+-------+-----------+-------------------+---+-------------------+----------+
only showing top 3 rows



We can see that the `'iata'` column has the airport code we've seen in our fligh data so let's join using that column:

In [15]:
flight_df.join(airport_df.select('iata', 'tz'), 
               flight_df.src == airport_df.iata, 
               how='left').drop('iata').show(3)

+-----------+-------+----------+-------------+---+----+--------------+------------+-----------+-----+----------------+
|flight_date|airline|tailnumber|flight_number|src|dest|departure_time|arrival_time|flight_time| dist|              tz|
+-----------+-------+----------+-------------+---+----+--------------+------------+-----------+-----+----------------+
| 2019-11-28|     9E|     8974C|         3280|CHA| DTW|          1300|        1455|      115.0|505.0|America/New_York|
| 2019-11-28|     9E|     901XJ|         3281|JAX| RDU|           700|         824|       84.0|407.0|America/New_York|
| 2019-11-28|     9E|     901XJ|         3282|RDU| LGA|           900|        1039|       99.0|431.0|America/New_York|
+-----------+-------+----------+-------------+---+----+--------------+------------+-----------+-----+----------------+
only showing top 3 rows



Pretty straightforward! The `join()` method is built into the RDD so we simply need to supply the df to join the the join condition and the type of join to perform. In our case we filter `airports_df` down to the join column `'iata'` and the data we want to join `'tz'` in the first agrument because we don't care about the info in the other columns. If the join columns have the same name in both RDDs, we could use `on='column_name'` but in our case they have different names so we provide the join condition directly. Lastly, we don't want to lose flights that we don't have a timezone for so we choose a `left` join.