# 2016 NY-CRIME DATA MODEL
### Data Engineering Capstone Project

#### Project Summary
To build an ETL pipeline with Spark, I loaded a NY crime dataset from the NY OpenData platform and processed it together with weather and Airbnb data from Kaggle. The data is transformed into a relational data model and written to a PostgreSQL database.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
```python
import configparser
from datetime import datetime,timedelta
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import *
import sql_queries as sql
import pandas as pd
import psycopg2
```

In [2]:
```python
config = configparser.ConfigParser()
config.read('aws.cfg')
os.environ['AWS_ACCESS_KEY_ID']=config["CREDENTIALS"]['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config["CREDENTIALS"]['AWS_SECRET_ACCESS_KEY']
AWS_ACCESS_KEY=config["CREDENTIALS"]['AWS_ACCESS_KEY_ID']
AWS_SECRET_KEY=config["CREDENTIALS"]['AWS_SECRET_ACCESS_KEY']
```

In [3]:
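```python
# S3A reads its credentials from fs.s3a.access.key / fs.s3a.secret.key;
# the "spark.hadoop." prefix forwards these settings to the Hadoop configuration
spark = SparkSession.builder \
    .config("spark.hadoop.fs.s3a.access.key", AWS_ACCESS_KEY)\
    .config("spark.hadoop.fs.s3a.secret.key", AWS_SECRET_KEY)\
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.3,org.postgresql:postgresql:42.2.6")\
    .getOrCreate()
```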

### Step 1: Scope the Project and Gather Data

#### Scope 
I joined a NY crime dataset with a weather dataset and an Airbnb listings dataset. The data is stored in an AWS S3 bucket. Spark downloads the files and stores them locally in partitioned Parquet format. An ETL pipeline written in Spark then loads the data, transforms it into facts and dimensions, and writes it to a local PostgreSQL database, where it is stored in several fact and dimension tables.

#### Describe and Gather Data 

- **For GitHub**: To load the data from S3, run `LoadData.ipynb` with your AWS credentials and bucket.
- **For Udacity**: The data is already loaded into the workspace.

### NY Crime Dataset

Source: https://www1.nyc.gov/site/nypd/stats/crime-statistics/historical.page

Before uploading the file to S3, I reduced the data to the year 2016.

In [4]:
```python
nycrime_df = spark.read.parquet("data/nycrime.parquet")
```

In [5]:
```python
nycrime_df.limit(3).toPandas()
```

| | _c0 | CMPLNT_NUM | CMPLNT_FR_DT | CMPLNT_FR_TM | CMPLNT_TO_DT | CMPLNT_TO_TM | ADDR_PCT_CD | RPT_DT | KY_CD | OFNS_DESC | ... | SUSP_SEX | TRANSIT_DISTRICT | Latitude | Longitude | Lat_Lon | PATROL_BORO | STATION_NAME | VIC_AGE_GROUP | VIC_RACE | VIC_SEX |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1061614 | 615854967 | 09/11/2016 | 08:40:00 | | | 52.0 | 09/11/2016 | 106 | FELONY ASSAULT | ... | M | | 40.880468411 | -73.877197456 | (40.880468411, -73.877197456) | PATROL BORO BRONX | | 25-44 | WHITE HISPANIC | F |
| 1 | 983674 | 618842805 | 11/09/2016 | 22:45:00 | 11/09/2016 | 22:45:00 | 113.0 | 11/09/2016 | 236 | DANGEROUS WEAPONS | ... | F | | 40.679980738000005 | -73.77623390699999 | (40.679980738, -73.776233907) | PATROL BORO QUEENS SOUTH | | UNKNOWN | UNKNOWN | E |
| 2 | 1077779 | 781774402 | 08/30/2016 | 20:30:00 | 08/30/2016 | 20:35:00 | 6.0 | 08/30/2016 | 578 | HARRASSMENT 2 | ... | U | | 40.736973397 | -74.00905145 | (40.736973397, -74.00905145) | PATROL BORO MAN SOUTH | | 45-64 | WHITE | F |


### Airbnb Dataset

Source: https://www.kaggle.com/dgomonov/new-york-city-airbnb-open-data

In [6]:
```python
airbnb_df = spark.read.parquet("data/airbnb.parquet")
```

In [7]:
```python
airbnb_df.limit(3).toPandas()
```

| | id | name | host_id | host_name | neighbourhood_group | neighbourhood | latitude | longitude | room_type | price | minimum_nights | number_of_reviews | last_review | reviews_per_month | calculated_host_listings_count | availability_365 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2539 | Clean & quiet apt home by the park | 2787 | John | Brooklyn | Kensington | 40.64749 | -73.97237 | Private room | 149 | 1 | 9 | 2018-10-19 | 0.21 | 6 | 365 |
| 1 | 2595 | Skylit Midtown Castle | 2845 | Jennifer | Manhattan | Midtown | 40.75362 | -73.98377 | Entire home/apt | 225 | 1 | 45 | 2019-05-21 | 0.38 | 2 | 355 |
| 2 | 3647 | THE VILLAGE OF HARLEM....NEW YORK ! | 4632 | Elisabeth | Manhattan | Harlem | 40.80902 | -73.9419 | Private room | 150 | 3 | 0 | | | 1 | 365 |


### Weather Dataset

Source: https://www.kaggle.com/mathijs/weather-data-in-new-york-city-2016

In [8]:
```python
clima_df = spark.read.parquet("data/clima.parquet")
```

In [9]:
```python
clima_df.limit(3).toPandas()
```

| | date | maximumtemperature | minimumtemperature | averagetemperature | precipitation | snowfall | snowdepth |
|---|---|---|---|---|---|---|---|
| 0 | 1-1-2016 | 42 | 34 | 38.0 | 0.0 | 0.0 | 0 |
| 1 | 2-1-2016 | 40 | 32 | 36.0 | 0.0 | 0.0 | 0 |
| 2 | 3-1-2016 | 45 | 35 | 40.0 | 0.0 | 0.0 | 0 |


### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

---

### First, let's look at the shape of the data

In [10]:
```python
print("NY Crime")
crime_df_count=nycrime_df.count()
print(f"Rows: {crime_df_count}")
print(f"Columns: {len(nycrime_df.columns)}")
print()
print("Airbnb")
airbnb_df_count=airbnb_df.count()
print(f"Rows: {airbnb_df_count}")
print(f"Columns: {len(airbnb_df.columns)}")
print()
print("Weather")
clima_df_count=clima_df.count()
print(f"Rows: {clima_df_count}")
print(f"Columns: {len(clima_df.columns)}")
```

```
NY Crime
Rows: 477639
Columns: 36

Airbnb
Rows: 49079
Columns: 16

Weather
Rows: 366
Columns: 7
```


### Any duplicates?

In [11]:
```python
duplicates={}

duplicates["NY"]=\
nycrime_df.groupBy(nycrime_df.columns)\
    .count()\
    .where(F.col('count') > 1)\
    .select(F.sum('count')).take(1)[0]["sum(count)"]

duplicates["Airbnb"]=\
airbnb_df.groupBy(airbnb_df.columns)\
    .count()\
    .where(F.col('count') > 1)\
    .select(F.sum('count')).take(1)[0]["sum(count)"]

duplicates["Weather"]=\
clima_df.groupBy(clima_df.columns)\
    .count()\
    .where(F.col('count') > 1)\
    .select(F.sum('count')).take(1)[0]["sum(count)"]

print("Duplicates")
for key in duplicates:
    print(f"{key:<8}: {duplicates[key]}")
```

```
Duplicates
NY      : None
Airbnb  : 2
Weather : None
```
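The duplicate check groups by all columns and sums the counts of every group that occurs more than once. The reported number is therefore the total number of rows involved in duplication: the 2 for Airbnb means one row appears twice. `None` means no group occurs more than once, because summing over zero rows yields NULL. Below is a minimal plain-Python sketch of the same logic using `collections.Counter`. The function name and the toy rows are hypothetical.

```python
from collections import Counter

def count_duplicate_rows(rows):
    """Total number of rows that belong to a duplicated group,
    or None if there is none (like Spark's sum over zero rows)."""
    counts = Counter(tuple(row) for row in rows)        # ~ groupBy(all columns).count()
    dup_counts = [c for c in counts.values() if c > 1]  # ~ .where(count > 1)
    return sum(dup_counts) if dup_counts else None      # ~ F.sum('count')

rows = [
    (2539, "Clean & quiet apt home by the park"),
    (2595, "Skylit Midtown Castle"),
    (2595, "Skylit Midtown Castle"),  # exact duplicate of the previous row
]
print(count_duplicate_rows(rows))      # 2
print(count_duplicate_rows(rows[:2]))  # None
```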


### Any NULL values?

In [12]:
```python
def countNull(df):
    nulls_l=[]
    for col in df.columns:
        nulls_d={}
        n_null=df.select(col).filter(F.col(col).isNull()).count()
        if n_null > 0:
            nulls_d["Column"]=col
            nulls_d["Nulls"]=n_null
            nulls_l.append(nulls_d)
    if len(nulls_l)>0:
        display(pd.DataFrame(nulls_l))
    else: print("No Nulls")
```
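`countNull` starts one Spark job per column (`filter(...).count()`), which means 36 jobs for the crime data. A common alternative is to compute all NULL counts in a single aggregation, for example `df.select([F.sum(F.col(c).isNull().cast('int')).alias(c) for c in df.columns])`. The sketch below shows the same single-pass idea in plain Python over a list of dicts, so it runs without Spark. The function name and the toy rows are hypothetical.

```python
def count_nulls_single_pass(rows, columns):
    """Count None values per column in one pass over the rows and return
    only the columns with at least one NULL, like countNull does."""
    nulls = dict.fromkeys(columns, 0)
    for row in rows:
        for col in columns:
            if row.get(col) is None:
                nulls[col] += 1
    return {col: n for col, n in nulls.items() if n > 0}

columns = ["listing_id", "name", "host_id", "price"]
rows = [
    {"listing_id": 2539, "name": "Clean & quiet apt home by the park", "host_id": 2787, "price": 149},
    {"listing_id": 3647, "name": None, "host_id": 4632, "price": 150},
    {"listing_id": 9999, "name": None, "host_id": None, "price": None},  # hypothetical broken row
]
print(count_nulls_single_pass(rows, columns))  # {'name': 2, 'host_id': 1, 'price': 1}
```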

#### NY Crime Nulls

In [13]:
```python
countNull(nycrime_df)
```

| | Column | Nulls |
|---|---|---|
| 0 | CMPLNT_TO_DT | 80981 |
| 1 | CMPLNT_TO_TM | 80778 |
| 2 | OFNS_DESC | 49 |
| 3 | PD_CD | 329 |
| 4 | PD_DESC | 329 |
| 5 | BORO_NM | 335 |
| 6 | LOC_OF_OCCUR_DESC | 96149 |
| 7 | PREM_TYP_DESC | 1894 |
| 8 | JURISDICTION_CODE | 329 |
| 9 | PARKS_NM | 472645 |


#### Airbnb Nulls

In [14]:
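```python
countNull(airbnb_df)
```

| | Column | Nulls |
|---|---|---|
| 0 | name | 32 |
| 1 | host_id | 185 |
| 2 | host_name | 206 |
| 3 | neighbourhood_group | 185 |
| 4 | neighbourhood | 185 |
| 5 | latitude | 185 |
| 6 | longitude | 185 |
| 7 | room_type | 185 |
| 8 | price | 185 |
| 9 | minimum_nights | 185 |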


#### Weather Nulls

In [15]:
```python
countNull(clima_df)
```

```
No Nulls
```


### Let's dive a bit deeper into the Null values

In [16]:
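```python
# Python's built-in sum() adds the per-column 0/1 NULL flags into one Column expression;
# sort after grouping, since ordering before groupBy does not determine the output order
airbnb_df.withColumn('numNulls', sum(airbnb_df[col].isNull().cast('int') for col in airbnb_df.columns))\
    .groupBy("numNulls").count()\
    .orderBy(F.desc("numNulls")).toPandas()
```

| | numNulls | count |
|---|---|---|
| 0 | 15 | 16 |
| 1 | 14 | 169 |
| 2 | 3 | 35 |
| 3 | 2 | 10016 |
| 4 | 1 | 158 |
| 5 | 0 | 38685 |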


In [17]:
```python
airbnb_df.withColumn('numNulls', sum(airbnb_df[col].isNull().cast('int') for col in airbnb_df.columns))\
    .orderBy(F.desc("numNulls")).limit(10).toPandas()
```

| | id | name | host_id | host_name | neighbourhood_group | neighbourhood | latitude | longitude | room_type | price | minimum_nights | number_of_reviews | last_review | reviews_per_month | calculated_host_listings_count | availability_365 | numNulls |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | ( 1 Bdrm. Apt ) | | | | | | | | | | | | | | | | 15 |
| 1 | By Greenwood cemetery | | | | | | | | | | | | | | | | 15 |
| 2 | COZY WATERFRONT APARTMENT | | | | | | | | | | | | | | | | 15 |
| 3 | SoBro | | | | | | | | | | | | | | | | 15 |
| 4 | near Central Park& | | | | | | | | | | | | | | | | 15 |
| 5 | Beautiful Rooms | | | | | | | | | | | | | | | | 15 |
| 6 | Room Florence | | | | | | | | | | | | | | | | 15 |
| 7 | ,6024006,Johnny,Staten Island,St. George,40.64... | | | | | | | | | | | | | | | | 15 |
| 8 | Room Cozy | | | | | | | | | | | | | | | | 15 |
| 9 | SHORT/ LONG TERM STAYS | | | | | | | | | | | | | | | | 15 |


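Okay, the Airbnb DataFrame has some quality issues. A few lines appear to be broken: text fragments (parts of listing names, or even a whole record) end up in the `id` column, and every other column is empty. Let's try to fix this.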

In [18]:
```python
airbnb_df.withColumn('numNulls', sum(airbnb_df[col].isNull().cast('int') for col in airbnb_df.columns))\
    .filter(airbnb_df.host_id.cast(IntegerType()).isNotNull())\
    .groupBy("numNulls").count()\
    .orderBy(F.desc("numNulls")).toPandas()
```

| | numNulls | count |
|---|---|---|
| 0 | 3 | 15 |
| 1 | 2 | 10014 |
| 2 | 1 | 22 |
| 3 | 0 | 38678 |


The `id` column also seems to be broken. After filtering on `host_id`, 9 rows with a non-numeric `id` are left, and we have to filter these out as well.

In [19]:
```python
airbnb_df.filter(airbnb_df.host_id.cast(IntegerType()).isNotNull()).filter(airbnb_df.id.cast(IntegerType()).isNull()).count()
```

```
9
```

Casting the string IDs to integers turns the broken values into NULLs. Dropping the rows with those NULLs removes every line that had a high number of NULL values.
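The cast-and-filter step can be sketched in plain Python. `to_int_or_none` is a hypothetical helper that approximates Spark's string-to-integer cast; Spark's cast behaves differently in a few edge cases. The rows are toy data modelled on the broken lines above, and the last one is hypothetical.

```python
def to_int_or_none(value):
    """Approximate a Spark cast to IntegerType: non-numeric input becomes None."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return None

raw = [
    {"id": "2539", "host_id": "2787"},
    {"id": "( 1 Bdrm. Apt )", "host_id": None},               # broken line, host_id missing
    {"id": "SHORT/ LONG TERM STAYS", "host_id": "12345"},     # hypothetical: host_id ok, id broken
]

clean = []
for row in raw:
    listing_id, host_id = to_int_or_none(row["id"]), to_int_or_none(row["host_id"])
    if listing_id is not None and host_id is not None:  # drop rows where a cast failed
        clean.append({"listing_id": listing_id, "host_id": host_id})

print(clean)  # [{'listing_id': 2539, 'host_id': 2787}]
```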

Let's do the same with the nycrime dataset

In [20]:
```python
nycrime_df.withColumn('numNulls', sum(nycrime_df[col].isNull().cast('int') for col in nycrime_df.columns))\
    .groupBy("numNulls").count()\
    .orderBy(F.desc("numNulls")).toPandas()
```

| | numNulls | count |
|---|---|---|
| 0 | 16 | 76 |
| 1 | 15 | 10 |
| 2 | 13 | 232 |
| 3 | 12 | 25 |
| 4 | 11 | 4413 |
| 5 | 10 | 15079 |
| 6 | 9 | 21626 |
| 7 | 8 | 94348 |
| 8 | 7 | 47235 |
| 9 | 6 | 53963 |


In [21]:
```python
nycrime_df.withColumn('numNulls', sum(nycrime_df[col].isNull().cast('int') for col in nycrime_df.columns))\
    .orderBy(F.desc("numNulls")).limit(3).toPandas().T
```

| | 0 | 1 | 2 |
|---|---|---|---|
| _c0 | 1116041 | 966742 | 1163097 |
| CMPLNT_NUM | 820817726 | 645878003 | 380350077 |
| CMPLNT_FR_DT | 08/03/2016 | 11/23/2016 | 06/29/2016 |
| CMPLNT_FR_TM | 15:25:00 | 20:30:00 | 21:17:00 |
| CMPLNT_TO_DT | | | |
| CMPLNT_TO_TM | | | |
| ADDR_PCT_CD | 23.0 | 81.0 | 46.0 |
| RPT_DT | 01/26/2017 | 11/23/2016 | 06/29/2016 |
| KY_CD | 101 | 101 | 101 |
| OFNS_DESC | MURDER & NON-NEGL. MANSLAUGHTER | MURDER & NON-NEGL. MANSLAUGHTER | MURDER & NON-NEGL. MANSLAUGHTER |


In this case the high number of NULL values seems to be fine. Many columns are optional and can legitimately stay empty, for example the complaint end date and time or the park name.

#### Cleaning Steps

As described in the previous step, we have to remove the rows with broken IDs in the `host_id` and `id` (renamed to `listing_id`) columns of the Airbnb dataset.

In [22]:
```python
airbnb_df=airbnb_df.withColumn("host_id", airbnb_df.host_id.cast(IntegerType()))
airbnb_df=airbnb_df.filter(airbnb_df.host_id.isNotNull())
airbnb_df=airbnb_df.withColumnRenamed("id","listing_id")
airbnb_df=airbnb_df.withColumn("listing_id", airbnb_df.listing_id.cast(IntegerType()))
airbnb_df=airbnb_df.filter(airbnb_df.listing_id.isNotNull())
```

In [23]:
```python
airbnb_df_count=airbnb_df.count()
print(f"Num of rows left: {airbnb_df_count}")
```

```
Num of rows left: 48720
```


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

<div style="width: 90%">As I have three different datasets, I chose to split them into several dimensions and to connect them through shared dimensions. Each table holds a lot of information, but most columns describe very different, specific topics. For the weather table it made no sense to split it into further dimensions except the date, which is also its primary key, as there can only be one entry per date. The Airbnb data was split into the fact table listing, which holds one entry per listing, and the host table, because one host can have many listings (a one-to-many relationship). The Airbnb rooms also carry location information together with their neighbourhood. I put this into a location dimension, which the NY crime table references as well. Finally, the NY crime dataset references the date dimension (shared with the weather table) three times per row: start, end and report date. It also references the crime_type and jurisdiction tables, each of which has a one-to-many relationship to the crime table. The remaining information in the crime table is not worth moving into dimensions, as it has too few one-to-many relationships.</div>

<img src="./images/ERD.png" width=600>
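The one-to-many relationship between host and listing can be illustrated with a small, hedged sketch using SQLite from Python's standard library. It covers only host, location and listing, with a subset of the columns printed in Step 4. The real DDL lives in `sql_queries.py`, which is not shown, so the keys and constraints here are assumptions. Listing 9001 is a hypothetical second listing of the same host.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # SQLite only enforces foreign keys when enabled
conn.executescript("""
CREATE TABLE host (
    host_id   INTEGER PRIMARY KEY,
    host_name TEXT
);
CREATE TABLE location (
    location_id         INTEGER PRIMARY KEY,
    neighbourhood_group TEXT,
    neighbourhood       TEXT,
    long                TEXT NOT NULL,
    lat                 TEXT NOT NULL
);
CREATE TABLE listing (
    listing_id  INTEGER PRIMARY KEY,
    host_id     INTEGER NOT NULL REFERENCES host(host_id),
    location_id INTEGER NOT NULL REFERENCES location(location_id),
    room_type   TEXT,
    price       INTEGER
);
""")
conn.execute("INSERT INTO host VALUES (2787, 'John')")
conn.execute("INSERT INTO location VALUES (1, 'Brooklyn', 'Kensington', '-73.972', '40.647')")
# one host, many listings
conn.executemany(
    "INSERT INTO listing VALUES (?, ?, ?, ?, ?)",
    [(2539, 2787, 1, "Private room", 149), (9001, 2787, 1, "Private room", 99)],
)
n = conn.execute(
    "SELECT COUNT(*) FROM listing JOIN host USING (host_id) WHERE host_name = 'John'"
).fetchone()[0]
print(n)  # 2
```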

#### 3.2 Mapping Out Data Pipelines

The weather and date tables are the easiest, so I will start with them. Next come the host and location dimensions, followed by the listing table. After this, the crime_type and jurisdiction tables are filled, and the crime table completes the model.

1. Fill the date table first, as it does not depend on any external, possibly incomplete data.
2. Join the weather dataset with the date table on the date to get the matching date_id, then insert the weather data into the weather table.
3. Extract the unique hosts from the Airbnb dataset to fill the host table.
4. Combine the location information from the Airbnb and NY crime datasets to fill the location table.
5. Join the Airbnb dataset with the host and location tables to get the needed IDs and fill the listing table.
6. Extract the unique jurisdiction and crime_type information to fill their tables.
7. Join the NY crime dataset with all needed dimensions to get their IDs, then fill the crime table.
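This ordering follows from the foreign keys: each dimension has to be loaded before any table that references it. The small sketch below derives a valid load order with the standard-library `graphlib` module (Python 3.9+). The dependency map is my reading of the ERD described above; the pipeline code does not define it.

```python
from graphlib import TopologicalSorter  # Python 3.9+

# table -> tables it references (assumed from the ERD description)
dependencies = {
    "date": set(),
    "weather": {"date"},
    "host": set(),
    "location": set(),
    "listing": {"host", "location"},
    "jurisdiction": set(),
    "crime_type": set(),
    "crime": {"date", "location", "jurisdiction", "crime_type"},
}

# static_order() yields every table after all tables it depends on
load_order = list(TopologicalSorter(dependencies).static_order())
print(load_order)
```

The order among independent tables (for example host and jurisdiction) is not fixed. The hand-picked order in the list above is one of the valid orders.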

### Step 4: Run Pipelines to Model the Data 

#### 4.0 Build the database

As PostgreSQL is installed on the workstation, I can use it to build the data model on the local machine.

In [24]:
```python
# connect to the default database to (re)create capstone_db
conn = psycopg2.connect("host=127.0.0.1 dbname=studentdb user=student password=student")
conn.set_session(autocommit=True)  # CREATE/DROP DATABASE cannot run inside a transaction block
cur = conn.cursor()
cur.execute("DROP DATABASE IF EXISTS capstone_db")
cur.execute("CREATE DATABASE capstone_db WITH ENCODING 'utf8' TEMPLATE template0")
# close the cursor before the connection it belongs to
cur.close()
conn.close()
```

In [25]:
```python
conn = psycopg2.connect("host=127.0.0.1 dbname=capstone_db user=student password=student")
conn.set_session(autocommit=True)
cur = conn.cursor()
```

In [26]:
```python
cur.execute(sql.host_table_create)
cur.execute(sql.getStruct,("host",))
column_names = [desc[0] for desc in cur.description]
pd.DataFrame(cur.fetchall(),columns=column_names)
```

| | column_name | data_type | is_nullable |
|---|---|---|---|
| 0 | host_id | integer | NO |
| 1 | host_name | character varying | YES |


In [27]:
```python
cur.execute(sql.location_table_create)
cur.execute(sql.getStruct,("location",))
column_names = [desc[0] for desc in cur.description]
pd.DataFrame(cur.fetchall(),columns=column_names)
```

| | column_name | data_type | is_nullable |
|---|---|---|---|
| 0 | location_id | integer | NO |
| 1 | neighbourhood_group | character varying | YES |
| 2 | neighbourhood | character varying | YES |
| 3 | long | character varying | NO |
| 4 | lat | character varying | NO |


In [28]:
```python
cur.execute(sql.listing_table_create)
cur.execute(sql.getStruct,("listing",))
column_names = [desc[0] for desc in cur.description]
pd.DataFrame(cur.fetchall(),columns=column_names)
```

| | column_name | data_type | is_nullable |
|---|---|---|---|
| 0 | listing_id | integer | NO |
| 1 | name | character varying | YES |
| 2 | host_id | integer | NO |
| 3 | location_id | integer | NO |
| 4 | room_type | character varying | YES |
| 5 | price | integer | YES |
| 6 | minimum_nights | integer | YES |
| 7 | number_of_reviews | integer | YES |
| 8 | last_review | date | YES |
| 9 | reviews_per_month | double precision | YES |


In [29]:
```python
cur.execute(sql.date_table_create)
cur.execute(sql.getStruct,("date",))
column_names = [desc[0] for desc in cur.description]
pd.DataFrame(cur.fetchall(),columns=column_names)
```

| | column_name | data_type | is_nullable |
|---|---|---|---|
| 0 | date_id | integer | NO |
| 1 | year | integer | NO |
| 2 | month | integer | NO |
| 3 | day | integer | NO |


In [30]:
```python
cur.execute(sql.jurisdiction_table_create)
cur.execute(sql.getStruct,("jurisdiction",))
column_names = [desc[0] for desc in cur.description]
pd.DataFrame(cur.fetchall(),columns=column_names)
```

| | column_name | data_type | is_nullable |
|---|---|---|---|
| 0 | jurisdiction_id | integer | NO |
| 1 | jdesc | character varying | NO |


In [31]:
```python
cur.execute(sql.crime_type_table_create)
cur.execute(sql.getStruct,("crime_type",))
column_names = [desc[0] for desc in cur.description]
pd.DataFrame(cur.fetchall(),columns=column_names)
```

| | column_name | data_type | is_nullable |
|---|---|---|---|
| 0 | ctype_id | integer | NO |
| 1 | ky_cd | integer | NO |
| 2 | ofns_desc | character varying | YES |
| 3 | pd_cd | integer | YES |
| 4 | pd_desc | character varying | YES |


In [32]:
```python
cur.execute(sql.crime_table_create)
cur.execute(sql.getStruct,("crime",))
column_names = [desc[0] for desc in cur.description]
pd.DataFrame(cur.fetchall(),columns=column_names)
```

| | column_name | data_type | is_nullable |
|---|---|---|---|
| 0 | crime_id | integer | NO |
| 1 | precinct | integer | NO |
| 2 | location_id | integer | YES |
| 3 | start_date_id | integer | NO |
| 4 | end_date_id | integer | YES |
| 5 | completed | character varying | NO |
| 6 | hadevelopt | character varying | YES |
| 7 | housing_psa | integer | YES |
| 8 | jurisdiction_id | integer | YES |
| 9 | law_cat_cd | character varying | NO |


In [33]:
```python
cur.execute(sql.weather_table_create)
cur.execute(sql.getStruct,("weather",))
column_names = [desc[0] for desc in cur.description]
pd.DataFrame(cur.fetchall(),columns=column_names)
```

| | column_name | data_type | is_nullable |
|---|---|---|---|
| 0 | date_id | integer | NO |
| 1 | max_temp | integer | NO |
| 2 | min_temp | integer | NO |
| 3 | avg_temp | integer | NO |
| 4 | precipitation | double precision | NO |
| 5 | snowfall | double precision | NO |
| 6 | snowdepth | double precision | NO |


In [34]:
```python
cur.close()
conn.close()
```

#### 4.1 Create the data model

Let's start with a fresh connection and the JDBC connection properties for Spark.

In [35]:
```python
conn = psycopg2.connect("host=127.0.0.1 dbname=capstone_db user=student password=student")
conn.set_session(autocommit=True)
cur = conn.cursor()
```

In [36]:
```python
url = "jdbc:postgresql://localhost/capstone_db"
properties = {
    "user": "student",
    "password": "student",
    "driver": "org.postgresql.Driver"
}
```

#### Date Table

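I chose a date range from 2012 through 2019 to be safe, because the datasets also contain dates outside 2016 (for example Airbnb `last_review` dates in 2018 and 2019, and crime report dates in 2017).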

In [37]:
```python
start = datetime.strptime("01-01-2012", "%d-%m-%Y")
end = datetime.strptime("31-12-2019", "%d-%m-%Y")
# +1 so that the end date (31-12-2019) is included as well
date_generated = [start + timedelta(days=x) for x in range(0, (end-start).days + 1)]
for date in date_generated:
    cur.execute(sql.date_table_insert, [date.year,date.month,date.day])
```
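`range(0, (end - start).days)` stops one day before `end`; to include the end date you need `(end - start).days + 1` days. The check below assumes that `date_id` is a serial key starting at 1 for the first inserted row. That assumption is consistent with 2016-01-01 getting `date_id` 1462 in the weather output below. Under it, a date's ID follows from its offset to the start date. `date_id` is a hypothetical helper used only for this check.

```python
from datetime import date, timedelta

start, end = date(2012, 1, 1), date(2019, 12, 31)
n_days = (end - start).days + 1          # +1 so that `end` itself is included
dates = [start + timedelta(days=x) for x in range(n_days)]

def date_id(d, first=start):
    """Hypothetical helper: serial ID, assuming IDs start at 1 on `first`."""
    return (d - first).days + 1

print(len(dates), dates[-1])             # 2922 2019-12-31
print(date_id(date(2016, 1, 1)))         # 1462
```

The pipeline itself still joins on year, month and day. That is the safer choice, because it does not rely on how PostgreSQL assigns the serial IDs.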

#### Weather Table

Normally I could just parse the date and match it against the date dimension, but that would be too easy.
As I am from Europe and prefer metric units in my database, I also convert Fahrenheit to Celsius for the temperature columns and inches to centimetres for the other columns.

Some rows of precipitation, snowfall and snowdepth contain the character "T". It marks a trace amount: some precipitation or snow fell, but too little to measure. As this is very close to zero, I decided to impute these values as zeros.

In [38]:
```python
get_datetime = F.udf(lambda x: datetime.strptime(x, '%d-%m-%Y') if x is not None else x, DateType())
get_day = F.udf(lambda x: x.day if x is not None else x, IntegerType())
get_year = F.udf(lambda x: x.year if x is not None else x, IntegerType())
get_month = F.udf(lambda x: x.month if x is not None else x, IntegerType())

f2c = F.udf(lambda x: int((float(x)-32)/1.8), IntegerType())  # Fahrenheit to Celsius
i2cm = F.udf(lambda x: round((float(x)/0.39370),2), FloatType())  # inches to centimetres

clima_df = clima_df.withColumn('date_parsed', get_datetime(clima_df.date))
clima_df = clima_df.withColumn('day', get_day(clima_df.date_parsed))
clima_df = clima_df.withColumn('year', get_year(clima_df.date_parsed))
clima_df = clima_df.withColumn('month', get_month(clima_df.date_parsed))

clima_df = clima_df.withColumn('max_temp', f2c(F.col("maximumtemperature")))
clima_df = clima_df.withColumn('min_temp', f2c(F.col("minimumtemperature")))
clima_df = clima_df.withColumn('avg_temp', f2c(F.col("averagetemperature")))

# impute "T" (trace) as 0
clima_df = clima_df.withColumn("precipitation", F.when(F.col("precipitation")=="T",0).otherwise(F.col("precipitation")))
clima_df = clima_df.withColumn("snowfall", F.when(F.col("snowfall")=="T",0).otherwise(F.col("snowfall")))
clima_df = clima_df.withColumn("snowdepth", F.when(F.col("snowdepth")=="T",0).otherwise(F.col("snowdepth")))

clima_df = clima_df.withColumn("precipitation_cm", i2cm(clima_df.precipitation))
clima_df = clima_df.withColumn("snowfall_cm", i2cm(clima_df.snowfall))
clima_df = clima_df.withColumn("snowdepth_cm", i2cm(clima_df.snowdepth))

clima_df.limit(3).toPandas()
```

| | date | maximumtemperature | minimumtemperature | averagetemperature | precipitation | snowfall | snowdepth | date_parsed | day | year | month | max_temp | min_temp | avg_temp | precipitation_cm | snowfall_cm | snowdepth_cm |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1-1-2016 | 42 | 34 | 38.0 | 0.0 | 0.0 | 0 | 2016-01-01 | 1 | 2016 | 1 | 5 | 1 | 3 | 0.0 | 0.0 | 0.0 |
| 1 | 2-1-2016 | 40 | 32 | 36.0 | 0.0 | 0.0 | 0 | 2016-01-02 | 2 | 2016 | 1 | 4 | 0 | 2 | 0.0 | 0.0 | 0.0 |
| 2 | 3-1-2016 | 45 | 35 | 40.0 | 0.0 | 0.0 | 0 | 2016-01-03 | 3 | 2016 | 1 | 7 | 1 | 4 | 0.0 | 0.0 | 0.0 |
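`f2c` uses `int()`, which truncates toward zero instead of rounding. So 42 °F (5.56 °C) is stored as 5 and 100 °F (37.78 °C) as 37, which is what the data quality check in 4.2 expects. For temperatures below 32 °F, truncation moves the value toward 0. The plain-Python comparison below shows `round()` as an alternative.

```python
def f2c_truncate(f):
    """Same formula as the f2c UDF: int() truncates toward zero."""
    return int((float(f) - 32) / 1.8)

def f2c_round(f):
    """Alternative: round to the nearest whole degree."""
    return round((float(f) - 32) / 1.8)

for f in (42, 100, 20):
    print(f, f2c_truncate(f), f2c_round(f))
# 42 5 6
# 100 37 38
# 20 -6 -7
```

Switching to rounding would also change the expected values in the Fahrenheit-to-Celsius check in 4.2.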


Loading the date dimension from postgres to get the date_ids

In [39]:
```python
date_df=spark.read.jdbc(url=url, table="date", properties=properties)
```

Joining date with weather dataset

In [40]:
```python
clima_fact=clima_df.join(date_df, \
                         (clima_df.year == date_df.year) & \
                         (clima_df.month == date_df.month) & \
                         (clima_df.day == date_df.day))\
                         .select("date_id","max_temp","min_temp","avg_temp","precipitation_cm","snowfall_cm","snowdepth_cm")\
                         .withColumnRenamed("precipitation_cm", "precipitation")\
                         .withColumnRenamed("snowfall_cm", "snowfall")\
                         .withColumnRenamed("snowdepth_cm", "snowdepth")

clima_fact.limit(3).toPandas()
```

| | date_id | max_temp | min_temp | avg_temp | precipitation | snowfall | snowdepth |
|---|---|---|---|---|---|---|---|
| 0 | 1462 | 5 | 1 | 3 | 0.0 | 0.0 | 0.0 |
| 1 | 1463 | 4 | 0 | 2 | 0.0 | 0.0 | 0.0 |
| 2 | 1464 | 7 | 1 | 4 | 0.0 | 0.0 | 0.0 |


Inserting the weather data into the weather table

In [41]:
```python
clima_fact.write.jdbc(url=url,table="weather", properties=properties,mode="append")
```

In [42]:
```python
spark.read.jdbc(url=url, table="(SELECT * FROM weather limit 3) as t", properties=properties).toPandas()
```

| | date_id | max_temp | min_temp | avg_temp | precipitation | snowfall | snowdepth |
|---|---|---|---|---|---|---|---|
| 0 | 1462 | 5 | 1 | 3 | 0.0 | 0.0 | 0.0 |
| 1 | 1463 | 4 | 0 | 2 | 0.0 | 0.0 | 0.0 |
| 2 | 1464 | 7 | 1 | 4 | 0.0 | 0.0 | 0.0 |


#### Host Dimension

I pulled the distinct host information (host_id, host_name) from the Airbnb dataset and inserted it into the host table.

In [43]:
```python
airbnb_df.select("host_id","host_name").distinct().write.jdbc(url=url,table="host", properties=properties,mode="append")
```

In [44]:
```python
spark.read.jdbc(url=url, table="(SELECT * FROM host limit 3) as t", properties=properties).toPandas()
```

| | host_id | host_name |
|---|---|---|
| 0 | 64522 | Daniel |
| 1 | 69942 | Victoria |
| 2 | 361855 | Kurt |


#### Location Dimension

This was the hardest dimension. I spent a lot of time looking for a free Python library that calculates the ZIP code for a given location. Wrapper libraries such as geopy, which use OpenStreetMap or the Google APIs, exist, but they only allow a very small number of calls in a short period of time. Geocoding just 100 coordinates took several minutes, so matching the geo information for the roughly half a million rows of the NY crime dataset would take too long for this course. In a business case where the model is used over a longer period, there would be time (or money for the official Google API) to gradually collect all geo information and maintain the location dimension over time.

So I had to come up with another solution: I rounded the coordinates to three decimals. This isn't perfect, and not all NY crime records can be joined with the neighbourhood information from the Airbnb dataset, but some of them match.
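Some numbers put the three-decimal rounding into perspective. 0.001° of latitude is roughly 111 m. At New York's latitude (about 40.7° N), 0.001° of longitude is roughly 84 m. Each rounded coordinate pair therefore stands for a grid cell of about 111 m by 84 m. The quick check below uses a spherical-Earth approximation. `shorten` reproduces the formatting of the UDF in the next cell, and the constants are approximations.

```python
import math

def shorten(x):
    """Same formatting as the shorten UDF: round to 3 decimals, keep as a string."""
    return '{:.3f}'.format(round(float(x), 3)) if x is not None else x

METERS_PER_DEGREE = 111_000  # rough length of 1 degree of latitude (spherical approximation)
NYC_LATITUDE = 40.7          # approximate latitude of New York City

cell_lat_m = 0.001 * METERS_PER_DEGREE
# a degree of longitude shrinks with the cosine of the latitude
cell_lon_m = 0.001 * METERS_PER_DEGREE * math.cos(math.radians(NYC_LATITUDE))

print(shorten("40.880468411"), shorten("-73.877197456"))  # 40.880 -73.877
print(round(cell_lat_m), round(cell_lon_m))                # 111 84
```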

In [45]:
```python
shorten = F.udf(lambda x: '{:.3f}'.format(round(float(x),3)) if x!= None else x, StringType())

airbnb_df = airbnb_df.withColumn("new_lat", shorten(airbnb_df.latitude))
airbnb_df = airbnb_df.withColumn("new_long", shorten(airbnb_df.longitude))
nycrime_df = nycrime_df.withColumn("new_lat", shorten(nycrime_df.Latitude))
nycrime_df = nycrime_df.withColumn("new_long", shorten(nycrime_df.Longitude))
```

In [46]:
```python
loc_airbnb=airbnb_df.select("neighbourhood_group","neighbourhood","new_lat","new_long").distinct()
```

In [47]:
```python
loc_nycrime=nycrime_df.select("new_lat","new_long").distinct()
```

In [48]:
```python
unique_geo_df=loc_nycrime.join(loc_airbnb, \
                         (loc_nycrime.new_lat == loc_airbnb.new_lat) & \
                         (loc_nycrime.new_long == loc_airbnb.new_long), how="leftanti").union(loc_airbnb.select("new_lat","new_long"))
```

In [49]:
```python
# lat and long are NOT NULL in the location table, so both values must be present
location_df=unique_geo_df.join(loc_airbnb, ["new_lat","new_long"],how="left").dropDuplicates(subset=["new_lat","new_long"])\
                        .filter(F.col("new_lat").isNotNull() & F.col("new_long").isNotNull())\
                        .withColumnRenamed("new_lat", "lat")\
                        .withColumnRenamed("new_long", "long")
```
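Cells In [46] to In [49] build the location table in three steps. First they take all crime coordinates without an Airbnb match (left anti join) plus all Airbnb coordinates. Then they left-join the result back to the Airbnb neighbourhoods. Finally they de-duplicate on (lat, long). The same set logic in plain Python, with hypothetical toy coordinates:

```python
# Hypothetical toy data: rounded (lat, long) pairs as strings, like new_lat/new_long
airbnb = {  # (lat, long) -> (neighbourhood_group, neighbourhood), one entry per cell
    ("40.647", "-73.972"): ("Brooklyn", "Kensington"),
    ("40.754", "-73.984"): ("Manhattan", "Midtown"),
}
crime = {("40.647", "-73.972"), ("40.880", "-73.877")}  # distinct crime coordinates

crime_only = crime - set(airbnb)        # ~ leftanti join on (lat, long)
unique_geo = crime_only | set(airbnb)   # ~ union with the Airbnb coordinates

# ~ left join back to the Airbnb neighbourhoods; crime-only cells get no neighbourhood
location = {geo: airbnb.get(geo, (None, None)) for geo in sorted(unique_geo)}
for (lat, long), (group, hood) in location.items():
    print(lat, long, group, hood)
# 40.647 -73.972 Brooklyn Kensington
# 40.754 -73.984 Manhattan Midtown
# 40.880 -73.877 None None
```

In Spark, `dropDuplicates(subset=[...])` is needed because one grid cell can match several Airbnb neighbourhoods, and it keeps an arbitrary one of them. The dict in this sketch avoids the issue by holding only one neighbourhood per cell.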

In [50]:
```python
location_df.write.jdbc(url=url,table="location", properties=properties,mode="append")
```

In [51]:
```python
spark.read.jdbc(url=url, table="(SELECT * FROM location limit 3) as t", properties=properties).toPandas()
```

| | location_id | neighbourhood_group | neighbourhood | long | lat |
|---|---|---|---|---|---|
| 0 | 1 | | | -74.228 | 40.503 |
| 1 | 2 | | | -74.183 | 40.535 |
| 2 | 3 | | | -74.154 | 40.536 |


#### Listing Fact

Once the host and location dimensions are built, the listing fact table can be derived from the Airbnb dataset by joining it with its dimensions.

In [52]:
```python
location_df=spark.read.jdbc(url=url, table="location", properties=properties)
```

In [53]:
```python
get_datetime = F.udf(lambda x: datetime.strptime(x, '%Y-%m-%d') if x != None else x, DateType())
airbnb_df = airbnb_df.withColumn('last_review', get_datetime(airbnb_df.last_review))
airbnb_df = airbnb_df.withColumn('reviews_per_month', airbnb_df.reviews_per_month.cast(FloatType()))

toInt=["price","minimum_nights","number_of_reviews","calculated_host_listings_count","availability_365"]
for col in toInt:
    airbnb_df = airbnb_df.withColumn(col, F.col(col).cast(IntegerType()))
```

In [54]:
```python
listing_df=airbnb_df.join(location_df, \
                         (airbnb_df.new_lat == location_df.lat) & \
                         (airbnb_df.new_long == location_df.long),how="left")
```

In [55]:
```python
listing_df\
    .select("listing_id","host_id","location_id","room_type","price",
            "minimum_nights","number_of_reviews","last_review","reviews_per_month","calculated_host_listings_count","availability_365")\
    .write.jdbc(url=url,table="listing", properties=properties,mode="append")
```

In [56]:
```python
spark.read.jdbc(url=url, table="(SELECT * FROM listing limit 3) as t", properties=properties).toPandas()
```

| | listing_id | name | host_id | location_id | room_type | price | minimum_nights | number_of_reviews | last_review | reviews_per_month | calculated_host_listings_count | availability_365 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 27700747 | | 145245549 | 11 | Private room | 36 | 2 | 18 | 2019-06-07 | 1.67 | 1 | 178 |
| 1 | 13480090 | | 77179176 | 21 | Private room | 69 | 1 | 0 | | | 1 | 0 |
| 2 | 13802521 | | 26022066 | 33 | Private room | 50 | 1 | 10 | 2019-04-28 | 0.3 | 1 | 0 |


#### Jurisdiction Dimension

The jurisdiction dimension can easily be extracted as distinct values from the NY crime dataset. The jurisdiction code is renamed to jurisdiction_id.

In [57]:
```python
jurisdiction_df= \
nycrime_df.select("JURISDICTION_CODE","JURIS_DESC")\
.distinct().filter(F.col("JURISDICTION_CODE").isNotNull())\
.withColumn('JURISDICTION_CODE', nycrime_df.JURISDICTION_CODE.cast(IntegerType()))\
.withColumnRenamed("JURISDICTION_CODE","jurisdiction_id")\
.withColumnRenamed("JURIS_DESC","jdesc")
```

In [58]:
```python
jurisdiction_df.write.jdbc(url=url,table="jurisdiction", properties=properties,mode="append")
```

In [59]:
```python
spark.read.jdbc(url=url, table="(SELECT * FROM jurisdiction limit 3) as t", properties=properties).toPandas()
```

| | jurisdiction_id | jdesc |
|---|---|---|
| 0 | 4 | TRI-BORO BRDG TUNNL |
| 1 | 12 | N.Y. STATE PARKS |
| 2 | 15 | METRO NORTH |


#### Crime_type Dimension

The crime_type dimension is extracted from the NY crime dataset as distinct values.

In [60]:
```python
crime_type_df=nycrime_df.select("KY_CD","OFNS_DESC","PD_CD","PD_DESC").distinct()\
.withColumn('KY_CD', nycrime_df.KY_CD.cast(IntegerType()))\
.withColumn('PD_CD', nycrime_df.PD_CD.cast(IntegerType()))
```

In [61]:
```python
crime_type_df.write.jdbc(url=url,table="crime_type", properties=properties,mode="append")
```

In [62]:
```python
spark.read.jdbc(url=url, table="(SELECT * FROM crime_type limit 3) as t", properties=properties).toPandas()
```

| | ctype_id | ky_cd | ofns_desc | pd_cd | pd_desc |
|---|---|---|---|---|---|
| 0 | 1 | 346 | ALCOHOLIC BEVERAGE CONTROL LAW | 802 | ALCOHOLIC BEVERAGE CONTROL LAW |
| 1 | 2 | 676 | NEW YORK CITY HEALTH CODE | 889 | HEALTH CODE,VIOLATION |
| 2 | 3 | 109 | GRAND LARCENY | 457 | LARCENY,GRAND OF VEHICULAR/MOTORCYCLE ACCESSORIES |


#### Crime Fact

To get the crime fact table, the NY crime dataset is joined with crime_type, jurisdiction, location and date. I also renamed some columns to make their names clearer.

In [63]:
```python
crime_type_df = spark.read.jdbc(url=url, table="crime_type", properties=properties)
jurisdiction_df = spark.read.jdbc(url=url, table="jurisdiction", properties=properties)

get_datetime = F.udf(lambda x: datetime.strptime(x, '%m/%d/%Y') if x!= None else x, DateType())
```

In [64]:
```python
rename_map={
    "CMPLNT_NUM":"crime_id",
    "ADDR_PCT_CD" : "precinct",
    "CRM_ATPT_CPTD_CD" : "completed"
}

for key in rename_map:
    nycrime_df=nycrime_df.withColumnRenamed(key,rename_map[key])
```

In [65]:
```python
toInt=["crime_id","precinct","HOUSING_PSA"]
for col in toInt:
    nycrime_df = nycrime_df.withColumn(col, F.col(col).cast(IntegerType()))
```

In [66]:
```python
nycrime_df=nycrime_df.join(crime_type_df, \
                         (nycrime_df.KY_CD == crime_type_df.ky_cd) & \
                         (nycrime_df.PD_CD == crime_type_df.pd_cd) ,how="left")
```
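A hedged observation on this join: with `==`, the rows whose `PD_CD` is NULL (329 rows according to the NULL check in Step 2) cannot match any `crime_type` row. Under SQL semantics NULL = NULL is not true, so these rows get a NULL `ctype_id`, even though `crime_type` contains rows with a NULL `pd_cd`. PySpark offers `Column.eqNullSafe` (SQL `<=>`) for null-safe equality. The difference is easy to see in SQLite, whose `IS` operator is null-safe. The tables and values below are hypothetical toy data.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE crime (crime_id INTEGER, ky_cd INTEGER, pd_cd INTEGER);
CREATE TABLE crime_type (ctype_id INTEGER, ky_cd INTEGER, pd_cd INTEGER);
INSERT INTO crime VALUES (1, 106, 109), (2, 236, NULL);
INSERT INTO crime_type VALUES (10, 106, 109), (20, 236, NULL);
""")

# plain equality: NULL = NULL is not true, so crime 2 finds no crime_type
plain = conn.execute("""
    SELECT c.crime_id, t.ctype_id FROM crime c
    LEFT JOIN crime_type t ON c.ky_cd = t.ky_cd AND c.pd_cd = t.pd_cd
    ORDER BY c.crime_id""").fetchall()

# null-safe equality (SQLite IS, Spark eqNullSafe / <=>)
null_safe = conn.execute("""
    SELECT c.crime_id, t.ctype_id FROM crime c
    LEFT JOIN crime_type t ON c.ky_cd = t.ky_cd AND c.pd_cd IS t.pd_cd
    ORDER BY c.crime_id""").fetchall()

print(plain)      # [(1, 10), (2, None)]
print(null_safe)  # [(1, 10), (2, 20)]
```

In Spark the condition would read `(nycrime_df.KY_CD == crime_type_df.ky_cd) & nycrime_df.PD_CD.eqNullSafe(crime_type_df.pd_cd)`. If you switch, re-run the row count check in 4.2. Several crime_type rows can share the same `ky_cd` with a NULL `pd_cd`, and such rows would multiply matches.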

In [67]:
```python
nycrime_df=nycrime_df.join(jurisdiction_df, \
               (nycrime_df.JURIS_DESC == jurisdiction_df.jdesc),how="left")
```

In [68]:
```python
nycrime_df=nycrime_df.join(location_df, \
                         (nycrime_df.new_lat == location_df.lat) & \
                         (nycrime_df.new_long == location_df.long),how="left")
```

In [69]:
```python
nycrime_df=nycrime_df.withColumn("start_date",get_datetime(nycrime_df.CMPLNT_FR_DT))
nycrime_df=nycrime_df.withColumn("start_day",get_day(nycrime_df.start_date))
nycrime_df=nycrime_df.withColumn("start_year",get_year(nycrime_df.start_date))
nycrime_df=nycrime_df.withColumn("start_month",get_month(nycrime_df.start_date))

nycrime_df=nycrime_df.withColumn("end_date",get_datetime(nycrime_df.CMPLNT_TO_DT))
nycrime_df=nycrime_df.withColumn("end_day",get_day(nycrime_df.end_date))
nycrime_df=nycrime_df.withColumn("end_year",get_year(nycrime_df.end_date))
nycrime_df=nycrime_df.withColumn("end_month",get_month(nycrime_df.end_date))

nycrime_df=nycrime_df.withColumn("rpt_date",get_datetime(nycrime_df.RPT_DT))
nycrime_df=nycrime_df.withColumn("rpt_day",get_day(nycrime_df.rpt_date))
nycrime_df=nycrime_df.withColumn("rpt_year",get_year(nycrime_df.rpt_date))
nycrime_df=nycrime_df.withColumn("rpt_month",get_month(nycrime_df.rpt_date))
```

In [70]:
```python
nycrime_df=nycrime_df.join(date_df, \
                         (nycrime_df.start_day == date_df.day) & \
                         (nycrime_df.start_month == date_df.month) & \
                         (nycrime_df.start_year == date_df.year), how="left").withColumnRenamed("date_id","start_date_id").drop("year","month","day")
```

In [71]:
```python
nycrime_df=nycrime_df.join(date_df, \
                         (nycrime_df.end_day == date_df.day) & \
                         (nycrime_df.end_month == date_df.month) & \
                         (nycrime_df.end_year == date_df.year), how="left").withColumnRenamed("date_id","end_date_id").drop("year","month","day")
```

In [72]:
```python
nycrime_df=nycrime_df.join(date_df, \
                         (nycrime_df.rpt_day == date_df.day) & \
                         (nycrime_df.rpt_month == date_df.month) & \
                         (nycrime_df.rpt_year == date_df.year), how="left").withColumnRenamed("date_id","rpt_date_id").drop("year","month","day")
```

In [73]:
```python
start = datetime.now()
table_columns=["crime_id","precinct","location_id","start_date_id","end_date_id","completed","HADEVELOPT","HOUSING_PSA","jurisdiction_id",
              "LAW_CAT_CD","LOC_OF_OCCUR_DESC","PARKS_NM","PATROL_BORO","ctype_id","PREM_TYP_DESC","rpt_date_id","STATION_NAME","SUSP_AGE_GROUP","SUSP_RACE","SUSP_SEX",
              "VIC_AGE_GROUP","VIC_RACE","VIC_SEX","TRANSIT_DISTRICT"]

nycrime_df.select(table_columns).write.jdbc(url=url,table="crime", properties=properties,mode="append")
print(f"Time: {datetime.now()-start}")
```

```
Time: 0:03:49.628031
```

In [74]:
```python
spark.read.jdbc(url=url, table="(SELECT * FROM crime limit 3) as t", properties=properties).toPandas()
```

| | crime_id | precinct | location_id | start_date_id | end_date_id | completed | hadevelopt | housing_psa | jurisdiction_id | law_cat_cd | ... | prem_typ_desc | rpt_date_id | station_name | susp_age_group | susp_race | susp_sex | vic_age_group | vic_race | vic_sex | transit_district |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 260960589 | 62 | 30884 | 1594 | 2105.0 | COMPLETED | | | 0 | MISDEMEANOR | ... | STREET | 2105 | | | | | 25-44 | UNKNOWN | M | |
| 1 | 962119894 | 113 | 31838 | 1504 | | COMPLETED | | | 0 | FELONY | ... | RESIDENCE-HOUSE | 2105 | | UNKNOWN | UNKNOWN | U | 25-44 | BLACK | M | |
| 2 | 420713889 | 108 | 22912 | 1767 | 1796.0 | COMPLETED | | | 0 | MISDEMEANOR | ... | RESIDENCE - APT. HOUSE | 2105 | | 18-24 | WHITE HISPANIC | M | UNKNOWN | UNKNOWN | D | |


#### 4.2 Data Quality Checks 

Comparing the row count of each DataFrame before processing and joining with the number of rows loaded into the database:

In [75]:
crime_db_count=spark.read.jdbc(url=url, table="(SELECT count(*) FROM crime) as t", properties=properties).first()['count'] 
airbnb_db_count=spark.read.jdbc(url=url, table="(SELECT count(*) FROM listing) as t", properties=properties).first()['count'] 
clima_db_count=spark.read.jdbc(url=url, table="(SELECT count(*) FROM weather) as t", properties=properties).first()['count'] 

In [76]:
testdata=\
[{"table": "NY Crime", "before":crime_df_count,"after":crime_db_count,"same":crime_df_count==crime_db_count},
{"table": "Airbnb", "before":airbnb_df_count,"after":airbnb_db_count,"same":airbnb_df_count==airbnb_db_count},
{"table": "Weather","before":clima_df_count,"after":clima_db_count,"same":clima_df_count==clima_db_count}]

cols=testdata[0].keys()
    
pd.DataFrame(testdata,columns=cols)

| | table | before | after | same |
|---|---|---|---|---|
| 0 | NY Crime | 477639 | 477639 | True |
| 1 | Airbnb | 48720 | 48720 | True |
| 2 | Weather | 366 | 366 | True |
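The cell above displays the comparison but does not stop anything when the counts differ. In a scheduled pipeline it can help to fail loudly instead. A minimal sketch, assuming the before/after counts have already been collected as in the cells above; `check_row_counts` is a hypothetical helper, not part of the notebook:

```python
def check_row_counts(counts):
    """Compare row counts before and after loading.

    counts maps a table label to a (before, after) tuple. Returns one report
    row per table and raises ValueError if any table lost or gained rows.
    """
    report = []
    for table, (before, after) in counts.items():
        report.append({"table": table, "before": before, "after": after,
                       "same": before == after})
    mismatched = [row["table"] for row in report if not row["same"]]
    if mismatched:
        raise ValueError(f"Row count mismatch for: {', '.join(mismatched)}")
    return report

# Counts taken from the output above
report = check_row_counts({
    "NY Crime": (477639, 477639),
    "Airbnb": (48720, 48720),
    "Weather": (366, 366),
})
print(all(row["same"] for row in report))  # True
```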


Checking the UDF that converts Fahrenheit to Celsius

In [77]:
data=[(32,0),(100,37),(50,10)]
testdf = spark.createDataFrame(data,["original","expected"])
testdf.withColumn("calc", f2c(testdf.original)).toPandas()

| | original | expected | calc |
|---|---|---|---|
| 0 | 32 | 0 | 0 |
| 1 | 100 | 37 | 37 |
| 2 | 50 | 10 | 10 |
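The expected values hint at how `f2c` rounds: (100 - 32) × 5/9 is about 37.78 °C, yet the test expects 37, so the UDF appears to truncate to an integer. The sketch below reproduces that assumed behaviour in plain Python with a hypothetical `fahrenheit_to_celsius` function. Because `int()` truncates toward zero, a below-freezing input such as 0 °F (about -17.78 °C) gives -17 rather than -18, so a negative case may be worth adding to the test.

```python
def fahrenheit_to_celsius(fahrenheit):
    """Convert Fahrenheit to Celsius and truncate toward zero.

    Truncation is assumed from the expected values in the test above;
    the real f2c UDF is defined elsewhere in the notebook.
    """
    return int((fahrenheit - 32) * 5 / 9)

for original, expected in [(32, 0), (100, 37), (50, 10)]:
    assert fahrenheit_to_celsius(original) == expected

print(fahrenheit_to_celsius(0))  # -17 (truncated from about -17.78)
```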


Checking the UDF that converts inches to centimetres

In [78]:
data=[(1,2.54),(6,15.24),(42,106.68)]
testdf = spark.createDataFrame(data,["original","expected"])
testdf.withColumn("calc", i2cm(testdf.original)).toPandas()

| | original | expected | calc |
|---|---|---|---|
| 0 | 1 | 2.54 | 2.54 |
| 1 | 6 | 15.24 | 15.24 |
| 2 | 42 | 106.68 | 106.68 |
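Unlike the temperature check, this conversion produces floats, and binary floating point cannot represent most decimal fractions exactly. The pandas output above rounds for display, so an automated check with `==` may fail for some inputs even when the conversion is right. A sketch of a tolerance-based check, using a hypothetical `inches_to_cm` helper (1 inch is defined as exactly 2.54 cm):

```python
import math

def inches_to_cm(inches):
    """Convert inches to centimetres (1 inch = 2.54 cm by definition)."""
    return inches * 2.54

cases = [(1, 2.54), (6, 15.24), (42, 106.68)]
for original, expected in cases:
    # Compare with a relative tolerance instead of ==, because the float
    # product may differ from the decimal literal in the last bits.
    assert math.isclose(inches_to_cm(original), expected, rel_tol=1e-9)

print("all inch conversions match")
```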


#### 4.3 Data dictionary 


#### Date Table

- **date_id**: Primary key of the table
- **year**: Year of the date
- **month**: Month of the date
- **day**: day of the month

#### Weather Table

- **date_id**: id of the date and primary key of the table
- **max_temp**: maximum temperature in degrees Celsius
- **min_temp**: minimum temperature in degrees Celsius
- **avg_temp**: average temperature in degrees Celsius
- **precipitation**: precipitation in cm
- **snowfall**: snowfall in cm
- **snowdepth**: snow depth in cm

#### Host Table

- **host_id**: id of the host and primary key of the table
- **host_name**: name of the host

#### Location Table

- **location_id**: id of the location and primary key of the table
- **neighbourhood_group**: group of the neighbourhood
- **neighbourhood**: neighbourhood
- **long**: longitude
- **lat**: latitude

#### Listing Table 

- **listing_id**: id of the listing and primary key of the table
- **name** : name of the listing
- **host_id**: id of the host
- **location_id**: id of the location
- **room_type**: type of the room
- **price**: price of the listing in US dollars
- **minimum_night**: minimum number of nights per booking
- **number_of_reviews**: number of reviews the listing has received
- **last_review**: time of the most recent review
- **reviews_per_month**: average number of reviews the listing receives per month
- **calculated_host_listing_counts**: number of listings the host has
- **availability_365**: number of days per year on which the listing is available for booking

#### Jurisdiction Table

- **jurisdiction_id**: id of the jurisdiction and primary key of the table
- **jdesc**: description of the jurisdiction

#### Crime Type Table

- **ctype_id**: id of the crime type and primary key of the table
- **ky_cd**: offense classification code
- **ofns_desc**: offense description
- **pd_cd**: internal classification code
- **pd_desc**: description of the offense, more granular than the offense description (ofns_desc)

#### Crime Table

- **crime_id**: id of the crime and primary key of the table
- **precinct**: the precinct in which the incident occurred
- **location_id**: id of the location
- **start_date_id**: id of the date on which the offense started
- **end_date_id**: id of the date on which the offense ended
- **completed**: whether the offense was completed or only attempted (failed or interrupted)
- **hadevelopt**: name of NYCHA housing development of occurrence, if applicable
- **housing_psa**: development level code
- **jurisdiction_id**: id of the jurisdiction
- **law_cat_cd**: level of offense (felony, misdemeanor or violation)
- **loc_of_occur_desc**: specific location of occurrence in or around the premises
- **parks_nm**: name of NYC park, playground or greenspace of occurrence, if applicable
- **patrol_boro**: the name of the patrol borough in which the incident occurred
- **ctype_id**: id of the crime type
- **prem_typ_desc**: specific description of premises
- **rpt_date_id**: id of the date on which the offense was reported
- **station_name**: transit station name
- **susp_age_group**: age group of the suspect
- **susp_race**: race of the suspect
- **susp_sex**: sex of the suspect
- **vic_age_group**: age group of the victim
- **vic_race**: race of the victim
- **vic_sex**: sex of the victim
- **transit_district**: name of the transit district
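The dictionary above is maintained by hand, so it can drift from the columns the pipeline actually writes. A small check can compare the two. This sketch copies `table_columns` from the write cell and compares in lower case, since the query output shows the columns stored in lower case in PostgreSQL. `documented` and `compare_columns` are hypothetical names, and the jurisdiction field is spelled `jurisdiction_id`, as in the query output.

```python
# Columns passed to the JDBC writer (copied from table_columns in the notebook)
table_columns = [
    "crime_id", "precinct", "location_id", "start_date_id", "end_date_id",
    "completed", "HADEVELOPT", "HOUSING_PSA", "jurisdiction_id", "LAW_CAT_CD",
    "LOC_OF_OCCUR_DESC", "PARKS_NM", "PATROL_BORO", "ctype_id", "PREM_TYP_DESC",
    "rpt_date_id", "STATION_NAME", "SUSP_AGE_GROUP", "SUSP_RACE", "SUSP_SEX",
    "VIC_AGE_GROUP", "VIC_RACE", "VIC_SEX", "TRANSIT_DISTRICT",
]

# Field names from the Crime Table dictionary
documented = [
    "crime_id", "precinct", "location_id", "start_date_id", "end_date_id",
    "completed", "hadevelopt", "housing_psa", "jurisdiction_id", "law_cat_cd",
    "loc_of_occur_desc", "parks_nm", "patrol_boro", "ctype_id", "prem_typ_desc",
    "rpt_date_id", "station_name", "susp_age_group", "susp_race", "susp_sex",
    "vic_age_group", "vic_race", "vic_sex", "transit_district",
]

def compare_columns(written, documented):
    """Return (undocumented, unwritten) column names, compared in lower case."""
    written_set = {name.lower() for name in written}
    documented_set = {name.lower() for name in documented}
    return sorted(written_set - documented_set), sorted(documented_set - written_set)

print(compare_columns(table_columns, documented))  # ([], [])

# A dictionary entry that drifts from the real column name shows up on both sides
drifted = [name if name != "jurisdiction_id" else "jurisdiction" for name in documented]
print(compare_columns(table_columns, drifted))  # (['jurisdiction_id'], ['jurisdiction'])
```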


### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
  * The data was increased by 100x.
  * The data populates a dashboard that must be updated on a daily basis by 7am every day.
  * The database needed to be accessed by 100+ people.

#### Technologies

- **Python**: concise, readable language with mature data libraries such as pandas and PySpark
- **Spark**: distributed engine for fast transformation of large datasets
- **PostgreSQL**: fast and robust open-source relational database

#### Update Cycles

- Weather: the data arrives daily, but because the volume is very small, loading it weekly or monthly is sufficient unless daily freshness is required.
- Airbnb: listings can change frequently, so a nightly load makes sense.
- Crime: load it on the same cycle on which NYC publishes the data on its OpenData platform.
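Whatever cadence is chosen, each run does not have to reload the full history. A common pattern is a watermark: remember the newest date already loaded and fetch only newer records. A minimal sketch, assuming a single `rpt_date` field (the notebook actually splits the report date into `rpt_year`, `rpt_month` and `rpt_day`); `incremental_batch` and the sample records are hypothetical:

```python
from datetime import date

def incremental_batch(records, watermark):
    """Return the records reported after `watermark` and the updated watermark.

    records:   iterable of dicts with an 'rpt_date' (datetime.date) field
    watermark: newest report date already loaded, or None for a full first load
    """
    batch = [r for r in records if watermark is None or r["rpt_date"] > watermark]
    # Keep the old watermark when nothing new arrived
    new_watermark = max((r["rpt_date"] for r in batch), default=watermark)
    return batch, new_watermark

records = [
    {"crime_id": 1, "rpt_date": date(2016, 12, 30)},
    {"crime_id": 2, "rpt_date": date(2016, 12, 31)},
    {"crime_id": 3, "rpt_date": date(2017, 1, 2)},
]
batch, watermark = incremental_batch(records, date(2016, 12, 31))
print([r["crime_id"] for r in batch], watermark)  # [3] 2017-01-02
```

A strict `>` assumes every record for the watermark date was already present in the previous load. If the source publishes late or corrected records, re-reading a short lookback window and upserting on `crime_id` would be safer.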



#### Scenarios


**The data was increased by 100x**

If the data increased by 100x, I would add more nodes to the Spark cluster and monitor PostgreSQL performance closely. If performance degrades, switching to a distributed database could be a solution.
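A rough way to reason about the 100x case is to extrapolate from the measured write time of the crime table (0:03:49.628031 for 477,639 rows). The sketch assumes runtime scales linearly with data volume and divides by a hypothetical speedup from added parallelism; a single PostgreSQL instance may not scale like that, which is why its performance needs watching. `estimate_runtime` is a hypothetical helper.

```python
from datetime import timedelta

# Measured time for writing the crime table to PostgreSQL (from the output above)
measured = timedelta(minutes=3, seconds=49.628031)

def estimate_runtime(measured, data_factor, parallel_speedup=1.0):
    """Naive linear estimate: scale by data volume, divide by an assumed speedup."""
    return measured * data_factor / parallel_speedup

print(estimate_runtime(measured, 100))       # 6:22:42.803100
print(estimate_runtime(measured, 100, 4.0))  # 1:35:40.700775
```

Even this crude estimate shows that the current single write path would take hours at 100x, which motivates scaling out.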

**The data populates a dashboard that must be updated on a daily basis by 7am every day**

To have the dashboard updated by 7am every morning, the data has to be integrated in a nightly job. The job could be orchestrated with Apache Airflow, using a time-based schedule that starts the pipeline in a slot where other jobs need fewer resources and that leaves enough time for it to finish before 7am.
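Working backwards from the 7am deadline gives the latest safe start time for the nightly job. The runtime and buffer below are hypothetical placeholders that would come from measured runs; the resulting cron expression is the kind of schedule string Airflow accepts for a daily DAG. `latest_start` and `daily_cron` are hypothetical helpers.

```python
from datetime import datetime, timedelta

def latest_start(deadline, expected_runtime, buffer):
    """Latest moment the nightly job can start and still finish before the deadline."""
    return deadline - expected_runtime - buffer

def daily_cron(start):
    """Daily cron expression (minute hour * * *) for the given start time."""
    return f"{start.minute} {start.hour} * * *"

deadline = datetime(2016, 1, 2, 7, 0)   # dashboard must be fresh by 7am
expected_runtime = timedelta(hours=1)   # hypothetical end-to-end runtime
buffer = timedelta(minutes=30)          # hypothetical margin for retries

start = latest_start(deadline, expected_runtime, buffer)
print(start.strftime("%H:%M"))  # 05:30
print(daily_cron(start))        # 30 5 * * *
```

Scheduling somewhat earlier than the computed latest start leaves room to rerun a failed task before the dashboard is read.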

**The database needed to be accessed by 100+ people**

The more people access the database, the more CPU resources are needed to keep queries fast. A distributed database allows the data to be replicated and partitioned across nodes, which spreads the query load and gives each user faster results.
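Replication and partitioning address different parts of the load: partitioning splits the data so each node scans only its share, while replicas let read queries be served from several copies. A toy sketch of both ideas in plain Python; it illustrates the routing logic only, not how any particular distributed database implements it, and the replica names are hypothetical.

```python
import hashlib
from itertools import cycle

def partition_for(key, num_partitions):
    """Assign a key to a partition with a stable hash.

    hashlib is used instead of the built-in hash(), which is salted per
    process for strings and would not give stable assignments across runs.
    """
    digest = hashlib.md5(str(key).encode("utf-8")).hexdigest()
    return int(digest, 16) % num_partitions

class ReadRouter:
    """Send each read query to the next replica in round-robin order."""

    def __init__(self, replicas):
        self._replicas = cycle(replicas)

    def next_replica(self):
        return next(self._replicas)

# Spread the crime ids from the sample output across 4 partitions
for crime_id in [260960589, 962119894, 420713889]:
    print(crime_id, "->", partition_for(crime_id, 4))

router = ReadRouter(["replica-1", "replica-2"])  # hypothetical host names
print([router.next_replica() for _ in range(3)])  # ['replica-1', 'replica-2', 'replica-1']
```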