# Script for cleaning Seattle police call data

This version aggregates the calls by day rather than by month and performs the final (last-mile) cleaning.

In [1]:
import pyspark
from pyspark.sql import SQLContext
from pyspark.sql import functions as F


In [2]:
from pyspark.sql.functions import col, when
from pyspark.sql.functions import to_timestamp, current_timestamp
from pyspark.sql.functions import year, month, dayofmonth

In [3]:
# `sc` is not defined anywhere in this notebook -- presumably the PySpark
# kernel injects it as the active SparkContext (TODO confirm).
sqlContext = SQLContext(sparkContext=sc)

In [4]:
# Read the first CSV, treating its first row as column names, and show the schema.
dforig = spark.read.option("header", True).csv("gs://seattle_project/Call_Data.csv")
dforig.printSchema()


root
 |-- CAD Event Number: string (nullable = true)
 |-- Event Clearance Description: string (nullable = true)
 |-- Call Type: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- Initial Call Type: string (nullable = true)
 |-- Final Call Type: string (nullable = true)
 |-- Original Time Queued: string (nullable = true)
 |-- Arrived Time: string (nullable = true)
 |-- Precinct: string (nullable = true)
 |-- Sector: string (nullable = true)
 |-- Beat: string (nullable = true)



In [5]:
#Load the police call data (first row holds the column names)
df = spark.read.option("header", True).csv("gs://seattle_project/Call_Data-csv.csv")

In [6]:
#Print df's column names and types. No schema was supplied to the CSV reader,
#and the output below shows every column as a string at this point.
df.printSchema()

root
 |-- CAD Event Number: string (nullable = true)
 |-- Event Clearance Description: string (nullable = true)
 |-- Call Type: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- Initial Call Type: string (nullable = true)
 |-- Final Call Type: string (nullable = true)
 |-- Original Time Queued: string (nullable = true)
 |-- Arrived Time: string (nullable = true)
 |-- Precinct: string (nullable = true)
 |-- Sector: string (nullable = true)
 |-- Beat: string (nullable = true)



In [7]:
#Parse the raw string columns into proper types
df2 = (
    df
    .withColumn("CAD Event Number", col("CAD Event Number").cast("bigint"))
    .withColumn("Priority", col("Priority").cast("int"))
    #The two time columns are text and each is parsed with its own pattern
    .withColumn("Original Time Queued", to_timestamp(col("Original Time Queued"), "MM/dd/yyyy HH:mm:ss"))
    .withColumn("Arrived Time", to_timestamp(col("Arrived Time"), "MMM dd yyyy HH:mm:ss"))
)

#Total response time in seconds: arrival time minus queue time, both cast to long
arrived_epoch = col("Arrived Time").cast("long")
queued_epoch = col("Original Time Queued").cast("long")
df2 = df2.withColumn("Response_time", arrived_epoch - queued_epoch)

#Day, month and year of the arrival time (used as grouping keys further down)
df2 = (
    df2
    .withColumn("Day", dayofmonth("Arrived Time"))
    .withColumn("Month", month("Arrived Time"))
    .withColumn("Year", year("Arrived Time"))
)


In [8]:
#Bring in Call Type from the original dataset (renamed to Call911) so that we can
#tell whether each call was an emergency call or not
call_source = dforig.select('CAD Event Number', col('Call Type').alias('Call911'))
#Inner join on the event number: only events present in both frames are kept
add911 = call_source.alias('a').join(df2.alias('b'), on='CAD Event Number', how='inner')

In [9]:
#Drop the rows whose Year is 1900
is_year_1900 = col('Year') == 1900
add911 = add911.where(~is_year_1900)

In [10]:
#Inspect the joined schema. Note that CAD Event Number is printed as a string
#below even though df2 cast it to bigint -- presumably the join keeps dforig's
#(left-hand) key column; verify if the numeric type matters downstream.
add911.printSchema()

root
 |-- CAD Event Number: string (nullable = true)
 |-- Call911: string (nullable = true)
 |-- Event Clearance Description: string (nullable = true)
 |-- Call Type: string (nullable = true)
 |-- Priority: integer (nullable = true)
 |-- Initial Call Type: string (nullable = true)
 |-- Final Call Type: string (nullable = true)
 |-- Original Time Queued: timestamp (nullable = true)
 |-- Arrived Time: timestamp (nullable = true)
 |-- Precinct: string (nullable = true)
 |-- Sector: string (nullable = true)
 |-- Beat: string (nullable = true)
 |-- Response_time: long (nullable = true)
 |-- Day: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- Year: integer (nullable = true)



In [11]:
#Create new features: one row per day / precinct / sector / beat
def _count_where(column_name, value, out_name):
    """Count, per group, the rows whose `column_name` equals `value`; name the result `out_name`."""
    #when() without otherwise() leaves non-matching rows null, and count() skips nulls
    return F.count(F.when(col(column_name) == value, 1)).alias(out_name)

#(source column, value to match, output column) -- order fixes the output column order
_count_specs = [
    ("Event Clearance Description", 'No_action_needed', 'No_action'),
    ("Event Clearance Description", 'Assistance', 'Assistance'),
    ("Event Clearance Description", 'Citation', 'Citation'),
    ("Event Clearance Description", 'Disturbance', 'Disturbance'),
    ("Event Clearance Description", 'Arrest', 'Arrest'),
    ("Final Call Type", 'DOMESTIC VIOLENCE', 'Domestic_violence'),
    ("Call911", '911', '911'),
    ("Call Type", 'Police_initiated', 'Police_initiated'),
    ("Call Type", 'Public_initiated', 'Public_initiated'),
    ("Call Type", 'Alarm', 'Alarm'),
]

grouped = add911.groupby("Year", "Month", "Day", "Precinct", "Sector", "Beat").agg(
    F.count("CAD Event Number").alias('Total_calls'),
    #Left un-aliased on purpose so the column keeps its generated name
    F.avg("Response_time"),
    *[_count_where(*spec) for spec in _count_specs]
)
                                                                           


In [12]:
#Continue with creating new features -- these are the ratio features
#Each entry is (new column, numerator column, denominator column)
#NOTE(review): a group with zero arrests or zero police-initiated calls has a zero
#denominator -- confirm how the cluster's Spark settings treat that division.
_ratio_specs = [
    ('ratio_domestic_to_all_calls', 'Domestic_violence', 'Total_calls'),
    ('ratio_citation_to_arrest', 'Citation', 'Arrest'),
    ('ratio_911_to_all_calls', '911', 'Total_calls'),
    ('ratio_no_action_to_all_calls', 'No_action', 'Total_calls'),
    ('ratio_public_to_police_initiated', 'Public_initiated', 'Police_initiated'),
    ('ratio_assistance_to_all_calls', 'Assistance', 'Total_calls'),
]
for ratio_name, numerator, denominator in _ratio_specs:
    grouped = grouped.withColumn(ratio_name, col(numerator) / col(denominator))

In [13]:
#Build a Date column by joining Year-Month-Day into text and casting it to a date
date_text = F.concat(F.col('Year'), F.lit('-'), F.col('Month'), F.lit('-'), F.col('Day'))
grouped = grouped.withColumn('Date', date_text.cast('date'))

In [14]:
#Form 3 different sets: full, medium, and small for testing
#Column.isin only unpacks a single list/set argument, so each year range is
#materialised as a list; a bare Python 3 range object would be handed over as
#one literal value instead of a list of years.
full_years = list(range(2010, 2021))    #2010-2020 inclusive
recent_years = list(range(2016, 2021))  #2016-2020 inclusive
test_years = list(range(2019, 2021))    #2019-2020 inclusive

groupedLarge = grouped.filter(col('Year').isin(full_years))

#Limit a second set to the years 2016-2020 [8 rows with a negative response time]. 1806734 rows
#NOTE(review): the row counts quoted in these comments come from the original author -- re-verify.
groupedMed = grouped.filter(col('Year').isin(recent_years))

#Limit third set of data to 2019-2020 May/June -- this is to be a test set of 115262 rows
groupedSmall = grouped.filter(col('Year').isin(test_years) & col('Month').isin(5, 6))

In [15]:
#Write the grouped data to csv. Only the full set is written here; the two
#smaller sets are kept below, commented out, for reference.
#groupedSmall.coalesce(1).write.csv('gs://seattle_project/TestSet_grouped2',header=True)
#groupedMed.coalesce(1).write.csv('gs://seattle_project/FiveYears_grouped2',header=True)
#coalesce(1) collapses the frame to a single partition before writing
groupedLarge.coalesce(1).write.option('header', True).csv('gs://seattle_project/FullSet_groupedDay2')
