# United States Immigration Data Lake built using Apache Spark
### Data Engineering Capstone Project

#### Project Summary

We set out to design a datalake that helps analyze the behaviour of travellers entering the United States. In particular, we want to understand which factors attract travellers to a given area: for example, low crime rates, a large population, or a large percentage of foreign-born residents. We therefore gathered the required data from several sources and loaded it into our datalake, where any data analyst can read it and run the queries they need.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Define the Data Model
* Step 3: ETL of i94 Data
    * Explore and Assess the data
    * Data Wrangling and Cleaning
    * Loading onto our datalake
* Step 4: ETL of dimension tables
    * Explore and Assess the data
* Step 5: ETL of US Cities + Population table as well as the US Crime data.
    * Explore and Assess the data
* Step 6: Data Quality Checks and Data Dictionary
* Step 7: Complete Project Write Up

### Step 1: Scope the Project and Gather Data

#### Scope 
##### Explain what you plan to do in the project in more detail.
* In our project we plan to build a datalake that a team of data scientists and data analysts will use to study the behaviour of people travelling to the United States. Primarily, they want to study why travellers prefer some areas over others, but the dataset also lets them study year-by-year patterns and group travellers by age group or nationality.

##### What data do you use?
* We use several datasets: some were provided in the Udacity workspace and others were gathered from public sources.

##### What does your end solution look like? What tools did you use?
* Our end solution is a datalake stored either on Amazon's S3 or local storage as per user's choice and convenience. 
* For our analytical and wrangling needs we have relied on Apache Spark due to its ease of handling large data and its ability to work with various file formats.
* Moreover, wherever possible we store our data in the Parquet file format. Parquet is a columnar format, which allows fast reads, especially for queries that touch only a few columns.

#### Describe and Gather Data 
In our datalake we are using data from different sources.
* Our main fact table, containing a large number of immigration events, comes from the US National Tourism and Trade Office. A dictionary associated with this large dataset (I94_SAS_Labels_Descriptions.SAS) has been used to build a number of smaller dimension tables.
* Our next major data source is the us-cities-demographics dataset, which contains population demographics for a large number of cities in the United States. This data comes from OpenDataSoft: https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/
* Lastly we are using the uscrimes dataset from kaggle. It contains yearly crime rates in major US jurisdictions. https://www.kaggle.com/ayush1498/analysis-of-us-crime-data 

### Step 2: Define the Data Model

* We will use the i94 events data as our fact table. There will be 7 dimension tables: i94addr, i94cit_res, i94mode, i94port, i94visa, uscities, and uscrime.

* The data model is shown in the diagram below.

![image info](./data_model.PNG)


We start our code by importing the required libraries and functions.

```python
import configparser
from datetime import datetime
import datetime as dt
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, split, first
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql import functions as F
from pyspark.sql import types as T
import pandas as pd
from pyspark.sql.types import *
```

A config file, 'dl.cfg', is included. It contains our AWS credentials and the location of the S3 bucket where we want to store our datalake.

```python
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID'] = config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY'] = config['AWS']['AWS_SECRET_ACCESS_KEY']
# Table names are appended directly to these output roots, so both should end with '/'
s3bucket = config['AWS']['S3_bucket']
localoutput = './outputdata2/'
```
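For reference, here is a minimal sketch of the layout `dl.cfg` is assumed to have. The section and key names come from the cell above; the values are placeholders, and the `s3a://` scheme and the trailing slash on `S3_bucket` are assumptions (the slash matters because table names such as `"i94fact"` are appended directly to this value).

```python
import configparser

# Hypothetical dl.cfg contents; replace the placeholders with real values
SAMPLE_CFG = """
[AWS]
AWS_ACCESS_KEY_ID = <your-access-key-id>
AWS_SECRET_ACCESS_KEY = <your-secret-access-key>
S3_bucket = s3a://my-capstone-bucket/
"""

config = configparser.ConfigParser()
config.read_string(SAMPLE_CFG)

# Option names are case-insensitive in configparser, so 'S3_bucket' and 's3_bucket' both work
s3bucket = config['AWS']['S3_bucket']
print(s3bucket + "i94fact")  # s3a://my-capstone-bucket/i94fact
```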



```python
s3bool = False  # True: write to the S3 bucket given in dl.cfg; False: write to local disk
```

Creating Our Spark Session

```python
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
```

### Step 3: ETL of i94 Data
* We read our i94 events data into a Spark dataframe. Because this data is in raw form, it has to be cleaned and edited before we can store it in our datalake.

We read the Parquet files and, in the next cell, print the schema to see which columns are present and their data types.

```python
path = './sas_data'
# Parquet files carry their own schema, so the CSV "header" option is not needed
i94_raw = spark.read.parquet(path)
```

```python
i94_raw.printSchema()
```

```
root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 
```

Printing the table to see the values

```python
i94_raw.limit(2).toPandas()
```

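|   | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 5748517.0 | 2016.0 | 4.0 | 245.0 | 438.0 | LOS | 20574.0 | 1.0 | CA | 20582.0 | ... |  | M | 1976.0 | 10292016 | F |  | QF | 94953870000.0 | 11 | B1 |
| 1 | 5748518.0 | 2016.0 | 4.0 | 245.0 | 438.0 | LOS | 20574.0 | 1.0 | NV | 20591.0 | ... |  | M | 1984.0 | 10292016 | F |  | VA | 94955620000.0 | 7 | B1 |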


We are only interested in a few columns.

```python
i94_raw.select("i94res", "i94port", "i94addr", "arrdate", "i94mode", "depdate",
               "i94bir", "i94visa", "count", "gender", "admnum").show(2)
```

```
+------+-------+-------+-------+-------+-------+------+-------+-----+------+--------------+
|i94res|i94port|i94addr|arrdate|i94mode|depdate|i94bir|i94visa|count|gender|        admnum|
+------+-------+-------+-------+-------+-------+------+-------+-----+------+--------------+
| 438.0|    LOS|     CA|20574.0|    1.0|20582.0|  40.0|    1.0|  1.0|     F|9.495387003E10|
| 438.0|    LOS|     NV|20574.0|    1.0|20591.0|  32.0|    1.0|  1.0|     F|9.495562283E10|
+------+-------+-------+-------+-------+-------+------+-------+-----+------+--------------+
only showing top 2 rows
```



Let us now correct the data types.

```python
i94fact = i94_raw.select(col("cicid").cast(IntegerType()),
                         col("i94res").cast(IntegerType()),
                         col("i94port"),
                         col("arrdate").cast(IntegerType()),
                         col("i94mode").cast(IntegerType()),
                         col("i94addr"),
                         col("depdate").cast(IntegerType()),
                         col("i94bir").cast(IntegerType()),
                         col("i94visa").cast(IntegerType()),
                         col("count").cast(IntegerType()),
                         col("gender"),
                         col("admnum").cast(LongType()))
```

```python
i94fact.show(2)
```

```
+-------+------+-------+-------+-------+-------+-------+------+-------+-----+------+-----------+
|  cicid|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|gender|     admnum|
+-------+------+-------+-------+-------+-------+-------+------+-------+-----+------+-----------+
|5748517|   438|    LOS|  20574|      1|     CA|  20582|    40|      1|    1|     F|94953870030|
|5748518|   438|    LOS|  20574|      1|     NV|  20591|    32|      1|    1|     F|94955622830|
+-------+------+-------+-------+-------+-------+-------+------+-------+-----+------+-----------+
only showing top 2 rows
```



Convert arrdate and depdate, which are SAS dates (days since 1960-01-01), into ISO date strings (YYYY-MM-DD).

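```python
# SAS stores dates as the number of days since 1960-01-01; convert them to ISO date strings.
# "is not None" keeps day 0 (1960-01-01) from being treated as missing.
dt_formatter = udf(lambda x: (dt.datetime(1960, 1, 1).date() + dt.timedelta(x)).isoformat()
                   if x is not None else None)
i94fact = i94fact.withColumn("arrival_date", dt_formatter(i94fact.arrdate))
i94fact = i94fact.withColumn("departure_date", dt_formatter(i94fact.depdate))
```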
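The same conversion in plain Python can be handy for spot-checking values outside Spark. This is a sketch: `sas_to_iso` is a hypothetical helper name, and the day numbers are taken from the first rows shown above.

```python
import datetime as dt

SAS_EPOCH = dt.date(1960, 1, 1)  # SAS dates count days from this date


def sas_to_iso(days):
    """Convert a SAS day number to 'YYYY-MM-DD', passing missing values through as None."""
    if days is None:
        return None
    return (SAS_EPOCH + dt.timedelta(days=int(days))).isoformat()


print(sas_to_iso(20574))  # 2016-04-30 (arrdate of the first row)
print(sas_to_iso(20582))  # 2016-05-08 (depdate of the first row)
print(sas_to_iso(None))   # None
```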

```python
i94fact.show(2)
```

```
+-------+------+-------+-------+-------+-------+-------+------+-------+-----+------+-----------+------------+--------------+
|  cicid|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|gender|     admnum|arrival_date|departure_date|
+-------+------+-------+-------+-------+-------+-------+------+-------+-----+------+-----------+------------+--------------+
|5748517|   438|    LOS|  20574|      1|     CA|  20582|    40|      1|    1|     F|94953870030|  2016-04-30|    2016-05-08|
|5748518|   438|    LOS|  20574|      1|     NV|  20591|    32|      1|    1|     F|94955622830|  2016-04-30|    2016-05-17|
+-------+------+-------+-------+-------+-------+-------+------+-------+-----+------+-----------+------------+--------------+
only showing top 2 rows
```



The conversion worked as expected, so we can now drop the arrdate and depdate columns.

```python
i94fact = i94fact.drop('arrdate', 'depdate')
```

Our data is now clean, so we write the table into our datalake in Parquet format, partitioned by arrival month.

```python
# First add an arrival month column (e.g. "4" for April) to use as the partition key
i94fact = i94fact.withColumn("arrival_month", date_format('arrival_date', 'M'))
```


```python
# Write the fact table as Parquet, partitioned by arrival month.
# The CSV-only "header" option has no effect on Parquet, so it is omitted.
if s3bool:
    i94fact.write.partitionBy("arrival_month").mode("overwrite").parquet(s3bucket + "i94fact")
else:
    i94fact.write.partitionBy("arrival_month").mode("overwrite").parquet(localoutput + "i94fact")
```


### Step 4: ETL of dimension tables
Now we read I94_SAS_Labels_Descriptions.SAS and extract the code descriptions that will help us make sense of our fact table.

```python
# From https://knowledge.udacity.com/questions/125439
# Read the SAS labels file and strip tab characters
with open('./I94_SAS_Labels_Descriptions.SAS') as f:
    f_content = f.read()
    f_content = f_content.replace('\t', '')
```

The function below extracts one code-to-label mapping (for example, the port codes) from the descriptions file into a Python dictionary.

```python
# From https://knowledge.udacity.com/questions/125439
def code_mapper(file, idx):
    """Return a {code: label} dict for the SAS `value <idx>` block in `file` (a string)."""
    f_content2 = file[file.index(idx):]                          # start at the block name
    f_content2 = f_content2[:f_content2.index(';')].split('\n')  # stop at the closing ';'
    f_content2 = [i.replace("'", "") for i in f_content2]
    dic = [i.split('=') for i in f_content2[1:]]                 # skip the "value ..." line
    dic = dict([i[0].strip(), i[1].strip()] for i in dic if len(i) == 2)

    return dic
```
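To see what `code_mapper` does, here is a self-contained run on a short excerpt. The sample text is illustrative: it assumes the `value <name>` / `code = 'label'` / closing `;` layout that the function expects, with the mode labels taken from the i94mode table shown later. Note that `str.index` returns the first match, so a block name that is a prefix of another name would pick up the wrong block.

```python
def code_mapper(content, idx):
    """Return the {code: label} mapping for the SAS `value <idx>` block in `content`."""
    block = content[content.index(idx):]          # start at the block name
    lines = block[:block.index(';')].split('\n')  # stop at the closing ';'
    lines = [line.replace("'", "") for line in lines]
    pairs = [line.split('=') for line in lines[1:]]  # skip the "value ..." header line
    return {p[0].strip(): p[1].strip() for p in pairs if len(p) == 2}


# Illustrative excerpt in the layout of I94_SAS_Labels_Descriptions.SAS
SAMPLE = (
    "value i94model\n"
    "\t1 = 'Air'\n"
    "\t2 = 'Sea'\n"
    "\t3 = 'Land'\n"
    "\t9 = 'Not reported' ;\n"
)
mode_map = code_mapper(SAMPLE.replace('\t', ''), "i94model")
print(mode_map)  # {'1': 'Air', '2': 'Sea', '3': 'Land', '9': 'Not reported'}
```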

```python
i94cit_res = code_mapper(f_content, "i94cntyl")
i94port = code_mapper(f_content, "i94prtl")
i94mode = code_mapper(f_content, "i94model")
i94addr = code_mapper(f_content, "i94addrl")

# The three visa categories are defined directly here
i94visa = {'1': 'Business', '2': 'Pleasure', '3': 'Student'}
```

#### Transformation and Loading of i94port to a Spark Dataframe

```python
df = pd.DataFrame.from_dict(i94port, orient='index', columns=['city_and_state'])
df = df.reset_index().rename(columns={'index': 'city_code'})
i94port_sparkDF = spark.createDataFrame(df)

# Labels look like "ANCHORAGE, AK": split on the comma and trim the surrounding spaces
split_col = F.split(i94port_sparkDF['city_and_state'], ',')
i94port_sparkDF = i94port_sparkDF.withColumn('city_name', F.trim(split_col.getItem(0)))
i94port_sparkDF = i94port_sparkDF.withColumn('state_name', F.trim(split_col.getItem(1)))
i94port_sparkDF = i94port_sparkDF.drop('city_and_state')
i94port_sparkDF.show(5)

# Write the table as JSON into our datalake, partitioned by state
if s3bool:
    i94port_sparkDF.write.partitionBy("state_name").mode("overwrite").json(s3bucket + "i94port")
else:
    i94port_sparkDF.write.partitionBy("state_name").mode("overwrite").json(localoutput + "i94port")
```

```
+---------+--------------------+----------+
|city_code|           city_name|state_name|
+---------+--------------------+----------+
|      ALC|               ALCAN|        AK|
|      ANC|           ANCHORAGE|        AK|
|      BAR|BAKER AAF - BAKER...|        AK|
|      DAC|       DALTONS CACHE|        AK|
|      PIZ|DEW STATION PT LA...|        AK|
+---------+--------------------+----------+
only showing top 5 rows
```
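A plain-Python sketch of the split applied to a single port label. `split_city_state` is a hypothetical helper, and "NOT REPORTED/UNKNOWN" is an invented example of a label without a comma. Splitting on `','` alone leaves a leading space on the state part unless it is trimmed, and, like `getItem(1)` on a one-element array in Spark's default (non-ANSI) mode, a missing state part comes back empty.

```python
def split_city_state(label):
    """Split a port label such as 'ANCHORAGE, AK' into (city, state)."""
    parts = label.split(',')
    city = parts[0].strip()
    # Return None when there is no state part after the comma
    state = parts[1].strip() if len(parts) > 1 else None
    return city, state


print(split_city_state("ANCHORAGE, AK"))         # ('ANCHORAGE', 'AK')
print(split_city_state("NOT REPORTED/UNKNOWN"))  # ('NOT REPORTED/UNKNOWN', None)
```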



#### Transformation and Loading of i94cit_res to a Spark Dataframe

```python
df = pd.DataFrame.from_dict(i94cit_res, orient='index', columns=['country_of_residence'])
df = df.reset_index().rename(columns={'index': 'res_code'})
i94cit_res_sparkDF = spark.createDataFrame(df)
i94cit_res_sparkDF.show(5)

if s3bool:
    i94cit_res_sparkDF.write.mode("overwrite").json(s3bucket + "i94cit_res")
else:
    i94cit_res_sparkDF.write.mode("overwrite").json(localoutput + "i94cit_res")
```

```
+--------+--------------------+
|res_code|country_of_residence|
+--------+--------------------+
|     582|MEXICO Air Sea, a...|
|     236|         AFGHANISTAN|
|     101|             ALBANIA|
|     316|             ALGERIA|
|     102|             ANDORRA|
+--------+--------------------+
only showing top 5 rows
```



#### Transformation and Loading of i94mode to a Spark Dataframe

```python
df = pd.DataFrame.from_dict(i94mode, orient='index', columns=['mode_of_travel'])
df = df.reset_index().rename(columns={'index': 'i94mode_code'})
i94mode_sparkDF = spark.createDataFrame(df)
i94mode_sparkDF.show()

if s3bool:
    i94mode_sparkDF.write.mode("overwrite").json(s3bucket + "i94mode")
else:
    i94mode_sparkDF.write.mode("overwrite").json(localoutput + "i94mode")
```

```
+------------+--------------+
|i94mode_code|mode_of_travel|
+------------+--------------+
|           1|           Air|
|           2|           Sea|
|           3|          Land|
|           9|  Not reported|
+------------+--------------+
```



#### Transformation and Loading of i94addr to a Spark Dataframe

```python
df = pd.DataFrame.from_dict(i94addr, orient='index', columns=['residence_address'])
df = df.reset_index().rename(columns={'index': 'i94addr_code'})
i94addr_sparkDF = spark.createDataFrame(df)
i94addr_sparkDF.show(5)

if s3bool:
    i94addr_sparkDF.write.mode("overwrite").json(s3bucket + "i94addr")
else:
    i94addr_sparkDF.write.mode("overwrite").json(localoutput + "i94addr")
```

```
+------------+-----------------+
|i94addr_code|residence_address|
+------------+-----------------+
|          AL|          ALABAMA|
|          AK|           ALASKA|
|          AZ|          ARIZONA|
|          AR|         ARKANSAS|
|          CA|       CALIFORNIA|
+------------+-----------------+
only showing top 5 rows
```



#### Transformation and Loading of i94visa to a Spark Dataframe

```python
df = pd.DataFrame.from_dict(i94visa, orient='index', columns=['visa_type'])
df = df.reset_index().rename(columns={'index': 'visa_code'})
i94visa_sparkDF = spark.createDataFrame(df)
i94visa_sparkDF.show()

if s3bool:
    i94visa_sparkDF.write.mode("overwrite").json(s3bucket + "i94visa")
else:
    i94visa_sparkDF.write.mode("overwrite").json(localoutput + "i94visa")
```

```
+---------+---------+
|visa_code|visa_type|
+---------+---------+
|        1| Business|
|        2| Pleasure|
|        3|  Student|
+---------+---------+
```



### Step 5: ETL of US Cities + Population table as well as the US Crime data.

We read the data into a Spark dataframe. This is not a usual comma-separated CSV file; it is separated by semicolons, so the separator has to be passed to the reader.

```python
# The file is semicolon-delimited, so the separator must be given explicitly
uscities_df = spark.read.csv("./us-cities-demographics.csv", sep=';', header=True)
```
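The effect of the separator can be illustrated with Python's standard `csv` module. This sketch uses the header and the first data row printed later in this section; it is only an illustration of the delimiter issue, not how Spark parses the file internally.

```python
import csv
import io

# Header and first data row of us-cities-demographics.csv, as printed later in this section
SAMPLE = (
    "City;State;Median Age;Male Population;Female Population;Total Population;"
    "Number of Veterans;Foreign-born;Average Household Size;State Code;Race;Count\n"
    "Silver Spring;Maryland;33.8;40601;41862;82463;1562;30908;2.6;MD;Hispanic or Latino;25924\n"
)

with_semicolon = next(csv.DictReader(io.StringIO(SAMPLE), delimiter=';'))
with_comma = next(csv.DictReader(io.StringIO(SAMPLE)))  # default delimiter is ','

print(with_semicolon['City'], with_semicolon['State Code'])  # Silver Spring MD
print(len(with_comma))  # 1: without sep=';' the whole line lands in a single field
```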

Printing the schema to see the columns present and their data types.

```python
uscities_df.printSchema()
```

```
root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: string (nullable = true)
 |-- Male Population: string (nullable = true)
 |-- Female Population: string (nullable = true)
 |-- Total Population: string (nullable = true)
 |-- Number of Veterans: string (nullable = true)
 |-- Foreign-born: string (nullable = true)
 |-- Average Household Size: string (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: string (nullable = true)
```



Printing a few rows to take a quick look at the data.

```python
uscities_df.limit(3).toPandas()
```

|   | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Silver Spring | Maryland | 33.8 | 40601 | 41862 | 82463 | 1562 | 30908 | 2.6 | MD | Hispanic or Latino | 25924 |
| 1 | Quincy | Massachusetts | 41.0 | 44129 | 49500 | 93629 | 4147 | 32935 | 2.39 | MA | White | 58723 |
| 2 | Hoover | Alabama | 38.5 | 38040 | 46799 | 84839 | 4819 | 8229 | 2.58 | AL | Asian | 4759 |


Looking for duplicate values.

```python
uscities_df.where("city == 'Quincy'").toPandas()
```

|   | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Quincy | Massachusetts | 41.0 | 44129 | 49500 | 93629 | 4147 | 32935 | 2.39 | MA | White | 58723 |
| 1 | Quincy | Massachusetts | 41.0 | 44129 | 49500 | 93629 | 4147 | 32935 | 2.39 | MA | Hispanic or Latino | 2566 |
| 2 | Quincy | Massachusetts | 41.0 | 44129 | 49500 | 93629 | 4147 | 32935 | 2.39 | MA | American Indian and Alaska Native | 351 |
| 3 | Quincy | Massachusetts | 41.0 | 44129 | 49500 | 93629 | 4147 | 32935 | 2.39 | MA | Black or African-American | 3917 |
| 4 | Quincy | Massachusetts | 41.0 | 44129 | 49500 | 93629 | 4147 | 32935 | 2.39 | MA | Asian | 30473 |


The result above shows that each city appears several times, once per race/ethnicity, with that group's population given separately in the Count column.

```python
uscities_df.createOrReplaceTempView("uscities_table")
```

```python
spark.sql("""
    SELECT COUNT(City) AS total_city_entries, COUNT(DISTINCT City) AS different_cities
    FROM uscities_table
""").show()
```



```
+------------------+----------------+
|total_city_entries|different_cities|
+------------------+----------------+
|              2891|             567|
+------------------+----------------+
```



Now we select only the columns we need. Next, we eliminate the duplicate entries for each city by pivoting the rows for different races into separate columns.

```python
# Keep the needed columns, then pivot the Race rows into one column per race
uscities_withrace_df = (uscities_df
                        .select("city", "state code", "Total Population", "Foreign-born", "Race", "count")
                        .groupby(uscities_df.City, "state code", "Total Population", "Foreign-born")
                        .pivot("Race")
                        .agg(first("Count")))
```
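A plain-Python sketch of what this long-to-wide pivot does, using the five Quincy rows printed earlier. `pivot_first` is a hypothetical helper, not part of Spark, and for brevity it groups only by city and state code. As long as each (city, state, race) combination appears only once, taking the "first" value is unambiguous; if a combination were repeated, Spark's `first` would not guarantee which row is kept.

```python
def pivot_first(rows, key_fields, pivot_field, value_field):
    """Long-to-wide pivot: one dict per key, one entry per pivot value (first value wins)."""
    wide = {}
    for row in rows:
        key = tuple(row[f] for f in key_fields)
        record = wide.setdefault(key, {})
        # setdefault keeps the first value seen, mirroring agg(first("Count"))
        record.setdefault(row[pivot_field], row[value_field])
    return wide


# The five Quincy, MA rows from the duplicate check above
quincy = [
    {"City": "Quincy", "State Code": "MA", "Race": "White", "Count": "58723"},
    {"City": "Quincy", "State Code": "MA", "Race": "Hispanic or Latino", "Count": "2566"},
    {"City": "Quincy", "State Code": "MA", "Race": "American Indian and Alaska Native", "Count": "351"},
    {"City": "Quincy", "State Code": "MA", "Race": "Black or African-American", "Count": "3917"},
    {"City": "Quincy", "State Code": "MA", "Race": "Asian", "Count": "30473"},
]
wide = pivot_first(quincy, ["City", "State Code"], "Race", "Count")
print(len(wide))                        # 1: five rows collapse into one row per city
print(wide[("Quincy", "MA")]["Asian"])  # 30473
```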

```python
uscities_withrace_df.count()
```

```
596
```

The difference between this count (596) and the number of distinct city names (567) comes from different cities in different states that share the same name.

```python
uscities_withrace_df.printSchema()
```

```
root
 |-- City: string (nullable = true)
 |-- state code: string (nullable = true)
 |-- Total Population: string (nullable = true)
 |-- Foreign-born: string (nullable = true)
 |-- American Indian and Alaska Native: string (nullable = true)
 |-- Asian: string (nullable = true)
 |-- Black or African-American: string (nullable = true)
 |-- Hispanic or Latino: string (nullable = true)
 |-- White: string (nullable = true)
```



Renaming the columns to make them easier to use in Spark SQL later on.

```python
uscities_withrace_df = uscities_withrace_df.withColumnRenamed("city", "city_name")\
                    .withColumnRenamed("American Indian and Alaska Native", "native_pop")\
                    .withColumnRenamed("state code", "state_code")\
                    .withColumnRenamed("Total Population", "total_pop")\
                    .withColumnRenamed("Asian", "asian_pop")\
                    .withColumnRenamed("Black or African-American", "aa_pop")\
                    .withColumnRenamed("Hispanic or Latino", "hispanic_pop")\
                    .withColumnRenamed("White", "white_pop")\
                    .withColumnRenamed("Foreign-born", "foreign_born_pop")

uscities_withrace_df.show(2)
```

```
+----------+----------+---------+----------------+----------+---------+------+------------+---------+
| city_name|state_code|total_pop|foreign_born_pop|native_pop|asian_pop|aa_pop|hispanic_pop|white_pop|
+----------+----------+---------+----------------+----------+---------+------+------------+---------+
|Framingham|        MA|    71210|           19070|       849|     5993|  6944|       13000|    52205|
| Rock Hill|        SC|    71567|            2413|       610|     1073| 28204|        2845|    41652|
+----------+----------+---------+----------------+----------+---------+------+------------+---------+
only showing top 2 rows
```



Next, we cast the population columns from strings to integers so that they can be used in calculations.

```python
uscities_withrace_df = uscities_withrace_df.select(col("city_name"), col("state_code"),
                            col("total_pop").cast(IntegerType()),
                            col("native_pop").cast(IntegerType()),
                            col("asian_pop").cast(IntegerType()),
                            col("aa_pop").cast(IntegerType()),
                            col("hispanic_pop").cast(IntegerType()),
                            col("white_pop").cast(IntegerType()),
                            col("foreign_born_pop").cast(IntegerType()))
```

Printing out the data to see if everything is good to proceed

```python
uscities_withrace_df.printSchema()
uscities_withrace_df.show(2)
```

```
root
 |-- city_name: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- total_pop: integer (nullable = true)
 |-- native_pop: integer (nullable = true)
 |-- asian_pop: integer (nullable = true)
 |-- aa_pop: integer (nullable = true)
 |-- hispanic_pop: integer (nullable = true)
 |-- white_pop: integer (nullable = true)
 |-- foreign_born_pop: integer (nullable = true)

+----------+----------+---------+----------+---------+------+------------+---------+----------------+
| city_name|state_code|total_pop|native_pop|asian_pop|aa_pop|hispanic_pop|white_pop|foreign_born_pop|
+----------+----------+---------+----------+---------+------+------------+---------+----------------+
|Framingham|        MA|    71210|       849|     5993|  6944|       13000|    52205|           19070|
| Rock Hill|        SC|    71567|       610|     1073| 28204|        2845|    41652|            2413|
+----------+----------+---------+----------+---------+------+------------+---------+----------------+
```

We write uscities_withrace_df into our datalake in Parquet format, partitioned on the "state_code" column to make later reads more efficient.

```python
if s3bool:
    uscities_withrace_df.write.partitionBy("state_code").mode("overwrite").parquet(s3bucket + "uscities")
else:
    uscities_withrace_df.write.partitionBy("state_code").mode("overwrite").parquet(localoutput + "uscities")
```

#### ETL on the US Crime Data

Reading the data from a csv file stored locally.

```python
uscrime_df = spark.read.csv("./uscrime.csv", header=True)
```

Printing the schema to see what the csv file contains.

```python
uscrime_df.printSchema()
```

```
root
 |-- report_year: string (nullable = true)
 |-- agency_code: string (nullable = true)
 |-- agency_jurisdiction: string (nullable = true)
 |-- population: string (nullable = true)
 |-- violent_crimes: string (nullable = true)
 |-- homicides: string (nullable = true)
 |-- rapes: string (nullable = true)
 |-- assaults: string (nullable = true)
 |-- robberies: string (nullable = true)
 |-- months_reported: string (nullable = true)
 |-- crimes_percapita: string (nullable = true)
 |-- homicides_percapita: string (nullable = true)
 |-- rapes_percapita: string (nullable = true)
 |-- assaults_percapita: string (nullable = true)
 |-- robberies_percapita: string (nullable = true)
```



Viewing the contents 

```python
uscrime_df.limit(2).toPandas()
```

|   | report_year | agency_code | agency_jurisdiction | population | violent_crimes | homicides | rapes | assaults | robberies | months_reported | crimes_percapita | homicides_percapita | rapes_percapita | assaults_percapita | robberies_percapita |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1975 | NM00101 | Albuquerque, NM | 286238 | 2383 | 30 | 181 | 1353 | 819 | 12 | 832.52 | 10.48 | 63.23 | 472.68 | 286.13 |
| 1 | 1975 | TX22001 | Arlington, TX | 112478 | 278 | 5 | 28 | 132 | 113 | 12 | 247.16 | 4.45 | 24.89 | 117.36 | 100.46 |


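First, we cast the string columns to the proper integer and float types. FloatType is single precision, which explains small rounding artifacts such as 832.52002 in the output below.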

```python
uscrime_df = uscrime_df.select(col("report_year").cast(IntegerType()), col("agency_code"), col("agency_jurisdiction"),\
                               col("population").cast(IntegerType()),\
                               col("violent_crimes").cast(IntegerType()),\
                               col("homicides").cast(IntegerType()),\
                               col("rapes").cast(IntegerType()),\
                               col("assaults").cast(IntegerType()),\
                               col("robberies").cast(IntegerType()),\
                               col("months_reported").cast(IntegerType()),\
                               col("crimes_percapita").cast(FloatType()),\
                               col("homicides_percapita").cast(FloatType()),\
                               col("rapes_percapita").cast(FloatType()),\
                               col("assaults_percapita").cast(FloatType()),\
                               col("robberies_percapita").cast(FloatType()))
```

```python
uscrime_df.printSchema()
uscrime_df.limit(2).toPandas()
```


```
root
 |-- report_year: integer (nullable = true)
 |-- agency_code: string (nullable = true)
 |-- agency_jurisdiction: string (nullable = true)
 |-- population: integer (nullable = true)
 |-- violent_crimes: integer (nullable = true)
 |-- homicides: integer (nullable = true)
 |-- rapes: integer (nullable = true)
 |-- assaults: integer (nullable = true)
 |-- robberies: integer (nullable = true)
 |-- months_reported: integer (nullable = true)
 |-- crimes_percapita: float (nullable = true)
 |-- homicides_percapita: float (nullable = true)
 |-- rapes_percapita: float (nullable = true)
 |-- assaults_percapita: float (nullable = true)
 |-- robberies_percapita: float (nullable = true)
```



|   | report_year | agency_code | agency_jurisdiction | population | violent_crimes | homicides | rapes | assaults | robberies | months_reported | crimes_percapita | homicides_percapita | rapes_percapita | assaults_percapita | robberies_percapita |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1975 | NM00101 | Albuquerque, NM | 286238 | 2383 | 30 | 181 | 1353 | 819 | 12 | 832.52002 | 10.48 | 63.23 | 472.679993 | 286.130005 |
| 1 | 1975 | TX22001 | Arlington, TX | 112478 | 278 | 5 | 28 | 132 | 113 | 12 | 247.160004 | 4.45 | 24.889999 | 117.360001 | 100.459999 |


Now that the schema looks right, we split agency_jurisdiction into separate city_name and state_code columns.

```python
# Jurisdictions look like "Albuquerque, NM": split on ", " into city and state code
split_col = F.split(uscrime_df['agency_jurisdiction'], ', ')
uscrime_df = uscrime_df.withColumn('city_name', split_col.getItem(0))
uscrime_df = uscrime_df.withColumn('state_code', split_col.getItem(1))
uscrime_df = uscrime_df.drop('agency_jurisdiction')
uscrime_df.limit(2).toPandas()
```


|   | report_year | agency_code | population | violent_crimes | homicides | rapes | assaults | robberies | months_reported | crimes_percapita | homicides_percapita | rapes_percapita | assaults_percapita | robberies_percapita | city_name | state_code |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1975 | NM00101 | 286238 | 2383 | 30 | 181 | 1353 | 819 | 12 | 832.52002 | 10.48 | 63.23 | 472.679993 | 286.130005 | Albuquerque | NM |
| 1 | 1975 | TX22001 | 112478 | 278 | 5 | 28 | 132 | 113 | 12 | 247.160004 | 4.45 | 24.889999 | 117.360001 | 100.459999 | Arlington | TX |


Reordering the columns to bring city_name and state_code to the front.

```python
uscrime_df = uscrime_df.select(["city_name", "state_code", "report_year", "agency_code", "population",
                                "violent_crimes", "homicides", "rapes", "assaults", "robberies",
                                "months_reported", "crimes_percapita", "homicides_percapita",
                                "rapes_percapita", "assaults_percapita", "robberies_percapita"])
```

Writing the table as a parquet into our datalake.

```python
if s3bool:
    uscrime_df.write.mode("overwrite").parquet(s3bucket + "uscrime")
else:
    uscrime_df.write.mode("overwrite").parquet(localoutput + "uscrime")
```

### Step 6: Data Quality Checks and Data Dictionary

#### Data Quality Checks
We need to include a few data quality checks to ensure our pipeline is working properly.

**1. First we can check our uscrime data to see that there are no duplicate values entered.**

```python
uscrime_df.groupBy("city_name", "state_code", "report_year").count().filter("count > 1").show()
```

```
+---------+----------+-----------+-----+
|city_name|state_code|report_year|count|
+---------+----------+-----------+-----+
+---------+----------+-----------+-----+
```



The empty result above shows that no city has more than one row for a given year.

**2. Now let us check to see if there are any duplicate city values in our uscities data.**

```python
uscities_withrace_df.groupBy("city_name", "state_code").count().filter("count > 1").show()
```

```
+---------+----------+-----+
|city_name|state_code|count|
+---------+----------+-----+
+---------+----------+-----+
```



The empty result shows that no city appears more than once in our uscities data.

**3. Check that the primary key of our main fact table contains no NULL values.**

```python
i94fact.where("cicid IS NULL").show()
```

```
+-----+------+-------+-------+-------+------+-------+-----+------+------+------------+--------------+-------------+
|cicid|i94res|i94port|i94mode|i94addr|i94bir|i94visa|count|gender|admnum|arrival_date|departure_date|arrival_month|
+-----+------+-------+-------+-------+------+-------+-----+------+------+------------+--------------+-------------+
+-----+------+-------+-------+-------+------+-------+-----+------+------+------------+--------------+-------------+
```



The empty result shows that no row has a NULL cicid.
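The checks above rely on reading an empty table. In an automated pipeline it is usually safer for a failed check to stop the run. Below is a minimal plain-Python sketch of that idea, assuming the rows are available as dicts (for example, a small sample gathered with `.collect()` and `Row.asDict()`); `check_unique` and `check_not_null` are hypothetical helpers. In Spark itself, the equivalent would be to assert that `groupBy(...).count().filter("count > 1").count() == 0`.

```python
from collections import Counter


def check_unique(rows, key_fields):
    """Raise ValueError if any combination of key_fields occurs more than once."""
    counts = Counter(tuple(row[f] for f in key_fields) for row in rows)
    dupes = [key for key, n in counts.items() if n > 1]
    if dupes:
        raise ValueError(f"duplicate keys for {key_fields}: {dupes}")


def check_not_null(rows, field):
    """Raise ValueError if any row has a missing value in field."""
    missing = sum(1 for row in rows if row.get(field) is None)
    if missing:
        raise ValueError(f"{missing} row(s) with NULL {field}")


# Two rows from the uscrime sample shown earlier
crime = [
    {"city_name": "Albuquerque", "state_code": "NM", "report_year": 1975},
    {"city_name": "Arlington", "state_code": "TX", "report_year": 1975},
]
check_unique(crime, ["city_name", "state_code", "report_year"])  # passes silently
check_not_null(crime, "city_name")                               # passes silently
```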

#### Data Dictionary
Let us now include a data dictionary for all our tables.

1. i94fact
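    * cicid - primary key of our fact table; a unique identifier for each row of data, stored as an integer.
    * i94res - code identifying the traveller's country of residence (see i94cit_res).
    * i94port - code identifying the traveller's port of arrival (see i94port).
    * i94mode - code for the traveller's mode of transport (see i94mode).
    * i94addr - code for the U.S. state where the traveller will stay during the trip (see i94addr).
    * i94bir - the age of the traveller at the time of the trip.
    * i94visa - code for the traveller's visa category (see i94visa).
    * count - field used for summary statistics
    * gender - gender of the traveller
    * admnum - admission number assigned to the traveller
    * arrival_date - date of arrival (YYYY-MM-DD), converted from the SAS arrdate field
    * departure_date - date of departure (YYYY-MM-DD), converted from the SAS depdate field
    * arrival_month - month of arrival; used as the partition column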
2. i94port
    * city_code - primary key to identify city
    * city_name
    * state_name - two-letter state code of the port, e.g. AK
3. i94cit_res
    * res_code - primary key to identify the country of original residence
    * country_of_residence - string value
4. i94mode
    * i94mode_code - primary key
    * mode_of_travel - mode of travel in string value
5. i94addr
    * i94addr_code - primary key (two-letter state code)
    * residence_address - full name of the state, e.g. ALABAMA
6. i94visa
    * visa_code - primary key
    * visa_type - visa category: Business, Pleasure or Student
7. uscities
    * city_name - name of the city in string
    * state_code - 2 letter code for the state
    * total_pop - total population of the city
    * native_pop - American Indian and Alaska Native population
    * asian_pop - Asian population
    * aa_pop - Black or African-American population
    * hispanic_pop - Hispanic or Latino population
    * white_pop - White population
    * foreign_born_pop - population of residents of the city who were born outside the United States
8. uscrime
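    * city_name - city of the reporting agency's jurisdiction
    * state_code - 2 letter code for the state
    * report_year - the year for which statistics are given
    * agency_code - the agency which reported the statistics
    * population - population of the jurisdiction
    * violent_crimes - number of violent crimes in the year
    * homicides - number of homicides
    * rapes - number of rapes
    * assaults - number of assaults
    * robberies - number of robberies
    * months_reported - the number of months in the year for which data is reported
    * crimes_percapita - violent crimes per 100,000 residents (violent_crimes / population × 100,000)
    * homicides_percapita - homicides per 100,000 residents
    * rapes_percapita - rapes per 100,000 residents
    * assaults_percapita - assaults per 100,000 residents
    * robberies_percapita - robberies per 100,000 residents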
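The per-capita columns behave like rates per 100,000 residents rather than plain ratios: recomputing them from the Albuquerque 1975 row shown earlier reproduces the stored values. A quick check (the variable names are illustrative):

```python
# Albuquerque, NM, 1975, taken from the uscrime sample above
population = 286238
violent_crimes = 2383
homicides = 30

crimes_rate = violent_crimes / population * 100_000
homicides_rate = homicides / population * 100_000

print(round(crimes_rate, 2))     # 832.52 (stored crimes_percapita: 832.52)
print(round(homicides_rate, 2))  # 10.48 (stored homicides_percapita: 10.48)
```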

### Step 7: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.

    - For our data engineering capstone project we designed a datalake stored either on Amazon's S3 service or in a local folder, depending on data requirements. We used Apache Spark to transform and wrangle our data because the fact table is large (more than 3 million rows). Spark distributes the work across executors, so it can handle data volumes that would be slow or impossible to process in memory with the single-machine pandas library.
    - The fact table and the uscities and uscrime tables are stored in the Parquet format; the small dimension tables extracted from the SAS labels file are stored as JSON.

* Propose how often the data should be updated and why.
    - Taking a general use case, the fact table can be updated once or twice a day depending on the volume of events generated. The frequency can also be increased as per business demand.

* Write a description of how you would approach the problem differently under the following scenarios:

 * The data was increased by 100x.
    - If the data were increased by 100x, the Apache Spark program should run on an Amazon EMR cluster on AWS, scaled out with more (and more powerful) worker nodes so that the data wrangling still finishes quickly.
    - Also, the "s3bool" trigger should be set to True, this will write the output to a datalake on Amazon's S3 service which has near unlimited storage.

 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
    - If the data must be updated every day by 7am, the best solution would be to orchestrate the pipeline with Apache Airflow and schedule a daily run that finishes before 7am.
    - The steps performed in this notebook can be divided into tasks and scheduled according to their dependencies in Airflow. Airflow can also run tasks that do not depend on each other in parallel, and it can send email alerts about the status of the pipeline, for example when a task fails.

 * The database needed to be accessed by 100+ people.
 
    - An S3 datalake is the best way to make the data accessible to a large number of people. If we instead loaded it into a relational database on Amazon Redshift, we would incur higher data-transfer and cluster costs, and queries could slow down when many users run them at the same time.
    - We would also create IAM roles for different departments on AWS to keep their access policies separate.