In [None]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


Mount Google Drive

In [None]:
from google.colab import drive
# Mount Google Drive under /content/drive; the CSV read later in the notebook lives there.
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


Importing the pyspark sql functions

In [None]:
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import *
import pyspark

Importing Numpy, pandas, seaborn and matplotlib for exploratory analysis

In [None]:
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

Creating a spark session

In [None]:
# Build (or reuse) the SparkSession for this notebook and keep a handle on its
# SparkContext.
spark = (
    SparkSession.builder
    .appName("project")
    .getOrCreate()
)
sc = spark.sparkContext

In [None]:
from pyspark.sql.functions import col, lower

Read the data file. The dataset is about 1.7 GB and covers crimes reported from 2001 to the present.

In [None]:
# Load the crimes CSV from the mounted Drive. The first row is treated as the header
# and Spark infers each column's type from the data.
crime_data = spark.read.csv(
    '/content/drive/MyDrive/Crimes_-_2001_to_Present (1).csv',
    header=True,
    inferSchema=True,
)

In [None]:
# Bring the first 10 rows to the driver as Row objects for a quick look at the raw data.
crime_data.take(10)

[Row(ID=10224738, Case Number='HY411648', Date='09/05/2015 01:30:00 PM', Block='043XX S WOOD ST', IUCR='0486', Primary Type='BATTERY', Description='DOMESTIC BATTERY SIMPLE', Location Description='RESIDENCE', Arrest=False, Domestic=True, Beat=924, District=9, Ward=12, Community Area=61, FBI Code='08B', X Coordinate=1165074, Y Coordinate=1875917, Year=2015, Updated On='02/10/2018 03:50:01 PM', Latitude=41.815117282, Longitude=-87.669999562, Location='(41.815117282, -87.669999562)'),
 Row(ID=10224739, Case Number='HY411615', Date='09/04/2015 11:30:00 AM', Block='008XX N CENTRAL AVE', IUCR='0870', Primary Type='THEFT', Description='POCKET-PICKING', Location Description='CTA BUS', Arrest=False, Domestic=False, Beat=1511, District=15, Ward=29, Community Area=25, FBI Code='06', X Coordinate=1138875, Y Coordinate=1904869, Year=2015, Updated On='02/10/2018 03:50:01 PM', Latitude=41.895080471, Longitude=-87.765400451, Location='(41.895080471, -87.765400451)'),
 Row(ID=11646166, Case Number='JC21

To print the schema in the tree format use printSchema() on the crime dataset

In [None]:
# Print the column names together with the types Spark inferred from the CSV.
crime_data.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Case Number: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Block: string (nullable = true)
 |-- IUCR: string (nullable = true)
 |-- Primary Type: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Location Description: string (nullable = true)
 |-- Arrest: boolean (nullable = true)
 |-- Domestic: boolean (nullable = true)
 |-- Beat: integer (nullable = true)
 |-- District: integer (nullable = true)
 |-- Ward: integer (nullable = true)
 |-- Community Area: integer (nullable = true)
 |-- FBI Code: string (nullable = true)
 |-- X Coordinate: integer (nullable = true)
 |-- Y Coordinate: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Updated On: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Location: string (nullable = true)



Convert the `Date` column from a string to a timestamp using the pattern 'MM/dd/yyyy hh:mm:ss a', so that we can extract the year, the month of the year, the day of the month, and the day of the week for further calculations.

In [None]:
# Parse the string 'Date' column into a real timestamp ('DateTimeStamp'), then
# truncate that timestamp to the first day of its year ('DateFormatted').
crime_data = (
    crime_data
    .withColumn('DateTimeStamp', to_timestamp('Date', 'MM/dd/yyyy hh:mm:ss a'))
    .withColumn('DateFormatted', trunc('DateTimeStamp', 'YYYY'))
)

In [None]:
# Derive calendar features from the parsed timestamp and cache the result, since
# the cells that follow reuse this DataFrame.
crime_data = (
    crime_data
    .withColumn('dayofweek', dayofweek(col('DateTimeStamp')))
    .withColumn('monthOfYear', month(col('DateTimeStamp')))
    .withColumn('dayofmonth', dayofmonth(col('DateTimeStamp')))
    # Number of days between the crime date and 2001-01-01.
    .withColumn('datediff', datediff(col('DateTimeStamp'), to_date(lit('2001-01-01'), format='yyyy-MM-dd')))
    .cache()
)

In [None]:
# Spot-check the derived date columns against the original 'Date' string.
crime_data.select(['Date','DateFormatted', 'dayofweek', 'Year', 'monthOfYear', 'dayofmonth', 'datediff']).show(5)

+--------------------+-------------+---------+----+-----------+----------+--------+
|                Date|DateFormatted|dayofweek|Year|monthOfYear|dayofmonth|datediff|
+--------------------+-------------+---------+----+-----------+----------+--------+
|09/05/2015 01:30:...|   2015-01-01|        7|2015|          9|         5|    5360|
|09/04/2015 11:30:...|   2015-01-01|        6|2015|          9|         4|    5359|
|09/05/2015 12:45:...|   2015-01-01|        7|2015|          9|         5|    5360|
|09/05/2015 01:00:...|   2015-01-01|        7|2015|          9|         5|    5360|
|09/05/2015 10:55:...|   2015-01-01|        7|2015|          9|         5|    5360|
+--------------------+-------------+---------+----+-----------+----------+--------+
only showing top 5 rows



In [None]:
# Restrict the analysis to rows whose 'Year' is 2015.
crime_data = crime_data.filter(col('Year') == 2015)

In [None]:
# List all columns, including the date-derived ones added above.
crime_data.columns

['ID',
 'Case Number',
 'Date',
 'Block',
 'IUCR',
 'Primary Type',
 'Description',
 'Location Description',
 'Arrest',
 'Domestic',
 'Beat',
 'District',
 'Ward',
 'Community Area',
 'FBI Code',
 'X Coordinate',
 'Y Coordinate',
 'Year',
 'Updated On',
 'Latitude',
 'Longitude',
 'Location',
 'DateTimeStamp',
 'DateFormatted',
 'dayofweek',
 'monthOfYear',
 'dayofmonth',
 'datediff']

In [None]:
# Cast the boolean 'Arrest' and 'Domestic' flags to strings.
# The type is given by name ("string") instead of StringType(): StringType is never
# imported explicitly in this notebook and would only be in scope if the wildcard
# import from pyspark.sql.functions happened to leak it, which is not guaranteed
# across PySpark versions.
crime_data = (
    crime_data
    .withColumn("Arrest", col("Arrest").cast("string"))
    .withColumn("Domestic", col("Domestic").cast("string"))
)

In [None]:
# Confirm that 'Arrest' and 'Domestic' are now string columns.
crime_data.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Case Number: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Block: string (nullable = true)
 |-- IUCR: string (nullable = true)
 |-- Primary Type: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Location Description: string (nullable = true)
 |-- Arrest: string (nullable = true)
 |-- Domestic: string (nullable = true)
 |-- Beat: integer (nullable = true)
 |-- District: integer (nullable = true)
 |-- Ward: integer (nullable = true)
 |-- Community Area: integer (nullable = true)
 |-- FBI Code: string (nullable = true)
 |-- X Coordinate: integer (nullable = true)
 |-- Y Coordinate: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Updated On: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Location: string (nullable = true)
 |-- DateTimeStamp: timestamp (nullable = true)
 |-- DateFormatted: date (nullable = true)
 |-- day

In [None]:
# List the columns again; the cast above changes types only, not the set of columns.
crime_data.columns

['ID',
 'Case Number',
 'Date',
 'Block',
 'IUCR',
 'Primary Type',
 'Description',
 'Location Description',
 'Arrest',
 'Domestic',
 'Beat',
 'District',
 'Ward',
 'Community Area',
 'FBI Code',
 'X Coordinate',
 'Y Coordinate',
 'Year',
 'Updated On',
 'Latitude',
 'Longitude',
 'Location',
 'DateTimeStamp',
 'DateFormatted',
 'dayofweek',
 'monthOfYear',
 'dayofmonth',
 'datediff']

This section covers algorithms for working with features, roughly divided into these groups:

Extraction: Extracting features from “raw” data

Selection: Selecting a subset from a larger set of features

Transformation: Scaling, converting, or modifying features

In [None]:
# Predictor columns used for modelling. 'Primary Type' is the response variable and
# is not part of this list.
# NOTE(review): 'Year' is constant after the Year == 2015 filter, and 'IUCR',
# 'Description' and 'FBI Code' look like encodings of the crime category itself
# (possible label leakage) -- confirm these belong in the feature set.
selected_features = [
    'Block',
    'IUCR',
    'Description',
    'Location Description',
    'Arrest',
    'Domestic',
    'Beat',
    'District',
    'Ward',
    'Community Area',
    'FBI Code',
    'Year',
    'Latitude',
    'Longitude',
    'Location',
    'dayofweek',
    'monthOfYear',
    'dayofmonth',
    'datediff',
]

In [None]:
# Keep only the predictor columns and confirm their types.
features_df = crime_data.select(*selected_features)
features_df.printSchema()

root
 |-- Block: string (nullable = true)
 |-- IUCR: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Location Description: string (nullable = true)
 |-- Arrest: string (nullable = true)
 |-- Domestic: string (nullable = true)
 |-- Beat: integer (nullable = true)
 |-- District: integer (nullable = true)
 |-- Ward: integer (nullable = true)
 |-- Community Area: integer (nullable = true)
 |-- FBI Code: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Location: string (nullable = true)
 |-- dayofweek: integer (nullable = true)
 |-- monthOfYear: integer (nullable = true)
 |-- dayofmonth: integer (nullable = true)
 |-- datediff: integer (nullable = true)



Loop over the selected features and record, for each one, its name and its number of distinct values (levels) in a list.

In [None]:
# For every selected feature, record its name and how many distinct values it takes.
# Each distinct().count() triggers its own Spark job.
feature_list_count = []
for feature in selected_features:
    feature_name = features_df.select(feature).distinct()
    feature_list_count.append(
        dict(feature=feature, level_count=feature_name.count())
    )

Importing String Indexer and Vector Assembler

In [None]:
from pyspark.ml.feature import StringIndexer, VectorAssembler

Drop every row of the crime data that has a missing (NA) value in any of the selected features.

In [None]:
# Discard rows that have a null in any of the selected predictor columns.
crime_data_features = crime_data.dropna(subset=selected_features)

In [None]:
# Look at the first 5 rows that remain after dropping nulls.
crime_data_features.head(5)

[Row(ID=10224738, Case Number='HY411648', Date='09/05/2015 01:30:00 PM', Block='043XX S WOOD ST', IUCR='0486', Primary Type='BATTERY', Description='DOMESTIC BATTERY SIMPLE', Location Description='RESIDENCE', Arrest='false', Domestic='true', Beat=924, District=9, Ward=12, Community Area=61, FBI Code='08B', X Coordinate=1165074, Y Coordinate=1875917, Year=2015, Updated On='02/10/2018 03:50:01 PM', Latitude=41.815117282, Longitude=-87.669999562, Location='(41.815117282, -87.669999562)', DateTimeStamp=datetime.datetime(2015, 9, 5, 13, 30), DateFormatted=datetime.date(2015, 1, 1), dayofweek=7, monthOfYear=9, dayofmonth=5, datediff=5360),
 Row(ID=10224739, Case Number='HY411615', Date='09/04/2015 11:30:00 AM', Block='008XX N CENTRAL AVE', IUCR='0870', Primary Type='THEFT', Description='POCKET-PICKING', Location Description='CTA BUS', Arrest='false', Domestic='false', Beat=1511, District=15, Ward=29, Community Area=25, FBI Code='06', X Coordinate=1138875, Y Coordinate=1904869, Year=2015, Upda

String indexer is used for converting string columns to numeric columns because the machine learning model is a mathematical equation and accepts numbers.


In [None]:
# Index every predictor with a single multi-column StringIndexer (Spark 3.0+) instead
# of fitting and applying one indexer per feature, which re-scanned the data once per
# column. Each column <name> gets a numeric companion column '<name>_indexed', in the
# same order as before.
# NOTE(review): numeric columns such as Latitude, Longitude and datediff are indexed
# too, so their values become per-label indices rather than magnitudes -- confirm this
# is intended.
predictor_variables_indexer = StringIndexer(
    inputCols=[feature['feature'] for feature in feature_list_count],
    outputCols=['%s_indexed' % feature['feature'] for feature in feature_list_count],
)
model = predictor_variables_indexer.fit(crime_data_features)
crime_data_features = model.transform(crime_data_features)

The same process is applied to the response variable: its string values are converted to numeric indices, because the machine learning model is a mathematical equation and accepts only numbers.

In [None]:
# Index the response column 'Primary Type' into the numeric label column
# 'Primary_Type_Of_Crime_indexed'.
response_variable_indexer = StringIndexer(
    inputCol='Primary Type',
    outputCol='Primary_Type_Of_Crime_indexed',
)
response_model = response_variable_indexer.fit(crime_data_features)
crime_data_features = response_model.transform(crime_data_features)

In [None]:
# Show the first 5 rows, now including the '*_indexed' columns.
crime_data_features.show(5)

+--------+-----------+--------------------+-------------------+----+------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+------------+-------------+--------------------+-------------------+-------------+---------+-----------+----------+--------+-------------+------------+-------------------+----------------------------+--------------+----------------+------------+----------------+------------+----------------------+----------------+------------+----------------+-----------------+----------------+-----------------+-------------------+------------------+----------------+-----------------------------+
|      ID|Case Number|                Date|              Block|IUCR|Primary Type|         Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|          Updated On|    Latitude|    Longitude|            Location

Adding all the indexed columns to a list.

In [None]:
# Names of the indexed predictor columns, in the same order as feature_list_count.
crime_data_indexed_features = [
    f"{fc['feature']}_indexed"
    for fc in feature_list_count
]
crime_data_indexed_features

['Block_indexed',
 'IUCR_indexed',
 'Description_indexed',
 'Location Description_indexed',
 'Arrest_indexed',
 'Domestic_indexed',
 'Beat_indexed',
 'District_indexed',
 'Ward_indexed',
 'Community Area_indexed',
 'FBI Code_indexed',
 'Year_indexed',
 'Latitude_indexed',
 'Longitude_indexed',
 'Location_indexed',
 'dayofweek_indexed',
 'monthOfYear_indexed',
 'dayofmonth_indexed',
 'datediff_indexed']

Merge the indexed columns into a single vector column using VectorAssembler, so that they can be used to train a machine learning model.

In [None]:
# Pack all indexed predictor columns into one vector column named 'features'.
assembler = VectorAssembler(
    inputCols=crime_data_indexed_features,
    outputCol='features',
)
vectorized_crime_data = assembler.transform(crime_data_features)

In [None]:
# Inspect the assembled feature vector of the first row.
vectorized_crime_data.select('features').take(1)

[Row(features=DenseVector([2420.0, 0.0, 1.0, 1.0, 0.0, 1.0, 113.0, 7.0, 34.0, 19.0, 1.0, 0.0, 16217.0, 16733.0, 16215.0, 1.0, 4.0, 21.0, 108.0]))]

**Developing Logistic Regression Model**

The data was split into training and test in the ratio 80:20 where 80% of the data was used for training the model and 20% was used for testing purposes.

In [None]:
# Split 80% / 20% into training and test sets. A fixed seed makes the split, and
# therefore every metric computed downstream, reproducible across runs.
train, test = vectorized_crime_data.randomSplit([0.8, 0.2], seed=42)

Import Logistic Regression for fitting the Machine Learning model

In [None]:
from pyspark.ml.classification import LogisticRegression

Build the logistic regression model by specifying the below parameters :

response variable= Primary type of crime

predictor variables= the selected features of the crime data (indexed and assembled into the `features` vector)

family= multinomial.

In [None]:
# Multinomial logistic regression on the assembled 'features' vector, predicting the
# indexed primary crime type; optimisation is capped at 10 iterations.
logisticRegression_model = LogisticRegression(
    featuresCol='features',
    labelCol='Primary_Type_Of_Crime_indexed',
    family='multinomial',
    maxIter=10,
)

Fit the Logistic regression model on a training dataset

In [None]:
# Train the logistic regression model on the training split only.
fittedModel = logisticRegression_model.fit(train)

Check the accuracy of the model on the training data (the model summary reports training-set metrics, not test-set performance)

In [None]:
# Accuracy from the model's training summary, i.e. measured on the data the model was
# fitted on -- this is not a held-out (test) accuracy.
fittedModel.summary.accuracy

0.6159731999805796

Make the overall predictions on the test dataset.


In [None]:
# Score the held-out test split and compare predicted label indices with the true
# indexed label and the original crime type for the first 10 rows.
predictions = fittedModel.transform(test)
(
    predictions
    .select("prediction", "Primary_Type_Of_Crime_indexed", "Primary Type")
    .show(10)
)

+----------+-----------------------------+-------------------+
|prediction|Primary_Type_Of_Crime_indexed|       Primary Type|
+----------+-----------------------------+-------------------+
|      11.0|                         18.0|           HOMICIDE|
|       3.0|                          3.0|          NARCOTICS|
|       3.0|                          3.0|          NARCOTICS|
|      12.0|                         11.0|  WEAPONS VIOLATION|
|       3.0|                          3.0|          NARCOTICS|
|      16.0|                         19.0|              ARSON|
|       9.0|                         16.0|CRIM SEXUAL ASSAULT|
|       5.0|                          5.0|            ASSAULT|
|       1.0|                          1.0|            BATTERY|
|       1.0|                          1.0|            BATTERY|
+----------+-----------------------------+-------------------+
only showing top 10 rows

