# Chicago Crimes Data Model
### Data Engineering Capstone Project

#### Project Summary
the aim of this project is to build single-source-of-truth to help perform further analysis and visualization regarding Chicago crimes from the period 2001 till 2022.

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import os
import pandas as pd
import json
import pyspark
from pyspark import SparkContext 
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from pyspark.sql.functions import col, max, sum
from pyspark.sql.window import Window
from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql.functions import lit

In [2]:
from pyspark import SparkConf
from pyspark import SparkContext
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))

### Step 1: Scope the Project and Gather Data

#### Scope 
in this project, we'll establish a data model to prepare data for further analysis.
this will be done by extracting, joining and forming the corresponding tables (Districts, Primary Type Dimensions and Crimes Fact table).

tools used in this project:
- Jupyter Notebook.
- Python.
- PySpark library.

#### Describe and Gather Data 
##### Chicago CrimeLocation data - CSV file:
- crimes grouped by year and primary_type of each crime with its count.
- Source: https://www.kaggle.com/datasets/elijahtoumoua/chicago-analysis-of-crime-data-dashboard?select=CrimeLocation.csv

##### Chicago districts data - JSON file:
- this file has chicago districts' names.
- Source: https://data.cityofchicago.org/api/views/zidz-sdfj/rows.json?accessType=DOWNLOAD

In [3]:
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

In [4]:
# Chicago CrimeLocation data
locations_df = spark.read.csv("Datasets/CrimeLocation.csv",
                              header='true',
                              inferSchema='true')
locations_df.printSchema()

root
 |-- year: integer (nullable = true)
 |-- district: integer (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- primary_type: string (nullable = true)
 |-- crime_count: integer (nullable = true)



In [5]:
locations_df.count()

5417807

In [6]:
# Chicago districts data 
with open('Datasets/chicago_districts.json', 'r') as f:
    data = json.load(f)

df = pd.DataFrame(data['data'],
                            columns = ['sid', 'id', 'position', 'created_at', 'created_meta', 'updated_at',
                                       'updated_meta', 'meta', 'district_name', 'designation_date'])
df.head(5)

Unnamed: 0,sid,id,position,created_at,created_meta,updated_at,updated_meta,meta,district_name,designation_date
0,row-kyf5_yb7r.vrwc,00000000-0000-0000-E6B2-7767EA2D2877,0,1566178027,,1566178027,,{ },Old Town Triangle,244278000
1,row-g4ak.p5ja.8pv8,00000000-0000-0000-2D36-D11E47447804,0,1566178027,,1566178027,,{ },Milwaukee Avenue,1207724400
2,row-shrt~gdnn~3kcf,00000000-0000-0000-9B11-357FA5616303,0,1566178027,,1566178027,,{ },Astor Street,188208000
3,row-vapw~nh6d_sywy,00000000-0000-0000-7C6C-ACED956B631A,0,1566178027,,1566178027,,{ },Beverly/Morgan Park Railroad Stations,797929200
4,row-hmi5~yp2s.6ttw,00000000-0000-0000-EC91-F9519EBE1B46,0,1566178027,,1566178027,,{ },Black Metropolis-Bronzeville,905324400


In [7]:
# create districts' spark dataframe with its schema
schema = StructType([ \
    StructField("sid",StringType(),True), \
    StructField("id",StringType(),True), \
    StructField("position",StringType(),True), \
    StructField("created_at", StringType(), True), \
    StructField("created_meta", StringType(), True), \
    StructField("updated_at", StringType(), True), \
    StructField("updated_meta", StringType(), True), \
    StructField("meta", StringType(), True), \
    StructField("district_name", StringType(), True), \
    StructField("designation_date", IntegerType(), True) \
  ])
districts_df = spark.createDataFrame(df, schema) 
districts_df.printSchema()

root
 |-- sid: string (nullable = true)
 |-- id: string (nullable = true)
 |-- position: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- created_meta: string (nullable = true)
 |-- updated_at: string (nullable = true)
 |-- updated_meta: string (nullable = true)
 |-- meta: string (nullable = true)
 |-- district_name: string (nullable = true)
 |-- designation_date: integer (nullable = true)



In [8]:
districts_df.count()

59

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

In [9]:
locations_df.show(10)

+----+--------+------------+-------------+-------------+-----------+
|year|district|    latitude|    longitude| primary_type|crime_count|
+----+--------+------------+-------------+-------------+-----------+
|2001|       6|41.726587107|-87.628650815|OTHER OFFENSE|          1|
|2001|       6|41.743292263|-87.601517412|      ROBBERY|          1|
|2001|       7|41.784650088|-87.665708402|      ASSAULT|          2|
|2001|      24|42.016181819|-87.666751351|      ROBBERY|          1|
|2001|      10|41.861351884|-87.694550926|      BATTERY|          5|
|2001|      14|41.917669402|-87.679950307|      BATTERY|          1|
|2001|      10|41.856919058|-87.726270093|      BATTERY|          3|
|2001|      20| 41.98537745|-87.657686188|      BATTERY|          1|
|2001|      11|41.879549718|-87.705125189|      BATTERY|          2|
|2001|       3|41.759980814| -87.56624795|      BATTERY|          2|
+----+--------+------------+-------------+-------------+-----------+
only showing top 10 rows



In [10]:
locations_df.describe().show()

+-------+-----------------+------------------+-------------------+-------------------+-----------------+------------------+
|summary|             year|          district|           latitude|          longitude|     primary_type|       crime_count|
+-------+-----------------+------------------+-------------------+-------------------+-----------------+------------------+
|  count|          5417807|           5417760|            5417807|            5417807|          5417807|           5417807|
|   mean| 2009.78657859167|11.543600491716134|  41.84087468860387| -87.67337706996014|             null|1.3804805154557924|
| stddev|5.989131793791821| 6.918199651874521|0.08832914173955167|0.05830104426599391|             null| 2.726444299721679|
|    min|             2001|                 1|       41.644585429|      -87.939732936|            ARSON|                 1|
|    max|             2022|                31|       42.022910333|      -87.524529378|WEAPONS VIOLATION|               746|
+-------

In [11]:
districts_df.show(10)

+------------------+--------------------+--------+----------+------------+----------+------------+----+--------------------+----------------+
|               sid|                  id|position|created_at|created_meta|updated_at|updated_meta|meta|       district_name|designation_date|
+------------------+--------------------+--------+----------+------------+----------+------------+----+--------------------+----------------+
|row-kyf5_yb7r.vrwc|00000000-0000-000...|       0|1566178027|        null|1566178027|        null| { }|   Old Town Triangle|       244278000|
|row-g4ak.p5ja.8pv8|00000000-0000-000...|       0|1566178027|        null|1566178027|        null| { }|    Milwaukee Avenue|      1207724400|
|row-shrt~gdnn~3kcf|00000000-0000-000...|       0|1566178027|        null|1566178027|        null| { }|        Astor Street|       188208000|
|row-vapw~nh6d_sywy|00000000-0000-000...|       0|1566178027|        null|1566178027|        null| { }|Beverly/Morgan Pa...|       797929200|
|row-h

In [12]:
# used this solution to find the max year:
# https://stackoverflow.com/questions/38377894/how-to-get-maxdate-from-given-set-of-data-grouped-by-some-fields-using-pyspark
# show latest date in CrimeDate table dataset
temp_df = (locations_df.groupBy("primary_type")
    .agg(max("year")))
temp_df.show()

+--------------------+---------+
|        primary_type|max(year)|
+--------------------+---------+
|OFFENSE INVOLVING...|     2022|
|CRIMINAL SEXUAL A...|     2022|
|            STALKING|     2022|
|PUBLIC PEACE VIOL...|     2022|
|           OBSCENITY|     2022|
|NON-CRIMINAL (SUB...|     2018|
|               ARSON|     2022|
|   DOMESTIC VIOLENCE|     2001|
|            GAMBLING|     2022|
|   CRIMINAL TRESPASS|     2022|
|             ASSAULT|     2022|
|LIQUOR LAW VIOLATION|     2022|
| MOTOR VEHICLE THEFT|     2022|
|               THEFT|     2022|
|             BATTERY|     2022|
|             ROBBERY|     2022|
|            HOMICIDE|     2022|
|           RITUALISM|     2020|
|    PUBLIC INDECENCY|     2022|
|   HUMAN TRAFFICKING|     2022|
+--------------------+---------+
only showing top 20 rows



In [13]:
locations_nulls = locations_df.groupBy('district').count().show()

+--------+------+
|district| count|
+--------+------+
|      31|   175|
|      12|268369|
|      22|197319|
|    null|    47|
|       1|126393|
|       6|317012|
|      16|189863|
|       3|270634|
|      20| 97069|
|       5|255214|
|      19|241873|
|      15|228845|
|      17|175945|
|       9|288883|
|       4|324153|
|       8|392556|
|       7|320362|
|      10|248514|
|      25|340574|
|      24|175258|
+--------+------+
only showing top 20 rows



#### Cleaning Steps
1. drop all rows with 'district' is null -47 rows-.
2. create corresponding table (Districts dimension) by joining locations_df and districts_df using 'district' column.
3. create corresponding table (Primary Type dimension) by adding new column 'primary_type_code' with 'primary_type' to form the table.
4. create corresponding table (Crimes fact table) by joining locations_df with Primary Type table and removing 'primary_type' column from locations_df to form the final fact table.
5. aggregate the fact table to be grouped by 'year', 'district' and 'primary_type'.

In [14]:
# 1. drop all rows with 'district' is null
locations_df = locations_df.na.drop()
locations_df.count()

5417760

In [15]:
# 2. create Districts dimension
districts_temp = districts_df.select(['district_name'])
districts_temp = districts_temp.withColumn('district', row_number().over(Window.orderBy(monotonically_increasing_id())))

In [16]:
districts_temp.show()

+--------------------+--------+
|       district_name|district|
+--------------------+--------+
|   Old Town Triangle|       1|
|    Milwaukee Avenue|       2|
|        Astor Street|       3|
|Beverly/Morgan Pa...|       4|
|Black Metropolis-...|       5|
|     Surf-Pine Grove|       6|
|Five Houses on Av...|       7|
|     Hawthorne Place|       8|
|Historic Michigan...|       9|
|   Hutchinson Street|      10|
|Ukrainian Village...|      11|
|           Motor Row|      12|
|  Arlington & Roslyn|      13|
|       Old Edgebrook|      14|
|        East Village|      15|
|  Printing House Row|      16|
|             Pullman|      17|
|               Villa|      18|
|        Jewelers Row|      19|
|Milwaukee-Diverse...|      20|
+--------------------+--------+
only showing top 20 rows



In [17]:
districts_temp.count()

59

In [18]:
locations_df_temp = (locations_df.groupBy("district").agg(max("latitude"), max("longitude")))
districts_dim = districts_temp.join(locations_df_temp, on=['district']  , how = 'inner').dropDuplicates()
districts_dim = districts_dim.withColumnRenamed('max(latitude)', 'latitude')
districts_dim = districts_dim.withColumnRenamed('max(longitude)', 'longitude')
districts_dim = districts_dim.na.drop()
districts_dim.show()

+--------+--------------------+------------+-------------+
|district|       district_name|    latitude|    longitude|
+--------+--------------------+------------+-------------+
|      31| McCormick Row House|42.019387999|-87.535277004|
|      12|           Motor Row|41.965391521| -87.60501807|
|      22|Old Chicago Water...|41.855718661|-87.589645116|
|       1|   Old Town Triangle|41.987404624|-87.549249732|
|       6|     Surf-Pine Grove|42.018755041|-87.558095071|
|      16|  Printing House Row| 42.01938364|-87.582560321|
|       3|        Astor Street|41.799461412|-87.552612861|
|      20|Milwaukee-Diverse...| 42.00458471|-87.629919733|
|       5|Black Metropolis-...|41.886932776|-87.543478545|
|      19|        Jewelers Row|41.983969039|-87.587747647|
|      15|        East Village|41.942338896|-87.630867327|
|       9|Historic Michigan...|41.976449458|-87.602820231|
|      17|             Pullman| 42.01390135|-87.661310629|
|       4|Beverly/Morgan Pa...| 41.79220444|-87.52452937

In [19]:
districts_dim.count()

24

In [20]:
# 3. create Primary Type dimension
primary_type_dim = locations_df.select(['primary_type']).dropDuplicates()
primary_type_dim = primary_type_dim.withColumn('primary_type_code', row_number().over(Window.orderBy(monotonically_increasing_id())))
primary_type_dim.show()

+--------------------+-----------------+
|        primary_type|primary_type_code|
+--------------------+-----------------+
|OFFENSE INVOLVING...|                1|
|CRIMINAL SEXUAL A...|                2|
|            STALKING|                3|
|PUBLIC PEACE VIOL...|                4|
|           OBSCENITY|                5|
|NON-CRIMINAL (SUB...|                6|
|               ARSON|                7|
|   DOMESTIC VIOLENCE|                8|
|            GAMBLING|                9|
|   CRIMINAL TRESPASS|               10|
|             ASSAULT|               11|
|LIQUOR LAW VIOLATION|               12|
| MOTOR VEHICLE THEFT|               13|
|               THEFT|               14|
|             BATTERY|               15|
|             ROBBERY|               16|
|            HOMICIDE|               17|
|           RITUALISM|               18|
|    PUBLIC INDECENCY|               19|
|   HUMAN TRAFFICKING|               20|
+--------------------+-----------------+
only showing top

In [21]:
primary_type_dim.count()

34

In [22]:
locations_df.printSchema()

root
 |-- year: integer (nullable = true)
 |-- district: integer (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- primary_type: string (nullable = true)
 |-- crime_count: integer (nullable = true)



In [23]:
# 4. create Crimes fact table
crimes_fact = locations_df.select(['year', 'district', 'primary_type', 'crime_count'])
primary_type_dim_temp = primary_type_dim.select(['primary_type_code', 'primary_type'])

crimes_fact = crimes_fact.join(primary_type_dim_temp, on=['primary_type']  , how = 'inner')
crimes_fact = crimes_fact.select(['year', 'district', 'primary_type_code', 'crime_count'])
crimes_fact.show()

+----+--------+-----------------+-----------+
|year|district|primary_type_code|crime_count|
+----+--------+-----------------+-----------+
|2001|      15|                1|          1|
|2001|       6|                1|          1|
|2001|      11|                1|          1|
|2001|      22|                1|          1|
|2001|       3|                1|          1|
|2001|       7|                1|          1|
|2001|      24|                1|          1|
|2001|       5|                1|          1|
|2001|      10|                1|          1|
|2001|       5|                1|          2|
|2001|      12|                1|          1|
|2001|      17|                1|          1|
|2001|       4|                1|          1|
|2001|       8|                1|          1|
|2001|       3|                1|          1|
|2001|       5|                1|          1|
|2001|      15|                1|          1|
|2001|      24|                1|          1|
|2001|       2|                1| 

In [24]:
crimes_fact.count()

5417760

In [25]:
# 5. fact table aggregation by 'year', 'district' and 'primary_type_code'
crimes_fact = (crimes_fact.groupBy("year", "district", "primary_type_code")
    .agg(sum("crime_count")))
crimes_fact = crimes_fact.withColumnRenamed('sum(crime_count)', 'crime_count')
crimes_fact.show()

+----+--------+-----------------+-----------+
|year|district|primary_type_code|crime_count|
+----+--------+-----------------+-----------+
|2005|      12|                1|         81|
|2017|       1|                2|         65|
|2010|       7|                2|         91|
|2008|      17|                3|          6|
|2018|      25|                3|          7|
|2009|      11|                3|          2|
|2006|       2|                3|          5|
|2021|      12|                3|         19|
|2019|      25|                4|         89|
|2020|      22|                4|         16|
|2018|       4|                5|          7|
|2021|       4|                7|         44|
|2004|      24|                9|         39|
|2009|      18|                9|          9|
|2010|      24|                9|         11|
|2002|      15|               10|        267|
|2013|      14|               10|        223|
|2014|       2|               10|        377|
|2004|      22|               10| 

In [26]:
crimes_fact.count()

12945

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
##### Data Model : 
<img src='images/ERD.png'>

this was the last normalization level of the given data as it shows in the ERD.

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

1. create corresponding table (Districts dimension) by joining locations_df and districts_df using 'district' column.
2. create corresponding table (Primary Type dimension) by adding new column 'primary_type_code' with 'primary_type' to form the table.
3. create corresponding table (Crimes fact table) by joining locations_df with Primary Type table and removing 'primary_type' column from locations_df to form the final fact table.
4. aggregate the fact table to be grouped by 'year', 'district' and 'primary_type'.

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model

already created the data model in the cleaning step as it results into:
- districts_dim 
- primary_type_dim
- crimes_fact

will write the tables into parquet as follows:

In [27]:
# districts_dim
districts_dim.write.mode('overwrite').parquet("sas_data/districts_dim.parquet")

In [28]:
# primary_type_dim
primary_type_dim.write.mode('overwrite').parquet("sas_data/primary_type_dim.parquet")

In [29]:
# crimes_dim
crimes_fact.write.mode('overwrite').parquet("sas_data/crimes_fact.parquet")

#### 4.2 Data Quality Checks
1. Schema check : to ensure that same data schema is used for each table.
2. tables' count\length to ensure that same data are retrieved.
 
##### Run Quality Checks

In [30]:
# Perform quality checks here
# districts_dim
districts_df_temp = spark.read.parquet("sas_data/districts_dim.parquet")
districts_df_temp.createOrReplaceTempView("districts_dim")

districts_df_temp.printSchema()

root
 |-- district: integer (nullable = true)
 |-- district_name: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)



In [31]:
districts_df_temp.count()

24

In [32]:
# primary_type_dim
primary_type_temp = spark.read.parquet("sas_data/primary_type_dim.parquet")
primary_type_temp.createOrReplaceTempView("primary_type_dim")

primary_type_temp.printSchema()

root
 |-- primary_type: string (nullable = true)
 |-- primary_type_code: integer (nullable = true)



In [33]:
primary_type_temp.count()

34

In [34]:
# crimes_fact 
crimes_temp = spark.read.parquet("sas_data/crimes_fact.parquet")
crimes_temp.createOrReplaceTempView("crimes_fact")

crimes_temp.printSchema()

root
 |-- year: integer (nullable = true)
 |-- district: integer (nullable = true)
 |-- primary_type_code: integer (nullable = true)
 |-- crime_count: long (nullable = true)



In [35]:
crimes_temp.count()

12945

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

##### - Districts Dimension : 
| Attribute | Data-type | Description |
| :-: | :-: | :-: |
| district | int | district code as primary key
| district_name | string | district name
| latitude | double | district latitude
| longitude | double | district longitude


##### - Primary Type Dimension : 
| Attribute | Data-type | Description |
| :-: | :-: | :-: |
| primary_type_code | int | primary type code as primary key
| primary_type | string | primary type which is crime description

##### - Crimes Fact table : 
| Attribute | Data-type | Description |
| :-: | :-: | :-: |
| year | int | year in YYYY format
| district | int | district code
| primary_type_code | int | primary type code
| crime_count | int | crimes count

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project :
used jupyter and pyspark as it's the fastest way to do data mining in less time with less efforts.

* Propose how often the data should be updated and why :
took the chicago crimes data from Kaggle and might be updated every half year or so.
if it got updated, the data files will be uploaded again unless the data publisher provided an API or something to get data updated automatically.

* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x :
 in this case, should use a Redshift or Spark cluster to handle larger data and provide better results with less time and better performance that the local computer.
 
 * The data populates a dashboard that must be updated on a daily basis by 7am every day :
 should use scheduling service such as Airflow to schedule the updating job to be daily at 7 am.
 
 * The database needed to be accessed by 100+ people :
 should use Redshift to handle larger concurrent users as it can handle up to 2000 concurrent connections.
 
 * Future Development :
 - to have an automated data source rather than local source.
 - add crime-based level of data instead of aggregated level (here by primary_type).
 - create corrosponding cluster to handle all the data and its ETL.
 - schedule data updating job.
 - add more tables to give more useful data, such as crimes' full details -place, number of victims, police station, ..etc.- and criminals' data.