In [1]:
# Github URL: https://github.com/ahmemam/Udacity_Capstone_Project

### Data Engineering Capstone Project

#### Project Summary
The project joins multiple datasets (I94 Immigration Data, U.S. City Demographic Data and Airport Codes) into one data warehouse with a star schema in order to make it easier for further analysis.

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [2]:
# Do all imports and installs here
import pandas as pd
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, DateType
from pyspark.sql.functions import substring, length, col, expr
from sqlalchemy import create_engine
import configparser
import psycopg2

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What is your end solution look like? What tools did you use? etc>

The project utilizes the data coming from variuos sources -listed below- to establish a data warehouse that can be used in analytics and collecting insights and metrics.

Datasets:
1. I94 Immigration Data.
2. U.S. City Demographic Data.
3. I94_SAS_Labels_Descriptions Data.

Tools used:
1. Pandas Libraries, to manipulate data.
2. PySpark, to process data in large scale efficintly. 
3. SqlAlchemy, to send data to AWS Redshift.
4. psycopg2, to query AWS Redshift.
5. AWS Redshift, to store the data warehouse.

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included?

##### 1. I94 Immigration Data:
    
    Contains info about immigrants; arrive date, departure date, type of visa, city code and other data, provided by US National Tourism and Trade Office, the file type is CSV.


##### 2. U.S. City Demographic Data:
    
    Contains info about the demographics of all US cities and census-designated places with a population greater or equal to 65,000, provided by the US Census Bureau's 2015 American Community Survey, the file type is CSV.

##### 3. I94_SAS_Labels_Descriptions Data:
    
    Contains Countries, states, Airport and cities codes and names.

In [3]:
# Read in the data here
df_img = pd.read_csv("immigration_data_sample.csv")
df_img.head(5)

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,...,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,...,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,...,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,...,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,...,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


In [4]:
df_dmg = pd.read_csv("us-cities-demographics.csv", delimiter=";")
df_dmg.head(5)

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [5]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')


In [6]:
#write to parquet
#df_spark.write.parquet("sas_data")
df_spark=spark.read.parquet("sas_data")

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

1. Removing rows with Null values on all columns.
2. Removing Duplicated rows.
3. Transforming Dates (arrdate, depdate) from Immigration table.
4. Parsing SAS file to get Country_Code, City_Code and State_Code.
5. Change letters case of "City" and "State" columns in "Demographics" table to upper case to match the data from SAS file.
6. Fill NaN values with 0.

In [7]:
# Performing cleaning tasks here
# 1- Removing rows with Null values on all columns.
# 2- Removing Duplicated rows.
# Immigration Table
df_img = df_img.dropna(how='all').drop_duplicates()

In [8]:
# 1- Removing rows with Null values on all columns.
# 2- Removing Duplicated rows.
# Demographics Table
df_dmg = df_dmg.dropna(how='all').drop_duplicates()

In [9]:
# 3- Transforming Dates (arrdate, depdate) from Immigration table.
df_img['arrdate'] = pd.to_timedelta(df_img['arrdate'], unit='D') + pd.Timestamp('1960-1-1')
df_img['depdate'] = pd.to_timedelta(df_img['depdate'], unit='D') + pd.Timestamp('1960-1-1')
df_img.head(5)

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,2016-04-22,1.0,HI,...,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,2016-04-23,1.0,TX,...,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,2016-04-07,1.0,FL,...,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,2016-04-28,1.0,CA,...,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,2016-04-06,3.0,NY,...,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


In [10]:
# 4- Parsing SAS file to get Country_Code, City_Code and State_Code.
with open("I94_SAS_Labels_Descriptions.SAS") as file:
    SAS = file.readlines()
    
country_code = {}
for countries in SAS[10:298]:
    pair = countries.split('=')
    code, country = pair[0].strip(), pair[1].strip().strip("'")
    country_code[code] = country
    
airport_code = {}
for airports in SAS[303:962]:
    pair = airports.split('=')
    code, city = pair[0].strip("\t").strip().strip("'"), pair[1].strip('\t').strip().strip("''")
    airport_code[code] = city
    
state_code = {}
for states in SAS[982:1036]:
    pair = states.split('=')
    code, state = pair[0].strip('\t').strip("'"), pair[1].strip().strip("'")
    state_code[code] = state

In [11]:
df_country_code = pd.DataFrame(list(country_code.items()), columns=['code', 'country'])
df_country_code.head(5)

Unnamed: 0,code,country
0,236,AFGHANISTAN
1,101,ALBANIA
2,316,ALGERIA
3,102,ANDORRA
4,324,ANGOLA


In [12]:
df_airport_code = pd.DataFrame(list(airport_code.items()), columns=['code', 'city'])
df_airport_code.head(5)

Unnamed: 0,code,city
0,ANC,"ANCHORAGE, AK"
1,BAR,"BAKER AAF - BAKER ISLAND, AK"
2,DAC,"DALTONS CACHE, AK"
3,PIZ,"DEW STATION PT LAY DEW, AK"
4,DTH,"DUTCH HARBOR, AK"


In [13]:
df_state_code = pd.DataFrame(list(state_code.items()), columns=['code', 'state'])
df_state_code.head(5)

Unnamed: 0,code,state
0,AK,ALASKA
1,AZ,ARIZONA
2,AR,ARKANSAS
3,CA,CALIFORNIA
4,CO,COLORADO


In [14]:
# 5- Change letters case of "City" and "State" columns in "Demographics" table to upper case to match the data from SAS file.
df_dmg['City'] = df_dmg['City'].str.upper()
df_dmg['State'] = df_dmg['State'].str.upper()
df_dmg.head(5)

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,SILVER SPRING,MARYLAND,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,QUINCY,MASSACHUSETTS,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,HOOVER,ALABAMA,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,RANCHO CUCAMONGA,CALIFORNIA,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,NEWARK,NEW JERSEY,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [15]:
# 6-Fill NaN values with zero
# df_img
df_img['insnum'] = df_img['insnum'].fillna(0)
df_img.head(5)

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,2016-04-22,1.0,HI,...,,M,1955.0,7202016,F,0.0,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,2016-04-23,1.0,TX,...,,M,1990.0,10222016,M,0.0,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,2016-04-07,1.0,FL,...,,M,1940.0,7052016,M,0.0,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,2016-04-28,1.0,CA,...,,M,1991.0,10272016,M,0.0,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,2016-04-06,3.0,NY,...,,M,1997.0,7042016,F,0.0,,42322570000.0,LAND,WT


In [16]:
# 7-Fill NaN values with zero
# df_dmg
df_dmg['Male Population'] = df_dmg['Male Population'].fillna(0)
df_dmg['Female Population'] = df_dmg['Female Population'].fillna(0)
df_dmg['Number of Veterans'] = df_dmg['Number of Veterans'].fillna(0)
df_dmg['Foreign-born'] = df_dmg['Foreign-born'].fillna(0)
df_dmg.head(5)

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,SILVER SPRING,MARYLAND,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,QUINCY,MASSACHUSETTS,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,HOOVER,ALABAMA,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,RANCHO CUCAMONGA,CALIFORNIA,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,NEWARK,NEW JERSEY,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [17]:
# Pivot 'Race' column for each 'State' and sum up 'Count' column
df_dmg_pivot = df_dmg.pivot_table(index='State', columns='Race', values='Count', aggfunc='sum')
df_dmg_pivot.head()

Race,American Indian and Alaska Native,Asian,Black or African-American,Hispanic or Latino,White
State,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
ALABAMA,8084.0,28769.0,521068.0,39313.0,498920.0
ALASKA,36339.0,36825.0,23107.0,27261.0,212696.0
ARIZONA,129708.0,229183.0,296222.0,1508157.0,3591611.0
ARKANSAS,9381.0,22062.0,149608.0,77813.0,384733.0
CALIFORNIA,401386.0,4543730.0,2047009.0,9856464.0,14905129.0


In [18]:
# Convert pivot to dataframe and remove axis name
df_dmg_table = df_dmg_pivot.reset_index()
df_dmg_table = df_dmg_table.rename_axis(None, axis=1)
df_dmg_table.head()

Unnamed: 0,State,American Indian and Alaska Native,Asian,Black or African-American,Hispanic or Latino,White
0,ALABAMA,8084.0,28769.0,521068.0,39313.0,498920.0
1,ALASKA,36339.0,36825.0,23107.0,27261.0,212696.0
2,ARIZONA,129708.0,229183.0,296222.0,1508157.0,3591611.0
3,ARKANSAS,9381.0,22062.0,149608.0,77813.0,384733.0
4,CALIFORNIA,401386.0,4543730.0,2047009.0,9856464.0,14905129.0


In [19]:
# Group by 'State' on the original demograpics dataframe (df_dmg) after dropping {'Race', 'Count' and 'State Code'} columns
# and sum up the rest of columns
df_dmg_sub = df_dmg[['State', 'Median Age', 'Male Population', 'Female Population', 'Total Population', 'Number of Veterans',\
'Foreign-born', 'Average Household Size']]
df_dmg_sub = df_dmg_sub.groupby(by='State').sum()
df_dmg_sub.head()

Unnamed: 0_level_0,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size
State,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
ALABAMA,1229.5,2448200.0,2715106.0,5163306,352896.0,252541.0,82.62
ALASKA,161.0,764725.0,728750.0,1493475,137460.0,166290.0,13.85
ARIZONA,2803.0,11137275.0,11360435.0,22497710,1322525.0,3411565.0,221.95
ARKANSAS,949.4,1400724.0,1482165.0,2882889,154390.0,307753.0,73.28
CALIFORNIA,24453.6,61055672.0,62388681.0,123444353,4617022.0,37059662.0,2092.44


In [20]:
# Join df_dmg_table and df_dmg_sub
df_dmg_new = df_dmg_sub.merge(df_dmg_table, on='State', how='outer')
df_dmg_new.head()

Unnamed: 0,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,American Indian and Alaska Native,Asian,Black or African-American,Hispanic or Latino,White
0,ALABAMA,1229.5,2448200.0,2715106.0,5163306,352896.0,252541.0,82.62,8084.0,28769.0,521068.0,39313.0,498920.0
1,ALASKA,161.0,764725.0,728750.0,1493475,137460.0,166290.0,13.85,36339.0,36825.0,23107.0,27261.0,212696.0
2,ARIZONA,2803.0,11137275.0,11360435.0,22497710,1322525.0,3411565.0,221.95,129708.0,229183.0,296222.0,1508157.0,3591611.0
3,ARKANSAS,949.4,1400724.0,1482165.0,2882889,154390.0,307753.0,73.28,9381.0,22062.0,149608.0,77813.0,384733.0
4,CALIFORNIA,24453.6,61055672.0,62388681.0,123444353,4617022.0,37059662.0,2092.44,401386.0,4543730.0,2047009.0,9856464.0,14905129.0


In [21]:
# Join df_dmg_new and df_dmg orignal dataframe to get rest of the columns ('State_Code')
df_dmg_temp = df_dmg[['State', 'State Code']].drop_duplicates()
df_dmg_new = df_dmg_new.merge(df_dmg_temp, on= 'State', how= 'outer')
df_dmg_new.head()

Unnamed: 0,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,American Indian and Alaska Native,Asian,Black or African-American,Hispanic or Latino,White,State Code
0,ALABAMA,1229.5,2448200.0,2715106.0,5163306,352896.0,252541.0,82.62,8084.0,28769.0,521068.0,39313.0,498920.0,AL
1,ALASKA,161.0,764725.0,728750.0,1493475,137460.0,166290.0,13.85,36339.0,36825.0,23107.0,27261.0,212696.0,AK
2,ARIZONA,2803.0,11137275.0,11360435.0,22497710,1322525.0,3411565.0,221.95,129708.0,229183.0,296222.0,1508157.0,3591611.0,AZ
3,ARKANSAS,949.4,1400724.0,1482165.0,2882889,154390.0,307753.0,73.28,9381.0,22062.0,149608.0,77813.0,384733.0,AR
4,CALIFORNIA,24453.6,61055672.0,62388681.0,123444353,4617022.0,37059662.0,2092.44,401386.0,4543730.0,2047009.0,9856464.0,14905129.0,CA


In [22]:
# Rename df_dmg_new
df_dmg_new.columns = ['state', 'median_age', 'male_population', 'female_population', 'total_population', 'number_of_veterans',\
'foreign-born', 'avg_household_size', 'american_indian_and_alaska_native', 'asian', 'black_or_african-american', 'hispanic_or_latino',\
'white', 'state_code']
df_dmg_new.head()

Unnamed: 0,state,median_age,male_population,female_population,total_population,number_of_veterans,foreign-born,avg_household_size,american_indian_and_alaska_native,asian,black_or_african-american,hispanic_or_latino,white,state_code
0,ALABAMA,1229.5,2448200.0,2715106.0,5163306,352896.0,252541.0,82.62,8084.0,28769.0,521068.0,39313.0,498920.0,AL
1,ALASKA,161.0,764725.0,728750.0,1493475,137460.0,166290.0,13.85,36339.0,36825.0,23107.0,27261.0,212696.0,AK
2,ARIZONA,2803.0,11137275.0,11360435.0,22497710,1322525.0,3411565.0,221.95,129708.0,229183.0,296222.0,1508157.0,3591611.0,AZ
3,ARKANSAS,949.4,1400724.0,1482165.0,2882889,154390.0,307753.0,73.28,9381.0,22062.0,149608.0,77813.0,384733.0,AR
4,CALIFORNIA,24453.6,61055672.0,62388681.0,123444353,4617022.0,37059662.0,2092.44,401386.0,4543730.0,2047009.0,9856464.0,14905129.0,CA


In [23]:
# Join df_img with df_country_code
df_img_temp = df_img[['cicid', 'i94cit', 'i94res']]
df_img_temp = df_img_temp.rename(columns={'i94cit':'code'})
df_img_temp = df_img_temp.astype({'code':'int', 'i94res':'int'})
df_country_code = df_country_code.astype({'code':'int'})
df_img_temp = df_img_temp.merge(df_country_code, on='code', how='outer')
df_img_temp = df_img_temp[['cicid', 'i94res', 'country']]
df_img_temp = df_img_temp.rename(columns={'i94res':'code', 'country':'citizenship_country'})
df_img_temp = df_img_temp.merge(df_country_code, on='code', how='outer')
df_img_temp = df_img_temp[['cicid', 'citizenship_country', 'country']]
df_img_temp = df_img_temp.rename(columns={'i94res':'code', 'country':'resiedence_country'})
df_img = df_img.merge(df_img_temp, on='cicid', how='outer')
df_img.head()

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype,citizenship_country,resiedence_country
0,2027561.0,4084316.0,2016.0,4.0,209.0,209.0,HHW,2016-04-22,1.0,HI,...,1955.0,7202016,F,0.0,JL,56582670000.0,00782,WT,JAPAN,JAPAN
1,2171295.0,4422636.0,2016.0,4.0,582.0,582.0,MCA,2016-04-23,1.0,TX,...,1990.0,10222016,M,0.0,*GA,94362000000.0,XBLNG,B2,,
2,589494.0,1195600.0,2016.0,4.0,148.0,112.0,OGG,2016-04-07,1.0,FL,...,1940.0,7052016,M,0.0,LH,55780470000.0,00464,WT,,GERMANY
3,2631158.0,5291768.0,2016.0,4.0,297.0,297.0,LOS,2016-04-28,1.0,CA,...,1991.0,10272016,M,0.0,QR,94789700000.0,00739,B2,QATAR,QATAR
4,3032257.0,985523.0,2016.0,4.0,111.0,111.0,CHM,2016-04-06,3.0,NY,...,1997.0,7042016,F,0.0,,42322570000.0,LAND,WT,FRANCE,FRANCE


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

The data model uses star schema (Kimball model) to use the output in Analytics.

![data_model](dataModel.png)

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

1. Reading the data from the sources.
2. Cleaning the raw data.
3. Tranforming the data (Removing Nulls and Duplicates, transforming Dates, Parsing SAS file, etc.).
4. Creating the tables according to the star schema data model.
    1. Creating the fact and dimension dataframes out of the original dataframes.
    2. Changing the column names to a more understandable names.
5. Filling the AWS Redshift cluster DBtables with the transformed data.

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [24]:
# Configuring AWS Redshift
config = configparser.ConfigParser()
config.read('dwh.cfg')

['dwh.cfg']

In [25]:
conn = create_engine('postgresql://{}:{}@{}'\
.format(config['CLUSTER']['DB_USER'], config['CLUSTER']['DB_PASSWORD'], config['CLUSTER']['DB_URL']))

In [26]:
# 1. Creating the fact table fact_immigration out of the original dataframe.
# 2. Changing the column names to a more understandable names.
# Filling the AWS Redshift cluster DBtables with the transformed data.
#fact_immigration Table
fact_img = df_img[['cicid', 'i94yr', 'i94mon', 'i94port', 'i94addr']]
# fact_img = fact_img.astype({'cicid':'int', 'i94yr':'int', 'i94mon':'int'})
fact_img.columns = ['cicid', 'year', 'month', 'city_code', 'state_code']
fact_img.to_sql('fact_immigration', conn, index=False, if_exists='replace')

In [27]:
# 1. Creating the dimension table dim_immigration_citizen out of the original dataframe.
# 2. Changing the column names to a more understandable names.
# Filling the AWS Redshift cluster DBtables with the transformed data.
#dim_img_citizen
dim_img_citizen = df_img[['cicid', 'citizenship_country', 'resiedence_country', 'biryear', 'gender', 'insnum']]
# dim_img_citizen = dim_img_citizen.astype({'cicid':'int', 'i94cit':'int', 'i94res':'int', 'biryear':'int', 'insnum':'int'})
dim_img_citizen.columns = ['cicid', 'citizen_country', 'residence_country', 'birth_year', 'gender', 'ins_num']
dim_img_citizen.to_sql('dim_immigration_citizen', conn, index=False, if_exists='replace')

In [28]:
# 1. Creating the dimension table dim_date out of the original dataframe.
# 2. Changing the column names to a more understandable names.
# Filling the AWS Redshift cluster DBtables with the transformed data.
#dim_date
dim_date = df_img[['cicid', 'arrdate', 'depdate']]
dim_date.columns = ['cicid', 'arrive_date', 'departure_date']
dim_date.to_sql('dim_date', conn, index=False, if_exists='replace')

In [29]:
# 1. Creating the dimension table dim_visa out of the original dataframe.
# 2. Changing the column names to a more understandable names.
# Filling the AWS Redshift cluster DBtables with the transformed data.
#dim_visa
dim_visa = df_img[['cicid', 'i94mode', 'i94visa']]
dim_visa.columns = ['cicid', 'mode', 'visa']
dim_visa.to_sql('dim_visa', conn, index=False, if_exists='replace')

In [30]:
# 1. Creating the dimension table dim_demographics_population out of the original dataframe.
# 2. Changing the column names to a more understandable names.
# Filling the AWS Redshift cluster DBtables with the transformed data.
#dim_dmg_popultion
dim_dmg_pop = df_dmg_new[['state_code', 'state','male_population', 'female_population', 'total_population', 'number_of_veterans',\
'foreign-born','american_indian_and_alaska_native', 'asian', 'black_or_african-american', 'hispanic_or_latino', 'white']]
dim_dmg_pop.to_sql('dim_demographics_population', conn, index=False, if_exists='replace')

In [31]:
# 1. Creating the dimension table dim_demographics_statistics out of the original dataframe.
# 2. Changing the column names to a more understandable names.
# Filling the AWS Redshift cluster DBtables with the transformed data.
#dim_dmg_statistics
dim_dmg_stat = df_dmg_new[['state_code', 'state', 'median_age', 'avg_household_size']]
dim_dmg_stat.to_sql('dim_demographics_statistics', conn, index=False, if_exists='replace')

In [32]:
# 1. Creating the dimension table dim_city out of the original dataframe.
# 2. Changing the column names to a more understandable names.
# Filling the AWS Redshift cluster DBtables with the transformed data.
#dim_city_code
dim_city = df_airport_code[['code', 'city']]
# dim_city['state_code'] = dim_city['city'].str.split(',').str[1]
# dim_city['city'] = dim_city['city'].str.split(',').str[0]
dim_city.to_sql('dim_airport_code', conn, index=False, if_exists='replace')

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

1. Integrity, check if joinning the fact table's primary key with the dimensions table's primary Keys gives correct values,
    the process goes by droping all join matches, if the result is empty, then the data follows Integrity constraints. 
2. Redshift tables are not empty.

In [33]:
connec = psycopg2.connect("host={} dbname={} user={} password={} port={}".format(*config['CLUSTER'].values()))
cur = connec.cursor()

In [34]:
# 1- Integrity, check if joinning the fact table with the dimensions gives correct values
# Joinning fact table with dim_img_citizen
outer = fact_img[['cicid']].drop_duplicates().merge(dim_img_citizen[['cicid']], on='cicid', indicator=True)
anti_join = outer[(outer._merge=='left_only')].drop('_merge', axis=1)
anti_join.head()

Unnamed: 0,cicid


In [35]:
# 1- Integrity, check if joinning the fact table with the dimensions gives correct values
# Joinning fact table with dim_date
outer = fact_img[['cicid']].drop_duplicates().merge(dim_date[['cicid']], on='cicid', indicator=True)
anti_join = outer[(outer._merge=='left_only')].drop('_merge', axis=1)
anti_join.head()

Unnamed: 0,cicid


In [36]:
# 1- Integrity, check if joinning the fact table with the dimensions gives correct values
# Joinning fact table with dim_visa
outer = fact_img[['cicid']].drop_duplicates().merge(dim_visa[['cicid']], on='cicid', indicator=True)
anti_join = outer[(outer._merge=='left_only')].drop('_merge', axis=1)
anti_join.head()

Unnamed: 0,cicid


In [37]:
# 1- Integrity, check if joinning the fact table with the dimensions gives correct values
# Joinning fact table with dim_dmg_pop
outer = fact_img[['state_code']].drop_duplicates().merge(dim_dmg_pop[['state_code']], on='state_code', indicator=True)
anti_join = outer[(outer._merge=='left_only')].drop('_merge', axis=1)
anti_join.head()

Unnamed: 0,state_code


In [38]:
# 1- Integrity, check if joinning the fact table with the dimensions gives correct values
# Joinning fact table with dim_dmg_stat
outer = fact_img[['state_code']].drop_duplicates().merge(dim_dmg_pop[['state_code']], on='state_code', indicator=True)
anti_join = outer[(outer._merge=='left_only')].drop('_merge', axis=1)
anti_join.head()

Unnamed: 0,state_code


In [39]:
# 1- Integrity, check if joinning the fact table with the dimensions gives correct values
# Joinning fact table with dim_city
dim_city.columns = ['city_code', 'city']
outer = fact_img[['city_code']].drop_duplicates().merge(dim_city[['city_code']], on='city_code', indicator=True)
anti_join = outer[(outer._merge=='left_only')].drop('_merge', axis=1)
anti_join.head()

Unnamed: 0,city_code


In [40]:
# 2- Redshift tables are not empty
empty_flag = False

In [41]:
# fact_immigration
list=[]
cur.execute('select * from fact_immigration limit 1')
list = cur.fetchall()
if list:
    print('check')
else:
    empty_flag = True
    print("fact_immigration is empty")

check


In [42]:
# dim_immigration_citizen
list=[]
cur.execute('select * from dim_immigration_citizen limit 1')
list = cur.fetchall()
if list:
    print('check')
else:
    empty_flag = True
    print("dim_immigration_citizen is empty")

check


In [43]:
# dim_demographics_population
list=[]
cur.execute('select * from dim_demographics_population limit 1')
list = cur.fetchall()
if list:
    print('check')
else:
    empty_flag = True
    print("dim_demographics_population is empty")

check


In [44]:
# dim_demographics_statistics
list=[]
cur.execute('select * from dim_demographics_statistics limit 1')
list = cur.fetchall()
if list:
    print('check')
else:
    empty_flag = True
    print("dim_demographics_statistics is empty")

check


In [45]:
# dim_visa
list=[]
cur.execute('select * from dim_visa limit 1')
list = cur.fetchall()
if list:
    print('check')
else:
    empty_flag = True
    print("dim_visa is empty")

check


In [46]:
# dim_airport_code
list=[]
cur.execute('select * from dim_airport_code limit 1')
list = cur.fetchall()
if list:
    print('check')
else:
    empty_flag = True
    print("airport_code is empty")

check


In [47]:
# dim_date
list=[]
cur.execute('select * from dim_date limit 1')
list = cur.fetchall()
if list:
    print('check')
else:
    empty_flag = True
    print("dim_date is empty")

check


#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

| fact_immigration | Type      | Description                                                |
|------------------|-----------|------------------------------------------------------------|
| cic_id           | INT       | [Primary Key], an integer no. that represent the immigrant |
| year             | INT       | Integer representing the year the data is collected        |
| month            | INT       | Integer representing the month the data is collected       |
| airport_code     | varchar   | 3 charcters that represent the airport                     |
| state_code       | varchar   | 2 charcters that represent the state                       |

| dim_visa         | Type      | Description                                                |
|------------------|-----------|------------------------------------------------------------|
| cic_id           | INT       | [Primary Key], an integer no. that represent the immigrant |
| mode             | INT       | Integer that represent the visa mode                       |
| visa             | INT       | Integer that represent the visa type                       |


| dim_date         | Type      | Description                                                |
|------------------|-----------|------------------------------------------------------------|
| cic_id           | INT       | [Primary Key], an integer no. that represent the immigrant |
| arrive_date      | TIMESTAMP | Arrive date                                                |
| departure_date   | TIMESTAMP | Departure date                                             |


| dim_immigrationg_citizen  | Type    | Description                                                |
|---------------------------|---------|------------------------------------------------------------|
| cic_id                    | INT     | [Primary Key], an integer no. that represent the immigrant |
| citizen_country           | varchar | The country of citizenship of the immigrant                |
| residence_country         | varchar | The country of resiedence of the immigrant                 |
| birth_year                | INT     | Immigrant's birth year                                     |
| gender                    | varchar | Immigrant's gender                                         |
| ins_num                   | INT     | Immigrant's insurence number                               |


| dim_demographics_population       | Type    | Description                                            |
|-----------------------------------|---------|--------------------------------------------------------|
| state_code                        | varchar | [primary key], state code                              |
| state                             | varchar | state name                                             |
| male_population                   | integer | male population in the state                           |
| female_population                 | integer | female population in the state                         |
| total_population                  | integer | total population in the state                          |
| number_of_veterans                | integer | no. of veterans                                        |
| foreign_born                      | integer | no. of outcomers (born abroad)                         |
| american_indian_and_alaska_native | integer | count of  american_indian_and_alaska_native population |
| asian                             | integer | count of  asian population                             |
| black_or_african_american         | integer | count of  black_or_african_american population         |
| hispanic_or_latino                | integer | count of  hispanic_or_latino population                |
| white                             | integer | count of  white population                             |

| dim_demographics_statistics | Type    | Description                                         |
|-----------------------------|---------|-----------------------------------------------------|
| state_code                  | varchar | [primary key], 2 charcters that represent the state |
| state                       | varchar | the state name                                      |
| median_age                  | float   | the median age of population                        |
| avg_household_size          | float   | the average household size in the state             |


| dim_city     | Type    | Description                                       |
|--------------|---------|---------------------------------------------------|
| airport_code | varchar | [Primary Key], 3 char. that represnet the airport |
| city         | varchar | the city name                                     |



#### Step 5: Complete Project Write Up
#### * Clearly state the rationale for the choice of tools and technologies for the project.
    1. Pandas is used to manipulate the data in its easy-to-use dataframe
    2. AWS Redshift is used to hold the data in a data warehose that is distributed and widly accessable.
#### * Propose how often the data should be updated and why.
    1. Data that comes from Immigration dataset should be updated monthly.
    2. Data that comes from Demographics dataset should be updated annually.
    3. Data about Citys, States, and Countries should be updated on demand.
    4. Data about Airports codes should be updated on demand.
#### * Write a description of how you would approach the problem differently under the following scenarios:
#### * The data was increased by 100x.
     1. Apache Spark will be used instead of Pandas libiraries to leverage the advantages of distributed proccessing.
     2. AWS EMR will be used to easily manage the Apache Spark cluster.
#### * The data populates a dashboard that must be updated on a daily basis by 7am every day.
     1. Apache Airflow will be used to schedule the run of the pipline.
#### * The database needed to be accessed by 100+ people.
     1. AWS Redshift database can handle up to 500 connections simultaneously, to handle more than 500 connections, using another Airflow pipline to duplicate the database periodically to work as a load balancer would be a suggested solution.

#### Pipline purpose
##### The is made to use the output in analytics and to be consumed by BI tools and in dashboards.
##### Users can accsess the output through AWS Redshift data warhouse, and they can use its API to connect to their apps and tools

#### Query Example

In [48]:
# Query that gives the percintage of the immigrnats gender population in the state they traveled to.
cur.execute("""SELECT fact_immigration.cicid, dim_demographics_population.state, citizen_country, residence_country, gender,
CASE
WHEN gender='M' THEN male_population/total_population*100
WHEN gender='F' THEN female_population/total_population*100
ElSE 0
END AS "Traveler_gender_percentage_in_state"
FROM fact_immigration

join dim_demographics_population on fact_immigration.state_code=dim_demographics_population.state_code
join dim_immigration_citizen on fact_immigration.cicid=dim_immigration_citizen.cicid
LIMIT 5""")
cur.fetchall()

[(4084316.0, 'HAWAII', 'JAPAN', 'JAPAN', 'F', 49.8798070108797),
 (4422636.0, 'TEXAS', None, None, 'M', 49.41217597287),
 (1195600.0, 'FLORIDA', None, 'GERMANY', 'M', 47.8606878718876),
 (5291768.0, 'CALIFORNIA', 'QATAR', 'QATAR', 'M', 49.4600769627753),
 (985523.0, 'NEW YORK', 'FRANCE', 'FRANCE', 'F', 52.2003740455375)]

In [49]:
# Query that combines immigrants data with the state and the race percintage
cur.execute("""SELECT fact_immigration.cicid, fact_immigration.state_code, state, male_population, female_population,
total_population, asian/total_population as "asian_percintage", arrive_date, departure_date
FROM fact_immigration
join dim_demographics_population on fact_immigration.state_code=dim_demographics_population.state_code
join dim_date on fact_immigration.cicid=dim_date.cicid
limit 5""")
cur.fetchall()

[(4084316.0,
  'HI',
  'HAWAII',
  884035.0,
  879795.0,
  1763830,
  0.136622010057659,
  datetime.datetime(2016, 4, 22, 0, 0),
  datetime.datetime(2016, 4, 29, 0, 0)),
 (4422636.0,
  'TX',
  'TEXAS',
  34862194.0,
  35691659.0,
  70553853,
  0.0131042028278739,
  datetime.datetime(2016, 4, 23, 0, 0),
  datetime.datetime(2016, 4, 24, 0, 0)),
 (1195600.0,
  'FL',
  'FLORIDA',
  15461937.0,
  16626425.0,
  32306132,
  0.00820070319777063,
  datetime.datetime(2016, 4, 7, 0, 0),
  datetime.datetime(2016, 4, 27, 0, 0)),
 (5291768.0,
  'CA',
  'CALIFORNIA',
  61055672.0,
  62388681.0,
  123444353,
  0.0368079210557327,
  datetime.datetime(2016, 4, 28, 0, 0),
  datetime.datetime(2016, 5, 7, 0, 0)),
 (985523.0,
  'NY',
  'NEW YORK',
  23422799.0,
  25579256.0,
  49002055,
  0.0280268449966027,
  datetime.datetime(2016, 4, 6, 0, 0),
  datetime.datetime(2016, 4, 9, 0, 0))]