In [1]:
# Install PySpark from within the notebook; the %pip magic targets the running kernel's environment.
%pip install pyspark

Note: you may need to restart the kernel to use updated packages.


In [1]:
import pyspark
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, DoubleType
import pyspark.sql.functions as func
from pyspark.sql.functions import col, split
from pyspark.sql.functions import year, month, dayofweek, hour



# Build (or reuse, via getOrCreate) the local Spark session used by this cell.
ssql = (
    SparkSession.builder
    .master("local")
    .appName("chicago_crime_p1")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)


# Explicit schema handed to the CSV reader: one (column name, Spark type) pair per column,
# listed in the order the fields are declared. The first column is named "n/a".
# NOTE(review): IUCR is declared as a double; if the files contain alphanumeric IUCR codes
# those values will not parse as numbers - confirm against the data.
_schema_columns = [
    ("n/a", StringType),
    ("ID", StringType),
    ("Case Number", StringType),
    ("Date", StringType),
    ("Block", StringType),
    ("IUCR", DoubleType),
    ("Primary Type", StringType),
    ("Description", StringType),
    ("Location Description", StringType),
    ("Arrest", StringType),
    ("Domestic", StringType),
    ("Beat", DoubleType),
    ("District", DoubleType),
    ("Ward", DoubleType),
    ("Community Area", DoubleType),
    ("FBI Code", StringType),
    ("X Coordinate", DoubleType),
    ("Y Coordinate", DoubleType),
    ("Year", StringType),
    ("Updated On", StringType),
    ("Latitude", DoubleType),
    ("Longitude", DoubleType),
    ("Location", StringType),
]
data_schema = StructType([StructField(name, dtype()) for name, dtype in _schema_columns])
# Column expression that parses the string "Date" column into a timestamp.
# The pattern ends with an AM/PM marker ("a"), so the hour has to be the 12-hour clock field
# "h". The 24-hour field "H" conflicts with "a" and produces wrong or null hours for any
# record that is not a morning time, which would corrupt the derived "Hour" column.
timestamp = func.to_timestamp(col("Date"), "M/d/yyyy h:mm:ss a")
# method 1
# dataset =ssql.read.format("csv").option("header","true").schema(data_schema).load(r"C:\Users\joyce\Jupyter_Notebooks\files\Chicago_Crimes_2001_to_2004.csv") #withColumn("new_date", to_date(col("date"),"yyyy.MM.dd."))
# method 2
# Read the four CSV extracts with the shared schema, discard the columns that are not
# analysed below, and stack the four frames into one.
_unused_columns = [
    "n/a", "ID", "Case Number", "Location", "FBI Code",
    "Updated On", 'Block', 'Description', "X Coordinate", "Y Coordinate",
]
_csv_paths = [
    r"C:\Users\joyce\Jupyter_Notebooks\files\Chicago_Crimes_2001_to_2004.csv",
    r"C:\Users\joyce\Jupyter_Notebooks\files\Chicago_Crimes_2005_to_2007.csv",
    r"C:\Users\joyce\Jupyter_Notebooks\files\Chicago_Crimes_2008_to_2011.csv",
    r"C:\Users\joyce\Jupyter_Notebooks\files\Chicago_Crimes_2012_to_2017.csv",
]
importedDataset1, importedDataset2, importedDataset3, importedDataset4 = [
    ssql.read.csv(path, header="true", schema=data_schema).drop(*_unused_columns)
    for path in _csv_paths
]
# union() appends rows by column position; all four frames share the same column layout.
importedDataset = (
    importedDataset1
    .union(importedDataset2)
    .union(importedDataset3)
    .union(importedDataset4)
)
# Derive "Month", "Day of Week" and "Hour" from the parsed "Date", then drop both the raw
# date string and the helper timestamp column. The result is cached for the queries below.
withTimestamp = importedDataset.withColumn("timestamp", timestamp)
withMonth = withTimestamp.withColumn("Month", month(col("timestamp")))
withDayOfWeek = withMonth.withColumn("Day of Week", dayofweek(col("timestamp")))
withHour = withDayOfWeek.withColumn("Hour", hour(col("timestamp")))
dataset = withHour.drop("Date").drop("timestamp").cache()
##################################################################
#Total crime count in each district, largest first; rows with a null district are excluded
#(the ward and community-area variants below are commented out)
districtTotals = dataset.groupBy("District").agg(
    func.count(func.lit(1)).alias("Total Crime Count")
)
crimeByDistrict = (
    districtTotals
    .filter("District is NOT NULL")
    .sort(func.desc("Total Crime Count"))
)
# crimeByWard = dataset.groupBy("Ward").agg(func.count(func.lit(1)).alias("Total Crime Count")).filter(col("Ward").isNotNull()).sort(func.desc("Total Crime Count")).show()
# crimeByCommunity = dataset.groupBy("Community Area").agg(func.count(func.lit(1)).alias("Total Crime Count")).filter(col("Community Area").isNotNull()).sort(func.desc("Total Crime Count")).show()

# Display summary statistics for the combined dataset. A bare expression in the middle of a
# cell is never rendered, so the summary has to be shown explicitly.
dataset.describe().show()

#Crime count per (district, primary type), without null districts and without the
#"OTHER OFFENSE" type; ordered by district ascending, then by count descending
typeTotals = dataset.groupBy("District", "Primary Type").agg(
    func.count("Primary Type").alias("Total Crime Count")
)
keepRow = (col("District").isNotNull()) & (col("Primary Type") != "OTHER OFFENSE")
crimeByDistrictType = typeTotals.filter(keepRow).orderBy(
    col("District").asc(),
    col("Total Crime Count").desc(),
)

#Homicide rows of the per-district breakdown above
homicideByDistrict = crimeByDistrictType.filter(col("Primary Type") == "HOMICIDE")

#Homicide counts by year, month, day of week and hour
homicides = dataset.filter(col("Primary Type") == "HOMICIDE")

# Years ranked by number of homicides, highest first.
crimeByYear = homicides.groupBy("Year").count().sort(func.desc("count"))

# One row per month / weekday, ordered by the grouping key (the count is a secondary sort key).
monthTotals = homicides.groupBy("Month").agg(func.count("Month"))
crimeByMonth = monthTotals.orderBy("Month", col("count(Month)").desc())

weekdayTotals = homicides.groupBy("Day of Week").agg(func.count("Day of Week"))
crimeByDay = weekdayTotals.orderBy("Day of Week", col("count(Day of Week)").desc())

# Hours ranked by number of homicides, highest first.
hourTotals = homicides.groupBy("Hour").agg(func.count("Hour"))
crimeByHour = hourTotals.orderBy(col("count(Hour)").desc())


#Set of distinct location descriptions observed for each primary type
crimeByLocation = (
    dataset
    .groupBy("Primary Type")
    .agg(func.collect_set("Location Description"))
)

#Homicides with the location description split on single spaces into a "location" array
homicideRows = dataset.filter(col("Primary Type") == "HOMICIDE")
homicideByLocation = homicideRows.select(
    "Primary Type",
    split(col("Location Description"), " ").alias("location"),
)


In [2]:
import pyspark
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, DoubleType
import pyspark.sql.functions as func
from pyspark.sql.functions import col, split
from pyspark.sql.functions import year, month, dayofweek, hour
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator


# Configure the session builder step by step, then obtain the session
# (getOrCreate returns an already-running session if there is one).
sessionBuilder = SparkSession.builder.master("local")
sessionBuilder = sessionBuilder.appName("chicago_crime_p1")
sessionBuilder = sessionBuilder.config("spark.some.config.option", "some-value")
ssql = sessionBuilder.getOrCreate()


# import data from csv into dataframes
    # data_schema = """
    # 'n/a' String,
    # 'ID' String,
    # 'Case Number' String,
    # 'Date' String,
    # 'Block' String,
    # 'IUCR' String,
    # 'Primary Type' String,
    # 'Description' String,
    # 'Location Description' String,
    # 'Arrest' String,
    # 'Domestic' String,
    # 'Beat' String,
    # 'District' String,
    # 'Ward' String,
    # 'Community Area' String,
    # 'FBI Code' String,
    # 'X Coordinate' String,
    # 'Y Coordinate' String,
    # 'Year' String,
    # 'Updated On' String,
    # 'Latitude' String,
    # 'Longitude' String,
    # "Location' String
    # """

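# Label: Primary Type (string-indexed into the "label" column); Arrest, Domestic and Location Description are categorical features
# Numeric (DoubleType) features: District, Ward, Community Area, Beat, IUCR, Year, Latitude, Longitude (X/Y Coordinate are dropped after loading)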


# Explicit schema handed to the CSV reader. Columns are listed in declaration order; the ones
# named in _double_columns are read as doubles and every other column as a string.
# NOTE(review): IUCR is read as a double; if the files contain alphanumeric IUCR codes those
# values will not parse as numbers - confirm against the data.
_column_order = [
    "n/a", "ID", "Case Number", "Date", "Block", "IUCR", "Primary Type", "Description",
    "Location Description", "Arrest", "Domestic", "Beat", "District", "Ward",
    "Community Area", "FBI Code", "X Coordinate", "Y Coordinate", "Year", "Updated On",
    "Latitude", "Longitude", "Location",
]
_double_columns = {
    "IUCR", "Beat", "District", "Ward", "Community Area",
    "X Coordinate", "Y Coordinate", "Year", "Latitude", "Longitude",
}
data_schema = StructType([
    StructField(name, DoubleType() if name in _double_columns else StringType())
    for name in _column_order
])
# Column expression that parses the string "Date" column into a timestamp.
# Because the pattern carries an AM/PM marker ("a"), the hour must use the 12-hour clock
# field "h"; pairing the 24-hour field "H" with "a" yields wrong or null hours, and "Hour"
# is one of the model's input features.
timestamp = func.to_timestamp(col("Date"), "M/d/yyyy h:mm:ss a")
# method 1
# dataset =ssql.read.format("csv").option("header","true").schema(data_schema).load(r"C:\Users\joyce\Jupyter_Notebooks\files\Chicago_Crimes_2001_to_2004.csv") #withColumn("new_date", to_date(col("date"),"yyyy.MM.dd."))
# method 2
def _load_crime_csv(path):
    """Read one crime CSV with ``data_schema`` and drop the columns not used for modelling.

    Args:
        path: Location of the CSV file (the file is expected to have a header row).

    Returns:
        A DataFrame without the identifier, free-text and X/Y coordinate columns.
    """
    frame = ssql.read.csv(path, header="true", schema=data_schema)
    return frame.drop(
        "n/a", "ID", "Case Number", "Location", "FBI Code",
        "Updated On", 'Block', 'Description', "X Coordinate", "Y Coordinate",
    )


importedDataset1 = _load_crime_csv(r"C:\Users\joyce\Jupyter_Notebooks\files\Chicago_Crimes_2001_to_2004.csv")
importedDataset2 = _load_crime_csv(r"C:\Users\joyce\Jupyter_Notebooks\files\Chicago_Crimes_2005_to_2007.csv")
importedDataset3 = _load_crime_csv(r"C:\Users\joyce\Jupyter_Notebooks\files\Chicago_Crimes_2008_to_2011.csv")
importedDataset4 = _load_crime_csv(r"C:\Users\joyce\Jupyter_Notebooks\files\Chicago_Crimes_2012_to_2017.csv")

# Stack the four extracts; union() matches columns by position.
firstHalf = importedDataset1.union(importedDataset2)
importedDataset = firstHalf.union(importedDataset3).union(importedDataset4)
# Add "Month", "Day of Week" and "Hour" extracted from the parsed "Date", remove the raw
# date string and the temporary timestamp column, and cache the result for reuse.
dataset = (
    importedDataset
    .withColumn("timestamp", timestamp)
    .withColumn("Month", month(col("timestamp")))
    .withColumn("Day of Week", dayofweek(col("timestamp")))
    .withColumn("Hour", hour(col("timestamp")))
    .drop("Date")
    .drop("timestamp")
    .cache()
)
##################################################################

# Split dataset into train (80%) and test (20%) datasets with a fixed seed.
# The result is unpacked directly rather than bound to a variable called `split`, which
# would shadow the pyspark.sql.functions.split imported at the top of this cell.
trainDF, testDF = dataset.randomSplit([0.8, 0.2], seed=42)
# print(trainDF.cache().count()) # Cache because accessing training data multiple times
# print(testDF.count())


# Feature-engineering stages for the classification pipeline.

# String-valued inputs: each one is indexed into numeric levels ("<name>Index") and then
# one-hot encoded ("<name>OHE").
categoricalCols = ['Arrest','Location Description','Domestic']
indexedCols = [name + "Index" for name in categoricalCols]
encodedCols = [name + "OHE" for name in categoricalCols]

stringIndexer = StringIndexer(inputCols=categoricalCols, outputCols=indexedCols, handleInvalid="skip")
encoder = OneHotEncoder(inputCols=stringIndexer.getOutputCols(), outputCols=encodedCols)

# Target: "Primary Type" indexed into the "label" column.
labelToIndex = StringIndexer(inputCol="Primary Type", outputCol="label", handleInvalid="skip")

# Numeric inputs passed to the assembler unchanged.
# NOTE(review): IUCR presumably encodes the offence type and may leak the label - confirm
# that it is intended as a feature.
numericCols = [
    'IUCR', 'Beat', 'District', 'Ward', 'Community Area',
    'Latitude', 'Longitude', "Year", "Month", "Day of Week", "Hour",
]

# The "features" vector is the one-hot columns followed by the numeric columns.
assemblerInputs = encodedCols + numericCols
vecAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features", handleInvalid="skip")


# Logistic regression reading the assembled "features" vector and the indexed "label",
# with the regularisation parameter set to 1.0.
lr = LogisticRegression(regParam=1.0, labelCol="label", featuresCol="features")

# Preprocessing stages followed by the classifier, in execution order.
pipelineStages = [stringIndexer, encoder, labelToIndex, vecAssembler, lr]
pipeline = Pipeline(stages=pipelineStages)

# Fit every stage on the training split.
pipelineModel = pipeline.fit(trainDF)

# Examine the fitted coefficients of each feature slot.
# The fitted PipelineModel itself exposes no coefficients; they belong to the logistic
# regression model, which is the last pipeline stage. The label is the indexed "Primary Type"
# (more than two classes), so the per-class coefficientMatrix / interceptVector are used:
# the single-vector `coefficients` / `intercept` accessors are only valid for binary models.
lrModel = pipelineModel.stages[-1]
coefficientMatrix = lrModel.coefficientMatrix.toArray()  # rows: classes, columns: feature slots

# Each one-hot encoded column expands into several slots of the "features" vector, so slot
# names are read from the vector's ML attribute metadata rather than from assemblerInputs.
# Only the schema is inspected here; no Spark job is triggered.
featureAttrs = (
    pipelineModel.transform(trainDF).schema["features"].metadata
    .get("ml_attr", {})
    .get("attrs", {})
)
slotNames = {
    attr["idx"]: attr.get("name")
    for attrGroup in featureAttrs.values()
    for attr in attrGroup
}
for slot in range(coefficientMatrix.shape[1]):
    # One coefficient per class for this feature slot.
    print(slotNames.get(slot) or f"feature_{slot}", coefficientMatrix[:, slot])
print(f"intercept: {lrModel.interceptVector}")

# Score the held-out test split with the fitted pipeline.
predDF = pipelineModel.transform(testDF)

# Preview the assembled features next to the true label and the model output.
previewCols = ["features", "label", "prediction", "probability"]
predDF.select(*previewCols).show()


# "label" is the indexed "Primary Type" column, which has more than two classes, so the model
# is scored with multiclass metrics. An area-under-ROC value from BinaryClassificationEvaluator
# is not meaningful for a multiclass label and must not drive model selection.
mcEvaluator = MulticlassClassificationEvaluator(metricName="accuracy")
f1Evaluator = MulticlassClassificationEvaluator(metricName="f1")
print(f"Accuracy: {mcEvaluator.evaluate(predDF)}")
print(f"F1: {f1Evaluator.evaluate(predDF)}")

# Hyper-parameter grid: 3 regularisation strengths x 3 elastic-net mixing values.
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5, 2.0])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .build())

# Create a 3-fold CrossValidator that selects the parameter combination with the best F1 score.
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=f1Evaluator, numFolds=3, parallelism=4)

# Run cross validation. Every grid point is fitted once per fold, so this step can be slow;
# it returns the best model found.
cvModel = cv.fit(trainDF)

# Use the model identified by the cross-validation to make predictions on the test dataset
cvPredDF = cvModel.transform(testDF)

# Evaluate the selected model on the test dataset with the same multiclass metrics
print(f"Accuracy: {mcEvaluator.evaluate(cvPredDF)}")
print(f"F1: {f1Evaluator.evaluate(cvPredDF)}")