
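# Data Preprocessing

Load the raw Chicago crime records from MongoDB and encode the `description`, `primary_type` and `location_description` strings as integer codes, saving each lookup table back to Mongo. Then drop rows without a latitude, parse the `date` strings, and write the result to the `chicago_crime_cleaned` collection.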


In [1]:
from pyspark.sql import SparkSession
import pandas as pd
import matplotlib.pyplot as plt
from pyspark.sql.functions import isnan, when, count, col, sequence, date_format, UserDefinedFunction
from datetime import datetime
import pandas as pd
from pyspark.sql.types import  (StructType, 
                                StructField, 
                                DateType, 
                                BooleanType,
                                DoubleType,
                                IntegerType,
                                StringType,
                               TimestampType)
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation
from pyspark.sql.types import IntegerType
import numpy as np
import seaborn as sns
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pymongo import MongoClient
from fbprophet import Prophet
from pyspark.sql.functions import current_date
import re

Importing plotly failed. Interactive plots will not work.


In [2]:
# Create a client for the `mongo` service and take handles to the raw,
# cleaned and lookup-table collections used further down.
conn = MongoClient("mongo", 27017)
db = conn["mydb"]
collection_crime = db["chicago_crime"]
collection_crime_clean = db["chicago_crime_cleaned"]
collection_primary_type = db["primary_type"]
collection_location_description = db["location_description"]
collection_description = db["description"]

### Initialize Spark Session

In [3]:
# Connect to the Spark master with the MongoDB Spark connector on the classpath.
spark = (
    SparkSession.builder
    .appName("pysparkapp")
    .master("spark://spark-master:7077")
    .config("spark.executor.memory", "2g")
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.0")
    .getOrCreate()
)

### Read data from Mongo DB

In [4]:
# Load the raw crime collection through the Mongo Spark connector.
df = (spark.read
      .format("mongo")
      .option("uri", "mongodb://mongo:27017/mydb.chicago_crime")
      .load())

In [5]:
# Inspect the schema inferred by the Mongo connector (most fields arrive as strings).
df.printSchema()

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- arrest: boolean (nullable = true)
 |-- beat: string (nullable = true)
 |-- block: string (nullable = true)
 |-- case_number: string (nullable = true)
 |-- community_area: string (nullable = true)
 |-- date: string (nullable = true)
 |-- description: string (nullable = true)
 |-- district: string (nullable = true)
 |-- domestic: boolean (nullable = true)
 |-- fbi_code: string (nullable = true)
 |-- id: string (nullable = true)
 |-- iucr: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- location: struct (nullable = true)
 |    |-- latitude: string (nullable = true)
 |    |-- longitude: string (nullable = true)
 |    |-- human_address: string (nullable = true)
 |-- location_description: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- primary_type: string (nullable = true)
 |-- updated_on: string (nullable = true)
 |-- ward: string (nullable = true)
 |-- x_coordi

In [6]:
# Cache the raw frame (several distinct() scans follow) and count its rows.
df = df.cache()
df.count()

7273203

In [7]:
# Peek at the first 10 raw rows.
df.show(n=10)

+--------------------+------+----+--------------------+-----------+--------------+--------------------+--------------------+--------+--------+--------+--------+----+------------+--------------------+--------------------+-------------+--------------------+--------------------+----+------------+------------+----+
|                 _id|arrest|beat|               block|case_number|community_area|                date|         description|district|domestic|fbi_code|      id|iucr|    latitude|            location|location_description|    longitude|        primary_type|          updated_on|ward|x_coordinate|y_coordinate|year|
+--------------------+------+----+--------------------+-----------+--------------+--------------------+--------------------+--------+--------+--------+--------+----+------------+--------------------+--------------------+-------------+--------------------+--------------------+----+------------+------------+----+
|[5ff9814953230ace...| false|2032| 050XX N ASHLAND AVE|   JB1

### Drop columns which are not needed

In [8]:
# Drop identifiers, raw coordinates and codes that the analysis does not use.
df = df.drop('_id', 'x_coordinate', 'y_coordinate', 'beat',
             'fbi_code', 'location', 'id', 'iucr')

In [9]:
# Confirm the schema after dropping the unused columns.
df.printSchema()

root
 |-- arrest: boolean (nullable = true)
 |-- block: string (nullable = true)
 |-- case_number: string (nullable = true)
 |-- community_area: string (nullable = true)
 |-- date: string (nullable = true)
 |-- description: string (nullable = true)
 |-- district: string (nullable = true)
 |-- domestic: boolean (nullable = true)
 |-- latitude: string (nullable = true)
 |-- location_description: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- primary_type: string (nullable = true)
 |-- updated_on: string (nullable = true)
 |-- ward: string (nullable = true)
 |-- year: string (nullable = true)



## Convert string columns into numeric codes

Convert the column `description` into integer codes.

In [10]:
# Collect every distinct description value (None included) to the driver.
unique_descriptions = df.select(col("description")).distinct().collect()

In [11]:
# Display the distinct descriptions as collected (not yet sorted).
unique_descriptions

[Row(description='UNLAWFUL USE OF A COMPUTER'),
 Row(description='AGGRAVATED PO:KNIFE/CUT INSTR'),
 Row(description='AGGRAVATED: OTHER DANG WEAPON'),
 Row(description='POSS: COCAINE'),
 Row(description='AGGRAVATED DOMESTIC BATTERY - KNIFE / CUTTING INSTRUMENT'),
 Row(description='ATTEMPT POSSESSION CANNABIS'),
 Row(description='POSSESSION: SYNTHETIC MARIJUANA'),
 Row(description='POSSESS - SYNTHETIC DRUGS'),
 Row(description='OVER $500'),
 Row(description='DEFACE IDENT MARKS OF FIREARM'),
 Row(description='SEX OFFENDER - FAIL TO REGISTER'),
 Row(description='VIOLENT OFFENDER - DUTY TO REGISTER'),
 Row(description='DISCLOSE DV VICTIM LOCATION'),
 Row(description='JUVENILE PIMPING'),
 Row(description='COMPELLING CONFESSION'),
 Row(description='ATTEMPT STRONG ARM - NO WEAPON'),
 Row(description='UNLAWFUL USE - OTHER DANGEROUS WEAPON'),
 Row(description='ATTEMPT FINANCIAL IDENTITY THEFT'),
 Row(description='CRIMINAL TRANSMISSION OF HIV'),
 Row(description='BY EXPLOSIVE'),
 Row(description=

In [12]:
# Sort alphabetically so the integer codes assigned below are deterministic.
# A null description would make a plain string key raise TypeError (None vs
# str); mapping None to '' sorts it first, matching the location_description sort.
unique_descriptions.sort(key=lambda x: x.description or '')

In [13]:
# Display the descriptions again, now in alphabetical order.
unique_descriptions

[Row(description='$300 AND UNDER'),
 Row(description='$500 AND UNDER'),
 Row(description='ABUSE / NEGLECT - CARE FACILITY'),
 Row(description='ABUSE/NEGLECT: CARE FACILITY'),
 Row(description='ADULTRY'),
 Row(description='AGG CRIM SEX ABUSE FAM MEMBER'),
 Row(description='AGG CRIMINAL SEXUAL ABUSE'),
 Row(description='AGG PO HANDS ETC SERIOUS INJ'),
 Row(description='AGG PO HANDS NO/MIN INJURY'),
 Row(description='AGG PRO EMP HANDS SERIOUS INJ'),
 Row(description='AGG PRO.EMP: HANDGUN'),
 Row(description='AGG PRO.EMP: OTHER DANG WEAPON'),
 Row(description='AGG PRO.EMP: OTHER FIREARM'),
 Row(description='AGG PRO.EMP:KNIFE/CUTTING INST'),
 Row(description='AGG RIT MUT: HANDS/FIST/FEET NO/MINOR INJURY'),
 Row(description='AGG RIT MUT: HANDS/FIST/FEET SERIOUS INJURY'),
 Row(description='AGG RITUAL MUT:HANDGUN'),
 Row(description='AGG RITUAL MUT:KNIFE/CUTTING I'),
 Row(description='AGG RITUAL MUT:OTH DANG WEAPON'),
 Row(description='AGG SEX ASSLT OF CHILD FAM MBR'),
 Row(description='AGG. D

In [14]:
# Schema of the description lookup table: integer code plus original string.
schema = (StructType()
          .add('id', IntegerType())
          .add('description', StringType()))

In [15]:
# Give each distinct, non-null description a dense integer code (its position
# in the sorted list) and build the lookup table with ONE createDataFrame call.
# The previous version created a one-row DataFrame and union'ed it per value,
# which grows the query plan with every distinct value and used list.index()
# inside the loop (O(n^2)).
description_codes = [row.description for row in unique_descriptions
                     if row.description is not None]
df_description = spark.createDataFrame(list(enumerate(description_codes)), schema)

In [16]:
# Peek at the first 5 entries of the description lookup table.
df_description.show(n=5)

+---+--------------------+
| id|         description|
+---+--------------------+
|  0|      $300 AND UNDER|
|  1|      $500 AND UNDER|
|  2|ABUSE / NEGLECT -...|
|  3|ABUSE/NEGLECT: CA...|
|  4|             ADULTRY|
+---+--------------------+
only showing top 5 rows



In [17]:
# Remove any previous description lookup table; the write below uses append mode.
collection_description.drop()

In [18]:
# Persist the description lookup table to mydb.description.
(df_description.write
    .format("mongo")
    .option("uri", "mongodb://mongo:27017/mydb.description")
    .mode("append")
    .save())

In [19]:
# Read the description lookup table back from Mongo and check that every row was written.
description_check = spark.read.format("mongo").option("uri","mongodb://mongo:27017/mydb.description").load()
assert description_check.count() == df_description.count(), "description lookup table not fully written"

In [20]:
# Map each description string to its integer code; unknown or null values map to -1.
# A dict gives O(1) lookups (list.index() was O(n) per row). Declaring IntegerType
# makes the column numeric: UserDefinedFunction defaults to StringType, so the
# column previously held strings such as "19".
description_lookup = {name: code for code, name in enumerate(description_codes)}
myfunc = UserDefinedFunction(lambda x: description_lookup.get(x, -1), IntegerType())
df = df.withColumn('description_int', myfunc(col('description')))

In [21]:
# Check the new description_int column on the first 10 rows.
df.show(n=10)

+------+--------------------+-----------+--------------+--------------------+--------------------+--------+--------+------------+--------------------+-------------+--------------------+--------------------+----+----+---------------+
|arrest|               block|case_number|community_area|                date|         description|district|domestic|    latitude|location_description|    longitude|        primary_type|          updated_on|ward|year|description_int|
+------+--------------------+-----------+--------------+--------------------+--------------------+--------+--------+------------+--------------------+-------------+--------------------+--------------------+----+----+---------------+
| false| 050XX N ASHLAND AVE|   JB152500|             3|2001-01-01T00:00:...|AGG SEX ASSLT OF ...|     020|   false|        null|           RESIDENCE|         null|OFFENSE INVOLVING...|2018-10-27T16:00:...|  47|2001|             19|
| false|   018XX N TALMAN AV|    G055473|          null|2001-01-01T0

Convert the Primary Type

In [22]:
# Collect every distinct primary_type value (None included) to the driver.
unique_primary_type = df.select(col("primary_type")).distinct().collect()

In [23]:
# Sort alphabetically so the integer codes assigned below are deterministic.
# A null primary_type would make a plain string key raise TypeError (None vs
# str); mapping None to '' sorts it first, matching the location_description sort.
unique_primary_type.sort(key=lambda x: x.primary_type or '')

In [24]:
# Schema of the primary_type lookup table: integer code plus original string.
schema = (StructType()
          .add('id', IntegerType())
          .add('primary_type', StringType()))

In [25]:
# Give each distinct, non-null primary_type a dense integer code (its position
# in the sorted list) and build the lookup table with ONE createDataFrame call
# instead of a one-row DataFrame + union per value (plan growth, O(n^2) index()).
primary_type_code = [row.primary_type for row in unique_primary_type
                     if row.primary_type is not None]
df_primary_type = spark.createDataFrame(list(enumerate(primary_type_code)), schema)

In [26]:
# Remove any previous primary_type lookup table; the write below uses append mode.
collection_primary_type.drop()

In [27]:
# Persist the primary_type lookup table to mydb.primary_type.
(df_primary_type.write
    .format("mongo")
    .option("uri", "mongodb://mongo:27017/mydb.primary_type")
    .mode("append")
    .save())

In [28]:
# Read the primary_type lookup table back from Mongo and check that every row was written.
primary_type_check = spark.read.format("mongo").option("uri","mongodb://mongo:27017/mydb.primary_type").load()
assert primary_type_check.count() == df_primary_type.count(), "primary_type lookup table not fully written"

In [29]:
# Map each primary_type string to its integer code; unknown or null values map to -1.
# A dict gives O(1) lookups (list.index() was O(n) per row), and IntegerType makes
# the column numeric (UserDefinedFunction defaults to StringType).
primary_type_lookup = {name: code for code, name in enumerate(primary_type_code)}
myfunc = UserDefinedFunction(lambda x: primary_type_lookup.get(x, -1), IntegerType())
df = df.withColumn('primary_type_int', myfunc(col('primary_type')))

Convert the Location description

In [30]:
# Collect every distinct location_description value (None included) to the driver.
unique_location_description = df.select(col("location_description")).distinct().collect()

In [31]:
# Display the distinct location descriptions as collected (unsorted; includes None).
unique_location_description

[Row(location_description='RAILROAD PROPERTY'),
 Row(location_description='SCHOOL - PRIVATE GROUNDS'),
 Row(location_description='VEHICLE - COMMERCIAL'),
 Row(location_description='AIRPORT TERMINAL LOWER LEVEL - NON-SECURE AREA'),
 Row(location_description='POLICE FACILITY/VEH PARKING LOT'),
 Row(location_description='RESIDENCE - YARD (FRONT / BACK)'),
 Row(location_description='MOTEL'),
 Row(location_description='CHA PARKING LOT / GROUNDS'),
 Row(location_description='SIDEWALK'),
 Row(location_description='AIRPORT TERMINAL MEZZANINE - NON-SECURE AREA'),
 Row(location_description='OTHER RAILROAD PROPERTY / TRAIN DEPOT'),
 Row(location_description='CTA GARAGE / OTHER PROPERTY'),
 Row(location_description='CAR WASH'),
 Row(location_description='TRUCKING TERMINAL'),
 Row(location_description='AIRPORT/AIRCRAFT'),
 Row(location_description='HOSPITAL'),
 Row(location_description='MEDICAL/DENTAL OFFICE'),
 Row(location_description='FEDERAL BUILDING'),
 Row(location_description='TRAILER'),
 Ro

In [32]:
# Sort alphabetically; None is keyed as '' so it sorts first instead of raising TypeError.
unique_location_description.sort(
    key=lambda row: '' if row.location_description is None else row.location_description)

In [33]:
# Display the sorted list; the None entry comes first because of the '' sort key.
unique_location_description

[Row(location_description=None),
 Row(location_description='ABANDONED BUILDING'),
 Row(location_description='AIRCRAFT'),
 Row(location_description='AIRPORT BUILDING NON-TERMINAL - NON-SECURE AREA'),
 Row(location_description='AIRPORT BUILDING NON-TERMINAL - SECURE AREA'),
 Row(location_description='AIRPORT EXTERIOR - NON-SECURE AREA'),
 Row(location_description='AIRPORT EXTERIOR - SECURE AREA'),
 Row(location_description='AIRPORT PARKING LOT'),
 Row(location_description='AIRPORT TERMINAL LOWER LEVEL - NON-SECURE AREA'),
 Row(location_description='AIRPORT TERMINAL LOWER LEVEL - SECURE AREA'),
 Row(location_description='AIRPORT TERMINAL MEZZANINE - NON-SECURE AREA'),
 Row(location_description='AIRPORT TERMINAL UPPER LEVEL - NON-SECURE AREA'),
 Row(location_description='AIRPORT TERMINAL UPPER LEVEL - SECURE AREA'),
 Row(location_description='AIRPORT TRANSPORTATION SYSTEM (ATS)'),
 Row(location_description='AIRPORT VENDING ESTABLISHMENT'),
 Row(location_description='AIRPORT/AIRCRAFT'),
 Ro

In [34]:
# Schema of the location_description lookup table: integer code plus original string.
schema = (StructType()
          .add('id', IntegerType())
          .add('location_description', StringType()))

In [35]:
# Give each distinct, non-null location_description a dense integer code (its
# position in the sorted list) and build the lookup table with ONE
# createDataFrame call instead of a one-row DataFrame + union per value
# (plan growth, O(n^2) index()).
location_description_code = [row.location_description for row in unique_location_description
                             if row.location_description is not None]
df_location_description = spark.createDataFrame(
    list(enumerate(location_description_code)), schema)

In [36]:
# Remove any previous location_description lookup table; the write below uses append mode.
collection_location_description.drop()

In [37]:
# Persist the location_description lookup table to mydb.location_description.
(df_location_description.write
    .format("mongo")
    .option("uri", "mongodb://mongo:27017/mydb.location_description")
    .mode("append")
    .save())

In [38]:
# Read the location_description lookup table back from Mongo and check that every row was written.
location_description_check = spark.read.format("mongo").option("uri","mongodb://mongo:27017/mydb.location_description").load()
assert location_description_check.count() == df_location_description.count(), "location_description lookup table not fully written"

In [39]:
# Map each location_description string to its integer code; unknown or null values map to -1.
# A dict gives O(1) lookups (list.index() was O(n) per row), and IntegerType makes
# the column numeric (UserDefinedFunction defaults to StringType).
location_description_lookup = {name: code for code, name in enumerate(location_description_code)}
myfunc = UserDefinedFunction(lambda x: location_description_lookup.get(x, -1), IntegerType())
df = df.withColumn('location_description_int', myfunc(col('location_description')))

In [40]:
# Check all three *_int columns on the first 10 rows.
df.show(n=10)

+------+--------------------+-----------+--------------+--------------------+--------------------+--------+--------+------------+--------------------+-------------+--------------------+--------------------+----+----+---------------+----------------+------------------------+
|arrest|               block|case_number|community_area|                date|         description|district|domestic|    latitude|location_description|    longitude|        primary_type|          updated_on|ward|year|description_int|primary_type_int|location_description_int|
+------+--------------------+-----------+--------------+--------------------+--------------------+--------+--------+------------+--------------------+-------------+--------------------+--------------------+----+----+---------------+----------------+------------------------+
| false| 050XX N ASHLAND AVE|   JB152500|             3|2001-01-01T00:00:...|AGG SEX ASSLT OF ...|     020|   false|        null|           RESIDENCE|         null|OFFENSE INV

In [41]:
# The string columns are now encoded as *_int columns, so drop the originals.
df = df.drop('description', 'primary_type', 'location_description')


## Drop rows with null values

In [None]:
# Null count per column: when() without otherwise() is null unless the value is
# null, so count() only tallies the null rows.
null_counts = [count(when(col(name).isNull(), name)).alias(name) for name in df.columns]
df.select(null_counts).show()

In [None]:
# Drop the ROWS (not columns) whose latitude is null. Other columns may still
# contain nulls; see the per-column counts printed above.
df = df.na.drop(subset=["latitude"])

## Convert the date string to datetime

In [None]:
# Inspect the first 10 rows after dropping null latitudes.
df.show(n=10)

In [None]:
# Parse the `date` string (format '%Y-%m-%dT%H:%M:%S.%f') into a timestamp column
# `date_time`. Null strings stay null instead of making strptime raise TypeError,
# which would abort the whole Spark job.
myfunc = UserDefinedFunction(
    lambda x: datetime.strptime(x, '%Y-%m-%dT%H:%M:%S.%f') if x is not None else None,
    TimestampType())
df = df.withColumn('date_time', myfunc(col('date')))

In [None]:
# Check the new date_time column on the first 5 rows.
df.show(n=5)

In [None]:
# Replace the string `date` column with a calendar date parsed from that same
# string (not from `date_time`); the original string is overwritten. Null
# strings stay null instead of making strptime raise TypeError.
myfunc = UserDefinedFunction(
    lambda x: datetime.strptime(x, '%Y-%m-%dT%H:%M:%S.%f').date() if x is not None else None,
    DateType())
df = df.withColumn('date', myfunc(col('date')))

In [None]:
# Check the converted date column on the first 5 rows.
df.show(n=5)

### Save Cleaned Data to Mongo

In [None]:
# Remove the previous cleaned collection so the append below does not duplicate rows.
collection_crime_clean.drop()

In [None]:
# Write the cleaned frame into the freshly dropped chicago_crime_cleaned collection.
(df.write
   .format("mongo")
   .option("uri", "mongodb://mongo:27017/mydb.chicago_crime_cleaned")
   .mode("append")
   .save())

In [None]:
# Stop the Spark session (and its SparkContext) now that the cleaned data is saved.
spark.stop()