#CH1 - Introduction

In [1]:
!pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/45/b0/9d6860891ab14a39d4bddf80ba26ce51c2f9dc4805e5c6978ac0472c120a/pyspark-3.1.1.tar.gz (212.3MB)
[K     |████████████████████████████████| 212.3MB 69kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 19.7MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.1.1-py2.py3-none-any.whl size=212767604 sha256=de7895294b269c13ecf289a80390b2eeeccb760d3d86887d12369761ab5a764b
  Stored in directory: /root/.cache/pip/wheels/0b/90/c0/01de724414ef122bd05f056541fb6a0ecf47c7ca655f8b3c0f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.1


In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
                    .master('local[*]') \
                    .appName('test') \
                    .getOrCreate()
print(spark.version)
spark.stop()

3.1.1


In [3]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

In [4]:
my_spark = SparkSession.builder.getOrCreate()
print(my_spark)

<pyspark.sql.session.SparkSession object at 0x7f77986148d0>


In [5]:
flights = spark.read.csv('/content/flights.csv',
                         sep=',',
                         header=True,
                         inferSchema=True,
                         nullValue='NA')

In [6]:
print("The data contain %d records." % flights.count())

The data contain 50000 records.


In [7]:
flights.show(5)

+---+---+---+-------+------+---+----+------+--------+-----+
|mon|dom|dow|carrier|flight|org|mile|depart|duration|delay|
+---+---+---+-------+------+---+----+------+--------+-----+
| 11| 20|  6|     US|    19|JFK|2153|  9.48|     351| null|
|  0| 22|  2|     UA|  1107|ORD| 316| 16.33|      82|   30|
|  2| 20|  4|     UA|   226|SFO| 337|  6.17|      82|   -8|
|  9| 13|  1|     AA|   419|ORD|1236| 10.33|     195|   -5|
|  4|  2|  5|     AA|   325|ORD| 258|  8.92|      65| null|
+---+---+---+-------+------+---+----+------+--------+-----+
only showing top 5 rows



In [8]:
flights.dtypes

[('mon', 'int'),
 ('dom', 'int'),
 ('dow', 'int'),
 ('carrier', 'string'),
 ('flight', 'int'),
 ('org', 'string'),
 ('mile', 'int'),
 ('depart', 'double'),
 ('duration', 'int'),
 ('delay', 'int')]

In [9]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
schema = StructType([
    StructField("id", IntegerType()),
    StructField("text", StringType()),
    StructField("label", IntegerType())
])
sms = spark.read.csv("/content/sms.csv", sep=';', header=False, schema=schema)
sms.printSchema()

root
 |-- id: integer (nullable = true)
 |-- text: string (nullable = true)
 |-- label: integer (nullable = true)



#CH2 - Classification

In [10]:
flights.show(5)

+---+---+---+-------+------+---+----+------+--------+-----+
|mon|dom|dow|carrier|flight|org|mile|depart|duration|delay|
+---+---+---+-------+------+---+----+------+--------+-----+
| 11| 20|  6|     US|    19|JFK|2153|  9.48|     351| null|
|  0| 22|  2|     UA|  1107|ORD| 316| 16.33|      82|   30|
|  2| 20|  4|     UA|   226|SFO| 337|  6.17|      82|   -8|
|  9| 13|  1|     AA|   419|ORD|1236| 10.33|     195|   -5|
|  4|  2|  5|     AA|   325|ORD| 258|  8.92|      65| null|
+---+---+---+-------+------+---+----+------+--------+-----+
only showing top 5 rows



In [11]:
flights = flights.drop('flight')

In [12]:
flights.filter('delay IS NULL').count()

2978

In [13]:
flights = flights.filter('delay IS NOT NULL')

In [14]:
flights = flights.dropna()
print(flights.count())

47022


In [15]:
from pyspark.sql.functions import round
flights_km = flights.withColumn('km', round(flights.mile * 1.60934, 0)) \
                    .drop('mile')

In [16]:
flights_km = flights_km.withColumn('label', (flights_km.delay >= 15).cast('integer'))

In [17]:
flights_km.show(5)

+---+---+---+-------+---+------+--------+-----+------+-----+
|mon|dom|dow|carrier|org|depart|duration|delay|    km|label|
+---+---+---+-------+---+------+--------+-----+------+-----+
|  0| 22|  2|     UA|ORD| 16.33|      82|   30| 509.0|    1|
|  2| 20|  4|     UA|SFO|  6.17|      82|   -8| 542.0|    0|
|  9| 13|  1|     AA|ORD| 10.33|     195|   -5|1989.0|    0|
|  5|  2|  1|     UA|SFO|  7.98|     102|    2| 885.0|    0|
|  7|  2|  6|     AA|ORD| 10.83|     135|   54|1180.0|    1|
+---+---+---+-------+---+------+--------+-----+------+-----+
only showing top 5 rows



In [57]:
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol='carrier', outputCol='carrier_idx')
indexer_model = indexer.fit(flights_km)
flights_indexed = indexer_model.transform(flights_km)
flights_indexed = StringIndexer(inputCol='org', outputCol='org_idx').fit(flights_indexed).transform(flights_indexed)

In [42]:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=[
    'mon', 'dom', 'dow', 'depart', 'duration'
], outputCol='features')

In [58]:
assembler = VectorAssembler(inputCols=[
    'mon', 'dom', 'dow', 'carrier_idx', 'org_idx', 'km', 'depart', 'duration'
], outputCol='features')
flights_assembled = assembler.transform(flights_indexed)
flights_assembled.select('features', 'delay').show(5, truncate=False)

+-----------------------------------------+-----+
|features                                 |delay|
+-----------------------------------------+-----+
|[0.0,22.0,2.0,0.0,0.0,509.0,16.33,82.0]  |30   |
|[2.0,20.0,4.0,0.0,1.0,542.0,6.17,82.0]   |-8   |
|[9.0,13.0,1.0,1.0,0.0,1989.0,10.33,195.0]|-5   |
|[5.0,2.0,1.0,0.0,1.0,885.0,7.98,102.0]   |2    |
|[7.0,2.0,6.0,1.0,0.0,1180.0,10.83,135.0] |54   |
+-----------------------------------------+-----+
only showing top 5 rows



In [59]:
flights_train, flights_test = flights_assembled.randomSplit([0.8, 0.2], seed=17)
training_ratio = flights_train.count() / flights.count()
print(training_ratio)

0.796967376972481


In [60]:
from pyspark.ml.classification import DecisionTreeClassifier

# Create a classifier object and fit to the training data
tree = DecisionTreeClassifier()
tree_model = tree.fit(flights_train)

In [61]:
prediction = tree_model.transform(flights_test)
prediction.select('label', 'prediction', 'probability').show(5, False)

+-----+----------+----------------------------------------+
|label|prediction|probability                             |
+-----+----------+----------------------------------------+
|1    |0.0       |[0.5316561844863732,0.46834381551362686]|
|0    |1.0       |[0.3535976324064369,0.6464023675935631] |
|0    |1.0       |[0.4965986394557823,0.5034013605442177] |
|1    |1.0       |[0.3535976324064369,0.6464023675935631] |
|1    |1.0       |[0.3535976324064369,0.6464023675935631] |
+-----+----------+----------------------------------------+
only showing top 5 rows



In [62]:
prediction.groupBy('label', 'prediction').count().show()
TN = prediction.filter('prediction = 0 AND label = prediction').count()
TP = prediction.filter('prediction = 1 AND label = prediction').count()
FN = prediction.filter('prediction = 0 AND label != prediction').count()
FP = prediction.filter('prediction = 1 AND label != prediction').count()

accuracy = (TN + TP) / (TN + TP + FN + FP)
print(accuracy)

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0| 1688|
|    0|       0.0| 2769|
|    1|       1.0| 3222|
|    0|       1.0| 1868|
+-----+----------+-----+

0.6275269718236095


In [63]:
from pyspark.ml.classification import LogisticRegression

logistic = LogisticRegression().fit(flights_train)
prediction = logistic.transform(flights_test)
prediction.groupBy('label', 'prediction').count().show()

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0| 1671|
|    0|       0.0| 2522|
|    1|       1.0| 3239|
|    0|       1.0| 2115|
+-----+----------+-----+



In [64]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

precision = TP / (TP + FP)
recall = TP / (TP + FN)
print('precision = {:.2f}\nrecall    = {:.2f}'.format(precision, recall))

multi_evaluator = MulticlassClassificationEvaluator()
weighted_precision = multi_evaluator.evaluate(prediction, {multi_evaluator.metricName: "weightedPrecision"})

binary_evaluator = BinaryClassificationEvaluator()
auc = binary_evaluator.evaluate(prediction, {binary_evaluator.metricName: "areaUnderROC"})

precision = 0.63
recall    = 0.66


# CH3 - Regression

In [74]:
flights_assembled.show(5)

+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+--------------------+
|mon|dom|dow|carrier|org|depart|duration|delay|    km|label|carrier_idx|org_idx|            features|
+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+--------------------+
|  0| 22|  2|     UA|ORD| 16.33|      82|   30| 509.0|    1|        0.0|    0.0|[0.0,22.0,2.0,0.0...|
|  2| 20|  4|     UA|SFO|  6.17|      82|   -8| 542.0|    0|        0.0|    1.0|[2.0,20.0,4.0,0.0...|
|  9| 13|  1|     AA|ORD| 10.33|     195|   -5|1989.0|    0|        1.0|    0.0|[9.0,13.0,1.0,1.0...|
|  5|  2|  1|     UA|SFO|  7.98|     102|    2| 885.0|    0|        0.0|    1.0|[5.0,2.0,1.0,0.0,...|
|  7|  2|  6|     AA|ORD| 10.83|     135|   54|1180.0|    1|        1.0|    0.0|[7.0,2.0,6.0,1.0,...|
+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+--------------------+
only showing top 5 rows



In [75]:
from pyspark.ml.feature import OneHotEncoder

onehot = OneHotEncoder(inputCols=['org_idx'], outputCols=['org_dummy'])
onehot = onehot.fit(flights_assembled)
flights_onehot = onehot.transform(flights_assembled)

flights_onehot.select('org', 'org_idx', 'org_dummy').distinct().sort('org_idx').show()

+---+-------+-------------+
|org|org_idx|    org_dummy|
+---+-------+-------------+
|ORD|    0.0|(7,[0],[1.0])|
|SFO|    1.0|(7,[1],[1.0])|
|JFK|    2.0|(7,[2],[1.0])|
|LGA|    3.0|(7,[3],[1.0])|
|SMF|    4.0|(7,[4],[1.0])|
|SJC|    5.0|(7,[5],[1.0])|
|TUS|    6.0|(7,[6],[1.0])|
|OGG|    7.0|    (7,[],[])|
+---+-------+-------------+



In [76]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

regression = LinearRegression(labelCol='duration').fit(flights_train)

In [77]:
predictions = regression.transform(flights_test)
predictions.select('duration', 'prediction').show(5, False)
RegressionEvaluator(labelCol='duration').evaluate(predictions)

+--------+------------------+
|duration|prediction        |
+--------+------------------+
|230     |229.99999999999997|
|170     |170.00000000000057|
|120     |119.99999999999984|
|135     |135.00000000000003|
|70      |69.99999999999994 |
+--------+------------------+
only showing top 5 rows



2.0996173558454693e-13

In [79]:
inter = regression.intercept
print(inter)

coefs = regression.coefficients
print(coefs)

minutes_per_km = regression.coefficients[0]
print(minutes_per_km)

avg_speed = 60 / minutes_per_km
print(avg_speed)

-5.058088283278139e-13
[-2.6033168963891943e-15,-4.558427378989009e-16,7.147343244424583e-16,-2.9101128620086723e-14,2.6398678832414195e-14,-9.921685438149023e-16,-2.3119359913401514e-15,1.000000000000013]
-2.6033168963891943e-15
-2.3047520677647856e+16


In [80]:
avg_speed_hour = 60 / regression.coefficients[0]
print(avg_speed_hour)

inter = regression.intercept
print(inter)

avg_ground_jfk = inter + regression.coefficients[3]
print(avg_ground_jfk)

avg_ground_lga = inter + regression.coefficients[4]
print(avg_ground_lga)

-2.3047520677647856e+16
-5.058088283278139e-13
-5.349099569479007e-13
-4.794101494953997e-13


In [82]:
from pyspark.ml.feature import Bucketizer, OneHotEncoder

buckets = Bucketizer(splits=[0, 3, 6, 9, 12, 15, 18, 21, 24], inputCol='depart', outputCol='depart_bucket')
bucketed = buckets.transform(flights)
bucketed.select('depart', 'depart_bucket').show(5)

onehot = OneHotEncoder(inputCols=['depart_bucket'], outputCols=['depart_dummy'])
flights_onehot = onehot.fit(bucketed).transform(bucketed)
flights_onehot.select('depart', 'depart_bucket', 'depart_dummy').show(5)

+------+-------------+
|depart|depart_bucket|
+------+-------------+
| 16.33|          5.0|
|  6.17|          2.0|
| 10.33|          3.0|
|  7.98|          2.0|
| 10.83|          3.0|
+------+-------------+
only showing top 5 rows

+------+-------------+-------------+
|depart|depart_bucket| depart_dummy|
+------+-------------+-------------+
| 16.33|          5.0|(7,[5],[1.0])|
|  6.17|          2.0|(7,[2],[1.0])|
| 10.33|          3.0|(7,[3],[1.0])|
|  7.98|          2.0|(7,[2],[1.0])|
| 10.83|          3.0|(7,[3],[1.0])|
+------+-------------+-------------+
only showing top 5 rows



In [85]:
from pyspark.ml.evaluation import RegressionEvaluator
RegressionEvaluator(labelCol='duration').evaluate(predictions)

avg_eve_ogg = regression.intercept
print(avg_eve_ogg)

avg_night_ogg = regression.intercept + regression.coefficients[7]
print(avg_night_ogg)

avg_night_jfk = regression.intercept + regression.coefficients[7] + regression.coefficients[3]
print(avg_night_jfk)

-5.058088283278139e-13
0.9999999999995073
0.9999999999994782


In [88]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

regression = LinearRegression(labelCol='duration').fit(flights_train)
predictions = regression.transform(flights_test)

rmse = RegressionEvaluator(labelCol='duration').evaluate(predictions)
print("The test RMSE is", rmse)

coeffs = regression.coefficients
print(coeffs)

The test RMSE is 2.0996173558454693e-13
[-2.6033168963891943e-15,-4.558427378989009e-16,7.147343244424583e-16,-2.9101128620086723e-14,2.6398678832414195e-14,-9.921685438149023e-16,-2.3119359913401514e-15,1.000000000000013]


In [90]:
zero_coeff = sum([beta == 0 for beta in regression.coefficients])
print("Number of ceofficients equal to 0:", zero_coeff)

Number of ceofficients equal to 0: 7


# CH4 - Ensembles & Pipelines

In [95]:
indexer = StringIndexer(inputCol='org', outputCol='org_idx')
onehot = OneHotEncoder(
    inputCols=['org_idx', 'dow'],
    outputCols=['org_dummy', 'dow_dummy']
)
assembler = VectorAssembler(inputCols=['km', 'org_dummy', 'dow_dummy'], outputCol='features')
regression = LinearRegression(labelCol='duration')

In [96]:
flights_train.show(5)

+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+--------------------+
|mon|dom|dow|carrier|org|depart|duration|delay|    km|label|carrier_idx|org_idx|            features|
+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+--------------------+
|  0|  1|  2|     AA|JFK|   7.0|     385|  -16|4162.0|    0|        1.0|    2.0|[0.0,1.0,2.0,1.0,...|
|  0|  1|  2|     AA|JFK|  12.0|     370|   11|3983.0|    0|        1.0|    2.0|[0.0,1.0,2.0,1.0,...|
|  0|  1|  2|     AA|JFK|  17.0|     379|  -10|3983.0|    0|        1.0|    2.0|[0.0,1.0,2.0,1.0,...|
|  0|  1|  2|     AA|LGA|   6.5|     240|   40|2235.0|    1|        1.0|    3.0|[0.0,1.0,2.0,1.0,...|
|  0|  1|  2|     AA|LGA|  8.25|     250|   27|2235.0|    1|        1.0|    3.0|[0.0,1.0,2.0,1.0,...|
+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+--------------------+
only showing top 5 rows



In [100]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[indexer, onehot, assembler, regression])
#pipeline = pipeline.fit(flights_train)
#predictions = pipeline.transform(flights_test)

In [98]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
tokenizer = Tokenizer(inputCol='text', outputCol='words')
remover = StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol='terms')
hasher = HashingTF(inputCol=remover.getOutputCol(), outputCol="hash")
idf = IDF(inputCol=hasher.getOutputCol(), outputCol="features")

# Create a logistic regression object and add everything to a pipeline
logistic = LogisticRegression()
pipeline = Pipeline(stages=[tokenizer, remover, hasher, idf, logistic])

In [102]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

params = ParamGridBuilder().build()

regression = LinearRegression(labelCol='duration')
evaluator = RegressionEvaluator(labelCol='duration')

cv = CrossValidator(estimator=regression, estimatorParamMaps=params, evaluator=evaluator, numFolds=5)

cv = cv.fit(flights_train)

In [103]:
indexer = StringIndexer(inputCol='org', outputCol='org_idx')

onehot = OneHotEncoder(inputCols=['org_idx'], outputCols=['org_dummy'])

assembler = VectorAssembler(inputCols=['km', 'org_dummy'], outputCol='features')


pipeline = Pipeline(stages=[indexer, onehot, assembler, regression])
cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=params,
                    evaluator=evaluator)

In [104]:
params = ParamGridBuilder()

params = params.addGrid(regression.regParam, [0.01, 0.1, 1.0, 10.0]) \
               .addGrid(regression.elasticNetParam, [0.0, 0.5, 1.0])

params = params.build()
print('Number of models to be tested: ', len(params))

cv = CrossValidator(estimator=pipeline, estimatorParamMaps=params, evaluator=evaluator, numFolds=5)

Number of models to be tested:  12


In [106]:
cv

CrossValidator_477f102a6712

In [108]:
#best_model = cv.bestModel

#print(best_model.stages)

#best_model.stages[3].extractParamMap()

#predictions = best_model.transform(flights_test)
#evaluator.evaluate(predictions)

In [109]:
params = ParamGridBuilder()
params = params.addGrid(hasher.numFeatures, [1024, 4096, 16384]) \
               .addGrid(hasher.binary, [True, False])
params = params.addGrid(logistic.regParam, [0.01, 0.1, 1.0, 10.0]) \
               .addGrid(logistic.elasticNetParam, [0.0, 0.5, 1.0])
params = params.build()

In [110]:
from pyspark.ml.classification import DecisionTreeClassifier, GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
tree = DecisionTreeClassifier().fit(flights_train)
gbt = GBTClassifier().fit(flights_train)

evaluator = BinaryClassificationEvaluator()
evaluator.evaluate(tree.transform(flights_test))
evaluator.evaluate(gbt.transform(flights_test))

print(gbt.getNumTrees)
print(gbt.featureImportances)

20
(8,[0,1,2,3,4,5,6,7],[0.17699712323223848,0.167701382879042,0.14888364254247366,0.0965055546906009,0.16711741132041333,0.06241292961958451,0.14838056544846173,0.03200139026718548])


In [112]:
from pyspark.ml.classification import RandomForestClassifier
forest = RandomForestClassifier()

params = ParamGridBuilder() \
            .addGrid(forest.featureSubsetStrategy, ['all', 'onethird', 'sqrt', 'log2']) \
            .addGrid(forest.maxDepth, [2, 5, 10]) \
            .build()

evaluator = BinaryClassificationEvaluator()

cv = CrossValidator(estimator=forest, estimatorParamMaps=params, evaluator=evaluator, numFolds=5)

In [115]:
#cv.fit(flights_train)

In [116]:
#avg_auc = cv.avgMetrics

#best_model_auc = max(cv.avgMetrics)

#opt_max_depth = cv.bestModel.explainParam('maxDepth')
#opt_feat_substrat = cv.bestModel.explainParam('featureSubsetStrategy')

#best_auc = evaluator.evaluate(cv.transform(flights_test))