In [1]:
from pyspark.sql import SQLContext
from pyspark import SparkContext
from pyspark.sql.functions import *
import matplotlib.pyplot as plt
from pyspark.ml import Pipeline
import pyspark.sql.functions as func
from pyspark.sql.types import TimestampType
from datetime import datetime

sc = SparkContext()
sqlcontext = SQLContext(sc)
path = "hdfs://wolf.analytics.private/user/slx4192/data/crime/Crimes_-_2001_to_present.csv"
mydata = sqlcontext.read.csv(path, header=True).sample(.01, False, 42)

### Question 3
Predict the number of crime events in the next week at the beat level. Violent crime events represent a greater threat to the public and thus it is desirable that they are forecasted more accurately (IUCR codes available here: https://data.cityofchicago.org/widgets/c7ck-438e). (45 pts) You are encouraged to bring in additional data sets. (extra 10 pts if you mix the existing data with an exogenous data set) Report the accuracy of your models. You must use Spark dataframes and ML pipelines.

In [2]:
getDateTime = udf(lambda x: datetime.strptime( x, '%m/%d/%Y %I:%M:%S %p'), TimestampType())
mydata_violent = mydata\
                    .withColumn('Date_time', getDateTime(col('Date')))\
                    .withColumn('Week_num', weekofyear('Date_time'))\
                    .withColumn("Violent",func.when(mydata["IUCR"].like("01%") | mydata["IUCR"].like("02%") |\
                                  mydata["IUCR"].like("03%") | mydata["IUCR"].like("04%") |\
                                  mydata["IUCR"].like("05%") | mydata["IUCR"].like("06%") |\
                                  mydata["IUCR"].like("10%") | mydata["IUCR"].like("13%") |\
                                  mydata["IUCR"].like("24%") | mydata["IUCR"].like("39%") |\
                                  mydata["IUCR"].like("42%"),1).otherwise(0))
mydata_violent.persist()        

DataFrame[ID: string, Case Number: string, Date: string, Block: string, IUCR: string, Primary Type: string, Description: string, Location Description: string, Arrest: string, Domestic: string, Beat: string, District: string, Ward: string, Community Area: string, FBI Code: string, X Coordinate: string, Y Coordinate: string, Year: string, Updated On: string, Latitude: string, Longitude: string, Location: string, Date_time: timestamp, Week_num: int, Violent: int]

In [4]:
agg_crime = mydata_violent.groupBy("Beat", "Year", "Week_num", "Violent").count()

nv_crime = mydata_violent.groupBy("Year","Beat","Week_num").count().orderBy("Beat", "Year", "Week_num")\
                .withColumn("Year_WeekNum", concat(mydata_violent.Year, lpad(mydata_violent.Week_num, 3, "-0")))\
                .drop("Year", "Week_num")
nv_crime.show(20)

+----+-----+------------+
|Beat|count|Year_WeekNum|
+----+-----+------------+
|0111|    1|     2001-01|
|0111|    1|     2001-11|
|0111|    2|     2001-29|
|0111|    1|     2001-37|
|0111|    1|     2001-39|
|0111|    1|     2001-43|
|0111|    1|     2001-48|
|0111|    3|     2001-50|
|0111|    1|     2002-01|
|0111|    1|     2002-23|
|0111|    1|     2002-40|
|0111|    1|     2002-41|
|0111|    1|     2002-42|
|0111|    1|     2002-43|
|0111|    1|     2002-44|
|0111|    1|     2002-47|
|0111|    2|     2002-52|
|0111|    1|     2003-04|
|0111|    1|     2003-08|
|0111|    2|     2003-17|
+----+-----+------------+
only showing top 20 rows



In [5]:
from pyspark.sql.window import Window
nv_crime_w_lag = nv_crime\
        .withColumn('lag1', lag('count').over(Window.partitionBy("Beat").orderBy("Year_WeekNum")))\
        .withColumn('lag2', lag('count',2).over(Window.partitionBy("Beat").orderBy("Year_WeekNum")))\
        .withColumn('lag3', lag('count',3).over(Window.partitionBy("Beat").orderBy("Year_WeekNum")))\
        .withColumn('lag4', lag('count',4).over(Window.partitionBy("Beat").orderBy("Year_WeekNum")))\
        .withColumn('lag5', lag('count',5).over(Window.partitionBy("Beat").orderBy("Year_WeekNum")))\
        .withColumn('lag6', lag('count',6).over(Window.partitionBy("Beat").orderBy("Year_WeekNum")))\
        .withColumn('lag7', lag('count',7).over(Window.partitionBy("Beat").orderBy("Year_WeekNum")))\
        .withColumn('lag8', lag('count',8).over(Window.partitionBy("Beat").orderBy("Year_WeekNum")))\
        .orderBy("Beat","Year_WeekNum")
nv_crime_w_lag.show(10)

+----+-----+------------+----+----+----+----+----+----+----+----+
|Beat|count|Year_WeekNum|lag1|lag2|lag3|lag4|lag5|lag6|lag7|lag8|
+----+-----+------------+----+----+----+----+----+----+----+----+
|0111|    1|     2001-01|null|null|null|null|null|null|null|null|
|0111|    1|     2001-11|   1|null|null|null|null|null|null|null|
|0111|    2|     2001-29|   1|   1|null|null|null|null|null|null|
|0111|    1|     2001-37|   2|   1|   1|null|null|null|null|null|
|0111|    1|     2001-39|   1|   2|   1|   1|null|null|null|null|
|0111|    1|     2001-43|   1|   1|   2|   1|   1|null|null|null|
|0111|    1|     2001-48|   1|   1|   1|   2|   1|   1|null|null|
|0111|    3|     2001-50|   1|   1|   1|   1|   2|   1|   1|null|
|0111|    1|     2002-01|   3|   1|   1|   1|   1|   2|   1|   1|
|0111|    1|     2002-23|   1|   3|   1|   1|   1|   1|   2|   1|
+----+-----+------------+----+----+----+----+----+----+----+----+
only showing top 10 rows



In [6]:
from pyspark.sql.types import IntegerType,DoubleType
nv_crime_final = nv_crime_w_lag\
    .withColumn("Year", nv_crime_w_lag["Year_WeekNum"].substr(0,4).cast(IntegerType()))\
    .withColumn("WeekNum", nv_crime_w_lag["Year_WeekNum"].substr(6,2).cast(IntegerType()))\
    .drop("Year_WeekNum").na.drop()
nv_crime_final.show(10)

+----+-----+----+----+----+----+----+----+----+----+----+-------+
|Beat|count|lag1|lag2|lag3|lag4|lag5|lag6|lag7|lag8|Year|WeekNum|
+----+-----+----+----+----+----+----+----+----+----+----+-------+
|0111|    1|   3|   1|   1|   1|   1|   2|   1|   1|2002|      1|
|0111|    1|   1|   3|   1|   1|   1|   1|   2|   1|2002|     23|
|0111|    1|   1|   1|   3|   1|   1|   1|   1|   2|2002|     40|
|0111|    1|   1|   1|   1|   3|   1|   1|   1|   1|2002|     41|
|0111|    1|   1|   1|   1|   1|   3|   1|   1|   1|2002|     42|
|0111|    1|   1|   1|   1|   1|   1|   3|   1|   1|2002|     43|
|0111|    1|   1|   1|   1|   1|   1|   1|   3|   1|2002|     44|
|0111|    1|   1|   1|   1|   1|   1|   1|   1|   3|2002|     47|
|0111|    2|   1|   1|   1|   1|   1|   1|   1|   1|2002|     52|
|0111|    1|   2|   1|   1|   1|   1|   1|   1|   1|2003|      4|
+----+-----+----+----+----+----+----+----+----+----+----+-------+
only showing top 10 rows



In [7]:
from pyspark.ml.feature import VectorAssembler,StringIndexer,OneHotEncoderEstimator
from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor

tmp = str(nv_crime_final.select(countDistinct("Beat")).collect()[0])
dis_beats = int(tmp[tmp.find('=')+1:-1])

BeatIdx = StringIndexer(inputCol='Beat',outputCol='BeatIdx')
WeekNumIdx = StringIndexer(inputCol='WeekNum',outputCol='WeekNumIdx')
encoder = OneHotEncoderEstimator(inputCols = ["BeatIdx","WeekNumIdx"], outputCols = ["BeatVec","WeekNumVec"]).setHandleInvalid("keep")

assembler = VectorAssembler(inputCols=["BeatVec","Year","WeekNumVec","lag1", "lag2", "lag3", "lag4", "lag5", "lag6", "lag7", "lag8"], outputCol='features')
gradient_boosted = GBTRegressor(labelCol="count", featuresCol="features",maxBins = dis_beats, maxIter=10)

pipeline = Pipeline(stages = [BeatIdx, WeekNumIdx, encoder, assembler, gradient_boosted])
train, test = nv_crime_final.randomSplit([0.8, 0.2])

model = pipeline.fit(train)
predictions = model.transform(test)

In [8]:
from pyspark.ml.evaluation import RegressionEvaluator

predictions2 = predictions.select(col("count").cast("Float"), col("prediction"))
evaluator_mse = RegressionEvaluator(labelCol="count", predictionCol="prediction", metricName="mse")
mse = evaluator_mse.evaluate(predictions2)

print(mse)

0.1726275643852226
