In [1]:
import os
import numpy as np

import pyspark
from pyspark.sql.types import *
from pyspark.sql.functions import date_format
from pyspark.sql.functions import *
from pyspark.ml import Pipeline
import timeit

## Data Processing

In [2]:
sc = pyspark.SparkContext.getOrCreate()
ss = pyspark.sql.SparkSession.builder.getOrCreate()

In [3]:
rdd_fire = sc.textFile("s3a://msds697jonross.and.friends/sffd.csv")\
             .map(lambda line : line.encode('ascii', 'ignore'))
rdd_fire.count()

4557046

In [4]:
rdd_google = sc.textFile("s3a://msds697jonross.and.friends/google_data.csv")\
               .map(lambda line : line.encode('ascii', 'ignore'))
rdd_google.count()

32074

In [5]:
# number of  columns
cols_fire = rdd_fire.map(lambda x: x.split(',')).take(1)[0]
n_cols = sc.broadcast(len(cols_fire))
n_cols.value

14

In [6]:
print('\n'.join(x for x in cols_fire))

call_type
received_timestamp
entry_timestamp
dispatch_timestamp
response_timestamp
on_scene_timestamp
transport_timestamp
hospital_timestamp
call_final_disposition
available_timestamp
address
zipcode_of_incident
battalion
station_area


In [7]:
station_list = [i for i in range(1,52)]
for num in [27,30,45,46,47,50]:
    station_list.remove(num)
station_list = [str(a) for a in station_list]
station_list = ["0" + a if len(a)==1 else a for a in station_list]
stations = sc.broadcast(station_list)
# stations.value

In [8]:
def filter_fire(x):
    if len(x.split(',')) == n_cols.value:
        return x.split(',')[13] in stations.value

def map_fire(x):
    return ((x.split(',')[13],x.split(',')[10]), x.split(',')[:2]+[x.split(',')[5]]+[x.split(',')[11]])

def map_google(x):
    s = x.split(',')
    try:
        if int(s[0]) < 10:
            s[0] = '0' + s[0]
    except:
        pass
    return (tuple(s[:2]), s[2:])

def map_joined(x):
    try:
        return list(x[0])+x[1][0]+[float(y) for y in x[1][1]]
    except:
        return list(x[0])+x[1][0]+[None,None]

In [9]:
rdd_fire_data = rdd_fire.filter(lambda x: 'call_type' not in x)\
                        .filter(filter_fire)\
                        .map(map_fire)

In [10]:
rdd_fire_data.take(2)

[(('04', '2700 Block of VAN NESS AVE'),
  ['Elevator / Escalator Rescue', '2000-06-03 15:32:02+00:00', '', '94123']),
 (('20', '200 Block of PALO ALTO AVE'),
  ['Alarms', '2009-10-14 16:11:41+00:00', '', '94114'])]

In [11]:
rdd_google = rdd_google.map(map_google)
rdd_google.take(2)

[(('01', '0 Block of 0NB OCTAVIA OF'), ['3724', '12.883333333333333']),
 (('01', '0 Block of 101 NB OCTAVIA OF'), ['4290', '9.416666666666666'])]

In [12]:
joined = rdd_fire_data.leftOuterJoin(rdd_google)

In [13]:
joined.take(2)

[(('11', 'BERNAL HEIGHTS BL/ANDERSON ST'),
  (['Medical Incident',
    '2008-03-27 18:09:32+00:00',
    '2008-03-27 18:20:21+00:00',
    '94110'],
   ['2032', '7.6'])),
 (('11', 'BERNAL HEIGHTS BL/ANDERSON ST'),
  (['Medical Incident',
    '2008-03-27 18:09:32+00:00',
    '2008-03-27 18:11:46+00:00',
    '94110'],
   ['2032', '7.6']))]

In [14]:
rdd = joined.map(map_joined)

In [15]:
rdd.take(2)

[['11',
  'BERNAL HEIGHTS BL/ANDERSON ST',
  'Medical Incident',
  '2008-03-27 18:09:32+00:00',
  '2008-03-27 18:20:21+00:00',
  '94110',
  2032.0,
  7.6],
 ['11',
  'BERNAL HEIGHTS BL/ANDERSON ST',
  'Medical Incident',
  '2008-03-27 18:09:32+00:00',
  '2008-03-27 18:11:46+00:00',
  '94110',
  2032.0,
  7.6]]

In [16]:
schema = StructType([StructField("station_area", StringType(), False),
                     StructField("address", StringType(), False),
                     StructField("call_type", StringType(), False),
                     StructField("received_timestamp", StringType(), False),
                     StructField("on_scene_timestamp", StringType(), False),
                     StructField("zipcode_of_incident", StringType(), False),
                     StructField("distance", DoubleType(), True),
                     StructField("duration", DoubleType(), True)
                     ])

In [17]:
df = ss.createDataFrame(rdd, schema) # .cache()

In [18]:
df.printSchema()

root
 |-- station_area: string (nullable = false)
 |-- address: string (nullable = false)
 |-- call_type: string (nullable = false)
 |-- received_timestamp: string (nullable = false)
 |-- on_scene_timestamp: string (nullable = false)
 |-- zipcode_of_incident: string (nullable = false)
 |-- distance: double (nullable = true)
 |-- duration: double (nullable = true)



In [19]:
# convert to timestamps
my_rows = ['received_timestamp',
          'on_scene_timestamp']

df_w_time = df
for row in my_rows:
    df_w_time = df_w_time.withColumn(row, to_timestamp(df[row], format = 'yyyy-MM-dd HH:mm:ss+00:00'))

In [20]:
df_w_time = df_w_time.withColumn("received_hour",
                                 hour("received_timestamp")) \
            .withColumn("day_of_week",
                        date_format('received_timestamp', 'u').alias('dow_number'))
# df_w_time.show(10)

## Create More Interesting DataFrame

In [21]:
data_full = df_w_time.select('station_area',
                 'address',
                 'call_type',
                 'received_timestamp',
                 'on_scene_timestamp',
                 'zipcode_of_incident',
                 'distance',
                 'duration',
                 'received_hour',
                 'day_of_week')\
    .withColumn("distance", 
                col('distance') / 1609.34 )\
    .withColumn("label", 
                (unix_timestamp('on_scene_timestamp') - unix_timestamp('received_timestamp')) / 60 )\
    .withColumn("speed", 
                col('distance') / col('duration') * 60 )\
    .withColumn("label_ratio", 
                col('label') / col('duration') )\
    .orderBy('received_timestamp', ascending=[0])\
    .na.drop(subset=["label"])\
    .where('label >= 0')\
    .select('station_area',
        'call_type',
        'zipcode_of_incident',
        'distance',
        'duration',
        'speed',
        'label_ratio',
        'received_hour',
        'day_of_week',
        'label')

# data_1 is the largest dataset
# has all columns and ony removes missing google data
data_1 = data_full.na.drop(subset=["distance", "duration"]) # (removes about 50 rows)

In [22]:
data_1.printSchema()

root
 |-- station_area: string (nullable = false)
 |-- call_type: string (nullable = false)
 |-- zipcode_of_incident: string (nullable = false)
 |-- distance: double (nullable = true)
 |-- duration: double (nullable = true)
 |-- speed: double (nullable = true)
 |-- label_ratio: double (nullable = true)
 |-- received_hour: integer (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- label: double (nullable = true)



- `label`: response time
- `duration`: google api estimate of driving time from station to address (in minutes)
- `distance`: google api estimate of driving distance from station to address
- `speed`: distance / duration * 60 (estimated average speed in mph)
- `label_ratio`: label / duration. **Using this variable for eda**. The idea is to try to find the observations where distance and duration would underestimate most severely.

In [23]:
# data_1.approxQuantile("distance", [.0001,.001,.5,.9,.97,.98,.99,.999], 0)

In [24]:
# removing rows where distance (in miles) is too large or too small
data_2 = data_1.where('distance between .01 and 2') 
#print('removed' + (data_1.count() - data_2.count()) + 'rows')

In [25]:
# check to see that speeds look reasonable (mph)
#data_2.approxQuantile("speed", [.001,.01,.02,.03,.1,.5,.9,.96,.97,.98,.99,.999], 0)

In [26]:
#data_2.approxQuantile("label", [.001,.01,.02,.03,.1,.5,.9,.96,.97,.98,.99,.999], 0)

In [27]:
# removing rows where response time (in minutes) is too large or too small
data_3 = data_2.where('label between 1 and 30') 
#print('removed' + (data_2.count() - data_3.count() + 'rows')

In [28]:
#data_3.approxQuantile("label_ratio", [.001,.01,.1,.5,.9,.96,.97,.98,.99,.999], 0)

In [29]:
# removing rows where response time is too much larger than duration
data_4 = data_3.where('label_ratio < 20') 
#print('removed' + (data_3.count() - data_4.count()) + 'rows')

In [30]:
data_cleaned = data_4\
    .select('station_area','call_type','zipcode_of_incident',
            'distance','duration','received_hour','day_of_week','label')
#data_cleaned.select('station_area','call_type','zipcode_of_incident').show()
#data_cleaned.select('distance','duration','received_hour','day_of_week','label').show()

## Machine Learning

In [31]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import DecisionTreeRegressor

In [32]:
def remove_empty_strings(df):
    return df.replace('','unknown')
# dfnonas = remove_empty_strings(small_df)

In [33]:
#converting strings to numeric values
def indexStringColumns(df, cols):
    #variable newdf will be updated several times
    newdf = df
    
    for c in cols:
        #For each given colum, fits StringIndexerModel.
        si = StringIndexer(inputCol=c, outputCol=c+"-num")
        sm = si.fit(newdf)
        #Creates a DataFame by putting the transformed values in the new colum with suffix "-num" 
        #and then drops the original columns.
        #and drop the "-num" suffix. 
        newdf = sm.transform(newdf).drop(c)
        newdf = newdf.withColumnRenamed(c+"-num", c)
    return newdf

# dfnumeric = indexStringColumns(dfnonas, ["call_type", "zipcode_of_incident", "station_area"])
# dfnumeric = indexStringColumns(dfnonas, [])

In [34]:
def oneHotEncodeColumns(df, cols):
    newdf = df
    for c in cols:
        #For each given colum, create OneHotEncoder. 
        #dropLast : Whether to drop the last category in the encoded vector (default: true)
        onehotenc = OneHotEncoder(inputCol=c, outputCol=c+"-onehot", dropLast=False)
        #Creates a DataFame by putting the transformed values in the new colum with suffix "-onehot" 
        #and then drops the original columns.
        #and drop the "-onehot" suffix. 
        newdf = onehotenc.transform(newdf).drop(c)
        newdf = newdf.withColumnRenamed(c+"-onehot", c)
    return newdf

# dfhot = oneHotEncodeColumns(dfnumeric, ["call_type", "zipcode_of_incident", "station_area"])
# dfhot = oneHotEncodeColumns(dfnumeric, [])

In [35]:
def process(df, cat_cols, hot_cols):
    dfnonas = remove_empty_strings(df)
    dfnumeric = indexStringColumns(dfnonas, cat_cols)
    dfhot = oneHotEncodeColumns(dfnumeric, hot_cols)
    return dfhot

In [36]:
data_cleaned.select('distance','duration','label')\
    .withColumn('distance_log', log(col('distance')))\
    .withColumn('duration_log', log(col('duration')))\
    .select('distance_log','duration_log','label')\
    .show()

+--------------------+------------------+------------------+
|        distance_log|      duration_log|             label|
+--------------------+------------------+------------------+
| -0.5800741784344655|1.3652409519220583|              8.35|
|  -1.041458017321652| 0.597003320007043|               5.8|
|  -1.041458017321652| 0.597003320007043|             10.25|
| -0.8367940252822796|0.8685000680378065|11.966666666666667|
| -0.3483108367617068|1.3987168811184478|               3.5|
| -0.3483108367617068|1.3987168811184478|               8.0|
| -0.3483108367617068|1.3987168811184478| 6.716666666666667|
| -0.3483108367617068|1.3987168811184478|3.0166666666666666|
| -0.3483108367617068|1.3987168811184478|              4.45|
| -0.3483108367617068|1.3987168811184478| 4.783333333333333|
| -0.3483108367617068|1.3987168811184478|              4.85|
| -0.3483108367617068|1.3987168811184478|3.0166666666666666|
| -0.3509551750147972|1.3217558399823195|             19.95|
| -0.7528960504004318|  

### Choose data

In [37]:
data_small = data_cleaned.select('distance','duration','label')\
    .withColumn('distance_log', log(col('distance')))\
    .withColumn('duration_log', log(col('duration')))\
    .select('distance_log','duration_log','label')

In [40]:
data_big = data_cleaned.withColumn('distance_log', log(col('distance')))\
    .withColumn('duration_log', log(col('duration')))\
    .drop('distance','duration')

In [38]:
data_small.show(5)

+-------------------+------------------+------------------+
|       distance_log|      duration_log|             label|
+-------------------+------------------+------------------+
|-0.5800741784344655|1.3652409519220583|              8.35|
| -1.041458017321652| 0.597003320007043|             10.25|
| -1.041458017321652| 0.597003320007043|               5.8|
|-0.8367940252822796|0.8685000680378065|11.966666666666667|
|-0.3483108367617068|1.3987168811184478|3.0166666666666666|
+-------------------+------------------+------------------+
only showing top 5 rows



In [41]:
data_big.show(5)

+------------+----------------+-------------------+-------------+-----------+------------------+-------------------+------------------+
|station_area|       call_type|zipcode_of_incident|received_hour|day_of_week|             label|       distance_log|      duration_log|
+------------+----------------+-------------------+-------------+-----------+------------------+-------------------+------------------+
|          03|Medical Incident|              94109|           23|          3|              8.35|-0.5800741784344655|1.3652409519220583|
|          37|Medical Incident|              94107|           23|          3|               5.8| -1.041458017321652| 0.597003320007043|
|          37|Medical Incident|              94107|           23|          3|             10.25| -1.041458017321652| 0.597003320007043|
|          41|Medical Incident|              94109|           23|          3|11.966666666666667|-0.8367940252822796|0.8685000680378065|
|          41|  Structure Fire|              941

In [55]:
# one or the other
d = data_big
cat_cols=["call_type", "zipcode_of_incident", "station_area", "received_hour", "day_of_week"]
hot_cols=["call_type", "zipcode_of_incident", "station_area", "received_hour", "day_of_week"]

# d = data_small
# cat_cols=[]
# hot_cols=[]

processed_data = process(d,
                         cat_cols=cat_cols,
                         hot_cols=hot_cols)

In [56]:
# Take approximately top ~10% of rows convert them to dataframe 
# Also provide the schema also to avoid errors
test = ss.createDataFrame(processed_data.head(100000), processed_data.schema)

#Take the rest of the rows
train = processed_data.subtract(test)

In [None]:
print(train.count(), test.count())

In [58]:
# make pipelines
va = VectorAssembler(outputCol="features",
                     inputCols=[x for x in processed_data.columns if x != 'label'])

rf = RandomForestRegressor()
dt = DecisionTreeRegressor()
lr = LinearRegression()
lr_ElasticNet = LinearRegression(maxIter=10, regParam=10, elasticNetParam=0.8)

pipeline_lr = Pipeline(stages=[va,lr])
pipeline_lr_ElasticNet = Pipeline(stages=[va,lr_ElasticNet])
pipeline_dt = Pipeline(stages=[va,dt])
pipeline_rf = Pipeline(stages=[va,rf])

In [59]:
# fit models
start = timeit.default_timer()
lr_fitted = pipeline_lr.fit(train)
end = timeit.default_timer()
end - start

498.0646800994873

In [60]:
start = timeit.default_timer()
lr_e_fitted = pipeline_lr_ElasticNet.fit(train)
end = timeit.default_timer()
end - start

476.43625712394714

In [61]:
start = timeit.default_timer()
dt_fitted = pipeline_dt.fit(train)
end = timeit.default_timer()
end - start

131.63317203521729

In [62]:
start = timeit.default_timer()
rf_fitted = pipeline_rf.fit(train)
end = timeit.default_timer()
end - start

213.0709388256073

## Results - careful about re-running cells
In each section, first cell is results on all columns and second cell uses just `distance` and `duration`. We see that the metrics are consistently better when we have use all of the columns and not just these two.

In [65]:
# Evaluate model results
def evaluate_regression(fitted_model, test_set):
    test_pred = fitted_model.transform(test_set)
    r2_ev = RegressionEvaluator(metricName='r2')
    rmse_ev = RegressionEvaluator(metricName='rmse')
    mae_ev = RegressionEvaluator(metricName='mae')
    r2 = r2_ev.evaluate(test_pred)
    rmse = rmse_ev.evaluate(test_pred)
    mae = mae_ev.evaluate(test_pred)
    return (r2,rmse,mae)

In [66]:
ev_dt = evaluate_regression(fitted_model=dt_fitted, test_set=test)
print("Decision Tree:\nR^2:  " + str(ev_dt[0]) + "\nRMSE: " + str(ev_dt[1]) + "\nMAE: " + str(ev_dt[2]))

Decision Tree:
R^2:  0.0245728649704
RMSE: 4.85811019003
MAE: 3.34637837615


In [67]:
ev_lr = evaluate_regression(fitted_model=lr_fitted, test_set=test)
print("Linear Regression:\nR^2:  " + str(ev_lr[0]) + "\nRMSE: " + str(ev_lr[1]) + "\nMAE: " + str(ev_lr[2]))

Linear Regression:
R^2:  0.0382449456338
RMSE: 4.82394317574
MAE: 3.3039737344


In [68]:
ev_lr_e = evaluate_regression(fitted_model=lr_e_fitted, test_set=test)
print("Linear Regression w/ Elastic Net:\nR^2:  " + str(ev_lr_e[0]) + "\nRMSE: " + str(ev_lr_e[1]) + "\nMAE: " + str(ev_lr_e[2]))

Linear Regression w/ Elastic Net:
R^2:  -0.0368547038269
RMSE: 5.00874469868
MAE: 3.45915225661


In [70]:
ev_rf = evaluate_regression(fitted_model=rf_fitted, test_set=test)
print("Random Forest:\nR^2:  " + str(ev_rf[0]) + "\nRMSE: " + str(ev_rf[1]) + "\nMAE: " + str(ev_rf[2]))

Random Forest:
R^2:  0.0229994180441
RMSE: 4.86202688363
MAE: 3.34406035385
