In [1]:
import os
#spark imports
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
import csv
from pyspark.sql.types import *
from pyspark.sql.functions import format_number, when
import pandas as pd
import matplotlib.pyplot as plt

In [2]:
#change RAM allocation, current = 8GB
#change Core allocation, current = 3 Cores

spark = SparkSession.builder.appName("Project - Chicago crime")\
.config("spark.some.config.option", "some-value")\
.config("spark.driver.memory", "8g")\
.config("spark.driver.cores", '3')\
.getOrCreate()


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/03 15:05:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/12/03 15:05:21 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
24/12/03 15:05:21 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [3]:
crimes_schema = StructType([StructField("ID", StringType(), True),
                            StructField("Case Number", StringType(), True),
                            StructField("Date", StringType(), True ),
                            StructField("Block", StringType(), True),
                            StructField("IUCR", StringType(), True),
                            StructField("Primary Type", StringType(), True  ),
                            StructField("Description", StringType(), True ),
                            StructField("Location Description", StringType(), True ),
                            StructField("Arrest", BooleanType(), True),
                            StructField("Domestic", BooleanType(), True),
                            StructField("Beat", StringType(), True),
                            StructField("District", StringType(), True),
                            StructField("Ward", StringType(), True),
                            StructField("Community Area", StringType(), True),
                            StructField("FBI Code", StringType(), True ),
                            StructField("X Coordinate", DoubleType(), True),
                            StructField("Y Coordinate", DoubleType(), True ),
                            StructField("Year", IntegerType(), True),
                            StructField("Updated On", DateType(), True ),
                            StructField("Latitude", DoubleType(), True),
                            StructField("Longitude", DoubleType(), True),
                            StructField("Location", StringType(), True )
                            ])

In [6]:
#set your local dataset path
dataSetPath = '/Users/sudiptabanerjee/Documents/Big Data Project/Chicago-crime-analysis/Data-Preprocessing/Crime-1.csv'

In [7]:
dataset = spark.read.option("header", "True")\
        .option("schema", crimes_schema)\
        .csv(dataSetPath)

### Have a look at the data schema:

In [8]:
dataset.limit(20).toPandas().head()

Unnamed: 0,ID,Case Number,Date,Block,IUCR,Primary Type,Description,Location Description,Arrest,Domestic,...,Ward,Community Area,FBI Code,X Coordinate,Y Coordinate,Year,Updated On,Latitude,Longitude,Location
0,5741943,HN549294,08/25/2007 09:22:18 AM,074XX N ROGERS AVE,560,ASSAULT,SIMPLE,OTHER,False,False,...,49,1,08A,,,2007,08/17/2015 03:03:40 PM,,,
1,25953,JE240540,05/24/2021 03:06:00 PM,020XX N LARAMIE AVE,110,HOMICIDE,FIRST DEGREE MURDER,STREET,True,False,...,36,19,01A,1141387.0,1913179.0,2021,11/18/2023 03:39:49 PM,41.91783806,-87.75596897,"(41.917838056, -87.755968972)"
2,26038,JE279849,06/26/2021 09:24:00 AM,062XX N MC CORMICK RD,110,HOMICIDE,FIRST DEGREE MURDER,PARKING LOT,True,False,...,50,13,01A,1152781.0,1941458.0,2021,11/18/2023 03:39:49 PM,41.99521944,-87.71335491,"(41.995219444, -87.713354912)"
3,13279676,JG507211,11/09/23 7:30,019XX W BYRON ST,620,BURGLARY,UNLAWFUL ENTRY,APARTMENT,False,False,...,47,5,05,1162518.0,1925906.0,2023,11/18/2023 03:39:49 PM,41.95234509,-87.67797506,"(41.952345086, -87.677975059)"
4,13274752,JG501049,11/12/23 7:59,086XX S COTTAGE GROVE AVE,454,BATTERY,"AGGRAVATED P.O. - HANDS, FISTS, FEET, NO / MIN...",SMALL RETAIL STORE,True,False,...,6,44,08B,1183071.0,1847869.0,2023,12/09/23 15:41,41.73775077,-87.60485591,"(41.737750767, -87.604855911)"


### Count the total number of instance before preprocessing:

In [9]:
dataset.count()

999

### Drop the unused columns:

In [10]:
dataset = dataset.drop('ID')\
            .drop('Case Number')\
            .drop('FBI Code')\
            .drop('Updated On')\
            .drop('IUCR')\
            .drop('X Coordinate')\
            .drop('Y Coordinate')\
            .drop('Location')

### Drop all the instance that have null value, then count:

In [11]:
# Dropping some NA values for now
dataset = dataset.na.drop()
dataset.count()

980

### Drop columns whos Lat, Long are outside Claifornia:

In [12]:
dataset=dataset\
        .filter((dataset["Latitude"] < 45)
             & (dataset["Latitude"] > 40)
             & (dataset["Longitude"] < -85)
             & (dataset["Longitude"] > -90))

In [13]:
dataset.count()

980