# Project Title
### Data Engineering Capstone Project

#### Project Summary
This project uses Spark to consume 3 different datasets, analyze and clean them, and then output parquet files for future analysis.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [3]:
# Do all imports and installs here
import configparser
import os
from pyspark.sql import SparkSession, Window
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, TimestampType, DateType

config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID'] = config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY'] = config['AWS']['AWS_SECRET_ACCESS_KEY']

spark = SparkSession.builder\
                     .config("spark.jars.packages","org.apache.hadoop:hadoop-aws:2.7.0")\
                     .getOrCreate()

### Step 1: Scope the Project and Gather Data

#### Scope 
This project consumes and analyzes data on NYC crime statistics, borough population counts, and temperature. It uses Spark to analyze the datasets and then writes parquet files to S3 for downstream analysis, in order to answer the following questions:
1) What differences in crime exist between the 5 boroughs?
2) How many crimes are reported per resident in each of the 5 boroughs?
3) Is there a correlation between the crime rate and the temperature?
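Questions 2 and 3 reduce to two small calculations: a per-capita crime rate (scaled here to crimes per 100,000 residents, an assumed convention) and a correlation between daily crime counts and daily average temperature (Pearson correlation is an assumed choice of measure). A plain-Python sketch of both follows; the helper names and sample values are hypothetical. In Spark, `pyspark.sql.functions.corr` computes the Pearson coefficient of two columns directly.

```python
from math import sqrt
from typing import Sequence

def crimes_per_100k(crime_count: int, population: int) -> float:
    """Crimes per 100,000 residents (hypothetical helper)."""
    if population <= 0:
        raise ValueError("population must be positive")
    return crime_count * 100_000 / population

def pearson(xs: Sequence[float], ys: Sequence[float]) -> float:
    """Pearson correlation coefficient of two equal-length sequences."""
    n = len(xs)
    if n != len(ys) or n < 2:
        raise ValueError("need two sequences of equal length, at least 2 values each")
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    if var_x == 0 or var_y == 0:
        raise ValueError("correlation is undefined when a sequence is constant")
    return cov / sqrt(var_x * var_y)

# Made-up values for illustration only, not figures from the datasets
print(crimes_per_100k(2_500, 500_000))  # 500.0
daily_mean_celsius = [2.0, 8.5, 15.0, 21.5, 27.0]
daily_crime_counts = [310, 335, 360, 390, 405]
print(round(pearson(daily_mean_celsius, daily_crime_counts), 3))
```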


#### Describe and Gather Data 
**NYPD Complaint Data**: From a Kaggle dataset; describes crimes reported in the five NYC boroughs from 2013 to 2015.

**World Temperature Data**: From Kaggle; contains hourly temperatures by city, in Kelvin, from 2013 to 2017.

**NYC Borough Population**: Provided by NYC Open Data. Lists the population of each of the 5 boroughs for every decade from 1950 through a projected 2040. Because the counts are only available every 10 years, the 2010 count, the closest to the timeframe of the other datasets, is used.

In [5]:
# Read in the data here
input_path = config['S3']['IN_BUCKET']
census_df = spark.read.json(input_path + 'New_York_City_Population_by_Borough__1950_-_2040.json')
temp_df = spark.read.option('header',True).csv(input_path + 'US_City_Hourly_Temp_2013-2017.csv')
crime_df = spark.read.option('header',True).csv(input_path + 'NYPD_Complaint_Data_Historic.csv')

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Census Data: 
    1) Drop unnecessary columns
    2) format the population count as an integer
    3) trim borough names to remove whitespace
    
Temperature Data:
    1) Drop unnecessary columns
    2) drop records with null temperature values
    3) cast values to correct data types
    4) use timestamp value to group and average temperature for each day
    5) convert kelvin data to celsius and fahrenheit 
    6) round values to 2 decimal places
   
Crime Data:
    1) drop unnecessary columns
    2) drop records with null values that would impede analysis
    3) convert text value fields into usable TimeStamp type
    4) cast columns into correct data type(s)
    

In [None]:
#CENSUS DATA#
trunc_census_df = census_df.select(F.col("borough"),
                                 F.col("2010")
                                )

#cast columns to their proper types (trim borough names, strip thousands separators from the 2010 count)
cleaned_census_df = trunc_census_df.withColumn("borough",F.trim(F.col("borough")).cast(StringType())) \
                    .withColumn("2010",F.regexp_replace(F.col("2010"), ",", "").cast(IntegerType()))

#stage dataframe for final output
staging_census_df = cleaned_census_df.select(F.col("borough"),
                                           F.col("2010").alias("pop_count")
                                          ).drop_duplicates()

print('Total Rows: {}'.format(staging_census_df.count()))
staging_census_df.printSchema()
staging_census_df.show()
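Later, the borough dimension joins the crime and census data on the borough name, so both sides must end up with identical strings: the census names are trimmed, while the crime data's `BORO_NM` values go through `F.initcap`. A plain-Python sketch of a single normalization applied to both sides (a design choice of this sketch, not what the cells do) is below; `normalize_borough` is hypothetical, and `str.title()` stands in for `initcap`. The two agree on simple names like these but can differ on others, e.g. strings containing apostrophes.

```python
def normalize_borough(name):
    """Trim surrounding whitespace and title-case a borough name; None stays None."""
    if name is None:
        return None
    # str.title() approximates Spark's F.initcap for plain multi-word names
    return name.strip().title()

# Illustrative inputs: an upper-case crime-data style value and a padded census style value
crime_side = normalize_borough("STATEN ISLAND")
census_side = normalize_borough("  Staten Island ")
print(crime_side == census_side)  # True
```

Applying one function to both sources means the join keys cannot drift apart if either dataset's capitalization or padding changes.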


In [None]:
#TEMPERATURE DATA#
trunc_temp_df = temp_df.select(F.col("datetime").cast(TimestampType()),
                               F.col("New York")
                              )

#convert temps to double first, so unparseable values become null, then drop null temperatures
trunc_temp_df = trunc_temp_df.withColumn("New York", F.col("New York").cast(DoubleType()))
trunc_temp_df = trunc_temp_df.dropna(how='any', subset=['New York'])
trunc_temp_df = trunc_temp_df.withColumn("date", F.to_date(F.col("datetime")))

#collapse hourly readings into days, calculate average temperature
avg_temp_df = trunc_temp_df.groupBy("date").agg(F.mean("New York").cast(DoubleType()).alias("kelvin"))

#calculate Celsius and Fahrenheit with built-in column arithmetic
#(no Python UDF needed; a null input simply produces a null output)
staging_temp_df = avg_temp_df.withColumn("celsius", F.col("kelvin") - 273.15) \
                             .withColumn("fahrenheit", (F.col("kelvin") - 273.15) * 9 / 5 + 32)

#round temperatures to 2 decimal places
staging_temp_df = staging_temp_df.withColumn("kelvin", F.round(F.col("kelvin"), 2)) \
                                 .withColumn("celsius", F.round(F.col("celsius"), 2)) \
                                 .withColumn("fahrenheit", F.round(F.col("fahrenheit"), 2))

staging_temp_df.show()
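The conversion formulas can be checked against fixed reference points: 273.15 K = 0 °C = 32 °F, 373.15 K = 100 °C = 212 °F, and 0 K = −273.15 °C = −459.67 °F. The plain-Python sketch below repeats the same arithmetic and 2-decimal rounding as the temperature cell; the function names are hypothetical. It tests for `None` explicitly rather than relying on truthiness, because a check like `if x:` would also treat 0.0 as missing.

```python
def kelvin_to_celsius(k):
    """Kelvin to Celsius, rounded to 2 decimals; None passes through."""
    if k is None:  # explicit check, so 0.0 K is not mistaken for a missing value
        return None
    return round(k - 273.15, 2)

def kelvin_to_fahrenheit(k):
    """Kelvin to Fahrenheit, rounded to 2 decimals; None passes through."""
    if k is None:
        return None
    return round((k - 273.15) * 9 / 5 + 32, 2)

for k in (273.15, 373.15, 0.0):
    print(k, kelvin_to_celsius(k), kelvin_to_fahrenheit(k))
```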

In [None]:
#CRIME DATA#
#limit df to selected columns
trunc_crime_df = crime_df.select(F.col("CMPLNT_NUM"),
                                 F.col("BORO_NM"),
                                 F.col("CMPLNT_FR_DT"),
                                 F.col("CMPLNT_FR_TM"),
                                 F.col("OFNS_DESC"),
                                 F.col("LAW_CAT_CD")
                                )

#drop null complaint date values -- can't tie to specific date-time
#also drop any nulls in crime description or type columns
trunc_crime_df = trunc_crime_df.dropna(how='any', subset=['CMPLNT_FR_DT','CMPLNT_FR_TM','OFNS_DESC','LAW_CAT_CD'])

#combine and format date and time column into single column to generate date value
#combine date and time columns into a single timestamp ('yyyy' = four-digit year)
cleaned_crime_df = trunc_crime_df.withColumn('TimeStamp', F.to_timestamp(
                       F.concat_ws(' ', F.col('CMPLNT_FR_DT'), F.col('CMPLNT_FR_TM')),
                       'MM/dd/yyyy HH:mm:ss'))

#derive the complaint date from the timestamp
cleaned_crime_df = cleaned_crime_df.withColumn('CMPLNT_FR_DT', F.to_date(F.col("TimeStamp")))

#cast columns to their proper types
cleaned_crime_df = cleaned_crime_df.withColumn("CMPLNT_NUM",F.col("CMPLNT_NUM").cast(IntegerType())) \
                    .withColumn("CMPLNT_FR_DT",F.col("CMPLNT_FR_DT").cast(DateType())) \
                    .withColumn("OFNS_DESC",F.col("OFNS_DESC").cast(StringType())) \
                    .withColumn("LAW_CAT_CD",F.col("LAW_CAT_CD").cast(StringType())) \
                    .withColumn("BORO_NM",F.initcap(F.col("BORO_NM")).cast(StringType()))

#stage dataframe for final output
staging_crime_df = cleaned_crime_df.select(F.col("CMPLNT_NUM").alias("id"),
                                           F.col("BORO_NM").alias("borough"), 
                                           F.col("CMPLNT_FR_DT").alias("date"),
                                           F.col("OFNS_DESC").alias("crime_desc"),
                                           F.col("LAW_CAT_CD").alias("crime_cat")
                                          )
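The crime cell joins `CMPLNT_FR_DT` and `CMPLNT_FR_TM` with a space and parses the result with the Spark pattern `MM/dd/yyyy HH:mm:ss`. The equivalent parse in plain Python uses `strptime` with `%m/%d/%Y %H:%M:%S`. In the sketch below, `complaint_date` and the sample strings are hypothetical, and returning `None` for unparseable input is a choice made here so that bad rows can be filtered out instead of stopping the run.

```python
from datetime import date, datetime

def complaint_date(date_str, time_str):
    """Combine 'MM/DD/YYYY' and 'HH:MM:SS' strings and return the date, or None."""
    if date_str is None or time_str is None:
        return None
    try:
        ts = datetime.strptime(f"{date_str} {time_str}", "%m/%d/%Y %H:%M:%S")
    except ValueError:
        return None
    return ts.date()

print(complaint_date("12/31/2015", "23:45:00"))    # 2015-12-31
print(complaint_date("12/31/2015", "not a time"))  # None
```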

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Dimension tables:

    crime_categories_table:
        crime_index
        crime_cat
        crime_desc

    boroughs_table:
        borough_index
        borough
        pop_count

    temperature_table:
        temp_index
        temp_date
        kelvin
        celsius
        fahrenheit

    date_table:
        date_index
        date
        dayofweek
        month
        year
        
Fact Table:

    crime_stats_df:
        id
        date_index
        borough_index
        temp_index
        crime_index
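The `date_table` above includes a `dayofweek` column. If it is filled with Spark's `pyspark.sql.functions.dayofweek`, the values run from 1 for Sunday to 7 for Saturday, which differs from Python's `date.isoweekday()` (1 for Monday to 7 for Sunday). A plain-Python sketch of the Spark numbering, useful for spot-checking rows of the date dimension; `spark_style_dayofweek` and `date_row` are hypothetical helpers.

```python
from datetime import date

def spark_style_dayofweek(d: date) -> int:
    """Day of week numbered like Spark's dayofweek: 1 = Sunday ... 7 = Saturday."""
    # isoweekday(): Monday = 1 ... Sunday = 7, so shift by one and wrap Sunday around to 1
    return d.isoweekday() % 7 + 1

def date_row(d: date) -> dict:
    """One date_table row (without date_index) for the given date."""
    return {"date": d, "dayofweek": spark_style_dayofweek(d), "month": d.month, "year": d.year}

print(date_row(date(2015, 1, 4)))
```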

#### 3.2 Mapping Out Data Pipelines
1) Join the staging dataframes created above to populate the dimension and fact tables
2) Write the tables to parquet files for later analysis
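Step 1 assigns each dimension row a surrogate index and then replaces the natural keys in the fact rows with those indexes. A plain-Python sketch of that idea, assuming distinct values are numbered in sorted order (as a Spark `row_number` over a window ordered by the natural key would do); all names and rows are illustrative.

```python
def build_dimension(natural_keys):
    """Map each distinct natural key to a 1-based surrogate index, in sorted key order."""
    return {key: i for i, key in enumerate(sorted(set(natural_keys)), start=1)}

def to_fact_rows(crimes, borough_index):
    """Swap each (id, borough) pair's borough name for its index.

    Crimes whose borough is not in the dimension are dropped, like an inner join.
    """
    return [(crime_id, borough_index[b]) for crime_id, b in crimes if b in borough_index]

# Illustrative rows only
boroughs = build_dimension(["Queens", "Bronx", "Queens"])
crimes = [(101, "Queens"), (102, "Bronx"), (103, "Unknown")]
print(boroughs)                        # {'Bronx': 1, 'Queens': 2}
print(to_fact_rows(crimes, boroughs))  # [(101, 2), (102, 1)]
```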



### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [None]:
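#Date dimension table
#(date_dim_df is used below but is not built elsewhere in this notebook; this sketch
# assumes it comes from the distinct complaint dates, with the columns listed in 3.1)
date_dim_df = staging_crime_df.select("date").dropna().drop_duplicates()
date_dim_df = date_dim_df.withColumn("dayofweek", F.dayofweek(F.col("date"))) \
                         .withColumn("month", F.month(F.col("date"))) \
                         .withColumn("year", F.year(F.col("date"))) \
                         .withColumn("date_index", F.row_number().over(Window.orderBy("date")))

#Temperature dimension table
temp_dim_df = staging_temp_df.join(date_dim_df, staging_temp_df.date == date_dim_df.date)\
              .select(date_dim_df.date.alias('temp_date'),
                      F.col("kelvin"),
                      F.col("celsius"),
                      F.col("fahrenheit")
                     ).drop_duplicates()

#number rows by date so the surrogate key is deterministic
temp_dim_df = temp_dim_df.withColumn('temp_index', F.row_number().over(Window.orderBy("temp_date")))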

temp_dim_df.show()

In [None]:
#borough dimension table: census population for each borough that appears in the crime data
#(deduplicate the crime boroughs before joining instead of joining every crime row)
borough_dim_df = staging_census_df.join(staging_crime_df.select("borough").distinct(), "borough")\
                .select("borough", "pop_count")

borough_dim_df = borough_dim_df.withColumn('borough_index', F.row_number().over(Window.orderBy("borough")))
borough_dim_df.show()


In [None]:
#Crime category dimension table
crime_dim_df = staging_crime_df.select(F.col("crime_desc"),F.col("crime_cat")).drop_duplicates()

crime_dim_df = crime_dim_df.withColumn('crime_index', F.row_number().over(Window.orderBy("crime_cat", "crime_desc")))

crime_dim_df.show()

In [None]:
#Final fact table for crime stats
#crime_dim_df holds distinct (crime_desc, crime_cat) pairs, so join on both columns;
#joining on crime_desc alone would duplicate facts whose description has several categories
fact_dim_df = staging_crime_df.join(date_dim_df, staging_crime_df.date == date_dim_df.date)\
                              .join(crime_dim_df, ['crime_desc', 'crime_cat'])\
                              .join(borough_dim_df, 'borough')\
                              .join(temp_dim_df, staging_crime_df.date == temp_dim_df.temp_date)\
                              .select(staging_crime_df.id, date_dim_df.date_index, borough_dim_df.borough_index, temp_dim_df.temp_index, crime_dim_df.crime_index)\
                              .drop_duplicates()\
                              .sort("date_index")
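The crime dimension holds distinct (`crime_desc`, `crime_cat`) pairs. If the same description appears under more than one category, joining fact rows to it on `crime_desc` alone matches several dimension rows and multiplies the facts, whereas joining on both columns matches exactly one. A plain-Python sketch with made-up rows; `inner_join` is a hypothetical nested-loop stand-in for a Spark join.

```python
def inner_join(facts, dims, keys):
    """Nested-loop inner join of two lists of dicts on the given key columns."""
    return [
        {**fact, **dim}
        for fact in facts
        for dim in dims
        if all(fact[k] == dim[k] for k in keys)
    ]

# Made-up rows: one description that appears under two categories
crime_dim = [
    {"crime_desc": "EXAMPLE OFFENSE", "crime_cat": "FELONY", "crime_index": 1},
    {"crime_desc": "EXAMPLE OFFENSE", "crime_cat": "MISDEMEANOR", "crime_index": 2},
]
facts = [{"id": 1, "crime_desc": "EXAMPLE OFFENSE", "crime_cat": "FELONY"}]

print(len(inner_join(facts, crime_dim, ["crime_desc"])))               # 2
print(len(inner_join(facts, crime_dim, ["crime_desc", "crime_cat"])))  # 1
```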

#### 4.2 Data Quality Checks
Data quality checks confirm that the pipeline ran as expected. Checks of this kind can include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
The checks run here verify that every table exists and that every table contains at least one row.

In [None]:
# Perform quality checks here
def table_exists(df):
    """True if the variable holds a dataframe rather than None."""
    return df is not None

tables = [fact_dim_df, crime_dim_df, borough_dim_df, date_dim_df, temp_dim_df]
if all(table_exists(df) for df in tables):
    print("data quality check passed")
else:
    print("data quality check failed")

In [None]:
def table_not_empty(df):
    """True if the dataframe has at least one row (head(1) avoids a full count)."""
    return len(df.head(1)) > 0

tables = [fact_dim_df, crime_dim_df, borough_dim_df, date_dim_df, temp_dim_df]
if all(table_not_empty(df) for df in tables):
    print("data quality check passed!")
else:
    print("data quality check failed!")
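The checks above cover existence and emptiness; the list in 4.2 also mentions unique keys. A uniqueness check compares the number of key values with the number of distinct key values; in Spark that is `df.count() == df.select(key).distinct().count()`. The same logic on a plain Python list is sketched below; `duplicate_keys` and the sample ids are hypothetical.

```python
from collections import Counter

def duplicate_keys(keys):
    """Return the key values that occur more than once (an empty list means the key is unique)."""
    return sorted(k for k, n in Counter(keys).items() if n > 1)

fact_ids = [101, 102, 103, 102]  # illustrative ids
dups = duplicate_keys(fact_ids)
if dups:
    print(f"data quality check failed! duplicate ids: {dups}")
else:
    print("data quality check passed!")
```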

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
  * The data was increased by 100x.
  * The data populates a dashboard that must be updated on a daily basis by 7am every day.
  * The database needed to be accessed by 100+ people.

The data in this project was designed as a one-time snapshot of crime statistics, so it need not be updated again. It can, however, be refreshed as often as new source datasets become available and the business requirements for the analysis demand.

Spark was chosen for this project because it processes large amounts of data quickly (using in-memory computation), scales out by adding worker nodes, reads many data formats (e.g. Parquet, CSV, JSON), and integrates well with cloud storage such as S3 and warehouses such as Redshift.

The scenarios below consider how the existing solution would scale.

If the data was increased by 100x:

We could run Spark on larger EC2 instances (vertical scaling) and/or add Spark worker nodes (horizontal scaling). Either kind of added capacity should keep processing time manageable as the data grows.

If the data populates a dashboard that must be updated on a daily basis by 7am every day:

In this case the pipeline could be moved to Apache Airflow, which schedules and automates the pipeline jobs. Its built-in retries and monitoring help ensure the data is ready by the 7am deadline.
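Meeting a fixed 7am deadline with retries means the schedule has to leave room for the worst case, in which every allowed retry is needed. Airflow tasks accept `retries` and `retry_delay` arguments; the helper below is a hypothetical back-of-the-envelope calculation of the latest safe start time from those two values and an assumed run time. It is not part of Airflow, and it assumes each failed attempt takes the full run time.

```python
from datetime import datetime, timedelta

def latest_start(deadline: datetime, run_time: timedelta, retries: int,
                 retry_delay: timedelta) -> datetime:
    """Latest start that still meets the deadline if every retry is needed."""
    # worst case: 1 initial attempt + `retries` reruns, each rerun preceded by a retry delay
    worst_case = (retries + 1) * run_time + retries * retry_delay
    return deadline - worst_case

deadline = datetime(2024, 1, 1, 7, 0)  # any day at 7:00
print(latest_start(deadline, timedelta(hours=1), retries=2,
                   retry_delay=timedelta(minutes=15)).time())  # 03:30:00
```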

If the database needed to be accessed by 100+ people:

We could host the output in a cloud data warehouse such as Redshift, which offers the capacity to serve more users and workload management to share resources fairly among them.