# DataFrames Hands-on: my first exercise with DataFrames

In [None]:
import findspark
findspark.init()

In [None]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

## Introduction to the Flights dataset

According to a 2010 report by the US Federal Aviation Administration, domestic flight delays cost passengers, airlines and other parts of the economy 32.9 billion dollars a year. More than half of that amount comes from passengers' pockets: they not only waste time waiting for their planes to leave, but also miss connecting flights, spend money on food and have to sleep in hotel rooms while they are stranded.

The report, which focuses on data from 2007, estimated that air transportation delays put a 4 billion dollar dent in the country's gross domestic product that year. The full report can be found 
<a href="http://www.isr.umd.edu/NEXTOR/pubs/TDI_Report_Final_10_18_10_V3.pdf">here</a>.

But what causes these delays?

In order to answer this question, we are going to analyze the provided dataset, which contains up to 1,936,758 domestic US flights from 2008 together with the causes of their delay, diversion or cancellation, if any.

The data come from the U.S. Department of Transportation's (DOT) Bureau of Transportation Statistics (BTS).

This dataset is composed of the following variables:
1. **Year** 2008
2. **Month** 1
3. **DayofMonth** 1-31
4. **DayOfWeek** 1 (Monday) - 7 (Sunday)
5. **DepTime** actual departure time (local, hhmm)
6. **CRSDepTime** scheduled departure time (local, hhmm)
7. **ArrTime** actual arrival time (local, hhmm)
8. **CRSArrTime** scheduled arrival time (local, hhmm)
9. **UniqueCarrier** unique carrier code
10. **FlightNum** flight number
11. **TailNum** plane tail number: aircraft registration, unique aircraft identifier
12. **ActualElapsedTime** in minutes
13. **CRSElapsedTime** in minutes
14. **AirTime** in minutes
15. **ArrDelay** arrival delay, in minutes: A flight is counted as "on time" if it operated less than 15 minutes later than the scheduled time shown in the carriers' Computerized Reservations Systems (CRS).
16. **DepDelay** departure delay, in minutes
17. **Origin** origin IATA airport code
18. **Dest** destination IATA airport code
19. **Distance** in miles
20. **TaxiIn** taxi in time, in minutes
21. **TaxiOut** taxi out time in minutes
22. **Cancelled** was the flight cancelled?
23. **CancellationCode** reason for cancellation (A = carrier, B = weather, C = NAS, D = security)
24. **Diverted** 1 = yes, 0 = no
25. **CarrierDelay** in minutes: Carrier delay is within the control of the air carrier. Examples of occurrences that may determine carrier delay are: aircraft cleaning, aircraft damage, awaiting the arrival of connecting passengers or crew, baggage, bird strike, cargo loading, catering, computer, outage-carrier equipment, crew legality (pilot or attendant rest), damage by hazardous goods, engineering inspection, fueling, handling disabled passengers, late crew, lavatory servicing, maintenance, oversales, potable water servicing, removal of unruly passenger, slow boarding or seating, stowing carry-on baggage, weight and balance delays.
26. **WeatherDelay** in minutes: Weather delay is caused by extreme or hazardous weather conditions that are forecasted or manifest themselves on point of departure, enroute, or on point of arrival.
27. **NASDelay** in minutes: Delay that is within the control of the National Airspace System (NAS) may include: non-extreme weather conditions, airport operations, heavy traffic volume, air traffic control, etc.
28. **SecurityDelay** in minutes: Security delay is caused by evacuation of a terminal or concourse, re-boarding of aircraft because of security breach, inoperative screening equipment and/or long lines in excess of 29 minutes at screening areas.
29. **LateAircraftDelay** in minutes: Arrival delay at an airport due to the late arrival of the same aircraft at a previous airport. The ripple effect of an earlier delay at downstream airports is referred to as delay propagation

Read the CSV file using Spark's default delimiter (","). The first line contains the headers so it is not part of the data. Hence we set the header option to true.

In [None]:
# Spark is lazy: the data are not loaded here. Only the header line is read to obtain the column names;
# the full read is deferred until an action is executed
flightsDF = spark.read.option("header", "true").csv("flights_jan08.csv")

In [None]:
# Let's see the schema (column types) of the data. This is just metadata, so it is not an action
flightsDF.printSchema()

Everything is a string because we neither specified the data type of each column nor asked Spark to infer the schema of the data. We do not want all columns to be strings because there are a few numeric ones that should be treated as such. Let's ask Spark to infer the schema, based on the data types found. Keep in mind this is slower than directly providing a schema for our data, which is the recommended option if we know our data and are sure of the data type each column should have. We will demonstrate this later in this notebook.

In [None]:
flightsDF = spark.read\
                 .option("header", "true")\
                 .option("inferSchema", "true")\
                 .csv("flights_jan08.csv")

flightsDF.printSchema()

Things look better now. There are a few integer columns involving the year, the month, the day of month and day of the week, as well as distances, flight number and so on.

In [None]:
# Let's run an action on this, such as printing the first few rows to execute the DAG until now (which has just one step - the read)
flightsDF.show(3)

## Basic operations with DataFrames

### Counting the number of rows

One of the very first things we want to know about our data is how big they are in terms of rows and columns, that is, how many examples we have and how many variables we are dealing with. Since we will be performing multiple operations with our `flightsDF` DataFrame, let's `cache()` it so that Spark keeps it in memory instead of freeing the memory after each action.

In [None]:
# Extract the column names. This is just metadata
print("Our data have", len(flightsDF.columns), "columns")

flightsDF.cache()        # This does nothing, but Spark takes note to keep the DF in memory after the first time it is materialized
rows = flightsDF.count() # This is an action and therefore, flightsDF will be materialized
print("Our data have", rows, "rows")

So we have 100,000 rows and 29 variables. Not bad! Let's start looking at some simple transformations we can do with our data.

### Select columns by name

In [None]:
flightsDF.select("Year", "Month", "DayofMonth", "ArrTime", "FlightNum")\
         .show() # by default Spark resolves column names case-insensitively, but it is good practice to match the schema

### Filter (retain) rows according to the values of one or more columns

Since most of the functions that operate on columns are defined in the module `pyspark.sql.functions`, it is common to import the entire module with an alias like `F` instead of importing each individual function. Then, we use `F.` before each function to tell Python where to look for it.

In [None]:
import pyspark.sql.functions as F  

# function col is used to indicate that we are referring to a column whose name is the argument

flightsJanuary20 = flightsDF\
                      .where(F.col("DayofMonth") == 20)\
                      .select("Year", "Month", "ArrTime") # this does not launch anything because Spark is lazy

# Let's see how many flights we have on January 20, 2008
# count() is an action so it launches the computations. If we hadn't cached the data, it would read the CSV again.
rowsJanuary20 = flightsJanuary20.count()
print("We have", rowsJanuary20, "flights that traveled on January 20, 2008")

# This is another action on the flightsJanuary20 DataFrame. It is NOT cached,
# hence the "where" and "select" operations are launched again.
flightsJanuary20.show(3)

Don't forget to cache your DataFrame if you will be doing multiple operations on it, or you will be re-doing 
the same computations many times!

In [None]:
# filter() is an alias for where() and they both do exactly the same:
flightsDF.filter(F.col("DayofMonth") == 20)

# It is interesting to note that you can indicate the filter condition just as a SQL fragment:
flightsJanuary31 = flightsDF.where("DayofMonth = 31")
flightsJanuary31.count()

<div class="alert alert-block alert-info">
<b>REMEMBER:</b> Spark is doing these operations in a distributed fashion in the cluster, so each node of the cluster is filtering rows locally among those present in that node and counts the rows retained in that node. Every node (more precisely, every <i>executor</i>) sends the count to the driver where the partial results are aggregated to obtain the final count.
</div>

<div class="alert alert-block alert-success">
<b>YOUR TURN</b>: show the arrival delay, origin and destination airports of those flights scheduled on Sundays having an 
arrival delay greater than 15 minutes. Check the schema again for the exact name of those columns.
</div>

In [None]:
# This is a complex condition consisting of two conditions that must be fulfilled simultaneously
delayedOnSundaysDF = flightsDF.where((<FILL IN HERE>) &
                                     (<FILL IN HERE>))\
                              .select(<FILL IN HERE>)

# show() returns None, so we call it separately instead of assigning its result to delayedOnSundaysDF
delayedOnSundaysDF.show()

### Selecting unique rows

We might be interested to know how many distinct values a column has, or even how many distinct rows our complete dataset has. If we consider all the columns together, it is unlikely that we have two rows that are exactly the same, but if we select only a few columns, then it is much more likely to have two or more rows having the same value in all the selected columns.

In [None]:
distinctFlights = flightsDF.distinct()  # distinct is a transformation returning a new DataFrame without duplicated rows
distinctFlightsCount = distinctFlights.count()

print("There are", distinctFlightsCount, "distinct rows")

As expected, there are no two rows with exactly the same values. However, if we select only two columns, say `Origin` and `Dest`, we get a DataFrame of only two columns which may have repeated rows: multiple flights share the same origin and destination and differ only in other columns such as the date or the flight number, which are not in the subset.

In [None]:
flightsDF.select("Origin", "Dest").show()

<div class="alert alert-block alert-success">
<b>YOUR TURN</b>: let's see how many combinations we have of Origin and Dest, i.e. how many distinct routes there are
</div>

**Answer in the next cell:**

In [None]:
# Let's see how many combinations we have of Origin and Dest
distinctFlightsOriginDest = flightsDF.<FILL IN HERE>(<FILL IN HERE>).<FILL IN HERE>

originDestCombinations = distinctFlightsOriginDest.<FILL IN HERE>

print("There are", originDestCombinations, "combinations of an origin airport and a dest airport")

Although we have 100,000 rows, there are only 1009 distinct combinations of an origin and a destination airport.

If we select only one column and do the same, we get the number of distinct values of that column. Let's do that for the origin airport.

In [None]:
distinctOrigins = flightsDF.<FILL IN HERE>.....
print("There are", distinctOrigins, "airports from which flights may depart")

<div class="alert alert-block alert-success">
<b>YOUR TURN</b>: let's do this for every column, in a loop, in order to have an idea of the kind of data we have. Probably this makes sense for some columns (those that are categorical) and does not make sense for numeric ones since every row may have a different value. But let's try and explore the results.
</div>

In [None]:
for columnName in flightsDF.columns:
    distinctValues = flightsDF.select(<FILL IN HERE>)\
                              .<FILL IN HERE>()\
                              .<FILL IN HERE>()
    
    # Don't forget to indent this line to indicate it is also inside the loop
    print("There are", distinctValues, "distinct values in column", columnName)


### Creating or replacing a column by operating with existing columns

We have the flight distances in miles. Let's convert them to kilometers. 1 mile equals 1.61 kilometers so we just need to multiply the miles by 1.61. Again, each node will do this operation locally over the rows located in that node. No data movement, no information needed about the data of other nodes.

In [None]:
# withColumn is a transformation returning a new DataFrame with one extra column appended on the right
flightsWithKm = flightsDF.withColumn("DistanceKm", F.col("Distance") * 1.61)

flightsWithKm.printSchema()

flightsWithKm.show(3)

The day of the week is encoded as an integer variable. Using an integer or a categorical variable is a decision that depends on how the variable will be used when fitting a model. Does it make sense to consider "greater" or "lower" days, i.e., something that increases or occurs more as the day of the week goes from Monday to Sunday? Just for demonstration, we are going to **replace** column `DayOfWeek` by a string with the name of the day of the week, from Monday to Sunday. We just call `withColumn` but pass the name of an existing column. In that case we are replacing a column, not creating a new one.

In [None]:
flightsCategoricalDay = flightsDF.withColumn("DayOfWeek", F.when(F.col("DayOfWeek") == 1, "Monday")\
                                                           .when(F.col("DayOfWeek") == 2, "Tuesday")\
                                                           .when(F.col("DayOfWeek") == 3, "Wednesday")\
                                                           .when(F.col("DayOfWeek") == 4, "Thursday")\
                                                           .when(F.col("DayOfWeek") == 5, "Friday")\
                                                           .when(F.col("DayOfWeek") == 6, "Saturday")\
                                                           .otherwise("Sunday"))

flightsCategoricalDay.printSchema() # the column is still in the same position but now has string type

flightsCategoricalDay.select("DayOfWeek", "DepTime", "ArrTime").show()

<div class="alert alert-block alert-info">
<b>TIP:</b> The process of creating new variables (called <i>features</i>) from existing ones, or incorporating features from external data sources (sometimes public data), as well as cleaning or replacing features after applying some normalization technique is known as <b>feature engineering</b>. Sometimes it has a lot to do with domain-specific knowledge, but also with some mathematical and statistical tricks concerning feature normalization or well-known general-purpose transformations.
</div>

Function `when` is very commonly used to re-categorize a variable, or to create variables (especially categorical variables) whose values depend on one or multiple conditions on the values of other columns.

<div class="alert alert-block alert-success">
<b>YOUR TURN</b>: let's create a (categorical) string variable with two categories, indicating whether the day of the week was a working day ("working") or a weekend day ("weekend"). Use `withColumn` and `when` to create a new column with the name of your choice (it should be clear and concise). A working day is one whose DayOfWeek is between 1 and 5, both included. Otherwise it is a weekend day.
</div>

In [None]:
flightsDayPart = flightsDF.withColumn("<FILL IN HERE>", F.when(<FILL IN HERE>, "working")\
                                                         .otherwise("<FILL IN HERE>"))

### Creating new columns and selecting them on-the-fly

`withColumn` is not the only way to create columns. We can use `select` not only to select existing columns, but also to create a new column and select it for the resulting DataFrame, all at the same time. Let's see an example: we are going to select the origin and destination airports, which already exist in our data, and a newly created column with the distance in kilometers. When selecting a new column, it is usual to give it a name using function `alias`. Otherwise, Spark will give it an auto-generated name derived from the operations employed (an ugly name along the lines of "(Distance * 1.61)").

In [None]:
flightsAirportsAndKm = flightsDF.select(F.col("Origin"),
                                        F.col("Dest"),
                                        (1.61 * F.col("Distance")).alias("DistanceKm"))

flightsAirportsAndKm.show(5)

<div class="alert alert-block alert-success">
<b>YOUR TURN</b>: create a new DataFrame of three columns by selecting columns "FlightNum", "TailNum" and a newly created string column that concatenates the contents of columns "FlightNum" and "TailNum" with a "-". Use function <i>concat_ws</i> (concatenate with separator) from pyspark.sql.functions, with the syntax concat_ws("-", column1, column2).
</div>

In [None]:
flightsNumTail = flightsDF.select(<FILL IN HERE>,
                                  <FILL IN HERE>,
                                  F.concat_ws("-", <FILL IN HERE>).alias("<FILL IN HERE>"))

### Casting incorrectly parsed columns to the correct data type

This is very important. Quite often, Spark will not infer the data type of every column correctly. A very common case is a column that should be numeric but is not recognized as such because the data contain a label like "NA" meant to indicate a missing value. By default, Spark does not treat the string "NA" as a missing value, so it infers string type for a column whenever it finds a mixture of numbers and strings. For that reason, columns like `ArrDelay` and `DepDelay` have been inferred as string although they are numeric.

In [None]:
naCount = flightsDF.where("ArrDelay = 'NA'").count()
# The where function could also have been specified as .where(F.col("ArrDelay") == "NA"). Both are equivalent.

print("There are", naCount, "rows with NA in ArrDelay")

<div class="alert alert-block alert-success">
<b>YOUR TURN</b>: replace flightsDF by the result of selecting the rows in which ArrDelay is not 'NA' AND DepDelay is not 'NA'. Use a where operation whose condition combines two simple conditions with &. Then, cast ArrDelay and DepDelay to integer using the code provided. NOTE: the "not equals" operator is != in Python.
</div>

In [None]:
from pyspark.sql.types import IntegerType

flightsDF = flightsDF.where((<FILL IN HERE>) &\
                            (<FILL IN HERE>))\
                     .withColumn("ArrDelay", F.col("ArrDelay").cast(IntegerType()))\
                     .withColumn("DepDelay", F.col("DepDelay").cast(IntegerType()))

### Aggregation functions on the whole DataFrame

Spark provides distributed implementations of common aggregation measures such as the mean, min, max, and standard deviation among others. All the functions receive a single argument that should be the column where the function must be applied.

Let's select the flights with an ArrDelay greater than 15 minutes and from them, let's show the min, max, mean and standard deviation of the `ArrDelay`. We will do it by creating and selecting those columns at once.

In [None]:
# First we select those flights with at least 16 minutes of delay and then compute the aggregations
flightsDF.where(F.col("ArrDelay") > 15)\
          .select(F.mean("ArrDelay").alias("MeanArrDelay"),\
                  F.min("ArrDelay").alias("MinArrDelay"),\
                  F.max("ArrDelay").alias("MaxArrDelay"),
                  F.stddev("ArrDelay").alias("StddevArrDelay")).show()

Spark DataFrames already provide a method that does this for us, called `summary`. It computes these statistics (plus the count and the quartiles) for every numeric and string column it finds in the dataset. It is a transformation so it returns a new DataFrame that contains the summaries.

In [None]:
summariesDF = flightsDF.summary()
summariesDF.show()

### Casting a Spark DataFrame to a Pandas dataframe

The output of `summary` is hard to read when printed by Spark. Hence we are going to convert this Spark DataFrame to a Pandas dataframe, which is displayed more clearly and makes its contents easier to inspect.

<div class="alert alert-block alert-info">
<b>TIP</b>: The concept of a dataframe as a table with rows and columns exists in many programming languages and libraries (Python, R, PySpark). However, Spark DataFrames are physically distributed, unlike Pandas or R dataframes, which live on a single machine. Converting a Spark DataFrame into a Pandas dataframe entails collecting all rows to the driver, which could result in an out-of-memory error because the complete contents may be larger than the RAM of the single machine where the driver process is running. In this case, we are collecting just the summaries, which are very small, so there is no risk.
</div>

In [None]:
flightsDF.summary().toPandas()