In [1]:
%matplotlib inline
from pydataset import data

import pyspark
import pyspark.ml
from pyspark.sql.functions import *

# Obtain the Spark session used by every cell below; getOrCreate() hands back
# an already-running session when there is one instead of starting another.
spark = (
    pyspark.sql.SparkSession
    .builder
    .getOrCreate()
)

In [2]:
# Load the raw 311 case records: the first row supplies the column names and
# Spark infers each column's type from the data.
df = spark.read.csv('case.csv', header=True, inferSchema=True)

# Rename the SLA deadline so it matches the other case_* date columns
df = df.withColumnRenamed('SLA_due_date', 'case_due_date')

# Convert to better data types
# Flags: compare against the literal 'YES' to get boolean columns.
for flag_name in ('case_late', 'case_closed'):
    df = df.withColumn(flag_name, col(flag_name) == 'YES')

# District number: zero-padded to four characters, which also makes it text.
df = df.withColumn('council_district', format_string('%04d', col('council_district')))

# Dates: all three columns are parsed with the same 'M/d/yy H:mm' pattern.
for date_name in ('case_opened_date', 'case_closed_date', 'case_due_date'):
    df = df.withColumn(date_name, to_timestamp(col(date_name), 'M/d/yy H:mm'))

# Cleanup text data: strip surrounding whitespace and lowercase the address
df = df.withColumn('request_address', lower(trim(col('request_address'))))

# Extract zipcode: the run of digits at the very end of the cleaned address
df = df.withColumn('zipcode', regexp_extract(col('request_address'), r'\d+$', 0))

# Create a `case_lifetime` feature:
#   closed cases -> days between opening and closing
#   open cases   -> days between opening and now
# NOTE: `case_age` is measured against the moment this cell runs
# (current_timestamp), so the value for still-open cases changes between runs.
df = df.withColumn('case_age', datediff(current_timestamp(), 'case_opened_date'))
df = df.withColumn('days_to_closed', datediff('case_closed_date', 'case_opened_date'))
df = df.withColumn(
    'case_lifetime',
    when(col('case_closed'), col('days_to_closed')).otherwise(col('case_age')),
)
# The two helper columns were only needed to build `case_lifetime`.
df = df.drop('case_age', 'days_to_closed')

# Join departments and sources
depts = spark.read.csv('dept.csv', header=True, inferSchema=True)
sources = spark.read.csv('source.csv', header=True, inferSchema=True)

# Left joins keep every case even when it has no matching department/source row.
df = df.join(depts, on='dept_division', how='left')
df = df.join(sources, on='source_id', how='left')

# Exercises
Use the .randomSplit method to split the 311 data into training and test sets.

In [3]:
# Split the prepared cases into roughly 80% training / 20% test rows.
# A fixed seed is passed so the random assignment uses the same seed every run.
train, test = df.randomSplit(weights=[0.8, 0.2], seed=123)

# Exercises
Create a classification model to predict whether a case will be late or not (i.e., predict `case_late`). Experiment with different combinations of features and different classification algorithms.

In [4]:
# Peek at a single training row; vertical layout prints one column per line,
# which is easier to read for a table this wide.
train.show(n=1, vertical=True)

-RECORD 0--------------------------------------
 source_id              | 100137               
 dept_division          | 311 Call Center      
 case_id                | 1014263399           
 case_opened_date       | 2018-02-21 15:36:00  
 case_closed_date       | 2018-02-21 15:38:00  
 case_due_date          | 2018-03-02 15:36:00  
 case_late              | false                
 num_days_late          | -8.998854167000001   
 case_closed            | true                 
 service_request_type   | Compliment           
 SLA_days               | 9.0                  
 case_status            | Closed               
 request_address        | 927  donaldson av... 
 council_district       | 0007                 
 zipcode                | 78228                
 case_lifetime          | 0                    
 dept_name              | Customer Service     
 standardized_dept_name | Customer Service     
 dept_subject_to_SLA    | YES                  
 source_username        | Merlene Blodge

In [6]:
# Describe the classification problem with an R-style formula:
# `case_late` is the target, `service_request_type` the only predictor.
# Fitting on `train` yields a transformer that builds the model input columns.
rf = (
    pyspark.ml.feature.RFormula(formula='case_late ~ service_request_type')
    .fit(train)
)

# Apply the fitted formula to the training rows and inspect one of them.
train_input = rf.transform(train)
train_input.show(n=1, vertical=True)

-RECORD 0--------------------------------------
 source_id              | 100137               
 dept_division          | 311 Call Center      
 case_id                | 1014263399           
 case_opened_date       | 2018-02-21 15:36:00  
 case_closed_date       | 2018-02-21 15:38:00  
 case_due_date          | 2018-03-02 15:36:00  
 case_late              | false                
 num_days_late          | -8.998854167000001   
 case_closed            | true                 
 service_request_type   | Compliment           
 SLA_days               | 9.0                  
 case_status            | Closed               
 request_address        | 927  donaldson av... 
 council_district       | 0007                 
 zipcode                | 78228                
 case_lifetime          | 0                    
 dept_name              | Customer Service     
 standardized_dept_name | Customer Service     
 dept_subject_to_SLA    | YES                  
 source_username        | Merlene Blodge

In [7]:
# Logistic regression reading the RFormula output; 'features' and 'label' are
# the estimator's default column names, spelled out here for the reader.
lr = pyspark.ml.classification.LogisticRegression(
    featuresCol='features',
    labelCol='label',
)
lr_fit = lr.fit(train_input)

# Area under the ROC curve from the model's training summary, i.e. measured on
# the same rows the model was fitted on (not on the held-out `test` split).
lr_fit.summary.areaUnderROC

0.8162039773262637

# Exercises
Create a regression model to predict how many days late a case will be (i.e., predict `num_days_late`). Experiment with different combinations of features and different regression algorithms.

In [50]:
# Regression setup: predict `num_days_late` from `case_lifetime` and `SLA_days`.
# NOTE: this rebinds `rf`, replacing the classification formula fitted earlier.
rf = (
    pyspark.ml.feature.RFormula(formula="num_days_late ~ case_lifetime + SLA_days")
    .fit(train)
)

In [51]:
# Build the regression inputs and keep only the two columns the model reads.
train_input = (
    rf.transform(train)
    .select('features', 'label')
)

# Show the first three rows, one field per line.
train_input.show(n=3, vertical=True)

-RECORD 0----------------------
 features | [0.0,9.0]          
 label    | -8.998854167000001 
-RECORD 1----------------------
 features | [1.0,14.0053125]   
 label    | -12.64164352       
-RECORD 2----------------------
 features | [2.0,14.0]         
 label    | -12.32291667       
only showing top 3 rows



In [56]:
# Linear regression estimator for the `num_days_late` problem; 'features' and
# 'label' are its default column names, written out explicitly.
# NOTE: the estimator is only created here -- it has not been fitted yet.
lr = pyspark.ml.regression.LinearRegression(
    featuresCol='features',
    labelCol='label',
)
lr

LinearRegression_22ca386e6e2a

In [None]:
from pydataset import data