In [1]:
import os
#spark imports
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
import csv

from pyspark.sql.types import *
from pyspark.sql.functions import format_number, when


import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import pyspark.sql.functions as F

In [None]:
#change RAM allocation, current = 8GB
#change Core allocation, current = 3 Cores

spark = SparkSession.builder.appName("Project - Chicago crime")\
.config("spark.some.config.option", "some-value")\
.config("spark.driver.memory", "5g")\
.getOrCreate()

In [22]:
crimes_schema = StructType([StructField("ID", StringType(), True),
                            StructField("Case Number", StringType(), True),
                            StructField("Date", StringType(), True ),
                            StructField("Block", StringType(), True),
                            StructField("IUCR", StringType(), True),
                            StructField("Primary Type", StringType(), True  ),
                            StructField("Description", StringType(), True ),
                            StructField("Location Description", StringType(), True ),
                            StructField("Arrest", BooleanType(), True),
                            StructField("Domestic", BooleanType(), True),
                            StructField("Beat", StringType(), True),
                            StructField("District", StringType(), True),
                            StructField("Ward", StringType(), True),
                            StructField("Community Area", StringType(), True),
                            StructField("FBI Code", StringType(), True ),
                            StructField("X Coordinate", DoubleType(), True),
                            StructField("Y Coordinate", DoubleType(), True ),
                            StructField("Year", IntegerType(), True),
                            StructField("Updated On", DateType(), True ),
                            StructField("Latitude", DoubleType(), True),
                            StructField("Longitude", DoubleType(), True),
                            StructField("Location", StringType(), True )
                            ])


In [23]:
dataset = spark.read.csv('C:/Courses/BigData/Project/Crimes.csv',header = True,schema = crimes_schema)
#dataset.persist()

In [24]:
#dataset.select(["Latitude","Longitude","Year","X Coordinate","Y Coordinate"]).describe().show()

dataset = dataset.drop('ID')
dataset = dataset.drop('Case Number')
dataset = dataset.drop('FBI Code')
dataset = dataset.drop('Updated On')
dataset = dataset.drop('IUCR')# Could be useful - four digit codes that law enforcement agencies use to classify criminal incidents when taking individual reports.
dataset = dataset.drop('X Coordinate')
dataset = dataset.drop('Y Coordinate')
dataset = dataset.drop('Location')
#dataset = dataset.drop('Arrest')
dataset = dataset.drop('Domestic')
dataset = dataset.drop('Beat')
#dataset = dataset.drop('Location Description')
dataset = dataset.drop('Description')
dataset = dataset.drop('District')
#dataset = dataset.drop('Ward')
dataset = dataset.drop('Community Area')
#dataset = dataset.drop('Longitude')
#dataset = dataset.drop('Latitude')
dataset = dataset.drop('Block')

<font size="4">Rows in original dataset(Year 2001-2020)</font>

In [25]:
dataset.count()

7067336

<font size="4">Dataset chosen of year 2010-2019</font>

In [26]:
dataset=dataset.filter((dataset['Year'] >2009) & (dataset['Year'] !=2020))

In [27]:
dataset.count()

2969191

<font size="4">Dropped null values in data</font>

In [28]:
dataset=dataset.na.drop()

In [None]:
<font size="4">Dropped values accounted for <1 % of data

In [29]:
dataset.count()

2943230

In [31]:
d=dataset.groupby('Primary Type').count()
d.show(50)

+--------------------+------+
|        Primary Type| count|
+--------------------+------+
|OFFENSE INVOLVING...| 22415|
|            STALKING|  1794|
|PUBLIC PEACE VIOL...| 24065|
|           OBSCENITY|   474|
|NON-CRIMINAL (SUB...|     9|
|               ARSON|  4412|
|            GAMBLING|  4409|
|   CRIMINAL TRESPASS| 75108|
|             ASSAULT|192574|
|      NON - CRIMINAL|    38|
|LIQUOR LAW VIOLATION|  3992|
| MOTOR VEHICLE THEFT|128894|
|               THEFT|666384|
|             BATTERY|535709|
|             ROBBERY|114459|
|            HOMICIDE|  5312|
|    PUBLIC INDECENCY|   116|
| CRIM SEXUAL ASSAULT| 14137|
|   HUMAN TRAFFICKING|    59|
|        INTIMIDATION|  1504|
|        PROSTITUTION| 14640|
|  DECEPTIVE PRACTICE|148325|
|CONCEALED CARRY L...|   518|
|         SEX OFFENSE| 10162|
|     CRIMINAL DAMAGE|315193|
|           NARCOTICS|253623|
|        NON-CRIMINAL|   162|
|       OTHER OFFENSE|179540|
|          KIDNAPPING|  2201|
|            BURGLARY|169950|
|   WEAPON

In [7]:
#dataset=dataset.filter((dataset['Year'] >2009) & (dataset['Year'] !=2020))

In [8]:
dataset_new=dataset.filter((dataset['Primary Type']!='STALKING')&
                             (dataset['Primary Type']!='OBSCENITY')&
                             (dataset['Primary Type']!='NON-CRIMINAL (SUBJECT SPECIFIED)')&
                             (dataset['Primary Type']!='GAMBLING')&
                             (dataset['Primary Type']!='NON - CRIMINAL')&
                             (dataset['Primary Type']!='LIQUOR LAW VIOLATION')&
                             (dataset['Primary Type']!='PUBLIC INDECENCY')&
                             (dataset['Primary Type']!='HUMAN TRAFFICKING')&
                             (dataset['Primary Type']!='INTIMIDATION')&
                             (dataset['Primary Type']!='CONCEALED CARRY LICENSE VIOLATION')&
                             (dataset['Primary Type']!='NON-CRIMINAL')&
                             (dataset['Primary Type']!='OTHER NARCOTIC VIOLATION'))
dataset_new.show(50)                             

+--------------------+--------------------+--------------------+------+----+----+------------+-------------+
|                Date|        Primary Type|Location Description|Arrest|Ward|Year|    Latitude|    Longitude|
+--------------------+--------------------+--------------------+------+----+----+------------+-------------+
|01/03/2019 07:20:...|PUBLIC PEACE VIOL...|            AIRCRAFT| false|  41|2019|42.002816387| -87.90609433|
|03/16/2019 05:58:...|             BATTERY|              STREET| false|  42|2019| 41.88336939|-87.633860272|
|03/12/2019 10:00:...|               THEFT|RESIDENTIAL YARD ...| false|   4|2019|41.825346902|-87.606780575|
|03/14/2019 06:42:...|             BATTERY|           RESIDENCE| false|  49|2019|42.016541612|-87.672499325|
|03/14/2019 04:03:...|       OTHER OFFENSE|              STREET| false|   4|2019|41.825298645|  -87.6069609|
|03/16/2019 03:16:...|             BATTERY|           RESIDENCE| false|  41|2019|41.976370778|-87.800502515|
|04/07/2019 04:08:.

In [9]:
dataset_new_1 = dataset_new.withColumn("Primary Type", \
                               when((dataset_new["Primary Type"] == 'CRIM SEXUAL ASSAULT') | \
                                    (dataset_new["Primary Type"] == 'PROSTITUTION') ,'SEX OFFENSE').otherwise(dataset_new['Primary Type']))
dataset_new_1=dataset_new_1.withColumn("Primary Type", \
                               when((dataset_new_1["Primary Type"] == 'WEAPONS VIOLATION') | \
                                    (dataset_new_1["Primary Type"] == 'INTERFERENCE WITH PUBLIC OFFICER') ,'PUBLIC PEACE VIOLATION').otherwise(dataset_new_1['Primary Type']))
dataset_new_1=dataset_new_1.withColumn("Primary Type", \
                               when((dataset_new_1["Primary Type"] == 'ARSON') ,'CRIMINAL TRESPASS').otherwise(dataset_new_1['Primary Type']))
dataset_new_1.show()

+--------------------+--------------------+--------------------+------+----+----+------------+-------------+
|                Date|        Primary Type|Location Description|Arrest|Ward|Year|    Latitude|    Longitude|
+--------------------+--------------------+--------------------+------+----+----+------------+-------------+
|01/03/2019 07:20:...|PUBLIC PEACE VIOL...|            AIRCRAFT| false|  41|2019|42.002816387| -87.90609433|
|03/16/2019 05:58:...|             BATTERY|              STREET| false|  42|2019| 41.88336939|-87.633860272|
|03/12/2019 10:00:...|               THEFT|RESIDENTIAL YARD ...| false|   4|2019|41.825346902|-87.606780575|
|03/14/2019 06:42:...|             BATTERY|           RESIDENCE| false|  49|2019|42.016541612|-87.672499325|
|03/14/2019 04:03:...|       OTHER OFFENSE|              STREET| false|   4|2019|41.825298645|  -87.6069609|
|03/16/2019 03:16:...|             BATTERY|           RESIDENCE| false|  41|2019|41.976370778|-87.800502515|
|04/07/2019 04:08:.

In [10]:
d=dataset_new_1.groupby('Location Description').count()
d.show()

+--------------------+------+
|Location Description| count|
+--------------------+------+
|   RAILROAD PROPERTY|     4|
|AIRPORT TERMINAL ...|  1734|
|EXPRESSWAY EMBANK...|     1|
|POLICE FACILITY/V...|  8642|
|               MOTEL|     2|
|            SIDEWALK|302839|
|AIRPORT TERMINAL ...|    82|
|CTA GARAGE / OTHE...|  5539|
|            CAR WASH|  1227|
|    AIRPORT/AIRCRAFT|   759|
|            HOSPITAL|     5|
|MEDICAL/DENTAL OF...|  2942|
|    FEDERAL BUILDING|   357|
|             TRAILER|     1|
|         CTA STATION|  5175|
|SCHOOL, PUBLIC, G...| 12974|
|SPORTS ARENA/STADIUM|  2992|
|                FARM|     5|
|               HOUSE|   279|
|VEHICLE - OTHER R...|   453|
+--------------------+------+
only showing top 20 rows



In [11]:
dataset_new_1.repartition(1).write.csv('my_csv2',mode="append",header=True)