In [47]:
# Directory that holds the course datasets.
# The location can be overridden with the DATASETS_DIR environment variable so
# the notebook is not tied to a single machine; the original local path is kept
# as the default. os.path.join(..., '') guarantees a trailing separator, which
# later cells rely on because they build file names with plain string
# concatenation (path_data + 'some_file.csv').
import os

path_data = os.path.join(
    os.environ.get(
        'DATASETS_DIR',
        '/home/nero/Documents/Estudos/DataCamp/Python/Machine_Learning_with_PySpark/datasets/',
    ),
    '',
)

# Create (or reuse) the local Spark session used by every cell below.

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('chapter_04')
    .getOrCreate()
)

23/07/14 07:58:38 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [48]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Flights: comma-separated file with a header row; column types are inferred
# and the literal string 'NA' is read as null.
flights = spark.read.csv(
    f'{path_data}clean_flights.csv',
    sep=',',
    header=True,
    inferSchema=True,
    nullValue='NA',
)

# SMS: the file has no header row, so column names and types are given explicitly.
schema = StructType([
    StructField(column, dtype)
    for column, dtype in (
        ("id", IntegerType()),
        ("text", StringType()),
        ("label", IntegerType()),
    )
])

# Semicolon-delimited file, parsed with the schema above
sms = spark.read.csv(f'{path_data}sms.csv', sep=';', header=False, schema=schema)

# Register both DataFrames as temporary views in the session catalog
flights.createOrReplaceTempView('flights')
sms.createOrReplaceTempView('sms')

# Cell output: the tables currently registered in the catalog
spark.catalog.listTables()

                                                                                

[Table(name='flights', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='sms', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]

In [49]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import OneHotEncoder, VectorAssembler, StringIndexer

In [50]:
# exercise 01

"""
Flight duration model: Pipeline stages

You're going to create the stages for the flights duration model pipeline. You will use these in the next exercise to build a pipeline and to create a regression model.

The StringIndexer, OneHotEncoder, VectorAssembler and LinearRegression classes are already imported.
"""

# Instructions

"""

    Create an indexer to convert the 'org' column into an indexed column called 'org_idx'.
    Create a one-hot encoder to convert the 'org_idx' and 'dow' columns into dummy variable columns called 'org_dummy' and 'dow_dummy'.
    Create an assembler which will combine the 'km' column with the two dummy variable columns. The output column should be called 'features'.
    Create a linear regression object to predict flight duration.

You might find it useful to revisit the slides from the lessons in the Slides panel next to the IPython Shell.
"""

# solution

# Stage 1: map the origin airport strings to numeric indices
indexer = StringIndexer(
    inputCol='org',
    outputCol='org_idx',
)

# Stage 2: one-hot encode the indexed origin and the day of week
onehot = OneHotEncoder(inputCols=['org_idx', 'dow'], outputCols=['org_dummy', 'dow_dummy'])

# Stage 3: gather distance plus both dummy vectors into one 'features' column
assembler = VectorAssembler(
    inputCols=['km', 'org_dummy', 'dow_dummy'],
    outputCol='features',
)

# Stage 4: linear regression whose target is the 'duration' column
regression = LinearRegression(labelCol='duration')

#----------------------------------#

# Conclusion

"""
Excellent! The stages are now ready for you to build a pipeline.
"""

'\nExcellent! The stages are now ready for you to build a pipeline.\n'

In [51]:
# Derive the distance in kilometres from 'mile' (factor 1.609344), then discard
# the original 'mile' column. Rebinds `flights`, so this cell cannot be run
# twice on the same kernel: the second run would no longer find 'mile'.
flights = (
    flights
    .withColumn('km', flights['mile'] * 1.609344)
    .drop('mile')
)

In [52]:
# Round the distance to two decimals.
# Spark's round() is reached through the functions-module alias instead of
# being imported by name: `from pyspark.sql.functions import round` would
# shadow Python's built-in round() for every later cell of the notebook.
from pyspark.sql import functions as F

flights = flights.withColumn('km', F.round(flights.km, 2))

In [53]:
# Preview the first five rows to check the converted, rounded 'km' column
flights.show(5)

+---+---+---+-------+---+------+--------+-----+-------+
|mon|dom|dow|carrier|org|depart|duration|delay|     km|
+---+---+---+-------+---+------+--------+-----+-------+
|  0| 22|  2|     UA|ORD| 16.33|      82|   30| 508.55|
|  2| 20|  4|     UA|SFO|  6.17|      82|   -8| 542.35|
|  9| 13|  1|     AA|ORD| 10.33|     195|   -5|1989.15|
|  5|  2|  1|     UA|SFO|  7.98|     102|    2| 885.14|
|  7|  2|  6|     AA|ORD| 10.83|     135|   54|1179.65|
+---+---+---+-------+---+------+--------+-----+-------+
only showing top 5 rows



In [54]:
# 80/20 train/test split. A fixed seed makes the split identical on every
# Restart & Run All, so the metrics computed below are reproducible.
flights_train, flights_test = flights.randomSplit([0.80, 0.20], seed=42)

In [55]:
# exercise 02

"""
Flight duration model: Pipeline model

You're now ready to put those stages together in a pipeline.

You'll construct the pipeline and then train the pipeline on the training data. This will apply each of the individual stages in the pipeline to the training data in turn. None of the stages will be exposed to the testing data at all: there will be no leakage!

Once the entire pipeline has been trained it will then be used to make predictions on the testing data.

The data are available as flights, which has been randomly split into flights_train and flights_test.
"""

# Instructions

"""

    Import the class for creating a pipeline.
    Create a pipeline object and specify the indexer, onehot, assembler and regression stages, in this order.
    Train the pipeline on the training data.
    Make predictions on the testing data.

"""

# solution

# Import class for creating a pipeline
from pyspark.ml import Pipeline

# Construct a pipeline
pipeline = Pipeline(stages=[indexer, onehot, assembler, regression])

# Train the pipeline on the training data
pipeline = pipeline.fit(flights_train)

# Make predictions on the testing data
predictions = pipeline.transform(flights_test)

#----------------------------------#

# Conclusion

"""
Fantastic! A pipeline makes your code easier to read and maintain.
"""

23/07/14 07:58:48 WARN Instrumentation: [b55dccd7] regParam is zero, which might cause numerical instability and overfitting.
                                                                                

'\nFantastic! A pipeline makes your code easier to read and maintain.\n'

In [56]:
from pyspark.ml.classification import LogisticRegression

In [57]:
# exercise 03

"""
SMS spam pipeline

You haven't looked at the SMS data for quite a while. Last time we did the following:

    split the text into tokens
    removed stop words
    applied the hashing trick
    converted the data from counts to IDF and
    trained a logistic regression model.

Each of these steps was done independently. This seems like a great application for a pipeline!

The Pipeline and LogisticRegression classes have already been imported into the session, so you don't need to worry about that!
"""

# Instructions

"""

    Create an object for splitting text into tokens.
    Create an object to remove stop words. Rather than explicitly giving the input column name, use the getOutputCol() method on the previous object.
    Create objects for applying the hashing trick and transforming the data into a TF-IDF. Use the getOutputCol() method again.
    Create a pipeline which wraps all of the above steps as well as an object to create a Logistic Regression model.

"""

# solution

from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF

# Each stage reads the output column of the stage before it, so the column
# names are wired together with getOutputCol() rather than repeated by hand.

# text -> words
tokenizer = Tokenizer(
    inputCol='text',
    outputCol='words',
)

# words -> terms (stop words removed)
remover = StopWordsRemover(
    inputCol=tokenizer.getOutputCol(),
    outputCol='terms',
)

# terms -> hash (hashing trick), then hash -> features (IDF weighting)
hasher = HashingTF(
    inputCol=remover.getOutputCol(),
    outputCol="hash",
)
idf = IDF(
    inputCol=hasher.getOutputCol(),
    outputCol="features",
)

# Classifier, and the pipeline that wraps all five stages in order
logistic = LogisticRegression()
pipeline = Pipeline(
    stages=[tokenizer, remover, hasher, idf, logistic],
)

#----------------------------------#

# Conclusion

"""
Great job! Isn't that a lot simpler than applying each stage separately?
"""

"\nGreat job! Isn't that a lot simpler than applying each stage separately?\n"

In [58]:
# Display the flights DataFrame (show() with no arguments, i.e. its default row count)
flights.show()

+---+---+---+-------+---+------+--------+-----+-------+
|mon|dom|dow|carrier|org|depart|duration|delay|     km|
+---+---+---+-------+---+------+--------+-----+-------+
|  0| 22|  2|     UA|ORD| 16.33|      82|   30| 508.55|
|  2| 20|  4|     UA|SFO|  6.17|      82|   -8| 542.35|
|  9| 13|  1|     AA|ORD| 10.33|     195|   -5|1989.15|
|  5|  2|  1|     UA|SFO|  7.98|     102|    2| 885.14|
|  7|  2|  6|     AA|ORD| 10.83|     135|   54|1179.65|
|  1| 16|  6|     UA|ORD|   8.0|     232|   -7|2317.46|
|  1| 22|  5|     UA|SJC|  7.98|     250|  -13|2943.49|
| 11|  8|  1|     OO|SFO|  7.77|      60|   88| 254.28|
|  4| 26|  1|     AA|SFO| 13.25|     210|  -10|2356.08|
|  4| 25|  0|     AA|ORD| 13.75|     160|   31|1573.94|
|  8| 30|  2|     UA|ORD| 13.28|     151|   16|1157.12|
|  3| 16|  3|     UA|ORD|   9.0|     264|    3|2808.31|
|  0|  3|  4|     AA|LGA| 17.08|     190|   32|1765.45|
|  5|  9|  1|     UA|SFO|  12.7|     158|   20|1556.24|
|  3| 10|  4|     B6|ORD| 17.58|     265|  155|2

In [59]:
# Simple model input: 'km' is the only predictor, assembled up front so the
# cross-validated regression below can be fit directly on the split data.
assembler = VectorAssembler(inputCols=['km'], outputCol='features')
data = assembler.transform(flights)

# 80/20 train/test split with a fixed seed so that the cross-validation
# metrics are reproducible across kernel restarts.
flights_train, flights_test = data.randomSplit([0.80, 0.20], seed=42)

In [60]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

In [61]:
# exercise 04

"""
Cross validating simple flight duration model

You've already built a few models for predicting flight duration and evaluated them with a simple train/test split. However, cross-validation provides a much better way to evaluate model performance.

In this exercise you're going to train a simple model for flight duration using cross-validation. Travel time is usually strongly correlated with distance, so using the km column alone should give a decent model.

The data have been randomly split into flights_train and flights_test.

The following classes have already been imported: LinearRegression, RegressionEvaluator, ParamGridBuilder and CrossValidator.
"""

# Instructions

"""

    Create an empty parameter grid.
    Create objects for building and evaluating a linear regression model. The model should predict the "duration" field.
    Create a cross-validator object. Provide values for the estimator, estimatorParamMaps and evaluator arguments. Choose 5-fold cross validation.
    Train and test the model across multiple folds of the training data.

"""

# solution

# Create an empty parameter grid
params = ParamGridBuilder().build()

# Create objects for building and evaluating a regression model
regression = LinearRegression(labelCol='duration')
evaluator = RegressionEvaluator(labelCol='duration')

# Create a cross validator
cv = CrossValidator(estimator=regression, estimatorParamMaps=params, evaluator=evaluator, numFolds=5)

# Train and test model on multiple folds of the training data
cv = cv.fit(flights_train)

# NOTE: Since cross-validation builds multiple models, the fit() method can take a little while to complete.

#----------------------------------#

# Conclusion

"""
Great! Now let's cross validate a model pipeline.
"""

23/07/14 07:58:55 WARN Instrumentation: [d6aeb831] regParam is zero, which might cause numerical instability and overfitting.
23/07/14 07:58:57 WARN Instrumentation: [431a4cbc] regParam is zero, which might cause numerical instability and overfitting.
23/07/14 07:58:59 WARN Instrumentation: [b0a4c754] regParam is zero, which might cause numerical instability and overfitting.
23/07/14 07:59:00 WARN Instrumentation: [75423e05] regParam is zero, which might cause numerical instability and overfitting.
23/07/14 07:59:01 WARN Instrumentation: [b8e526e3] regParam is zero, which might cause numerical instability and overfitting.
23/07/14 07:59:03 WARN Instrumentation: [09f8dce6] regParam is zero, which might cause numerical instability and overfitting.


"\nGreat! Now let's cross validate a model pipeline.\n"

In [62]:
# Cross-validated average metric, one entry per parameter map in the grid
# (the grid used for this fit was built empty, hence a single value)
cv.avgMetrics

[17.001812810108934]

In [63]:
# exercise 05

"""
Cross validating flight duration model pipeline

The cross-validated model that you just built was simple, using km alone to predict duration.

Another important predictor of flight duration is the origin airport. Flights generally take longer to get into the air from busy airports. Let's see if adding this predictor improves the model!

In this exercise you'll add the org field to the model. However, since org is categorical, there's more work to be done before it can be included: it must first be transformed to an index and then one-hot encoded before being assembled with km and used to build the regression model. We'll wrap these operations up in a pipeline.

The following objects have already been created:

    params — an empty parameter grid
    evaluator — a regression evaluator
    regression — a LinearRegression object with labelCol='duration'.

The StringIndexer, OneHotEncoder, VectorAssembler and CrossValidator classes have already been imported.
"""

# Instructions

"""

    Create a string indexer. Specify the input and output fields as org and org_idx.
    Create a one-hot encoder. Name the output field org_dummy.
    Assemble the km and org_dummy fields into a single field called features.
    Create a pipeline using the following operations: string indexer, one-hot encoder, assembler and linear regression. Use this to create a cross-validator.

"""
# solution

# Regression stage predicting the 'duration' column
regression = LinearRegression(labelCol='duration')

# Index the origin airport strings
indexer = StringIndexer(
    inputCol='org',
    outputCol='org_idx',
)

# One-hot encode the indexed origin
onehot = OneHotEncoder(
    inputCols=['org_idx'],
    outputCols=['org_dummy'],
)

# Combine the encoded origin with the distance into 'features'
assembler = VectorAssembler(
    inputCols=['org_dummy', 'km'],
    outputCol='features',
)

# Wrap the stages in a pipeline and cross-validate the pipeline as a whole.
# numFolds is not passed here, so CrossValidator's own default applies.
pipeline = Pipeline(stages=[indexer, onehot, assembler, regression])
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=params, evaluator=evaluator)

#----------------------------------#

# Conclusion

"""
Wrapping operations in a pipeline makes cross validating the entire workflow easy!
"""

'\nWrapping operations in a pipeline makes cross validating the entire workflow easy!\n'

In [64]:
# exercise 06

"""
Optimizing flights linear regression

Up until now you've been using the default hyper-parameters when building your models. In this exercise you'll use cross validation to choose an optimal (or close to optimal) set of model hyper-parameters.

The following have already been created:

    regression — a LinearRegression object
    pipeline — a pipeline with string indexer, one-hot encoder, vector assembler and linear regression and
    evaluator — a RegressionEvaluator object.

"""

# Instructions

"""

    Create a parameter grid builder.
    Add grids for regression.regParam (values 0.01, 0.1, 1.0, and 10.0) and regression.elasticNetParam (values 0.0, 0.5, and 1.0).
    Build the grid.
    Create a cross validator, specifying five folds.

"""

# solution

# Create parameter grid
params = ParamGridBuilder()

# Add grids for two parameters
params = params.addGrid(regression.regParam, [0.01,0.1,1.0,10.0]) \
               .addGrid(regression.elasticNetParam, [0.0,0.5,1.0])

# Build the parameter grid
params = params.build()
print('Number of models to be tested: ', len(params))

# Create cross-validator
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=params, evaluator=evaluator, numFolds=5)

#----------------------------------#

# Conclusion

"""
Nice! Multiple models are built effortlessly using grid search.
"""

Number of models to be tested:  12


'\nNice! Multiple models are built effortlessly using grid search.\n'

In [65]:
# Re-split the raw flights data (without a pre-assembled 'features' column,
# since the pipeline builds its own). The fixed seed keeps the split, and so
# the grid-search results, reproducible across kernel restarts.
flights_train, flights_test = flights.randomSplit([0.80, 0.20], seed=42)

In [66]:
# Run the grid search: fit the cross-validator on the training data.
# `cv` is rebound from the estimator to the fitted model, which the next
# exercise reads via cv.bestModel.
cv = cv.fit(flights_train)

In [67]:
# exercise 07

"""
Dissecting the best flight duration model

You just set up a CrossValidator to find good parameters for the linear regression model predicting flight duration.

The model pipeline has multiple stages (objects of type StringIndexer, OneHotEncoder, VectorAssembler and LinearRegression), which operate in sequence. The stages are available as the stages attribute on the pipeline object. They are represented by a list and the stages are executed in the sequence in which they appear in the list.

Now you're going to take a closer look at the pipeline, split out the stages and use it to make predictions on the testing data.

The following objects have already been created:

    cv — a trained CrossValidatorModel object and
    evaluator — a RegressionEvaluator object.

The flights data have been randomly split into flights_train and flights_test.
"""

# Instructions

"""

    Retrieve the best model.
    Look at the stages in the best model.
    Isolate the linear regression stage and extract its parameters.
    Use the best model to generate predictions on the testing data and calculate the RMSE.

"""

# solution

# Get the best model from cross validation
best_model = cv.bestModel

# Look at the stages in the best model
print(best_model.stages)

# Get the parameters for the LinearRegression object in the best model
best_model.stages[3].extractParamMap()

# Generate predictions on testing data using the best model then calculate RMSE
predictions = best_model.transform(flights_test)
print("RMSE =", evaluator.evaluate(predictions))

#----------------------------------#

# Conclusion

"""
The best model performs pretty well on the testing data!
"""

[StringIndexerModel: uid=StringIndexer_9e236fd88b7b, handleInvalid=error, OneHotEncoderModel: uid=OneHotEncoder_09ecf555e94c, dropLast=true, handleInvalid=error, numInputCols=1, numOutputCols=1, VectorAssembler_617a265e6ee9, LinearRegressionModel: uid=LinearRegression_a567419c0ebc, numFeatures=8]
RMSE = 11.159192651672685


'\nThe best model performs pretty well on the testing data!\n'

In [68]:
# exercise 08

"""
SMS spam optimised

The pipeline you built earlier for the SMS spam model used the default parameters for all of the elements in the pipeline. It's very unlikely that these parameters will give a particularly good model though. In this exercise you're going to run the pipeline for a selection of parameter values. We're going to do this in a systematic way: the values for each of the parameters will be laid out on a grid and then the pipeline will be run systematically across each point in the grid.

In this exercise you'll set up a parameter grid which can be used with cross validation to choose a good set of parameters for the SMS spam classifier.

The following are already defined:

    hasher — a HashingTF object and
    logistic — a LogisticRegression object.

"""

# Instructions

"""

    Create a parameter grid builder object.
    Add grid points for numFeatures and binary parameters to the HashingTF object, giving values 1024, 4096 and 16384, and True and False, respectively.
    Add grid points for regParam and elasticNetParam parameters to the LogisticRegression object, giving values of 0.01, 0.1, 1.0 and 10.0, and 0.0, 0.5, and 1.0 respectively.
    Build the parameter grid.

"""

# solution

# Create parameter grid
params = ParamGridBuilder()

# Add grid for hashing trick parameters
params = params.addGrid(hasher.numFeatures, [1024,4096,16384]) \
               .addGrid(hasher.binary, [True, False])

# Add grid for logistic regression parameters
params = params.addGrid(logistic.regParam, [0.01, 0.1, 1.0, 10.0]) \
               .addGrid(logistic.elasticNetParam, [0.0,0.5,1.0])

# Build parameter grid
params = params.build()

#----------------------------------#

# Conclusion

"""
Using cross-validation on a pipeline makes it possible to optimise each stage in the workflow.
"""

'\nUsing cross-validation on a pipeline makes it possible to optimise each stage in the workflow.\n'

In [72]:
# Predictors for the delay classifiers: month, departure time and duration,
# assembled into a single 'features' vector column.
assembler = VectorAssembler(
    inputCols=['mon', 'depart', 'duration'],
    outputCol='features',
)
data = assembler.transform(flights)

In [75]:
# Preview the first five rows, including the assembled 'features' column
data.show(5)

+---+---+---+-------+---+------+--------+-----+-------+-----------------+
|mon|dom|dow|carrier|org|depart|duration|delay|     km|         features|
+---+---+---+-------+---+------+--------+-----+-------+-----------------+
|  0| 22|  2|     UA|ORD| 16.33|      82|   30| 508.55| [0.0,16.33,82.0]|
|  2| 20|  4|     UA|SFO|  6.17|      82|   -8| 542.35|  [2.0,6.17,82.0]|
|  9| 13|  1|     AA|ORD| 10.33|     195|   -5|1989.15|[9.0,10.33,195.0]|
|  5|  2|  1|     UA|SFO|  7.98|     102|    2| 885.14| [5.0,7.98,102.0]|
|  7|  2|  6|     AA|ORD| 10.83|     135|   54|1179.65|[7.0,10.83,135.0]|
+---+---+---+-------+---+------+--------+-----+-------+-----------------+
only showing top 5 rows



In [74]:
import pyspark.sql.functions as F

def modify_label(df, threshold=15):
    """Add a binary 'label' column that flags delayed flights.

    Parameters
    ----------
    df : DataFrame
        Input DataFrame; it must contain a 'delay' column.
    threshold : int or float, optional
        A row is labelled 1 when its 'delay' is greater than or equal to this
        value and 0 otherwise. Defaults to 15, the cut-off that was previously
        hard-coded, so existing calls behave exactly as before.

    Returns
    -------
    DataFrame
        A new DataFrame with the extra (or replaced) 'label' column; the
        input DataFrame itself is not modified.
    """
    is_delayed = df.delay >= threshold
    return df.withColumn('label', F.when(is_delayed, 1).otherwise(0))

In [77]:
# Attach the binary 'label' column (1 when delay >= 15, else 0) used as the
# classification target below
data = modify_label(data)

In [78]:
# Preview the first five rows to check the new 'label' column
data.show(5)

+---+---+---+-------+---+------+--------+-----+-------+-----------------+-----+
|mon|dom|dow|carrier|org|depart|duration|delay|     km|         features|label|
+---+---+---+-------+---+------+--------+-----+-------+-----------------+-----+
|  0| 22|  2|     UA|ORD| 16.33|      82|   30| 508.55| [0.0,16.33,82.0]|    1|
|  2| 20|  4|     UA|SFO|  6.17|      82|   -8| 542.35|  [2.0,6.17,82.0]|    0|
|  9| 13|  1|     AA|ORD| 10.33|     195|   -5|1989.15|[9.0,10.33,195.0]|    0|
|  5|  2|  1|     UA|SFO|  7.98|     102|    2| 885.14| [5.0,7.98,102.0]|    0|
|  7|  2|  6|     AA|ORD| 10.83|     135|   54|1179.65|[7.0,10.83,135.0]|    1|
+---+---+---+-------+---+------+--------+-----+-------+-----------------+-----+
only showing top 5 rows



In [79]:
# 80/20 train/test split of the labelled data. The fixed seed keeps the
# split, and therefore the AUC comparison below, reproducible.
flights_train, flights_test = data.randomSplit([0.80, 0.20], seed=42)

In [80]:
# exercise 09

"""
Delayed flights with Gradient-Boosted Trees

You've previously built a classifier for flights likely to be delayed using a Decision Tree. In this exercise you'll compare a Decision Tree model to a Gradient-Boosted Trees model.

The flights data have been randomly split into flights_train and flights_test.
"""

# Instructions

"""

    Import the classes required to create Decision Tree and Gradient-Boosted Tree classifiers.
    Create Decision Tree and Gradient-Boosted Tree classifiers. Train on the training data.
    Create an evaluator and calculate AUC on testing data for both classifiers. Which model performs better?
    For the Gradient-Boosted Tree classifier print the number of trees and the relative importance of features.

"""

# solution

# Import the classes required
from pyspark.ml.classification import DecisionTreeClassifier, GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Create model objects and train on training data
tree = DecisionTreeClassifier().fit(flights_train)
gbt = GBTClassifier().fit(flights_train)

# Compare AUC on testing data
evaluator = BinaryClassificationEvaluator()
print(evaluator.evaluate(tree.transform(flights_test)))
print(evaluator.evaluate(gbt.transform(flights_test)))

# Find the number of trees and the relative importance of features
print(gbt.getNumTrees)
print(gbt.featureImportances)

#----------------------------------#

# Conclusion

"""
Good job! A Gradient-Boosted Tree almost always provides better performance than a plain Decision Tree.
"""

                                                                                

0.6082295234958037


                                                                                

0.6831926357282928
20
(3,[0,1,2],[0.3350362153702097,0.3220376711165534,0.34292611351323676])


'\nGood job! A Gradient-Boosted Tree almost always provides better performance than a plain Decision Tree.\n'

In [None]:
# exercise 10

"""
Delayed flights with a Random Forest

In this exercise you'll bring together cross validation and ensemble methods. You'll be training a Random Forest classifier to predict delayed flights, using cross validation to choose the best values for model parameters.

You'll find good values for the following parameters:

    featureSubsetStrategy — the number of features to consider for splitting at each node and
    maxDepth — the maximum number of splits along any branch.

Unfortunately building this model takes too long, so we won't be running the .fit() method on the pipeline.
"""

# Instructions

"""

    Create a random forest classifier object.
    Create a parameter grid builder object. Add grid points for the featureSubsetStrategy and maxDepth parameters.
    Create binary classification evaluator.
    Create a cross-validator object, specifying the estimator, parameter grid and evaluator. Choose 5-fold cross validation.

"""

# solution

# Create a random forest classifier
forest = RandomForestClassifier()

# Create a parameter grid
params = ParamGridBuilder() \
            .addGrid(forest.featureSubsetStrategy, ['all', 'onethird', 'sqrt', 'log2']) \
            .addGrid(forest.maxDepth, [2, 5, 10]) \
            .build()

# Create a binary classification evaluator
evaluator = BinaryClassificationEvaluator()

# Create a cross-validator
cv = CrossValidator(estimator = forest, estimatorParamMaps = params , evaluator = evaluator, numFolds = 5)

#----------------------------------#

# Conclusion

"""
Excellent! A grid search can be used to optimize all of the parameters in a model pipeline.
"""

'\n\n'

In [None]:
# exercise 11

"""
Evaluating Random Forest

In this final exercise you'll be evaluating the results of cross-validation on a Random Forest model.

The following have already been created:

    cv - a cross-validator which has already been fit to the training data
    evaluator — a BinaryClassificationEvaluator object and
    flights_test — the testing data.

"""

# Instructions

"""

    Print a list of average AUC metrics across all models in the parameter grid.
    Display the average AUC for the best model. This will be the largest AUC in the list.
    Print an explanation of the maxDepth and featureSubsetStrategy parameters for the best model.
    Display the AUC for the best model predictions on the testing data.

"""

# solution

# Average AUC for each parameter combination in grid
print(cv.avgMetrics)

# Average AUC for the best model
print(cv.bestModel.evaluate(flights_test))

# What's the optimal parameter value for maxDepth?
print(cv.bestModel.explainParam('maxDepth'))
# What's the optimal parameter value for featureSubsetStrategy?
print(cv.bestModel.explainParam('featureSubsetStrategy'))

# AUC for best model on testing data
print(evaluator.evaluate(cv.transform(flights_test)))

#----------------------------------#

# Conclusion

"""
Fantastic! Optimized Random Forest > Random Forest > Decision Tree
"""

In [81]:
# Shut down the Spark session now that the notebook is finished
spark.stop()