In [1]:
%matplotlib inline
from pydataset import data

import pyspark
import pyspark.ml
from pyspark.sql.functions import *

spark = pyspark.sql.SparkSession.builder.getOrCreate()

In [2]:
import wrangle

In [4]:
df = wrangle.wrangle_311(spark)

[wrangle.py] reading case.csv
[wrangle.py] handling data types
[wrangle.py] parsing dates
[wrangle.py] adding features
[wrangle.py] joining departments


In [6]:
df.dtypes

[('case_id', 'int'),
 ('case_opened_date', 'timestamp'),
 ('case_closed_date', 'timestamp'),
 ('case_due_date', 'timestamp'),
 ('case_late', 'boolean'),
 ('num_days_late', 'double'),
 ('case_closed', 'boolean'),
 ('service_request_type', 'string'),
 ('SLA_days', 'double'),
 ('case_status', 'string'),
 ('source_id', 'string'),
 ('request_address', 'string'),
 ('council_district', 'string'),
 ('num_weeks_late', 'double'),
 ('zipcode', 'string'),
 ('case_age', 'int'),
 ('days_to_closed', 'int'),
 ('case_lifetime', 'int'),
 ('department', 'string'),
 ('dept_subject_to_SLA', 'boolean')]

Use the .randomSplit method to split the 311 data into training and test sets.

In [7]:
# Spark DataFrames have a built-in .randomSplit method for the train/test split; the seed makes it reproducible.
train, test = df.randomSplit([0.75, 0.25], seed=420)
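`.randomSplit` normalizes the weights and assigns rows to splits at random. With `[0.75, 0.25]` the test set therefore holds roughly 25% of the rows, not exactly 25%. The plain-Python sketch below illustrates that idea. The function `random_split` is hypothetical: it draws one uniform number per row and does not reproduce Spark's exact sampling, so it will not pick the same rows Spark does.

```python
import random

def random_split(rows, weights, seed=None):
    """Hypothetical stand-in for DataFrame.randomSplit: normalize the weights,
    then draw one uniform number per row to choose its split."""
    total = float(sum(weights))
    # Cumulative upper bounds, e.g. weights [3, 1] -> [0.75, 1.0]
    bounds, running = [], 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, upper in enumerate(bounds):
            # The last split also catches any floating-point rounding slack
            if u < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

# Weights [3, 1] are normalized to [0.75, 0.25], just like [0.75, 0.25]
train_rows, test_rows = random_split(range(10_000), [3, 1], seed=420)
print(len(train_rows), len(test_rows))  # close to 7500 / 2500, not exact
```

The same seed gives the same split on the same input, which is what makes the notebook's `seed=420` useful.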

Create a classification model to predict whether a case will be late or not (i.e. predict case_late). Experiment with different combinations of features and different classification algorithms.

In [9]:
# RFormula is an Estimator that turns a DataFrame into the shape Spark ML algorithms expect (it works on DataFrames, not RDDs).
#  1. Configure it with an R-style formula: label ~ feature1 + feature2.
#  2. .fit(train) learns how to encode the features (e.g. which category levels exist) and returns a fitted RFormulaModel.
#  3. The fitted model's .transform() appends two columns: `features` (a vector) and `label` (the target as a double).
#     Here the fitted model is what gets stored in `rf`.
rf = pyspark.ml.feature.RFormula(formula='case_late ~ department + council_district').fit(train)
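For string terms such as `department` and `council_district`, `RFormula` indexes the categories (by default most frequent first, `stringIndexerOrderType='frequencyDesc'`), one-hot encodes them, and drops the last category as the reference level. The encoded terms are concatenated into one sparse `features` vector; that is what the `(17,[3,10]` entry in the output further down is: a vector of length 17 whose listed indices are the active slots. The plain-Python sketch below mimics that dummy encoding under stated assumptions: `fit_dummy_encoder` and `encode` are hypothetical helpers, ties are broken alphabetically, and the toy rows are made up.

```python
from collections import Counter

def fit_dummy_encoder(rows, columns):
    """Learn, per column, the category order used for one-hot encoding.
    Categories are ordered most frequent first (ties broken alphabetically,
    an assumption here); the last one is dropped as the reference level."""
    vocab = {}
    for col in columns:
        counts = Counter(row[col] for row in rows)
        ordered = sorted(counts, key=lambda cat: (-counts[cat], cat))
        vocab[col] = ordered[:-1]  # drop the least frequent category
    return vocab

def encode(row, vocab, columns):
    """Encode one row as a sparse vector (size, active_indices, values).
    A category outside vocab[col] (the dropped reference level, or one
    never seen during fit) encodes as all zeros for that column here."""
    offset, indices = 0, []
    for col in columns:
        levels = vocab[col]
        if row[col] in levels:
            indices.append(offset + levels.index(row[col]))
        offset += len(levels)
    return offset, indices, [1.0] * len(indices)

COLUMNS = ["department", "council_district"]
rows = [  # hypothetical toy data
    {"department": "x", "council_district": "001"},
    {"department": "x", "council_district": "003"},
    {"department": "y", "council_district": "003"},
    {"department": "z", "council_district": "003"},
]
vocab = fit_dummy_encoder(rows, COLUMNS)
print(vocab)                           # {'department': ['x', 'y'], 'council_district': ['003']}
print(encode(rows[2], vocab, COLUMNS))  # (3, [1, 2], [1.0, 1.0])
```

The vector length is the sum over terms of (number of categories minus one), which is why a handful of categorical columns can produce a wide but mostly empty `features` vector.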

In [10]:
# transform() returns a new Spark DataFrame: the original columns plus `features` and `label`.
train_input = rf.transform(train)

In [12]:
type(train_input)

pyspark.sql.dataframe.DataFrame

In [14]:
train_input.show(33, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 2018-01-01 00:46:00  
 case_closed_date     | 2018-01-03 08:11:00  
 case_due_date        | 2018-01-05 08:30:00  
 case_late            | false                
 num_days_late        | -2.0126041669999997  
 case_closed          | true                 
 service_request_type | Removal Of Obstru... 
 SLA_days             | 4.322222222          
 case_status          | Closed               
 source_id            | svcCRMSS             
 request_address      | 2215  GOLIAD RD, ... 
 council_district     | 003                  
 num_weeks_late       | -0.28751488099999994 
 zipcode              | 78223                
 case_age             | 219                  
 days_to_closed       | 2                    
 case_lifetime        | 2                    
 department           | Trans & Cap Impro... 
 dept_subject_to_SLA  | true                 
 features             | (17,[3,10]

In [15]:
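# With the training data in Spark ML format, repeat the same three steps (instantiate, fit, transform) with a classifier.
# Step 1: instantiate. By default LogisticRegression reads the `features` and `label` columns that RFormula produced.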
lr = pyspark.ml.classification.LogisticRegression()

In [27]:
type(lr)

pyspark.ml.classification.LogisticRegression

In [16]:
# Step 2: fit. In scikit-learn, .fit() updates the estimator in place (and returns it).
#         In Spark ML, .fit() returns a separate fitted Model object, so assign it to a new variable.
lr_fit = lr.fit(train_input)
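The difference in step 2 is an API pattern. A scikit-learn estimator stores what it learns on itself (`fit` returns `self`), while a Spark ML `Estimator.fit` returns a separate `Model` (a `Transformer`) and leaves the estimator unfitted. The toy classes below are hypothetical and only mimic the shape of each API, using "subtract the mean" as the thing being learned.

```python
class SklearnStyleMean:
    """In-place pattern: fit() stores the learned state on the estimator itself."""
    def fit(self, xs):
        self.mean_ = sum(xs) / len(xs)
        return self  # the same object, now fitted


class SparkStyleMeanModel:
    """Fitted model returned by SparkStyleMean.fit(); it only transforms data."""
    def __init__(self, mean):
        self.mean = mean

    def transform(self, xs):
        return [x - self.mean for x in xs]


class SparkStyleMean:
    """Estimator/Model pattern: fit() returns a new model object."""
    def fit(self, xs):
        return SparkStyleMeanModel(sum(xs) / len(xs))


sk = SklearnStyleMean()
sk_fitted = sk.fit([1, 2, 3])   # sk_fitted is sk

est = SparkStyleMean()
model = est.fit([1, 2, 3])      # a different object from est
print(type(model).__name__)     # SparkStyleMeanModel
print(model.transform([1, 2, 3]))  # [-1.0, 0.0, 1.0]
```

Because the estimator stays untouched, one configured `lr` can be fitted to several training sets, each fit producing its own model.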

In [25]:
# lr_fit is a LogisticRegressionModel (the fitted Transformer); lr is still the unfitted LogisticRegression estimator.
type(lr_fit)

pyspark.ml.classification.LogisticRegressionModel

In [17]:
# The fitted model's .summary attribute holds metrics computed on the training data it was fitted to; areaUnderROC is the training AUC.
lr_fit.summary.areaUnderROC

0.6358776482407441
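`areaUnderROC` is the area under the ROC curve. It equals the probability that a randomly chosen positive (late) case receives a higher score than a randomly chosen negative (on-time) case, with ties counted as half. Read that way, 0.636 means the model ranks a late case above an on-time case about 64% of the time on the training data, where 0.5 would be random guessing. The plain-Python sketch below computes AUC by counting pairs; `pairwise_auc` is a hypothetical helper, quadratic in the number of rows and meant only for small examples. Spark builds the ROC curve from score thresholds instead of enumerating pairs.

```python
def pairwise_auc(scores, labels):
    """AUC as P(score of a random positive > score of a random negative),
    with ties counted as 0.5. O(n_pos * n_neg): small examples only."""
    pos = [s for s, y in zip(scores, labels) if y == 1]
    neg = [s for s, y in zip(scores, labels) if y == 0]
    if not pos or not neg:
        raise ValueError("need at least one positive and one negative")
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))

# Positives score 0.9 and 0.3, negatives 0.8 and 0.1: 3 of 4 pairs ranked correctly
print(pairwise_auc([0.9, 0.8, 0.3, 0.1], [1, 0, 1, 0]))  # 0.75
```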

In [19]:
# To measure performance on unseen data (the test split), use an evaluator from pyspark.ml.evaluation.
# By default BinaryClassificationEvaluator reports areaUnderROC, reading the `rawPrediction` and `label` columns.
evaluator = pyspark.ml.evaluation.BinaryClassificationEvaluator()

In [26]:
type(evaluator)

pyspark.ml.evaluation.BinaryClassificationEvaluator

In [20]:
# Read the chain inside out: 1) rf.transform(test) encodes the test split, 2) lr_fit.transform(...) adds predictions,
# 3) evaluator.evaluate(...) returns the test AUC as a float.
test_auc = evaluator.evaluate(lr_fit.transform(rf.transform(test)))

In [24]:
type(test_auc)

float

In [22]:
# test is the ~25% split of the original DataFrame, so it needs the same fitted RFormula transformation before the model can score it.
test_input = rf.transform(test)
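Because `rf` was fitted on `train` only, its category vocabulary comes from the training split. If `test` contains a department or council district that never appeared in `train`, `RFormula`'s `handleInvalid` setting decides what happens; its default, `'error'`, makes the job fail once the data is actually computed (Spark evaluates lazily). The plain-Python sketch below mimics the three documented options in simplified form: `index_categories` is a hypothetical helper that models the category-indexing step only, not the one-hot step, and the district codes are made up.

```python
def index_categories(values, train_categories, handle_invalid="error"):
    """Map values to the indices learned on the training split.
    'error' raises on an unseen value, 'skip' drops it,
    'keep' sends it to one extra index after the known ones."""
    lookup = {cat: i for i, cat in enumerate(train_categories)}
    out = []
    for v in values:
        if v in lookup:
            out.append(lookup[v])
        elif handle_invalid == "skip":
            continue
        elif handle_invalid == "keep":
            out.append(len(lookup))
        else:
            raise ValueError(f"unseen category: {v!r}")
    return out

train_districts = ["003", "001"]  # hypothetical order learned on train
print(index_categories(["001", "003"], train_districts))          # [1, 0]
print(index_categories(["001", "009"], train_districts, "skip"))  # [1]
print(index_categories(["001", "009"], train_districts, "keep"))  # [1, 2]
```

Fitting on `train` and only transforming `test` is still the right pattern, since it keeps information from the test split out of the encoding; `handleInvalid` just decides how to treat the rare unseen value.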

In [23]:
type(test_input)

pyspark.sql.dataframe.DataFrame

In [None]:
# Build a confusion matrix on the test split:
#   .select keeps only the columns needed (it returns a new DataFrame, not an RDD)
#   .groupby('prediction') makes one group per predicted class (the rows of the table)
#   .pivot('label') turns each distinct actual label into its own column
#   .count() fills each cell with the number of rows for that (prediction, label) pair
#   .show() prints the table
(lr_fit.transform(test_input)
 .select('case_id', 'label', 'probability', 'prediction')
 .groupby('prediction')  # predicted values are rows
 .pivot('label')         # actual values are columns
 .count()
 .show())
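The `groupby`/`pivot`/`count` chain is a confusion matrix: one row per predicted class, one column per actual label, and each cell counts the test rows with that (prediction, label) pair. The plain-Python sketch below does the same tally and then derives precision and recall for the positive (late) class. `confusion_matrix` and `precision_recall` are hypothetical helpers and the predictions are made-up toy values. One difference: Spark's pivot shows `null` for an empty cell, while `Counter` returns 0.

```python
from collections import Counter

def confusion_matrix(predictions, labels):
    """Count (prediction, label) pairs, like groupby('prediction').pivot('label').count()."""
    return Counter(zip(predictions, labels))

def precision_recall(cm, positive=1.0):
    """Precision and recall for the positive class from the pair counts."""
    tp = cm[(positive, positive)]
    fp = sum(n for (p, y), n in cm.items() if p == positive and y != positive)
    fn = sum(n for (p, y), n in cm.items() if p != positive and y == positive)
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    return precision, recall

preds = [1.0, 0.0, 1.0, 0.0, 0.0, 1.0]   # hypothetical model output
labels = [1.0, 0.0, 0.0, 1.0, 0.0, 1.0]  # hypothetical actual case_late values
cm = confusion_matrix(preds, labels)
for p in (0.0, 1.0):
    # One printed row per predicted class, columns ordered by actual label 0.0, 1.0
    print(p, [cm[(p, y)] for y in (0.0, 1.0)])
# 0.0 [2, 1]
# 1.0 [1, 2]
print(precision_recall(cm))
```

Looking at precision and recall alongside AUC matters here because late cases may be a minority class, in which case accuracy alone can look good for a model that rarely predicts "late".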

Create a regression model to predict how many days late a case will be (i.e. predict num_days_late). Experiment with different combinations of features and different regression algorithms.
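One possible route mirrors the classification workflow above: an `RFormula` such as `'num_days_late ~ department + council_district'`, an estimator from `pyspark.ml.regression` (for example `LinearRegression` or `RandomForestRegressor`), and `pyspark.ml.evaluation.RegressionEvaluator` on the transformed test split; that evaluator reports RMSE by default and can also report `mae` or `r2`. The plain-Python sketch below shows what those three metrics compute, so the numbers have a concrete meaning. `regression_metrics` is a hypothetical helper and the values are made up; comparing against a predict-the-mean baseline (R² of 0) is a suggestion, not part of the original exercise.

```python
import math

def regression_metrics(y_true, y_pred):
    """RMSE, MAE and R^2 (1 - SS_res / SS_tot) for two equal-length lists."""
    n = len(y_true)
    errors = [p - t for t, p in zip(y_true, y_pred)]
    mse = sum(e * e for e in errors) / n
    mae = sum(abs(e) for e in errors) / n
    mean_true = sum(y_true) / n
    ss_tot = sum((t - mean_true) ** 2 for t in y_true)
    r2 = 1.0 - (mse * n) / ss_tot if ss_tot else float("nan")
    return {"rmse": math.sqrt(mse), "mae": mae, "r2": r2}

y_true = [-2.0, 0.0, 1.0, 5.0]  # hypothetical num_days_late values (negative = early)
y_pred = [-1.0, 0.0, 2.0, 3.0]  # hypothetical model predictions
print(regression_metrics(y_true, y_pred))

# Baseline: always predict the mean of y_true (1.0), which gives R^2 = 0
print(regression_metrics(y_true, [1.0] * 4)["r2"])  # 0.0
```

RMSE and MAE are in the target's units (days), which makes them easy to explain; R² is unitless and shows how much better the model does than always predicting the average.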