# Udacity Data Engineer Nanodegree - Capstone Project

#### Project Summary

This project builds a data warehouse that serves as a [single-source-of-truth](https://en.wikipedia.org/wiki/Single_source_of_truth) database by integrating data from several sources, for data analysis and future backend use.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
import pyspark

### Step 1: Scope the Project and Gather Data

#### Scope
This project integrates I94 immigration data, world temperature data and U.S. demographic data to set up a data warehouse with fact and dimension tables.

* Data Sets 
    1. [I94 Immigration Data](https://travel.trade.gov/research/reports/i94/historical/2016.html)
    2. [World Temperature Data](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data)
    3. [U.S. City Demographic Data](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/)

* Tools
    * AWS S3: data storage
    * Python for data processing
        * Pandas - exploratory data analysis on small data set
        * PySpark - data processing on large data set

#### Describe and Gather Data 

| Data Set | Format | Description |
| ---      | ---    | ---         |
|[I94 Immigration Data](https://travel.trade.gov/research/reports/i94/historical/2016.html)| SAS | Data contains international visitor arrival statistics by world regions and select countries (including top 20), type of visa, mode of transportation, age groups, states visited (first intended address only), and the top ports of entry (for select countries).|
|[World Temperature Data](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data)| CSV | This dataset is from Kaggle and contains monthly average land temperatures for cities around the world.|
|[U.S. City Demographic Data](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/)| CSV | This dataset contains information about the demographics of all US cities and census-designated places with a population greater or equal to 65,000.|

### Step 2: Explore and Assess the Data

#### Explore the data

1. Use pandas for exploratory data analysis to get an overview of these data sets
2. Split the data sets into dimension tables and rename columns for clarity
3. Use PySpark on one of the SAS data sets to test the ETL pipeline logic
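Step 2 above essentially projects a wide source record onto narrower tables and renames the columns along the way. The sketch below shows that idea with plain dictionaries: `FACT_COLUMNS` and `PERSONAL_COLUMNS` mirror the renames used in the cells below, and `project` is a hypothetical helper, not part of `etl.py`. The sample record is the first row of the immigration sample.

```python
# Hypothetical mappings: raw I94 column name -> warehouse column name.
FACT_COLUMNS = {
    "cicid": "cic_id",
    "i94yr": "year",
    "i94mon": "month",
    "i94port": "city_code",
    "i94addr": "state_code",
    "arrdate": "arrive_date",
    "depdate": "departure_date",
    "i94mode": "mode",
    "i94visa": "visa",
}

PERSONAL_COLUMNS = {
    "cicid": "cic_id",
    "i94cit": "citizen_country",
    "i94res": "residence_country",
    "biryear": "birth_year",
    "gender": "gender",
    "insnum": "ins_num",
}


def project(record, mapping):
    """Keep only the mapped columns of `record`, renamed to their new names."""
    return {new: record.get(old) for old, new in mapping.items()}


# First row of immigration_data_sample.csv (a missing insnum becomes None).
raw = {"cicid": 4084316.0, "i94yr": 2016.0, "i94mon": 4.0, "i94cit": 209.0,
       "i94res": 209.0, "i94port": "HHW", "arrdate": 20566.0, "i94mode": 1.0,
       "i94addr": "HI", "depdate": 20573.0, "i94visa": 2.0, "biryear": 1955.0,
       "gender": "F", "insnum": None}

fact_row = project(raw, FACT_COLUMNS)
personal_row = project(raw, PERSONAL_COLUMNS)
print(fact_row["city_code"], personal_row["birth_year"])  # HHW 1955.0
```

Every projection keeps `cic_id`, which is what allows the fact and dimension rows to be joined back together.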

#### Explore immigration data set

In [2]:
df_immigration = pd.read_csv("immigration_data_sample.csv")

In [3]:
df_immigration.head(10)

| | Unnamed: 0 | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 0 | 2027561 | 4084316.0 | 2016.0 | 4.0 | 209.0 | 209.0 | HHW | 20566.0 | 1.0 | HI | ... |  | M | 1955.0 | 7202016 | F |  | JL | 56582670000.0 | 00782 | WT |
| 1 | 2171295 | 4422636.0 | 2016.0 | 4.0 | 582.0 | 582.0 | MCA | 20567.0 | 1.0 | TX | ... |  | M | 1990.0 | 10222016 | M |  | *GA | 94362000000.0 | XBLNG | B2 |
| 2 | 589494 | 1195600.0 | 2016.0 | 4.0 | 148.0 | 112.0 | OGG | 20551.0 | 1.0 | FL | ... |  | M | 1940.0 | 7052016 | M |  | LH | 55780470000.0 | 00464 | WT |
| 3 | 2631158 | 5291768.0 | 2016.0 | 4.0 | 297.0 | 297.0 | LOS | 20572.0 | 1.0 | CA | ... |  | M | 1991.0 | 10272016 | M |  | QR | 94789700000.0 | 00739 | B2 |
| 4 | 3032257 | 985523.0 | 2016.0 | 4.0 | 111.0 | 111.0 | CHM | 20550.0 | 3.0 | NY | ... |  | M | 1997.0 | 7042016 | F |  |  | 42322570000.0 | LAND | WT |
| 5 | 721257 | 1481650.0 | 2016.0 | 4.0 | 577.0 | 577.0 | ATL | 20552.0 | 1.0 | GA | ... |  | M | 1965.0 | 10072016 | M |  | DL | 736852600.0 | 910 | B2 |
| 6 | 1072780 | 2197173.0 | 2016.0 | 4.0 | 245.0 | 245.0 | SFR | 20556.0 | 1.0 | CA | ... |  | M | 1968.0 | 10112016 | F |  | CX | 786312200.0 | 870 | B2 |
| 7 | 112205 | 232708.0 | 2016.0 | 4.0 | 113.0 | 135.0 | NYC | 20546.0 | 1.0 | NY | ... |  | M | 1983.0 | 6302016 | F |  | BA | 55474490000.0 | 00117 | WT |
| 8 | 2577162 | 5227851.0 | 2016.0 | 4.0 | 131.0 | 131.0 | CHI | 20572.0 | 1.0 | IL | ... |  | M | 1977.0 | 7262016 |  |  | LX | 59413420000.0 | 00008 | WT |
| 9 | 10930 | 13213.0 | 2016.0 | 4.0 | 116.0 | 116.0 | LOS | 20545.0 | 1.0 | CA | ... |  | M | 1981.0 | 6292016 |  |  | AA | 55449790000.0 | 00109 | WT |


In [4]:
df_immigration.columns

Index(['Unnamed: 0', 'cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port',
       'arrdate', 'i94mode', 'i94addr', 'depdate', 'i94bir', 'i94visa',
       'count', 'dtadfile', 'visapost', 'occup', 'entdepa', 'entdepd',
       'entdepu', 'matflag', 'biryear', 'dtaddto', 'gender', 'insnum',
       'airline', 'admnum', 'fltno', 'visatype'],
      dtype='object')

In [5]:
fact_immigration = df_immigration[['cicid', 'i94yr', 'i94mon', 'i94port', 'i94addr',
                                   'arrdate', 'depdate', 'i94mode', 'i94visa']].copy()  # explicit copy: a column is inserted later
fact_immigration.columns = ['cic_id', 'year', 'month', 'city_code', 'state_code',
                            'arrive_date', 'departure_date', 'mode', 'visa']
fact_immigration.head(10)

| | cic_id | year | month | city_code | state_code | arrive_date | departure_date | mode | visa |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 0 | 4084316.0 | 2016.0 | 4.0 | HHW | HI | 20566.0 | 20573.0 | 1.0 | 2.0 |
| 1 | 4422636.0 | 2016.0 | 4.0 | MCA | TX | 20567.0 | 20568.0 | 1.0 | 2.0 |
| 2 | 1195600.0 | 2016.0 | 4.0 | OGG | FL | 20551.0 | 20571.0 | 1.0 | 2.0 |
| 3 | 5291768.0 | 2016.0 | 4.0 | LOS | CA | 20572.0 | 20581.0 | 1.0 | 2.0 |
| 4 | 985523.0 | 2016.0 | 4.0 | CHM | NY | 20550.0 | 20553.0 | 3.0 | 2.0 |
| 5 | 1481650.0 | 2016.0 | 4.0 | ATL | GA | 20552.0 | 20606.0 | 1.0 | 2.0 |
| 6 | 2197173.0 | 2016.0 | 4.0 | SFR | CA | 20556.0 | 20635.0 | 1.0 | 2.0 |
| 7 | 232708.0 | 2016.0 | 4.0 | NYC | NY | 20546.0 | 20554.0 | 1.0 | 2.0 |
| 8 | 5227851.0 | 2016.0 | 4.0 | CHI | IL | 20572.0 | 20575.0 | 1.0 | 2.0 |
| 9 | 13213.0 | 2016.0 | 4.0 | LOS | CA | 20545.0 | 20553.0 | 1.0 | 2.0 |


In [6]:
fact_immigration.insert(2, 'country', 'United States')
fact_immigration.head(10)

| | cic_id | year | country | month | city_code | state_code | arrive_date | departure_date | mode | visa |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 0 | 4084316.0 | 2016.0 | United States | 4.0 | HHW | HI | 20566.0 | 20573.0 | 1.0 | 2.0 |
| 1 | 4422636.0 | 2016.0 | United States | 4.0 | MCA | TX | 20567.0 | 20568.0 | 1.0 | 2.0 |
| 2 | 1195600.0 | 2016.0 | United States | 4.0 | OGG | FL | 20551.0 | 20571.0 | 1.0 | 2.0 |
| 3 | 5291768.0 | 2016.0 | United States | 4.0 | LOS | CA | 20572.0 | 20581.0 | 1.0 | 2.0 |
| 4 | 985523.0 | 2016.0 | United States | 4.0 | CHM | NY | 20550.0 | 20553.0 | 3.0 | 2.0 |
| 5 | 1481650.0 | 2016.0 | United States | 4.0 | ATL | GA | 20552.0 | 20606.0 | 1.0 | 2.0 |
| 6 | 2197173.0 | 2016.0 | United States | 4.0 | SFR | CA | 20556.0 | 20635.0 | 1.0 | 2.0 |
| 7 | 232708.0 | 2016.0 | United States | 4.0 | NYC | NY | 20546.0 | 20554.0 | 1.0 | 2.0 |
| 8 | 5227851.0 | 2016.0 | United States | 4.0 | CHI | IL | 20572.0 | 20575.0 | 1.0 | 2.0 |
| 9 | 13213.0 | 2016.0 | United States | 4.0 | LOS | CA | 20545.0 | 20553.0 | 1.0 | 2.0 |


In [7]:
df_immigration_personal = df_immigration[['cicid', 'i94cit', 'i94res', 'biryear',
                                          'gender', 'insnum']]
df_immigration_personal.columns = ['cic_id', 'citizen_country',
                                   'residence_country', 'birth_year', 'gender',
                                   'ins_num']
df_immigration_personal.head(10)

| | cic_id | citizen_country | residence_country | birth_year | gender | ins_num |
| --- | --- | --- | --- | --- | --- | --- |
| 0 | 4084316.0 | 209.0 | 209.0 | 1955.0 | F |  |
| 1 | 4422636.0 | 582.0 | 582.0 | 1990.0 | M |  |
| 2 | 1195600.0 | 148.0 | 112.0 | 1940.0 | M |  |
| 3 | 5291768.0 | 297.0 | 297.0 | 1991.0 | M |  |
| 4 | 985523.0 | 111.0 | 111.0 | 1997.0 | F |  |
| 5 | 1481650.0 | 577.0 | 577.0 | 1965.0 | M |  |
| 6 | 2197173.0 | 245.0 | 245.0 | 1968.0 | F |  |
| 7 | 232708.0 | 113.0 | 135.0 | 1983.0 | F |  |
| 8 | 5227851.0 | 131.0 | 131.0 | 1977.0 |  |  |
| 9 | 13213.0 | 116.0 | 116.0 | 1981.0 |  |  |


In [8]:
df_immigration_airline = df_immigration[['cicid', 'airline', 'admnum',
                                         'fltno', 'visatype']]
df_immigration_airline.columns = ['cic_id', 'airline', 'admin_num',
                                  'flight_number', 'visa_type']
df_immigration_airline.head(10)

| | cic_id | airline | admin_num | flight_number | visa_type |
| --- | --- | --- | --- | --- | --- |
| 0 | 4084316.0 | JL | 56582670000.0 | 00782 | WT |
| 1 | 4422636.0 | *GA | 94362000000.0 | XBLNG | B2 |
| 2 | 1195600.0 | LH | 55780470000.0 | 00464 | WT |
| 3 | 5291768.0 | QR | 94789700000.0 | 00739 | B2 |
| 4 | 985523.0 |  | 42322570000.0 | LAND | WT |
| 5 | 1481650.0 | DL | 736852600.0 | 910 | B2 |
| 6 | 2197173.0 | CX | 786312200.0 | 870 | B2 |
| 7 | 232708.0 | BA | 55474490000.0 | 00117 | WT |
| 8 | 5227851.0 | LX | 59413420000.0 | 00008 | WT |
| 9 | 13213.0 | AA | 55449790000.0 | 00109 | WT |


#### Explore temperature data set

In [9]:
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
df_temp = pd.read_csv(fname)

In [10]:
df_temp.head(10)

| | dt | AverageTemperature | AverageTemperatureUncertainty | City | Country | Latitude | Longitude |
| --- | --- | --- | --- | --- | --- | --- | --- |
| 0 | 1743-11-01 | 6.068 | 1.737 | Århus | Denmark | 57.05N | 10.33E |
| 1 | 1743-12-01 |  |  | Århus | Denmark | 57.05N | 10.33E |
| 2 | 1744-01-01 |  |  | Århus | Denmark | 57.05N | 10.33E |
| 3 | 1744-02-01 |  |  | Århus | Denmark | 57.05N | 10.33E |
| 4 | 1744-03-01 |  |  | Århus | Denmark | 57.05N | 10.33E |
| 5 | 1744-04-01 | 5.788 | 3.624 | Århus | Denmark | 57.05N | 10.33E |
| 6 | 1744-05-01 | 10.644 | 1.283 | Århus | Denmark | 57.05N | 10.33E |
| 7 | 1744-06-01 | 14.051 | 1.347 | Århus | Denmark | 57.05N | 10.33E |
| 8 | 1744-07-01 | 16.082 | 1.396 | Århus | Denmark | 57.05N | 10.33E |
| 9 | 1744-08-01 |  |  | Århus | Denmark | 57.05N | 10.33E |


In [11]:
df_temp_usa = df_temp[df_temp['Country'] == 'United States']
df_temp_usa = df_temp_usa[['dt', 'AverageTemperature',
                           'AverageTemperatureUncertainty',
                           'City', 'Country']].copy()  # explicit copy: columns are added later
df_temp_usa.columns = ['dt', 'avg_temp', 'avg_temp_uncertnty',
                       'city','country']
df_temp_usa.head(10)

| | dt | avg_temp | avg_temp_uncertnty | city | country |
| --- | --- | --- | --- | --- | --- |
| 47555 | 1820-01-01 | 2.101 | 3.217 | Abilene | United States |
| 47556 | 1820-02-01 | 6.926 | 2.853 | Abilene | United States |
| 47557 | 1820-03-01 | 10.767 | 2.395 | Abilene | United States |
| 47558 | 1820-04-01 | 17.989 | 2.202 | Abilene | United States |
| 47559 | 1820-05-01 | 21.809 | 2.036 | Abilene | United States |
| 47560 | 1820-06-01 | 25.682 | 2.008 | Abilene | United States |
| 47561 | 1820-07-01 | 26.268 | 1.802 | Abilene | United States |
| 47562 | 1820-08-01 | 25.048 | 1.895 | Abilene | United States |
| 47563 | 1820-09-01 | 22.435 | 2.216 | Abilene | United States |
| 47564 | 1820-10-01 | 15.83 | 2.169 | Abilene | United States |


In [12]:
df_temp_usa['dt'] = pd.to_datetime(df_temp_usa['dt'])
df_temp_usa['year'] = df_temp_usa['dt'].dt.year
df_temp_usa['month'] = df_temp_usa['dt'].dt.month
df_temp_usa.head()

| | dt | avg_temp | avg_temp_uncertnty | city | country | year | month |
| --- | --- | --- | --- | --- | --- | --- | --- |
| 47555 | 1820-01-01 | 2.101 | 3.217 | Abilene | United States | 1820 | 1 |
| 47556 | 1820-02-01 | 6.926 | 2.853 | Abilene | United States | 1820 | 2 |
| 47557 | 1820-03-01 | 10.767 | 2.395 | Abilene | United States | 1820 | 3 |
| 47558 | 1820-04-01 | 17.989 | 2.202 | Abilene | United States | 1820 | 4 |
| 47559 | 1820-05-01 | 21.809 | 2.036 | Abilene | United States | 1820 | 5 |
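The cell above derives `year` and `month` from `dt`. As the Århus rows earlier show, `AverageTemperature` is missing for some months, so rows without a temperature may need to be filtered out before the temperature table is built. A standard-library sketch of both steps, assuming rows shaped as `(dt, avg_temp, city)` with `None` for a missing value (`split_month` is a hypothetical helper):

```python
from datetime import date


def split_month(rows):
    """Yield (city, year, month, avg_temp), skipping rows without a temperature."""
    for dt, avg_temp, city in rows:
        if avg_temp is None:
            continue  # e.g. 1744-01-01 for Århus has no measurement
        d = date.fromisoformat(dt)  # 'YYYY-MM-DD' -> date
        yield city, d.year, d.month, avg_temp


sample = [
    ("1820-01-01", 2.101, "Abilene"),
    ("1820-02-01", 6.926, "Abilene"),
    ("1744-01-01", None, "Århus"),
]
print(list(split_month(sample)))
# [('Abilene', 1820, 1, 2.101), ('Abilene', 1820, 2, 6.926)]
```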


#### Explore demography data set

In [13]:
df_demog = pd.read_csv('us-cities-demographics.csv', delimiter=';')
df_demog.head(10)

| | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 0 | Silver Spring | Maryland | 33.8 | 40601.0 | 41862.0 | 82463 | 1562.0 | 30908.0 | 2.6 | MD | Hispanic or Latino | 25924 |
| 1 | Quincy | Massachusetts | 41.0 | 44129.0 | 49500.0 | 93629 | 4147.0 | 32935.0 | 2.39 | MA | White | 58723 |
| 2 | Hoover | Alabama | 38.5 | 38040.0 | 46799.0 | 84839 | 4819.0 | 8229.0 | 2.58 | AL | Asian | 4759 |
| 3 | Rancho Cucamonga | California | 34.5 | 88127.0 | 87105.0 | 175232 | 5821.0 | 33878.0 | 3.18 | CA | Black or African-American | 24437 |
| 4 | Newark | New Jersey | 34.6 | 138040.0 | 143873.0 | 281913 | 5829.0 | 86253.0 | 2.73 | NJ | White | 76402 |
| 5 | Peoria | Illinois | 33.1 | 56229.0 | 62432.0 | 118661 | 6634.0 | 7517.0 | 2.4 | IL | American Indian and Alaska Native | 1343 |
| 6 | Avondale | Arizona | 29.1 | 38712.0 | 41971.0 | 80683 | 4815.0 | 8355.0 | 3.18 | AZ | Black or African-American | 11592 |
| 7 | West Covina | California | 39.8 | 51629.0 | 56860.0 | 108489 | 3800.0 | 37038.0 | 3.56 | CA | Asian | 32716 |
| 8 | O'Fallon | Missouri | 36.0 | 41762.0 | 43270.0 | 85032 | 5783.0 | 3269.0 | 2.77 | MO | Hispanic or Latino | 2583 |
| 9 | High Point | North Carolina | 35.5 | 51751.0 | 58077.0 | 109828 | 5204.0 | 16315.0 | 2.65 | NC | Asian | 11060 |


In [14]:
dim_city_population = df_demog[['City', 'State', 'Male Population',
                                'Female Population', 'Number of Veterans',
                                'Foreign-born', 'Race']]
dim_city_population.columns = ['city', 'state', 'male_pop', 'female_pop',
                               'num_vetarans', 'foreign_born', 'race']
dim_city_population.head(10)

| | city | state | male_pop | female_pop | num_vetarans | foreign_born | race |
| --- | --- | --- | --- | --- | --- | --- | --- |
| 0 | Silver Spring | Maryland | 40601.0 | 41862.0 | 1562.0 | 30908.0 | Hispanic or Latino |
| 1 | Quincy | Massachusetts | 44129.0 | 49500.0 | 4147.0 | 32935.0 | White |
| 2 | Hoover | Alabama | 38040.0 | 46799.0 | 4819.0 | 8229.0 | Asian |
| 3 | Rancho Cucamonga | California | 88127.0 | 87105.0 | 5821.0 | 33878.0 | Black or African-American |
| 4 | Newark | New Jersey | 138040.0 | 143873.0 | 5829.0 | 86253.0 | White |
| 5 | Peoria | Illinois | 56229.0 | 62432.0 | 6634.0 | 7517.0 | American Indian and Alaska Native |
| 6 | Avondale | Arizona | 38712.0 | 41971.0 | 4815.0 | 8355.0 | Black or African-American |
| 7 | West Covina | California | 51629.0 | 56860.0 | 3800.0 | 37038.0 | Asian |
| 8 | O'Fallon | Missouri | 41762.0 | 43270.0 | 5783.0 | 3269.0 | Hispanic or Latino |
| 9 | High Point | North Carolina | 51751.0 | 58077.0 | 5204.0 | 16315.0 | Asian |


In [15]:
dim_city_statistics = df_demog[['City', 'State', 'Median Age',
                                'Average Household Size']]
dim_city_statistics.columns = ['city', 'state', 'median_age',
                               'avg_household_size']
dim_city_statistics.head(5)

| | city | state | median_age | avg_household_size |
| --- | --- | --- | --- | --- |
| 0 | Silver Spring | Maryland | 33.8 | 2.6 |
| 1 | Quincy | Massachusetts | 41.0 | 2.39 |
| 2 | Hoover | Alabama | 38.5 | 2.58 |
| 3 | Rancho Cucamonga | California | 34.5 | 3.18 |
| 4 | Newark | New Jersey | 34.6 | 2.73 |
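The demographics file has `Race` and `Count` columns, so the same city can appear on several rows that repeat the city-level values (median age, household size). If `dim_city_statistics` is meant to hold one row per city, those repeats need to be removed; in pandas, `dim_city_statistics.drop_duplicates(subset=['city', 'state'])` does this. A standard-library sketch of the same idea, using illustrative rows and a hypothetical `unique_city_stats` helper:

```python
def unique_city_stats(rows):
    """Keep the first row for each (city, state) pair, preserving input order."""
    seen = set()
    result = []
    for row in rows:
        key = (row["city"], row["state"])
        if key not in seen:
            seen.add(key)
            result.append(row)
    return result


# Illustrative rows: the same city listed once per race category.
rows = [
    {"city": "Silver Spring", "state": "Maryland", "median_age": 33.8, "race": "Hispanic or Latino"},
    {"city": "Silver Spring", "state": "Maryland", "median_age": 33.8, "race": "White"},
    {"city": "Quincy", "state": "Massachusetts", "median_age": 41.0, "race": "White"},
]
print(len(unique_city_stats(rows)))  # 2
```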


#### Load the SAS data with Spark

In [16]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

In [17]:
df_spark = spark.read.format('com.github.saurfang.sas.spark').\
    load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

In [18]:
# write to parquet: run only the first time
# df_spark.write.parquet("sas_data")

# Read parquet: run if sas_data folder already exists and is not empty
df_spark=spark.read.parquet("sas_data")
df_spark.head(1)

[Row(cicid=5748517.0, i94yr=2016.0, i94mon=4.0, i94cit=245.0, i94res=438.0, i94port='LOS', arrdate=20574.0, i94mode=1.0, i94addr='CA', depdate=20582.0, i94bir=40.0, i94visa=1.0, count=1.0, dtadfile='20160430', visapost='SYD', occup=None, entdepa='G', entdepd='O', entdepu=None, matflag='M', biryear=1976.0, dtaddto='10292016', gender='F', insnum=None, airline='QF', admnum=94953870030.0, fltno='00011', visatype='B1')]
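The cell above is toggled by hand (comment out the write, keep the read). A small guard could make that decision automatically. The sketch below implements only the check, with `pathlib`, and shows the Spark calls as comments because they need a live session. Looking for `*.parquet` part files is an assumption about how the output folder is laid out; Spark typically also writes a `_SUCCESS` marker that could be checked instead. `parquet_cache_ready` is a hypothetical helper.

```python
from pathlib import Path


def parquet_cache_ready(path):
    """True if `path` is a directory holding at least one Parquet part file."""
    folder = Path(path)
    return folder.is_dir() and any(folder.glob("*.parquet"))


# Intended use in the cell above (needs the SparkSession and df_spark from earlier):
# if parquet_cache_ready("sas_data"):
#     df_spark = spark.read.parquet("sas_data")
# else:
#     df_spark.write.parquet("sas_data")
```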

#### Cleaning Steps

1. Convert `arrive_date` and `departure_date` in the immigration data from SAS date format to pandas datetime
2. Parse the `I94_SAS_Labels_Descriptions.SAS` file to build the auxiliary dimension tables: country_code, city_code, state_code
3. Transform city and state in the demographic data to upper case to match the city_code and state_code tables

#### 1. Transform arrdate and depdate from SAS date format to pandas datetime

In [19]:
def sas_to_datetime(date):
    return pd.to_timedelta(date, unit='D') + pd.Timestamp('1960-1-1')
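SAS stores a date as the number of days since 1960-01-01, which is why `sas_to_datetime` adds a day-based timedelta to that epoch. The same arithmetic with only the standard library is sketched below; returning `None` for a missing value is an assumption of this sketch (the pandas version turns `NaN` into `NaT`).

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # day 0 of the SAS date scale


def sas_to_date(days):
    """Convert a SAS date (days since 1960-01-01) to a date; None stays None."""
    if days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(days))


print(sas_to_date(20566.0))  # 2016-04-22 (arrdate of the first sample row)
print(sas_to_date(20573.0))  # 2016-04-29 (its depdate)
```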

In [20]:
fact_immigration = fact_immigration.assign(
    arrive_date=lambda df: sas_to_datetime(df.arrive_date),
    departure_date=lambda df: sas_to_datetime(df.departure_date),
)

fact_immigration.head(10)

| | cic_id | year | country | month | city_code | state_code | arrive_date | departure_date | mode | visa |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 0 | 4084316.0 | 2016.0 | United States | 4.0 | HHW | HI | 2016-04-22 | 2016-04-29 | 1.0 | 2.0 |
| 1 | 4422636.0 | 2016.0 | United States | 4.0 | MCA | TX | 2016-04-23 | 2016-04-24 | 1.0 | 2.0 |
| 2 | 1195600.0 | 2016.0 | United States | 4.0 | OGG | FL | 2016-04-07 | 2016-04-27 | 1.0 | 2.0 |
| 3 | 5291768.0 | 2016.0 | United States | 4.0 | LOS | CA | 2016-04-28 | 2016-05-07 | 1.0 | 2.0 |
| 4 | 985523.0 | 2016.0 | United States | 4.0 | CHM | NY | 2016-04-06 | 2016-04-09 | 3.0 | 2.0 |
| 5 | 1481650.0 | 2016.0 | United States | 4.0 | ATL | GA | 2016-04-08 | 2016-06-01 | 1.0 | 2.0 |
| 6 | 2197173.0 | 2016.0 | United States | 4.0 | SFR | CA | 2016-04-12 | 2016-06-30 | 1.0 | 2.0 |
| 7 | 232708.0 | 2016.0 | United States | 4.0 | NYC | NY | 2016-04-02 | 2016-04-10 | 1.0 | 2.0 |
| 8 | 5227851.0 | 2016.0 | United States | 4.0 | CHI | IL | 2016-04-28 | 2016-05-01 | 1.0 | 2.0 |
| 9 | 13213.0 | 2016.0 | United States | 4.0 | LOS | CA | 2016-04-01 | 2016-04-09 | 1.0 | 2.0 |


#### 2. Parse the description file to get the auxiliary dimension tables: country_code, city_code, state_code

In [21]:
with open("I94_SAS_Labels_Descriptions.SAS") as f:
    contents = f.readlines()

In [22]:
country_code = {}
for countries in contents[10:298]:
    pair = countries.split('=')
    code, country = pair[0].strip(), pair[1].strip().strip("'")
    country_code[code] = country

In [23]:
df_country_code = pd.DataFrame(list(country_code.items()),
                               columns=['code', 'country'])
df_country_code.head(10)

| | code | country |
| --- | --- | --- |
| 0 | 236 | AFGHANISTAN |
| 1 | 101 | ALBANIA |
| 2 | 316 | ALGERIA |
| 3 | 102 | ANDORRA |
| 4 | 324 | ANGOLA |
| 5 | 529 | ANGUILLA |
| 6 | 518 | ANTIGUA-BARBUDA |
| 7 | 687 | ARGENTINA |
| 8 | 151 | ARMENIA |
| 9 | 532 | ARUBA |


In [24]:
city_code = {}
for cities in contents[303:962]:
    pair = cities.split('=')
    code, city = pair[0].strip().strip("'"), pair[1].strip().strip("'")
    city_code[code] = city

In [25]:
df_city_code = pd.DataFrame(list(city_code.items()),
                            columns=['code', 'city'])
df_city_code.head(10)

| | code | city |
| --- | --- | --- |
| 0 | ANC | ANCHORAGE, AK |
| 1 | BAR | BAKER AAF - BAKER ISLAND, AK |
| 2 | DAC | DALTONS CACHE, AK |
| 3 | PIZ | DEW STATION PT LAY DEW, AK |
| 4 | DTH | DUTCH HARBOR, AK |
| 5 | EGL | EAGLE, AK |
| 6 | FRB | FAIRBANKS, AK |
| 7 | HOM | HOMER, AK |
| 8 | HYD | HYDER, AK |
| 9 | JUN | JUNEAU, AK |


In [26]:
state_code = {}
for states in contents[982:1036]:
    pair = states.split('=')
    code, state = pair[0].strip('\t').strip("'"), pair[1].strip().strip("'")
    state_code[code] = state

In [27]:
df_state_code = pd.DataFrame(list(state_code.items()),
                             columns=['code', 'state'])
df_state_code.head(10)

| | code | state |
| --- | --- | --- |
| 0 | AK | ALASKA |
| 1 | AZ | ARIZONA |
| 2 | AR | ARKANSAS |
| 3 | CA | CALIFORNIA |
| 4 | CO | COLORADO |
| 5 | CT | CONNECTICUT |
| 6 | DE | DELAWARE |
| 7 | DC | DIST. OF COLUMBIA |
| 8 | FL | FLORIDA |
| 9 | GA | GEORGIA |
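The parsing cells above rely on hard-coded line ranges (`contents[10:298]`, `contents[303:962]`, `contents[982:1036]`), which silently pick up the wrong lines if the label file changes. A more defensive sketch matches each `code = 'value'` entry with a regular expression instead. The pattern is an assumption based on the entries visible in the outputs above, and each section of the file (countries, ports, states) would still need to be located separately, since they all share this format. `parse_labels` is a hypothetical helper.

```python
import re

# Matches entries such as "   236 =  'AFGHANISTAN'" or "\t'ANC'\t=\t'ANCHORAGE, AK'".
# The greedy value group runs to the last quote, so inner apostrophes survive.
LABEL_LINE = re.compile(r"^\s*'?(?P<code>[^'=\s]+)'?\s*=\s*'(?P<value>.*)'")


def parse_labels(lines):
    """Return {code: value} for every line that looks like a SAS label entry."""
    labels = {}
    for line in lines:
        match = LABEL_LINE.match(line)
        if match:
            labels[match.group("code")] = match.group("value").strip()
    return labels


sample = [
    "value i94cntyl",  # section header: ignored
    "   236 =  'AFGHANISTAN'",
    "\t'ANC'\t=\t'ANCHORAGE, AK'",
    "\t'AK'='ALASKA'",
]
print(parse_labels(sample))
# {'236': 'AFGHANISTAN', 'ANC': 'ANCHORAGE, AK', 'AK': 'ALASKA'}
```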


In [28]:
dim_city_statistics = dim_city_statistics.assign(
    city = lambda row: row.city.str.upper()
)
dim_city_statistics = dim_city_statistics.assign(
    state = lambda row: row.state.str.upper()
)
dim_city_statistics.head(10)

| | city | state | median_age | avg_household_size |
| --- | --- | --- | --- | --- |
| 0 | SILVER SPRING | MARYLAND | 33.8 | 2.6 |
| 1 | QUINCY | MASSACHUSETTS | 41.0 | 2.39 |
| 2 | HOOVER | ALABAMA | 38.5 | 2.58 |
| 3 | RANCHO CUCAMONGA | CALIFORNIA | 34.5 | 3.18 |
| 4 | NEWARK | NEW JERSEY | 34.6 | 2.73 |
| 5 | PEORIA | ILLINOIS | 33.1 | 2.4 |
| 6 | AVONDALE | ARIZONA | 29.1 | 3.18 |
| 7 | WEST COVINA | CALIFORNIA | 39.8 | 3.56 |
| 8 | O'FALLON | MISSOURI | 36.0 | 2.77 |
| 9 | HIGH POINT | NORTH CAROLINA | 35.5 | 2.65 |
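Upper-casing makes the demographic names comparable with the label tables, but the port labels combine city and state (`ANCHORAGE, AK`) while `dim_city_statistics` stores the full state name. One way to bridge them is sketched below, with hypothetical helper names and assuming every port label ends in `, <state code>`. The dictionaries are small excerpts of the city_code and state_code outputs above.

```python
def build_port_lookup(city_codes):
    """Invert {'ANC': 'ANCHORAGE, AK'} into {('ANCHORAGE', 'AK'): 'ANC'}."""
    lookup = {}
    for code, label in city_codes.items():
        city, sep, state = label.rpartition(",")
        if sep:  # skip labels without a ", <state>" suffix
            lookup[(city.strip().upper(), state.strip().upper())] = code
    return lookup


def find_city_code(city, state_name, port_lookup, state_name_to_code):
    """Return the port code for a (city, full state name) pair, or None."""
    state = state_name_to_code.get(state_name.upper())
    return port_lookup.get((city.upper(), state))


city_codes = {"ANC": "ANCHORAGE, AK", "JUN": "JUNEAU, AK"}
state_codes = {"AK": "ALASKA", "AZ": "ARIZONA"}

port_lookup = build_port_lookup(city_codes)
state_name_to_code = {name: code for code, name in state_codes.items()}
print(find_city_code("Anchorage", "Alaska", port_lookup, state_name_to_code))  # ANC
print(find_city_code("Avondale", "Arizona", port_lookup, state_name_to_code))  # None
```

Cities without a port of entry simply get `None`, so joining demographics to the immigration fact table is best treated as a left join.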


### Step 3: Define the Data Model

#### Conceptual Data Model
Since this data warehouse is intended for OLAP and BI applications, we model these data sets as a star schema.

* Star Schema

![Star schema](./img/data_model_capston_udacity.png)
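To make the star schema concrete, here is a reduced sketch of it in SQLite: one fact table whose foreign keys point at dimension tables, plus the kind of aggregate-then-label query the model is meant for. SQLite is only a stand-in for the warehouse engine, its type names approximate the data dictionary below, and only a few columns are kept. The fact rows are rows 3, 7 and 9 of the immigration sample.

```python
import sqlite3

# Reduced star schema: dimension tables plus a fact table referencing them.
DDL = """
CREATE TABLE state (state_code CHAR(2) PRIMARY KEY, state VARCHAR);
CREATE TABLE city (city_code CHAR(3) PRIMARY KEY, city VARCHAR);
CREATE TABLE immigration (
    immigration_id INTEGER PRIMARY KEY,
    cic_id BIGINT,
    year INT,
    month INT,
    city_code CHAR(3) REFERENCES city (city_code),
    state_code CHAR(2) REFERENCES state (state_code),
    visa INT
);
"""

conn = sqlite3.connect(":memory:")
conn.executescript(DDL)
conn.executemany("INSERT INTO state VALUES (?, ?)",
                 [("CA", "CALIFORNIA"), ("NY", "NEW YORK")])
conn.executemany(
    "INSERT INTO immigration (cic_id, year, month, city_code, state_code, visa) "
    "VALUES (?, ?, ?, ?, ?, ?)",
    [(5291768, 2016, 4, "LOS", "CA", 2),
     (232708, 2016, 4, "NYC", "NY", 2),
     (13213, 2016, 4, "LOS", "CA", 2)],
)

# Typical star-schema query: aggregate the fact table, label groups via a dimension.
QUERY = """
SELECT s.state, COUNT(*) AS arrivals
FROM immigration AS i
JOIN state AS s ON s.state_code = i.state_code
GROUP BY s.state
ORDER BY arrivals DESC
"""
print(conn.execute(QUERY).fetchall())  # [('CALIFORNIA', 2), ('NEW YORK', 1)]
```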

#### Data Pipeline Build Up Steps

1. Assume all data sets are stored in S3 buckets as follows:
    * `[Source_S3_Bucket]/immigration/18-83510-I94-Data-2016/*.sas7bdat`
    * `[Source_S3_Bucket]/I94_SAS_Labels_Descriptions.SAS`

2. Clean the data sets using the cleaning steps from Step 2.

3. Transform the immigration data into one fact table and two dimension tables; the fact table is partitioned by state.

4. Parse the label description file to get the auxiliary tables.

5. Write these tables to the target S3 bucket.
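Step 3 partitions the fact table by state. With Spark this is typically done with `DataFrameWriter.partitionBy("state_code")`, which writes one `state_code=<value>` sub-directory per state and uses `__HIVE_DEFAULT_PARTITION__` for null values. The sketch below only reproduces that Hive-style layout with the standard library, to show which rows end up where; the `immigration` prefix stands in for the target S3 location, and `partition_paths` is a hypothetical helper.

```python
from collections import defaultdict

# Directory name Spark uses when the partition column is null.
DEFAULT_PARTITION = "__HIVE_DEFAULT_PARTITION__"


def partition_paths(rows, key, prefix):
    """Group rows into Hive-style folders such as '<prefix>/state_code=CA'."""
    partitions = defaultdict(list)
    for row in rows:
        value = row.get(key)
        folder = DEFAULT_PARTITION if value is None else value
        partitions[f"{prefix}/{key}={folder}"].append(row)
    return dict(partitions)


rows = [
    {"cic_id": 5291768, "state_code": "CA"},
    {"cic_id": 232708, "state_code": "NY"},
    {"cic_id": 13213, "state_code": "CA"},
    {"cic_id": 0, "state_code": None},  # hypothetical row without a state
]
for path, members in sorted(partition_paths(rows, "state_code", "immigration").items()):
    print(path, len(members))
```

Partitioning by state means a query filtered on one state only has to read that state's folder.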


### Step 4: Run Pipelines to Model the Data 

#### 4.1 Create the data model

Data processing and the data model were implemented in Spark, based on the explorations performed in this notebook. See [etl.py](./etl.py) for details.

#### 4.2 Data Quality Checks

Data quality checks include:

1. No table is empty after the ETL pipeline runs
2. The schema of every dimension table matches the data model

See `Data_Quality_Check.ipynb` for details.
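Both checks listed above are easy to express against any SQL engine. A minimal sketch follows, with SQLite standing in for the warehouse; it is not the code in `Data_Quality_Check.ipynb`, which may differ. The helper names are hypothetical, and table names are interpolated directly into SQL, so this assumes trusted, hard-coded names.

```python
import sqlite3


def check_not_empty(conn, table):
    """Return the row count of `table`, raising ValueError if it is empty."""
    (count,) = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()
    if count == 0:
        raise ValueError(f"{table} is empty")
    return count


def check_schema(conn, table, expected_columns):
    """Return the column names of `table`, raising ValueError if they differ from the model."""
    actual = [row[1] for row in conn.execute(f"PRAGMA table_info({table})")]
    if actual != list(expected_columns):
        raise ValueError(f"{table}: expected {list(expected_columns)}, got {actual}")
    return actual


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE state (state_code CHAR(2), state VARCHAR)")
conn.executemany("INSERT INTO state VALUES (?, ?)", [("AK", "ALASKA"), ("AZ", "ARIZONA")])
conn.execute("CREATE TABLE city (city_code CHAR(3), city VARCHAR)")  # left empty on purpose

print(check_not_empty(conn, "state"))                        # 2
print(check_schema(conn, "state", ["state_code", "state"]))  # ['state_code', 'state']
```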

#### 4.3 Data dictionary

#### Fact table

- `immigration`

| Field          | Type      | Description                                                            |
| -------------- | --------- | ---------------------------------------------------------------------- |
| immigration_id | INT       | Primary key                                                            |
| cic_id         | BIGINT    | CIC ID                                                                 |
| year           | INT       | 4-digit year                                                           |
| month          | INT       | Numeric month                                                          |
| city_code      | CHAR(3)   | Three-character code for the U.S. port of entry city (i94port)         |
| state_code     | CHAR(2)   | Two-character abbreviation for the U.S. state (i94addr)                |
| mode           | INT       | Mode of transportation (1 = Air, 2 = Sea, 3 = Land, 9 = Not reported)  |
| visa           | INT       | Visa category (1 = Business, 2 = Pleasure, 3 = Student)                |
| arrive_date    | TIMESTAMP | Arrival date                                                           |
| departure_date | TIMESTAMP | Departure date                                                         |
| country        | CHAR(13)  | Country of arrival (United States)                                     |

#### Dimension tables

- `immigration_airline`

| Field               | Type    | Description                                     |
| ------------------- | ------- | ----------------------------------------------- |
| dim_immi_airline_id | INT     | Primary Key                                     |
| cic_id              | BIGINT  | CIC ID                                          |
| airline             | VARCHAR | Airline used to arrive in the U.S.              |
| admin_num           | BIGINT  | Admission Number                                |
| flight_number       | VARCHAR | Flight number of Airline used to arrive in U.S. |
| visa_type           | CHAR(2) | Class of legal immigration admission            |

- `immigration_personal`

| Field                | Type    | Description                     |
| -------------------- | ------- | ------------------------------- |
| immigration_id       | INT     | Primary key                     |
| cic_id               | BIGINT  | CIC ID                          |
| citizen_country      | INT     | Country of citizenship (i94cit) |
| residence_country    | INT     | Country of residence (i94res)   |
| birth_year           | INT     | Birth year                      |
| gender               | CHAR(1) | Gender                          |
| ins_num              | INT     | INS number                      |

- `city`

| Field     | Type    | Description                                |
| --------- | ------- | ------------------------------------------ |
| city_code | CHAR(3) | Three-character abbreviation for a U.S. city |
| city      | VARCHAR | Full name of USA city                      |

- `state`

| Field      | Type    | Description                               |
| ---------- | ------- | ----------------------------------------- |
| state_code | CHAR(2) | Two-character abbreviation for a U.S. state |
| state      | VARCHAR | Full name of USA state                    |

- `country`

| Field      | Type    | Description  |
| ---------- | ------- | ------------ |
| country_id | INT     | Country ID   |
| country    | VARCHAR | Country name |

- `temperature`

| Field                 | Type      | Description                             |
| --------------------- | --------- | --------------------------------------- |
| dt                    | TIMESTAMP | Record time stamp                       |
| city_code             | VARCHAR   | Record city                             |
| country               | VARCHAR   | Record country                          |
| aveg_temp             | FLOAT     | Monthly average temperature             |
| aveg_temp_uncertainty | FLOAT     | Monthly average temperature uncertainty |
| year                  | INT       | 4 digit year of dt field                |
| month                 | INT       | Numeric month of dt field               |

- `demog_population`

| Field             | Type    | Description                                |
| ----------------- | ------- | ------------------------------------------ |
| demog_pop_id      | INT     | Primary Key                                |
| city_code         | CHAR(3) | Three characters abbreviation for USA city |
| state_code        | CHAR(2) | Two characters abbreviation for USA state  |
| male_population   | INT     | City male population                       |
| female_population | INT     | City female population                     |
| num_veterans      | INT     | Number of veterans                         |
| foreign_born      | INT     | Number of foreign-born residents           |
| race              | VARCHAR | Race category of the source row            |

- `demog_statistics`

| Field              | Type    | Description                                |
| ------------------ | ------- | ------------------------------------------ |
| demog_stat_id      | INT     | Primary Key                                |
| city_code          | CHAR(3) | Three characters abbreviation for USA city |
| state_code         | CHAR(2) | Two characters abbreviation for USA state  |
| median_age         | FLOAT   | City median age                            |
| avg_household_size | FLOAT   | City average household size                |

### Step 5: Complete Project Write Up

#### Tools and Technologies
1. AWS S3 for data storage

    S3 lets us store and track large amounts of data for long periods of time. Several storage classes are available depending on the availability required. Other advantages include:

   * Reliable security.
   * High availability.
   * Low cost.
   * Ease of migration.
   * Simple management.

2. AWS Redshift (future)

    Redshift is a fully managed, scalable data warehouse service that integrates with the rest of the AWS ecosystem. Its advantages include:

     * High speed.
     * Bulk data processing.
     * Minimal data loss.
     * SQL interface.
     * Security.
     * Cost-effectiveness.

3. Pandas for sample data set exploratory data analysis

    Pandas lets us explore a small sample of a large data set quickly and easily.

4. PySpark for processing large data sets, transforming staging tables into dimension tables

    Spark is needed when the data is too large to process on a single machine, or is expected to grow to that size.

    Some of its advantages are:

    - Speed.
    - Ease of use.
    - Advanced analytics.
    - Dynamic in nature.
    - Multi-language support (Python, Scala, Java, R, SQL).


#### Data Update Frequency
1. Tables created from the immigration and temperature data sets should be updated monthly, since the raw data is organized by month.

2. Tables created from the demographic data set can be updated annually: demographic data takes time to collect, and more frequent updates would add cost without adding reliable information, and might even lead to wrong conclusions.

3. All tables should be updated in append-only mode.
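Append-only updates mean each monthly run adds rows without rewriting earlier data. To keep a rerun from loading the same month twice, the load can first check whether that `(year, month)` is already present. A sketch with SQLite standing in for the warehouse; `load_month` and its one-column payload are hypothetical simplifications.

```python
import sqlite3


def load_month(conn, year, month, cic_ids):
    """Append rows for (year, month) unless that month was already loaded."""
    (already,) = conn.execute(
        "SELECT COUNT(*) FROM immigration WHERE year = ? AND month = ?",
        (year, month),
    ).fetchone()
    if already:
        return 0  # rerun of a loaded month: append nothing
    with conn:  # commit on success, roll back on error
        conn.executemany(
            "INSERT INTO immigration (cic_id, year, month) VALUES (?, ?, ?)",
            [(cic_id, year, month) for cic_id in cic_ids],
        )
    return len(cic_ids)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE immigration (cic_id BIGINT, year INT, month INT)")
print(load_month(conn, 2016, 4, [4084316, 4422636]))  # 2
print(load_month(conn, 2016, 4, [4084316, 4422636]))  # 0 (already loaded)
print(load_month(conn, 2016, 5, [1195600]))           # 1
```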

#### Future Design Considerations
1. The data is increased by 100x.

    If Spark in standalone mode cannot process a 100x data set, we could move the processing to [AWS EMR](https://aws.amazon.com/tw/emr/?nc2=h_ql_prod_an_emr&whats-new-cards.sort-by=item.additionalFields.postDateTime&whats-new-cards.sort-order=desc), a managed cluster platform for processing large data sets in the cloud. Databricks is another option. In either case, the costs and benefits should be weighed from an engineering perspective.


2. The data populates a dashboard that must be updated daily by 7am.

    [Apache Airflow](https://airflow.apache.org) could be used to build an ETL pipeline that regularly updates the data and populates the report. Airflow integrates well with Python and AWS, and other tools can be combined with it for more powerful task automation. Alternatives to Airflow are also worth evaluating, as one may fit the requirements better.


3. The database needs to be accessed by 100+ people.

    [AWS Redshift](https://aws.amazon.com/tw/redshift/?nc2=h_ql_prod_db_rs&whats-new-cards.sort-by=item.additionalFields.postDateTime&whats-new-cards.sort-order=desc) can handle up to 500 connections. If this SSOT database will be accessed by 100+ people, we can move it to Redshift with confidence that it can handle the load. A cost/benefit analysis is needed before implementing this cloud solution.

## Test query

Find the most common `visa_type` in the sample data.

In [29]:
df_immigration_airline.dtypes

cic_id           float64
airline           object
admin_num        float64
flight_number     object
visa_type         object
dtype: object

In [30]:
from pyspark.sql.types import StructType,StructField, StringType, FloatType

In [31]:
schema = StructType([
    StructField("cic_id", FloatType(), True),
    StructField("airline", StringType(), True),
    StructField("admin_num", FloatType(), True),
    StructField("flight_number", StringType(), True),
    StructField("visa_type", StringType(), True),
])

In [32]:
sp_immigration_airline = spark.createDataFrame(df_immigration_airline, schema)
sp_immigration_airline.show(5)

+---------+-------+------------+-------------+---------+
|   cic_id|airline|   admin_num|flight_number|visa_type|
+---------+-------+------------+-------------+---------+
|4084316.0|     JL|5.6582676E10|        00782|       WT|
|4422636.0|    *GA|9.4361993E10|        XBLNG|       B2|
|1195600.0|     LH|5.5780467E10|        00464|       WT|
|5291768.0|     QR|9.4789698E10|        00739|       B2|
| 985523.0|    NaN|4.2322571E10|         LAND|       WT|
+---------+-------+------------+-------------+---------+
only showing top 5 rows



In [33]:
sp_immigration_airline.createOrReplaceTempView("immigration_airline")

In [34]:
query = spark.sql("""
    SELECT visa_type,
       Count(cic_id) AS Quantity
    FROM   immigration_airline
    GROUP  BY visa_type
    ORDER  BY Count(cic_id) DESC
    LIMIT  1
""")
query.show()

+---------+--------+
|visa_type|Quantity|
+---------+--------+
|       WT|     443|
+---------+--------+
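The Spark SQL query above is a grouped count followed by `LIMIT 1`. The sketch below does the same in plain Python over the `(cic_id, visa_type)` pairs of the ten sample rows shown by `df_immigration_airline.head(10)`. One difference is worth knowing: on a tie, `ORDER BY ... LIMIT 1` may return any of the tied groups, while `Counter.most_common` keeps the order in which values were first seen. `most_common_visa` is a hypothetical helper.

```python
from collections import Counter

# (cic_id, visa_type) pairs of the first ten sample rows.
rows = [
    (4084316.0, "WT"), (4422636.0, "B2"), (1195600.0, "WT"), (5291768.0, "B2"),
    (985523.0, "WT"), (1481650.0, "B2"), (2197173.0, "B2"), (232708.0, "WT"),
    (5227851.0, "WT"), (13213.0, "WT"),
]


def most_common_visa(rows):
    """Mirror COUNT(cic_id) ... GROUP BY visa_type ORDER BY count DESC LIMIT 1."""
    counts = Counter(visa for cic_id, visa in rows if cic_id is not None)  # COUNT skips NULLs
    top = counts.most_common(1)
    return top[0] if top else None


print(most_common_visa(rows))  # ('WT', 6)
```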

