# The Effect of Weather and Seasonality on Crime in Chicago
### Data Engineering Capstone Project

#### Project Summary
Chicago, the Windy City, has brutally cold winters and hot, humid summers. Chicago is also known for its high rate of crime and murder. The aim of this project is to build a data pipeline and framework that lets a data science team quickly load clean data with an intuitive schema, making it easy to run statistical models and machine learning algorithms on Spark.

The project contains the following modules:
* Scope of the Project Data
    - where the data comes from and tools
* Project Setup
    - required libraries, environment setup
* Explore and Assess the data
    - identifying data quality issues
* Defining the Data Model
    - ETL, Schema, Data quality checks
* Data dictionary
* Project Write Up

### Scope of the Project and Data

#### Scope 

To analyze the effects of weather and seasonality on crime in Chicago, I need at least two data sources -- one containing crime data for Chicago and one containing weather data for Chicago.     

The City of Chicago publishes crime data and makes it publicly available on its [website](https://data.cityofchicago.org/Public-Safety/Crimes-2001-to-present/ijzp-q8t2).    

For the weather data, I chose [NOAA](https://www.noaa.gov), as it is a standard source for weather data that a lot of people in the data science community are familiar with.     

The primary aim of this data pipeline is to wrangle these two data sources into a schema that is intuitive and easy to query to perform timeseries and statistical modeling.     

Because the primary goal is to model the data, the tool I am using is Spark. I chose Spark because the wrangling and modeling can both be done in the same tool, and because the statistical models will likely be computationally intensive and therefore need a distributed data architecture.


#### Data      

`crime data`
* the crime data is from the City of Chicago website located [here](https://data.cityofchicago.org/Public-Safety/Crimes-2001-to-present/ijzp-q8t2)
* the dates range from 2001 to present
* 6,975,703 total rows in csv export     

`weather data`
* the weather data is from [NOAA](https://www.noaa.gov)
* the dates range from 2001 to present
* 6,836 total rows

`IUCR data` 
* IUCR data is located [here](https://data.cityofchicago.org/Public-Safety/Chicago-Police-Department-Illinois-Uniform-Crime-R/c7ck-438e)
* standard code for reporting type of crime
* 401 total rows in csv export


#### Tools
All data wrangling and analyses are done in *Spark*. 

## Project Setup

#### Required Python Libraries

In [1]:
import configparser
from datetime import datetime
import os

import numpy as np
import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format

#### Get AWS credentials

In [2]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['KEY']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['KEY']['AWS_SECRET_ACCESS_KEY']
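
The cell above expects a `dl.cfg` file with a `[KEY]` section. A minimal sketch of that layout, assuming placeholder values (real keys should never be committed to version control). It is parsed from a string here so it runs without the file:

```python
import configparser

# Hypothetical contents of dl.cfg; the values are placeholders, not real credentials.
EXAMPLE_CFG = """
[KEY]
AWS_ACCESS_KEY_ID = EXAMPLE_ACCESS_KEY
AWS_SECRET_ACCESS_KEY = EXAMPLE_SECRET_KEY
"""

config = configparser.ConfigParser()
config.read_string(EXAMPLE_CFG)

# Same lookups as the notebook cell. Section names are case-sensitive;
# option names are matched case-insensitively by default.
access_key = config['KEY']['AWS_ACCESS_KEY_ID']
secret_key = config['KEY']['AWS_SECRET_ACCESS_KEY']
print(access_key, secret_key)
```

With a real file, `config.read('dl.cfg')` replaces the `read_string` call.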

#### Create Spark session

In [3]:
def create_spark_session():
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark

spark = create_spark_session()

## Explore and Assess the data

### Part 1 - Crime Data | Part 2 - Weather Data | Part 3 - IUCR Data

### Crime Data

After evaluating the crime data and schema, I am primarily interested in the following variables, and need to perform the following data wrangling steps: 
* ID
    - A unique crime event. 
    - I want to verify that this is the primary key of the raw data.  
* Date
    - Using the date is a great way to distill seasonal patterns. 
    - This value should never be null.
    - This value needs to be reformatted from 01/01/2019 to 2019-01-01.
* IUCR
    - Universal code that distinguishes type of crime. For example, assault, theft, burglary, etc. 
    - Potentially, certain types of crimes will have seasonal patterns, while other types of crime may not
    - I do not want to include records with a null IUCR code
* Domestic
    - Indicates if crime was committed in a domestic setting or not. 
    - It is possible that crimes that occur at a residence will be less affected by storms and the seasonal effects of weather
    - Null values can be kept, and excluded if needed for specific analyses
* Community Area
    - It is possible that crime follows seasonal patterns in certain areas of Chicago, while in other parts of the city it does not
    - Null values can be kept, and excluded if needed for specific analyses
    - coalesce() null values to 'no_community_reported'
* Crime Count
    - How many crimes of a specific type occurred on a given day in a specific community
    - A distinct count of the ID column

Read in data and show the Schema

In [4]:
# Read in the crime data
crime_raw = spark.read.csv("s3a://chicago-crime-ddd/Chicago_Crime_2001_to_Present.csv", header=True)

In [5]:
# create crime_raw view
crime_raw.createOrReplaceTempView("crime_raw")

Select variables of interest

In [6]:
crime_small = spark.sql('''
                        SELECT
                             ID,
                             Date,
                             IUCR,
                             Domestic,
                             `Community Area`,
                             `Case Number`
                        FROM crime_raw                          
                        ORDER BY DATE
                        ''')

# create crime_small view
crime_small.createOrReplaceTempView("crime_small")

In [7]:
# Show the schema
crime_small.printSchema()

root
 |-- ID: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- IUCR: string (nullable = true)
 |-- Domestic: string (nullable = true)
 |-- Community Area: string (nullable = true)
 |-- Case Number: string (nullable = true)



#### Determine primary key
##### Is the 'ID' column truly the primary key?    
* Yes, the ID is the primary key. But Case Numbers can have multiple records.

In [22]:
# check that the row count equals the distinct ID count, i.e. ID is the primary key
# a very small number of Case Numbers have multiple records. 
spark.sql('''
          SELECT COUNT(*) AS row_count,
                 COUNT(distinct ID) as ID_dist_cnt,
                 COUNT(distinct `Case Number`) as CASE_NUM_dist_cnt
          FROM crime_small
          ''').show()

+---------+-----------+-----------------+
|row_count|ID_dist_cnt|CASE_NUM_dist_cnt|
+---------+-----------+-----------------+
|  6975703|    6975703|          6975280|
+---------+-----------+-----------------+
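
The query above compares the total row count with the distinct count of each candidate key. The same idea as a small pure-Python sketch, with made-up values and a hypothetical helper name (`is_primary_key`):

```python
def is_primary_key(values):
    """True when no value is missing and no value repeats."""
    values = list(values)
    return None not in values and len(set(values)) == len(values)

ids = ["101", "102", "103"]            # made-up IDs, all distinct
case_numbers = ["HZ1", "HZ1", "HJ2"]   # made-up case numbers, one repeats

print(is_primary_key(ids))
print(is_primary_key(case_numbers))
```

In the real data the same comparison is 6975703 rows against 6975703 distinct IDs (a key) and 6975280 distinct Case Numbers (not a key).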



##### Find the Case Numbers with the most records
* since the max records for a Case Number is 6, this is not a data quality concern

In [23]:
spark.sql('''
          SELECT DISTINCT
             `Case Number` as case_num,
              COUNT(*) as row_count
          FROM crime_small          
          GROUP BY 1
          HAVING row_count > 1
          ORDER BY row_count DESC
          LIMIT 5
          ''').show()

+--------+---------+
|case_num|row_count|
+--------+---------+
|HZ140230|        6|
|HJ590004|        6|
|HS256531|        5|
|HP296582|        5|
|HJ104730|        4|
+--------+---------+



##### Look at all records for the Case Numbers with 6 records each
* records are written to s3 for easy viewing

In [24]:
# I know that Case Number HZ140230 and HJ590004 have the most records, with 6 records each. So I want to take a look and see what is going on
test = spark.sql('''
          SELECT *
          FROM crime_small          
          WHERE `Case Number` IN ('HZ140230', 'HJ590004')
          ''')

# convert to pandas df
test_pd = test.toPandas()

# write to csv on s3
test.write.csv(os.path.join("s3a://chicago-crime-ddd/", 'test'), 'overwrite')

##### Check for null values in Case Number
* four values are null; since so few rows are affected, I am comfortable throwing them out

In [8]:
spark.sql('''
          SELECT DISTINCT
              CASE
                  WHEN `Case Number` IS NULL THEN 'bad_null'
                  WHEN `Case Number` IS NOT NULL THEN 'good'
                  ELSE 'weird'
              END AS case_num_null_status,    
              COUNT(*) as row_count
          FROM crime_small          
          GROUP BY 1
          ''').show()

+--------------------+---------+
|case_num_null_status|row_count|
+--------------------+---------+
|                good|  6975699|
|            bad_null|        4|
+--------------------+---------+



#### Format Date
##### Test and verify date formatting functions
Format date to standard database form ('yyyy-MM-dd')    
The date from the crime data and the date from the weather data must be the same data type.

In [9]:
spark.sql('''
          SELECT DISTINCT 
              Date,
              substr(Date, 1, 2) AS sub_month,
              substr(Date, 4, 2) AS sub_day,
              substr(Date, 7, 4) AS sub_year,
              to_date(concat(substr(Date, 7, 4), '-', substr(Date, 1, 2), '-', substr(Date, 4, 2)), 'yyyy-MM-dd') AS good_date,
              month(to_date(concat(substr(Date, 7, 4), '-', substr(Date, 1, 2), '-', substr(Date, 4, 2)), 'yyyy-MM-dd')) AS good_month
          FROM crime_small
          LIMIT 5
          ''').show() 

+--------------------+---------+-------+--------+----------+----------+
|                Date|sub_month|sub_day|sub_year| good_date|good_month|
+--------------------+---------+-------+--------+----------+----------+
|01/01/2001 01:00:...|       01|     01|    2001|2001-01-01|         1|
|01/01/2001 01:00:...|       01|     01|    2001|2001-01-01|         1|
|01/01/2001 01:00:...|       01|     01|    2001|2001-01-01|         1|
|01/01/2001 01:01:...|       01|     01|    2001|2001-01-01|         1|
|01/01/2001 01:02:...|       01|     01|    2001|2001-01-01|         1|
+--------------------+---------+-------+--------+----------+----------+
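
The `substr()` calls above take the month, day, and year from the first ten characters of the raw string and reassemble them as `yyyy-MM-dd`. A pure-Python sketch of the same conversion, with a hypothetical helper name (`to_iso_date`). Unlike plain string slicing, `strptime` also rejects impossible dates:

```python
from datetime import datetime

def to_iso_date(raw):
    """Convert a string that starts with MM/dd/yyyy into yyyy-MM-dd."""
    # Only the first 10 characters are used, like the substr() calls above;
    # whatever time component follows is ignored.
    return datetime.strptime(raw[:10], "%m/%d/%Y").date().isoformat()

print(to_iso_date("01/01/2019"))            # 2019-01-01
print(to_iso_date("09/19/2019 11:30:00"))   # 2019-09-19 (made-up time suffix)
```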



##### Check for null values in Date
* no null values

In [10]:
# check if the date is ever null - no nulls!
spark.sql('''
          SELECT DISTINCT
              CASE
                  WHEN Date IS NULL THEN 'bad_null'
                  WHEN Date IS NOT NULL THEN 'good'
                  ELSE 'weird'
              END AS Date_null_status,    
              COUNT(*) as row_count
          FROM crime_small          
          GROUP BY 1
          ''').show()

+----------------+---------+
|Date_null_status|row_count|
+----------------+---------+
|            good|  6975703|
+----------------+---------+



#### IUCR -- check for null values
* no null values

In [11]:
spark.sql('''
          SELECT DISTINCT
              CASE
                  WHEN IUCR IS NULL THEN 'bad_null'
                  WHEN IUCR IS NOT NULL THEN 'good'
                  ELSE 'weird'
              END AS IUCR_null_status,    
              COUNT(*) as row_count
          FROM crime_small          
          GROUP BY 1
          ''').show()

+----------------+---------+
|IUCR_null_status|row_count|
+----------------+---------+
|            good|  6975703|
+----------------+---------+



#### Community Area -- check for null values
* Community Area is null 8.8% of the time
* Null values will be included in fact table
* Null can easily be excluded when performing analyses at the community level

In [12]:
# check if community area is ever null
# community area is null a lot, 8.8% of the time -- 
# I want to be able to evaluate community areas, and I determined that we 
# still have plenty of data to pick up on seasonal trends
spark.sql('''
          SELECT DISTINCT
              CASE
                  WHEN `Community Area` IS NULL THEN 'bad_null'
                  WHEN `Community Area` IS NOT NULL THEN 'good'
                  ELSE 'weird'
              END AS CommArea_null_status,    
              COUNT(*) as row_count
          FROM crime_small          
          GROUP BY 1
          ''').show()

+--------------------+---------+
|CommArea_null_status|row_count|
+--------------------+---------+
|                good|  6362209|
|            bad_null|   613494|
+--------------------+---------+
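
The 8.8% figure follows directly from the two counts in the output above. As a quick arithmetic check:

```python
null_rows = 613494
good_rows = 6362209
total_rows = null_rows + good_rows          # 6975703, matches the raw row count

percent_null = null_rows / total_rows * 100
print(f"{percent_null:.1f}% of rows have no Community Area")
```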



#### Domestic -- check for null values
* no null values

In [13]:
# check if Domestic is ever null - Domestic is never null!
spark.sql('''
          SELECT DISTINCT
              CASE
                  WHEN Domestic IS NULL THEN 'bad_null'
                  WHEN Domestic IS NOT NULL THEN 'good'
                  ELSE 'weird'
              END AS Domestic_null_status,    
              COUNT(*) as row_count
          FROM crime_small          
          GROUP BY 1
          ''').show()

+--------------------+---------+
|Domestic_null_status|row_count|
+--------------------+---------+
|                good|  6975703|
+--------------------+---------+



### Weather Data

After reviewing the data and schema of the weather data, I am primarily interested in the following variables, and will need to perform the following data wrangling steps:
* DATE
    - Verify that this is the primary key of the dataset. 
* PRCP
    - precipitation (inches)
    - column should never be null
* SNOW
    - snowfall (inches)
    - column should never be null
* SNWD
    - snow depth (inches)
    - coalesce() null values to zero
* TMAX
    - max daily temp (F)
    - column should never be null
* TMIN
    - min daily temp (F)
    - column should never be null    

Read in the data and show the schema

In [14]:
# read in data
weather_raw = spark.read.csv("s3a://chicago-crime-ddd/chicago-weather.csv", header=True)

Create `weather_raw` Spark SQL View

In [15]:
# weather df
# 6,836
weather_raw.createOrReplaceTempView("weather_raw")

Select only most pertinent columns

In [16]:
weather_small = spark.sql('''
                          SELECT DISTINCT
                              DATE,
                              PRCP,     
                              SNOW,     
                              SNWD,     
                              TMAX,     
                              TMIN,    
                              TSUN                           
                          FROM weather_raw                          
                          ORDER BY DATE
                          ''')

weather_small.show(5)

+----------+----+----+----+----+----+----+
|      DATE|PRCP|SNOW|SNWD|TMAX|TMIN|TSUN|
+----------+----+----+----+----+----+----+
|2001-01-01|0.00| 0.0|17.0|  24|   5|null|
|2001-01-02|0.00| 0.0|15.0|  19|   5|null|
|2001-01-03|0.00| 0.0|14.0|  28|   7|null|
|2001-01-04|0.00| 0.0|14.0|  30|  19|null|
|2001-01-05|0.00| 0.0|13.0|  36|  21|null|
+----------+----+----+----+----+----+----+
only showing top 5 rows



In [17]:
# Show the schema
weather_small.printSchema()

root
 |-- DATE: string (nullable = true)
 |-- PRCP: string (nullable = true)
 |-- SNOW: string (nullable = true)
 |-- SNWD: string (nullable = true)
 |-- TMAX: string (nullable = true)
 |-- TMIN: string (nullable = true)
 |-- TSUN: string (nullable = true)



Create `weather_small` Spark SQL View

In [18]:
# create view
weather_small.createOrReplaceTempView("weather_small")

#### Determine Primary Key
The primary key should be the DATE column
* Yes, the DATE column is the primary key

In [19]:
# check that the row count equals the distinct DATE count, i.e. DATE is the primary key
spark.sql('''  
          SELECT COUNT(*) AS row_count,
                 COUNT(distinct DATE) as DATE_dist_cnt,
                 COUNT(distinct STATION) as STATION_dist_cnt
          FROM weather_raw
          ''').show()

+---------+-------------+----------------+
|row_count|DATE_dist_cnt|STATION_dist_cnt|
+---------+-------------+----------------+
|     6836|         6836|               1|
+---------+-------------+----------------+



Verify the date range is the same as the crime data
* Yes, the date range is the same as the crime data

In [20]:
spark.sql('''  
          SELECT DISTINCT
              MIN(DATE) AS min_date,
              MAX(DATE) AS max_date
          FROM weather_raw
          ''').show()

+----------+----------+
|  min_date|  max_date|
+----------+----------+
|2001-01-01|2019-09-19|
+----------+----------+



#### PRCP -- check for null values

In [21]:
# null values in my variables of interest

# PRCP - no nulls
spark.sql('''
          SELECT DISTINCT
              CASE
                  WHEN PRCP IS NULL THEN 'bad_null'
                  WHEN PRCP IS NOT NULL THEN 'good'
                  ELSE 'weird'
              END AS PRCP_null_status,    
              COUNT(*) as row_count
          FROM weather_small          
          GROUP BY 1
          ''').show()

+----------------+---------+
|PRCP_null_status|row_count|
+----------------+---------+
|            good|     6836|
+----------------+---------+



#### SNOW -- check for null values

In [22]:
# SNOW
spark.sql('''
          SELECT DISTINCT
              CASE
                  WHEN SNOW IS NULL THEN 'bad_null'
                  WHEN SNOW IS NOT NULL THEN 'good'
                  ELSE 'weird'
              END AS SNOW_null_status,    
              COUNT(*) as row_count
          FROM weather_small          
          GROUP BY 1
          ''').show()

+----------------+---------+
|SNOW_null_status|row_count|
+----------------+---------+
|            good|     6836|
+----------------+---------+



#### SNWD -- check for null values

In [23]:
# SNWD - coalesce nulls to zero
# test the nullable false thing here
spark.sql('''
          SELECT DISTINCT
              CASE
                  WHEN SNWD IS NULL THEN 'bad_null'
                  WHEN SNWD IS NOT NULL THEN 'good'
                  ELSE 'weird'
              END AS SNWD_null_status,    
              COUNT(*) as row_count
          FROM weather_small          
          GROUP BY 1
          ''').show()

+----------------+---------+
|SNWD_null_status|row_count|
+----------------+---------+
|        bad_null|       31|
|            good|     6805|
+----------------+---------+



Testing coalesce()

In [24]:
# replacing null values with zeros
snow_col = spark.sql('''
                     SELECT DISTINCT
                         DATE,
                         coalesce(SNWD, 0.0) as good_snwd
                     FROM weather_small          
                     ''')

In [25]:
# create spark sql view
snow_col.createOrReplaceTempView("snow_col")

In [26]:
# make sure the nulls are gone - they are gone!
spark.sql('''
          SELECT DISTINCT
              CASE
                  WHEN good_snwd IS NULL THEN 'bad_null'
                  WHEN good_snwd IS NOT NULL THEN 'good'
                  ELSE 'weird'
              END AS good_snwd_null_status,    
              COUNT(*) as row_count
          FROM snow_col          
          GROUP BY 1
          ''').show()

+---------------------+---------+
|good_snwd_null_status|row_count|
+---------------------+---------+
|                 good|     6836|
+---------------------+---------+
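
`coalesce()` returns its first non-null argument, which is how the 31 missing SNWD values become zero. A pure-Python sketch of that behaviour, using made-up readings (the `coalesce` function here is a stand-in written for illustration, not a Spark call):

```python
def coalesce(*values):
    """Return the first argument that is not None, like SQL coalesce()."""
    for value in values:
        if value is not None:
            return value
    return None

snow_depths = ["17.0", None, "0.0"]   # made-up SNWD readings, strings as in the raw CSV
cleaned = [float(coalesce(depth, 0.0)) for depth in snow_depths]
print(cleaned)   # [17.0, 0.0, 0.0]
```

Note that after this step a missing reading can no longer be told apart from a true zero.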



#### TMAX -- check for null values

In [27]:
# TMAX
spark.sql('''
          SELECT DISTINCT
              CASE
                  WHEN TMAX IS NULL THEN 'bad_null'
                  WHEN TMAX IS NOT NULL THEN 'good'
                  ELSE 'weird'
              END AS TMAX_null_status,    
              COUNT(*) as row_count
          FROM weather_small          
          GROUP BY 1
          ''').show()

+----------------+---------+
|TMAX_null_status|row_count|
+----------------+---------+
|            good|     6836|
+----------------+---------+



#### TMIN -- check for null values

In [28]:
# TMIN
spark.sql('''
          SELECT DISTINCT
              CASE
                  WHEN TMIN IS NULL THEN 'bad_null'
                  WHEN TMIN IS NOT NULL THEN 'good'
                  ELSE 'weird'
              END AS TMIN_null_status,    
              COUNT(*) as row_count
          FROM weather_small          
          GROUP BY 1
          ''').show()

+----------------+---------+
|TMIN_null_status|row_count|
+----------------+---------+
|            good|     6836|
+----------------+---------+



#### TSUN -- check for null values

In [29]:
# TSUN - a lot of nulls, so I'm going to throw this out
spark.sql('''
          SELECT DISTINCT
              CASE
                  WHEN TSUN IS NULL THEN 'bad_null'
                  WHEN TSUN IS NOT NULL THEN 'good'
                  ELSE 'weird'
              END AS TSUN_null_status,    
              COUNT(*) as row_count
          FROM weather_small          
          GROUP BY 1
          ''').show()

+----------------+---------+
|TSUN_null_status|row_count|
+----------------+---------+
|        bad_null|     6101|
|            good|      735|
+----------------+---------+



### IUCR Data

After reviewing the data and schema of the IUCR data, I am primarily interested in the following variables, and will need to perform the following data wrangling steps:
* IUCR
    - Verify that this is the primary key of the dataset. 
* PRIMARY DESCRIPTION / SECONDARY DESCRIPTION
    - Verify that the combination of these two fields is unique.

In [30]:
IUCR_raw = spark.read.csv("s3a://chicago-crime-ddd/IUCR_Codes.csv", header=True)

In [31]:
IUCR_raw.printSchema()

root
 |-- IUCR: string (nullable = true)
 |-- PRIMARY DESCRIPTION: string (nullable = true)
 |-- SECONDARY DESCRIPTION: string (nullable = true)
 |-- INDEX CODE: string (nullable = true)



In [32]:
IUCR_raw.createOrReplaceTempView("IUCR_raw")

In [33]:
spark.sql('''  
          SELECT COUNT(*) AS row_count,
                 COUNT(distinct IUCR) as IUCR_dist_cnt,
                 COUNT(distinct `PRIMARY DESCRIPTION` || `SECONDARY DESCRIPTION`) as PDSD_dist_cnt
          FROM IUCR_raw
          ''').show()

+---------+-------------+-------------+
|row_count|IUCR_dist_cnt|PDSD_dist_cnt|
+---------+-------------+-------------+
|      401|          401|          401|
+---------+-------------+-------------+



## Defining the Data Model
#### Schema

`crime_fact`
* primary key: date, IUCR, domestic, community_area
* type: fact table
    - col: date
    - col: year
    - col: month
    - col: IUCR
    - col: domestic
    - col: community_area
    - col: crime_count

`weather_dim`
* primary key: DATE
* type: dimension table
    - col: DATE
    - col: PRCP
    - col: SNOW
    - col: SNWD
    - col: TMAX
    - col: TMIN

`iucr_dim`
* primary key: IUCR code
* type: dimension table
    - col: IUCR
    - col: primary_description
    - col: secondary_description
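
The schema above is a small star schema: `crime_fact` joins to `weather_dim` on the date and to `iucr_dim` on the IUCR code. A pure-Python sketch of that lookup, using made-up rows, placeholder descriptions, and hypothetical names (`fact_rows`, `weather_by_date`, `iucr_by_code`, `enrich`):

```python
# Made-up rows shaped like the three tables in the schema above.
fact_rows = [
    {"date": "2019-01-01", "IUCR": "0486", "domestic": "true",
     "community_area": "25", "crime_count": 3},
]
weather_by_date = {
    "2019-01-01": {"PRCP": 0.0, "SNOW": 0.0, "SNWD": 1.0, "TMAX": 30, "TMIN": 19},
}
iucr_by_code = {
    "0486": {"primary_description": "EXAMPLE PRIMARY",
             "secondary_description": "EXAMPLE SECONDARY"},
}

def enrich(fact_row):
    """Attach weather (by date) and crime descriptions (by IUCR) to a fact row."""
    joined = dict(fact_row)
    joined.update(weather_by_date[fact_row["date"]])
    joined.update(iucr_by_code[fact_row["IUCR"]])
    return joined

modeling_rows = [enrich(row) for row in fact_rows]
print(modeling_rows[0])
```

In Spark the same result would come from two joins on `date` and `IUCR`; the dictionaries here only stand in for the dimension tables.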



#### Mapping of Data Pipeline
1. Load CSVs from s3
2. Create Spark SQL views
3. Create Fact and Dimension tables
    - Crime fact
    - Weather dim
    - IUCR dim
4. Perform data quality checks
    - test data load row counts
    - check for null values
    - test primary keys in both raw uploads and clean tables
5. Write as parquet files back to s3     

### Run ETL

#### Load CSVs from s3

In [4]:
crime_raw = spark.read.csv("s3a://chicago-crime-ddd/Chicago_Crime_2001_to_Present.csv", header=True)
weather_raw = spark.read.csv("s3a://chicago-crime-ddd/chicago-weather.csv", header=True)
IUCR_raw = spark.read.csv("s3a://chicago-crime-ddd/IUCR_Codes.csv", header=True)

#### Create Spark SQL views

In [5]:
crime_raw.createOrReplaceTempView("crime_raw")
weather_raw.createOrReplaceTempView("weather_raw")
IUCR_raw.createOrReplaceTempView("IUCR_raw")

#### Create fact and dimension tables 
##### Create CRIME fact table

In [6]:
crime_fact = spark.sql('''
                       SELECT DISTINCT
                           to_date(concat(substr(Date, 7, 4), '-', substr(Date, 1, 2), '-', substr(Date, 4, 2)), 'yyyy-MM-dd') AS date,
                           CAST(TRIM(Year) AS string) AS year,
                           lpad(CAST(month(to_date(concat(substr(Date, 7, 4), '-', substr(Date, 1, 2), '-', substr(Date, 4, 2)), 'yyyy-MM-dd')) AS string), 2, '0') AS month,
                           CAST(TRIM(IUCR) AS string) AS IUCR,
                           CAST(TRIM(Domestic) AS string) AS domestic,
                           CAST(TRIM(coalesce(`Community Area`, 'no_community_reported')) AS string) AS community_area,
                           COUNT(distinct ID) as crime_count
                       FROM crime_raw          
                       WHERE
                           `Case Number` IS NOT NULL
                           AND
                           `Community Area` IS NOT NULL
                       GROUP BY 1,2,3,4,5,6
                       ORDER BY date
                       ''')

##### Create WEATHER dimension table

In [11]:
weather_dim = spark.sql('''
                        SELECT
                            to_date(DATE, 'yyyy-MM-dd') AS DATE,
                            CAST(PRCP AS FLOAT) AS PRCP,  
                            CAST(SNOW AS FLOAT) AS SNOW,                                    
                            CAST(coalesce(SNWD, 0.0) AS FLOAT) AS SNWD,
                            CAST(TMAX AS INT) AS TMAX,  
                            CAST(TMIN AS INT) AS TMIN
                        FROM weather_raw          
                        ''')

##### Create IUCR dimension table

In [12]:
IUCR_dim = spark.sql('''
                     SELECT
                         CAST(TRIM(IUCR) AS string) AS IUCR,
                         CAST(TRIM(`PRIMARY DESCRIPTION`) AS string) AS primary_description,
                         CAST(TRIM(`SECONDARY DESCRIPTION`) AS string) AS secondary_description
                     FROM IUCR_raw          
                     ''')

#### Perform Data Quality Checks
##### Test that data loads match expected values

In [13]:
def upload_row_count_check(df, target_row_count):    
    row_count = df.count()
    
    if row_count != target_row_count:
        print("Upload row count test failed. Row count does not match expected number of rows")
    else:
        print("Test passed! Upload was successful")

In [14]:
print("crime_raw row count test")
upload_row_count_check(crime_raw, 6975703)

print("weather_raw row count test")
upload_row_count_check(weather_raw, 6836)

print("IUCR_raw row count test")
upload_row_count_check(IUCR_raw, 401)

crime_raw row count test
Test passed! Upload was successful
weather_raw row count test
Test passed! Upload was successful
IUCR_raw row count test
Test passed! Upload was successful


##### Test that variables of interest do not contain unexpected null values in data uploads

In [15]:
def check_col_for_nulls(df, column_name, threshold=1, stat='identity'):
        
    null_count = df.where(col(column_name).isNull()).count()
    row_count = df.count()
    percent_null = ((null_count / row_count) * 100.0)
    
    print(f"----null column check / {column_name}----")
    print(f"null count: {null_count}")
    print(f"row count: {row_count}")
    print(f"percent nulls in column: {percent_null}")
    print(f"threshold: {threshold}")
    print(f"stat: {stat}")
    
    if stat == 'identity':
        if null_count >= threshold:
            print("Test failed: null threshold met or surpassed")
        else:
            print("Null test passed")
            
    if stat == 'percent':
        if percent_null >= threshold:
            print("Test failed: threshold met or surpassed")
        else:
            print("Null test passed")

In [16]:
print("crime_raw null value checks")
check_col_for_nulls(crime_raw, 'Date', threshold=1, stat='identity')
check_col_for_nulls(crime_raw, 'IUCR', threshold=1, stat='identity')
check_col_for_nulls(crime_raw, 'ID', threshold=1, stat='identity')
check_col_for_nulls(crime_raw, '`Community Area`', threshold=10, stat='percent')

crime_raw null value checks
----null column check / Date----
null count: 0
row count: 6975703
percent nulls in column: 0.0
threshold: 1
stat: identity
Null test passed
----null column check / IUCR----
null count: 0
row count: 6975703
percent nulls in column: 0.0
threshold: 1
stat: identity
Null test passed
----null column check / ID----
null count: 0
row count: 6975703
percent nulls in column: 0.0
threshold: 1
stat: identity
Null test passed
----null column check / `Community Area`----
null count: 613494
row count: 6975703
percent nulls in column: 8.794726495666458
threshold: 10
stat: percent
Null test passed


In [17]:
print("weather_raw null value checks")
check_col_for_nulls(weather_raw, 'DATE', threshold=1, stat='identity')
check_col_for_nulls(weather_raw, 'PRCP', threshold=1, stat='identity')
check_col_for_nulls(weather_raw, 'SNOW', threshold=1, stat='identity')
check_col_for_nulls(weather_raw, 'SNWD', threshold=1, stat='percent')
check_col_for_nulls(weather_raw, 'TMAX', threshold=1, stat='identity')
check_col_for_nulls(weather_raw, 'TMIN', threshold=1, stat='identity')

weather_raw null value checks
----null column check / DATE----
null count: 0
row count: 6836
percent nulls in column: 0.0
threshold: 1
stat: identity
Null test passed
----null column check / PRCP----
null count: 0
row count: 6836
percent nulls in column: 0.0
threshold: 1
stat: identity
Null test passed
----null column check / SNOW----
null count: 0
row count: 6836
percent nulls in column: 0.0
threshold: 1
stat: identity
Null test passed
----null column check / SNWD----
null count: 31
row count: 6836
percent nulls in column: 0.45348156816851964
threshold: 1
stat: percent
Null test passed
----null column check / TMAX----
null count: 0
row count: 6836
percent nulls in column: 0.0
threshold: 1
stat: identity
Null test passed
----null column check / TMIN----
null count: 0
row count: 6836
percent nulls in column: 0.0
threshold: 1
stat: identity
Null test passed


In [18]:
print("IUCR_raw null value checks")
check_col_for_nulls(IUCR_raw, 'IUCR', threshold=1, stat='identity')
check_col_for_nulls(IUCR_raw, '`PRIMARY DESCRIPTION`', threshold=1, stat='identity')
check_col_for_nulls(IUCR_raw, '`SECONDARY DESCRIPTION`', threshold=1, stat='identity')

IUCR_raw null value checks
----null column check / IUCR----
null count: 0
row count: 401
percent nulls in column: 0.0
threshold: 1
stat: identity
Null test passed
----null column check / `PRIMARY DESCRIPTION`----
null count: 0
row count: 401
percent nulls in column: 0.0
threshold: 1
stat: identity
Null test passed
----null column check / `SECONDARY DESCRIPTION`----
null count: 0
row count: 401
percent nulls in column: 0.0
threshold: 1
stat: identity
Null test passed
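
The null checks above print their verdict. If the pipeline should stop on a failure, the threshold logic can be separated into a function that returns a boolean. A minimal sketch of that logic, with a hypothetical name (`null_check_passes`) and the counts taken from the outputs above:

```python
def null_check_passes(null_count, row_count, threshold=1, stat="identity"):
    """Return True when the null level is below the threshold.

    stat="identity" compares the raw null count; stat="percent" compares
    the percentage of null rows. Both fail when the threshold is met.
    """
    if stat == "identity":
        return null_count < threshold
    if stat == "percent":
        return (null_count / row_count) * 100.0 < threshold
    raise ValueError(f"unknown stat: {stat!r}")

# Community Area: 8.8% null against a 10% threshold
print(null_check_passes(613494, 6975703, threshold=10, stat="percent"))
# SNWD: about 0.45% null against a 1% threshold
print(null_check_passes(31, 6836, threshold=1, stat="percent"))
# Case Number: 4 nulls would fail an identity check with threshold 1
print(null_check_passes(4, 6975703, threshold=1, stat="identity"))
```

A caller could then `assert null_check_passes(...)` or raise an exception so that a failed check halts the run.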


##### Test Primary Keys of raw and cleaned tables

In [19]:
def test_primary_key(df, pk_column):
    
    row_count = df.count()
    pk_test_count = df.select(pk_column).distinct().count()
    
    if pk_test_count != row_count:
        print(f"Primary key test failed. {pk_column} is not the primary key of this table")
    else:
        print(f"Test passed! {pk_column} is the primary key of this table")    

- Test PK of raw tables

In [20]:
print("crime_raw pk test")
test_primary_key(crime_raw, 'ID')

print("weather_raw pk test")
test_primary_key(weather_raw, 'DATE')

print("IUCR_raw pk test")
test_primary_key(IUCR_raw, 'IUCR')

crime_raw pk test
Test passed! ID is the primary key of this table
weather_raw pk test
Test passed! DATE is the primary key of this table
IUCR_raw pk test
Test passed! IUCR is the primary key of this table


- Test PK of final fact and dimension tables

In [10]:
print("weather_dim pk test")
test_primary_key(weather_dim, 'DATE')

print("IUCR_dim pk test")
test_primary_key(IUCR_dim, 'IUCR')

# manual test for crime_fact
print("crime_fact pk test")
crime_fact.createOrReplaceTempView("crime_fact")
spark.sql('''  
          SELECT DISTINCT
                 COUNT(*) AS row_count,                 
                 COUNT(distinct date || IUCR || domestic || community_area) as crime_composite_key
          FROM crime_fact
          ''').show()

weather_dim pk test
Test passed! DATE is the primary key of this table
IUCR_dim pk test
Test passed! IUCR is the primary key of this table
crime_fact pk test
+---------+-------------------+
|row_count|crime_composite_key|
+---------+-------------------+
|  4502692|            4502692|
+---------+-------------------+
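
A composite-key check that concatenates columns can undercount distinct keys when no separator is placed between the parts, because different column values can join into the same string. A minimal pure-Python sketch of the effect, assuming made-up rows (the values and the helper name `count_distinct_keys` are illustrative and not taken from the project data):

```python
def count_distinct_keys(rows, sep=""):
    """Count distinct composite keys built by joining each row's parts with `sep`."""
    return len({sep.join(str(part) for part in row) for row in rows})


# Two hypothetical key tuples that differ as tuples but share the same characters
rows = [("110", "1"), ("11", "01")]

without_sep = count_distinct_keys(rows)        # both rows become "1101"
with_sep = count_distinct_keys(rows, sep="|")  # "110|1" vs "11|01"

print(without_sep, with_sep)
```

With a separator that never appears in the data, the distinct count of the joined strings matches the distinct count of the underlying tuples.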



### Write tables to parquet files

Convert Spark DataFrames to Pandas DataFrames (stored under separate names so the Spark DataFrames remain available for the write step)

In [13]:
crime_fact_pd = crime_fact.toPandas()
weather_dim_pd = weather_dim.toPandas()
IUCR_dim_pd = IUCR_dim.toPandas()

Write tables to S3 as Parquet files

In [None]:
import os

s3_bucket = "s3a://chicago-crime-ddd/"

# Write from the Spark DataFrames; Pandas DataFrames have no `.write` attribute
crime_fact.write.partitionBy('year').parquet(os.path.join(s3_bucket, 'crime_fact'), mode='overwrite')
weather_dim.write.parquet(os.path.join(s3_bucket, 'weather_dim'), mode='overwrite')
IUCR_dim.write.parquet(os.path.join(s3_bucket, 'IUCR_dim'), mode='overwrite')
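
`os.path.join` uses the host platform's path separator, so on Windows it would insert a backslash into the S3 URI. A small sketch of a platform-independent alternative, assuming the same bucket as above (the helper name `s3_path` is hypothetical):

```python
import posixpath


def s3_path(bucket_uri, *parts):
    """Join an S3 bucket URI and key parts with forward slashes on any platform."""
    return posixpath.join(bucket_uri, *parts)


bucket = "s3a://chicago-crime-ddd/"
crime_fact_path = s3_path(bucket, "crime_fact")
print(crime_fact_path)
```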

## Data dictionary 

`crime_fact`
* date
    - Day crime occurred
* year
    - Year crime occurred
* month
    - Month crime occurred
* IUCR
    - Illinois Uniform Crime Reporting code. This is directly linked to the Primary Type and Description
* domestic
    - Indicates whether the incident was domestic-related as defined by the Illinois Domestic Violence Act 
* community_area
    - Indicates the community area where the incident occurred. Chicago has 77 community areas.
* crime_count
    - Count of crimes at the day, IUCR, domestic, community area level

`weather_dim`
* DATE
    - Day of weather statistics 
* PRCP
    - Precipitation (inches)
* SNOW
    - Snowfall (inches)
* SNWD
    - Snow depth (inches)
* TMAX
    - Max daily temp (F)
* TMIN
    - Min daily temp (F)

`IUCR_dim`
* IUCR
    - Illinois Uniform Crime Reporting code
* primary_description
    - Main description of crime. Example: Homicide 
* secondary_description
    - Secondary description of crime. Example: Aggravated: Handgun 

### Project Write Up

* *Clearly state the rationale for the choice of tools and technologies for the project.*
    - Spark was chosen because the primary aim of the project is to clean, verify, and store the data in a format that lets data scientists and data analysts quickly load the tables and run timeseries and machine learning algorithms.
    - By using Spark and S3, a data scientist only has to load the Parquet files, which takes just a few seconds, and can then immediately run timeseries and machine learning algorithms in the same environment.
    - Because the primary aim is to run statistical models, not to make the data available for descriptive SQL analyses, writing the data out as Parquet files was chosen over loading it into a database. Storing Parquet files on S3 is also a lot cheaper than storing the data in a Postgres or Redshift database.


* *Propose how often the data should be updated and why.*
    - The data should be updated monthly. If there are seasonal patterns, they will most likely be detected at the month level.
    - Month-level aggregations will be frequent. Therefore, it is important that data is not uploaded until a complete month of data is available, as partial months could lead to deceptive results.


* *Write a description of how you would approach the problem differently under the following scenarios:*
 * The data was increased by 100x.
     - Because the pipeline already runs on Spark, the same code can scale horizontally. As the data increases, I can add more machines and pool more memory, which matters most for the primary goal of running machine learning algorithms.

 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
     - In this situation, it would be best to schedule my data pipeline with Airflow. With Airflow, I can easily schedule and monitor the pipeline, check its status through Airflow's GUI, and quickly detect errors.
 * The database needed to be accessed by 100+ people.
     - If the database needed to be accessed by 100+ people, then a true database would need to be created. Since the purpose of this data pipeline is to perform data analysis, the two best choices would be Postgres or Redshift. Postgres is the cheaper and more straightforward option and, with the data at its current size, would be plenty efficient. Even at the current size, though, Redshift could potentially be more performant. And as the data continues to grow, if say the database is around for the next 10 years, then a distributed SQL framework like Redshift is the right tool for the job.
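
The monthly update rule proposed above (upload only complete months) could be enforced with a small date check before each upload. A minimal sketch using only the standard library. The function name `last_complete_month` and the example dates are assumptions for illustration, and the sketch assumes a month is complete once the latest record falls on its last day:

```python
from datetime import date, timedelta


def last_complete_month(latest_record_date):
    """Return (year, month) of the most recent month fully covered by the data."""
    next_day = latest_record_date + timedelta(days=1)
    if next_day.day == 1:
        # The latest record is on the last day of its month, so that month is complete
        return latest_record_date.year, latest_record_date.month
    # Otherwise the current month is partial; fall back to the previous month
    last_of_prev = latest_record_date.replace(day=1) - timedelta(days=1)
    return last_of_prev.year, last_of_prev.month


partial = last_complete_month(date(2020, 3, 17))   # March is partial
full = last_complete_month(date(2020, 2, 29))      # February 2020 is complete
january = last_complete_month(date(2021, 1, 10))   # rolls back across the year boundary
print(partial, full, january)
```

Rows dated after the returned month would be held back until the next monthly run.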

## Stop Spark

In [21]:
spark.stop()