# New York City Crime Data

## Project Summary
To mitigate crime it is essential to understand which factors drive lawless behaviour. This project creates a data model that can be used for analyzing crimes in New York City between 2012 and 2017. The data model consists of crime complaint data, demographics of New York City's neighbourhoods and weather data. These are examples of questions that can be answered using the data model:
* *Which borough has the most crime?*
* *What type of crime is most common for each borough?*
* *Does the weather affect the number of crime reports?*
* *What characterizes neighbourhoods with a high level of crime?*

The project follows the steps below, which are described further in the next sections:
1. Scope the Project and Gather Data
2. Explore and Assess the Data
3. Define the Data Model
4. Run ETL to Model the Data
5. Complete Project Write Up

A Jupyter Notebook running on an EMR cluster has been used to gather data from the data sources and to run the ETL process and the quality checks.

## 1 Project scope and data sources

### 1.1 Scope 
The scope of this project is to create a data model that can be used to investigate crime statistics of New York City using weather data and demographics. 

### 1.2 Data sources
This project combines data on reported crimes, weather and the demographics of New York City. The data sources are elaborated on below.

#### Crime Data
The crime data is extracted from the NYPD Complaint Data Historic dataset on [NYC Open Data](https://data.cityofnewyork.us/Public-Safety/NYPD-Complaint-Data-Historic/qgea-i56i) and has been accessed through its API, which is powered by Socrata. The dataset includes all valid felony, misdemeanor and violation crimes reported to the New York City Police Department (NYPD) from 2006 to 2019; for this project, data for 2012 to 2017 is extracted. The collected data comprises daily data at borough level.

#### Demographic Data
The demographic data consists of yearly population and income estimates at borough level and is gathered from the [American Community Survey](https://www.census.gov/data/developers/data-sets/acs-5year.html) (ACS) by the US Census Bureau. The data is extracted using the [census library](https://pypi.org/project/census/), which is a simple wrapper for the United States Census Bureau's API.

An API key is needed to access the data. It can be retrieved [here](https://www.census.gov/data/developers/guidance/api-user-guide.html).
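Rather than pasting the key into the notebook, it can be read from an environment variable. A minimal sketch, assuming a hypothetical variable named `CENSUS_API_KEY` (not part of the original project); the returned string would then be passed to `Census(...)`:

```python
import os


def get_census_api_key(env_var="CENSUS_API_KEY"):
    """Return the Census API key from the environment (hypothetical variable name)."""
    key = os.environ.get(env_var, "").strip()
    if not key:
        # fail early instead of sending unauthenticated requests
        raise RuntimeError(f"Set the {env_var} environment variable to your Census API key")
    return key
```

Usage: `c = Census(get_census_api_key())` keeps the key out of the notebook file.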

#### Weather data
The weather data contains hourly data for 36 cities around the world (including New York City) and has been downloaded from [Kaggle](https://www.kaggle.com/datasets/selfishgene/historical-hourly-weather-data?select=city_attributes.csv). The collected data comprises hourly temperatures and weather descriptions at city level for 2012 to 2017.

See the data dictionary in *Section 5* for a more detailed description of the data fields used.

In [None]:
sc.install_pypi_package("boto3")

In [None]:
sc.install_pypi_package("census")

In [None]:
import json
import boto3
from census import Census
from urllib.request import urlopen

In [None]:
# connect to Amazon S3
s3 = boto3.resource('s3')

In [None]:
#### LOAD CRIME DATA ####

# data source
url = "https://data.cityofnewyork.us/resource/qgea-i56i.json"

# query to get the correct date interval
date_interval = "$where=RPT_DT%20between%20'2012-01-01T00:00:00'%20and%20'2017-12-31T00:00:00'&"

# number of rows per page (max 50,000)
limit = 50000

# start row
offset = 0

# call api for each page
while offset < 3000000:
    
    # create path
    path = url + f"?{date_interval}$limit={limit}&$offset={offset}"

    # read data
    jsonData = urlopen(path).read().decode('utf-8')
    
    print(path)
    
    # stop looping when current page is empty
    if len(jsonData) <= 3:
        print("Final page reached")
        break
    
    # print current start row
    print(offset)
    
    # save data to S3
    s3.Bucket('udacity-mikb').put_object(Key=f"nyc_crime_data/crime_data_{offset}.json", Body=jsonData)
    
    offset += limit
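The loop above pages through the API with `$limit` and `$offset` and stops at the first empty page. Socrata's paging guidance recommends adding an `$order` clause so that pages stay stable between requests. The sketch below is a hedged variant that builds the page URLs up front, assuming ordering by the system field `:id` and the lower-case field name `rpt_dt`; it only builds URLs and does not call the API:

```python
from urllib.parse import urlencode

BASE_URL = "https://data.cityofnewyork.us/resource/qgea-i56i.json"
DATE_FILTER = "rpt_dt between '2012-01-01T00:00:00' and '2017-12-31T00:00:00'"


def page_urls(total_rows, limit=50000):
    """Build one SoQL request URL per page of `limit` rows."""
    urls = []
    for offset in range(0, total_rows, limit):
        params = {
            "$where": DATE_FILTER,
            "$order": ":id",  # assumed stable sort key so pages neither overlap nor skip rows
            "$limit": limit,
            "$offset": offset,
        }
        # urlencode percent-encodes spaces, quotes and "$" in one place
        urls.append(f"{BASE_URL}?{urlencode(params)}")
    return urls
```

With the 2,919,428 rows reported in *Section 2*, `page_urls(2_919_428)` yields 59 pages. Encoding the parameters with `urlencode` avoids hand-writing `%20` sequences as in `date_interval`.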

In [None]:
#### LOAD CENSUS DATA ####

# set API-key
c = Census("[INSERT API KEY]")

# list with relevant years
years = [2017, 2016, 2015, 2014, 2013, 2012]

# call api for all years
for y in years:
    # load data (returns list) 
    jsonData = c.acs5.state_county_subdivision(('NAME','B01003_001E','B05004_001E','B19013_001E','B06012_001E','B06012_002E'), '36', Census.ALL, Census.ALL, year=y)

    # add year to data
    jsonData = [dict(item, year=y) for item in jsonData]

    # convert list to json format
    jsonData = json.dumps(jsonData)
    
    # save data to S3
    s3.Bucket('udacity-mikb').put_object(Key=f"nyc_demographic_data/demographic_data_{y}.json", Body=jsonData)

## 2 Explore and Assess the Data
All data sets have been explored to understand which cleaning and transformation steps should be implemented in the ETL pipeline.

### Crime data
The crime data consists of 35 columns and 2,919,428 rows. Not all columns are relevant for the final data model, so the irrelevant ones must be excluded while processing the data. There are no corrupted records to be addressed; however, this must be reconsidered if more data is added to the table. All columns are read as strings, hence it is necessary to cast columns containing integers and dates. All complaint reports in the data set have an ID. When the ID is included there are no duplicate values, but duplicates appear when the ID is excluded from the table. For the purpose of this project we assume that these are separate incidents and hence not duplicates.

The columns for age group, sex and race of the suspect contain null values in roughly 42% to 61% of the rows. Additionally, about 20% of the data set is missing the age group of the victim. Excluding these columns could be considered due to the high share of null values. In this case we have decided to keep them, but one must be cautious about this when analyzing the data. All other relevant columns have an acceptable number of null values. The table below shows the number of null values for all the extracted columns.

|cmplnt_num|cmplnt_fr_dt|rpt_dt|boro_nm|ofns_desc|pd_desc|crm_atpt_cptd_cd|law_cat_cd|prem_typ_desc|susp_age_group|susp_sex|susp_race|vic_age_group|vic_race|vic_sex|
|----------|------------|------|-------|---------|-------|----------------|----------|-------------|--------------|--------|---------|-------------|--------|-------|
|         0|         128|     0|   2106|     5972|   2066|               2|         0|        12723|       1769362| 1226867|  1226867|       585698|       6|      6|
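The share of missing values per column follows from the null counts above and the total of 2,919,428 rows. A small worked calculation in plain Python for the columns with the most nulls (counts copied from the table):

```python
TOTAL_ROWS = 2_919_428  # row count of the crime data reported above

# null counts copied from the table above
null_counts = {
    "susp_age_group": 1_769_362,
    "susp_sex": 1_226_867,
    "susp_race": 1_226_867,
    "vic_age_group": 585_698,
    "prem_typ_desc": 12_723,
    "ofns_desc": 5_972,
}

# percentage of rows with a null value, rounded to one decimal
null_share = {column: round(100 * n / TOTAL_ROWS, 1) for column, n in null_counts.items()}

for column, share in sorted(null_share.items(), key=lambda item: -item[1]):
    print(f"{column:<15} {share:5.1f} %")
```

The suspect columns land at about 60.6% (age group) and 42.0% (sex and race), and the victim age group at about 20.1%.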

### Demographic data
The demographic data has 10 columns and contains data for all county subdivisions in New York State. When only the five boroughs of New York City are kept, 30 rows remain (five boroughs for six years). There are no duplicates or null values in the remaining data set. The column names are not self-explanatory and must be renamed to something more descriptive. Furthermore, we want to include the poverty rate of each borough, which is calculated by dividing column B06012_002E (population below poverty level) by column B06012_001E (population for whom poverty status is determined). Below are the original column names and their meaning.

**B01003_001E:** Population  
**B05004_001E:** Median Age  
**B19013_001E:** Median Household Income  
**B06012_001E:** Population for whom poverty status is determined  
**B06012_002E:** Population below poverty level  
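The renaming and the poverty-rate formula can be sketched in plain Python on a single record as returned by the census library (a list of dicts keyed by variable code). This only illustrates the logic, not the Spark code used in the pipeline; the two poverty names below are hypothetical helper names that do not appear in the final table:

```python
# Readable names for the ACS variables used in the notebook.
ACS_COLUMNS = {
    "B01003_001E": "population",
    "B05004_001E": "age_median",
    "B19013_001E": "income_median",
    "B06012_001E": "poverty_status_determined",  # hypothetical helper name
    "B06012_002E": "below_poverty_level",        # hypothetical helper name
}


def to_readable(record):
    """Rename ACS codes and derive poverty_rate = below poverty level / poverty status determined."""
    # keys that are not ACS codes (e.g. NAME, year) are kept unchanged
    row = {ACS_COLUMNS.get(key, key): value for key, value in record.items()}
    row["poverty_rate"] = row["below_poverty_level"] / row["poverty_status_determined"]
    return row
```

For a made-up record with `B06012_001E = 800.0` and `B06012_002E = 200.0`, the poverty rate is `0.25`.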

### Weather data
The weather data is extracted from two CSV files, one with hourly temperature data and one with hourly weather descriptions. The two data sets must be joined into one weather table in the ETL pipeline. Both data sets contain one column per city, and since we only want data for New York City, all other city columns are dropped. When only New York City is kept, each data set contains 2 columns (datetime and New York) and 45,253 rows. The data sets have 793 missing values, which make up 1.75% of the data set.

The weather data consists of hourly observations, but since the crime data is only available at daily level, the weather data must be aggregated to daily level as well. For the temperature data we compute the average temperature of each day, while for the weather description we use the mode (the most frequent description) of each day.
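The daily aggregation can be sketched in plain Python, independent of Spark: keep one city column, group hourly rows by date, average the temperature (converted from Kelvin to Celsius as in the ETL code) and take the most frequent description. The sketch assumes the Kaggle files are wide tables with a `datetime` column followed by one column per city (e.g. `New York`) and empty cells for missing values:

```python
import csv
import io
from collections import Counter, defaultdict
from statistics import mean

KELVIN_OFFSET = 273.15  # the temperatures are stored in Kelvin


def daily_weather(temperature_csv, description_csv, city="New York"):
    """Aggregate hourly readings to one row per day for a single city column."""
    temperatures = defaultdict(list)
    for row in csv.DictReader(io.StringIO(temperature_csv)):
        if row[city]:  # skip missing readings (empty cells)
            temperatures[row["datetime"][:10]].append(float(row[city]))

    descriptions = defaultdict(Counter)
    for row in csv.DictReader(io.StringIO(description_csv)):
        if row[city]:
            descriptions[row["datetime"][:10]][row[city]] += 1

    daily = {}
    for day in sorted(temperatures.keys() & descriptions.keys()):  # inner join on date
        daily[day] = {
            "avg_temperature": mean(temperatures[day]) - KELVIN_OFFSET,
            # mode of the day; ties are broken by first occurrence
            "weather_desc": descriptions[day].most_common(1)[0][0],
        }
    return daily
```

Missing values are skipped before aggregating, so they neither lower the average nor become the most frequent description.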

In [None]:
from pyspark.sql.functions import col, count, when, to_date, lit

#### CODE FOR EXPLORING CRIME DATA

In [None]:
#### EXPLORE CRIME DATA ####
df = spark.read.json("s3://udacity-mikb/nyc_crime_data/*.json", multiLine = True)

In [None]:
# check columns and data types
df.printSchema()

In [None]:
# inspect the first row
df.head()

In [None]:
# check number of rows
df.count()

In [None]:
# exclude columns that are not relevant
df = df.select("cmplnt_num",
                "cmplnt_fr_dt",
                "rpt_dt",
                "boro_nm",
                "ofns_desc",
                "pd_desc",
                "crm_atpt_cptd_cd",
                "law_cat_cd",
                "prem_typ_desc",
                "susp_age_group",
                "susp_sex",
                "susp_race",
                "vic_age_group",
                "vic_race",
                "vic_sex")

In [None]:
# check number of duplicates
df.count() - df.dropDuplicates().count()

In [None]:
# check number of duplicates when excluding id column
df.count() - df.drop("cmplnt_num").dropDuplicates().count()

In [None]:
# check number of null values for each column
df.select([count(when(col(c).isNull(),c)).alias(c) for c in df.columns]).show()

#### CODE FOR EXPLORING DEMOGRAPHIC DATA

In [None]:
#### EXPLORE DEMOGRAPHIC DATA ####
df = spark.read.json("s3://udacity-mikb/nyc_demographic_data/*.json", multiLine = True)

In [None]:
# check data types
df.printSchema()

In [None]:
# check columns
df.head()

In [None]:
# select only the five New York City boroughs (county subdivision codes)
df = df.filter(col("county subdivision").isin({"08510", "10022", "44919", "60323", "70915"}))

In [None]:
# check number of rows
df.count()

In [None]:
# check duplicates
df.count() - df.dropDuplicates().count()

In [None]:
# check number of null values for each column
df.select([count(when(col(c).isNull(),c)).alias(c) for c in df.columns]).show()

In [None]:
df.show(30)

#### CODE FOR EXPLORING WEATHER DATA

In [None]:
#### EXPLORE TEMPERATURE DATA ####
df = spark.read.csv("s3://udacity-mikb/nyc_weather_data/temperature.csv", header=True)

In [None]:
# check data types
df.printSchema()

In [None]:
# check columns
df.head()

In [None]:
# select new york city only
df = df.select("datetime", "new york")

In [None]:
# count rows
df.count()

In [None]:
# check duplicates
df.count() - df.dropDuplicates().count()

In [None]:
# check number of null values for each column
df.select([count(when(col(c).isNull(),c)).alias(c) for c in df.columns]).show()

In [None]:
#### EXPLORE WEATHER DATA ####
df = spark.read.csv("s3://udacity-mikb/nyc_weather_data/weather_description.csv", header=True)

In [None]:
# check data types
df.printSchema()

In [None]:
# check columns
df.head()

In [None]:
# select new york city only
df = df.select("datetime", "new york")

In [None]:
# check duplicates
df.count() - df.dropDuplicates().count()

In [None]:
# check number of null values for each column
df.select([count(when(col(c).isNull(),c)).alias(c) for c in df.columns]).show()

## 3. ETL Pipeline
The ETL pipeline creates a data lake by extracting the raw data that was loaded to S3, cleaning and transforming the data, and loading the cleaned data back to S3. A data lake is chosen due to its ability to store big data and its schema flexibility. The data model uses a star schema, which keeps queries simple and efficient. The crime table acts as the fact table, and the weather and demographic tables act as dimension tables. The demographic and crime tables are connected by year and borough, while the weather table can be connected using the date fields.
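To illustrate how the fact table connects to a dimension table, the sketch below answers a variant of *Which borough has the most crime?* in plain Python: complaints are counted per `(rpt_year, boro_nm)` and divided by the population of the matching `(year, borough)` row. The rows are plain dicts standing in for the parquet tables, and the per-100,000 normalization is an illustrative choice rather than part of the original project; in Spark the same lookup would be a join on both keys:

```python
from collections import Counter


def crimes_per_100k(crime_rows, demographic_rows):
    """Complaints per 100,000 inhabitants for each (year, borough) present in both tables."""
    # fact table: one row per complaint, keyed by report year and borough
    complaints = Counter((row["rpt_year"], row["boro_nm"]) for row in crime_rows)
    # dimension table: one row per (year, borough)
    population = {(row["year"], row["borough"]): row["population"] for row in demographic_rows}
    return {
        key: 100_000 * count / population[key]
        for key, count in complaints.items()
        if key in population  # inner join: drop complaints without a matching dimension row
    }
```

Complaints with a missing borough have no matching dimension row and are left out, which mirrors the behaviour of an inner join.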

### How to run the ETL Pipeline
The code in this project is set up to run in a Jupyter notebook on an EMR cluster in AWS. The cluster uses release 5.30.0 with the applications Spark 2.4.8 on Hadoop 2.10.1 YARN and Zeppelin 0.10.0. Data is stored on S3. When running the code, the input and output paths must be changed to match your preferred storage. Before running the ETL pipeline you must also retrieve data from the data sources; information on this is found in *Section 1*. When you have retrieved the raw data, the ETL pipeline can be run by executing the code in this section sequentially.

### 3.1 Extract
The data sets are extracted from S3 on AWS where they are stored as JSON and CSV files. 

### 3.2 Transform
The extracted data is cleaned and transformed into three tables to fit the star schema: (i) crime_table, (ii) demographic_table and (iii) weather_table. All irrelevant columns and rows have been removed, missing data and duplicates are handled, and aggregations have been performed where necessary. The borough names in the crime data and demographic data have been transformed to the same formatting so that the tables can be joined on that column. The crime table and demographic table must also be joined on year, hence a year column has been extracted from the report date column in the crime data. Lastly, the weather data has been aggregated to daily level so that it can be connected to the crime table through the date fields. The cleaning and transformation steps are described in further detail in *Section 2*.
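The borough formatting step can be sketched in plain Python, mirroring the Spark functions used in the ETL code (`initcap` on the crime data and `regexp_extract(..., "(.+) borough", 1)` on the Census `NAME` column). It assumes NYPD borough names are upper case (e.g. `STATEN ISLAND`) and Census `NAME` values look like `Staten Island borough, Richmond County, New York`; these example strings are assumptions, not values copied from the data:

```python
import re


def crime_borough(boro_nm):
    """Title-case an upper-case NYPD borough name, like Spark's initcap."""
    # initcap lower-cases the string and upper-cases the first letter of each word
    return " ".join(word.capitalize() for word in boro_nm.split(" "))


def census_borough(name):
    """Extract the borough from a Census NAME value, like regexp_extract(..., "(.+) borough", 1)."""
    match = re.search(r"(.+) borough", name)
    # regexp_extract returns an empty string when the pattern does not match
    return match.group(1) if match else ""
```

Both functions produce `Staten Island` for the example inputs, so the two tables share a common join key.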

### 3.3 Load
The transformed data is loaded back to new folders in S3 as parquet files.

In [None]:
import os
from pyspark.sql.window import Window
from pyspark.sql.functions import col, to_timestamp, to_date, year, regexp_extract, initcap, count, when, row_number

In [None]:
input_data = "s3://udacity-mikb/"
output_data = "s3://udacity-mikb/"

In [None]:
# get filepath to crime data file
crime_data = os.path.join(input_data, "nyc_crime_data/*.json")
    
# read crime data file
crime_data = spark.read.json(crime_data, multiLine = True)

In [None]:
crime_data.head()

In [None]:
def process_crime_data(input_data, output_data):
    """
    Process the raw crime data and load the crime table back to S3.

    Read the crime complaint JSON files from S3, cast ids and dates,
    extract the report year, format borough names and select the
    relevant columns. The result is written to S3 as parquet files.

    Parameters:
    input_data (str): base path for input data
    output_data (str): base path for output data

    """
    
    print("processing crime data")
    
    # get filepath to crime data file
    crime_data = os.path.join(input_data, "nyc_crime_data/*.json")
    
    # read crime data file
    crime_data = spark.read.json(crime_data, multiLine = True)
    
    # cast complaint number to integer
    crime_data = crime_data.withColumn("cmplnt_num", col("cmplnt_num").cast("Integer"))
  
    # cast complaint from date and report date to date
    crime_data = crime_data.withColumn("cmplnt_fr_dt", to_date("cmplnt_fr_dt"))
    crime_data = crime_data.withColumn("rpt_dt", to_date("rpt_dt"))
    
    # extract year from report date
    crime_data = crime_data.withColumn("rpt_year", year("rpt_dt"))
    
    # capitalize borough name
    crime_data = crime_data.withColumn("boro_nm", initcap(col("boro_nm")))
    
    # select relevant columns and drop duplicate complaint ids
    crime_table = crime_data.select("cmplnt_num",
                "cmplnt_fr_dt",
                "rpt_dt",
                "rpt_year",
                "boro_nm",
                "ofns_desc",
                "pd_desc",
                "crm_atpt_cptd_cd",
                "law_cat_cd",
                "prem_typ_desc",
                "susp_age_group",
                "susp_sex",
                "susp_race",
                "vic_age_group",
                "vic_race",
                "vic_sex").dropDuplicates(["cmplnt_num"])
    
    # load data back to S3
    crime_table.write.mode("overwrite").parquet(os.path.join(output_data, "nyc_crime_table"))

In [None]:
def process_demographic_data(input_data, output_data):
    """
    Process the raw demographic data and load the demographic table back to S3.

    Read the census JSON files from S3, keep the five boroughs of New York City,
    rename the census variables, extract the borough name and calculate the
    poverty rate. The result is written to S3 as parquet files.

    Parameters:
    input_data (str): base path for input data
    output_data (str): base path for output data

    """
    
    print("processing demographic data")
    
    # get filepath to demographic data file
    demographic_table = os.path.join(input_data, "nyc_demographic_data/*.json")

    # read demographic data file
    demographic_table = spark.read.json(demographic_table)
    
    # extract neighbourhoods of new york
    demographic_table = demographic_table \
    .filter(col("county subdivision").isin({"08510", "10022", "44919", "60323", "70915"}))
    
    # rename columns
    demographic_table = demographic_table.withColumnRenamed("B01003_001E", "population")
    demographic_table = demographic_table.withColumnRenamed("B19013_001E", "income_median")
    demographic_table = demographic_table.withColumnRenamed("B05004_001E", "age_median")
    
    # cast year from string to int
    demographic_table = demographic_table.withColumn("year", col("year").cast("Integer"))

    # extract the borough name (e.g. "Bronx") to match the formatting in the crime table
    demographic_table = demographic_table.withColumn("borough", regexp_extract(col("NAME"), "(.+) borough", 1))

    # create poverty rate column
    demographic_table = demographic_table.withColumn("poverty_rate",
                                                     demographic_table["B06012_002E"]/demographic_table["B06012_001E"])
    
    # select columns
    demographic_table =  demographic_table.select("year",
                                         "borough",
                                         "population",
                                         "age_median",
                                         "income_median",
                                         "poverty_rate")
    
    # load data back to S3
    demographic_table.write.mode("overwrite").parquet(os.path.join(output_data, "nyc_demographic_table"))

In [None]:
def process_weather_data(input_data, output_data):
    """
    Process the raw weather data and load the weather table back to S3.

    Read the hourly temperature and weather description CSV files from S3,
    keep New York City only, aggregate to daily level (average temperature
    in Celsius and most frequent weather description) and join the two
    data sets on date. The result is written to S3 as parquet files.

    Parameters:
    input_data (str): base path for input data
    output_data (str): base path for output data

    """

    print("processing weather data")

    # get filepaths to weather data files
    temperature_data = os.path.join(input_data, "nyc_weather_data/temperature.csv")
    weather_data = os.path.join(input_data, "nyc_weather_data/weather_description.csv")

    # load weather data files
    temperature_data = spark.read.csv(temperature_data, header=True)
    weather_data = spark.read.csv(weather_data, header=True)

    # select only date and new york
    temperature_data = temperature_data.select("datetime", "new york")
    weather_data = weather_data.select("datetime", "new york")

    # create date column without time stamp
    temperature_data = temperature_data.withColumn("date", to_date("datetime"))
    weather_data = weather_data.withColumn("date", to_date("datetime"))

    # aggregate temperature data (avg ignores missing values)
    temperature_data = temperature_data.groupBy("date").agg({"new york": "avg"})

    # convert degrees from kelvin to celsius
    temperature_data = temperature_data.withColumn("avg_temperature", temperature_data["avg(new york)"] - 273.15)

    # remove column with kelvin values
    temperature_data = temperature_data.drop("avg(new york)")

    # count occurrences of each weather description per day (missing values excluded)
    weather_data = weather_data.filter(col("new york").isNotNull()) \
        .groupBy("date", "new york").agg({"datetime": "count"})

    # keep the most frequent description (the mode) for each day
    windowDate = Window.partitionBy("date").orderBy(col("count(datetime)").desc())
    weather_data = weather_data.withColumn("row", row_number().over(windowDate)) \
        .filter(col("row") == 1) \
        .drop("row", "count(datetime)")

    # rename columns
    weather_data = weather_data.withColumnRenamed("new york", "weather_desc")

    # join tables
    weather_table = temperature_data.join(weather_data, ["date"])

    # load data back to S3
    weather_table.write.mode("overwrite").parquet(os.path.join(output_data, "nyc_weather_table"))

In [None]:
# process crime data
process_crime_data(input_data,output_data)    

In [None]:
# process demographic data
process_demographic_data(input_data, output_data)

In [None]:
# process weather data
process_weather_data(input_data,output_data)

## 4. Data Quality Checks
To ensure that the pipelines ran as expected, a few data quality checks have been implemented. The first one checks that the tables contain data, while the second one checks for duplicate values in the key column. The quality checks are only performed on the weather and crime tables, as the demographic table is small enough (30 rows) to be checked by viewing it.
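The two checks can also be expressed against column names instead of column positions, which keeps them correct if the column order of a table changes. A plain-Python sketch of the same logic, with hypothetical function names and rows represented as dicts:

```python
def assert_not_empty(rows, table):
    """Raise if a table has no rows; return the row count otherwise."""
    if not rows:
        raise ValueError(f"{table} has zero rows")
    return len(rows)


def assert_unique_key(rows, key_columns, table):
    """Raise if the combination of key_columns is not unique across rows."""
    keys = [tuple(row[column] for column in key_columns) for row in rows]
    duplicates = len(keys) - len(set(keys))
    if duplicates:
        raise ValueError(f"{table} has {duplicates} duplicate value(s) in {key_columns}")
```

For the tables in this project, the key columns would be `["cmplnt_num"]` for the crime table and `["date"]` for the weather table.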

In [None]:
def check_row_count(df):
    """
    Check that the table is not empty.

    Parameters:
    df (DataFrame): table to check

    """

    row_count = df.count()

    if row_count == 0:
        raise ValueError("Data has zero rows")

    print(f"Data has {row_count} rows")

In [None]:
def check_unique_value(df, from_column, to_column):
    """
    Check that the selected column(s) contain unique values.

    Parameters:
    df (DataFrame): table to check
    from_column (int): index of the first column to check
    to_column (int): index after the last column to check

    """

    key_columns = df.select(df.columns[from_column:to_column])

    no_duplicates = key_columns.count() - key_columns.dropDuplicates().count()

    if no_duplicates > 0:
        raise ValueError(f"The column has {no_duplicates} duplicate values")

    print("The column has unique values")

In [None]:
# TABLES TO CHECK: (path, from_column, to_column)
tables = {
    "crime": ("s3://udacity-mikb/nyc_crime_table/*.parquet", 0, 1),
    "weather": ("s3://udacity-mikb/nyc_weather_table/*.parquet", 0, 1),
}

In [None]:
# RUN QUALITY CHECKS ON EACH TABLE
for name, (path, from_column, to_column) in tables.items():
    print(f"checking {name} table")
    df = spark.read.parquet(path)
    check_row_count(df)
    check_unique_value(df, from_column, to_column)

## 5 Data dictionary 
Below is a description of all the columns in the data model

### Crime table
cmplnt_num: The ID of the complaint  
cmplnt_fr_dt: The date of the event  
rpt_dt: The date the event was reported to the police  
rpt_year: The year the event was reported to the police  
boro_nm: The name of the borough in which the incident occurred  
ofns_desc: Description of the offense (short classification of the event)  
pd_desc: Description of the internal classification (more detailed classification of the event)  
crm_atpt_cptd_cd: Indicator of whether the crime was successfully completed or attempted but failed or was interrupted prematurely  
law_cat_cd: Level of offense: felony, misdemeanor or violation  
prem_typ_desc: Specific description of the premises, e.g. grocery store, residence or street  
susp_age_group: Age group of the suspect  
susp_sex: Sex of the suspect  
susp_race: Race of the suspect  
vic_age_group: Age group of the victim  
vic_race: Race of the victim  
vic_sex: Sex of the victim  

### Demographics table
year: Year of the estimates  
borough: Borough of New York City  
population: Number of inhabitants  
age_median: Median age of the population  
income_median: Median household income  
poverty_rate: Share of the population (for whom poverty status is determined) living below the poverty level  

### Weather table
date: Date  
avg_temperature: Average daily temperature in Celsius  
weather_desc: The weather type with highest frequency that day  

## 6 Discussion of data model

### 6.1 Technology choice
This project uses a data lake on the cloud platform AWS together with PySpark to ensure an efficient workflow and satisfy the need for flexibility. The fact table of the data model contains crime data for 2012 to 2017, which comprises almost 3 million rows. Furthermore, the data model will be used for ad hoc analysis, which requires a great deal of flexibility, both in terms of querying and in terms of adding new data sources to the model. Through schema-on-read, the chosen technology provides the needed flexibility for the project, while parallelism makes handling the data load efficient.

### 6.2 Updating the data
This data model builds upon historical data, hence there is no need to update the data model as it is. However, working with more recent data would be useful. In that case we suggest updating the data monthly or quarterly, as we find it most interesting to analyze longer time periods and compare the results to previous time periods.
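For a monthly update, each run would only need to fetch the complaints reported in the latest month instead of the whole history. A minimal sketch of the date filter, assuming the same `rpt_dt` field as the extraction query in *Section 1* and a half-open interval (an illustrative choice; the original query uses `between`):

```python
from datetime import date


def month_filter(year, month):
    """SoQL $where clause selecting complaints reported in one calendar month."""
    start = date(year, month, 1)
    end = date(year + 1, 1, 1) if month == 12 else date(year, month + 1, 1)
    # half-open interval [start, end) so consecutive months never overlap
    return f"rpt_dt >= '{start.isoformat()}T00:00:00' AND rpt_dt < '{end.isoformat()}T00:00:00'"
```

For example, `month_filter(2017, 12)` selects reports from 1 December 2017 up to, but not including, 1 January 2018. A quarterly update would combine three consecutive monthly windows.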

### Potential new user needs and requirement changes

#### Increasing data by 100x
If the data increases by 100x, one must reconsider the hardware of the EMR cluster (instance types and number of nodes) to ensure that it can handle the amount of data.

#### The data populates a dashboard that must be updated on a daily basis by 7am every day.
If the data is used for a dashboard that must be updated by 7am every day, more logic to capture and log potential data errors must be implemented. For instance, more quality checks should be added, as well as logic during processing that captures potential corrupted records, missing values or duplicates that can occur when new data is added. Furthermore, the data extraction from the sources and the ETL pipeline should be automated using Apache Airflow or a similar tool.
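One such additional check could compare the share of missing values in a column against a threshold, so that a load with unexpectedly many nulls fails before it reaches the dashboard. A plain-Python sketch; the function name and any threshold values are hypothetical and would need to be tuned to the null rates observed in *Section 2*:

```python
def check_null_rate(rows, column, max_share):
    """Raise if the share of missing values in column exceeds max_share (0 to 1)."""
    if not rows:
        raise ValueError("cannot compute a null rate for an empty table")
    missing = sum(1 for row in rows if row.get(column) is None)
    share = missing / len(rows)
    if share > max_share:
        raise ValueError(f"{column}: {share:.1%} missing, above the {max_share:.1%} threshold")
    return share
```

Columns that are known to be sparse, such as the suspect columns, would need a much higher threshold than columns like `rpt_dt` that are expected to be complete.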
 
#### The database needs to be accessed by 100+ people.
If the database needs to be accessed by 100+ people, one must ensure that access control is configured properly so that each individual only has access to what they need and nothing more. Additionally, it should be reconsidered whether a data lake is the right format. A data lake is very flexible, but this flexibility in combination with many users can make the data lake disorganized and lead to data governance issues. Hence, a more strictly structured system such as a data warehouse might be better in this case.