# Project Title
### Data Engineering Capstone Project

#### Project Summary
--describe your project at a high level--

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [94]:
# Do all imports and installs here
import pandas as pd
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, row_number, lit
#from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, dayofweek, date_format
from pyspark.sql.types import TimestampType
from pyspark.sql import functions as F
from pyspark.sql import types 
import configparser
import psycopg2

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use?
1. Data: This project gathers immigration data at the state level, counts how many immigration events occurred in each month of 2016, and makes it possible to analyze how those counts relate to state population, household size, median age, and similar attributes.
2. End solution: I read the data with PySpark because the data set is large, then cleaned and transformed it with PySpark DataFrames. After that, I wrote the datasets to AWS S3 and copied them from S3 into AWS Redshift, so that the data warehouse holds a clean data structure that is ready for analysis.
3. Tools: Spark, an AWS EMR cluster, S3, and Redshift.

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 
1. Dataset 1: SAS I94 Immigration Data. This data comes from the US National Tourism and Trade Office and is provided in the Udacity course workspace. It records when and where each immigration event took place, along with attributes of the immigrants such as gender and visa type.
2. Dataset 2: U.S. City Demographic Data. This data comes from OpenSoft and is provided in the Udacity course workspace. It describes US cities and their states, including population, median age, and similar attributes for each city.
3. Both datasets contain a state code (i94addr in the immigration dataset, State Code in the demographic dataset), which can be used to join the two datasets.

#### 1. Read immigration data

In [104]:
# Read immigration sample data and look at the data structure.
df = pd.read_csv("immigration_data_sample.csv")

In [105]:
df.head()

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,...,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,...,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,...,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,...,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,...,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


In [2]:
# Read AWS configuration
config = configparser.ConfigParser()
config.read('dl.cfg')
os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

In [3]:
# Build spark session

spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11,org.apache.hadoop:hadoop-aws:2.7.0").\
enableHiveSupport().getOrCreate()
# df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_jan16_sub.sas7bdat')


In [26]:
# Read all months immigration data
from functools import reduce  # For Python 3.x
from pyspark.sql import DataFrame

    
def unionAll(*dfs):
    return reduce(DataFrame.unionAll, dfs)
i = 0
files = ['../../data/18-83510-I94-Data-2016/i94_jan16_sub.sas7bdat',
         '../../data/18-83510-I94-Data-2016/i94_feb16_sub.sas7bdat',
         '../../data/18-83510-I94-Data-2016/i94_mar16_sub.sas7bdat',
         '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat',
         '../../data/18-83510-I94-Data-2016/i94_may16_sub.sas7bdat',
         '../../data/18-83510-I94-Data-2016/i94_jun16_sub.sas7bdat',
         '../../data/18-83510-I94-Data-2016/i94_jul16_sub.sas7bdat',
         '../../data/18-83510-I94-Data-2016/i94_aug16_sub.sas7bdat',
         '../../data/18-83510-I94-Data-2016/i94_sep16_sub.sas7bdat',
         '../../data/18-83510-I94-Data-2016/i94_oct16_sub.sas7bdat',
         '../../data/18-83510-I94-Data-2016/i94_nov16_sub.sas7bdat',
         '../../data/18-83510-I94-Data-2016/i94_dec16_sub.sas7bdat']
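for file in files:
    # Load one month of SAS data
    month_df = spark.read.format('com.github.saurfang.sas.spark').load(file)
    if i == 0:
        # The first file defines the column set used for every later month
        cols = month_df.columns
        df = month_df
    else:
        # Select the shared columns in the same order, because the union is positional
        df = unionAll(df, month_df.select(cols))
    # Print the running total after each month is appended
    print(df.count())
    i = i + 1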
df.count()

2847924
5418467
8575539
11671852
15116101
18691090
22956121
27059691
30793477
34442613
37357539
40790529


40790529
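
The twelve monthly paths in the cell above differ only in the month abbreviation, so the list could also be generated instead of typed out. A minimal sketch, assuming the same directory layout and file naming as above; `MONTHS`, `DATA_DIR`, and `month_files` are illustrative names that are not part of the original notebook:

```python
# Month abbreviations as they appear in the I94 file names above
MONTHS = ["jan", "feb", "mar", "apr", "may", "jun",
          "jul", "aug", "sep", "oct", "nov", "dec"]

# Assumed to match the workspace directory used in the cell above
DATA_DIR = "../../data/18-83510-I94-Data-2016"


def month_files(data_dir=DATA_DIR, year_suffix="16"):
    """Return the monthly SAS file paths in calendar order."""
    return [f"{data_dir}/i94_{m}{year_suffix}_sub.sas7bdat" for m in MONTHS]


files = month_files()
print(len(files))
print(files[0])
```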

In [48]:
df.count()

9997

In [4]:
# Read temp aggregated immigration data from local file, as the loading time for above full data set is too long.
# df = spark.read.parquet("sas_data_new")
# df.count()

9997

In [9]:
df.describe

<bound method DataFrame.describe of DataFrame[i94yr: double, i94mon: double, i94addr: string, gender: string, count: bigint]>

#### 2. Read demographic data

In [5]:
# Read data from project workspace
dem_df = spark.read.csv("us-cities-demographics.csv", sep=';', header=True)

In [9]:
dem_df.head(5)

[Row(City='Silver Spring', State='Maryland', Median Age='33.8', Male Population='40601', Female Population='41862', Total Population='82463', Number of Veterans='1562', Foreign-born='30908', Average Household Size='2.6', State Code='MD', Race='Hispanic or Latino', Count='25924'),
 Row(City='Quincy', State='Massachusetts', Median Age='41.0', Male Population='44129', Female Population='49500', Total Population='93629', Number of Veterans='4147', Foreign-born='32935', Average Household Size='2.39', State Code='MA', Race='White', Count='58723'),
 Row(City='Hoover', State='Alabama', Median Age='38.5', Male Population='38040', Female Population='46799', Total Population='84839', Number of Veterans='4819', Foreign-born='8229', Average Household Size='2.58', State Code='AL', Race='Asian', Count='4759'),
 Row(City='Rancho Cucamonga', State='California', Median Age='34.5', Male Population='88127', Female Population='87105', Total Population='175232', Number of Veterans='5821', Foreign-born='3387

In [98]:
dem_df.count()

2891

In [5]:
dem_df.describe

<bound method DataFrame.describe of DataFrame[City: string, State: string, Median Age: string, Male Population: string, Female Population: string, Total Population: string, Number of Veterans: string, Foreign-born: string, Average Household Size: string, State Code: string, Race: string, Count: string]>

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data.
1. Drop duplicate rows in both datasets.
2. The main join is performed on the state code (i94addr in the immigration data, State Code in the demographic data), so rows with a null state code are dropped from both datasets to avoid invalid joins.
3. Aggregate the immigration dataset: count immigration records grouped by year, month, state code, and gender.
4. Cast columns in both datasets to types that match the actual values in the columns.
5. Rename columns to remove spaces, because Spark's Parquet writer rejects column names that contain spaces.

#### 1. Drop duplicates for immigration data

In [38]:
# Drop duplicate data in immigration dataset
df = df.dropDuplicates()

In [40]:
# Drop rows with a null state code in the immigration dataset
df = df.filter(df['i94addr'].isNotNull())

In [6]:
# Change column type
df = df.withColumn("i94yr", df["i94yr"].cast(types.IntegerType())).withColumn("i94mon", df["i94mon"].cast(types.IntegerType()))

In [7]:
df.describe

<bound method DataFrame.describe of DataFrame[i94yr: int, i94mon: int, i94addr: string, gender: string, count: bigint]>

In [13]:
# Data aggregation for immigration dataset
df = df.groupBy('i94yr','i94mon','i94addr','gender').count()
df.show()

+------+------+-------+------+-----+
| i94yr|i94mon|i94addr|gender|count|
+------+------+-------+------+-----+
|2016.0|   1.0|     US|     F|    1|
|2016.0|   1.0|     FI|     M|    1|
|2016.0|   1.0|     CU|     F|    1|
|2016.0|   2.0|     ME|     M|    1|
|2016.0|   2.0|     IQ|     F|    1|
|2016.0|   2.0|     TS|     F|    1|
|2016.0|   3.0|     AR|     M|    1|
|2016.0|   3.0|     NJ|     U|    1|
|2016.0|   4.0|     VA|     M|    1|
|2016.0|   4.0|     BO|     M|    1|
|2016.0|   4.0|     TC|     M|    1|
|2016.0|   4.0|     NM|     U|    1|
|2016.0|   5.0|     CD|     F|    1|
|2016.0|   5.0|     CE|     F|    1|
|2016.0|   5.0|     WV|     X|    1|
|2016.0|   7.0|     UH|     F|    1|
|2016.0|   8.0|     CT|     M|    1|
|2016.0|   8.0|     NT|     M|    1|
|2016.0|   9.0|     DL|     F|    1|
|2016.0|   9.0|      A|     M|    1|
+------+------+-------+------+-----+
only showing top 20 rows



In [14]:
df.count()

9997
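
The aggregated rows shown above include i94addr values such as FI, TS, and A, which are not US state codes. They fall out later when the fact table is joined to the demographic data, but they could also be filtered explicitly. The following is a possible extra cleaning step, not part of the original pipeline; `US_STATE_CODES` and `is_us_state` are illustrative names:

```python
# 50 states plus the District of Columbia
US_STATE_CODES = frozenset("""
AL AK AZ AR CA CO CT DE FL GA HI ID IL IN IA KS KY LA ME MD MA MI MN MS MO
MT NE NV NH NJ NM NY NC ND OH OK OR PA RI SC SD TN TX UT VT VA WA WV WI WY DC
""".split())


def is_us_state(code):
    """True when `code` is a known two-letter state code (case-insensitive)."""
    return isinstance(code, str) and code.strip().upper() in US_STATE_CODES


# Values taken from the aggregated output above, plus a null
sample = ["NJ", "FI", "TS", "WV", "A", None]
print([c for c in sample if is_us_state(c)])
```

In the Spark pipeline, the same set could be passed to `Column.isin` inside a `filter` call.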

In [37]:
# Save aggregated immigration data to local path to read next time
#df.write.save("sas_data_new", format = "parquet")

In [42]:
# Drop duplicate data in demographics dataset
dem_df = dem_df.dropDuplicates()
dem_df.count()

2891

In [43]:
# Drop null state code values in demographics dataset
dem_df = dem_df.filter(dem_df['State Code'].isNotNull())
dem_df.count()

2891

In [8]:
# Rename column names to better format
dem_df = dem_df.withColumnRenamed("Median Age","MedianAge").withColumnRenamed("Male Population","MalePopulation").withColumnRenamed("Female Population","FemalePopulation")\
.withColumnRenamed("Total Population","TotalPopulation").withColumnRenamed("Number of Veterans","NumberofVeterans").withColumnRenamed("Foreign-born","ForeignBorn")\
.withColumnRenamed("Average Household Size","AverageHouseholdSize").withColumnRenamed("State Code","StateCode")
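
The chained renames above follow one rule: remove spaces and hyphens and capitalise each word. A small helper could derive the new names from the old ones. This is a sketch of that rule, not the notebook's own code, and `clean_column_name` is a hypothetical name. Note that it produces NumberOfVeterans rather than the NumberofVeterans spelling used above.

```python
import re


def clean_column_name(name):
    """Drop spaces and punctuation and capitalise the first letter of each word."""
    words = re.split(r"[^0-9A-Za-z]+", name)
    return "".join(w[:1].upper() + w[1:] for w in words if w)


source_columns = ["Median Age", "Foreign-born", "Average Household Size", "State Code"]
renames = {c: clean_column_name(c) for c in source_columns}
print(renames)
```

Each pair in `renames` could then be applied with `withColumnRenamed` in a loop.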

In [9]:
# Change column types according to values
dem_df = dem_df.withColumn("MedianAge", dem_df["MedianAge"].cast(types.DoubleType())).withColumn("MalePopulation", dem_df["MalePopulation"].cast(types.LongType()))\
.withColumn("FemalePopulation", dem_df["FemalePopulation"].cast(types.LongType())).withColumn("TotalPopulation", dem_df["TotalPopulation"].cast(types.LongType()))\
.withColumn("NumberofVeterans", dem_df["NumberofVeterans"].cast(types.LongType())).withColumn("ForeignBorn", dem_df["ForeignBorn"].cast(types.LongType()))\
.withColumn("AverageHouseholdSize", dem_df["AverageHouseholdSize"].cast(types.DoubleType())).withColumn("Count", dem_df["Count"].cast(types.LongType()))

In [10]:
dem_df.describe

<bound method DataFrame.describe of DataFrame[City: string, State: string, MedianAge: double, MalePopulation: bigint, FemalePopulation: bigint, TotalPopulation: bigint, NumberofVeterans: bigint, ForeignBorn: bigint, AverageHouseholdSize: double, StateCode: string, Race: string, Count: bigint]>

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model
1. Fact table: Immigration counts by year, month, state code, and gender.
2. Dimension table: US cities and states with their population, median age, number of veterans, foreign-born population, and average household size. I kept the dimension table at city level for more flexibility in the future, and aggregate it to state level when joining it to the fact table.
3. The two tables are joined on i94addr in the immigration data and the state code in the demographic data.

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model
1. Upload cleaned dataset to S3 using spark + EMR cluster.
2. Create tables on Redshift.
3. Copy data from S3 to Redshift tables.

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

#### 1. Upload data to S3 using EMR cluster

In [13]:
# Define S3 bucket path
output_data = "s3a://udacityprojectwencapstone/"

In [48]:
df.head()

Row(i94yr=2016, i94mon=10, i94addr='SF', gender=None, count=47)

In [14]:
# Write immigration data to S3 in parquet format
df.write.mode("overwrite").parquet(output_data + "immigration/")

In [15]:
# Write demographic data to S3 in parquet format
dem_df.write.mode("overwrite").parquet(output_data + "demographic/")

#### 2. S3 to Redshift

In [72]:
# Read configuration of AWS Redshift
config = configparser.ConfigParser()
config.read('dl.cfg')
# Build connection to redshift and be ready to execute operations
conn = psycopg2.connect("host={} dbname={} user={} password={} port={}".format(*config['CLUSTER'].values()))
cur = conn.cursor()
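
The connection string above depends on the order of the values in the CLUSTER section of dl.cfg. Reading each setting by name avoids that dependency. A minimal sketch, assuming key names HOST, DB_NAME, DB_USER, DB_PASSWORD, and DB_PORT; the real key names in dl.cfg are not shown in this notebook, and all values below are placeholders:

```python
import configparser

# Hypothetical dl.cfg contents; the real key names and values are not shown in the notebook
EXAMPLE_CFG = """
[CLUSTER]
HOST = example-cluster.example.com
DB_NAME = dev
DB_USER = awsuser
DB_PASSWORD = not-a-real-password
DB_PORT = 5439
"""


def build_dsn(cluster):
    """Build a libpq-style connection string from named keys instead of value order."""
    return "host={} dbname={} user={} password={} port={}".format(
        cluster["HOST"], cluster["DB_NAME"], cluster["DB_USER"],
        cluster["DB_PASSWORD"], cluster["DB_PORT"],
    )


example_config = configparser.ConfigParser()
example_config.read_string(EXAMPLE_CFG)
dsn = build_dsn(example_config["CLUSTER"])
print(dsn)
```

The resulting string has the same shape as the one passed to `psycopg2.connect` above.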

In [60]:
# Immigration table drop/creation sqls
immigration_table_drop = "Drop table if exists immigration"
immigration_table_create= ("""
CREATE TABLE immigration
(
  immi_id  bigint identity(0, 1) primary key,
  i94yr integer NULL,
  i94mon       integer NULL,
  i94addr       VARCHAR(64) NULL,
  gender       VARCHAR(64) NULL,
  count     BIGINT NULL
);""")

In [61]:
# Demographic table drop/creation sqls
demographic_table_drop = "Drop table if exists demographic"
demographic_table_create= ("""
CREATE TABLE demographic 
(
  city_id   bigint identity(0, 1) primary key,
  city      VARCHAR(256) NULL,
  state     VARCHAR(256) NULL,
  median_age    FLOAT NULL,
  male_population     BIGINT NULL,
  female_population     BIGINT NULL,
  total_population BIGINT NULL,
  number_of_veterans BIGINT NULL,
  foreign_born BIGINT NULL,
  average_household_size FLOAT NULL,
  state_code VARCHAR(256) NULL,
  race VARCHAR(256) NULL,
  count BIGINT NULL
);""")

In [62]:
drop_table_queries = [immigration_table_drop, demographic_table_drop]
create_table_queries = [immigration_table_create, demographic_table_create]

In [63]:
# Execute table drops
for query in drop_table_queries:
    cur.execute(query)
    conn.commit()

In [64]:
# Execute table creation
for query in create_table_queries:
    cur.execute(query)
    conn.commit()

In [65]:
# Read S3 path and IAM role
IMMI_DATA=config.get('S3','IMMI_DATA')
DEM_DATA=config.get('S3','DEM_DATA')
DWH_ROLE_ARN=config.get('IAM_ROLE','ARN')

In [73]:
# S3 copy to redshift command
immigration_copy = """
    copy immigration from {}
    credentials 'aws_iam_role={}'
    FORMAT AS PARQUET;
""".format(IMMI_DATA,DWH_ROLE_ARN)

In [74]:
# S3 copy to redshift command
demographic_copy = """
    copy demographic from {}
    credentials 'aws_iam_role={}'
    FORMAT AS PARQUET;
""".format(DEM_DATA,DWH_ROLE_ARN)

In [75]:
# Execute S3 copy
cur.execute(immigration_copy)
conn.commit()

In [76]:
# Execute S3 copy
cur.execute(demographic_copy)
conn.commit()
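
The two COPY statements above differ only in the table name and the S3 path, so they could be produced by one helper. The sketch below assumes that the S3 path and role ARN are stored without surrounding quotes (the cells above insert the config values as they are, which suggests the quotes live in dl.cfg). `build_copy_sql` and all values passed to it are placeholders. Redshift COPY expects an `s3://` path, whereas the Spark writer above uses `s3a://`.

```python
def build_copy_sql(table, s3_path, role_arn):
    """Return a Redshift COPY statement that loads Parquet files from S3."""
    # The table name is interpolated directly, so it must come from trusted code
    return (
        f"COPY {table}\n"
        f"FROM '{s3_path}'\n"
        f"IAM_ROLE '{role_arn}'\n"
        "FORMAT AS PARQUET;"
    )


# Placeholder values for illustration only
sql = build_copy_sql(
    "immigration",
    "s3://example-bucket/immigration/",
    "arn:aws:iam::123456789012:role/example-role",
)
print(sql)
```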

In [77]:
# Close the connection to Redshift
conn.close()

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
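 1. Each table has an auto-incrementing identity column declared as its primary key at table creation (Redshift treats primary keys as informational and does not enforce them).
 2. SQL queries that demonstrate possible analyses.
 3. Row counts for each table, which should be greater than 0.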
 
Run Quality Checks

In [78]:
# Perform quality checks here
%load_ext sql

In [83]:
%sql postgresql://awsuser:Passw0rd@redshift-cluster-1.ctagmngdnkvt.us-west-2.redshift.amazonaws.com:5439/dev

'Connected: awsuser@dev'

In [84]:
# Check if records >0 in immigration table
%sql SELECT COUNT(*) FROM IMMIGRATION;

 * postgresql://awsuser:***@redshift-cluster-1.ctagmngdnkvt.us-west-2.redshift.amazonaws.com:5439/dev
1 rows affected.


count
9997


In [85]:
# Check if records >0 in demographic table
%sql SELECT COUNT(*) FROM DEMOGRAPHIC;

 * postgresql://awsuser:***@redshift-cluster-1.ctagmngdnkvt.us-west-2.redshift.amazonaws.com:5439/dev
1 rows affected.


count
2891
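
The two count checks above are inspected by eye. They could also fail loudly when a table is empty. A minimal sketch using any DB-API cursor; sqlite3 stands in for Redshift here so that the example runs without a cluster, and `check_not_empty` is a hypothetical helper name:

```python
import sqlite3


def check_not_empty(cursor, tables):
    """Return {table: row_count}; raise ValueError if any table has no rows."""
    counts = {}
    for table in tables:
        # Table names come from a fixed list in code, not from user input
        cursor.execute(f"SELECT COUNT(*) FROM {table}")
        counts[table] = cursor.fetchone()[0]
        if counts[table] == 0:
            raise ValueError(f"Data quality check failed: {table} is empty")
    return counts


# sqlite3 stands in for Redshift; the table and row are toy data
demo_conn = sqlite3.connect(":memory:")
demo_cur = demo_conn.cursor()
demo_cur.execute("CREATE TABLE immigration (i94addr TEXT, total INTEGER)")
demo_cur.execute("INSERT INTO immigration VALUES ('NY', 47)")
result = check_not_empty(demo_cur, ["immigration"])
print(result)
demo_conn.close()
```

With the psycopg2 cursor from the Redshift connection, the call would be `check_not_empty(cur, ["immigration", "demographic"])`.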


In [99]:
# Possible analysis 1: find which months of 2016 had the most immigration.
%sql select i94yr, i94mon, sum(count) from immigration group by i94yr, i94mon order by sum(count) desc;

 * postgresql://awsuser:***@redshift-cluster-1.ctagmngdnkvt.us-west-2.redshift.amazonaws.com:5439/dev
12 rows affected.


i94yr,i94mon,sum
2016,7,4075542
2016,8,3919108
2016,9,3567083
2016,10,3487868
2016,6,3388925
2016,5,3271548
2016,12,3266165
2016,3,2993671
2016,4,2943721
2016,11,2756885


In [103]:
# Possible analysis 2: compare total 2016 immigration per state with the total population of the same state.
%%sql
select * from (select i94addr, sum(count) as total_immi from immigration a group by i94addr)a 
left join (select state_code, sum(total_population) as total_population from demographic group by state_code) b 
on a.i94addr = b.state_code
where b.state_code is not null order by a.total_immi desc;

 * postgresql://awsuser:***@redshift-cluster-1.ctagmngdnkvt.us-west-2.redshift.amazonaws.com:5439/dev
49 rows affected.


i94addr,total_immi,state_code,total_population
FL,8156192,FL,32306132
NY,6764396,NY,49002055
CA,6531491,CA,123444353
HI,2338444,HI,1763830
TX,1690521,TX,70553853
NV,1387457,NV,11203720
IL,1085621,IL,22514390
MA,1057261,MA,9997045
NJ,993293,NJ,6931024
WA,822812,WA,12500535


#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

[Immigration table]:
1. immi_id: bigint (Primary key that uniquely identifies each record in the table)
2. i94yr: int (Year)
3. i94mon: int (Month)
4. **i94addr: string (State code, for example: New York = NY)**
5. gender: string (Gender indication)
6. count: bigint (Number of immigration records for this year, month, state code, and gender)

[Demographic table]:
1. city_id: bigint (Primary key)
2. city: string (City name in the US)
3. state: string (State name in the US)
4. median_age: double (Median age of the city)
5. male_population: bigint (Male population of the city)
6. female_population: bigint (Female population of the city)
7. total_population: bigint (Total population of the city)
8. number_of_veterans: bigint (Number of veterans in the city)
9. foreign_born: bigint (Number of foreign-born residents in the city)
10. average_household_size: double (Average household size in the city)
11. **state_code: string (State code in the US, used as join field with immigration table)**
12. race: string (Race category of the record)
13. count: bigint (Population count for the given race in the city)

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

#### 1. Clearly state the rationale for the choice of tools and technologies for the project

1. Spark: The full-year 2016 immigration data set has 40,790,529 records. Spark distributes the processing, which significantly reduces the data processing time.
2. AWS EMR cluster: Runs Spark on cloud servers instead of a local computer when writing the data to S3.
3. S3: Data lake storage for the cleaned data on AWS.
4. Redshift: Stores the structured tables and runs the analytical queries.

#### 2. Propose how often the data should be updated and why.

The data could be updated monthly, because month is the finest time granularity in this data model.

#### 3. Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

1. If the data were increased by 100x, I would still use Spark on EMR, but I would provision more nodes in the EMR cluster for better performance. I would also load the data from S3 into Redshift in parallel.

2. I would use Airflow to schedule the pipeline so that it completes before 7am every day. Airflow is a widely used workflow scheduler and integrates well with AWS.

3. Redshift can handle up to 2,000 connections, so 100+ people would not be a problem. However, each person would need their own IAM user, with access and permissions controlled according to the principle of least privilege. Edit permissions in particular should be granted with great care.