# Data Engineering Capstone Project

#### Project Summary
A large number of people immigrate to the U.S. each year, and the authorities want to know more about them: which cities they move to, the weather in those cities, the demographics of those cities, and so on. By understanding trends in the immigration data, the authorities can plan relevant services and regulations for immigrants.

The goal of this project is to create an ETL process and data pipeline that processes the immigration data files and makes the data easily accessible to the authorities.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [11]:
# Do all imports and installs here
import pandas as pd
import sqlite3 as sq
from load_file_to_s3 import load_to_s3

### Step 1: Scope the Project and Gather Data

#### Scope 
This project works with four datasets. The main dataset contains data on immigration to the United States; three supplementary datasets contain airport codes, U.S. city demographics, and temperature data.

#### Describe and Gather Data 
 - Main dataset:  [I94 immigration data](https://www.trade.gov/i-94-arrivals-program) comes from the U.S. National Travel and Tourism Office (NTTO). The I-94 provides a count of visitor arrivals to the United States (with stays of one night or more and visiting under certain visa types), which is used to calculate U.S. travel and tourism volume exports.
 - Supplementary datasets:
     - [Global city temperatures](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data) comes from Kaggle and contains temperature records from all over the world. This source can enrich the data with typical temperatures.
     - [US City Demographic Data](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/information/) contains information about the demographics of all US cities and census-designated places with a population greater than or equal to 65,000. This data comes from the US Census Bureau's 2015 American Community Survey. This source can provide more information about the destinations immigrants relocate to.
     - [Airport codes](https://datahub.io/core/airport-codes#python) contains a list of airport codes and the corresponding names, cities, and countries around the world. The file 'I94_SAS_Labels_Descriptions.SAS' provides descriptions and mappings for airport codes and their cities and states.

In [12]:
# Read in the data here
i94_fname = '.gitignore/data/data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df_i94 = pd.read_sas(i94_fname, 'sas7bdat', encoding="ISO-8859-1")

In [13]:
pd.options.display.max_columns = None
df_i94.head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,37.0,2.0,1.0,,,,T,,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,25.0,3.0,1.0,20130811.0,SEO,,G,,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,55.0,2.0,1.0,20160401.0,,,T,O,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,28.0,2.0,1.0,20160401.0,,,O,O,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,4.0,2.0,1.0,20160401.0,,,O,O,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2
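
The `arrdate` and `depdate` values above (for example `20545.0`) are SAS numeric dates, which count days from 1960-01-01. A minimal sketch of converting them to calendar dates with the standard library; the helper name `sas_to_date` is illustrative and not part of this notebook:

```python
from datetime import date, timedelta
import math

SAS_EPOCH = date(1960, 1, 1)  # SAS date values count days from this day

def sas_to_date(value):
    """Convert a SAS numeric date to a datetime.date; None and NaN become None."""
    if value is None or (isinstance(value, float) and math.isnan(value)):
        return None
    return SAS_EPOCH + timedelta(days=int(value))

print(sas_to_date(20545.0))       # arrdate of the rows shown above: 2016-04-01
print(sas_to_date(float("nan")))  # missing depdate values stay missing: None
```

With pandas, the same conversion can be applied to a whole column with `pd.to_datetime(df_i94["arrdate"], unit="D", origin="1960-01-01")`.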


In [14]:
# Read in the temperature data
temperature_fname = '.gitignore/data/GlobalLandTemperaturesByCity.csv'
df_temperature = pd.read_csv(temperature_fname)

In [15]:
df_temperature.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


In [16]:
# Read in the US city demographic data
df_demographics = pd.read_csv(".gitignore/data/us-cities-demographics.csv", delimiter=";")

In [17]:
df_demographics.head(5)

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [18]:
# Read in Airport codes data
df_airport_codes = pd.read_csv(".gitignore/data/airport-codes_csv.csv")

In [19]:
df_airport_codes.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


### Step 2: Explore and Assess the Data
#### Explore the Data 
Go through each data file to identify data quality issues such as missing values and duplicate data.

#### Cleaning Steps
Clean the data, for example by removing missing values, before staging it.

In [20]:
# identify null value in the data files
df_i94.isnull().sum()

cicid             0
i94yr             0
i94mon            0
i94cit            0
i94res            0
i94port           0
arrdate           0
i94mode         239
i94addr      152592
depdate      142457
i94bir          802
i94visa           0
count             0
dtadfile          1
visapost    1881250
occup       3088187
entdepa         238
entdepd      138429
entdepu     3095921
matflag      138429
biryear         802
dtaddto         477
gender       414269
insnum      2982605
airline       83627
admnum            0
fltno         19549
visatype          0
dtype: int64

In [21]:
# Get port locations from SAS text file
with open(".gitignore/data/I94_SAS_Labels_Descriptions.SAS") as f:
    content = f.readlines()
content = [x.strip() for x in content]
ports = content[302:962]
splitted_ports = [port.split("=") for port in ports]
port_codes = [x[0].replace("'","").strip() for x in splitted_ports]
port_locations = [x[1].replace("'","").strip() for x in splitted_ports]
port_cities = [x.split(",")[0] for x in port_locations]
port_states = [x.split(",")[-1] for x in port_locations]
df_port_locations = pd.DataFrame({"port_code" : port_codes, "port_city": port_cities, "port_state": port_states})
df_port_locations.head(5)

Unnamed: 0,port_code,port_city,port_state
0,ALC,ALCAN,AK
1,ANC,ANCHORAGE,AK
2,BAR,BAKER AAF - BAKER ISLAND,AK
3,DAC,DALTONS CACHE,AK
4,PIZ,DEW STATION PT LAY DEW,AK
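
The cell above relies on fixed line offsets (`content[302:962]`), which break if the label file changes. As a hedged alternative, each line can be matched with a regular expression instead. The line layout assumed here (`'CODE' = 'CITY, STATE'`) is inferred from the parsed output above, and `parse_port_line` is an illustrative name:

```python
import re

# Matches lines shaped like:  'ALC' = 'ALCAN, AK   '
# (layout assumed from the parsed output, not verified against the file)
PORT_LINE = re.compile(r"^\s*'(?P<code>[^']+)'\s*=\s*'(?P<label>[^']*)'")

def parse_port_line(line):
    """Return (code, city, state), or None if the line is not a port entry.

    state is None when the label contains no comma, which is how
    irregular ports such as 'XXX' can be recognised.
    """
    match = PORT_LINE.match(line)
    if match is None:
        return None
    code = match.group("code").strip()
    label = match.group("label").strip()
    city, sep, state = label.rpartition(",")
    if not sep:  # no comma in the label, so there is no state part
        return code, label, None
    return code, city.strip(), state.strip()

print(parse_port_line("   'ALC'\t=\t'ALCAN, AK             '"))
```

Unlike the `split(",")` approach, this sketch splits on the last comma only, so a city name that itself contains a comma is kept whole.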


In [22]:
# Get unique port codes for irregular ports that can be removed
irregular_ports_df = df_port_locations[df_port_locations["port_city"] == df_port_locations["port_state"]]
irregular_ports = list(set(irregular_ports_df["port_code"].values))
print(*irregular_ports, sep=',')

OAI,ZZZ,MAA,EGE,GPI,OLM,DRV,AMT,ASI,SP0,CXO,PHF,888,NC8,WAS,MAP,BCM,CLX,LIT,NK,PHN,VMB,JIG,BUS,SCH,JMZ,MTH,RYY,JBQ,IAG,SUS,GMT,OGS,A2A,PFN,W55,74S,DEC,ADU,WA5,CPX,CHN,BKF,OTS,FTB,GAC,NGL,TIW,5T6,JSJ,AUH,XXX,ISP,PCW,ATW,FSC,PLB,STN,NYL,060,CP,AKT,OSN,Y62,FRG,HRL,.GA,X96,AG,YGF,UNK,DAY,JFA,WTR,XNA,X44,T01


In [23]:
# drop all irregular ports from i94 data
print(f"i94 data contains {len(df_i94)} rows before cleaning.")
df_i94_filtered = df_i94[~df_i94["i94port"].isin(irregular_ports)]
print(f"i94 data contains {len(df_i94_filtered)} rows after removing irregular ports.")
# reassign instead of using inplace=True, which raises SettingWithCopyWarning on a filtered slice
df_i94_filtered = df_i94_filtered.drop(columns=["insnum", "entdepu", "occup", "visapost"])
df_i94_new = df_i94_filtered.dropna()
print(f"i94 data contains {len(df_i94_new)} rows after removing NaN values.")

i94 data contains 3096313 rows before cleaning.
i94 data contains 2995590 rows after removing irregular ports.
i94 data contains 2306750 rows after removing NaN values.


In [24]:
# Group the temperature data by country and select only average temperature, latitude and longitude
df_temperature_country = df_temperature.groupby(['Country']).agg({"AverageTemperature":"mean","Latitude":"first","Longitude":"first"}).reset_index()
df_temperature_country.head()

Unnamed: 0,Country,AverageTemperature,Latitude,Longitude
0,Afghanistan,13.816497,36.17N,69.61E
1,Albania,15.525828,40.99N,19.17E
2,Algeria,17.763206,36.17N,3.98E
3,Angola,21.759716,12.05S,13.15E
4,Argentina,16.999216,39.38S,62.43W


In [27]:
# Get country codes from SAS text file
with open(".gitignore/data/I94_SAS_Labels_Descriptions.SAS") as f:
    content = f.readlines()
content = [x.strip() for x in content]
countries = content[9:299]
splitted_countries = [country.split("=") for country in countries]
country_codes = [x[0].strip() for x in splitted_countries]
country_names = [x[-1].replace("'","").strip() for x in splitted_countries]
df_countries = pd.DataFrame({"country_codes":country_codes, "country_names":country_names})
df_countries.head(5)

Unnamed: 0,country_codes,country_names
0,582,"MEXICO Air Sea, and Not Reported (I-94, no lan..."
1,236,AFGHANISTAN
2,101,ALBANIA
3,316,ALGERIA
4,102,ANDORRA


In [28]:
# Merge df_countries with temperature data for country code
df_temperature_new = pd.merge(df_countries, df_temperature_country, left_on=df_countries['country_names'].str.lower(), right_on=df_temperature_country['Country'].str.lower(), how="inner")

In [29]:
df_temperature_new.head(5)

Unnamed: 0,key_0,country_codes,country_names,Country,AverageTemperature,Latitude,Longitude
0,afghanistan,236,AFGHANISTAN,Afghanistan,13.816497,36.17N,69.61E
1,albania,101,ALBANIA,Albania,15.525828,40.99N,19.17E
2,algeria,316,ALGERIA,Algeria,17.763206,36.17N,3.98E
3,angola,324,ANGOLA,Angola,21.759716,12.05S,13.15E
4,argentina,687,ARGENTINA,Argentina,16.999216,39.38S,62.43W
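
The inner merge above keeps only countries whose names match exactly after lower-casing, so labels such as the `MEXICO Air Sea, and Not Reported ...` entry are dropped silently. A small sketch of listing the names that fail to match, written with plain Python; `match_countries` is an illustrative helper and the sample names are shortened:

```python
def match_countries(label_names, temperature_names):
    """Split label_names into (matched, unmatched) by case-insensitive name."""
    known = {name.strip().lower() for name in temperature_names}
    matched, unmatched = [], []
    for name in label_names:
        target = matched if name.strip().lower() in known else unmatched
        target.append(name)
    return matched, unmatched

# Shortened sample values, for illustration only
labels = ["AFGHANISTAN", "ALBANIA", "MEXICO Air Sea, and Not Reported"]
temperature_countries = ["Afghanistan", "Albania", "Mexico"]

matched, unmatched = match_countries(labels, temperature_countries)
print("matched:", matched)
print("unmatched:", unmatched)
```

In pandas, a similar report can be obtained by merging with `how="left"` and `indicator=True`, then filtering on the `_merge` column.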


In [30]:
df_temperature_new['country_code'] = df_temperature_new['country_codes']
cols_to_keep = ['country_code', 'Country', 'AverageTemperature', 'Latitude', 'Longitude']
df_temperature_new = df_temperature_new[cols_to_keep]
df_temperature_new.head(5)

Unnamed: 0,country_code,Country,AverageTemperature,Latitude,Longitude
0,236,Afghanistan,13.816497,36.17N,69.61E
1,101,Albania,15.525828,40.99N,19.17E
2,316,Algeria,17.763206,36.17N,3.98E
3,324,Angola,21.759716,12.05S,13.15E
4,687,Argentina,16.999216,39.38S,62.43W


In [31]:
# Remove missing data for airport code data
print(f"airport codes data contains {len(df_airport_codes)} rows before cleaning.")
df_airport_codes.dropna(subset=['iata_code'], inplace=True)
print(f"airport codes data contains {len(df_airport_codes)} rows after removing NaN values.")

airport codes data contains 55075 rows before cleaning.
airport codes data contains 9189 rows after removing NaN values.


In [32]:
airport_columns_to_keep = ['iata_code', 'name', 'type', 'local_code','municipality','coordinates']
df_airport_codes_new = df_airport_codes[airport_columns_to_keep].drop_duplicates(subset ='iata_code')

In [33]:
df_airport_codes_new.head(5)

Unnamed: 0,iata_code,name,type,local_code,municipality,coordinates
223,UTK,Utirik Airport,small_airport,03N,Utirik Island,"169.852005, 11.222"
440,OCA,Ocean Reef Club Airport,small_airport,07FA,Key Largo,"-80.274803161621, 25.325399398804"
594,PQS,Pilot Station Airport,small_airport,0AK,Pilot Station,"-162.899994, 61.934601"
673,CSE,Crested Butte Airpark,small_airport,0CO2,Crested Butte,"-106.928341, 38.851918"
1088,JCY,LBJ Ranch Airport,small_airport,0TE7,Johnson City,"-98.62249755859999, 30.251800537100003"


In [34]:
# Remove missing data for df_demographics
print(f"demographics data contains {len(df_demographics)} rows before cleaning")
df_demographics.dropna(inplace=True)
print(f"demographics data contains {len(df_demographics)} rows after removing NaN values.")

demographics data contains 2891 rows before cleaning
demographics data contains 2875 rows after removing NaN values.


In [35]:
columns_to_keep = ['State Code', 'Median Age', 'Male Population', 'Female Population', 'Total Population', 'Number of Veterans', 'Foreign-born', 'Average Household Size', 'Count']
df_demographics_sliced = df_demographics[columns_to_keep]

In [36]:
df_demographics_new = df_demographics_sliced.groupby(['State Code']).agg({'Median Age':'median','Male Population':'sum','Female Population':'sum','Total Population':'sum'\
                                                                         , 'Number of Veterans': 'sum', 'Foreign-born':'sum', 'Average Household Size':'median', 'Count':'sum'}).reset_index()

In [37]:
df_demographics_new.head(5)

Unnamed: 0,State Code,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,Count
0,AK,32.2,764725.0,728750.0,1493475,137460.0,166290.0,2.77,336228
1,AL,38.0,2448200.0,2715106.0,5163306,352896.0,252541.0,2.41,1096154
2,AR,32.6,1400724.0,1482165.0,2882889,154390.0,307753.0,2.44,643597
3,AZ,34.1,11137275.0,11360435.0,22497710,1322525.0,3411565.0,2.835,5754881
4,CA,35.8,61055672.0,62388681.0,123444353,4617022.0,37059662.0,3.06,31753718
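
The `Race` and `Count` columns in the sample suggest that the source file holds one row per city and race, with the city-level totals repeated on each row. If that is the case, summing `Total Population` over all rows counts each city several times, which may explain state totals such as 123,444,353 for CA. A minimal sketch of counting each city once, on made-up rows:

```python
from collections import defaultdict

# Toy rows in the assumed shape of the demographics file (values are made up)
rows = [
    {"City": "A", "State Code": "XX", "Race": "White", "Total Population": 100, "Count": 60},
    {"City": "A", "State Code": "XX", "Race": "Asian", "Total Population": 100, "Count": 40},
    {"City": "B", "State Code": "XX", "Race": "White", "Total Population": 50, "Count": 50},
]

def state_population(rows):
    """Sum Total Population per state, counting each city only once."""
    seen = set()
    totals = defaultdict(int)
    for row in rows:
        key = (row["City"], row["State Code"])
        if key in seen:  # this city was already counted via another race row
            continue
        seen.add(key)
        totals[row["State Code"]] += row["Total Population"]
    return dict(totals)

naive_total = sum(row["Total Population"] for row in rows)
print(naive_total)             # 250: city A is counted twice
print(state_population(rows))  # {'XX': 150}
```

In the notebook this would correspond to calling `drop_duplicates(subset=["City", "State Code"])` on `df_demographics` before the city-level columns are summed, which has to happen before the `City` column is dropped.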


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

|Table Type|Table Name|Columns|Primary Key|
|------|------|------|------|
|Fact table  |immigration  |cicid, country_code, iata_code, state_code, year, month, country_of_residence, port_of_entry, arrdate, transport_mode, immi_addr, depdate, age, visa, char_date_field, entdepa, entdepd, matflag, biryear, dtaddto, gender, airline, admnum, fltno, visatype | cicid |
|Dimension table  |temperature  |country_code, country, average_temperature, latitude, longitude | country_code |
|Dimension table  |airports  |iata_code, ap_name, ap_type, local_code, municipality, coordinates  | iata_code |
|Dimension table  |demographics  |state_code, median_Age, male_population, female_population, total_population, number_of_veterans, foreign_born, average_household_size, total_count | state_code |
|Dimension table  |transport  |transport_code, transport_name | transport_code |
|Dimension table  |visa  |visa_code, visa_name | visa_code |


<img src="img/capstone_udacity.jpeg" />

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model<br>
    1. Start execution<br>
    2. Clean the raw data and store the cleaned files in an S3 bucket<br>
    3. Create the staging tables if they do not exist<br>
    4. Copy data from S3 to the staging tables<br>
    5. Insert data into the fact and dimension tables<br>
    6. Run data quality checks<br>
    7. Stop execution
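
Step 4 of the list above maps to Redshift's `COPY` command. A minimal sketch of building such a statement for a CSV file with a header row; the function name, the bucket path, and the IAM role ARN are placeholders and not values from this project:

```python
def build_copy_statement(table, s3_path, iam_role):
    """Build a Redshift COPY statement for a CSV file that has a header row."""
    return (
        f"COPY {table}\n"
        f"FROM '{s3_path}'\n"
        f"IAM_ROLE '{iam_role}'\n"
        "CSV\n"
        "IGNOREHEADER 1;"
    )

# Placeholder values, for illustration only
statement = build_copy_statement(
    table="stagging_immigration",
    s3_path="s3://example-bucket/i94/immigration.csv",
    iam_role="arn:aws:iam::123456789012:role/example-redshift-role",
)
print(statement)
```

The statement would then be executed over a regular database connection to the cluster; this notebook skips that step because it uses SQLite in place of Redshift.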

### Step 4: Run Pipelines to Model the Data 
In this notebook, a SQLite database named i94 is created to demonstrate how the data would be populated into a data warehouse in Redshift.
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [38]:
# specify the database name (use sqlite to simulate redshift)
sql_data = 'i94.sqlite'
# establish connection
conn = sq.connect(sql_data)
cur = conn.cursor()

In [39]:
# create staging table
df_i94_new.to_sql('stagging_immigration', conn, if_exists='replace', index=False)

In [40]:
df_temperature_new.to_sql('stagging_temperature', conn, if_exists='replace', index=False)
df_airport_codes_new.to_sql('stagging_airports', conn, if_exists='replace', index=False)
df_demographics_new.to_sql('stagging_demographics', conn, if_exists='replace', index=False)



In [41]:
# check the database connection
pd.read_sql('SELECT * \
    FROM stagging_immigration \
    LIMIT 5', conn)

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,entdepa,entdepd,matflag,biryear,dtaddto,gender,airline,admnum,fltno,visatype
0,27.0,2016.0,4.0,101.0,101.0,BOS,20545.0,1.0,MA,20549.0,58.0,1.0,1.0,20160401,G,O,M,1958.0,4062016,M,LH,92478760000.0,422,B1
1,28.0,2016.0,4.0,101.0,101.0,ATL,20545.0,1.0,MA,20549.0,56.0,1.0,1.0,20160401,G,O,M,1960.0,4062016,F,LH,92478900000.0,422,B1
2,29.0,2016.0,4.0,101.0,101.0,ATL,20545.0,1.0,MA,20561.0,62.0,2.0,1.0,20160401,G,O,M,1954.0,9302016,M,AZ,92503780000.0,614,B2
3,30.0,2016.0,4.0,101.0,101.0,ATL,20545.0,1.0,NJ,20578.0,49.0,2.0,1.0,20160401,G,O,M,1967.0,9302016,M,OS,92470210000.0,89,B2
4,31.0,2016.0,4.0,101.0,101.0,ATL,20545.0,1.0,NY,20611.0,43.0,2.0,1.0,20160401,G,O,M,1973.0,9302016,M,OS,92471290000.0,89,B2


In [42]:
# select attributes from stagging_immigration for inserting into fact table
immigration_sql = '''
SELECT CAST(cicid AS int) as cicid, CAST(i94cit AS int) as country_code, i94port as iata_code, i94addr as state_code, CAST(i94yr AS int) as year, CAST(i94mon AS int) as month, CAST(i94res AS int) as country_of_residence, i94port as port_of_entry, \
    CAST(arrdate AS int) as arrdate, CAST(i94mode AS int) as transport_mode, i94addr as immi_addr, CAST(depdate AS int) as depdate, CAST(i94bir AS int) as age, CAST(i94visa AS int) as visa,\
    dtadfile as char_date_field, entdepa, entdepd, matflag, CAST(biryear AS int) as biryear, dtaddto, gender, airline, admnum, fltno, visatype
FROM stagging_immigration;
'''
immigration_sql_query_fact_table = pd.read_sql_query(immigration_sql, conn)

In [43]:
immigration_sql_query_fact_table.to_sql('immigration', conn, if_exists='replace', index=False)
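
`to_sql(..., if_exists='replace')` creates the target table without any key constraint. As a sketch of an alternative, the table can be declared with an explicit primary key and filled with `INSERT ... SELECT`, so that duplicate `cicid` values are rejected at load time. The example runs on a throwaway in-memory SQLite database with made-up rows and only two columns; `load_immigration` is an illustrative name:

```python
import sqlite3

demo = sqlite3.connect(":memory:")  # throwaway database for the sketch
demo.executescript("""
    CREATE TABLE stagging_immigration (cicid REAL, i94visa REAL);
    INSERT INTO stagging_immigration VALUES (27.0, 1.0), (28.0, 1.0), (28.0, 2.0);

    -- the target table declares its key, so duplicate cicid values are rejected
    CREATE TABLE immigration (cicid INTEGER PRIMARY KEY, visa INTEGER);
""")

def load_immigration(conn):
    """Copy staging rows into the fact table; return False on a key violation."""
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT INTO immigration "
                "SELECT CAST(cicid AS int), CAST(i94visa AS int) "
                "FROM stagging_immigration"
            )
        return True
    except sqlite3.IntegrityError:
        return False

loaded = load_immigration(demo)
rows_loaded = demo.execute("SELECT COUNT(*) FROM immigration").fetchone()[0]
print(loaded, rows_loaded)  # the duplicated cicid 28 makes the whole load fail
```

On Redshift, primary key constraints are informational and are not enforced, so the uniqueness check in section 4.2 would still be needed there.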

In [44]:
# check the stagging_temperature table
pd.read_sql('SELECT * \
    FROM stagging_temperature \
    LIMIT 5', conn)

Unnamed: 0,country_code,Country,AverageTemperature,Latitude,Longitude
0,236,Afghanistan,13.816497,36.17N,69.61E
1,101,Albania,15.525828,40.99N,19.17E
2,316,Algeria,17.763206,36.17N,3.98E
3,324,Angola,21.759716,12.05S,13.15E
4,687,Argentina,16.999216,39.38S,62.43W


In [45]:
# select columns from stagging_temperature table for inserting into temperature dim table
temperature_sql = '''
SELECT country_code, Country as country, AverageTemperature as average_temperature, Latitude as latitude, Longitude as longitude
FROM stagging_temperature;
'''
temperature_sql_query_dim_table = pd.read_sql_query(temperature_sql, conn)
temperature_sql_query_dim_table.to_sql('temperature', conn, if_exists='replace', index=False)

In [46]:
# check the stagging_airports table
pd.read_sql('SELECT * \
    FROM stagging_airports \
    LIMIT 5', conn)

Unnamed: 0,iata_code,name,type,local_code,municipality,coordinates
0,UTK,Utirik Airport,small_airport,03N,Utirik Island,"169.852005, 11.222"
1,OCA,Ocean Reef Club Airport,small_airport,07FA,Key Largo,"-80.274803161621, 25.325399398804"
2,PQS,Pilot Station Airport,small_airport,0AK,Pilot Station,"-162.899994, 61.934601"
3,CSE,Crested Butte Airpark,small_airport,0CO2,Crested Butte,"-106.928341, 38.851918"
4,JCY,LBJ Ranch Airport,small_airport,0TE7,Johnson City,"-98.62249755859999, 30.251800537100003"


In [47]:
# select columns from stagging_airports table for inserting into airport dim table
airport_sql = '''
SELECT iata_code, name as ap_name, type as ap_type, local_code, municipality, coordinates
FROM stagging_airports;
'''
airport_sql_query_dim_table = pd.read_sql_query(airport_sql, conn)

airport_sql_query_dim_table.to_sql('airports', conn, if_exists='replace', index=False)

In [48]:
# check the stagging_demographics table
pd.read_sql('SELECT * \
    FROM stagging_demographics \
    LIMIT 5', conn)

Unnamed: 0,State Code,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,Count
0,AK,32.2,764725.0,728750.0,1493475,137460.0,166290.0,2.77,336228
1,AL,38.0,2448200.0,2715106.0,5163306,352896.0,252541.0,2.41,1096154
2,AR,32.6,1400724.0,1482165.0,2882889,154390.0,307753.0,2.44,643597
3,AZ,34.1,11137275.0,11360435.0,22497710,1322525.0,3411565.0,2.835,5754881
4,CA,35.8,61055672.0,62388681.0,123444353,4617022.0,37059662.0,3.06,31753718


In [49]:
# select columns from stagging_demographics table for inserting into demographics dim table
demographics_sql = '''
SELECT "State Code" as state_code, "Median Age" as median_Age, CAST("Male Population" as numeric(10,0)) as male_population,\
    CAST("Female Population" as numeric(10,0)) as female_population, CAST("Total Population" as numeric(10,0)) as total_population, "Number of Veterans" as number_of_veterans,\
    "Foreign-born" as foreign_born, "Average Household Size" as average_household_size, Count as total_count 
FROM stagging_demographics;
'''
demographics_sql_query_dim_table = pd.read_sql_query(demographics_sql, conn)

demographics_sql_query_dim_table.to_sql('demographics', conn, if_exists='replace', index=False)

In [51]:
# create visa dim table
details = {
    'visa_code': [1,2,3],
    'visa_name': ['Business','Pleasure','Student']
}
df_visa = pd.DataFrame(details)
df_visa.to_sql('visa', conn, if_exists='replace', index=False)

In [52]:
# create transport dim table
details = {
    'transport_code': [1,2,3,9],
    'transport_name': ['Air','Sea','Land','Not reported']
}
df_transport = pd.DataFrame(details)
df_transport.to_sql('transport', conn, if_exists='replace', index=False)

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (unique key)
 * Count checks to ensure completeness
 
Run Quality Checks

In [69]:
# Source/Count checks to ensure completeness
def data_count_check(conn, table_name):
    """ Data quality check: check the number of records in the table.
    
    Key argument:
        Input: 
            conn: database connection
            table_name: table name
        Output: print outcome of table count check
    """
    
    check_table_count_sql = 'SELECT COUNT(*) FROM {}'.format(table_name)
    records = pd.read_sql_query(check_table_count_sql, conn)
    # read the scalar count out of the one-row, one-column result
    count = int(records.iloc[0, 0])
    
    if count > 0:
        print('{} records in {} table'.format(count, table_name))
    else:
        print('No data found in table {}'.format(table_name))
    

In [71]:
# check data count
data_count_check(conn=conn, table_name='immigration')
data_count_check(conn=conn, table_name='temperature')
data_count_check(conn=conn, table_name='airports')
data_count_check(conn=conn, table_name='demographics')
data_count_check(conn=conn, table_name='visa')
data_count_check(conn=conn, table_name='transport')

2306750 records in immigration table
149 records in temperature table
9042 records in airports table
48 records in demographics table
3 records in visa table
4 records in transport table


In [83]:
# Data Integrity check
def data_interity_check(conn, table_name, pk):
    """ Data integrity check: ensure every primary key is unique
    
    Key argument:
        Input: 
            conn: database connection
            table_name: table name
            pk: primary key name
        Output:
            print outcome of data integrity check
    
    """
    data_interity_check_sql = '''
        SELECT count FROM
        (
            SELECT {}, SUM(1) AS count 
            FROM {} 
            GROUP BY 1 
            HAVING count > 1
        )
        
    '''.format(pk, table_name)
    results = pd.read_sql_query(data_interity_check_sql, conn)
    
    # each row of results is one key value that occurs more than once
    if not results.empty:
        print('{} duplicated keys in {} table'.format(len(results), table_name))
    else:
        print('No duplicated key found in table {}'.format(table_name))

In [85]:
data_interity_check(conn=conn, table_name='immigration', pk='cicid')
data_interity_check(conn=conn, table_name='temperature', pk='country_code')
data_interity_check(conn=conn, table_name='airports', pk='iata_code')
data_interity_check(conn=conn, table_name='demographics', pk='state_code')
data_interity_check(conn=conn, table_name='visa', pk='visa_code')
data_interity_check(conn=conn, table_name='transport', pk='transport_code')

No duplicated key found in table immigration
No duplicated key found in table temperature
No duplicated key found in table airports
No duplicated key found in table demographics
No duplicated key found in table visa
No duplicated key found in table transport
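
Beyond uniqueness, a referential check can confirm that every code in the fact table exists in its dimension table. A minimal sketch using a left join on an in-memory SQLite database with made-up rows; `orphan_count` is an illustrative helper, and the table and column names are passed in by the caller and are not sanitised:

```python
import sqlite3

def orphan_count(conn, fact, fact_col, dim, dim_col):
    """Count fact rows whose fact_col value has no matching dim_col in dim."""
    sql = (
        f"SELECT COUNT(*) FROM {fact} f "
        f"LEFT JOIN {dim} d ON f.{fact_col} = d.{dim_col} "
        f"WHERE f.{fact_col} IS NOT NULL AND d.{dim_col} IS NULL"
    )
    return conn.execute(sql).fetchone()[0]

demo = sqlite3.connect(":memory:")
demo.executescript("""
    CREATE TABLE visa (visa_code INTEGER, visa_name TEXT);
    INSERT INTO visa VALUES (1, 'Business'), (2, 'Pleasure'), (3, 'Student');
    CREATE TABLE immigration (cicid INTEGER, visa INTEGER);
    INSERT INTO immigration VALUES (27, 1), (29, 2), (99, 7);
""")

orphans = orphan_count(demo, "immigration", "visa", "visa", "visa_code")
print(orphans)  # 1: visa code 7 is not in the visa table
```

Against the notebook's database, the same helper could be called as `orphan_count(conn, "immigration", "visa", "visa", "visa_code")`.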


#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

### Data dictionary
#### immigration fact table
| Column Name | Description | 
| --- | --- |
| cicid | ID that uniquely identifies each record |
| country_code | country code; foreign key to the temperature table |
| iata_code | port code; foreign key to the airports table |
| state_code | state code; foreign key to the demographics table |
| year | 4-digit year in which the record was made |
| month | numeric month in which the record was made |
| country_of_residence | country of residence of the immigrant |
| port_of_entry | port admitted through |
| arrdate | Arrival date in the US |
| transport_mode | mode of transportation (1=Air, 2=Sea, 3=Land, 9=Not reported) |
| immi_addr | state of arrival |
| depdate | Departure date from the USA |
| age | Age of the immigrant in years |
| visa | visa code (1=Business, 2=Pleasure, 3=Student)|
| char_date_field | Character Date Field |
| entdepa | Arrival flag. Whether admitted or paroled into the US |
| entdepd |  Departure flag. Whether departed, lost visa, or deceased |
| matflag | Match flag |
| biryear | 4-digit year of birth |
| dtaddto | Character Date Field - Date to which admitted to U.S. (allowed to stay until) |
| gender | Non-immigrant sex |
| airline | Airline used to arrive in U.S. |
| admnum | Admission Number |
| fltno | Flight number of Airline used to arrive in U.S. |
| visatype | Class of admission legally admitting the non-immigrant to temporarily stay in U.S. |

#### temperature dimension table
| Column Name | Description | 
| --- | --- |
| country_code | unique identifier for each country |
| country | name of the country |
| average_temperature | average temperature |
| latitude | latitude|
| longitude | longitude |

#### airports dimension table
| Column Name | Description | 
| --- | --- |
| iata_code | unique identifier of each airport (a three-letter code which is used in passenger reservation, ticketing and baggage-handling systems) |
| ap_name | name of the airport |
| ap_type | type of the airport |
| local_code | location identifier code |
| municipality | city in which the airport is located |
| coordinates | latitude and longitude of the airport |

#### visa dimension table
| Column Name | Description | 
| --- | --- |
|visa_code| visa code|
|visa_name| name of the visa |

#### transport dimension table
| Column Name | Description | 
| --- | --- |
|transport_code| transport code|
|transport_name| name of the transport |

### Step 5: Complete Project Write Up
This project processed the I-94 immigration dataset and enriched it with temperature, airport, and demographic data. It builds a data model to store this data, making it available for an analytics team to query and find insights into immigration patterns.

Below are some questions we can answer by analysing the data:

1. Among the arrivals recorded in the data, which visa categories were held on entry, and how many immigrants held each category?

In [94]:
sql_visa_type = """
    select i.year, v.visa_name, count(i.cicid) as number_of_immigrants
    from immigration i
    join visa v
    on i.visa = v.visa_code
    group by i.year, i.visa
    order by i.year, number_of_immigrants desc
    
"""

sql_visa_query_result = pd.read_sql_query(sql_visa_type, conn)

In [95]:
sql_visa_query_result

Unnamed: 0,year,visa_name,number_of_immigrants
0,2016,Pleasure,1921653
1,2016,Business,359718
2,2016,Student,25379


2. Among all states, which 10 had the most immigrants during the recorded period? What are their median age, female population, male population, number of veterans, foreign-born population, average household size, and total population?

In [98]:
sql_demographic = '''
    select i.number_of_immigrants, i.state_code, d.median_Age, d.male_population, d.female_population, d.total_population, d.number_of_veterans, d.foreign_born, d.average_household_size, d.total_count
    from 
    (select state_code, count(cicid) as number_of_immigrants
    from immigration 
    group by state_code) i
    join demographics d
    on i.state_code = d.state_code
    -- sort in the outer query: the order of a subquery is not preserved by the join
    order by i.number_of_immigrants desc
    limit 10;
'''

sql_demographic_query_result = pd.read_sql_query(sql_demographic, conn)

In [99]:
sql_demographic_query_result


#### Technology choice

Python is used to process the data because it is sufficient for this volume of data. The processed data is stored in S3, which keeps a record of the cleaned files. From there, the data can be loaded into Redshift staging tables with a copy command. Redshift was chosen because it is a fully managed AWS service that is well suited to storing data as a data warehouse.

A star schema was applied in the data model. It was chosen over a snowflake schema because it is more denormalised and therefore tends to perform better. A star schema is also easier to read because its queries are less complex, whereas a snowflake schema leads to more complex queries that are harder to read and to change.

#### Data update
The data should be updated at least monthly, because the I94 dataset is updated every month. The other datasets are relatively static, so they can be refreshed less frequently.
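
A monthly refresh is easier to operate when rerunning a month does not duplicate rows. One possible approach, shown here as a sketch on an in-memory SQLite table with made-up rows and only three columns, is to delete the month being loaded and insert it again inside a single transaction; `load_month` is an illustrative name:

```python
import sqlite3

def load_month(conn, year, month, cicids):
    """Replace one month of immigration rows so that reruns do not duplicate data."""
    with conn:  # one transaction: delete and insert succeed or fail together
        conn.execute(
            "DELETE FROM immigration WHERE year = ? AND month = ?", (year, month)
        )
        conn.executemany(
            "INSERT INTO immigration (cicid, year, month) VALUES (?, ?, ?)",
            [(cicid, year, month) for cicid in cicids],
        )

demo = sqlite3.connect(":memory:")
demo.execute("CREATE TABLE immigration (cicid INTEGER, year INTEGER, month INTEGER)")

load_month(demo, 2016, 4, [27, 28, 29])
load_month(demo, 2016, 4, [27, 28, 29])  # rerun of the same month
load_month(demo, 2016, 5, [40])

total = demo.execute("SELECT COUNT(*) FROM immigration").fetchone()[0]
print(total)  # 4 rows: April is stored once despite the rerun
```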

#### How you would approach the problem differently under the following scenarios:
#### 1. The data was increased by 100x.
 If the data were increased by 100x, Python on a single machine would not be sufficient to process that volume. I would use Spark and run the data processing in an EMR environment.
#### 2. The data populates a dashboard that must be updated on a daily basis by 7am every day.
 Use Airflow to schedule the pipeline to run a few hours before 7am every day, and connect the dashboard tool to Redshift so that the dashboard is populated from the refreshed data.
#### 3. The database needed to be accessed by 100+ people.
  Assess how these 100+ people use the data. If necessary, create a data mart for each use case, so that people in each department see only the data their department needs.