In [57]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

/kaggle/input/titanic-and-flight-data-set/titanic_lyst2161.csv
/kaggle/input/titanic-and-flight-data-set/flights_lyst3061.csv
/kaggle/input/pyspark-datasets/sms.csv
/kaggle/input/pyspark-datasets/flights-larger.csv


# Machine Learning with PySpark
## Introduction

In [58]:
!pip install pyspark

/bin/bash: /opt/conda/lib/libtinfo.so.6: no version information available (required by /bin/bash)
[0m

#### Create a SparkSession

In [59]:
# Import the SparkSession class
from pyspark.sql import SparkSession

# Create SparkSession object (Local Cluster)
spark = SparkSession.builder.master('local[*]').appName('test').getOrCreate()


# Remote Cluster using Spark URL - spark://<IP address | DNS name>:<port>
# Example:spark://13.59.151.161:7077

# What version of Spark?
print(spark)
print(spark.version)


# spark.stop()

<pyspark.sql.session.SparkSession object at 0x7fa99c5f3d50>
3.3.2


#### Loading flights data

In [60]:
# Read data from CSV file
flights = spark.read.csv('/kaggle/input/titanic-and-flight-data-set/flights_lyst3061.csv',
                         sep=',',
                         header=True,
                         inferSchema=True,
                         nullValue='NA')

# Get number of records
print(f"The data contain {flights.count()} records.")

# View the first five records
flights.show(5)

# Check column data types
print(flights.dtypes)

The data contain 50000 records.
+---+---+---+-------+------+---+----+------+--------+-----+
|mon|dom|dow|carrier|flight|org|mile|depart|duration|delay|
+---+---+---+-------+------+---+----+------+--------+-----+
| 11| 20|  6|     US|    19|JFK|2153|  9.48|     351| null|
|  0| 22|  2|     UA|  1107|ORD| 316| 16.33|      82|   30|
|  2| 20|  4|     UA|   226|SFO| 337|  6.17|      82|   -8|
|  9| 13|  1|     AA|   419|ORD|1236| 10.33|     195|   -5|
|  4|  2|  5|     AA|   325|ORD| 258|  8.92|      65| null|
+---+---+---+-------+------+---+----+------+--------+-----+
only showing top 5 rows

[('mon', 'int'), ('dom', 'int'), ('dow', 'int'), ('carrier', 'string'), ('flight', 'int'), ('org', 'string'), ('mile', 'int'), ('depart', 'double'), ('duration', 'int'), ('delay', 'int')]


#### Loading SMS spam data
The file sms.csv contains a selection of SMS messages which have been classified as either 'spam' or 'ham'. These data have been adapted from the UCI Machine Learning Repository. There are a total of 5574 SMS, of which 747 have been labelled as spam.

Notes on CSV format:

- no header record and
- fields are separated by a semicolon (this is not the default separator).

In [61]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Specify column names and types
schema = StructType([
    StructField("id", IntegerType()),
    StructField("text", StringType()),
    StructField("label", IntegerType())
])

# Load data from a delimited file
sms = spark.read.csv('/kaggle/input/pyspark-datasets/sms.csv', sep=';', header=False, schema=schema)

# Print schema of DataFrame
sms.printSchema()

sms.show(5, truncate=False)

root
 |-- id: integer (nullable = true)
 |-- text: string (nullable = true)
 |-- label: integer (nullable = true)

+---+---------------------------------------------------------------------------------------------------------------+-----+
|id |text                                                                                                           |label|
+---+---------------------------------------------------------------------------------------------------------------+-----+
|1  |Sorry, I'll call later in meeting                                                                              |0    |
|2  |Dont worry. I guess he's busy.                                                                                 |0    |
|3  |Call FREEPHONE 0800 542 0578 now!                                                                              |1    |
|4  |Win a 1000 cash prize or a prize worth 5000                                                                    |1    |
|5  |Go until jur

## Classification
Now that you are familiar with getting data into Spark, you'll move onto building two types of classification model: Decision Trees and Logistic Regression. You'll also find out about a few approaches to data preparation.
### Data Preparation

#### Remove columns and rows

In [62]:
# Remove the 'flight' column
flights_drop_column = flights.drop('flight')

# Number of records
print(f"Number of records: {flights_drop_column.count()}")

# Number of records with missing 'delay' values
print(f"Number of records with missing 'delay' values: {flights_drop_column.filter('delay IS NULL').count()}")

# Remove records with missing 'delay' values
flights_valid_delay = flights_drop_column.filter('delay IS NOT NULL')

# Remove records with missing values in any column and get the number of remaining rows
flights_none_missing = flights_valid_delay.dropna()
print(flights_none_missing.count())

Number of records: 50000
Number of records with missing 'delay' values: 2978
47022


#### Column manipulation
The Federal Aviation Administration (FAA) considers a flight to be "delayed" when it arrives 15 minutes or more after its scheduled time.

The next step of preparing the flight data has two parts:

- convert the units of distance, replacing the mile column with a kmcolumn; and
- create a Boolean column indicating whether or not a flight was delayed.

In [63]:
# Import the required function
from pyspark.sql.functions import round

# Convert 'mile' to 'km' and drop 'mile' column (1 mile is equivalent to 1.60934 km)
flights_km = flights_none_missing.withColumn('km', round(flights.mile * 1.60934, 0)) \
                    .drop('mile')

# Create 'label' column indicating whether flight delayed (1) or not (0)
flights_km = flights_km.withColumn('label', (flights_km.delay >= 15).cast('integer'))

# Check first five records
flights_km.show(5)

+---+---+---+-------+---+------+--------+-----+------+-----+
|mon|dom|dow|carrier|org|depart|duration|delay|    km|label|
+---+---+---+-------+---+------+--------+-----+------+-----+
|  0| 22|  2|     UA|ORD| 16.33|      82|   30| 509.0|    1|
|  2| 20|  4|     UA|SFO|  6.17|      82|   -8| 542.0|    0|
|  9| 13|  1|     AA|ORD| 10.33|     195|   -5|1989.0|    0|
|  5|  2|  1|     UA|SFO|  7.98|     102|    2| 885.0|    0|
|  7|  2|  6|     AA|ORD| 10.83|     135|   54|1180.0|    1|
+---+---+---+-------+---+------+--------+-----+------+-----+
only showing top 5 rows



#### Categorical columns
In the flights data there are two columns, carrier and org, which hold categorical data. You need to transform those columns into indexed numerical values.

In [64]:
from pyspark.ml.feature import StringIndexer

# Create an indexer
indexer = StringIndexer(inputCol='carrier', outputCol='carrier_idx')

# Indexer identifies categories in the data
indexer_model = indexer.fit(flights_km)

# Indexer creates a new column with numeric index values
flights_indexed = indexer_model.transform(flights_km)

# Repeat the process for the other categorical feature
flights_indexed = StringIndexer(inputCol='org', outputCol='org_idx').fit(flights_indexed).transform(flights_indexed)
flights_indexed.show(5)

+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+
|mon|dom|dow|carrier|org|depart|duration|delay|    km|label|carrier_idx|org_idx|
+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+
|  0| 22|  2|     UA|ORD| 16.33|      82|   30| 509.0|    1|        0.0|    0.0|
|  2| 20|  4|     UA|SFO|  6.17|      82|   -8| 542.0|    0|        0.0|    1.0|
|  9| 13|  1|     AA|ORD| 10.33|     195|   -5|1989.0|    0|        1.0|    0.0|
|  5|  2|  1|     UA|SFO|  7.98|     102|    2| 885.0|    0|        0.0|    1.0|
|  7|  2|  6|     AA|ORD| 10.83|     135|   54|1180.0|    1|        1.0|    0.0|
+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+
only showing top 5 rows



#### Assembling columns
The final stage of data preparation is to consolidate all of the predictor columns into a single column.

In [65]:
# Import the necessary class
from pyspark.ml.feature import VectorAssembler

# Create an assembler object
assembler = VectorAssembler(inputCols=[
    'mon', 'dom', 'dow', 'carrier_idx', 'org_idx', 'km', 'depart', 'duration'
], outputCol='features')

# Consolidate predictor columns
flights_assembled = assembler.transform(flights_indexed)

# Check the resulting column
flights_assembled.show(5, truncate=False)

+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+-----------------------------------------+
|mon|dom|dow|carrier|org|depart|duration|delay|km    |label|carrier_idx|org_idx|features                                 |
+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+-----------------------------------------+
|0  |22 |2  |UA     |ORD|16.33 |82      |30   |509.0 |1    |0.0        |0.0    |[0.0,22.0,2.0,0.0,0.0,509.0,16.33,82.0]  |
|2  |20 |4  |UA     |SFO|6.17  |82      |-8   |542.0 |0    |0.0        |1.0    |[2.0,20.0,4.0,0.0,1.0,542.0,6.17,82.0]   |
|9  |13 |1  |AA     |ORD|10.33 |195     |-5   |1989.0|0    |1.0        |0.0    |[9.0,13.0,1.0,1.0,0.0,1989.0,10.33,195.0]|
|5  |2  |1  |UA     |SFO|7.98  |102     |2    |885.0 |0    |0.0        |1.0    |[5.0,2.0,1.0,0.0,1.0,885.0,7.98,102.0]   |
|7  |2  |6  |AA     |ORD|10.83 |135     |54   |1180.0|1    |1.0        |0.0    |[7.0,2.0,6.0,1.0,0.0,1180.0,10.83,135.0] |
+---+---+---+---

#### Train/test split

In [66]:
# Split into training and testing sets in a 80:20 ratio
flights_train, flights_test = flights_assembled.randomSplit([0.8, 0.2], seed=42)

print(flights_train.count())
print(flights_test.count())

# Check that training set has around 80% of records
training_ratio = flights_train.count() / flights.count()
print(training_ratio)

                                                                                

37601
9421
0.75202


#### Build a Decision Tree

In [67]:
# Import the Decision Tree Classifier class
from pyspark.ml.classification import DecisionTreeClassifier

# Create a classifier object and fit to the training data
tree = DecisionTreeClassifier()
tree_model = tree.fit(flights_train)

# Create predictions for the testing data and take a look at the predictions
prediction = tree_model.transform(flights_test)
prediction.show(5)
prediction.select('label', 'prediction', 'probability').show(5, False)

+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+--------------------+----------------+--------------------+----------+
|mon|dom|dow|carrier|org|depart|duration|delay|    km|label|carrier_idx|org_idx|            features|   rawPrediction|         probability|prediction|
+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+--------------------+----------------+--------------------+----------+
|  0|  1|  2|     AA|JFK|  12.0|     370|   11|3983.0|    0|        1.0|    2.0|[0.0,1.0,2.0,1.0,...|[5851.0,10507.0]|[0.35768431348575...|       1.0|
|  0|  1|  2|     AA|LGA|  9.92|     170|   -9|1180.0|    0|        1.0|    3.0|[0.0,1.0,2.0,1.0,...|[5851.0,10507.0]|[0.35768431348575...|       1.0|
|  0|  1|  2|     AA|LGA| 20.42|     185|   31|1765.0|    1|        1.0|    3.0|[0.0,1.0,2.0,1.0,...|[5851.0,10507.0]|[0.35768431348575...|       1.0|
|  0|  1|  2|     AA|ORD|  9.08|     560|   39|6828.0|    1|        1.0|    0.0|[0.0,1.0,2.0,1

#### Evaluate the Decision Tree
You can assess the quality of your model by evaluating how well it performs on the testing data. Because the model was not trained on these data, this represents an objective assessment of the model.

A confusion matrix gives a useful breakdown of predictions versus known values

In [68]:
# Create a confusion matrix
prediction.groupBy('label', 'prediction').count().show()

# Calculate the elements of the confusion matrix
TN = prediction.filter('prediction = 0 AND label = prediction').count()
TP = prediction.filter('prediction = 1 AND label = prediction').count()
FN = prediction.filter('prediction = 0 AND label != prediction').count()
FP = prediction.filter('prediction = 1 AND label != prediction').count()

# Accuracy measures the proportion of correct predictions
accuracy = (TN + TP) / (TN +TP + FN + FP)
print(f'accuracy: {accuracy}')

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0| 1344|
|    0|       0.0| 2523|
|    1|       1.0| 3542|
|    0|       1.0| 2012|
+-----+----------+-----+



                                                                                

accuracy: 0.6437745462265152


#### Build a Logistic Regression model
You've already built a Decision Tree model using the flights data. Now you're going to create a Logistic Regression model on the same data.

The objective is to predict whether a flight is likely to be delayed by at least 15 minutes (label 1) or not (label 0).

Although you have a variety of predictors at your disposal, you'll only use the mon, depart and duration columns for the moment. These are numerical features which can immediately be used for a Logistic Regression model

In [69]:
# Import the necessary class
from pyspark.ml.feature import VectorAssembler

# Select numerical cols
flights = flights_indexed.select('mon', 'depart', 'duration', 'label')

# Create an assembler object
assembler = VectorAssembler(inputCols=['mon', 'depart', 'duration'], outputCol='features')

# Consolidate predictor columns
flights = assembler.transform(flights)

# Reorganize cols
flights = flights.select('mon', 'depart', 'duration', 'features', 'label')

flights.show(5)

+---+------+--------+-----------------+-----+
|mon|depart|duration|         features|label|
+---+------+--------+-----------------+-----+
|  0| 16.33|      82| [0.0,16.33,82.0]|    1|
|  2|  6.17|      82|  [2.0,6.17,82.0]|    0|
|  9| 10.33|     195|[9.0,10.33,195.0]|    0|
|  5|  7.98|     102| [5.0,7.98,102.0]|    0|
|  7| 10.83|     135|[7.0,10.83,135.0]|    1|
+---+------+--------+-----------------+-----+
only showing top 5 rows



In [70]:
# Import the logistic regression class
from pyspark.ml.classification import LogisticRegression

# Split data
flights_train, flights_test = flights.randomSplit([0.8, 0.2], seed=42)

# Create a classifier object and train on training data
logistic = LogisticRegression().fit(flights_train)

# Create predictions for the testing data and show confusion matrix
prediction = logistic.transform(flights_test)
prediction.groupBy('label', 'prediction').count().show()

                                                                                

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0| 1785|
|    0|       0.0| 2563|
|    1|       1.0| 3031|
|    0|       1.0| 2042|
+-----+----------+-----+



#### Evaluate the Logistic Regression model
Accuracy is generally not a very reliable metric because it can be biased by the most common target class.

There are two other useful metrics:

- precision and
- recall.

Precision is the proportion of positive predictions which are correct. For all flights which are predicted to be delayed, what proportion is actually delayed?

Recall is the proportion of positives outcomes which are correctly predicted. For all delayed flights, what proportion is correctly predicted by the model?

The precision and recall are generally formulated in terms of the positive target class. But it's also possible to calculate weighted versions of these metrics which look at both target classes.

In [71]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

def evaluate_binary_classifier(prediction):
    '''Calculates accuracy, precision, reacall, weighted precision and area under ROC evaluation metrics for a binary classifier'''
    # Calculate the elements of the confusion matrix
    TN = prediction.filter('prediction = 0 AND label = prediction').count()
    TP = prediction.filter('prediction = 1 AND label = prediction').count()
    FN = prediction.filter('prediction = 0 AND label != prediction').count()
    FP = prediction.filter('prediction = 1 AND label != prediction').count()

    # Accuracy measures the proportion of correct predictions
    accuracy = (TN + TP) / (TN +TP + FN + FP)
    print(f'accuracy: {accuracy}')
    
    # Calculate precision and recall
    precision = TP / (TP + FP)
    recall = TP / (TP + FN)
    print(f'precision = {precision}')
    print(f'recall = {recall}')

    # Find weighted precision
    multi_evaluator = MulticlassClassificationEvaluator()
    weighted_precision = multi_evaluator.evaluate(prediction, {multi_evaluator.metricName: "weightedPrecision"})
    print(f'weighted_precision = {weighted_precision}')

    # Find AUC
    binary_evaluator = BinaryClassificationEvaluator()
    auc = binary_evaluator.evaluate(prediction, {binary_evaluator.metricName: "areaUnderROC"})
    print(f'auc = {auc}')

In [72]:
# Calculate evaluation metrics
evaluate_binary_classifier(prediction)

accuracy: 0.5937798535187347
precision = 0.5974768381628228
recall = 0.6293604651162791
weighted_precision = 0.5935613334972236
auc = 0.6267470718307773


### Turning Text into Tables
#### Punctuation, numbers and tokens
At the end of the previous chapter you loaded a dataset of SMS messages which had been labeled as either "spam" (label 1) or "ham" (label 0). You're now going to use those data to build a classifier model.

But first you'll need to prepare the SMS messages as follows:

- remove punctuation and numbers
- tokenize (split into individual words)
- remove stop words
- apply the hashing trick
- convert to TF-IDF representation.
In this exercise you'll remove punctuation and numbers, then tokenize the messages.

In [73]:
from pyspark.sql.functions import regexp_replace
from pyspark.ml.feature import Tokenizer

# Remove punctuation (REGEX provided) and numbers
wrangled = sms.withColumn('text', regexp_replace(sms.text, '[_():;,.!?\\-]', ' '))
wrangled = wrangled.withColumn('text', regexp_replace(wrangled.text, '[0-9]', ' '))

# Merge multiple spaces
sms_clean = wrangled.withColumn('text', regexp_replace(wrangled.text, ' +', ' '))

# Split the text into words
sms_clean_tokenized = Tokenizer(inputCol='text', outputCol='words').transform(sms_clean)

sms_clean_tokenized.show(5)

+---+--------------------+-----+--------------------+
| id|                text|label|               words|
+---+--------------------+-----+--------------------+
|  1|Sorry I'll call l...|    0|[sorry, i'll, cal...|
|  2|Dont worry I gues...|    0|[dont, worry, i, ...|
|  3| Call FREEPHONE now |    1|[call, freephone,...|
|  4|Win a cash prize ...|    1|[win, a, cash, pr...|
|  5|Go until jurong p...|    0|[go, until, juron...|
+---+--------------------+-----+--------------------+
only showing top 5 rows



#### Stop words and hashing
The next steps will be to remove stop words and then apply the hashing trick, converting the results into a TF-IDF.

A quick reminder about these concepts:

- The hashing trick provides a fast and space-efficient way to map a very large (possibly infinite) set of items (in this case, all words contained in the SMS messages) onto a smaller, finite number of values.
- The TF-IDF matrix reflects how important a word is to each document. It takes into account both the frequency of the word within each document but also the frequency of the word across all of the documents in the collection.

In [74]:
from pyspark.ml.feature import StopWordsRemover, HashingTF, IDF

# Remove stop words.
wrangled_sw = StopWordsRemover(inputCol='words', outputCol='terms').transform(sms_clean_tokenized)

# Apply the hashing trick
wrangled_hash = (HashingTF(inputCol='terms', outputCol='hash', numFeatures=1024)
                 .transform(wrangled_sw))

# Convert hashed symbols to TF-IDF
tf_idf = (IDF(inputCol='hash', outputCol='features')
          .fit(wrangled_hash)
          .transform(wrangled_hash))
      
tf_idf.select('terms', 'features').show(4, truncate=False)

+--------------------------------+----------------------------------------------------------------------------------------------------+
|terms                           |features                                                                                            |
+--------------------------------+----------------------------------------------------------------------------------------------------+
|[sorry, call, later, meeting]   |(1024,[138,384,577,996],[2.273418200008753,3.6288353225642043,3.5890949939146903,4.104259019279279])|
|[dont, worry, guess, busy]      |(1024,[215,233,276,329],[3.9913186080986836,3.3790235241678332,4.734227298217693,4.58299632849377]) |
|[call, freephone]               |(1024,[133,138],[5.367951058306837,2.273418200008753])                                              |
|[win, cash, prize, prize, worth]|(1024,[31,47,62,389],[3.6632029660684124,4.754846585420428,4.072170704727778,7.064594791043114])    |
+--------------------------------+--------------

#### Training a spam classifier
The SMS data have now been prepared for building a classifier.
Next you'll need to split the TF-IDF data into training and testing sets. Then you'll use the training data to fit a Logistic Regression model and finally evaluate the performance of that model on the testing data.

In [75]:
sms = tf_idf.select('label', 'features')
sms.show(5)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|    0|(1024,[138,384,57...|
|    0|(1024,[215,233,27...|
|    1|(1024,[133,138],[...|
|    1|(1024,[31,47,62,3...|
|    0|(1024,[12,171,191...|
+-----+--------------------+
only showing top 5 rows



In [76]:
from pyspark.ml.classification import LogisticRegression

# Split the data into training and testing sets
sms_train, sms_test = sms.randomSplit([0.8,0.2], seed=13 )

# Fit a Logistic Regression model to the training data
logistic = LogisticRegression(regParam=0.2).fit(sms_train)

# Make predictions on the testing data
prediction = logistic.transform(sms_test)

# Create a confusion matrix, comparing predictions to known labels
prediction.groupBy('label', 'prediction').count().show()

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0|   39|
|    0|       0.0|  932|
|    1|       1.0|  121|
|    0|       1.0|    4|
+-----+----------+-----+



#### Evaluate model

In [77]:
# Calculate evaluation metrics
evaluate_binary_classifier(prediction)

accuracy: 0.9607664233576643
precision = 0.968
recall = 0.75625
weighted_precision = 0.9610271598998699
auc = 0.9912793803418793


## Regression
Next you'll learn to create Linear Regression models. You'll also find out how to augment your data by engineering new predictors as well as a robust approach to selecting only the most relevant predictors.
### One Hot Encoding
#### Ecoding flight origin
The data are in a variable called flights. You have already used a string indexer to create a column of indexed values corresponding to the strings in org.

In [78]:
flights_indexed.show(5)

+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+
|mon|dom|dow|carrier|org|depart|duration|delay|    km|label|carrier_idx|org_idx|
+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+
|  0| 22|  2|     UA|ORD| 16.33|      82|   30| 509.0|    1|        0.0|    0.0|
|  2| 20|  4|     UA|SFO|  6.17|      82|   -8| 542.0|    0|        0.0|    1.0|
|  9| 13|  1|     AA|ORD| 10.33|     195|   -5|1989.0|    0|        1.0|    0.0|
|  5|  2|  1|     UA|SFO|  7.98|     102|    2| 885.0|    0|        0.0|    1.0|
|  7|  2|  6|     AA|ORD| 10.83|     135|   54|1180.0|    1|        1.0|    0.0|
+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+
only showing top 5 rows



In [79]:
# Import the one hot encoder class
from pyspark.ml.feature import OneHotEncoder

# Create an instance of the one hot encoder
onehot = OneHotEncoder(inputCols=['org_idx'], outputCols=['org_dummy'])

# Apply the one hot encoder to the flights data
onehot = onehot.fit(flights_indexed)
flights_onehot = onehot.transform(flights_indexed)

# Check the results
flights_onehot.select('org', 'org_idx', 'org_dummy').distinct().sort('org_idx').show()

+---+-------+-------------+
|org|org_idx|    org_dummy|
+---+-------+-------------+
|ORD|    0.0|(7,[0],[1.0])|
|SFO|    1.0|(7,[1],[1.0])|
|JFK|    2.0|(7,[2],[1.0])|
|LGA|    3.0|(7,[3],[1.0])|
|SMF|    4.0|(7,[4],[1.0])|
|SJC|    5.0|(7,[5],[1.0])|
|TUS|    6.0|(7,[6],[1.0])|
|OGG|    7.0|    (7,[],[])|
+---+-------+-------------+



#### Flight duration model: Just distance
In this exercise you'll build a regression model to predict flight duration (the duration column).

For the moment you'll keep the model simple, including only the distance of the flight (the km column) as a predictor.

In [80]:
from pyspark.ml.feature import VectorAssembler

flights_distance = flights_onehot.select('km', 'duration')

# Create an assembler object
assembler = VectorAssembler(inputCols=['km'], outputCol='features')

# Consolidate predictor columns
flights = assembler.transform(flights_distance)

# Split data
flights_train, flights_test = flights.randomSplit([0.8, 0.2], seed=42)

In [81]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Create a regression object and train on training data
regression = LinearRegression(labelCol='duration').fit(flights_train)

# Create predictions for the testing data and take a look at the predictions
predictions = regression.transform(flights_test)
predictions.select('duration', 'prediction').show(5, False)

# Calculate the RMSE
rmse = RegressionEvaluator(labelCol='duration').evaluate(predictions)
print("The test RMSE is", rmse)

23/03/01 10:45:45 WARN Instrumentation: [e95f333b] regParam is zero, which might cause numerical instability and overfitting.
+--------+-----------------+
|duration|prediction       |
+--------+-----------------+
|44      |52.32045015618229|
|44      |52.32045015618229|
|44      |52.32045015618229|
|46      |52.32045015618229|
|46      |52.32045015618229|
+--------+-----------------+
only showing top 5 rows

The test RMSE is 16.881752082844525


#### Interpreting the coefficients
The linear regression model for flight duration as a function of distance takes the form

alpha + beta * distance

where

 - alpha — intercept (component of duration which does not depend on distance) and
 - beta — coefficient (rate at which duration increases as a function of distance; also called the slope).
By looking at the coefficients of your model you will be able to infer

how much of the average flight duration is actually spent on the ground and
what the average speed is during a flight.

In [82]:
# Intercept (average minutes on ground)
inter = regression.intercept
print(inter)

# Coefficients
coefs = regression.coefficients
print(coefs)

# Average minutes per km
minutes_per_km = regression.coefficients[0]
print(minutes_per_km)

# Average speed in km per hour
avg_speed = 60 / minutes_per_km
print(avg_speed)

44.147190398097976
[0.07567833109337331]
0.07567833109337331
792.8293229137269


#### Flight duration model: Adding origin airport
Some airports are busier than others. Some airports are bigger than others too. Flights departing from large or busy airports are likely to spend more time taxiing or waiting for their takeoff slot. So it stands to reason that the duration of a flight might depend not only on the distance being covered but also the airport from which the flight departs.

You are going to make the regression model a little more sophisticated by including the departure airport as a predictor.

In [83]:
from pyspark.ml.feature import VectorAssembler

flights = flights_onehot.select('km', 'org_dummy', 'duration')

# Create an assembler object
assembler = VectorAssembler(inputCols=['km', 'org_dummy'], outputCol='features')

# Consolidate predictor columns
flights = assembler.transform(flights)

# Split data
flights_train, flights_test = flights.randomSplit([0.8, 0.2], seed=42)

In [84]:
flights.show(10, truncate=False)

+------+-------------+--------+----------------------+
|km    |org_dummy    |duration|features              |
+------+-------------+--------+----------------------+
|509.0 |(7,[0],[1.0])|82      |(8,[0,1],[509.0,1.0]) |
|542.0 |(7,[1],[1.0])|82      |(8,[0,2],[542.0,1.0]) |
|1989.0|(7,[0],[1.0])|195     |(8,[0,1],[1989.0,1.0])|
|885.0 |(7,[1],[1.0])|102     |(8,[0,2],[885.0,1.0]) |
|1180.0|(7,[0],[1.0])|135     |(8,[0,1],[1180.0,1.0])|
|2317.0|(7,[0],[1.0])|232     |(8,[0,1],[2317.0,1.0])|
|2943.0|(7,[5],[1.0])|250     |(8,[0,6],[2943.0,1.0])|
|254.0 |(7,[1],[1.0])|60      |(8,[0,2],[254.0,1.0]) |
|2356.0|(7,[1],[1.0])|210     |(8,[0,2],[2356.0,1.0])|
|1574.0|(7,[0],[1.0])|160     |(8,[0,1],[1574.0,1.0])|
+------+-------------+--------+----------------------+
only showing top 10 rows



In [85]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Create a regression object and train on training data
regression = LinearRegression(labelCol='duration').fit(flights_train)

# Create predictions for the testing data
predictions = regression.transform(flights_test)

# Calculate the RMSE on testing data
rmse = RegressionEvaluator(labelCol='duration').evaluate(predictions)
print("The test RMSE is", rmse)

23/03/01 10:45:47 WARN Instrumentation: [3d349ad7] regParam is zero, which might cause numerical instability and overfitting.
The test RMSE is 10.965027847682059


#### Interpreting coefficients
Remember that origin airport, org, has eight possible values (ORD, SFO, JFK, LGA, SMF, SJC, TUS and OGG) which have been one-hot encoded to seven dummy variables in org_dummy.

The values for km and org_dummy have been assembled into features, which has eight columns with sparse representation. Column indices in features are as follows:

0 — km
1 — ORD
2 — SFO
3 — JFK
4 — LGA
5 — SMF
6 — SJC and
7 — TUS.
Note that OGG does not appear in this list because it is the reference level for the origin airport category.

In this exercise you'll be using the intercept and coefficients attributes to interpret the model.

The coefficients attribute is a list, where the first element indicates how flight duration changes with flight distance.

In [86]:
regression.coefficients

DenseVector([0.0743, 28.849, 20.8879, 52.6755, 46.8885, 15.9608, 18.5126, 18.2813])

In [87]:
# Average speed in km per hour
avg_speed_hour = 60 / regression.coefficients[0]
print(avg_speed_hour)

# Average minutes on ground at OGG
inter = regression.intercept
print(inter)

# Average minutes on ground at JFK
avg_ground_jfk = inter + regression.coefficients[3]
print(avg_ground_jfk)

# Average minutes on ground at LGA
avg_ground_lga = inter + regression.coefficients[4]
print(avg_ground_lga)

807.7693956642806
15.548047084067454
68.22354950340903
62.43652918727565


### Bucketing and Engineering
#### Bucketing departure time
Time of day data are a challenge with regression models. They are also a great candidate for bucketing.

In this lesson you will convert the flight departure times from numeric values between 0 (corresponding to 00:00) and 24 (corresponding to 24:00) to binned values. You'll then take those binned values and one-hot encode them.

In [88]:
flights_onehot.show()

+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+-------------+
|mon|dom|dow|carrier|org|depart|duration|delay|    km|label|carrier_idx|org_idx|    org_dummy|
+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+-------------+
|  0| 22|  2|     UA|ORD| 16.33|      82|   30| 509.0|    1|        0.0|    0.0|(7,[0],[1.0])|
|  2| 20|  4|     UA|SFO|  6.17|      82|   -8| 542.0|    0|        0.0|    1.0|(7,[1],[1.0])|
|  9| 13|  1|     AA|ORD| 10.33|     195|   -5|1989.0|    0|        1.0|    0.0|(7,[0],[1.0])|
|  5|  2|  1|     UA|SFO|  7.98|     102|    2| 885.0|    0|        0.0|    1.0|(7,[1],[1.0])|
|  7|  2|  6|     AA|ORD| 10.83|     135|   54|1180.0|    1|        1.0|    0.0|(7,[0],[1.0])|
|  1| 16|  6|     UA|ORD|   8.0|     232|   -7|2317.0|    0|        0.0|    0.0|(7,[0],[1.0])|
|  1| 22|  5|     UA|SJC|  7.98|     250|  -13|2943.0|    0|        0.0|    5.0|(7,[5],[1.0])|
| 11|  8|  1|     OO|SFO|  7.77|      60|   88| 25

In [89]:
from pyspark.ml.feature import Bucketizer, OneHotEncoder

# Create buckets at 3 hour intervals through the day
buckets = Bucketizer(splits=[0, 3, 6, 9, 12, 15, 18, 21, 24], inputCol='depart', outputCol='depart_bucket')

# Bucket the departure times
bucketed = buckets.transform(flights_onehot)
bucketed.select('depart', 'depart_bucket').show(5)

# Create a one-hot encoder
onehot = OneHotEncoder(inputCols=['depart_bucket'], outputCols=['depart_dummy'])

# One-hot encode the bucketed departure times
flights_onehot = onehot.fit(bucketed).transform(bucketed)
flights_onehot.select('depart', 'depart_bucket', 'depart_dummy').show(5)

+------+-------------+
|depart|depart_bucket|
+------+-------------+
| 16.33|          5.0|
|  6.17|          2.0|
| 10.33|          3.0|
|  7.98|          2.0|
| 10.83|          3.0|
+------+-------------+
only showing top 5 rows

+------+-------------+-------------+
|depart|depart_bucket| depart_dummy|
+------+-------------+-------------+
| 16.33|          5.0|(7,[5],[1.0])|
|  6.17|          2.0|(7,[2],[1.0])|
| 10.33|          3.0|(7,[3],[1.0])|
|  7.98|          2.0|(7,[2],[1.0])|
| 10.83|          3.0|(7,[3],[1.0])|
+------+-------------+-------------+
only showing top 5 rows



#### Flight duration model: Adding departure time
In the previous exercise the departure time was bucketed and converted to dummy variables. Now you're going to include those dummy variables in a regression model for flight duration.

The data are in flights. The km, org_dummy and depart_dummy columns have been assembled into features, where km is index 0, org_dummy runs from index 1 to 7 and depart_dummy from index 8 to 14.

In [90]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

flights = flights_onehot.select('km', 'org_dummy', 'depart_dummy', 'duration')

# Create an assembler object
assembler = VectorAssembler(inputCols=['km', 'org_dummy',  'depart_dummy'], outputCol='features')

# Consolidate predictor columns
flights = assembler.transform(flights)

# Split data
flights_train, flights_test = flights.randomSplit([0.8, 0.2], seed=42)

# Create a regression object and train on training data
regression = LinearRegression(labelCol='duration').fit(flights_train)

# Create predictions for the testing data
predictions = regression.transform(flights_test)

# Calculate the RMSE on testing data
rmse = RegressionEvaluator(labelCol='duration').evaluate(predictions)
print("The test RMSE is", rmse)


23/03/01 10:45:49 WARN Instrumentation: [f1e35b7d] regParam is zero, which might cause numerical instability and overfitting.


                                                                                

The test RMSE is 10.799923937159482


                                                                                

In [91]:
regression.coefficients

DenseVector([0.0743, 27.4974, 20.5218, 51.658, 45.7574, 15.2996, 17.6986, 17.8124, -13.5611, 1.3989, 3.877, 6.8854, 4.6242, 8.9851, 8.7575])

In [92]:
# Average minutes on ground at OGG for flights departing between 21:00 and 24:00
avg_eve_ogg = regression.coefficients[14]
print(avg_eve_ogg)

# Average minutes on ground at OGG for flights departing between 03:00 and 06:00
avg_night_ogg = regression.intercept + regression.coefficients[8]
print(avg_night_ogg)

# Average minutes on ground at JFK for flights departing between 03:00 and 06:00
avg_night_jfk = regression.intercept + regression.coefficients[3] + regression.coefficients[9]
print(avg_night_jfk)

8.757533089141338
-3.3037768914878853
63.31414109793158


#### Flight duration model: More features!
Let's add more features to our model. This will not necessarily result in a better model. Adding some features might improve the model. Adding other features might make it worse.

More features will always make the model more complicated and difficult to interpret.

These are the features you'll include in the next model:

- km
- org (origin airport, one-hot encoded, 8 levels)
- depart (departure time, binned in 3 hour intervals, one-hot encoded, 8 levels)
- dow (departure day of week, one-hot encoded, 7 levels) and
- mon (departure month, one-hot encoded, 12 levels).
These have been assembled into the features column, which is a sparse representation of 32 columns (remember one-hot encoding produces a number of columns which is one fewer than the number of levels).

In [93]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder

flights_copy = flights_onehot

# Index features
flights_copy = StringIndexer(inputCol='dow', outputCol='dow_idx').fit(flights_copy).transform(flights_copy)
flights_copy = StringIndexer(inputCol='mon', outputCol='mon_idx').fit(flights_copy).transform(flights_copy)

# One hot encode indexes
flights_encoded = OneHotEncoder(inputCols=['dow_idx', 'mon_idx'], outputCols=['dow_dummy', 'mon_dummy']).fit(flights_copy).transform(flights_copy)

flights_encoded.show(5)

# Create an assembler object
assembler = VectorAssembler(inputCols=['km', 'org_dummy',  'depart_dummy', 'dow_dummy', 'mon_dummy'], outputCol='features')

# Consolidate predictor columns
flights_assembled = assembler.transform(flights_encoded)

flights_assembled.show(5)

# Split data
flights_train, flights_test = flights_assembled.randomSplit([0.8, 0.2], seed=42)

# Fit linear regression model to training data
regression = LinearRegression(labelCol='duration').fit(flights_train)

# Make predictions on testing data
predictions = regression.transform(flights_test)

# Calculate the RMSE on testing data
rmse = RegressionEvaluator(labelCol='duration').evaluate(predictions)
print("The test RMSE is", rmse)

# Look at the model coefficients
coeffs = regression.coefficients
print(coeffs)

+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+-------------+-------------+-------------+-------+-------+-------------+---------------+
|mon|dom|dow|carrier|org|depart|duration|delay|    km|label|carrier_idx|org_idx|    org_dummy|depart_bucket| depart_dummy|dow_idx|mon_idx|    dow_dummy|      mon_dummy|
+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+-------------+-------------+-------------+-------+-------+-------------+---------------+
|  0| 22|  2|     UA|ORD| 16.33|      82|   30| 509.0|    1|        0.0|    0.0|(7,[0],[1.0])|          5.0|(7,[5],[1.0])|    3.0|    2.0|(6,[3],[1.0])| (11,[2],[1.0])|
|  2| 20|  4|     UA|SFO|  6.17|      82|   -8| 542.0|    0|        0.0|    1.0|(7,[1],[1.0])|          2.0|(7,[2],[1.0])|    2.0|    3.0|(6,[2],[1.0])| (11,[3],[1.0])|
|  9| 13|  1|     AA|ORD| 10.33|     195|   -5|1989.0|    0|        1.0|    0.0|(7,[0],[1.0])|          3.0|(7,[3],[1.0])|    1.0|   10.0|(6,[1],[1.0])|(11

                                                                                

The test RMSE is 10.554263427234302
[0.0744414737608538,27.476492289083733,20.318851150596387,51.823424122390286,45.89694430098392,15.295887358002648,17.71215150132338,17.355112314965666,-15.117129298334117,1.5041021032111557,3.9491015790394504,6.891971690057159,4.563688708471149,8.720145274986558,8.657433176655996,0.27174673946862643,0.2536668170751725,0.5382243945078907,-0.0699376470661024,0.46418453532858206,0.3531761617513729,-3.254252659270541,-3.569024907839878,-0.8826327834030728,-1.072237629084685,-1.1940173986581706,-3.4094581481702977,1.1243430652559239,-3.339162805437828,-2.4737835594203688,-2.9268883610944285,-1.925643162240846]


                                                                                

#### Flight duration model: Regularization!
In the previous exercise you added more predictors to the flight duration model. The model performed well on testing data, but with so many coefficients it was difficult to interpret.

In this exercise you'll use Lasso regression (regularized with a L1 penalty) to create a more parsimonious model. Many of the coefficients in the resulting model will be set to zero. This means that only a subset of the predictors actually contribute to the model. Despite the simpler model, it still produces a good RMSE on the testing data.

You'll use a specific value for the regularization strength. Later you'll learn how to find the best value using cross validation.

The data (same as previous exercise) are available as flights, randomly split into flights_train and flights_test.

There are two parameters for this model, λ (regParam) and α (elasticNetParam), where α determines the type of regularization and λ gives the strength of regularization.

In [94]:
# Fit Lasso model (λ = 1, α = 1) to training data
lasso_regression = LinearRegression(labelCol='duration', regParam=1, elasticNetParam=1).fit(flights_train)

# Calculate the RMSE on testing data
rmse = RegressionEvaluator(labelCol='duration').evaluate(lasso_regression.transform(flights_test))
print("The test RMSE is", rmse)

# Look at the model coefficients
coeffs = lasso_regression.coefficients
print(coeffs)

# Number of zero coefficients
zero_coeff = sum([beta == 0 for beta in lasso_regression.coefficients])
print("Number of coefficients equal to 0:", zero_coeff)

                                                                                

The test RMSE is 11.522562173422466
[0.07356192230683124,5.6416318102541325,0.0,28.99065161814343,21.909967609980196,-2.177446628137316,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0174428165447469,1.0811004812733236,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0]
Number of coefficients equal to 0: 25


                                                                                

In [95]:
# Fit Ridge model (λ = 0.1, α = 0) to training data
ridge_regression = LinearRegression(labelCol='duration', regParam=0.1, elasticNetParam=0).fit(flights_train)

# Calculate the RMSE on testing data
rmse = RegressionEvaluator(labelCol='duration').evaluate(ridge_regression.transform(flights_test))
print("The test RMSE is", rmse)

# Look at the model coefficients
coeffs = ridge_regression.coefficients
print(coeffs)

# Number of zero coefficients
zero_coeff = sum([beta == 0 for beta in ridge_regression.coefficients])
print("Number of coefficients equal to 0:", zero_coeff)

                                                                                

The test RMSE is 10.55504799541556
[0.07434569538792768,26.21854651731295,19.07127978448294,50.58281732606495,44.59554456720016,14.001241138038225,16.421992474435378,16.06704775259543,-14.945782307288091,1.4871982735804619,3.942745596495914,6.878034210590624,4.5082723908256535,8.676042454615756,8.616431879393879,0.27092608669889934,0.25729490789209575,0.5381851120168684,-0.0667540835662893,0.4665379508412464,0.354318088371748,-3.207050593454767,-3.529862189055684,-0.8502077348164557,-1.0273455787525143,-1.1639824033459127,-3.3665597469646724,1.15797413850462,-3.293649682701467,-2.4303212874852784,-2.8866098464029912,-1.8959748789360564]
Number of coefficients equal to 0: 0


## Ensembles and Pipelines
#### Flight duration model: Pipeline stages
You're going to create the stages for the flights duration model pipeline. You will use these in the next exercise to build a pipeline and to create a regression model.

In [96]:
flights_data = flights_km

# Split data
flights_train, flights_test = flights_data.randomSplit([0.8, 0.2], seed=42)

In [97]:
# Convert categorical strings to index values
indexer = StringIndexer(inputCol='org', outputCol = 'org_idx')

# One-hot encode index values
onehot = OneHotEncoder(inputCols=['org_idx', 'dow'],outputCols=['org_dummy', 'dow_dummy'])

# Assemble predictors into a single column
assembler = VectorAssembler(inputCols=['km', 'org_dummy', 'dow_dummy'], outputCol='features')

# A linear regression object
regression = LinearRegression(labelCol='duration')

#### Flight duration model: Pipeline model
You're now ready to put those stages together in a pipeline.

You'll construct the pipeline and then train the pipeline on the training data. This will apply each of the individual stages in the pipeline to the training data in turn. None of the stages will be exposed to the testing data at all: there will be no leakage!

Once the entire pipeline has been trained it will then be used to make predictions on the testing data.

In [98]:
# Import class for creating a pipeline
from pyspark.ml import Pipeline

# Construct a pipeline
pipeline = Pipeline(stages=[indexer, onehot, assembler, regression])

# Train the pipeline on the training data
pipeline = pipeline.fit(flights_train)

# Make predictions on the testing data
predictions = pipeline.transform(flights_test)

23/03/01 10:46:01 WARN Instrumentation: [7cb938fd] regParam is zero, which might cause numerical instability and overfitting.


#### SMS spam pipeline

In [99]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF

# Break text into tokens at non-word characters
tokenizer = Tokenizer(inputCol='text', outputCol='words')

# Remove stop words
remover = StopWordsRemover(inputCol='words', outputCol='terms')

# Apply the hashing trick and transform to TF-IDF
hasher = HashingTF(inputCol='terms', outputCol="hash")
idf = IDF(inputCol='hash', outputCol="features")

# Create a logistic regression object and add everything to a pipeline
logistic = LogisticRegression()
pipeline = Pipeline(stages=[tokenizer, remover, hasher, idf, logistic])

### Cross-Validation
#### Cross validating simple flight duration model
You've already built a few models for predicting flight duration and evaluated them with a simple train/test split. However, cross-validation provides a much better way to evaluate model performance.

In this exercise you're going to train a simple model for flight duration using cross-validation. Travel time is usually strongly correlated with distance, so using the km column alone should give a decent model.

In [100]:
flights_data = flights_km.select('km','duration')

# Assemble predictors into a single column
assembler = VectorAssembler(inputCols=['km'], outputCol='features')

# Consolidate predictor columns
flights = assembler.transform(flights_data)

# Split data
flights_train, flights_test = flights.randomSplit([0.8, 0.2], seed=42)

In [101]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Create an empty parameter grid
params = ParamGridBuilder().build()

# Create objects for building and evaluating a regression model
regression = LinearRegression(labelCol='duration')
evaluator = RegressionEvaluator(labelCol='duration')

# Create a cross validator
cv = CrossValidator(estimator=regression, estimatorParamMaps=params, evaluator=evaluator, numFolds=5)

# Train and test model on multiple folds of the training data
cv = cv.fit(flights_train)

# RMSE across the folds
cv.avgMetrics

23/03/01 10:46:03 WARN Instrumentation: [e4c8fca4] regParam is zero, which might cause numerical instability and overfitting.
23/03/01 10:46:04 WARN Instrumentation: [f0a01a92] regParam is zero, which might cause numerical instability and overfitting.
23/03/01 10:46:05 WARN Instrumentation: [f40bc990] regParam is zero, which might cause numerical instability and overfitting.
23/03/01 10:46:06 WARN Instrumentation: [ec402a61] regParam is zero, which might cause numerical instability and overfitting.
23/03/01 10:46:07 WARN Instrumentation: [2f3b5039] regParam is zero, which might cause numerical instability and overfitting.
23/03/01 10:46:09 WARN Instrumentation: [cffa30bd] regParam is zero, which might cause numerical instability and overfitting.


[16.991223864250294]

#### Cross validating flight duration model pipeline
The cross-validated model that you just built was simple, using km alone to predict duration.

Another important predictor of flight duration is the origin airport. Flights generally take longer to get into the air from busy airports. Let's see if adding this predictor improves the model!

In this exercise you'll add the org field to the model. However, since org is categorical, there's more work to be done before it can be included: it must first be transformed to an index and then one-hot encoded before being assembled with km and used to build the regression model. We'll wrap these operations up in a pipeline.

In [102]:
flights_data = flights_km

flights_train, flights_test = flights_data.randomSplit([0.8, 0.2], seed=42)

In [103]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline

# Create an indexer for the org field
indexer = StringIndexer(inputCol='org', outputCol='org_idx')

# Create an one-hot encoder for the indexed org field
onehot = OneHotEncoder(inputCols=['org_idx'], outputCols=['org_dummy'])

# Assemble the km and one-hot encoded fields
assembler = VectorAssembler(inputCols=['km', 'org_dummy'], outputCol='features')

# Create an empty parameter grid
params = ParamGridBuilder().build()

# Create objects for building and evaluating a regression model
regression = LinearRegression(labelCol='duration')
evaluator = RegressionEvaluator(labelCol='duration')


# Wrap operations in a pipeline
pipeline = Pipeline(stages=[indexer, onehot, assembler, regression])

# Crossvalidate
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=params, evaluator=evaluator)

### Grid Search
#### Optimizing flights linear regression
Up until now you've been using the default hyper-parameters when building your models. In this exercise you'll use cross validation to choose an optimal (or close to optimal) set of model hyper-parameters.

In [104]:
# Create parameter grid
params = ParamGridBuilder()

# Add grids for two parameters
params = params.addGrid(regression.regParam, [0.01, 0.1, 1.0, 10.0]) \
               .addGrid(regression.elasticNetParam, [0.0, 0.5, 1.0])

# Build the parameter grid
params = params.build()
print('Number of models to be tested: ', len(params))

# Create cross-validator
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=params, evaluator=evaluator, numFolds=5)

cv = cv.fit(flights_train)

Number of models to be tested:  12


In [105]:
cv.avgMetrics

[11.059163348452294,
 11.059527999145178,
 11.060280227523423,
 11.061124008158265,
 11.092177407517045,
 11.16680002324171,
 11.183337633637432,
 11.528746711721118,
 11.707184046885299,
 14.546046920457792,
 17.02111596336177,
 19.159509155583866]

#### Dissecting the best flight duration model
You just set up a CrossValidator to find good parameters for the linear regression model predicting flight duration.

The model pipeline has multiple stages (objects of type StringIndexer, OneHotEncoder, VectorAssembler and LinearRegression), which operate in sequence. The stages are available as the stages attribute on the pipeline object. They are represented by a list and the stages are executed in the sequence in which they appear in the list.

Now you're going to take a closer look at the pipeline, split out the stages and use it to make predictions on the testing data.

In [106]:
# Get the best model from cross validation
best_model = cv.bestModel

# Look at the stages in the best model
print(best_model.stages)

# Get the parameters for the LinearRegression object in the best model
best_model.stages[3].extractParamMap()

# Generate predictions on testing data using the best model then calculate RMSE
predictions = best_model.transform(flights_test)
print("RMSE =", evaluator.evaluate(predictions))

[StringIndexerModel: uid=StringIndexer_c6b1796fb1b4, handleInvalid=error, OneHotEncoderModel: uid=OneHotEncoder_62978778669f, dropLast=true, handleInvalid=error, numInputCols=1, numOutputCols=1, VectorAssembler_876542c1b3b0, LinearRegressionModel: uid=LinearRegression_2f505d3c7a51, numFeatures=8]
RMSE = 10.958444084573124


#### SMS spam optimised
The pipeline you built earlier for the SMS spam model used the default parameters for all of the elements in the pipeline. It's very unlikely that these parameters will give a particularly good model though. In this exercise you're going to run the pipeline for a selection of parameter values. We're going to do this in a systematic way: the values for each of the parameters will be laid out on a grid and then pipeline will systematically run across each point in the grid.

In this exercise you'll set up a parameter grid which can be used with cross validation to choose a good set of parameters for the SMS spam classifier.

In [108]:
sms_clean_tokenized.show()

+---+--------------------+-----+--------------------+
| id|                text|label|               words|
+---+--------------------+-----+--------------------+
|  1|Sorry I'll call l...|    0|[sorry, i'll, cal...|
|  2|Dont worry I gues...|    0|[dont, worry, i, ...|
|  3| Call FREEPHONE now |    1|[call, freephone,...|
|  4|Win a cash prize ...|    1|[win, a, cash, pr...|
|  5|Go until jurong p...|    0|[go, until, juron...|
|  6|Ok lar Joking wif...|    0|[ok, lar, joking,...|
|  7|Free entry in a w...|    1|[free, entry, in,...|
|  8|U dun say so earl...|    0|[u, dun, say, so,...|
|  9|Nah I don't think...|    0|[nah, i, don't, t...|
| 10|FreeMsg Hey there...|    1|[freemsg, hey, th...|
| 11|Even my brother i...|    0|[even, my, brothe...|
| 12|As per your reque...|    0|[as, per, your, r...|
| 13|WINNER As a value...|    1|[winner, as, a, v...|
| 14|Had your mobile m...|    1|[had, your, mobil...|
| 15|I'm gonna be home...|    0|[i'm, gonna, be, ...|
| 16|SIX chances to wi...|  

In [116]:
sms_train, sms_test = sms_clean_tokenized.randomSplit([0.7, 0.3], seed=42)

In [118]:
from pyspark.ml.feature import StopWordsRemover, HashingTF, IDF
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Remove stop words.
stop_words = StopWordsRemover(inputCol='words', outputCol='terms')

# Apply the hashing trick
hasher = HashingTF(inputCol='terms', outputCol='hash')
                 

# Convert hashed symbols to TF-IDF
tf_idf = IDF(inputCol='hash', outputCol='features')

# Logistic Regression model
logistic = LogisticRegression(labelCol='label')
binary_evaluator = BinaryClassificationEvaluator(labelCol='label')

# Create parameter grid
params = ParamGridBuilder()

# Add grid for hashing trick parameters
params = params.addGrid(hasher.numFeatures, [1024, 4096, 16384]) \
               .addGrid(hasher.binary, [True, False])

# Add grid for logistic regression parameters
params = params.addGrid(logistic.regParam, [0.01, 0.1, 1.0, 10]) \
               .addGrid(logistic.elasticNetParam, [0.0, 0.5, 1.0])

# Build parameter grid
params = params.build()
print('Number of models to be tested: ', len(params))

# Wrap in pipeline
pipeline = Pipeline(stages=[stop_words, hasher, tf_idf, logistic])  

# Crossvalidate
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=params, evaluator=binary_evaluator, numFolds=3)

Number of models to be tested:  72


In [119]:
cv = cv.fit(sms_train)

In [130]:
# Get the best model from cross validation
best_model = cv.bestModel

# Look at the stages in the best model
print(best_model.stages)

[StopWordsRemover_442c344ecb95, HashingTF_d21be83aab5f, IDFModel: uid=IDF_c69ad55bce7c, numDocs=4503, numFeatures=4096, LogisticRegressionModel: uid=LogisticRegression_c4d5e55ab6ae, numClasses=2, numFeatures=4096]


In [131]:
# Get the parameters for the LinearRegression object in the best model
best_model.stages[3].extractParamMap()

{Param(parent='LogisticRegression_c4d5e55ab6ae', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2).'): 2,
 Param(parent='LogisticRegression_c4d5e55ab6ae', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0,
 Param(parent='LogisticRegression_c4d5e55ab6ae', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial'): 'auto',
 Param(parent='LogisticRegression_c4d5e55ab6ae', name='featuresCol', doc='features column name.'): 'features',
 Param(parent='LogisticRegression_c4d5e55ab6ae', name='fitIntercept', doc='whether to fit an intercept term.'): True,
 Param(parent='LogisticRegression_c4d5e55ab6ae', name='labelCol', doc='label column name.'): 'label',
 Param(parent='LogisticRegression_c4d5e55ab6ae', name='maxBlockSizeInMB', doc='maximum memory in MB for s

In [132]:
# Get the parameters for the HahingTF object in the best model
best_model.stages[1].extractParamMap()

{Param(parent='HashingTF_d21be83aab5f', name='binary', doc='If True, all non zero counts are set to 1. This is useful for discrete probabilistic models that model binary events rather than integer counts. Default False.'): True,
 Param(parent='HashingTF_d21be83aab5f', name='numFeatures', doc='Number of features. Should be greater than 0.'): 4096,
 Param(parent='HashingTF_d21be83aab5f', name='outputCol', doc='output column name.'): 'hash',
 Param(parent='HashingTF_d21be83aab5f', name='inputCol', doc='input column name.'): 'terms'}

In [133]:
# Generate predictions on testing data using the best model then calculate auc
predictions = best_model.transform(sms_test)

# Create a confusion matrix, comparing predictions to known labels
predictions.groupBy('label', 'prediction').count().show()

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0|   68|
|    0|       0.0|  929|
|    1|       1.0|   74|
+-----+----------+-----+



In [134]:
# Calculate evaluation metrics
evaluate_binary_classifier(predictions)

accuracy: 0.9365079365079365
precision = 1.0
recall = 0.5211267605633803
weighted_precision = 0.9408383881804142
auc = 0.9839521520944933


### Ensemble
#### Delayed flights with Gradient-Boosted Trees
You've previously built a classifier for flights likely to be delayed using a Decision Tree. In this exercise you'll compare a Decision Tree model to a Gradient-Boosted Trees model.


In [148]:
flights_data = flights_km

In [149]:
# Import the necessary class
from pyspark.ml.feature import VectorAssembler

# Create an assembler object
assembler = VectorAssembler(inputCols=['mon','depart', 'duration'], outputCol='features')

# Consolidate predictor columns
assembled = assembler.transform(flights_data)

# Check the resulting column
flights_data = assembled.select('mon', 'depart', 'duration', 'features', 'label')

flights_data.show(5)

flights_train, flights_test = flights_data.randomSplit([0.8, 0.2], seed=42)

+---+------+--------+-----------------+-----+
|mon|depart|duration|         features|label|
+---+------+--------+-----------------+-----+
|  0| 16.33|      82| [0.0,16.33,82.0]|    1|
|  2|  6.17|      82|  [2.0,6.17,82.0]|    0|
|  9| 10.33|     195|[9.0,10.33,195.0]|    0|
|  5|  7.98|     102| [5.0,7.98,102.0]|    0|
|  7| 10.83|     135|[7.0,10.83,135.0]|    1|
+---+------+--------+-----------------+-----+
only showing top 5 rows



In [150]:
# Import the classes required
from pyspark.ml.classification import DecisionTreeClassifier, GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Create model objects and train on training data
tree = DecisionTreeClassifier().fit(flights_train)
gbt = GBTClassifier().fit(flights_train)

# Compare AUC on testing data
evaluator = BinaryClassificationEvaluator()
print(evaluator.evaluate(tree.transform(flights_test)))
print(evaluator.evaluate(gbt.transform(flights_test)))

# Find the number of trees and the relative importance of features
print(gbt.trees)
print(gbt.featureImportances)

0.6378906630450076
0.677416190512264
[DecisionTreeRegressionModel: uid=dtr_c8105611e708, depth=5, numNodes=63, numFeatures=3, DecisionTreeRegressionModel: uid=dtr_2d9d2e6cb798, depth=5, numNodes=63, numFeatures=3, DecisionTreeRegressionModel: uid=dtr_f57338b6dc89, depth=5, numNodes=63, numFeatures=3, DecisionTreeRegressionModel: uid=dtr_584245145ea9, depth=5, numNodes=63, numFeatures=3, DecisionTreeRegressionModel: uid=dtr_3495db58d509, depth=5, numNodes=63, numFeatures=3, DecisionTreeRegressionModel: uid=dtr_94e441e6404f, depth=5, numNodes=63, numFeatures=3, DecisionTreeRegressionModel: uid=dtr_09158af0dac2, depth=5, numNodes=63, numFeatures=3, DecisionTreeRegressionModel: uid=dtr_e7c0d4548729, depth=5, numNodes=63, numFeatures=3, DecisionTreeRegressionModel: uid=dtr_26847efec51a, depth=5, numNodes=63, numFeatures=3, DecisionTreeRegressionModel: uid=dtr_c2eb364292ab, depth=5, numNodes=63, numFeatures=3, DecisionTreeRegressionModel: uid=dtr_c1e0a9cb4a4d, depth=5, numNodes=63, numFeatur

#### Delayed flights with a Random Forest
In this exercise you'll bring together cross validation and ensemble methods. You'll be training a Random Forest classifier to predict delayed flights, using cross validation to choose the best values for model parameters.

You'll find good values for the following parameters:

featureSubsetStrategy — the number of features to consider for splitting at each node and
maxDepth — the maximum number of splits along any branch.

In [152]:
from pyspark.ml.classification import RandomForestClassifier
# Create a random forest classifier
forest = RandomForestClassifier()

# Create a parameter grid
params = ParamGridBuilder() \
            .addGrid(forest.featureSubsetStrategy, ['all', 'onethird', 'sqrt', 'log2']) \
            .addGrid(forest.maxDepth, [2, 5, 10]) \
            .build()

# Create a binary classification evaluator
evaluator = BinaryClassificationEvaluator()

# Create a cross-validator
cv = CrossValidator(estimator=forest, estimatorParamMaps=params, evaluator=evaluator, numFolds=5)

In [153]:
cv = cv.fit(flights_train)

23/03/01 13:46:41 WARN DAGScheduler: Broadcasting large task binary with size 1257.4 KiB
23/03/01 13:46:42 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB


                                                                                

23/03/01 13:46:43 WARN DAGScheduler: Broadcasting large task binary with size 1373.9 KiB
23/03/01 13:46:49 WARN DAGScheduler: Broadcasting large task binary with size 1238.9 KiB
23/03/01 13:46:56 WARN DAGScheduler: Broadcasting large task binary with size 1288.1 KiB
23/03/01 13:46:57 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
23/03/01 13:46:58 WARN DAGScheduler: Broadcasting large task binary with size 1259.4 KiB
23/03/01 13:47:03 WARN DAGScheduler: Broadcasting large task binary with size 1288.1 KiB
23/03/01 13:47:04 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
23/03/01 13:47:05 WARN DAGScheduler: Broadcasting large task binary with size 1259.4 KiB
23/03/01 13:47:12 WARN DAGScheduler: Broadcasting large task binary with size 1251.3 KiB
23/03/01 13:47:13 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
23/03/01 13:47:14 WARN DAGScheduler: Broadcasting large task binary with size 1379.3 KiB
23/03/01 13:47:20 WARN DAGSche

In [157]:
# Get the best model from cross validation
best_model = cv.bestModel

# Generate predictions on testing data using the best model then calculate auc
predictions = best_model.transform(flights_test)

# Create a confusion matrix, comparing predictions to known labels
predictions.groupBy('label', 'prediction').count().show()

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0| 1509|
|    0|       0.0| 2656|
|    1|       1.0| 3307|
|    0|       1.0| 1949|
+-----+----------+-----+



In [158]:
# Calculate evaluation metrics
evaluate_binary_classifier(predictions)

accuracy: 0.6329476700987157
precision = 0.629185692541857
recall = 0.6866694352159468
weighted_precision = 0.6333450938982398
auc = 0.6811742481630176


#### Evaluating Random Forest
In this final exercise you'll be evaluating the results of cross-validation on a Random Forest model.

In [159]:
# Average AUC for each parameter combination in grid
print(cv.avgMetrics)

# Average AUC for the best model
print(max(cv.avgMetrics))

# What's the optimal parameter value for maxDepth?
print(cv.bestModel.explainParam('maxDepth'))
# What's the optimal parameter value for featureSubsetStrategy?
print(cv.bestModel.explainParam('featureSubsetStrategy'))

# AUC for best model on testing data
print(evaluator.evaluate(cv.transform(flights_test)))

[0.6172931196207102, 0.6593843727408606, 0.6705543812470856, 0.645222146976068, 0.6635712731703001, 0.6751211594695575, 0.6400507166844205, 0.6633351179969351, 0.6713220421555475, 0.6400507166844205, 0.6633351179969351, 0.6713220421555475]
0.6751211594695575
maxDepth: Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30]. (default: 5, current: 10)
featureSubsetStrategy: The number of features to consider for splits at each tree node. Supported options: 'auto' (choose automatically for task: If numTrees == 1, set to 'all'. If numTrees > 1 (forest), set to 'sqrt' for classification and to 'onethird' for regression), 'all' (use all features), 'onethird' (use 1/3 of the features), 'sqrt' (use sqrt(number of features)), 'log2' (use log2(number of features)), 'n' (when n is in the range (0, 1.0], use n * number of features. When n is in the range (1, number of features), use n features). default = 'auto' (def