In [1]:
import os
#spark imports
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
import csv
from pyspark.sql.types import *
from pyspark.sql.functions import format_number, when
import pandas as pd
import matplotlib.pyplot as plt

In [2]:
#change RAM allocation, current = 8GB
#change Core allocation, current = 3 Cores

spark = SparkSession.builder.appName("Project - Chicago crime")\
.config("spark.some.config.option", "some-value")\
.config("spark.driver.memory", "8g")\
.config("spark.driver.cores", '3')\
.getOrCreate()


In [3]:
crimes_schema = StructType([StructField("ID", StringType(), True),
                            StructField("Case Number", StringType(), True),
                            StructField("Date", StringType(), True ),
                            StructField("Block", StringType(), True),
                            StructField("IUCR", StringType(), True),
                            StructField("Primary Type", StringType(), True  ),
                            StructField("Description", StringType(), True ),
                            StructField("Location Description", StringType(), True ),
                            StructField("Arrest", BooleanType(), True),
                            StructField("Domestic", BooleanType(), True),
                            StructField("Beat", StringType(), True),
                            StructField("District", StringType(), True),
                            StructField("Ward", StringType(), True),
                            StructField("Community Area", StringType(), True),
                            StructField("FBI Code", StringType(), True ),
                            StructField("X Coordinate", DoubleType(), True),
                            StructField("Y Coordinate", DoubleType(), True ),
                            StructField("Year", IntegerType(), True),
                            StructField("Updated On", DateType(), True ),
                            StructField("Latitude", DoubleType(), True),
                            StructField("Longitude", DoubleType(), True),
                            StructField("Location", StringType(), True )
                            ])

In [12]:
dataset = spark.read.option("header", "True")\
        .option("inferSchema", "true")\
        .csv("/media/hai-feng/Work/Downloads/crimes-in-chicago/Crimes_-_2001_to_present.csv")

### Have a look at the data schema:

In [13]:
dataset.limit(19).toPandas().head()

Unnamed: 0,ID,Case Number,Date,Block,IUCR,Primary Type,Description,Location Description,Arrest,Domestic,...,Ward,Community Area,FBI Code,X Coordinate,Y Coordinate,Year,Updated On,Latitude,Longitude,Location
0,11034701,JA366925,01/01/2001 11:00:00 AM,016XX E 86TH PL,1153,DECEPTIVE PRACTICE,FINANCIAL IDENTITY THEFT OVER $ 300,RESIDENCE,False,False,...,8,45,11,,,2001,08/05/2017 03:50:08 PM,,,
1,11227287,JB147188,10/08/2017 03:00:00 AM,092XX S RACINE AVE,281,CRIM SEXUAL ASSAULT,NON-AGGRAVATED,RESIDENCE,False,False,...,21,73,2,,,2017,02/11/2018 03:57:41 PM,,,
2,11227583,JB147595,03/28/2017 02:00:00 PM,026XX W 79TH ST,620,BURGLARY,UNLAWFUL ENTRY,OTHER,False,False,...,18,70,5,,,2017,02/11/2018 03:57:41 PM,,,
3,11227293,JB147230,09/09/2017 08:17:00 PM,060XX S EBERHART AVE,810,THEFT,OVER $500,RESIDENCE,False,False,...,20,42,6,,,2017,02/11/2018 03:57:41 PM,,,
4,11227634,JB147599,08/26/2017 10:00:00 AM,001XX W RANDOLPH ST,281,CRIM SEXUAL ASSAULT,NON-AGGRAVATED,HOTEL/MOTEL,False,False,...,42,32,2,,,2017,02/11/2018 03:57:41 PM,,,


### Count the total number of instance before preprocessing:

In [16]:
dataset.count()

7092612

### Drop the unused columns:

In [17]:
dataset = dataset.drop('ID')\
            .drop('Case Number')\
            .drop('FBI Code')\
            .drop('Updated On')\
            .drop('IUCR')\
            .drop('X Coordinate')\
            .drop('Y Coordinate')\
            .drop('Location')

### Drop all the instance that have null value, then count:

In [19]:
dataset = dataset.na.drop()
dataset.count()

6413586

### Drop some crime types that are rare and non-criminal:

In [21]:
#Removeing these from primary type

dataset = dataset.filter((dataset['Primary Type'] != 'NON-CRIMINAL (SUBJECT SPECIFIED)') &
                         (dataset['Primary Type'] != 'NON-CRIMINAL') &
                         (dataset['Primary Type'] != 'NON - CRIMINAL') &
                         (dataset['Primary Type'] != 'CONCEALED CARRY LICENSE VIOLATION') &
                         (dataset['Primary Type'] != 'DOMESTIC VIOLENCE') &
                         (dataset['Primary Type'] != 'PUBLIC INDECENCY') &
                         (dataset['Primary Type'] != 'OBSCENITY')&
                         (dataset['Primary Type'] != 'RITUALISM'))#Add this type, it is rare. --Haifeng
dataset.count()

6412032

### Define a function that we are going to use to combine similar type of crimes:

In [22]:
def combinePrimaryTypes(targetDf):
    '''
    This function helps combine PRIMARY CRIME TYPES
    '''
    targetDf = targetDf.withColumn("Primary Type", \
              when(targetDf["Primary Type"] == 'OTHER NARCOTIC VIOLATION','NARCOTICS').otherwise(targetDf['Primary Type']))
    targetDf = targetDf.withColumn("Primary Type", \
              when((targetDf["Primary Type"] == 'PROSTITUTION') | (targetDf["Primary Type"] == 'CRIM SEXUAL ASSAULT'),'SEX OFFENSE').otherwise(targetDf['Primary Type']))    
    targetDf = targetDf.withColumn("Primary Type", \
              when((targetDf["Primary Type"] == 'LIQUOR LAW VIOLATION') | (targetDf["Primary Type"] == 'RITUALISM') | (targetDf["Primary Type"] == 'GAMBLING'),'OTHER OFFENSE').otherwise(targetDf['Primary Type']))
    targetDf = targetDf.withColumn("Primary Type", \
              when((targetDf["Primary Type"] == 'CRIMINAL TRESPASS') | (targetDf["Primary Type"] == 'ROBBERY'),'ROBBARY or TRESPASS').otherwise(targetDf['Primary Type']))
    targetDf = targetDf.withColumn("Primary Type", \
              when((targetDf["Primary Type"] == 'INTERFERENCE WITH PUBLIC OFFICER') ,'PUBLIC PEACE VIOLATION').otherwise(targetDf['Primary Type']))
    targetDf = targetDf.withColumn("Primary Type", \
              when((targetDf["Primary Type"] == 'INTIMIDATION') | (targetDf["Primary Type"] == 'STALKING'),'INTIMIDATION or STALKING').otherwise(targetDf['Primary Type']))
    return targetDf

### Apply above function on dataset:

In [23]:
dataset = combinePrimaryTypes(dataset)

### Add the "Day" column to dataframe:

In [29]:
import pyspark.sql.functions as F
dataset=dataset\
            .withColumn("Day", F.to_date(F.split(dataset.Date, " ")[0], "MM/dd/yyyy"))
dataset.limit(10).toPandas().head()

Unnamed: 0,Date,Block,Primary Type,Description,Location Description,Arrest,Domestic,Beat,District,Ward,Community Area,Year,Latitude,Longitude,Day
0,04/10/2019 04:37:00 PM,102XX S VERNON AVE,SEX OFFENSE,AGG CRIMINAL SEXUAL ABUSE,"SCHOOL, PUBLIC, BUILDING",False,False,511,5,9,49,2019,41.708589,-87.612583,2019-04-10
1,04/12/2019 04:08:00 PM,032XX N KEELER AVE,OFFENSE INVOLVING CHILDREN,AGG SEX ASSLT OF CHILD FAM MBR,RESIDENCE,False,True,1731,17,30,16,2019,41.940298,-87.732066,2019-04-12
2,04/19/2019 01:57:00 PM,002XX N LARAMIE AVE,OFFENSE INVOLVING CHILDREN,AGGRAVATED CRIMINAL SEXUAL ABUSE BY FAMILY MEMBER,RESIDENCE,False,True,1532,15,28,25,2019,41.884865,-87.75523,2019-04-19
3,04/25/2019 05:20:00 PM,108XX S DR MARTIN LUTHER KING JR DR,BATTERY,DOMESTIC BATTERY SIMPLE,RESIDENCE,False,True,513,5,9,49,2019,41.697609,-87.613508,2019-04-25
4,05/13/2019 05:26:00 PM,090XX S RACINE AVE,ASSAULT,SIMPLE,STREET,False,False,2222,22,21,73,2019,41.729973,-87.653167,2019-05-13


### Add the "Week" column to dataframe:

In [30]:
dataset=dataset.withColumn("Week", F.dayofweek(dataset.Day))
dataset.limit(10).toPandas().head()

Unnamed: 0,Date,Block,Primary Type,Description,Location Description,Arrest,Domestic,Beat,District,Ward,Community Area,Year,Latitude,Longitude,Day,Week
0,04/10/2019 04:37:00 PM,102XX S VERNON AVE,SEX OFFENSE,AGG CRIMINAL SEXUAL ABUSE,"SCHOOL, PUBLIC, BUILDING",False,False,511,5,9,49,2019,41.708589,-87.612583,2019-04-10,4
1,04/12/2019 04:08:00 PM,032XX N KEELER AVE,OFFENSE INVOLVING CHILDREN,AGG SEX ASSLT OF CHILD FAM MBR,RESIDENCE,False,True,1731,17,30,16,2019,41.940298,-87.732066,2019-04-12,6
2,04/19/2019 01:57:00 PM,002XX N LARAMIE AVE,OFFENSE INVOLVING CHILDREN,AGGRAVATED CRIMINAL SEXUAL ABUSE BY FAMILY MEMBER,RESIDENCE,False,True,1532,15,28,25,2019,41.884865,-87.75523,2019-04-19,6
3,04/25/2019 05:20:00 PM,108XX S DR MARTIN LUTHER KING JR DR,BATTERY,DOMESTIC BATTERY SIMPLE,RESIDENCE,False,True,513,5,9,49,2019,41.697609,-87.613508,2019-04-25,5
4,05/13/2019 05:26:00 PM,090XX S RACINE AVE,ASSAULT,SIMPLE,STREET,False,False,2222,22,21,73,2019,41.729973,-87.653167,2019-05-13,2


### Add the "Month" column to dataframe:

In [31]:
dataset=dataset.withColumn("Month", F.month(dataset.Day))
dataset.limit(10).toPandas().head()

Unnamed: 0,Date,Block,Primary Type,Description,Location Description,Arrest,Domestic,Beat,District,Ward,Community Area,Year,Latitude,Longitude,Day,Week,Month
0,04/10/2019 04:37:00 PM,102XX S VERNON AVE,SEX OFFENSE,AGG CRIMINAL SEXUAL ABUSE,"SCHOOL, PUBLIC, BUILDING",False,False,511,5,9,49,2019,41.708589,-87.612583,2019-04-10,4,4
1,04/12/2019 04:08:00 PM,032XX N KEELER AVE,OFFENSE INVOLVING CHILDREN,AGG SEX ASSLT OF CHILD FAM MBR,RESIDENCE,False,True,1731,17,30,16,2019,41.940298,-87.732066,2019-04-12,6,4
2,04/19/2019 01:57:00 PM,002XX N LARAMIE AVE,OFFENSE INVOLVING CHILDREN,AGGRAVATED CRIMINAL SEXUAL ABUSE BY FAMILY MEMBER,RESIDENCE,False,True,1532,15,28,25,2019,41.884865,-87.75523,2019-04-19,6,4
3,04/25/2019 05:20:00 PM,108XX S DR MARTIN LUTHER KING JR DR,BATTERY,DOMESTIC BATTERY SIMPLE,RESIDENCE,False,True,513,5,9,49,2019,41.697609,-87.613508,2019-04-25,5,4
4,05/13/2019 05:26:00 PM,090XX S RACINE AVE,ASSAULT,SIMPLE,STREET,False,False,2222,22,21,73,2019,41.729973,-87.653167,2019-05-13,2,5


**Till Here, we alread have the preprocessed dataframe: dataset. We can use it for next analysis.**

### Count the number of types:

In [24]:
typeDF = dataset.groupBy('Primary Type')
count = typeDF.count().count()
print('Count of Primary Type reduced to = ', count)

Count of Primary Type reduced to =  20


### Count the number of every types of crimes:

In [25]:
typeDF.count().orderBy('count', ascending=False).show(count,truncate = False)

+--------------------+-------+
|        Primary Type|  count|
+--------------------+-------+
|               THEFT|1361846|
|             BATTERY|1174203|
|     CRIMINAL DAMAGE| 731523|
|           NARCOTICS| 654276|
| ROBBARY or TRESPASS| 426411|
|       OTHER OFFENSE| 424124|
|             ASSAULT| 404135|
|            BURGLARY| 367032|
| MOTOR VEHICLE THEFT| 290723|
|  DECEPTIVE PRACTICE| 255888|
|         SEX OFFENSE| 109315|
|   WEAPONS VIOLATION|  72995|
|PUBLIC PEACE VIOL...|  62717|
|OFFENSE INVOLVING...|  43664|
|               ARSON|  10271|
|            HOMICIDE|   9750|
|INTIMIDATION or S...|   7115|
|          KIDNAPPING|   5693|
|CRIMINAL SEXUAL A...|    292|
|   HUMAN TRAFFICKING|     59|
+--------------------+-------+

