In [1]:
from pydataset import data
import pyspark
import pyspark.ml
from pyspark.sql.functions import *

spark = pyspark.sql.SparkSession.builder.getOrCreate()

df = spark.createDataFrame(data('tips'))

train, test = df.randomSplit([0.8, 0.2], seed=123)

In [2]:
def shape(df: pyspark.sql.DataFrame):
    return df.count(), len(df.columns)

In [3]:
shape(train)

(193, 7)

In [4]:
shape(test)

(51, 7)

# Regression

In [5]:
train.show(5)

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2|
|     10.29| 2.6|Female|    No|Sun|Dinner|   2|
|     10.33|1.67|Female|    No|Sun|Dinner|   3|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
+----------+----+------+------+---+------+----+
only showing top 5 rows



In [6]:
# nb: spark's rformula does encoding
rf = pyspark.ml.feature.RFormula(formula="tip ~ total_bill + size").fit(train)

rf.transform(train).show(5)

+----------+----+------+------+---+------+----+-----------+-----+
|total_bill| tip|   sex|smoker|day|  time|size|   features|label|
+----------+----+------+------+---+------+----+-----------+-----+
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2| [8.77,2.0]|  2.0|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2|[10.27,2.0]| 1.71|
|     10.29| 2.6|Female|    No|Sun|Dinner|   2|[10.29,2.0]|  2.6|
|     10.33|1.67|Female|    No|Sun|Dinner|   3|[10.33,3.0]| 1.67|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|[10.34,3.0]| 1.66|
+----------+----+------+------+---+------+----+-----------+-----+
only showing top 5 rows



In [7]:
train_input = rf.transform(train).select('features', 'label')
train_input.show(5)

+-----------+-----+
|   features|label|
+-----------+-----+
| [8.77,2.0]|  2.0|
|[10.27,2.0]| 1.71|
|[10.29,2.0]|  2.6|
|[10.33,3.0]| 1.67|
|[10.34,3.0]| 1.66|
+-----------+-----+
only showing top 5 rows



In [8]:
lr = pyspark.ml.regression.LinearRegression()
lr

LinearRegression_94b6a34ed3cc

In [9]:
lr_fit = lr.fit(train_input)
lr_fit.transform(train_input).show(5)

+-----------+-----+------------------+
|   features|label|        prediction|
+-----------+-----+------------------+
| [8.77,2.0]|  2.0|1.8431748565237749|
|[10.27,2.0]| 1.71|1.9846902983830235|
|[10.29,2.0]|  2.6|1.9865771709411468|
|[10.33,3.0]| 1.67|2.1519321354884804|
|[10.34,3.0]| 1.66|2.1528755717675416|
+-----------+-----+------------------+
only showing top 5 rows



In [10]:
lr_fit.summary

<pyspark.ml.regression.LinearRegressionTrainingSummary at 0x118be3150>

In [11]:
lr_fit.summary.r2, lr_fit.summary.rootMeanSquaredError

(0.5083641431094577, 0.9254138907986432)

In [12]:
[x for x in dir(lr_fit.summary) if not x.startswith('_')]

['coefficientStandardErrors',
 'degreesOfFreedom',
 'devianceResiduals',
 'explainedVariance',
 'featuresCol',
 'labelCol',
 'meanAbsoluteError',
 'meanSquaredError',
 'numInstances',
 'objectiveHistory',
 'pValues',
 'predictionCol',
 'predictions',
 'r2',
 'r2adj',
 'residuals',
 'rootMeanSquaredError',
 'tValues',
 'totalIterations']

In [13]:
test_input = rf.transform(test)
lr_fit.transform(test_input).show(4)

+----------+----+----+------+---+------+----+-----------+-----+------------------+
|total_bill| tip| sex|smoker|day|  time|size|   features|label|        prediction|
+----------+----+----+------+---+------+----+-----------+-----+------------------+
|      9.55|1.45|Male|    No|Sat|Dinner|   2| [9.55,2.0]| 1.45|1.9167628862905841|
|      9.68|1.32|Male|    No|Sun|Dinner|   2| [9.68,2.0]| 1.32|1.9290275579183855|
|      9.94|1.56|Male|    No|Sun|Dinner|   2| [9.94,2.0]| 1.56|1.9535569011739888|
|     11.24|1.76|Male|   Yes|Sat|Dinner|   2|[11.24,2.0]| 1.76| 2.076203617452004|
+----------+----+----+------+---+------+----+-----------+-----+------------------+
only showing top 4 rows



In [14]:
evaluator = pyspark.ml.evaluation.RegressionEvaluator()
rmse = evaluator.evaluate(lr_fit.transform(test_input))
rmse

1.2722254530718242

# Classification

In [15]:
rf = pyspark.ml.feature.RFormula(formula='time ~ total_bill + size').fit(train)
train_input = rf.transform(train)
train_input.show(50)

+----------+----+------+------+---+------+----+-----------+-----+
|total_bill| tip|   sex|smoker|day|  time|size|   features|label|
+----------+----+------+------+---+------+----+-----------+-----+
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2| [8.77,2.0]|  0.0|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2|[10.27,2.0]|  0.0|
|     10.29| 2.6|Female|    No|Sun|Dinner|   2|[10.29,2.0]|  0.0|
|     10.33|1.67|Female|    No|Sun|Dinner|   3|[10.33,3.0]|  0.0|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|[10.34,3.0]|  0.0|
|     12.54| 2.5|  Male|    No|Sun|Dinner|   2|[12.54,2.0]|  0.0|
|     13.37| 2.0|  Male|    No|Sat|Dinner|   2|[13.37,2.0]|  0.0|
|     13.94|3.06|  Male|    No|Sun|Dinner|   2|[13.94,2.0]|  0.0|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2|[14.78,2.0]|  0.0|
|     14.83|3.02|Female|    No|Sun|Dinner|   2|[14.83,2.0]|  0.0|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|[15.04,2.0]|  0.0|
|     15.42|1.57|  Male|    No|Sun|Dinner|   2|[15.42,2.0]|  0.0|
|     15.7

In [16]:
lr = pyspark.ml.classification.LogisticRegression()

In [17]:
lr_fit = lr.fit(train_input)
[x for x in dir(lr_fit.summary) if not x.startswith('_')]

['accuracy',
 'areaUnderROC',
 'fMeasureByLabel',
 'fMeasureByThreshold',
 'falsePositiveRateByLabel',
 'featuresCol',
 'labelCol',
 'labels',
 'objectiveHistory',
 'pr',
 'precisionByLabel',
 'precisionByThreshold',
 'predictionCol',
 'predictions',
 'probabilityCol',
 'recallByLabel',
 'recallByThreshold',
 'roc',
 'totalIterations',
 'truePositiveRateByLabel',
 'weightedFMeasure',
 'weightedFalsePositiveRate',
 'weightedPrecision',
 'weightedRecall',
 'weightedTruePositiveRate']

In [18]:
lr_fit.summary.areaUnderROC

0.6430830039525691

In [19]:
evaluator = pyspark.ml.evaluation.BinaryClassificationEvaluator()
test_auc = evaluator.evaluate(lr_fit.transform(rf.transform(test)))
test_auc

0.5809716599190283

In [20]:
test_input = rf.transform(test)

In [21]:
(lr_fit.transform(test_input)
 .select('time', 'total_bill', 'size', 'label', 'probability', 'prediction')
 .groupby('prediction') # predicted == rows
 .pivot('label') # actual values are columns
 .count()
 .show())

+----------+---+---+
|prediction|0.0|1.0|
+----------+---+---+
|       0.0| 38| 13|
+----------+---+---+



# Exercises

### 1. Use the .randomSplit method to split the 311 data into training and test sets.

In [22]:
import wrangle

spark = pyspark.sql.SparkSession.builder.getOrCreate()

df = wrangle.wrangle_311(spark)

[wrangle.py] reading case.csv
[wrangle.py] handling data types
[wrangle.py] parsing dates
[wrangle.py] adding features
[wrangle.py] joining departments


In [23]:
train, test = df.randomSplit([0.8, 0.2], seed=123)

def shape(df: pyspark.sql.DataFrame):
    return df.count(), len(df.columns)

shape(train)

(672968, 20)

In [24]:
shape(test)

(168736, 20)

### 2. Create a classification model to predict whether a case will be late or not (i.e. predict case_late). Experiment with different combinations of features and different classification algorithms.

### 3. Create a regression model to predict how many days late a case will be (i.e. predict num_days_late). Experiment with different combinations of features and different regression algorithms.