## Docker Spark setup

This notebook is meant to run on a spark 2 docker container. First i'll describe the steps to set it up.

On a Linux based system install Docker and Docker-compose.Create this file : docker-compose.yml. The contents is listed below. Notice originally both spark master and worker use the singularities/spark im image. Spark-image is the new new after docker commit statements.  Then run: docker-compose build . Afterwards run this command : docker-compose build -d



```
version: "2"

services:
  master:
    image: spark-michel
    command: start-spark master
    hostname: master
    ports:
      - "6066:6066"
      - "7070:7070"
      - "8080:8080"
      - "50070:50070"
      - "8888:8888"
    volumes:
      - /Users/michelnossin/Downloads/fr24:/root/fr24
  worker:
    image: singularities/spark
    command: start-spark worker master
    environment:
      SPARK_WORKER_CORES: 1
      SPARK_WORKER_MEMORY: 2g
    links:
      - master

```

```
With docker ps , check if the master and worker containers are running.
Connect to the master node:
docker exec -it [container id master] bash
On the master node continue with setting up as described below.
```

## Spark and conda env setup

```
First install Anaconda 4 (latest version) on the Docker container with Spark Master. Then install a new Conda environment for Spark, using python 3.5 (3.6 has a bug).  

conda create -n spark python=3.5
source activate spark
conda install notebook ipykernel
ipython kernel install --user --name spark --display-name spark

Make jupyter start script, and run it:
PYSPARK_PYTHON=/root/anaconda3/envs/spark/bin/python
PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS='notebook --ip=0.0.0.0 --port=8888' $SPARK_HOME/bin/pyspark

Now go to the url it gives (http://0.0.0.0:8888/<some code>)
, Run the nodebook sections.
```


In [1]:
#Start this in spark conda env to test
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark import SparkConf
import pyspark.sql.functions as fn
import pyspark.sql.types as typ

## Example data

```
This example works if you clone https://github.com/PacktPublishing/Learning-PySpark

and make sure its in /root/learningPySpark on the Docker container with Spark Master. 

To install git on this container run command: apt-get install git
, on github (or bitbucket) create a repository so you can save changes from the container and push it to Github. Use the following commands on the Docker container to init and push the data :

git init
git add <your file>
git commit -m "first commit"
git remote add origin https://github.com/michelnossin/pyspark_training_docker.git
git push -u origin master
```

In [2]:
#RANDOM FLIGHTS SET, AND WORLD AIRPORT SET
flights = "file:/root/learningPySpark/Chapter03/flight-data/departuredelays.csv" 
airports = "file:/root/learningPySpark/Chapter03/flight-data/airport-codes-na.txt" 
airports_df = spark.read.csv(airports,header='true',inferSchema='true',sep='\t')
airports_df.createOrReplaceTempView("airports")
flights_df = spark.read.csv(flights,header='true')
flights_df.createOrReplaceTempView("flights")
flights_df.cache()

DataFrame[date: string, delay: string, distance: string, origin: string, destination: string]

In [3]:
#RADAR TRACK
track_file = "file:/root/fr24/fr24_20160624.csv"
header=['na','altitude','dest','heading','flight','fltid','landed','time','lat',\
         'lon','na3','org','na4','registration','flight2','speed','na6','planetype', 'altitude_delta']
fields = [ *[
           typ.StructField(h, typ.StringType(), True)
           for h in header
       ]
   ]
schema = typ.StructType(fields)
schema   
tracks_df = spark.read.csv(track_file,header='false',schema=schema)

#filter tracks early to make it speed up
tracks_df = tracks_df.where("dest == 'AMS'") #14 milj -> 114k
tracks_df.createOrReplaceTempView("tracks")
tracks_df.cache()

DataFrame[na: string, altitude: string, dest: string, heading: string, flight: string, fltid: string, landed: string, time: string, lat: string, lon: string, na3: string, org: string, na4: string, registration: string, flight2: string, speed: string, na6: string, planetype: string, altitude_delta: string]

```
Alternatively use correct schema from beginning:
import pyspark.sql.types as typ
   labels = [
       ('INFANT_ALIVE_AT_REPORT', typ.IntegerType()),
       ('BIRTH_PLACE', typ.StringType()),
       ('MOTHER_AGE_YEARS', typ.IntegerType()),
       ('FATHER_COMBINED_AGE', typ.IntegerType()),
       ('CIG_BEFORE', typ.IntegerType()),
       ('CIG_1_TRI', typ.IntegerType()),
       ('CIG_2_TRI', typ.IntegerType()),
       ('CIG_3_TRI', typ.IntegerType()),
       ('MOTHER_HEIGHT_IN', typ.IntegerType()),
       ('MOTHER_PRE_WEIGHT', typ.IntegerType()),
       ('MOTHER_DELIVERY_WEIGHT', typ.IntegerType()),
       ('MOTHER_WEIGHT_GAIN', typ.IntegerType()),
       ('DIABETES_PRE', typ.IntegerType()),
       ('DIABETES_GEST', typ.IntegerType()),
       ('HYP_TENS_PRE', typ.IntegerType()),
       ('HYP_TENS_GEST', typ.IntegerType()),
       ('PREV_BIRTH_PRETERM', typ.IntegerType())
   ]
   schema = typ.StructType([
       typ.StructField(e[0], e[1], False) for e in labels
   ])
   births = spark.read.csv('births_transformed.csv.gz',
                           header=True,
                           schema=schema)
```

## First look at data:

```
source activate spark
python -m pip install pandas
```

In [None]:
import pandas as pd
import matplotlib

In [None]:
spark.sql("select count(1) from flights").show()
spark.sql("select count(1) from airports").show()
spark.sql("select count(1) from tracks").show()

In [None]:
flights_df.limit(5).toPandas()

In [None]:
airports_df.limit(5).toPandas()

In [None]:
tracks_df.limit(5).toPandas()

In [None]:
airports_df.printSchema()

In [None]:
flights_df.printSchema() #date, delay and distance should change to int

In [None]:
tracks_df.printSchema()

## Cleaning data

Your data can be stained with duplicates, missing observations and outliers, non- existent addresses, wrong phone numbers and area codes, inaccurate geographical coordinates, wrong dates, incorrect labels, mixtures of upper and lower cases, trailing spaces, and many other more subtle problems. It is your job to clean it, irrespective of whether you are a data scientist or data engineer,

### Duplicate rows check and remove
First lets define some our spark util functions

In [4]:
def showDuplicateRowsCount(df):
    'Show row count with full duplicated rows'
    print("====Checking table duplicate rows =====")
    print('Count of rows: {0}'.format(df.count()))
    print('Count of distinct rows: {0}'.format(df.distinct().count()))
    print('===> nr of duplicate rows {0}'.format(df.count()-df.distinct().count()))
def showDuplicatesColumnCount(df,col):
    'Show duplicate rows based on a specific (id) col.'
    print("=====Checking col {0}".format(col))
    print('Count of values: {0}'.format(df.count()))
    distinct_col_count = df.select([
           c for c in df.columns if c != col
       ]).distinct().count()
    print('Count of distinct column values: {0}'.format(distinct_col_count))
    print ("====> duplicate count {0}".format(df.count() - distinct_col_count))
def showDuplicatesColumnCountSpark(df,col):
    'spark version of Showduplicatescolumncount()'
    df.agg(
       fn.count(col).alias('count'),
       fn.countDistinct(col).alias('distinct')
    ).show()
def showDuplicateColumnsCount(df):
    'Show duplicate rows based on all columns in a dataframe'
    for col in df.columns:
        showDuplicatesColumn(df,col)
def dropDuplicateColumn(df,col):
    'drop rows with duplicate columns based on certain (id) column'
    df = df.dropDuplicates(subset=[
       c for c in df.columns if c != col
    ])
#   
#def getDFDuplicateColumns(df,col,new_col):
#    uniq_df = df.select([
#           c for c in df.columns if c != col
#       ]).distinct()
#    duplicate_df = df.subtract(uniq_df)
#    
#    return(duplicate_df.withColumn(new_col, \
#                            fn.monotonically_increasing_id()))
#   **/ 
def showMissingDataPercent(df_miss):
    'show each column and percentage of missing data, 0 - 1 , 0 means no missing data'
    df_miss.agg(*[
       (1 - (fn.count(c) / fn.count('*'))).alias(c + '_missing')
       for c in df_miss.columns
    ]).show()
def getDFDropColumn(df_miss,col):
    'Get a new dataframe based on another without given column'
    return(df_miss.select([
       c for c in df_miss.columns if c != col
    ]))
def getDFDropMissingRows(df_miss):
    'Drop rows with any missing column field'
    return(df_miss.dropna())
def fillMissingMeanColumn(df,col):
    'Fill in missing values in a certain column containing numerical data'
    means = df.agg(
       *[fn.mean(col).alias(col)
           for c in df.columns if c != col]
   ).toPandas().to_dict('records')[0]
def getDFFillMissingCategoryColumn(df,col):
    'Fill in missing values in a column containing a category and return df'
    miss_dict = {col: "missing"}
    return(df.fillna(miss_dict))
def getDictOutliers(df_outliers,col_list):
    'return dictionary with outliers boundaries , based on columns in list'
    bounds = {}
    for col in col_list:
        quantiles = df_outliers.approxQuantile(
           col, [0.25, 0.75], 0.05
       )
        IQR = quantiles[1] - quantiles[0]
        bounds[col] = [
           quantiles[0] - 1.5 * IQR,
           quantiles[1] + 1.5 * IQR
     ]
    return bounds
def getDFOutliers(df_outliers,bounds,cols,id_col):
    'print all outlier rows based on dictionary with outlier bounderies dict, for columns in column list'
    outliers = df_outliers.select(*[id_col] + [
       (
           (df_outliers[c] < bounds[c][0]) |
           (df_outliers[c] > bounds[c][1])
       ).alias(c + '_o') for c in cols
    ])
    return outliers

    

In [None]:
#Check duplicates rows, same value?
flights_df = spark.sql("select * from flights") #507 out of 1.4 milj
showDuplicateRowsCount(flights_df)
airports_df = spark.sql("select * from airports") #0
showDuplicateRowsCount(airports_df)
tracks_df = spark.sql("select * from tracks") #192k out of 14m, 735 out of 114k after filtering for AMS arrival
showDuplicateRowsCount(tracks_df) #takes 10 mins

In [5]:
#Change type of integer based columns , so we check outliers later on
flights_df = flights_df.withColumn("delay",flights_df["delay"].cast(typ.IntegerType()))
flights_df = flights_df.withColumn("distance",flights_df["distance"].cast(typ.IntegerType()))
                   
tracks_df = tracks_df.withColumn("altitude",tracks_df["altitude"].cast(typ.IntegerType()))  
tracks_df = tracks_df.withColumn("altitude_delta",tracks_df["altitude_delta"].cast(typ.IntegerType()))
tracks_df = tracks_df.withColumn("speed",tracks_df["speed"].cast(typ.IntegerType()))      
tracks_df = tracks_df.withColumn("heading",tracks_df["heading"].cast(typ.IntegerType()))   
tracks_df = tracks_df.withColumn("lat",tracks_df["lat"].cast(typ.FloatType()))  
tracks_df = tracks_df.withColumn("lon",tracks_df["lon"].cast(typ.FloatType())) 
tracks_df = tracks_df.withColumn("time",tracks_df["time"].cast(typ.LongType())) 
tracks_df = tracks_df.withColumn("landed",tracks_df["landed"].cast(typ.IntegerType()))

#Lets add a id columns for the flights
flights_df = flights_df.withColumn('id',fn.monotonically_increasing_id())

In [6]:
#pure duplicates just drop these, but the flights tables might be different flights. We donts know without id
tracks_df =tracks_df.dropDuplicates()

### Duplicate columns check

Some times there are columns identifying a row, and which are different.
However in case you know the rest of the columns is the same you might want to remove these rows. eg , Michel , 1.90, hoofddorp , and michel2, 1.90, hoofddorp . Its the same person but id is incorrect. 

In [None]:
#airports IATA should be uniq. It seems 15 rows have identical data 
#but different IATA code
showDuplicatesColumnCount(airports_df,'IATA')
showDuplicatesColumnCountSpark(airports_df,'IATA')
#TODO WHY ARE RESULT DIFFERENT!!!!!! SHOULD BE BOTH 511 OR 524!!

In [None]:
showDuplicatesColumnCount(tracks_df,'flight')

```
We could call dropDuplicateColumn(df_airports,'IATA')

However this would delete rows without knowing the correct IATA. 
The Flights tables does not have uniq field like flightname,
so will not delete any rows there either.
```

In [None]:
#TODO: Make function to show these rows so we know which are duplicates
#df_duplicate_airports = getDFDuplicateColumns(airports_df,'IATA','new_id')
#df_duplicate_airports.toPandas()

### Missing data

```
Drop data row if possible in case of missing. if datasize. < 50% check which features are missing, and just drop these.
Alternative impute missing:
Boolean: add missing category
categorial already: add multiple extra levels and and missing there
numeric and ordinal: mean, median etc to fill in
```

In [None]:
#0 = perfect, 1 = all is missing
showMissingDataPercent(airports_df) #State misses some data
showMissingDataPercent(flights_df)
showMissingDataPercent(tracks_df) #We miss some, flight a bit, but is important to have these,

In [None]:
#We could just drop column state, we keep all our rows, and have no missing data
df_no_state = getDFDropColumn(airports_df,'State')
showMissingDataPercent(df_no_state)

df_no_flight = getDFDropColumn(tracks_df,'flight')
showMissingDataPercent(df_no_flight)

df_no_flight.count() #113993 out of 114k

In [7]:
#Or drop only rows with any missing data
df_without_missing = getDFDropMissingRows(airports_df)
showMissingDataPercent(df_without_missing)

df_without_missing_flight = getDFDropMissingRows(tracks_df)
showMissingDataPercent(df_without_missing_flight)

df_without_missing_flight.count() #Also 113167 , so we could just use this for the tracker

+------------+-------------+---------------+------------+
|City_missing|State_missing|Country_missing|IATA_missing|
+------------+-------------+---------------+------------+
|         0.0|          0.0|            0.0|         0.0|
+------------+-------------+---------------+------------+

+----------+----------------+------------+---------------+--------------+-------------+--------------+------------+-----------+-----------+-----------+-----------+-----------+--------------------+---------------+-------------+-----------+-----------------+----------------------+
|na_missing|altitude_missing|dest_missing|heading_missing|flight_missing|fltid_missing|landed_missing|time_missing|lat_missing|lon_missing|na3_missing|org_missing|na4_missing|registration_missing|flight2_missing|speed_missing|na6_missing|planetype_missing|altitude_delta_missing|
+----------+----------------+------------+---------------+--------------+-------------+--------------+------------+-----------+-----------+----------

113167

In [8]:
#Or we can impute values, as this is a category we will add a missing category
df_missing_state = getDFFillMissingCategoryColumn(airports_df,'State')
df_missing_state.where("State == 'missing'").show() #3
df_missing_state.count() #526
showMissingDataPercent(df_missing_state)

+-------------+-------+-------+----+
|         City|  State|Country|IATA|
+-------------+-------+-------+----+
|Washington DC|missing|    USA| IAD|
|Washington DC|missing|    USA| DCA|
|Washington DC|missing|    USA| WAS|
+-------------+-------+-------+----+

+------------+-------------+---------------+------------+
|City_missing|State_missing|Country_missing|IATA_missing|
+------------+-------------+---------------+------------+
|         0.0|          0.0|            0.0|         0.0|
+------------+-------------+---------------+------------+



In [9]:
#LETS PICK LAST OPTION for Airports and trackers
airports_df = df_missing_state
tracks_df = df_without_missing_flight

### outliers

Outliers are those observations that deviate signi cantly from the distribution of the rest of your sample. The de nitions of signi cance vary, but in the most general form, you can accept that there are no outliers if all the values are roughly within the Q1−1.5IQR and Q3+1.5IQR range

In [None]:
#Show the ouytlier ranges for our integer based columns
col_list = ['delay','distance'] 

#Run cast code in the beginning again (dont no why thats needed?)
outliers_dict = getDictOutliers(flights_df,col_list)
print(outliers_dict) 

#Show the ouytlier ranges for our integer based columns
col_flights_list = ['lat','lon','altitude','heading','time','landed'] 

outliers_flights_dict = getDictOutliers(tracks_df,col_flights_list)
print(outliers_flights_dict) #Not really handy way to check outliers.



In [None]:
#Flag rows
#Run the id add function again, for some reason..
df_outliers = getDFOutliers(flights_df,outliers_dict,col_list,'id')
df_outliers.show()

df_flight_outliers = getDFOutliers(tracks_df,outliers_flights_dict,col_flights_list,'flight')
df_flight_outliers.show()

In [None]:
#Show outlier  flights
#1.4 milj flights, about 162k has outlier delays. And 75k outlier distance
df_out= flights_df.join(df_outliers, on='id')
print(df_out.filter('delay_o').select('id', 'delay').count())
print(df_out.filter('distance_o').select('id', 'distance').count())
df_out.filter('delay_o').select('id', 'delay').show()
df_out.filter('distance_o').select('id', 'distance').show()
    

In [None]:
#Show outlier  tracks, dont understand the result yet .. todo
df_out=tracks_df.join(df_flight_outliers, on='flight')
print(df_out.filter('heading_o').select('flight', 'heading').count())  #None, however heading has strange values
print(df_out.filter('altitude_o').select('flight', 'altitude').count()) #None, but shows some strange numbers
print(df_out.filter('lat_o').select('flight', 'lat').count()) #32, < 33 but still good value
print(df_out.filter('lon_o').select('flight', 'lon').count()) #-66 also good
print(df_out.filter('landed_o').select('flight', 'landed').count())  # 0 , <> 0.0 .. 

df_out.filter('heading_o').select('flight', 'heading').show()
df_out.filter('altitude_o').select('flight', 'altitude').show()
df_out.filter('lat_o').select('flight', 'lat').show()
df_out.filter('lon_o').select('flight', 'lon').show()
df_out.filter('landed_o').select('flight', 'landed').show()

In [None]:
#Lets filters in between known ranges:
#landed 0 or 1
#heading 0 - 360
#altitude < 100 , > 50000 
#the valid range of latitude in degrees is -90 and +90 . Longitude is in the range -180 and +180 
tracks_df = tracks_df.where("landed == 0 or landed == 1")
tracks_df = tracks_df.where("heading >= 0 and heading < 360")
tracks_df = tracks_df.where("altitude > -100 and altitude < 50000")
tracks_df = tracks_df.where("lat >= -90 and lat <= 90")
tracks_df = tracks_df.where("lon >= -180 and lon <= 180")
tracks_df.count() #only 1 row removed 113992

tracks_df.describe().toPandas() #looks fine


## Example flight

In [None]:
df_sel = tracks_df.where("flight == 'KL836'").toPandas().sort_values(['time']).reset_index() #515 rows
df_sel.head(10)

In [None]:
from IPython.display import HTML
import folium

%matplotlib inline
def inline_map(map):
    """
    Embeds the HTML source of the map directly into the IPython notebook.
    
    This method will not work if the map depends on any files (json data). Also this uses
    the HTML5 srcdoc attribute, which may not be supported in all browsers.
    """
    map._build_map()
    return HTML('<iframe srcdoc="{srcdoc}" style="width: 100%; height: 510px; border: none"></iframe>'.format(srcdoc=map.HTML.replace('"', '&quot;')))
 
def embed_map(map, path="map.html"):
    """
    Embeds a linked iframe to the map into the IPython notebook.
    
    Note: this method will not capture the source of the map into the notebook.
    This method should work for all maps (as long as they use relative urls).
    """
    map.create_map(path=path)
    return HTML('<iframe src="files/{path}" style="width: 100%; height: 510px; border: none"></iframe>'.format(path=path))

In [None]:
import matplotlib
#matplotlib.style.use('ggplot')
df_sel[['altitude','speed']].plot()

In [None]:

import sys
import folium

from IPython.display import HTML


def display(m, height=300):
    """Takes a folium instance and embed HTML."""
    m._build_map()
    srcdoc = m.HTML.replace('"', '&quot;')
    embed = HTML('<iframe srcdoc="{0}" '
                 'style="width: 100%; height: {1}px; '
                 'border: none"></iframe>'.format(srcdoc, height))
    return embed

def inline_map(map):
    """
    Embeds the HTML source of the map directly into the IPython notebook.
    
    This method will not work if the map depends on any files (json data). Also this uses
    the HTML5 srcdoc attribute, which may not be supported in all browsers.
    """
    map._build_map()
    return HTML('<iframe srcdoc="{srcdoc}" style="width: 100%; height: 510px; border: none"></iframe>'.format(srcdoc=map.HTML.replace('"', '&quot;')))

def embed_map(map, path="map.html"):
    """
    Embeds a linked iframe to the map into the IPython notebook.
    
    Note: this method will not capture the source of the map into the notebook.
    This method should work for all maps (as long as they use relative urls).
    """
    #map.create_map(path=path)
    return HTML('<iframe src="files/{path}" style="width: 100%; height: 510px; border: none"></iframe>'.format(path=path))

In [None]:
import folium
from IPython.display import HTML
def plotFlight(flight):
    #df_sel = tracks_df.where("flight == '" + flight + "'").toPandas().sort_values(['time']).reset_index()
    df_sel = join_df.toPandas().query("flight == '" + flight + "'").sort_values(['time']).reset_index()
    fmap=folium.Map(location=[52.308871, 4.761392], zoom_start=4)
    #for row in df_sel.iterrows():
     #   latlon = [ row[1]['lat'], row[1]['lon'] ]
    #   folium.Marker(latlon, popup=str(row[1]['time'])).add_to(fmap)
     #   fmap.add_children
    
    
    latlist = df_sel['lat'].tolist()
    lonlist = df_sel['lon'].tolist()
    coordinates = zip(latlist[:], lonlist[:])
    line=folium.PolyLine(locations=coordinates,weight=3,color = 'red')
    fmap.add_children(line)
    fmap.save('osm.html')
    return HTML('<iframe src="files/{path}" style="width: 100%; height: 510px; border: none"></iframe>'.format(path='osm.html'))

def plotPoint(lon,lat):
    fmap=folium.Map(location=[52.308871, 4.761392], zoom_start=13)
    latlon = [ lat, lon ]
    folium.Marker(latlon, popup=str("test")).add_to(fmap)
    fmap.add_children
    fmap.save('osm.html')
    return HTML('<iframe src="files/{path}" style="width: 100%; height: 510px; border: none"></iframe>'.format(path='osm.html'))



In [None]:
#plotFlight('Y87486')
#plotFlight('MP6742') #freight
plotFlight('KL1326')
#plotFlight('KL214')
#plotFlight('KL836')
#plotFlight('U26771')

In [None]:
runway_list = [   { "runway" : 1, "lat" : 52.350202, "lon" : 4.710732 , "heading1" : 180 , "heading2" : 360, "name" : "polderbaan" },
    { "runway" : 2, "lat" : 52.316110, "lon" : 4.738369 , "heading1" : 180 , "heading2" : 360, "name" : "zwanenburgbaan" },
    { "runway" : 3, "lat" : 52.317579, "lon" : 4.772186 , "heading1" : 90 , "heading2" : 270, "name" : "buitenveldertbaan" },
    { "runway" : 4, "lat" : 52.297217, "lon" : 4.757938 , "heading1" : 60 , "heading2" : 240, "name" : "kaagbaan" },
    { "runway" : 5, "lat" : 52.307714, "lon" : 4.778881 , "heading1" : 180 , "heading2" : 360, "name" : "aalsmeerbaan" },
    { "runway" : 6, "lat" : 52.308659, "lon" : 4.795361 , "heading1" : 40 , "heading2" : 220, "name" : "oostbaan" }
]

pier_list = [
        {"id" : 0 , "pier" : "A" , "lon" : 4.753781 , "lat" : 52.300381},
        {"id" : 1 , "pier" : "B" , "lon" : 4.759363 , "lat" : 52.302362},
        {"id" : 2 , "pier" : "C" , "lon" : 4.766188 , "lat" : 52.305380},
        {"id" : 3 , "pier" : "D" , "lon" : 4.771575 , "lat" : 52.309147},
        {"id" : 4 , "pier" : "E" , "lon" : 4.767366 , "lat" : 52.312182},
        {"id" : 5 , "pier" : "F" , "lon" : 4.761679 , "lat" : 52.313040},
        {"id" : 6 , "pier" : "G" , "lon" : 4.755998 , "lat" : 52.312574},
        {"id" : 7 , "pier" : "H" , "lon" : 4.754054 , "lat" : 52.310135}
    ]
#pt = runway_list[5]
pt = pier_list[6]
plotPoint(pt["lon"],pt["lat"])

### Statistics

In [None]:
#read in the file
flights = "file:/root/learningPySpark/Chapter03/flight-data/departuredelays.csv" 
fl = sc.textFile(flights) #you can use .gz, so better then spark sql
header = fl.first()
header

In [None]:

#Filter numeric columns in flights
fl_filter = fl.filter(lambda row: row != header) \
       .map(lambda row: [int(elem) for elem in row.split(',') if (elem.isdigit() or elem.lstrip("-").isdigit()) ])
fl_filter.take(5) #.foreach(println)

In [None]:
#create schema
fields = [ *[
           typ.StructField(h, typ.IntegerType(), True)
           for h in header.split(',')
       ]
   ]
schema = typ.StructType(fields)
schema   

In [None]:
#create DF Spark
fli_df = spark.createDataFrame(fl_filter, schema)
fli_df.printSchema()
#fli_df.show() Some columns are not integer so crash, to fix later

In [None]:
#to group by values within a column
tracks_df.groupby('flight').count().show()

In [None]:
#grouping the whole set and perform function
tracks_df.agg({'speed' : 'skewness'}).show() #ratio mean to sd is very low, wide spread observation negatively

#can also use: avg(), count(), countDistinct(), first(), kurtosis(), max(), mean(), min(), skewness(), stddev(), stddev_pop(), stddev_samp(), sum(), sumDistinct(), var_pop(), var_samp() and variance().



In [None]:
#correlation is simple (only pearson , and in pairs)
tracks_df.corr('landed','speed') #quit some relation which you expect

In [None]:
def showCorrelationMatrix(df,numerical):
    'for a DF print matrix with correlations between all numerical columns'
    n_numerical = len(numerical)
    corr = []
    for i in range(0, n_numerical):
        temp = [None] * i
        for j in range(i, n_numerical):
            temp.append(df.corr(numerical[i], numerical[j]))
        corr.append(temp)
        
    print(corr)

In [None]:
showCorrelationMatrix(tracks_df,['speed','landed','altitude','heading'])

In [None]:
tracks_df.corr('speed','altitude') #very high correlation

### Lets try to make some features
```
Flight,time,speed,distance_to_ams,time_till_actual_landing

distance to amsterdam , will be lat/lon comparison to lat/lon ams airport
for each row
```

In [10]:
from math import radians, cos, sin, asin, sqrt
def haversine(lon1, lat1, lon2, lat2):
    """
    Calculate the great circle distance between two points 
    on the earth (specified in decimal degrees)
    """
    # convert decimal degrees to radians 
    lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])
    # haversine formula 
    dlon = lon2 - lon1 
    dlat = lat2 - lat1 
    a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
    c = 2 * asin(sqrt(a)) 
    km = 6367 * c
    return km

#This will be slow in pyspark due to context switching JVM and pyspark
#Better to use UDF, or use Scala etc
def dist_to_ams(lat,lon):
    return haversine(float(4.761392), float(52.308871), float(lon),float(lat))

In [11]:

def closest_runway(lon,lat,heading):
    """ Get closest runway at amsterdam based on plane location and heading. return code 1-12 (6 runways * 2 headings)"""
    
    runway_list = [   { "runway" : 1, "lat" : 52.350202, "lon" : 4.710732 , "heading1" : 180 , "heading2" : 360, "name" : "polderbaan" },
    { "runway" : 2, "lat" : 52.316110, "lon" : 4.738369 , "heading1" : 180 , "heading2" : 360, "name" : "zwanenburgbaan" },
    { "runway" : 3, "lat" : 52.317579, "lon" : 4.772186 , "heading1" : 90 , "heading2" : 270, "name" : "buitenveldertbaan" },
    { "runway" : 4, "lat" : 52.297217, "lon" : 4.757938 , "heading1" : 60 , "heading2" : 240, "name" : "kaagbaan" },
    { "runway" : 5, "lat" : 52.307714, "lon" : 4.778881 , "heading1" : 180 , "heading2" : 360, "name" : "aalsmeerbaan" },
    { "runway" : 6, "lat" : 52.308659, "lon" : 4.795361 , "heading1" : 40 , "heading2" : 220, "name" : "oostbaan" }
]
    
    smallest_dist_runway = -1
    smallest_dist = -1
    for runway in runway_list:
        dist = haversine(float(runway["lon"]), float(runway["lat"]), float(lon),float(lat))
        if smallest_dist == -1 or dist < smallest_dist:
            smallest_dist = dist
            smallest_dist_runway = runway
    
    angle1 = 180 - abs(abs(heading - smallest_dist_runway["heading1"]) - 180); 
    angle2 = 180 - abs(abs(heading - smallest_dist_runway["heading2"]) - 180); 
    runway_code = smallest_dist_runway["runway"]
    
    if angle1 < angle2 :
        return runway_code
    return runway_code + 6

def distance_to_runway(lon,lat,runway):
    """ Get distance based on lat lon to runway given its id"""
    runway_list = [   { "runway" : 1, "lat" : 52.350202, "lon" : 4.710732 , "heading1" : 180 , "heading2" : 360, "name" : "polderbaan" },
    { "runway" : 2, "lat" : 52.316110, "lon" : 4.738369 , "heading1" : 180 , "heading2" : 360, "name" : "zwanenburgbaan" },
    { "runway" : 3, "lat" : 52.317579, "lon" : 4.772186 , "heading1" : 90 , "heading2" : 270, "name" : "buitenveldertbaan" },
    { "runway" : 4, "lat" : 52.297217, "lon" : 4.757938 , "heading1" : 60 , "heading2" : 240, "name" : "kaagbaan" },
    { "runway" : 5, "lat" : 52.307714, "lon" : 4.778881 , "heading1" : 180 , "heading2" : 360, "name" : "aalsmeerbaan" },
    { "runway" : 6, "lat" : 52.308659, "lon" : 4.795361 , "heading1" : 40 , "heading2" : 220, "name" : "oostbaan" }
]
    #runway code can be 1-12 (6 runways, 2 directions). So 7 means 1 + 6 , 8 means 2 + 6 etc
    if runway > 6:
        runway = runway - 6
        
    runway_lat = runway_list[runway-1]["lat"]
    runway_lon = runway_list[runway-1]["lon"]
    
    return haversine(float(lon),float(lat),float(runway_lon),float(runway_lat))

def distance_to_pier(lon,lat,pier):
    """ Get distance based on lat lon to pier given its id"""
    pier_list = [
        {"id" : 0 , "pier" : "A" , "lon" : 4.753781 , "lat" : 52.300381},
        {"id" : 1 , "pier" : "B" , "lon" : 4.759363 , "lat" : 52.302362},
        {"id" : 2 , "pier" : "C" , "lon" : 4.766188 , "lat" : 52.305380},
        {"id" : 3 , "pier" : "D" , "lon" : 4.771575 , "lat" : 52.309147},
        {"id" : 4 , "pier" : "E" , "lon" : 4.767366 , "lat" : 52.312182},
        {"id" : 5 , "pier" : "F" , "lon" : 4.761679 , "lat" : 52.313040},
        {"id" : 6 , "pier" : "G" , "lon" : 4.755998 , "lat" : 52.312574},
        {"id" : 7 , "pier" : "H" , "lon" : 4.754054 , "lat" : 52.310135}
    ]
        
    pier_lat = pier_list[pier]["lat"]
    pier_lon = pier_list[pier]["lon"]
    
    return haversine(float(lon),float(lat),float(pier_lon),float(pier_lat))

In [12]:
def closest_pier(lon,lat):
    """ Get closest pier based on a location, used to predict on block time"""
    pier_list = [
        {"id" : 0 , "pier" : "A" , "lon" : 4.753781 , "lat" : 52.300381},
        {"id" : 1 , "pier" : "B" , "lon" : 4.759363 , "lat" : 52.302362},
        {"id" : 2 , "pier" : "C" , "lon" : 4.766188 , "lat" : 52.305380},
        {"id" : 3 , "pier" : "D" , "lon" : 4.771575 , "lat" : 52.309147},
        {"id" : 4 , "pier" : "E" , "lon" : 4.767366 , "lat" : 52.312182},
        {"id" : 5 , "pier" : "F" , "lon" : 4.761679 , "lat" : 52.313040},
        {"id" : 6 , "pier" : "G" , "lon" : 4.755998 , "lat" : 52.312574},
        {"id" : 7 , "pier" : "H" , "lon" : 4.754054 , "lat" : 52.310135}
    ]

    smallest_dist_pier = -1
    smallest_dist = -1
    for pier in pier_list:
        dist = haversine(float(pier["lon"]), float(pier["lat"]), float(lon),float(lat))
        if smallest_dist == -1 or dist < smallest_dist:
            smallest_dist = dist
            smallest_dist_pier = pier
            
    return smallest_dist_pier["id"]

In [13]:
def negative_to_zero(some_number):
    if some_number < 0:
        return 0
    return some_number

In [14]:
import datetime

def timestamp_to_hour(my_time):
    d = datetime.datetime.utcfromtimestamp(my_time)
    return d.hour
    #return d.hour + (d.minute + 30 + (d.second + 30) / 60) / 60 #nearest hour
    
#h = d.hour + d.minute / 60. + d.second / 3600  #round down

In [15]:
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType
from pyspark.sql.types import IntegerType

udf_func = udf(dist_to_ams, FloatType())
tracks_df = tracks_df.withColumn("distance_to_ams", \
                            udf_func(tracks_df.lat,tracks_df.lon))

udf_func2 = udf(closest_runway, IntegerType())
udf_func3 = udf(negative_to_zero,IntegerType())
udf_func4 = udf(closest_pier, IntegerType())
udf_func5 = udf(distance_to_runway,FloatType())
udf_func6 = udf(distance_to_pier,FloatType())
udf_func7 = udf(timestamp_to_hour,IntegerType())

In [None]:
#Distance is great, however, after landing, the next row is the NEXT
# flight
df = tracks_df.where("flight == 'KL214'").toPandas().sort_values(['time']).reset_index()
df.head(5)

In [None]:
#just a test to compare rows
from pyspark.sql.functions import col, lag
from pyspark.sql.window import Window

df = (
    sc.parallelize([
        (134, 30, "2016-07-02 12:01:40"), (134, 32, "2016-07-02 12:21:23"),
        (125, 30, "2016-07-02 13:22:56"), (125, 32, "2016-07-02 13:27:07"),
    ]).toDF(["itemid", "eventid", "timestamp"])
    .withColumn("timestamp", col("timestamp").cast("timestamp"))
)

w = Window.partitionBy("itemid").orderBy("timestamp")

diff = col("timestamp").cast("long") - lag("timestamp", 1).over(w).cast("long")

df = df.withColumn("diff", diff)
df.show()

In [None]:
#Lets create a delta to get the landing times
from pyspark.sql.functions import col, lag
from pyspark.sql.window import Window

df = (
    sc.parallelize([
        ('kl123', 0, "2016-07-02 12:01:40"), ('kl123', 0, "2016-07-02 12:21:23"),
        ('kl123', 1, "2016-07-02 13:22:56"), ('kl123', 1, "2016-07-02 13:27:07"),
    ]).toDF(["itemid", "landed", "timestamp"])
    .withColumn("timestamp", col("timestamp").cast("timestamp"))
)

w = Window.partitionBy("itemid").orderBy("timestamp")

diff = col("landed").cast("int") - lag("landed", 1).over(w).cast("int")

df = df.withColumn("diff", diff)
df.show()

In [16]:
#now lets adds column to show the landing moment, so delta of the landed column should be +1 .
#
from pyspark.sql.functions import col, lag
from pyspark.sql.window import Window


w = Window.partitionBy("flight").orderBy("time")

diff = col("landed").cast("int") - lag("landed", 1).over(w).cast("int")
tracks_touchdown_df = tracks_df.select(["flight","time","lat","lon","heading","landed","registration"]).withColumn("touchdown", diff)
tracks_touchdown_df = tracks_touchdown_df.withColumn("runway", \
                            udf_func2(tracks_touchdown_df.lon,tracks_touchdown_df.lat,tracks_touchdown_df.heading))
tracks_touchdown_df = tracks_touchdown_df.where("touchdown == 1").select(col("flight").alias("flight_touchdown"), \
                                                                         col("runway").alias("runway_touchdown"), \
                                                                         col("time").alias("time_touchdown") )
tracks_touchdown_df.show()

+----------------+----------------+--------------+
|flight_touchdown|runway_touchdown|time_touchdown|
+----------------+----------------+--------------+
|          CND518|               8|    1466792627|
|           DL138|               1|    1466760692|
|          MP6742|               1|    1466778285|
|          KL1742|               2|    1466775981|
|          U28881|               1|    1466793964|
|          KL1134|               1|    1466796510|
|          KL1800|               1|    1466801922|
|          KL1858|               1|    1466777442|
|           KL888|               1|    1466785238|
|           OR288|               1|    1466767433|
|          KL1618|               2|    1466749916|
|          KL1790|               2|    1466749726|
|           KL736|               1|    1466743064|
|          U27908|               1|    1466800107|
|          KL1168|               2|    1466775337|
|          KL1010|               1|    1466769724|
|           LH992|             

In [None]:
tracks_touchdown_df.count()

In [17]:
#check for multiple landings 1 flight
#tracks_touchdown_df.groupby(["flight","registration"]).count().where("count > 1").show()
tracks_touchdown_df.groupby(["flight_touchdown"]).count().where("count > 1").show()

+----------------+-----+
|flight_touchdown|count|
+----------------+-----+
|          HV6118|    2|
|          HV5134|    2|
|          KL1412|    2|
|          HV6332|    2|
|          HV5356|   19|
|          HV5314|    2|
|           TP668|    2|
|          HV6146|    2|
+----------------+-----+



In [18]:
#remove them
tracks_touchdown_df = tracks_touchdown_df.where("flight_touchdown != 'HV6118' and flight_touchdown != 'HV5134' and flight_touchdown != 'KL1412' and flight_touchdown != 'HV6332' and flight_touchdown != 'HV5356' and flight_touchdown != 'HV5314' and flight_touchdown != 'TP668' and flight_touchdown != 'HV6146' ")

In [19]:
#check again
tracks_touchdown_df.groupby(["flight_touchdown"]).count().where("count > 1").show()

+----------------+-----+
|flight_touchdown|count|
+----------------+-----+
+----------------+-----+



In [20]:
#Lets make a new features that calculates nr of landings per hour
tracks_touchdown_df = tracks_touchdown_df.withColumn("hour_touchdown", udf_func7(col("time_touchdown")))
touchdown_sum_df = tracks_touchdown_df.groupby("hour_touchdown").sum().select(col("hour_touchdown").alias("hour_touchdown_grp"),col("sum(hour_touchdown)").alias("sum_hour_touchdown"))
tracks_touchdown_df = tracks_touchdown_df.join(touchdown_sum_df,tracks_touchdown_df.hour_touchdown == touchdown_sum_df.hour_touchdown_grp)

In [None]:
tracks_touchdown_df.show()

In [21]:
#join tracks with our landing table and add column showing time till land
join_df = tracks_df.join(tracks_touchdown_df,tracks_df.flight == tracks_touchdown_df.flight_touchdown)
join_df = join_df.withColumn("time_till_landing",col("time_touchdown") - col("time"))

In [22]:
#get rid of > 7000 sec , so we make sure flights on the next day are not using the landing of current day
join_df = join_df.where("time_till_landing > -7000")

#ML require positive, but we dont want to remove that, we want to calc in block time later on
join_df = join_df.withColumn("time_till_landing", udf_func3(join_df.time_till_landing))  
join_df = join_df.withColumn("time_till_landing_minutes",join_df.time_till_landing / 60)

In [23]:
from pyspark.sql.functions import max,min

#LEts us join_df to add the pier used to park, so we calculate on block
maxtime_df = join_df.groupby(col("flight").alias("flight_maxtime")).agg(max("time").alias("time_onblock"))
#join_maxtime_df = join_df.join(maxtime_df,join_df.flight == maxtime_df.flight_maxtime)
#join_maxtime_df = join_maxtime_df.withColumn("time_till_onblock_minutes",(col("time_onblock") - col("time")) / 60)

#join_maxtime_df.toPandas().head(10) #show(10)

In [24]:
onblock_df = maxtime_df.join(join_df,(maxtime_df.flight_maxtime == join_df.flight) & (maxtime_df.time_onblock == join_df.time))

In [25]:
onblock_df = onblock_df.select(col("flight").alias("flight_onblock"), \
                               col("lat").alias("lat_onblock"), \
                               col("lon").alias("lon_onblock"), \
                               col("time_onblock"))
#onblock_df.show(10)
                               

In [26]:
onblock_df = onblock_df.withColumn("pier_onblock",udf_func4(col("lon_onblock"),col("lat_onblock")) )
#onblock_df.show(10)

In [27]:
join_maxtime_df = join_df.join(onblock_df,join_df.flight == onblock_df.flight_onblock)
join_maxtime_df = join_maxtime_df.withColumn("time_till_onblock_minutes",(col("time_onblock") - col("time")) / 60)

#join_maxtime_df.toPandas().head(10) #show(10)

In [28]:
join_df = join_maxtime_df

In [None]:
#join_df.toPandas().head(10) 

In [None]:
join_df.toPandas()['runway_touchdown'].unique()

In [29]:
join_df = join_df.withColumn("distance_to_runway",udf_func5(col("lon"),col("lat"),col("runway_touchdown")))

In [30]:
join_df = join_df.withColumn("distance_to_pier",udf_func6(col("lon"),col("lat"),col("pier_onblock")))

In [None]:
join_df.toPandas().head(10) 

### some minor random check

In [None]:
tracks_df.select("flight").distinct().count() #747 planes, might be all

In [None]:
join_df.select("flight").distinct().count()

In [None]:
df = join_df.toPandas().query("flight == 'KL214'").sort_values(['time']).reset_index()
join_df.take(1)

### This section is used to filter flights close to pier centre, as our on block coverage is not good enough right now
The ok_df set will contain flights good enough to test with, and will be joined later on to our main dataset


In [None]:
test_df = join_df.select(['flight','time_till_onblock_minutes','distance_to_pier','speed','pier_onblock','lat','lon'])
test_df.cache()

In [None]:
#Lets get rid of planes parking at locations outside our current coverage (like cargo planes)
ok_df = test_df.where("time_till_onblock_minutes < 1").filter("distance_to_pier < 1").select(col("flight").alias("flight_ok"))

In [None]:
ok_df.count()

In [None]:
ok_df.toPandas().head(5)

### Machine learning set

In [31]:

#Create our machine learing dataset
ml_df = join_df.select(['flight','sum_hour_touchdown','time_till_landing_minutes','time_till_onblock_minutes','distance_to_ams','distance_to_runway','distance_to_pier','altitude','speed','heading','runway_touchdown','pier_onblock'])
#ml_df = ml_df.withColumn('id',fn.monotonically_increasing_id())
#ml_df.toPandas()

In [None]:
#remove later!!!! just to test, we remove all flights to far away from our onblock locations
ml_df = ml_df.join(ok_df,ok_df.flight_ok == ml_df.flight)

In [32]:
training_data, testing_data = ml_df.randomSplit([0.7, 0.3])

In [None]:
#ml_df.toPandas().to_csv('/root/fr24/fr24.csv', sep=',',index=False)

### Predict landing

In [33]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator

#Change label to time_till_landing_minutes if you want to predict that field

#temp removed ,"heading" features so sum_hour_touchdown can be added (prevent pc crash)
assembler = VectorAssembler(inputCols=["sum_hour_touchdown", "distance_to_runway","altitude","speed","runway_touchdown"],outputCol="features")
Regressor = DecisionTreeRegressor(featuresCol="features",labelCol="time_till_landing_minutes",maxDepth=25)
pipeline = Pipeline(stages=[assembler,Regressor])
model = pipeline.fit(training_data)



In [34]:
predictions = model.transform(testing_data)
modelEvaluator = RegressionEvaluator(labelCol="time_till_landing_minutes")
modelError = modelEvaluator.evaluate(predictions) #rmse by default
modelError = modelEvaluator.evaluate(predictions,{modelEvaluator.metricName: "mae"})

In [35]:
modelError #time landing 4,5 minutes, changing heading feature to sum_hour_touchdown gives same result

4.570515650024528

In [None]:
predictions.toPandas().head(100) #show(100)

### Predict on block time



In [None]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator

#We only predict onblock and have extra field pier_onblock
assembler = VectorAssembler(inputCols=["distance_to_pier","speed","altitude","heading","runway_touchdown"],outputCol="features")
Regressor = DecisionTreeRegressor(featuresCol="features",labelCol="time_till_onblock_minutes",maxDepth=25)
pipeline = Pipeline(stages=[assembler,Regressor])
model = pipeline.fit(training_data)

In [None]:
predictions = model.transform(testing_data)
modelEvaluator = RegressionEvaluator(labelCol="time_till_onblock_minutes")
modelError = modelEvaluator.evaluate(predictions,{modelEvaluator.metricName: "mae"})

In [None]:
modelError  # WOW!! 4.8 when selecting flights < 1 km from centre of pier 

In [None]:
predictions.toPandas().query("time_till_onblock_minutes < 5").head(100)

### lets try random forest

In [None]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import RandomForestRegressor

assembler = VectorAssembler(inputCols=["distance_to_ams","speed","altitude","heading","runway"],outputCol="features")
Regressor = RandomForestRegressor(featuresCol="features",labelCol="time_till_landing_minutes",maxDepth=5)

#pipeline = Pipeline(stages=[isLondonIndexer,durationIndexer,typeIndexer,assembler,Regressor])
pipeline = Pipeline(stages=[assembler,Regressor])
model = pipeline.fit(training_data)

In [None]:
#test again with new model
predictions = model.transform(testing_data)
modelEvaluator = RegressionEvaluator(labelCol="time_till_landing_minutes")
modelError = modelEvaluator.evaluate(predictions) #rmse by default
modelError = modelEvaluator.evaluate(predictions,{modelEvaluator.metricName: "mae"})

In [None]:
#22 mins with default settings depth 5, 9 mins with depth 10.
#6,4 with depth 15. Crashed with 20 depth on mac
modelError

### Tuning parameters

In [None]:
#Grid-search first , later train validation
grid = tune.ParamGridBuilder() \
       .addGrid(logistic.maxIter,
                [2, 10, 50]) \
       .addGrid(logistic.regParam,
                [0.01, 0.05, 0.3]) \
       .build()
    
evaluator = ev.BinaryClassificationEvaluator( \
       rawPredictionCol='probability', \
       labelCol='INFANT_ALIVE_AT_REPORT')
    
cv = tune.CrossValidator( \
       estimator=logistic, \
       estimatorParamMaps=grid, \
       evaluator=evaluator
)
    
pipeline = Pipeline(stages=[encoder ,featuresCreator])
data_transformer = pipeline.fit(births_train)
cvModel = cv.fit(data_transformer.transform(births_train))

data_train = data_transformer \
       .transform(births_test)
results = cvModel.transform(data_train)
print(evaluator.evaluate(results, \
        {evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(results, \
        {evaluator.metricName: 'areaUnderPR'}))

### Classification example

```
import pyspark.ml.classification as cl
from pyspark.ml import Pipeline
import pyspark.ml.evaluation as ev

encoder = ft.OneHotEncoder(
       inputCol='BIRTH_PLACE_INT',
       outputCol='BIRTH_PLACE_VEC')
       

logistic = cl.LogisticRegression(
       maxIter=10,
       regParam=0.01,
       labelCol='some_classifation')


pipeline = Pipeline(stages=[
           encoder,
           featuresCreator,logistic ])
    
births_train, births_test = births \
       .randomSplit([0.7, 0.3], seed=666)
    
model = pipeline.fit(births_train)
test_model = model.transform(births_test)  

valuator = ev.BinaryClassificationEvaluator(
       rawPredictionCol='probability',
       labelCol='INFANT_ALIVE_AT_REPORT')
       
pipelinePath = './infant_oneHotEncoder_Logistic_Pipeline'
pipeline.write().overwrite().save(pipelinePath)

to load:
loadedPipeline = Pipeline.load(pipelinePath)
loadedPipeline \
       .fit(births_train)\
       .transform(births_test)\
       .take(1)
       
       
```

```
What would happen if we have a string based column?

Make it a number
births = births \
       .withColumn('BIRTH_PLACE_INT', births['BIRTH_PLACE'] \
       .cast(typ.IntegerType()))
       
       
   encoder = ft.OneHotEncoder(
       inputCol='BIRTH_PLACE_INT',
       outputCol='BIRTH_PLACE_VEC')
       
Like before create the assembler, but we use getoutputcol so we 
dont care about the real column name:
   featuresCreator = ft.VectorAssembler(
       inputCols=[
           col[0]
           for col
           in labels[2:]] + \
       [encoder.getOutputCol()],
     outputCol='features'
)
```

### Visualisations

In [None]:
%matplotlib inline
import matplotlib.pyplot as plt
plt.style.use('ggplot')

import bokeh.charts as chrt
from bokeh.io import output_notebook
output_notebook()

In [None]:
ml_df.where("time_till_landing_minutes < 0").show()

In [None]:
hists = ml_df.select('time_till_onblock_minutes').rdd.flatMap(
       lambda row: row
).histogram(20)

In [None]:
data_hist = {
       'bins': hists[0][:-1],
       'freq': hists[1]
   }
plt.bar(data_hist['bins'], data_hist['freq'], width=20)
plt.title('Histogram of \'time_till_onblock\'')


In [None]:
b_hist = chrt.Bar(
       data_hist,
       values='freq', label='bins',
       title='Histogram of \'on block minutes\'')
chrt.show(b_hist)

In [None]:
percent_back = 0.001

# use this if you want an (almost) exact number of samples
# sample_count = 200
# percent_back = sample_count / posts.count()

frac = dict(
    (e.time_till_landing_minutes, percent_back) 
    for e 
    in ml_df.select('time_till_landing_minutes').distinct().collect()
)
sampled = ml_df.sampleBy('time_till_landing_minutes', fractions=frac)


In [None]:
#sampled.show()