# Spark Random Forest Implementation

## Chicago Crime Use Case

You are provided with a dataset that contains crime records from Chicago. The dataset belongs to the Chicago Police Department and reflects reported incidents of crime that occurred in the City of Chicago from 2012 to 2017. The data is extracted from the Chicago Police Department's CLEAR (Citizen Law Enforcement Analysis and Reporting) system.

## Dataset Understanding

## Objective

Our objective is to build a system that classifies the **FBI Code** of each crime based on the given information.

Columns in the Dataset:

**ID** - Unique identifier for the record.

**Case Number** - The Chicago Police Department RD Number (Records Division Number), which is unique to the incident.

**Date** - Date when the incident occurred. This is sometimes a best estimate.

**Block** - The partially redacted address where the incident occurred, placing it on the same block as the actual address.

**IUCR** - The Illinois Uniform Crime Reporting code. This is directly linked to the Primary Type and Description.

**Primary Type** - The primary description of the IUCR code.

**Description** - The secondary description of the IUCR code, a subcategory of the primary description.

**Location Description** - Description of the location where the incident occurred.

**Arrest** - Indicates whether an arrest was made.

**Domestic** - Indicates whether the incident was domestic-related as defined by the Illinois Domestic Violence Act.

**Beat** - Indicates the beat where the incident occurred. A beat is the smallest police geographic area – each beat has a dedicated police beat car. Three to five beats make up a police sector, and three sectors make up a police district. The Chicago Police Department has 22 police districts.

**District** - Indicates the police district where the incident occurred.

**Ward** - The ward (City Council district) where the incident occurred.

**Community Area** - Indicates the community area where the incident occurred. Chicago has 77 community areas.

**FBI Code** - Indicates the crime classification as outlined in the FBI's National Incident-Based Reporting System (NIBRS).

**X Coordinate** - The x coordinate of the location where the incident occurred in State Plane Illinois East NAD 1983 projection. This location is shifted from the actual location for partial redaction but falls on the same block.

**Y Coordinate** - The y coordinate of the location where the incident occurred in State Plane Illinois East NAD 1983 projection. This location is shifted from the actual location for partial redaction but falls on the same block.

**Year** - Year the incident occurred.

**Updated On** - Date and time the record was last updated.

**Latitude** - The latitude of the location where the incident occurred. This location is shifted from the actual location for partial redaction but falls on the same block.

**Longitude** - The longitude of the location where the incident occurred. This location is shifted from the actual location for partial redaction but falls on the same block.

**Location** - The location where the incident occurred in a format that allows for creation of maps and other geographic operations on this data portal. This location is shifted from the actual location for partial redaction but falls on the same block.

### Initialising the Spark session

In [1]:
%%configure -f
{ "conf":{
          "spark.pyspark.python": "python3",
          "spark.pyspark.virtualenv.enabled": "true",
          "spark.pyspark.virtualenv.type":"native",
          "spark.pyspark.virtualenv.bin.path":"/usr/bin/virtualenv"
         }
}

In [2]:
from pyspark import SparkContext, SparkConf

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
6,application_1659025105765_0007,pyspark,idle,Link,Link,,✔


SparkSession available as 'spark'.


In [3]:
sc = SparkContext.getOrCreate();
sc

<SparkContext master=yarn appName=livy-session-6>

Listing all installed packages 

In [4]:
sc.list_packages()

aws-cfn-bootstrap (2.0)
beautifulsoup4 (4.9.3)
boto (2.49.0)
click (8.1.1)
docutils (0.14)
jmespath (1.0.0)
joblib (1.1.0)
lockfile (0.11.0)
lxml (4.8.0)
mysqlclient (1.4.2)
nltk (3.7)
nose (1.3.4)
numpy (1.20.0)
pip (9.0.1)
py-dateutil (2.2)
pystache (0.5.4)
python-daemon (2.2.3)
python37-sagemaker-pyspark (1.4.1)
pytz (2022.1)
PyYAML (5.4.1)
regex (2021.11.10)
setuptools (28.8.0)
simplejson (3.2.0)
six (1.13.0)
tqdm (4.63.1)
wheel (0.29.0)
windmill (1.6)

You are using pip version 9.0.1, however version 22.2.1 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.

In [5]:
sc.install_pypi_package("pandas==0.25.1")
sc.install_pypi_package("matplotlib==3.1.1", "https://pypi.org/simple")

Collecting pandas==0.25.1
  Using cached https://files.pythonhosted.org/packages/7e/ab/ea76361f9d3e732e114adcd801d2820d5319c23d0ac5482fa3b412db217e/pandas-0.25.1-cp37-cp37m-manylinux1_x86_64.whl
Collecting python-dateutil>=2.6.1 (from pandas==0.25.1)
  Using cached https://files.pythonhosted.org/packages/36/7a/87837f39d0296e723bb9b62bbb257d0355c7f6128853c78955f57342a56d/python_dateutil-2.8.2-py2.py3-none-any.whl
Installing collected packages: python-dateutil, pandas
Successfully installed pandas-0.25.1 python-dateutil-2.8.2

Collecting matplotlib==3.1.1
  Using cached https://files.pythonhosted.org/packages/19/7a/60bd79c5d79559150f8bba866dd7d434f0a170312e4d15e8aefa5faba294/matplotlib-3.1.1-cp37-cp37m-manylinux1_x86_64.whl
Collecting pyparsing!=2.0.4,!=2.1.2,!=2.1.6,>=2.0.1 (from matplotlib==3.1.1)
  Using cached https://files.pythonhosted.org/packages/6c/10/a7d0fa5baea8fe7b50f448ab742f26f52b80bfca85ac2be9d35cdd9a3246/pyparsing-3.0.9-py3-none-any.whl
Collecting cycler>=0.10 (from matplo

In [6]:
sc.list_packages()

aws-cfn-bootstrap (2.0)
beautifulsoup4 (4.9.3)
boto (2.49.0)
click (8.1.1)
cycler (0.11.0)
docutils (0.14)
jmespath (1.0.0)
joblib (1.1.0)
kiwisolver (1.4.4)
lockfile (0.11.0)
lxml (4.8.0)
matplotlib (3.1.1)
mysqlclient (1.4.2)
nltk (3.7)
nose (1.3.4)
numpy (1.20.0)
pandas (0.25.1)
pip (9.0.1)
py-dateutil (2.2)
pyparsing (3.0.9)
pystache (0.5.4)
python-daemon (2.2.3)
python-dateutil (2.8.2)
python37-sagemaker-pyspark (1.4.1)
pytz (2022.1)
PyYAML (5.4.1)
regex (2021.11.10)
setuptools (28.8.0)
simplejson (3.2.0)
six (1.13.0)
tqdm (4.63.1)
typing-extensions (4.3.0)
wheel (0.29.0)
windmill (1.6)

You are using pip version 9.0.1, however version 22.2.1 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.

### Loading the dataset

In [7]:
df = spark.read.csv("s3a://chicago-crime-mlc/Chicago_Crimes_2012_to_2017.csv", 
                    header = True, 
                    inferSchema = False)


In [8]:
# Printing the first row
df.head(1)

[Row(_c0='3', ID='10508693', Case Number='HZ250496', Date='05/03/2016 11:40:00 PM', Block='013XX S SAWYER AVE', IUCR='0486', Primary Type='BATTERY', Description='DOMESTIC BATTERY SIMPLE', Location Description='APARTMENT', Arrest='True', Domestic='True', Beat='1022', District='10.0', Ward='24.0', Community Area='29.0', FBI Code='08B', X Coordinate='1154907.0', Y Coordinate='1893681.0', Year='2016', Updated On='05/10/2016 03:56:50 PM', Latitude='41.864073157', Longitude='-87.706818608', Location='(41.864073157, -87.706818608)')]

In [9]:
# Schema of the dataset
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- ID: string (nullable = true)
 |-- Case Number: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Block: string (nullable = true)
 |-- IUCR: string (nullable = true)
 |-- Primary Type: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Location Description: string (nullable = true)
 |-- Arrest: string (nullable = true)
 |-- Domestic: string (nullable = true)
 |-- Beat: string (nullable = true)
 |-- District: string (nullable = true)
 |-- Ward: string (nullable = true)
 |-- Community Area: string (nullable = true)
 |-- FBI Code: string (nullable = true)
 |-- X Coordinate: string (nullable = true)
 |-- Y Coordinate: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Updated On: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Location: string (nullable = true)

In [10]:
# Count total no of rows
df.count()


1456714

In [11]:
# print 5 rows
df.head(5)

[Row(_c0='3', ID='10508693', Case Number='HZ250496', Date='05/03/2016 11:40:00 PM', Block='013XX S SAWYER AVE', IUCR='0486', Primary Type='BATTERY', Description='DOMESTIC BATTERY SIMPLE', Location Description='APARTMENT', Arrest='True', Domestic='True', Beat='1022', District='10.0', Ward='24.0', Community Area='29.0', FBI Code='08B', X Coordinate='1154907.0', Y Coordinate='1893681.0', Year='2016', Updated On='05/10/2016 03:56:50 PM', Latitude='41.864073157', Longitude='-87.706818608', Location='(41.864073157, -87.706818608)'), Row(_c0='89', ID='10508695', Case Number='HZ250409', Date='05/03/2016 09:40:00 PM', Block='061XX S DREXEL AVE', IUCR='0486', Primary Type='BATTERY', Description='DOMESTIC BATTERY SIMPLE', Location Description='RESIDENCE', Arrest='False', Domestic='True', Beat='313', District='3.0', Ward='20.0', Community Area='42.0', FBI Code='08B', X Coordinate='1183066.0', Y Coordinate='1864330.0', Year='2016', Updated On='05/10/2016 03:56:50 PM', Latitude='41.782921527', Longi

In [12]:
# Show 5 rows in tabular format
df.show(5)

+---+--------+-----------+--------------------+-------------------+----+--------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+------------+-------------+--------------------+
|_c0|      ID|Case Number|                Date|              Block|IUCR|        Primary Type|         Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|          Updated On|    Latitude|    Longitude|            Location|
+---+--------+-----------+--------------------+-------------------+----+--------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+------------+-------------+--------------------+
|  3|10508693|   HZ250496|05/03/2016 11:40:...| 013XX S SAWYER AVE|0486|             BATTERY|DOMESTIC BATTERY ...| 

In [13]:
# get all columns
df.columns

['_c0', 'ID', 'Case Number', 'Date', 'Block', 'IUCR', 'Primary Type', 'Description', 'Location Description', 'Arrest', 'Domestic', 'Beat', 'District', 'Ward', 'Community Area', 'FBI Code', 'X Coordinate', 'Y Coordinate', 'Year', 'Updated On', 'Latitude', 'Longitude', 'Location']

### Data Exploration and Cleaning

In [14]:
df.select("Date").show(10, truncate = False)

+----------------------+
|Date                  |
+----------------------+
|05/03/2016 11:40:00 PM|
|05/03/2016 09:40:00 PM|
|05/03/2016 11:31:00 PM|
|05/03/2016 10:10:00 PM|
|05/03/2016 10:00:00 PM|
|05/03/2016 10:35:00 PM|
|05/03/2016 10:30:00 PM|
|05/03/2016 09:30:00 PM|
|05/03/2016 04:00:00 PM|
|05/03/2016 10:30:00 PM|
+----------------------+
only showing top 10 rows

In [15]:
# Column type
df.select("Date").dtypes

[('Date', 'string')]

The Date column is stored as a string, so we convert it to a timestamp.

In [16]:
# Changing the type of column Date to timestamp
from pyspark.sql.functions import to_timestamp


df = df.withColumn("Date_Time",to_timestamp('Date',"MM/dd/yyyy hh:mm:ss a"))

In [17]:
df.select("Date_Time").show(10, truncate = False)

+-------------------+
|Date_Time          |
+-------------------+
|2016-05-03 23:40:00|
|2016-05-03 21:40:00|
|2016-05-03 23:31:00|
|2016-05-03 22:10:00|
|2016-05-03 22:00:00|
|2016-05-03 22:35:00|
|2016-05-03 22:30:00|
|2016-05-03 21:30:00|
|2016-05-03 16:00:00|
|2016-05-03 22:30:00|
+-------------------+
only showing top 10 rows

In [18]:
df.select("Date_Time").dtypes

[('Date_Time', 'timestamp')]
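
The pattern passed to `to_timestamp` reads as month (`MM`), day (`dd`), four-digit year (`yyyy`), 12-hour clock hour (`hh`), minutes (`mm`), seconds (`ss`) and an AM/PM marker (`a`). As a minimal plain-Python sketch (not Spark code), the same layout can be written with `strptime` directives; `parse_crime_date` and `CRIME_DATE_FORMAT` are hypothetical names used only for illustration.

```python
from datetime import datetime

# strptime counterpart of the Spark pattern "MM/dd/yyyy hh:mm:ss a"
CRIME_DATE_FORMAT = "%m/%d/%Y %I:%M:%S %p"


def parse_crime_date(text):
    """Parse a Date string such as '05/03/2016 11:40:00 PM'."""
    return datetime.strptime(text, CRIME_DATE_FORMAT)


parsed = parse_crime_date("05/03/2016 11:40:00 PM")
print(parsed)       # 2016-05-03 23:40:00
print(parsed.hour)  # 23
```

The 12-hour directive matters here: with a 24-hour directive, a value such as `11:40:00 PM` would not map to hour 23.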

### Extracting 'hour' from the dataset

In [19]:
from pyspark.sql.functions import hour

df = df.withColumn('hour', hour(df["Date_Time"]))

In [20]:
df.select('hour').show(5)

+----+
|hour|
+----+
|  23|
|  21|
|  23|
|  22|
|  22|
+----+
only showing top 5 rows

### Extract day of week from date in pyspark

In [21]:
from pyspark.sql.functions import dayofweek

# create a new column for dayofweek from Date_Time

df = df.withColumn('day_of_week', dayofweek(df["Date_Time"]))

In [22]:
df.select('day_of_week').show(5)

+-----------+
|day_of_week|
+-----------+
|          3|
|          3|
|          3|
|          3|
|          3|
+-----------+
only showing top 5 rows
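
`dayofweek` numbers the days from 1 (Sunday) to 7 (Saturday), so the value 3 shown for 3 May 2016 is a Tuesday. The plain-Python sketch below reproduces that numbering from `isoweekday()`; `spark_day_of_week` and `DAY_NAMES` are hypothetical names for illustration only.

```python
from datetime import date

DAY_NAMES = {
    1: "Sunday", 2: "Monday", 3: "Tuesday", 4: "Wednesday",
    5: "Thursday", 6: "Friday", 7: "Saturday",
}


def spark_day_of_week(d):
    """Return 1 for Sunday through 7 for Saturday."""
    # isoweekday() gives Monday=1 ... Sunday=7, so shift it by one position.
    return d.isoweekday() % 7 + 1


day = spark_day_of_week(date(2016, 5, 3))
print(day, DAY_NAMES[day])  # 3 Tuesday
```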

### Show 'hour' & 'day_of_week'

In [23]:
df.select('hour','day_of_week').show(5)

+----+-----------+
|hour|day_of_week|
+----+-----------+
|  23|          3|
|  21|          3|
|  23|          3|
|  22|          3|
|  22|          3|
+----+-----------+
only showing top 5 rows

In [24]:
df.show(5)

+---+--------+-----------+--------------------+-------------------+----+--------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+------------+-------------+--------------------+-------------------+----+-----------+
|_c0|      ID|Case Number|                Date|              Block|IUCR|        Primary Type|         Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|          Updated On|    Latitude|    Longitude|            Location|          Date_Time|hour|day_of_week|
+---+--------+-----------+--------------------+-------------------+----+--------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+------------+-------------+--------------------+-------------------+----+-----------+
|  3

In [25]:
# Dropping the columns: Date & Date_Time
df = df.drop('Date', 'Date_Time')

In [26]:
df.show(5)

+---+--------+-----------+-------------------+----+--------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+------------+-------------+--------------------+----+-----------+
|_c0|      ID|Case Number|              Block|IUCR|        Primary Type|         Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|          Updated On|    Latitude|    Longitude|            Location|hour|day_of_week|
+---+--------+-----------+-------------------+----+--------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+------------+-------------+--------------------+----+-----------+
|  3|10508693|   HZ250496| 013XX S SAWYER AVE|0486|             BATTERY|DOMESTIC BATTERY ...|           APARTMENT|  True|    Tr

### Hour statistical analysis

In [27]:
# In each hour, how many crimes happened

df.groupBy('hour').count().show(10)

+----+-----+
|hour|count|
+----+-----+
|  12|83930|
|   1|43771|
|   6|24609|
|   3|31048|
|   4|23325|
|   8|50637|
|  11|67005|
|  19|84193|
|  23|61224|
|  21|76543|
+----+-----+
only showing top 10 rows

In [28]:
# Storing in a pandas dataframe for visualisation 
# store in descending order
hour_df = df.groupBy('hour').count().orderBy('count',ascending=False).toPandas()

In [29]:
# print 10 rows
hour_df.head(10)

   hour  count
0    19  84193
1    12  83930
2    18  82414
3    20  80826
4    15  79930
5    21  76543
6    16  76065
7    22  75824
8    17  75556
9    14  73698

In [30]:
# import matplotlib 
import matplotlib.pyplot as plt

### What time of the day are criminals the busiest?

In [38]:

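# Plot crime counts per hour, ordered by hour.
# figsize is passed to plot() because DataFrame.plot draws on its own new figure,
# so a preceding plt.figure(figsize=...) call would not size this chart.
hour_df.sort_values('hour').plot(x='hour', y='count', kind='bar', color='blue', figsize=(14, 10))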

plt.title('Amount of Crimes by Hour')
plt.ylabel('Amount of Crimes')
plt.xlabel('Hour')


# display the plot
%matplot plt


An error was encountered:
Cannot interpret '<attribute 'dtype' of 'numpy.generic' objects>' as a data type
Traceback (most recent call last):
  File "/tmp/1659035001029-0/lib/python3.7/site-packages/pandas/plotting/_core.py", line 794, in __call__
    return plot_backend.plot(data, kind=kind, **kwargs)
  File "/tmp/1659035001029-0/lib/python3.7/site-packages/pandas/plotting/_matplotlib/__init__.py", line 62, in plot
    plot_obj.generate()
  File "/tmp/1659035001029-0/lib/python3.7/site-packages/pandas/plotting/_matplotlib/core.py", line 279, in generate
    self._compute_plot_data()
  File "/tmp/1659035001029-0/lib/python3.7/site-packages/pandas/plotting/_matplotlib/core.py", line 404, in _compute_plot_data
    include=[np.number, "datetime", "datetimetz", "timedelta"]
  File "/tmp/1659035001029-0/lib/python3.7/site-packages/pandas/core/frame.py", line 3427, in select_dtypes
    include_these = Series(not bool(include), index=self.columns)
  File "/tmp/1659035001029-0/lib/python3.7/si
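
The traceback above starts in the pandas plotting code and passes through `select_dtypes`. The package listing earlier shows pandas 0.25.1 installed next to numpy 1.20.0; a mismatch between those two versions is a plausible cause, though that is an assumption this notebook does not verify. One way to sidestep the pandas plotting path is to hand plain Python lists to matplotlib directly. The sketch below is illustrative only: `bar_series` and `plot_bar` are hypothetical helpers, and the three sample rows are copied from the hourly counts shown earlier.

```python
def bar_series(rows, key="hour", value="count"):
    """Sort records by `key` and return two parallel lists for plotting."""
    ordered = sorted(rows, key=lambda r: r[key])
    return [r[key] for r in ordered], [r[value] for r in ordered]


def plot_bar(xs, ys, title, xlabel, ylabel):
    """Draw a bar chart with matplotlib only, without DataFrame.plot."""
    import matplotlib.pyplot as plt  # imported lazily; needed only when plotting

    fig, ax = plt.subplots(figsize=(14, 10))
    ax.bar([str(x) for x in xs], ys, color="blue")
    ax.set_title(title)
    ax.set_xlabel(xlabel)
    ax.set_ylabel(ylabel)
    return fig


rows = [
    {"hour": 19, "count": 84193},
    {"hour": 12, "count": 83930},
    {"hour": 1, "count": 43771},
]
hours, counts = bar_series(rows)
print(hours)   # [1, 12, 19]
print(counts)  # [43771, 83930, 84193]
```

In the notebook, the records could come from `hour_df.to_dict("records")`, followed by `plot_bar(hours, counts, 'Amount of Crimes by Hour', 'Hour', 'Amount of Crimes')` and `%matplot plt` to display the figure.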

### Day of week statistical analysis

In [33]:
df.groupBy("day_of_week").count().show()

+-----------+------+
|day_of_week| count|
+-----------+------+
|          1|202212|
|          6|218643|
|          3|206129|
|          4|208374|
|          5|205851|
|          2|205762|
|          7|209743|
+-----------+------+

In [34]:
dayofweek_df = df.groupBy("day_of_week").count().orderBy("count", ascending = False).toPandas()

In [35]:
dayofweek_df.head(7)

   day_of_week   count
0            6  218643
1            7  209743
2            4  208374
3            3  206129
4            5  205851
5            2  205762
6            1  202212

### Which day of the week are criminals busiest?

(1 = Sunday, 2 = Monday, ..., 7 = Saturday)

In [36]:
# Create the plot; figsize is passed to plot() because DataFrame.plot
# draws on its own new figure
dayofweek_df.plot(x = 'day_of_week', y = 'count', kind='bar', color = "pink", figsize=(14, 10))


plt.title('Amount of Crimes by day_of_week')
plt.ylabel('Amount of Crimes')
plt.xlabel('day_of_week')


# display the plot
%matplot plt


An error was encountered:
Cannot interpret '<attribute 'dtype' of 'numpy.generic' objects>' as a data type
Traceback (most recent call last):
  File "/tmp/1659035001029-0/lib/python3.7/site-packages/pandas/plotting/_core.py", line 794, in __call__
    return plot_backend.plot(data, kind=kind, **kwargs)
  File "/tmp/1659035001029-0/lib/python3.7/site-packages/pandas/plotting/_matplotlib/__init__.py", line 62, in plot
    plot_obj.generate()
  File "/tmp/1659035001029-0/lib/python3.7/site-packages/pandas/plotting/_matplotlib/core.py", line 279, in generate
    self._compute_plot_data()
  File "/tmp/1659035001029-0/lib/python3.7/site-packages/pandas/plotting/_matplotlib/core.py", line 404, in _compute_plot_data
    include=[np.number, "datetime", "datetimetz", "timedelta"]
  File "/tmp/1659035001029-0/lib/python3.7/site-packages/pandas/core/frame.py", line 3427, in select_dtypes
    include_these = Series(not bool(include), index=self.columns)
  File "/tmp/1659035001029-0/lib/python3.7/si

### Year statistical analysis

In [39]:
df.groupBy("Year").count().show()


+----+------+
|Year| count|
+----+------+
|2014|274527|
|2015|262995|
|2016|265462|
|2012|335670|
|2017| 11357|
|2013|306703|
+----+------+

In [40]:
year_df = df.groupBy("year").count().orderBy("count", ascending = False).toPandas()

In [41]:
year_df.head(7)

   year   count
0  2012  335670
1  2013  306703
2  2014  274527
3  2016  265462
4  2015  262995
5  2017   11357

### How the number of crimes changes over the years

In [42]:

# Create the plot; figsize is passed to plot() because DataFrame.plot
# draws on its own new figure
year_df.plot(x = 'year', y = 'count', kind='bar', color = "red", figsize=(14, 10))


plt.title('Amount of Crimes by year')
plt.ylabel('Amount of Crimes')
plt.xlabel('year')


# display the plot
%matplot plt


An error was encountered:
Cannot interpret '<attribute 'dtype' of 'numpy.generic' objects>' as a data type
Traceback (most recent call last):
  File "/tmp/1659035001029-0/lib/python3.7/site-packages/pandas/plotting/_core.py", line 794, in __call__
    return plot_backend.plot(data, kind=kind, **kwargs)
  File "/tmp/1659035001029-0/lib/python3.7/site-packages/pandas/plotting/_matplotlib/__init__.py", line 62, in plot
    plot_obj.generate()
  File "/tmp/1659035001029-0/lib/python3.7/site-packages/pandas/plotting/_matplotlib/core.py", line 279, in generate
    self._compute_plot_data()
  File "/tmp/1659035001029-0/lib/python3.7/site-packages/pandas/plotting/_matplotlib/core.py", line 404, in _compute_plot_data
    include=[np.number, "datetime", "datetimetz", "timedelta"]
  File "/tmp/1659035001029-0/lib/python3.7/site-packages/pandas/core/frame.py", line 3427, in select_dtypes
    include_these = Series(not bool(include), index=self.columns)
  File "/tmp/1659035001029-0/lib/python3.7/si

### Primary Type statistical analysis

In [43]:
df.groupBy("Primary Type").count().show()

+--------------------+------+
|        Primary Type| count|
+--------------------+------+
|           OBSCENITY|   187|
|             BATTERY|263700|
|            HOMICIDE|  2649|
|         SEX OFFENSE|  4885|
|PUBLIC PEACE VIOL...| 13122|
|             ASSAULT| 91289|
| CRIM SEXUAL ASSAULT|  6823|
|  DECEPTIVE PRACTICE| 75495|
|            BURGLARY| 83397|
|INTERFERENCE WITH...|  6195|
|            STALKING|   828|
|NON-CRIMINAL (SUB...|     4|
|            GAMBLING|  2212|
|   CRIMINAL TRESPASS| 36912|
| MOTOR VEHICLE THEFT| 61138|
|CONCEALED CARRY L...|    90|
|        NON-CRIMINAL|    93|
|               THEFT|329460|
|             ROBBERY| 57313|
|     CRIMINAL DAMAGE|155455|
+--------------------+------+
only showing top 20 rows

In [44]:
primarytype_df = df.groupBy("Primary Type").count().orderBy("count", ascending = False).toPandas()

In [45]:
primarytype_df.head()

      Primary Type   count
0            THEFT  329460
1          BATTERY  263700
2  CRIMINAL DAMAGE  155455
3        NARCOTICS  135240
4          ASSAULT   91289

### Most frequently reported primary types of crime

In [46]:
# create the plot


primarytype_df.head(14).plot(x = 'Primary Type', y = 'count', kind='barh',figsize=(20,20), color = "#b35900")



plt.title('Amount of Crimes by Primary Type')
# In a horizontal bar chart the counts run along the x axis
plt.xlabel('Amount of Crimes')
plt.ylabel('Primary Type')

# display the plot
%matplot plt



An error was encountered:
Cannot interpret '<attribute 'dtype' of 'numpy.generic' objects>' as a data type
Traceback (most recent call last):
  File "/tmp/1659035001029-0/lib/python3.7/site-packages/pandas/plotting/_core.py", line 794, in __call__
    return plot_backend.plot(data, kind=kind, **kwargs)
  File "/tmp/1659035001029-0/lib/python3.7/site-packages/pandas/plotting/_matplotlib/__init__.py", line 62, in plot
    plot_obj.generate()
  File "/tmp/1659035001029-0/lib/python3.7/site-packages/pandas/plotting/_matplotlib/core.py", line 279, in generate
    self._compute_plot_data()
  File "/tmp/1659035001029-0/lib/python3.7/site-packages/pandas/plotting/_matplotlib/core.py", line 404, in _compute_plot_data
    include=[np.number, "datetime", "datetimetz", "timedelta"]
  File "/tmp/1659035001029-0/lib/python3.7/site-packages/pandas/core/frame.py", line 3427, in select_dtypes
    include_these = Series(not bool(include), index=self.columns)
  File "/tmp/1659035001029-0/lib/python3.7/si

### Location Description statistical analysis

In [None]:
df.groupBy("Location Description").count().show()


In [None]:
location_df = df.groupBy("Location Description").count().orderBy("count", ascending = False).toPandas()

In [None]:
location_df.head()

### Top locations by number of crimes

In [None]:
%matplotlib inline

In [None]:
# create the plot


location_df.head(20).plot(x = 'Location Description', y = 'count', kind='barh',figsize=(20,20), color = "green")


plt.title('Amount of Crimes by Location Description')
# In a horizontal bar chart the counts run along the x axis
plt.xlabel('Amount of Crimes')
plt.ylabel('Location Description')


# display the plot
%matplot plt


### How many arrests happened

In [None]:
df.groupBy('Arrest').count().show()

### In what percentage of crimes was an arrest made?

In [None]:
# Arrest was loaded as a string column, so compare against the string "True"
df.filter(df["Arrest"] == "True").count() / df.count() * 100
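
The percentage is the count of rows flagged `"True"` divided by the total row count, times 100. The grouped counts from the previous cell already contain both numbers, so the share can be derived from them in one pass. The plain-Python sketch below illustrates the arithmetic on a made-up five-row sample; `percentage_true` is a hypothetical helper, and the flags are strings because the CSV was read with `inferSchema = False`.

```python
from collections import Counter


def percentage_true(flags):
    """Return the share of "True" flags as a percentage of all flags."""
    counts = Counter(flags)
    total = sum(counts.values())
    return 100.0 * counts["True"] / total if total else 0.0


sample = ["True", "False", "False", "True", "False"]  # made-up sample
print(percentage_true(sample))  # 40.0
```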

### How many crimes are domestic

In [None]:
df.groupBy("Domestic").count().show()

### Calculating percentage of domestic crime

In [None]:
# Domestic was loaded as a string column, so compare against the string "True"
df.filter(df["Domestic"] == "True").count() / df.count() * 100

### How many narcotics cases are there in the dataset?

In [None]:
df.where(df["Primary Type"]=="NARCOTICS").count()

### Calculating percentage of narcotics cases in the dataset

In [None]:
df.where(df["Primary Type"] == "NARCOTICS").count()/df.count() * 100

### How many domestic assaults are there?

In [None]:
df.filter((df["Primary Type"] == "ASSAULT") & (df["Domestic"] == "True")).count()


### Calculating percentage of domestic assault cases in the dataset

In [None]:
df.filter((df["Primary Type"] == "ASSAULT") & (df["Domestic"] == "True")).count()/df.count() * 100

## Drop columns which are not required for model building

In [None]:
# show 5 rows
df.show(5)

In [None]:
# get columns
df.columns

**Dropping identifier and free-text columns that will not help the model learn:**


'_c0', 'ID', 'Case Number': record identifiers

'Block', 'Description': free text, such as addresses

'Updated On': when the record was last updated, not a property of the incident

'Location': a combination of Latitude and Longitude, so redundant

In [None]:
df = df.drop("_c0", "ID", "Case Number",'Block', 'Description', "Updated On", 'Location')

In [None]:
df.show(5)

In [None]:
df.columns

**We are now left with many categorical columns. We need to check how many distinct labels each column has: if a column has a large number of distinct labels, one-hot encoding it would create a correspondingly large number of new columns.**

### Unique Values

In [None]:
for c in df.columns:
    print (c)

In [None]:
# Checking the number of distinct values in each attribute
from pyspark.sql.functions import col, countDistinct


df.agg(*(countDistinct(col(c)).alias(c) for c in df.columns)).show()

In [None]:
# get columns
df.columns

**Based on the distinct-count analysis, we can drop a few more columns whose distinct counts are very large, since encoding them would require creating that many new columns.**

***'IUCR', 'Beat', 'Ward', 'Community Area': these columns can be dropped. 'Beat', 'Ward' and 'Community Area' are geographic groupings that the more granular coordinate and latitude/longitude columns already capture, and 'IUCR' is directly linked to Primary Type.***
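
The decision rule above can be sketched in plain Python: count the distinct labels per column, then flag every column whose count exceeds a chosen limit. This is an illustration on three made-up rows, not the notebook's Spark code; `distinct_counts`, `high_cardinality` and the `limit` value are hypothetical.

```python
def distinct_counts(rows):
    """Return the number of distinct values seen in each column."""
    seen = {}
    for row in rows:
        for column, value in row.items():
            seen.setdefault(column, set()).add(value)
    return {column: len(values) for column, values in seen.items()}


def high_cardinality(counts, limit):
    """List the columns whose distinct count exceeds `limit`."""
    return sorted(column for column, n in counts.items() if n > limit)


# Made-up sample rows for illustration
rows = [
    {"Arrest": "True", "Beat": "1022", "Primary Type": "BATTERY"},
    {"Arrest": "False", "Beat": "313", "Primary Type": "BATTERY"},
    {"Arrest": "False", "Beat": "1524", "Primary Type": "THEFT"},
]
counts = distinct_counts(rows)
print(counts)                             # {'Arrest': 2, 'Beat': 3, 'Primary Type': 2}
print(high_cardinality(counts, limit=2))  # ['Beat']
```

Each distinct label becomes one column under one-hot encoding, so the distinct count is a direct estimate of how much wider the feature vector would get.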

In [None]:
df = df.drop('IUCR', 'Beat','Ward','Community Area')

In [None]:
df.show(5)

In [None]:
df.columns

#### Handling null values

In [None]:
# Counting the number of null values in each column
from pyspark.sql.functions import when, count, col, isnull


df.select([count(when(isnull(c), c)).alias(c) for c in df.columns]).show()



**As we can see, many rows are missing the X/Y coordinates and the latitude/longitude. Without this information it will be difficult to predict the FBI Code, so we'll drop these rows.**

In [None]:
# Dropping the rows with null values (a row is dropped if any of its columns is null)
df = df.na.drop()
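
Called without arguments, `na.drop()` removes a row when any column is null. PySpark also accepts a `subset` argument that restricts the check to the listed columns. The plain-Python sketch below mimics both behaviours on made-up rows; `drop_null_rows` is a hypothetical helper used only for illustration.

```python
def drop_null_rows(rows, subset=None):
    """Keep only rows where none of the checked columns is None."""
    kept = []
    for row in rows:
        cols = subset if subset is not None else row.keys()
        if all(row[c] is not None for c in cols):
            kept.append(row)
    return kept


# Made-up rows: the first lacks a location description, the second lacks coordinates
toy_records = [
    {"Latitude": 41.88, "Longitude": -87.63, "Location Description": None},
    {"Latitude": None, "Longitude": None, "Location Description": "STREET"},
    {"Latitude": 41.90, "Longitude": -87.65, "Location Description": "APARTMENT"},
]

all_columns_checked = drop_null_rows(toy_records)
coordinates_checked = drop_null_rows(toy_records, subset=["Latitude", "Longitude"])
print(len(all_columns_checked), len(coordinates_checked))
```

Restricting the check to the coordinate columns keeps more rows, at the cost of leaving nulls in the other columns to be handled separately.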

In [None]:
# Check if the null values are dropped

df.select([count(when(isnull(c), c)).alias(c) for c in df.columns]).show()

In [None]:

print((df.count(), len(df.columns)))

#### Correction in column type

In [None]:
# Column type
df.dtypes

In [None]:
df.printSchema()

In [None]:
df.show(3)

**We need to change the data type of the latitude, longitude, coordinate, district and year columns from string to float/integer.**

In [None]:
# Changing the required columns from string type to numerical 
from pyspark.sql.types import FloatType, IntegerType


df = df.withColumn('District', df['District'].cast(IntegerType()))



df = df.withColumn('X Coordinate', df['X Coordinate'].cast(FloatType()))
df = df.withColumn('Y Coordinate', df['Y Coordinate'].cast(FloatType()))
df = df.withColumn('Longitude', df['Longitude'].cast(FloatType()))
df = df.withColumn('Latitude', df['Latitude'].cast(FloatType()))
df = df.withColumn('Year', df['Year'].cast(IntegerType()))


In [None]:
df.dtypes

In [None]:
df.show(3)

## Exploring the target variable: FBI Code

In [None]:
df.groupBy("FBI Code").count().show()

In [None]:
# Storing in a pandas dataframe for visualisation
fbi_df = df.groupBy("FBI Code").count().orderBy("count", ascending = False).toPandas()

In [None]:
fbi_df.head()

In [None]:
# create the plot (pandas opens its own figure, so figsize is passed to plot() directly)
fbi_df.head(15).plot(x = 'FBI Code', y = 'count', kind='bar', color = "violet", figsize=(14,10))

plt.title('Number of Crimes by FBI Code')
plt.ylabel('Number of Crimes')
plt.xlabel('FBI Code')


# display the plot
%matplot plt

## Feature Generation & Vector Creation

In [None]:
# Identifying the categorical columns for indexing
df.columns

In [None]:
len(df.columns)

In [None]:
df.show(3)

In [None]:
# Storing the categorical and continuous columns in different lists


categorical_features = ['Primary Type', 'Location Description', 'Arrest', 'Domestic', 'District','Year','hour','day_of_week' ]


continuous_features = ['X Coordinate', 'Y Coordinate', 'Latitude', 'Longitude']



### Spark Pipeline concept will be used here

In [None]:
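# Importing the libraries for data transformation
# (OneHotEncoderEstimator is the Spark 2.3/2.4 name; in Spark 3.0+ it was renamed to OneHotEncoder)
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler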

In [None]:
# Initialising the variable 'stages' to store every step for building a pipeline
stages = []

### StringIndexer: Converts string-valued features into numerical indices

### OneHotEncoderEstimator: Converts each categorical index into a binary (one-hot) vector column

In [None]:
# Building an indexer stage and an encoder stage for each categorical variable


for categoricalCol in categorical_features:
    print(categoricalCol)
    stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + '_Index')
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "_encoded"])    
    stages += [stringIndexer, encoder]
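
The two stages built in the loop can be illustrated without Spark. The sketch below assumes the default behaviours: StringIndexer assigns index 0 to the most frequent label, and the one-hot encoder drops the last category (`dropLast=True`), so it is represented by an all-zero vector. The helper names and sample values are hypothetical, and the alphabetical tie-break is an assumption made only to keep the example deterministic.

```python
from collections import Counter


def fit_string_index(values):
    """Map each label to an index, most frequent label first (0, 1, 2, ...)."""
    freq = Counter(values)
    # Sort by descending frequency, then alphabetically so ties are deterministic
    ordered = sorted(freq, key=lambda label: (-freq[label], label))
    return {label: i for i, label in enumerate(ordered)}


def one_hot(index, num_categories, drop_last=True):
    """Encode an index as a 0/1 list; with drop_last the final category is all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vec = [0] * size
    if index < size:
        vec[index] = 1
    return vec


sample = ["THEFT", "BATTERY", "THEFT", "ASSAULT", "THEFT", "BATTERY"]
label_to_index = fit_string_index(sample)
encoded = [one_hot(label_to_index[v], len(label_to_index)) for v in sample]
print(label_to_index)
print(encoded[0], encoded[3])
```

Dropping the last category loses no information, because a row with all zeros can only belong to that category.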

In [None]:
# Encoding the target variable as label

label_stringIdx = StringIndexer(inputCol = 'FBI Code', outputCol = 'label')

stages += [label_stringIdx]

### VectorAssembler: Generates a single vector from all the features

In [None]:
# Combining all the encoded and continuous features into a single vector column

assemblerInputs = [c + "_encoded" for c in categorical_features] + continuous_features


assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")


stages += [assembler]
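
Conceptually, the assembler concatenates its input columns, in the order given, into one flat feature vector per row. A minimal plain-Python sketch of that idea, with a hypothetical `assemble` helper and made-up values:

```python
def assemble(row, input_cols):
    """Concatenate scalar and list-valued columns into one flat feature list."""
    features = []
    for name in input_cols:
        value = row[name]
        if isinstance(value, (list, tuple)):
            # Vector-valued column (e.g. a one-hot encoding): append every element
            features.extend(float(v) for v in value)
        else:
            # Scalar column: append the single value
            features.append(float(value))
    return features


toy_row = {
    "Primary Type_encoded": [1, 0],
    "Arrest_encoded": [0],
    "Latitude": 41.88,
    "Longitude": -87.63,
}
toy_inputs = ["Primary Type_encoded", "Arrest_encoded", "Latitude", "Longitude"]
toy_features = assemble(toy_row, toy_inputs)
print(toy_features)
```

The length of the resulting vector is the sum of the encoded widths plus the number of continuous columns, which is the dimensionality the model is trained on.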

### A Pipeline is used to run all the steps/stages in order

**stages is a list of transformers and estimators which is used as the input to the pipeline**

In [None]:
# Loading all the steps in a pipeline
from pyspark.ml import Pipeline


In [None]:
pipeline = Pipeline(stages = stages)
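
The idea behind a pipeline is that each stage is fitted on the output of the previous stage, and the fitted stages are then replayed in the same order when transforming. The toy classes below are a simplified plain-Python sketch of that pattern, not the Spark implementation; all class names are hypothetical.

```python
class Shift:
    """Toy estimator: learns the minimum during fit, subtracts it in transform."""

    def fit(self, data):
        self.offset = min(data)
        return self

    def transform(self, data):
        return [x - self.offset for x in data]


class Scale:
    """Toy estimator: learns the maximum during fit, divides by it in transform."""

    def fit(self, data):
        self.factor = max(data) or 1
        return self

    def transform(self, data):
        return [x / self.factor for x in data]


class ToyPipeline:
    """Runs a list of stages in order."""

    def __init__(self, stages):
        self.stages = stages

    def fit(self, data):
        # Fit each stage on the output of the previous one
        for stage in self.stages:
            data = stage.fit(data).transform(data)
        return self

    def transform(self, data):
        # Replay the fitted stages in the same order
        for stage in self.stages:
            data = stage.transform(data)
        return data


toy_pipeline = ToyPipeline([Shift(), Scale()])
toy_result = toy_pipeline.fit([2, 4, 6]).transform([2, 4, 6])
print(toy_result)
```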

In [None]:
pipeline

### Fit & Transform DF

In [None]:
# Fitting the steps on the dataFrame
pipelineModel = pipeline.fit(df)

In [None]:
# Transforming the dataframe
df = pipelineModel.transform(df)

In [None]:
# show rows
df.show(5)

In [None]:
# Checking the schema of transformed dataFrame
df.printSchema()

In [None]:
df.groupBy("label").count().orderBy("count", ascending = False).show()

### Split data into train & test

In [None]:
# Splitting the dataFrame into training and testing set

train, test = df.randomSplit([0.7, 0.3], seed = 100)
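
A random split assigns each row to a partition by a random draw, so the resulting sizes are only approximately 70% and 30%. The sketch below illustrates this in plain Python; `random_split` is a hypothetical helper and does not reproduce Spark's exact sampling, only the idea.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row to one split by a random draw against normalised weights."""
    total = sum(weights)
    # Cumulative boundaries in [0, 1], e.g. roughly [0.7, 1.0] for weights [0.7, 0.3]
    bounds, running = [], 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, bound in enumerate(bounds):
            # The last split also catches any floating-point remainder
            if draw < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


toy_train, toy_test = random_split(list(range(1000)), [0.7, 0.3], seed=100)
print(len(toy_train), len(toy_test))
```

Fixing the seed makes the split reproducible, which is why the notebook passes `seed = 100`.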

In [None]:
print("Training Dataset Count: " + str(train.count()))

In [None]:
print("Test Dataset Count: " + str(test.count()))

## Spark Random Forest

In [None]:
from pyspark.ml.classification import RandomForestClassifier

In [None]:
# Building the RF model

rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'label', \
                            maxDepth=5, impurity='gini', numTrees=25, seed=100)
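
The `impurity='gini'` setting tells each tree to choose splits that reduce Gini impurity, which for class proportions p_k is 1 minus the sum of p_k squared. A small plain-Python sketch of the measure, using a hypothetical `gini_impurity` helper and made-up labels:

```python
from collections import Counter


def gini_impurity(labels):
    """Gini impurity of a list of class labels: 1 - sum(p_k ** 2)."""
    n = len(labels)
    if n == 0:
        return 0.0
    return 1.0 - sum((count / n) ** 2 for count in Counter(labels).values())


mixed_node = ["06", "06", "08B", "08B"]  # two classes, evenly mixed
pure_node = ["06", "06", "06", "06"]     # a single class

print(gini_impurity(mixed_node))
print(gini_impurity(pure_node))
```

A pure node has impurity 0, and an evenly mixed two-class node has impurity 0.5, so a good split moves child nodes towards 0.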

In [None]:
# Fitting the model over the training set
rfmodel = rf.fit(train)

In [None]:
# Printing the forest obtained from the model
print(rfmodel.toDebugString)

### Model Prediction

In [None]:
# Applying the model on test set
predictions = rfmodel.transform(test)

In [None]:
predictions

In [None]:
predictions.show()

In [None]:
# Printing the required columns
predictions.select('label', 'rawPrediction', 'prediction', 'probability').show(10)
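
The three output columns are related. As a sketch, assuming the raw prediction holds one accumulated score per class across the trees, the probability is the raw vector normalised to sum to 1 and the prediction is the index of the largest entry. The helper names and the raw values below are made up for illustration.

```python
def to_probability(raw):
    """Normalise per-class scores so that they sum to 1."""
    total = sum(raw)
    return [r / total for r in raw] if total else list(raw)


def to_prediction(probability):
    """Return the index of the most probable class as a float label."""
    return float(max(range(len(probability)), key=lambda i: probability[i]))


# Hypothetical per-class scores from a 25-tree forest over 3 classes
toy_raw = [5.0, 15.0, 5.0]
toy_probability = to_probability(toy_raw)
toy_prediction = to_prediction(toy_probability)
print(toy_probability, toy_prediction)
```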

### Model Evaluation

In [None]:
# Model evaluation
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")



In [None]:

accuracy = evaluator.evaluate(predictions)

In [None]:
# Model Accuracy
print(accuracy)

In [None]:
# Test Error
print("Test Error = %g" % (1.0 - accuracy))
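
Accuracy is the fraction of rows whose predicted label equals the true label, and the test error is its complement. A minimal plain-Python sketch with made-up labels (`accuracy_score` here is a local helper, not a library call):

```python
def accuracy_score(labels, predicted):
    """Fraction of positions where the predicted label matches the true label."""
    correct = sum(1 for y, p in zip(labels, predicted) if y == p)
    return correct / len(labels)


toy_labels = [0.0, 1.0, 2.0, 1.0]
toy_predicted = [0.0, 1.0, 1.0, 1.0]

toy_accuracy = accuracy_score(toy_labels, toy_predicted)
toy_error = 1.0 - toy_accuracy
print(toy_accuracy, toy_error)
```

With many imbalanced classes, as in the FBI Code distribution, accuracy can be dominated by the most frequent classes, so it is worth reading alongside the per-class counts shown earlier.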

### Feature Importance

In [None]:
# Feature Importance
rfmodel.featureImportances

In [None]:
# Defining a function to extract features along with the feature importance score
import pandas as pd
def ExtractFeatureImp(featureImp, dataset, featuresCol):
    # The metadata of the features column groups the attributes by type
    attrs = dataset.schema[featuresCol].metadata["ml_attr"]["attrs"]
    list_extract = []
    for i in attrs:
        list_extract = list_extract + attrs[i]
    varlist = pd.DataFrame(list_extract)
    # Look up each feature's importance by its index in the feature vector
    varlist['score'] = varlist['idx'].apply(lambda x: featureImp[x])
    return(varlist.sort_values('score', ascending = False))

In [None]:
# Printing the feature importance scores
ExtractFeatureImp(rfmodel.featureImportances, predictions, "features").head(10)