# Porting an analysis from local to distributed

<a href = "http://yogen.io"><img src="http://yogen.io/assets/logo.svg" alt="yogen" style="width: 200px; float: right;"/></a>

Now comes the opportunity to put in practice what we have just learned!

# Guided exercise

Recreate the boxplot we did in the pandas section, in Spark!

Since matplotlib's boxplot needs all the data in memory, which is unfeasible with Big Data, we will calculate the quartiles ourselves.

Once the analysis is ported, we will be able to run it on the whole historical series! You can find it at https://transtats.bts.gov (On time performance reporting carrier).

## Workflow

The basic idea is the same one we applied in the Amadeus Challenge:

* Build prototype with small data: in this section, we will be using `06-intro_to_pandas_practical.ipynb` as our already made prototype

* Modify your prototype so that it works with Big Data: In this case, it means porting it to Spark

* Test your "Big Data" prototype with small data: We will first test it with a sample locally, then upload it to a cluster and test it with Big Data.

    * You can run your analyses by building your own cluster and a Google Cloud Storage bucket. More in notebook #4!

* Run your prototype with Big Data.

    


## Modify the prototype so that it works with Big Data

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://apache.uvigo.es/spark/spark-2.4.6/spark-2.4.6-bin-hadoop2.7.tgz
!tar -xf spark-2.4.6-bin-hadoop2.7.tgz
!pip install -q findspark pyspark==2.4.6
import os
import findspark
from pyspark.sql import SparkSession
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.6-bin-hadoop2.7"
findspark.init()
spark = SparkSession.builder.master("local[*]").getOrCreate()



## Read csv

We'll use the `SparkSession.read.csv` method.

In [None]:
!ls -lh 

total 252M
-rw-r--r--  1 root root  29M Jun 20 09:57 On_Time_Reporting_Carrier_On_Time_Performance_1987_present_2018_12.zip
drwxr-xr-x  1 root root 4.0K Jun 17 16:18 sample_data
drwxr-xr-x 13 1000 1000 4.0K May 30 00:02 spark-2.4.6-bin-hadoop2.7
-rw-r--r--  1 root root 223M May 30 00:54 spark-2.4.6-bin-hadoop2.7.tgz


In [None]:
!unzip -o On_Time_Reporting_Carrier_On_Time_Performance_1987_present_2018_12.zip

Archive:  On_Time_Reporting_Carrier_On_Time_Performance_1987_present_2018_12.zip
  inflating: On_Time_Reporting_Carrier_On_Time_Performance_(1987_present)_2018_12.csv  
  inflating: readme.html             


In [None]:
df = spark.read.csv('On_Time_Reporting_Carrier_On_Time_Performance_(1987_present)_2018_12.csv', header=True, inferSchema=True)
df

DataFrame[Year: int, Quarter: int, Month: int, DayofMonth: int, DayOfWeek: int, FlightDate: timestamp, Reporting_Airline: string, DOT_ID_Reporting_Airline: int, IATA_CODE_Reporting_Airline: string, Tail_Number: string, Flight_Number_Reporting_Airline: int, OriginAirportID: int, OriginAirportSeqID: int, OriginCityMarketID: int, Origin: string, OriginCityName: string, OriginState: string, OriginStateFips: int, OriginStateName: string, OriginWac: int, DestAirportID: int, DestAirportSeqID: int, DestCityMarketID: int, Dest: string, DestCityName: string, DestState: string, DestStateFips: int, DestStateName: string, DestWac: int, CRSDepTime: int, DepTime: int, DepDelay: double, DepDelayMinutes: double, DepDel15: double, DepartureDelayGroups: int, DepTimeBlk: string, TaxiOut: double, WheelsOff: int, WheelsOn: int, TaxiIn: double, CRSArrTime: int, ArrTime: int, ArrDelay: double, ArrDelayMinutes: double, ArrDel15: double, ArrivalDelayGroups: int, ArrTimeBlk: string, Cancelled: double, Cancellati

In [None]:
df.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Quarter: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- FlightDate: timestamp (nullable = true)
 |-- Reporting_Airline: string (nullable = true)
 |-- DOT_ID_Reporting_Airline: integer (nullable = true)
 |-- IATA_CODE_Reporting_Airline: string (nullable = true)
 |-- Tail_Number: string (nullable = true)
 |-- Flight_Number_Reporting_Airline: integer (nullable = true)
 |-- OriginAirportID: integer (nullable = true)
 |-- OriginAirportSeqID: integer (nullable = true)
 |-- OriginCityMarketID: integer (nullable = true)
 |-- Origin: string (nullable = true)
 |-- OriginCityName: string (nullable = true)
 |-- OriginState: string (nullable = true)
 |-- OriginStateFips: integer (nullable = true)
 |-- OriginStateName: string (nullable = true)
 |-- OriginWac: integer (nullable = true)
 |-- DestAirportID: integer (nullable = true)
 |-- DestAirportSe

## Select relevant columns

Indexing with a list of column names is literally the same syntax as pandas! Spark also offers the equivalent `select` method:

```python
df = df.select(['FlightDate', 'DayOfWeek', 'Reporting_Airline', 'Tail_Number', 'Flight_Number_Reporting_Airline', 'Origin', 
                'OriginCityName', 'OriginStateName', 'Dest', 'DestCityName', 'DestStateName',
                'DepTime', 'DepDelay', 'AirTime', 'Distance'])

df
```

In [None]:
!head -n 2 On_Time_Reporting_Carrier_On_Time_Performance_\(1987_present\)_2018_12.csv

"Year","Quarter","Month","DayofMonth","DayOfWeek","FlightDate","Reporting_Airline","DOT_ID_Reporting_Airline","IATA_CODE_Reporting_Airline","Tail_Number","Flight_Number_Reporting_Airline","OriginAirportID","OriginAirportSeqID","OriginCityMarketID","Origin","OriginCityName","OriginState","OriginStateFips","OriginStateName","OriginWac","DestAirportID","DestAirportSeqID","DestCityMarketID","Dest","DestCityName","DestState","DestStateFips","DestStateName","DestWac","CRSDepTime","DepTime","DepDelay","DepDelayMinutes","DepDel15","DepartureDelayGroups","DepTimeBlk","TaxiOut","WheelsOff","WheelsOn","TaxiIn","CRSArrTime","ArrTime","ArrDelay","ArrDelayMinutes","ArrDel15","ArrivalDelayGroups","ArrTimeBlk","Cancelled","CancellationCode","Diverted","CRSElapsedTime","ActualElapsedTime","AirTime","Flights","Distance","DistanceGroup","CarrierDelay","WeatherDelay","NASDelay","SecurityDelay","LateAircraftDelay","FirstDepTime","TotalAddGTime","LongestAddGTime","DivAirportLandings","DivReachedDest","DivAc

In [None]:
df = df[['FlightDate', 'DayOfWeek', 'Reporting_Airline', 'Tail_Number', 'Flight_Number_Reporting_Airline', 'Origin', 
                'OriginCityName', 'OriginStateName', 'Dest', 'DestCityName', 'DestStateName',
                'DepTime', 'DepDelay', 'AirTime', 'Distance']]
 
df

DataFrame[FlightDate: timestamp, DayOfWeek: int, Reporting_Airline: string, Tail_Number: string, Flight_Number_Reporting_Airline: int, Origin: string, OriginCityName: string, OriginStateName: string, Dest: string, DestCityName: string, DestStateName: string, DepTime: int, DepDelay: double, AirTime: double, Distance: double]

### Extract "Hour" variable

`DepTime` has been inferred as an integer in hhmm form: 1048 means 10:48. We need just the hour as an int, representing each of the 24 hours in a day.
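To see how the two approaches tried in this section behave on edge cases, here is a plain-Python sketch with no Spark involved. The helper names `hour_by_slicing` and `hour_by_division` are made up for illustration; they mirror the string-slicing lambda and the divide-by-100 expression respectively.

```python
def hour_by_slicing(dep_time):
    """Mirror of the UDF lambda: drop the last two characters of the string."""
    return str(dep_time)[:-2]


def hour_by_division(dep_time):
    """Mirror of dividing by 100 and keeping the integer part.

    Missing values stay missing instead of being turned into text.
    """
    if dep_time is None:
        return None
    return dep_time // 100


# 45 stands for 00:45; None stands for a missing departure time.
for dep_time in [1048, 638, 45, None]:
    print(dep_time, repr(hour_by_slicing(dep_time)), hour_by_division(dep_time))
```

Slicing turns a time such as 00:45 into an empty string and a missing value into the text `'No'`, whereas division keeps integers throughout and leaves missing values as missing.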

In [None]:
df[['DepTime']].show(6)

+-------+
|DepTime|
+-------+
|   1048|
|    638|
|   1710|
|   1318|
|    953|
|   1646|
+-------+
only showing top 6 rows



In [None]:
from pyspark.sql import functions as f

def hour_str(hour):
  return str(hour)[:-2]

hour_str(755)

hour_str = f.udf(lambda hour: str(hour)[:-2])
df.select(hour_str('DepTime')).show(5)

+-----------------+
|<lambda>(DepTime)|
+-----------------+
|               10|
|                6|
|               17|
|               13|
|                9|
+-----------------+
only showing top 5 rows



In [None]:
df.select('DepTime')

DataFrame[DepTime: int]

In [None]:
df['DepTime']

Column<b'DepTime'>

In [None]:
from pyspark.sql import types

df.select(df['DepTime'].astype(types.StringType())[0:-2]).show()

+-----------------------------------------+
|substring(CAST(DepTime AS STRING), 0, -2)|
+-----------------------------------------+
|                                         |
|                                         |
|                                         |
|                                         |
|                                         |
|                                         |
|                                         |
|                                         |
|                                         |
|                                         |
|                                         |
|                                         |
|                                         |
|                                         |
|                                         |
|                                         |
|                                         |
|                                         |
|                                         |
|                               

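Every value comes out empty because slicing a Spark `Column` maps to `substr(startPos, length)`, so `[0:-2]` asks for a substring of length -2 rather than dropping the last two characters. Dividing by 100 and casting to an integer avoids strings altogether:

In [None]: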
df2 = df.withColumn('Hour', (df['DepTime'] / 100).cast(types.IntegerType()))
df2.show()

## Generate the relative distributions

In order to be able to handle the data, we need to reduce its size. Since we want to describe a discrete distribution, we can just count how many times each value of the 'DepDelay' variable appears for each hour (one discrete distribution per hour). We also need the total number of flights per hour in order to compute the relative frequencies.
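The counting idea can be sketched in plain Python before doing it in Spark. The records below are made-up toy values, not rows from the dataset, and the variable names simply echo the ones used in this notebook.

```python
from collections import Counter

# Toy (hour, delay) records; made-up values for illustration only.
flights = [(17, 8.0), (17, 8.0), (17, -3.0), (17, 72.0),
           (6, -5.0), (6, -5.0), (6, 89.0)]

# How many flights departed in each hour (the totals).
totals_per_hour = Counter(hour for hour, _ in flights)

# How many flights share each (hour, delay) combination.
per_delay_per_hour = Counter(flights)

# Fraction of each hour's flights that had each delay.
relative = {
    (hour, delay): count / totals_per_hour[hour]
    for (hour, delay), count in per_delay_per_hour.items()
}

for key in sorted(relative):
    print(key, relative[key])
```

Within each hour the relative frequencies add up to 1, which is what lets us read quartiles off the cumulative sum later on.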

### Totals

In [None]:
totals_per_hour = df2.groupby('Hour').count()
totals_per_hour.show()

+----+-----+
|Hour|count|
+----+-----+
|  12|36925|
|  22|15646|
|null| 6526|
|   1|  846|
|  13|33163|
|   6|37398|
|  16|34761|
|   3|  200|
|  20|28354|
|   5|22597|
|  19|30966|
|  15|35691|
|  17|36591|
|   9|34355|
|   4| 1374|
|   8|36357|
|  23| 5835|
|   7|35336|
|  10|34600|
|  24|   40|
+----+-----+
only showing top 20 rows



### Distributions

In [None]:
per_delay_per_hour = df2.groupBy(['Hour', 'DepDelay']).count()
per_delay_per_hour.show()

+----+--------+-----+
|Hour|DepDelay|count|
+----+--------+-----+
|  17|     8.0|  388|
|  13|    19.0|  197|
|  22|     9.0|  158|
|  16|    76.0|   31|
|  17|    72.0|   32|
|   6|    89.0|    4|
|  23|   108.0|   14|
|  22|    73.0|   24|
|  16|    84.0|   33|
|  18|   -11.0|  435|
|  17|    86.0|   21|
|   0|    95.0|   10|
|  10|    70.0|   18|
|  20|   173.0|    9|
|   7|    52.0|   22|
|  20|   226.0|    4|
|   4|    -8.0|   74|
|   8|   178.0|    2|
|  13|   211.0|    2|
|  22|   383.0|    1|
+----+--------+-----+
only showing top 20 rows



Now we join both and calculate what fraction of the total for each hour each level of DepDelay represents.

In [None]:
with_totals = per_delay_per_hour.join(totals_per_hour, on='Hour')
with_totals.show()

+----+--------+-----+-----+
|Hour|DepDelay|count|count|
+----+--------+-----+-----+
|  17|     8.0|  388|36591|
|  13|    19.0|  197|33163|
|  22|     9.0|  158|15646|
|  16|    76.0|   31|34761|
|  17|    72.0|   32|36591|
|   6|    89.0|    4|37398|
|  23|   108.0|   14| 5835|
|  22|    73.0|   24|15646|
|  16|    84.0|   33|34761|
|  18|   -11.0|  435|33335|
|  17|    86.0|   21|36591|
|   0|    95.0|   10| 2283|
|  10|    70.0|   18|34600|
|  20|   173.0|    9|28354|
|   7|    52.0|   22|35336|
|  20|   226.0|    4|28354|
|   4|    -8.0|   74| 1374|
|   8|   178.0|    2|36357|
|  13|   211.0|    2|33163|
|  22|   383.0|    1|15646|
+----+--------+-----+-----+
only showing top 20 rows



In [None]:
with_totals['count'] / with_totals['count']

AnalysisException: ignored

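The expression fails because `with_totals` has two columns named `count`, so the reference is ambiguous. Referring to each column through the DataFrame it came from works:

In [None]: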
with_totals.select(per_delay_per_hour['count'] / totals_per_hour['count'])

DataFrame[(count / count): double]

In [None]:
relative_freqs = with_totals.withColumn('relative', per_delay_per_hour['count'] / totals_per_hour['count'])
relative_freqs

DataFrame[Hour: int, DepDelay: double, count: bigint, count: bigint, relative: double]

In [None]:
relative_freqs[relative_freqs['Hour']==20].show()

+----+--------+-----+-----+--------------------+
|Hour|DepDelay|count|count|            relative|
+----+--------+-----+-----+--------------------+
|  20|   173.0|    9|28354|3.174155322000423E-4|
|  20|   226.0|    4|28354|1.410735698666854...|
|  20|   530.0|    1|28354|3.526839246667136...|
|  20|    82.0|   32|28354|0.001128588558933...|
|  20|   218.0|    3|28354|1.058051774000141E-4|
|  20|   205.0|    4|28354|1.410735698666854...|
|  20|   237.0|    2|28354|7.053678493334273E-5|
|  20|   103.0|   19|28354| 6.70099456866756E-4|
|  20|   160.0|   11|28354|3.879523171333850...|
|  20|   224.0|    2|28354|7.053678493334273E-5|
|  20|   343.0|    3|28354|1.058051774000141E-4|
|  20|   118.0|   24|28354|8.464414192001129E-4|
|  20|   449.0|    1|28354|3.526839246667136...|
|  20|   465.0|    1|28354|3.526839246667136...|
|  20|    71.0|   31|28354|0.001093320166466...|
|  20|   -20.0|   19|28354| 6.70099456866756E-4|
|  20|   107.0|   15|28354|5.290258870000705E-4|
|  20|    85.0|   29

### Generate distributions

We have to group on the hour. Each group will be a bunch of delays and the corresponding frequencies.
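As a rough plain-Python picture of this grouping step, the sketch below gathers toy rows into per-hour lists and then pairs and sorts them. The rows are made-up values. Sorting after pairing matters because Spark documents `collect_list` as non-deterministic in the order of its results.

```python
from collections import defaultdict

# Toy rows of (hour, delay, relative frequency); made-up values.
rows = [(17, 8.0, 0.5), (6, 89.0, 0.25), (17, -3.0, 0.25),
        (6, -5.0, 0.75), (17, 72.0, 0.25)]

# Gather the delays and frequencies of each hour into two parallel lists.
delays_by_hour = defaultdict(list)
relatives_by_hour = defaultdict(list)
for hour, delay, relative in rows:
    delays_by_hour[hour].append(delay)
    relatives_by_hour[hour].append(relative)

# Pair each delay with its frequency; tuples sort by their first element,
# so the result is ordered by delay.
distributions = {
    hour: sorted(zip(delays_by_hour[hour], relatives_by_hour[hour]))
    for hour in delays_by_hour
}

print(distributions[17])
```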

In [None]:
distributions = relative_freqs.groupby('Hour').agg(f.collect_list('relative'), f.collect_list('DepDelay')).cache()
distributions.show()

+----+----------------------+----------------------+
|Hour|collect_list(relative)|collect_list(DepDelay)|
+----+----------------------+----------------------+
|  12|  [2.70819228165199...|  [1315.0, 280.0, 3...|
|  22|  [0.01009842771315...|  [9.0, 73.0, 383.0...|
|   1|  [0.00118203309692...|  [270.0, 223.0, 30...|
|  13|  [0.00594035521514...|  [19.0, 211.0, -18...|
|  16|  [8.91804033255660...|  [76.0, 84.0, 289....|
|   6|  [1.06957591315043...|  [89.0, 678.0, 540...|
|   3|  [0.005, 0.01, 0.0...|  [157.0, 11.0, 139...|
|  20|  [3.17415532200042...|  [173.0, 226.0, 53...|
|   5|  [4.42536619905297...|  [64.0, 42.0, -19....|
|  19|  [2.90641348575857...|  [131.0, 330.0, 14...|
|  15|  [2.80182679106777...|  [238.0, 47.0, 584...|
|  17|  [0.01060370036347...|  [8.0, 72.0, 86.0,...|
|   9|  [5.82156891282200...|  [170.0, 60.0, 67....|
|   4|  [0.05385735080058...|  [-8.0, 3.0, 24.0,...|
|   8|  [5.50100393321781...|  [178.0, 1236.0, 1...|
|  23|  [0.00239931448157...|  [108.0, 129.0, 

These groups are definitely manageable: the number of levels will be on the order of a few hundred to a couple of thousand. We can combine them into lists straight away.

In [None]:
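rels = [.05, 0.14, .2, .07, .15, .31, .08]
delays = [ 0, 2, -3, 1, -5, 3, 4 ]

def quartiles(rels, delays):
  """Return [min, Q1, median, Q3, max] of a discrete distribution."""

  accumulated = 0
  result = []

  # Walk through the delays in increasing order, accumulating their frequencies.
  for delay, relative in sorted(zip(delays, rels)):
    prev = accumulated
    accumulated += relative

    # The first (smallest) delay is the minimum.
    if prev == 0 and accumulated >= 0:
      result.append(delay)
    # Record the delay at which each threshold is crossed.
    if prev < .25 and accumulated >= .25:
      result.append(delay)
    if prev < .5 and accumulated >= .5:
      result.append(delay)
    if prev < .75 and accumulated >= .75:
      result.append(delay)

  # The last (largest) delay is the maximum.
  result.append(delay)

  return result

quartiles(rels, delays)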

[-5, -3, 2, 3, 4]

Now it will be easy to use a UDF to merge the two lists and sort them.

Careful! `f.udf` returns strings unless told otherwise. If we keep that string return type, it might be problematic later.

### Calculating the quartiles

We are finally ready to calculate the quartiles! We will use a UDF.

The input to our custom function is one of the distributions: a set of delays and their relative frequencies, which `quartiles` zips and sorts into a list of tuples `(value, relative_frequency)`. The quartiles are defined as the values at which the cumulative relative frequency crosses 0.0, 0.25, 0.5, 0.75 and 1.0. Since the distribution is sorted, we can just iterate over it while keeping track of what portion of the total we have seen, and note where we cross each threshold.
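The threshold-crossing idea can be written compactly in plain Python. This is a minimal sketch, not the notebook's own function: the name `weighted_quartiles` is hypothetical, it assumes a non-empty distribution whose frequencies sum to roughly 1, and it treats a threshold as crossed when the cumulative frequency is greater than or equal to it.

```python
def weighted_quartiles(pairs):
    """Return [min, Q1, median, Q3, max] from (value, relative_frequency) pairs.

    Assumes at least one pair and frequencies that add up to about 1.
    """
    ordered = sorted(pairs)            # sort by value
    thresholds = [0.25, 0.5, 0.75]
    result = [ordered[0][0]]           # the smallest value is the minimum
    cumulative = 0.0
    crossed = 0                        # how many thresholds we have passed

    for value, frequency in ordered:
        cumulative += frequency
        # One value may cross several thresholds at once.
        while crossed < len(thresholds) and cumulative >= thresholds[crossed]:
            result.append(value)
            crossed += 1

    # Guard against rounding: any threshold not reached maps to the last value.
    result.extend([ordered[-1][0]] * (len(thresholds) - crossed))
    result.append(ordered[-1][0])      # the largest value is the maximum
    return result


rels = [.05, 0.14, .2, .07, .15, .31, .08]
delays = [0, 2, -3, 1, -5, 3, 4]
print(weighted_quartiles(zip(delays, rels)))  # [-5, -3, 2, 3, 4]
```

Using a `while` loop over the remaining thresholds means the result always has five entries, even when a single delay accounts for more than a quarter of the flights in an hour.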

In [None]:
quartiles_udf = f.udf(quartiles)

distributions.withColumn('quartiles', quartiles_udf('collect_list(relative)', 'collect_list(DepDelay)')).show()

+----+----------------------+----------------------+--------------------+
|Hour|collect_list(relative)|collect_list(DepDelay)|           quartiles|
+----+----------------------+----------------------+--------------------+
|  12|  [2.70819228165199...|  [1315.0, 280.0, 3...|[-38.0, -5.0, -2....|
|  22|  [0.01009842771315...|  [9.0, 73.0, 383.0...|[-39.0, -5.0, 0.0...|
|   1|  [0.00118203309692...|  [270.0, 223.0, 30...|[-22.0, -3.0, 13....|
|  13|  [0.00594035521514...|  [19.0, 211.0, -18...|[-32.0, -5.0, -2....|
|  16|  [8.91804033255660...|  [76.0, 84.0, 289....|[-37.0, -5.0, -1....|
|   6|  [1.06957591315043...|  [89.0, 678.0, 540...|[-32.0, -6.0, -3....|
|   3|  [0.005, 0.01, 0.0...|  [157.0, 11.0, 139...|[-19.0, -7.0, -1....|
|  20|  [3.17415532200042...|  [173.0, 226.0, 53...|[-35.0, -5.0, 0.0...|
|   5|  [4.42536619905297...|  [64.0, 42.0, -19....|[-24.0, -7.0, -4....|
|  19|  [2.90641348575857...|  [131.0, 330.0, 14...|[-32.0, -5.0, -1....|
|  15|  [2.80182679106777...|  [238.0,

Apply to the dataframe:

### Plotting

We got it! Let's move this over to Pandas for convenient handling

And we are ready to plot!
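One possible way to draw a boxplot from precomputed quartiles is matplotlib's `Axes.bxp`, which takes ready-made statistics instead of raw data. The sketch below is an assumption about how the plotting step could look, not the original cell: the helper names `to_bxp_stats` and `plot_quartiles` are hypothetical, the numbers are made up, and the whiskers here span the minimum and maximum rather than the usual 1.5 IQR range.

```python
def to_bxp_stats(quartiles_by_hour):
    """Turn {hour: [min, q1, median, q3, max]} into the dicts Axes.bxp expects."""
    stats = []
    for hour in sorted(quartiles_by_hour):
        low, q1, med, q3, high = quartiles_by_hour[hour]
        stats.append({
            "label": str(hour),
            "whislo": low,   # lower whisker: the minimum
            "q1": q1,
            "med": med,
            "q3": q3,
            "whishi": high,  # upper whisker: the maximum
            "fliers": [],    # no individual outliers are kept
        })
    return stats


def plot_quartiles(quartiles_by_hour):
    """Draw one box per hour from precomputed quartiles."""
    # Imported here so that to_bxp_stats can be used without matplotlib.
    import matplotlib.pyplot as plt

    fig, ax = plt.subplots()
    ax.bxp(to_bxp_stats(quartiles_by_hour), showfliers=False)
    ax.set_xlabel("Hour")
    ax.set_ylabel("DepDelay (minutes)")
    return ax


# Made-up quartiles for two hours, for illustration only.
example = {20: [-12, -5, 0, 15, 90], 6: [-10, -4, -1, 3, 40]}
print(to_bxp_stats(example)[0])
```

Calling `plot_quartiles(example)` in a notebook with matplotlib installed would draw the two boxes, ordered by hour.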

## Test your "Big Data" prototype with small data

### Summary

This is the whole process, collected in one place as is:

### Pyspark job

In order to run the process in a cluster, we need to transform it into a pyspark job file. 

We need to tidy up the function definitions, add the relevant imports, and modify the input and output to use command-line arguments.

We will put the result in a file called mysparkjob.py:

```python
from __future__ import print_function
from pyspark.sql import types, functions, SparkSession
import sys

def zipsort(a, b):
    return sorted(zip(a, b))

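def quartiles(histogram):
    """Return [min, Q1, median, Q3, max] from sorted (value, percentage) pairs."""
    area = 0
    result = []
    
    for value, percentage in histogram:
        # The first value is the minimum.
        if area == 0:
            result.append(value)
        # Independent checks: one value can cross several thresholds,
        # so these must not be chained with elif.
        if area <= .25 and area + percentage > .25:
            result.append(value)
        if area <= .5 and area + percentage > .5:
            result.append(value)
        if area <= .75 and area + percentage > .75:
            result.append(value)
        area += percentage
    
    # The last value is the maximum.
    result.append(value)
    return result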

if __name__=='__main__':
    
    file = sys.argv[1]
    out = sys.argv[2]
    
    spark = SparkSession.builder.getOrCreate()
    df = spark.read.csv(file, header= True, inferSchema=True)
    df = df.select(['FlightDate', 'DayOfWeek', 'Reporting_Airline', 'Tail_Number', 'Flight_Number_Reporting_Airline', 'Origin', 
                    'OriginCityName', 'OriginStateName', 'Dest', 'DestCityName', 'DestStateName',
                    'DepTime', 'DepDelay', 'AirTime', 'Distance'])

    df2 = df.withColumn('Hour', (df['DepTime'] / 100).cast(types.IntegerType()))
    totals = df2.groupBy('Hour').count()
    distributions = df2.groupBy(['Hour', 'DepDelay']).count()
    annotated = distributions.join(totals, on='Hour')
    frequencies = annotated.withColumn('relative', distributions['count'] / totals['count'])
    groups = frequencies.groupBy(totals['Hour'])\
                        .agg(functions.collect_list('DepDelay').alias('delays'),
                             functions.collect_list('relative').alias('relatives'))



    zipsort_typed = functions.udf(zipsort, types.ArrayType(types.ArrayType(types.FloatType())))
    distributions = groups.withColumn('distributions', zipsort_typed('delays', 'relatives'))



    quartiles_udf = functions.udf(quartiles, returnType=types.ArrayType(types.FloatType()))

    result = distributions.select('Hour',
                                  quartiles_udf('distributions').alias('quartiles'))

    result.write.json(out)
    spark.stop()
```

### Running with spark-submit

If the following works, we are ready to test it in the cluster!

```bash
unset PYSPARK_DRIVER_PYTHON
# The second argument is the output path; the job writes JSON files into it.
spark-submit mysparkjob.py On_Time_On_Time_Performance_2015_8.csv out.csv
```