# Project Title
### Data Engineering Capstone Project

#### Project Summary

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
import pyspark

### Step 1: Scope the Project and Gather Data

#### Scope 
This project integrates I94 immigration data, world temperature data, U.S. demographic data, and the airport code table to set up a data warehouse with fact and dimension tables.

#### Describe and Gather Data 


1. [I94 Immigration Data](https://travel.trade.gov/research/reports/i94/historical/2016.html), SAS. The dataset contains international visitor arrival statistics by world regions and select countries, type of visa, mode of transportation, age groups, states visited, and the top ports of entry.
2. [World Temperature Data](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data), CSV. This dataset is from Kaggle and contains monthly average temperatures for cities around the world.
3. [U.S. City Demographic Data](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/), CSV. This dataset contains demographic information for all US cities and census-designated places with a population of at least 65,000.
4. [Airport Code Table](https://datahub.io/core/airport-codes#data), CSV. This dataset contains airport codes: either the IATA code, a three-letter code used in passenger reservation, ticketing, and baggage-handling systems, or the ICAO code, a four-letter code used by ATC systems and for airports that do not have an IATA code.


### Step 2: Explore and Assess the Data

#### Immigration dataset

In [2]:
# Read in the data here
df_immi = pd.read_csv("immigration_data_sample.csv")

In [3]:
df_immi.head()

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,...,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,...,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,...,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,...,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,...,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


In [4]:
df_immi.columns

Index(['Unnamed: 0', 'cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port',
       'arrdate', 'i94mode', 'i94addr', 'depdate', 'i94bir', 'i94visa',
       'count', 'dtadfile', 'visapost', 'occup', 'entdepa', 'entdepd',
       'entdepu', 'matflag', 'biryear', 'dtaddto', 'gender', 'insnum',
       'airline', 'admnum', 'fltno', 'visatype'],
      dtype='object')

In [5]:
fact_immi = df_immi[['cicid', 'i94yr', 'i94mon', 'i94port', 'i94addr', 'arrdate', 'depdate', 'i94mode', 'i94visa']]
fact_immi.columns = ['cic_id', 'year', 'month', 'port_code', 'state_code', 'arrive_date', 'departure_date', 'travel_code', 'visa_code']
fact_immi.head(5)

Unnamed: 0,cic_id,year,month,port_code,state_code,arrive_date,departure_date,travel_code,visa_code
0,4084316.0,2016.0,4.0,HHW,HI,20566.0,20573.0,1.0,2.0
1,4422636.0,2016.0,4.0,MCA,TX,20567.0,20568.0,1.0,2.0
2,1195600.0,2016.0,4.0,OGG,FL,20551.0,20571.0,1.0,2.0
3,5291768.0,2016.0,4.0,LOS,CA,20572.0,20581.0,1.0,2.0
4,985523.0,2016.0,4.0,CHM,NY,20550.0,20553.0,3.0,2.0


In [6]:
fact_immi['country'] = 'United States'
fact_immi.head(5)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy
  """Entry point for launching an IPython kernel.


Unnamed: 0,cic_id,year,month,port_code,state_code,arrive_date,departure_date,travel_code,visa_code,country
0,4084316.0,2016.0,4.0,HHW,HI,20566.0,20573.0,1.0,2.0,United States
1,4422636.0,2016.0,4.0,MCA,TX,20567.0,20568.0,1.0,2.0,United States
2,1195600.0,2016.0,4.0,OGG,FL,20551.0,20571.0,1.0,2.0,United States
3,5291768.0,2016.0,4.0,LOS,CA,20572.0,20581.0,1.0,2.0,United States
4,985523.0,2016.0,4.0,CHM,NY,20550.0,20553.0,3.0,2.0,United States
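
The `SettingWithCopyWarning` above appears because `fact_immi` was built by selecting columns from `df_immi`, so pandas cannot tell whether the assignment is meant to reach the original frame. One common way to avoid it is to take an explicit copy when building the subset. The sketch below uses a small hypothetical stand-in frame (`df_immi_demo` and `fact_demo` are illustrative names), not the real sample file.

```python
import pandas as pd

# Hypothetical three-row stand-in for df_immi (values taken from the sample output)
df_immi_demo = pd.DataFrame({
    "cicid": [4084316.0, 4422636.0, 1195600.0],
    "i94port": ["HHW", "MCA", "OGG"],
    "i94addr": ["HI", "TX", "FL"],
})

# .copy() makes the column selection an independent DataFrame,
# so later assignments are unambiguous and do not touch df_immi_demo.
fact_demo = df_immi_demo[["cicid", "i94port", "i94addr"]].copy()
fact_demo.columns = ["cic_id", "port_code", "state_code"]

# Adding a constant column now only affects the copy.
fact_demo["country"] = "United States"
```

The same pattern would apply to the other subsets derived from `df_immi` and `df_demographic`.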


In [7]:
dim_immi_personal = df_immi[['cicid', 'i94cit', 'i94res', 'biryear', 'gender', 'insnum']]
# Assign a flat list; a nested list would create a MultiIndex instead of plain column names
dim_immi_personal.columns = ['cic_id', 'citizen_country', 'residence_country', 'birth_year', 'gender', 'ins_num']
dim_immi_personal.head(5)

Unnamed: 0,cic_id,citizen_country,residence_country,birth_year,gender,ins_num
0,4084316.0,209.0,209.0,1955.0,F,
1,4422636.0,582.0,582.0,1990.0,M,
2,1195600.0,148.0,112.0,1940.0,M,
3,5291768.0,297.0,297.0,1991.0,M,
4,985523.0,111.0,111.0,1997.0,F,


In [8]:
dim_immi_airline = df_immi[['cicid', 'airline', 'admnum', 'fltno', 'visatype','i94port']]
dim_immi_airline.columns = ['cic_id', 'airline', 'admin_num', 'flight_number', 'visa_type', 'port_code']
dim_immi_airline.head(5)

Unnamed: 0,cic_id,airline,admin_num,flight_number,visa_type,port_code
0,4084316.0,JL,56582670000.0,00782,WT,HHW
1,4422636.0,*GA,94362000000.0,XBLNG,B2,MCA
2,1195600.0,LH,55780470000.0,00464,WT,OGG
3,5291768.0,QR,94789700000.0,00739,B2,LOS
4,985523.0,,42322570000.0,LAND,WT,CHM


#### Temperature dataset

In [9]:
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
df_temperature = pd.read_csv(fname)

In [10]:
df_temperature.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


In [11]:
df_temperature['dt'].count()

8599212

In [12]:
len(pd.unique(df_temperature['dt']))

3239

##### `dt` has only 3,239 distinct values across 8,599,212 rows, so none of the existing columns can serve as a unique ID for the temperature dataset
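
A quick way to test whether a single column, or a combination of columns, could act as a key is `Series.is_unique` together with `DataFrame.duplicated`. The frame below is a tiny hypothetical sample (`temps_demo` is an illustrative name); whether any column combination is unique on the full file would have to be checked against the real data.

```python
import pandas as pd

# Hypothetical sample with the same column names as the temperature file
temps_demo = pd.DataFrame({
    "dt": ["1743-11-01", "1743-11-01", "1743-12-01"],
    "City": ["Århus", "Abilene", "Århus"],
    "Country": ["Denmark", "United States", "Denmark"],
})

# A single column is a key candidate only if every value occurs once.
dt_is_unique = temps_demo["dt"].is_unique

# A composite key is a candidate if no (dt, City, Country) combination repeats.
composite_is_unique = not temps_demo.duplicated(subset=["dt", "City", "Country"]).any()
```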

In [13]:
df_temperature_usa = df_temperature[df_temperature['Country'] == 'United States']
df_temperature_usa = df_temperature_usa[['dt', 'AverageTemperature', 'AverageTemperatureUncertainty', 'City', 'Country']]
df_temperature_usa.columns = ['dt', 'avg_temp', 'avg_temp_uncertnty', 'city', 'country']
df_temperature_usa.head(5)

Unnamed: 0,dt,avg_temp,avg_temp_uncertnty,city,country
47555,1820-01-01,2.101,3.217,Abilene,United States
47556,1820-02-01,6.926,2.853,Abilene,United States
47557,1820-03-01,10.767,2.395,Abilene,United States
47558,1820-04-01,17.989,2.202,Abilene,United States
47559,1820-05-01,21.809,2.036,Abilene,United States


In [14]:
df_temperature_usa['dt'] = pd.to_datetime(df_temperature_usa['dt'])
df_temperature_usa['year'] = df_temperature_usa['dt'].apply(lambda t: t.year)
df_temperature_usa['month'] = df_temperature_usa['dt'].apply(lambda t: t.month)
df_temperature_usa.head()

Unnamed: 0,dt,avg_temp,avg_temp_uncertnty,city,country,year,month
47555,1820-01-01,2.101,3.217,Abilene,United States,1820,1
47556,1820-02-01,6.926,2.853,Abilene,United States,1820,2
47557,1820-03-01,10.767,2.395,Abilene,United States,1820,3
47558,1820-04-01,17.989,2.202,Abilene,United States,1820,4
47559,1820-05-01,21.809,2.036,Abilene,United States,1820,5


In [15]:
df_temperature_usa_2016 = df_temperature_usa[df_temperature_usa['year'] == 2016]

In [16]:
df_temperature_usa_2016.head()

Unnamed: 0,dt,avg_temp,avg_temp_uncertnty,city,country,year,month


##### The empty result shows that this temperature dataset contains no data for 2016
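
The year coverage can also be checked directly instead of filtering for a single year. The sketch below uses the vectorized `.dt` accessor in place of `apply` and runs on a hypothetical three-row sample (`temps_demo` and its values are made up for illustration, not read from the file).

```python
import pandas as pd

# Hypothetical sample; the dates are illustrative only
temps_demo = pd.DataFrame({
    "dt": ["1820-01-01", "1900-06-01", "2013-09-01"],
    "avg_temp": [2.101, 20.5, 25.0],
})

temps_demo["dt"] = pd.to_datetime(temps_demo["dt"])
temps_demo["year"] = temps_demo["dt"].dt.year    # vectorized, no lambda needed
temps_demo["month"] = temps_demo["dt"].dt.month

# The covered range shows at a glance whether a given year can be present.
first_year = int(temps_demo["year"].min())
last_year = int(temps_demo["year"].max())
has_2016 = bool((temps_demo["year"] == 2016).any())
```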

#### U.S. City Demographic dataset

In [17]:
df_demographic = pd.read_csv("us-cities-demographics.csv", sep=";")

In [18]:
df_demographic.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [19]:
dim_demographic_population = df_demographic[['City', 'State Code', 'Male Population', 'Female Population', 'Number of Veterans', 'Foreign-born', 'Race']]
dim_demographic_population.columns = ['city', 'state_code', 'male_pop', 'female_pop', 'num_vetarans', 'foreign_born', 'race']
dim_demographic_population.head()

Unnamed: 0,city,state_code,male_pop,female_pop,num_vetarans,foreign_born,race
0,Silver Spring,MD,40601.0,41862.0,1562.0,30908.0,Hispanic or Latino
1,Quincy,MA,44129.0,49500.0,4147.0,32935.0,White
2,Hoover,AL,38040.0,46799.0,4819.0,8229.0,Asian
3,Rancho Cucamonga,CA,88127.0,87105.0,5821.0,33878.0,Black or African-American
4,Newark,NJ,138040.0,143873.0,5829.0,86253.0,White


In [20]:
dim_demographic_statistics = df_demographic[['City', 'State Code', 'Median Age', 'Average Household Size']]
dim_demographic_statistics.columns = ['city', 'state_code', 'median_age', 'avg_household_size']
dim_demographic_statistics.head()

Unnamed: 0,city,state_code,median_age,avg_household_size
0,Silver Spring,MD,33.8,2.6
1,Quincy,MA,41.0,2.39
2,Hoover,AL,38.5,2.58
3,Rancho Cucamonga,CA,34.5,3.18
4,Newark,NJ,34.6,2.73


#### Airport Codes dataset

In [21]:
df_airport = pd.read_csv("airport-codes_csv.csv")

In [22]:
df_airport.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [23]:
df_airport[~df_airport['iata_code'].isnull()].head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
223,03N,small_airport,Utirik Airport,4.0,OC,MH,MH-UTI,Utirik Island,K03N,UTK,03N,"169.852005, 11.222"
440,07FA,small_airport,Ocean Reef Club Airport,8.0,,US,US-FL,Key Largo,07FA,OCA,07FA,"-80.274803161621, 25.325399398804"
594,0AK,small_airport,Pilot Station Airport,305.0,,US,US-AK,Pilot Station,,PQS,0AK,"-162.899994, 61.934601"
673,0CO2,small_airport,Crested Butte Airpark,8980.0,,US,US-CO,Crested Butte,0CO2,CSE,0CO2,"-106.928341, 38.851918"
1088,0TE7,small_airport,LBJ Ranch Airport,1515.0,,US,US-TX,Johnson City,0TE7,JCY,0TE7,"-98.62249755859999, 30.251800537100003"


In [24]:
df_airport[(df_airport['iata_code']=="ATL") & (df_airport['iso_country']=="US")]

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
26128,KATL,large_airport,Hartsfield Jackson Atlanta International Airport,1026.0,,US,US-GA,Atlanta,KATL,ATL,ATL,"-84.428101, 33.6367"


##### For this data warehouse, the airport data must be filtered to rows with a non-missing iata_code and "US" as iso_country; otherwise codes can be mismatched. For example, the iata_code LOS does not refer to Los Angeles in the US but to the Lagos airport in Nigeria.

In [25]:
dim_airport = df_airport[~df_airport['iata_code'].isnull() & (df_airport['iso_country']=="US")] 

dim_airport = dim_airport[['iata_code', 'name', 'type', 'iso_country', 'iso_region', 'municipality', 'coordinates']]
dim_airport.columns = [['iata_code', 'name', 'type', 'country', 'state', 'municipality', 'coordinates']]

In [26]:
dim_airport.head()

Unnamed: 0,iata_code,name,type,country,state,municipality,coordinates
440,OCA,Ocean Reef Club Airport,small_airport,US,US-FL,Key Largo,"-80.274803161621, 25.325399398804"
594,PQS,Pilot Station Airport,small_airport,US,US-AK,Pilot Station,"-162.899994, 61.934601"
673,CSE,Crested Butte Airpark,small_airport,US,US-CO,Crested Butte,"-106.928341, 38.851918"
1088,JCY,LBJ Ranch Airport,small_airport,US,US-TX,Johnson City,"-98.62249755859999, 30.251800537100003"
1402,PMX,Metropolitan Airport,small_airport,US,US-MA,Palmer,"-72.31140136719999, 42.223300933800004"


In [27]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')


In [28]:
#write to parquet
df_spark.write.parquet("sas_data", mode="overwrite")
df_spark=spark.read.parquet("sas_data")

In [29]:
df_spark.head(1)

[Row(cicid=5748517.0, i94yr=2016.0, i94mon=4.0, i94cit=245.0, i94res=438.0, i94port='LOS', arrdate=20574.0, i94mode=1.0, i94addr='CA', depdate=20582.0, i94bir=40.0, i94visa=1.0, count=1.0, dtadfile='20160430', visapost='SYD', occup=None, entdepa='G', entdepd='O', entdepu=None, matflag='M', biryear=1976.0, dtaddto='10292016', gender='F', insnum=None, airline='QF', admnum=94953870030.0, fltno='00011', visatype='B1')]

#### Cleaning Steps
Document steps necessary to clean the data

1. Immigration Data:
* Get the corresponding codes and names/details for country, visa, and port (city/state) from I94_SAS_Labels_Descriptions.SAS

In [30]:
with open("I94_SAS_Labels_Descriptions.SAS") as f:
    contents = f.readlines()

In [31]:
country_code = {}
for countries in contents[9:298]:
    pair = countries.split('=')
    code, country = pair[0].strip(), pair[1].strip().strip("'")
    country_code[code] = country

In [32]:
df_country_code = pd.DataFrame(list(country_code.items()), columns=['code', 'country'])
df_country_code.head()

Unnamed: 0,code,country
0,582,"MEXICO Air Sea, and Not Reported (I-94, no lan..."
1,236,AFGHANISTAN
2,101,ALBANIA
3,316,ALGERIA
4,102,ANDORRA
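
The parsing cells in this section rely on hard-coded line ranges such as `contents[9:298]`, which break if the label file changes. As a hedged alternative, the sketch below matches `code = 'label'` lines with a regular expression. The helper `parse_label_lines`, the pattern `PAIR`, and the sample lines are assumptions modelled on the output shown above, not taken from the real file; separating the individual code lists (country, port, state, and so on) would still need the section markers of the file.

```python
import re

# Matches lines of the form   236 =  'AFGHANISTAN'   or   'ALC' = 'ALCAN, AK   '
PAIR = re.compile(r"^\s*'?(?P<code>[^'=\s]+)'?\s*=\s*'(?P<label>[^']*)'")

def parse_label_lines(lines):
    """Return {code: label} for every line that looks like a code/label pair."""
    mapping = {}
    for line in lines:
        match = PAIR.match(line)
        if match:
            mapping[match.group("code")] = match.group("label").strip()
    return mapping

# Hypothetical sample lines, including two that should be ignored
sample_lines = [
    "/* I94CIT & I94RES */\n",
    "   236 =  'AFGHANISTAN'\n",
    "   101 =  'ALBANIA'\n",
    "   'ALC'\t=\t'ALCAN, AK             '\n",
    ";\n",
]

labels = parse_label_lines(sample_lines)
```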


In [33]:
port_code = {}
for ports in contents[302:962]:
    pair = ports.split('=')
    code, port = pair[0].strip().strip("''"), pair[1].strip('\t').strip().strip("'")
    port_code[code] = port

In [34]:
df_port_code = pd.DataFrame(list(port_code.items()), columns=['code', 'port'])
df_port_code.head()

Unnamed: 0,code,port
0,ALC,"ALCAN, AK"
1,ANC,"ANCHORAGE, AK"
2,BAR,"BAKER AAF - BAKER ISLAND, AK"
3,DAC,"DALTONS CACHE, AK"
4,PIZ,"DEW STATION PT LAY DEW, AK"


In [35]:
state_code = {}
for states in contents[981:1036]:
    pair = states.split('=')
    code, state = pair[0].strip().strip("''"), pair[1].strip('\t').strip().strip("'")
    state_code[code] = state

In [36]:
df_state_code = pd.DataFrame(list(state_code.items()), columns=['code', 'state'])
df_state_code.head()

Unnamed: 0,code,state
0,AL,ALABAMA
1,AK,ALASKA
2,AZ,ARIZONA
3,AR,ARKANSAS
4,CA,CALIFORNIA


In [37]:
visa_code = {}
for visas in contents[1046:1049]:
    pair = visas.split('=')
    code, visa = pair[0].strip().strip("''"), pair[1].strip().strip("'")
    visa_code[code] = visa


In [38]:
df_visa_code = pd.DataFrame(list(visa_code.items()), columns=['code', 'visa'])
df_visa_code

Unnamed: 0,code,visa
0,1,Business
1,2,Pleasure
2,3,Student


In [39]:
travel_code = {}
for travels in contents[972:976]:
    pair = travels.split('=')
    code, travel = pair[0].strip().strip("''"), pair[1].strip('\n').strip(";").strip().strip("''")
    travel_code[code] = travel


In [40]:
df_travel_code = pd.DataFrame(list(travel_code.items()), columns=['code', 'travel'])
df_travel_code

Unnamed: 0,code,travel
0,1,Air
1,2,Sea
2,3,Land
3,9,Not reported


* Transform arrive_date and departure_date in the immigration data from the SAS date format (days since 1960-01-01) to the pandas datetime format

In [41]:
def process_SAS_date(date):
    return pd.to_timedelta(date, unit='D') + pd.Timestamp('1960-1-1')

In [42]:
fact_immi["arrive_date"] = process_SAS_date(fact_immi["arrive_date"])
fact_immi["departure_date"] = process_SAS_date(fact_immi["departure_date"])

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy
  """Entry point for launching an IPython kernel.
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy
  


In [43]:
fact_immi.head()

Unnamed: 0,cic_id,year,month,port_code,state_code,arrive_date,departure_date,travel_code,visa_code,country
0,4084316.0,2016.0,4.0,HHW,HI,2016-04-22,2016-04-29,1.0,2.0,United States
1,4422636.0,2016.0,4.0,MCA,TX,2016-04-23,2016-04-24,1.0,2.0,United States
2,1195600.0,2016.0,4.0,OGG,FL,2016-04-07,2016-04-27,1.0,2.0,United States
3,5291768.0,2016.0,4.0,LOS,CA,2016-04-28,2016-05-07,1.0,2.0,United States
4,985523.0,2016.0,4.0,CHM,NY,2016-04-06,2016-04-09,3.0,2.0,United States
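
SAS stores a date as the number of days since 1 January 1960, which is why the conversion adds a day offset to that epoch. The sketch below repeats the arithmetic with only the standard library and treats missing values explicitly; `sas_days_to_date` is a hypothetical helper name, not part of the notebook.

```python
import math
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # day 0 in the SAS date system

def sas_days_to_date(days):
    """Convert a SAS day count (int or float) to a date; None/NaN stay missing."""
    if days is None or (isinstance(days, float) and math.isnan(days)):
        return None
    return SAS_EPOCH + timedelta(days=int(days))

# First row of the sample: arrdate 20566.0 and depdate 20573.0
arrival = sas_days_to_date(20566.0)
departure = sas_days_to_date(20573.0)
```

The two results correspond to the first row of the converted table above (2016-04-22 and 2016-04-29).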


2. Demographic Data Set
* Uppercase the city names so they can be matched against the port_code dataframe

In [44]:
dim_demographic_population["city"] = dim_demographic_population["city"].apply(lambda x: x.upper())

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy
  """Entry point for launching an IPython kernel.


In [45]:
dim_demographic_statistics.loc[:,"city"] = dim_demographic_statistics.loc[:,"city"].apply(lambda x: x.upper())

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy
  self.obj[item] = s


In [46]:
dim_demographic_population.head()

Unnamed: 0,city,state_code,male_pop,female_pop,num_vetarans,foreign_born,race
0,SILVER SPRING,MD,40601.0,41862.0,1562.0,30908.0,Hispanic or Latino
1,QUINCY,MA,44129.0,49500.0,4147.0,32935.0,White
2,HOOVER,AL,38040.0,46799.0,4819.0,8229.0,Asian
3,RANCHO CUCAMONGA,CA,88127.0,87105.0,5821.0,33878.0,Black or African-American
4,NEWARK,NJ,138040.0,143873.0,5829.0,86253.0,White


In [47]:
dim_demographic_statistics.head()

Unnamed: 0,city,state_code,median_age,avg_household_size
0,SILVER SPRING,MD,33.8,2.6
1,QUINCY,MA,41.0,2.39
2,HOOVER,AL,38.5,2.58
3,RANCHO CUCAMONGA,CA,34.5,3.18
4,NEWARK,NJ,34.6,2.73


* Map each city to its corresponding port code

In [48]:
port_code_reverse = {}
for ports in contents[302:962]:
    pair = ports.split('=')
    port, code = pair[1].strip('\t').strip().strip("'").split(",")[0], pair[0].strip().strip("''")
    port_code_reverse[port] = code

In [49]:
df_port_code_reverse = pd.DataFrame(list(port_code_reverse.items()), columns=['port', 'code'])
df_port_code_reverse.head()

Unnamed: 0,port,code
0,ALCAN,ALC
1,ANCHORAGE,ANC
2,BAKER AAF - BAKER ISLAND,BAR
3,DALTONS CACHE,DAC
4,DEW STATION PT LAY DEW,PIZ


In [50]:
dim_demographic_population["city_code"] = dim_demographic_population["city"].apply(lambda x:  port_code_reverse[x] if x in port_code_reverse else x)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy
  """Entry point for launching an IPython kernel.


In [51]:
dim_demographic_population.head(20)

Unnamed: 0,city,state_code,male_pop,female_pop,num_vetarans,foreign_born,race,city_code
0,SILVER SPRING,MD,40601.0,41862.0,1562.0,30908.0,Hispanic or Latino,SILVER SPRING
1,QUINCY,MA,44129.0,49500.0,4147.0,32935.0,White,QUINCY
2,HOOVER,AL,38040.0,46799.0,4819.0,8229.0,Asian,HOOVER
3,RANCHO CUCAMONGA,CA,88127.0,87105.0,5821.0,33878.0,Black or African-American,RANCHO CUCAMONGA
4,NEWARK,NJ,138040.0,143873.0,5829.0,86253.0,White,NEWARK
5,PEORIA,IL,56229.0,62432.0,6634.0,7517.0,American Indian and Alaska Native,PEORIA
6,AVONDALE,AZ,38712.0,41971.0,4815.0,8355.0,Black or African-American,AVONDALE
7,WEST COVINA,CA,51629.0,56860.0,3800.0,37038.0,Asian,WEST COVINA
8,O'FALLON,MO,41762.0,43270.0,5783.0,3269.0,Hispanic or Latino,O'FALLON
9,HIGH POINT,NC,51751.0,58077.0,5204.0,16315.0,Asian,HIGH POINT


##### The same city can appear in several records, one per race, so the dataset could be pivoted to obtain one row per city.
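
One possible way to do that pivot is `DataFrame.pivot_table` on the `Race` and `Count` columns of the raw demographic file. The sketch below is a minimal illustration on a hypothetical four-row frame (`demo`); two counts come from the sample output above and the other two are placeholders.

```python
import pandas as pd

# 25924 and 58723 appear in the sample output; 100 and 200 are placeholder values
demo = pd.DataFrame({
    "city": ["SILVER SPRING", "SILVER SPRING", "QUINCY", "QUINCY"],
    "state_code": ["MD", "MD", "MA", "MA"],
    "race": ["Hispanic or Latino", "White", "White", "Asian"],
    "count": [25924, 100, 58723, 200],
})

# One row per (city, state_code), one column per race; missing races become 0.
race_wide = demo.pivot_table(
    index=["city", "state_code"],
    columns="race",
    values="count",
    aggfunc="sum",
    fill_value=0,
).reset_index()
race_wide.columns.name = None  # drop the leftover "race" axis label
```

City-level columns such as the male and female population repeat on every race row, so they could be kept by taking the first value per city before joining them to the pivoted counts.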

In [52]:
dim_demographic_statistics["city"] = dim_demographic_statistics["city"].apply(lambda x:  port_code_reverse[x] if x in port_code_reverse else x)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy
  """Entry point for launching an IPython kernel.


In [53]:
dim_demographic_statistics.head()

Unnamed: 0,city,state_code,median_age,avg_household_size
0,SILVER SPRING,MD,33.8,2.6
1,QUINCY,MA,41.0,2.39
2,HOOVER,AL,38.5,2.58
3,RANCHO CUCAMONGA,CA,34.5,3.18
4,NEWARK,NJ,34.6,2.73


##### Not all cities have a corresponding 3-letter code in I94_SAS_Labels_Descriptions.SAS, so the original full names are kept for those cities
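
A related caveat: `port_code_reverse` is keyed by city name alone, so if the same city name occurs under more than one state in the label file, only the last code survives. A hedged sketch of a lookup keyed by `(city, state)` follows; the entries in `sample_ports` are illustrative values in the "CITY, ST" label format, and `lookup_port` is a hypothetical helper.

```python
# Illustrative entries in the "CITY, ST" format; not read from the label file
sample_ports = {
    "POM": "PORTLAND, ME",
    "POO": "PORTLAND, OR",
    "ANC": "ANCHORAGE, AK",
}

by_city = {}        # keyed like port_code_reverse: later entries overwrite earlier ones
by_city_state = {}  # keyed by (city, state): both PORTLAND entries survive
for code, label in sample_ports.items():
    city, _, state = label.partition(",")
    by_city[city.strip()] = code
    by_city_state[(city.strip(), state.strip())] = code

def lookup_port(city, state):
    """Return the port code for a city/state pair, or the uppercase city name if unknown."""
    key = (city.upper(), state.upper())
    return by_city_state.get(key, city.upper())
```

With the demographic tables, `state_code` is already available next to `city`, so such a lookup would need no extra input.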

3. Airport Dataset
* Split the coordinates into longitude and latitude

In [54]:
dim_airport.head()

Unnamed: 0,iata_code,name,type,country,state,municipality,coordinates
440,OCA,Ocean Reef Club Airport,small_airport,US,US-FL,Key Largo,"-80.274803161621, 25.325399398804"
594,PQS,Pilot Station Airport,small_airport,US,US-AK,Pilot Station,"-162.899994, 61.934601"
673,CSE,Crested Butte Airpark,small_airport,US,US-CO,Crested Butte,"-106.928341, 38.851918"
1088,JCY,LBJ Ranch Airport,small_airport,US,US-TX,Johnson City,"-98.62249755859999, 30.251800537100003"
1402,PMX,Metropolitan Airport,small_airport,US,US-MA,Palmer,"-72.31140136719999, 42.223300933800004"


In [55]:
dim_airport = dim_airport.reset_index()

In [56]:
dim_airport.head()

Unnamed: 0,index,iata_code,name,type,country,state,municipality,coordinates
0,440,OCA,Ocean Reef Club Airport,small_airport,US,US-FL,Key Largo,"-80.274803161621, 25.325399398804"
1,594,PQS,Pilot Station Airport,small_airport,US,US-AK,Pilot Station,"-162.899994, 61.934601"
2,673,CSE,Crested Butte Airpark,small_airport,US,US-CO,Crested Butte,"-106.928341, 38.851918"
3,1088,JCY,LBJ Ranch Airport,small_airport,US,US-TX,Johnson City,"-98.62249755859999, 30.251800537100003"
4,1402,PMX,Metropolitan Airport,small_airport,US,US-MA,Palmer,"-72.31140136719999, 42.223300933800004"


In [57]:
import numpy as np

In [58]:
idx = np.where(dim_airport.columns.get_loc("coordinates"))[0][0]

In [59]:
dim_airport_coor = pd.DataFrame(dim_airport.iloc[:,idx].str.split(", ").tolist(), columns = ['longitude','latitude'])

In [60]:
dim_airport_coor.head()

Unnamed: 0,longitude,latitude
0,-80.274803161621,25.325399398804
1,-162.899994,61.934601
2,-106.928341,38.851918
3,-98.6224975586,30.251800537100003
4,-72.31140136719999,42.2233009338


In [61]:
dim_airport = pd.concat([dim_airport,dim_airport_coor], axis=1, join="inner")

In [62]:
dim_airport.columns = ["index", "iata_code", "name", "type", "country", "state", "municipality", "coordinates", "longitude", "latitude"]

In [63]:
dim_airport.drop(["index", "coordinates"], axis=1).head()

Unnamed: 0,iata_code,name,type,country,state,municipality,longitude,latitude
0,OCA,Ocean Reef Club Airport,small_airport,US,US-FL,Key Largo,-80.274803161621,25.325399398804
1,PQS,Pilot Station Airport,small_airport,US,US-AK,Pilot Station,-162.899994,61.934601
2,CSE,Crested Butte Airpark,small_airport,US,US-CO,Crested Butte,-106.928341,38.851918
3,JCY,LBJ Ranch Airport,small_airport,US,US-TX,Johnson City,-98.6224975586,30.251800537100003
4,PMX,Metropolitan Airport,small_airport,US,US-MA,Palmer,-72.31140136719999,42.2233009338


In [64]:
dim_airport = dim_airport.drop(["index", "coordinates"], axis=1)

In [65]:
dim_airport["longitude"] = dim_airport["longitude"].astype(float)

In [66]:
dim_airport["latitude"] = dim_airport["latitude"].astype(float)

In [67]:
dim_airport.head()

Unnamed: 0,iata_code,name,type,country,state,municipality,longitude,latitude
0,OCA,Ocean Reef Club Airport,small_airport,US,US-FL,Key Largo,-80.274803,25.325399
1,PQS,Pilot Station Airport,small_airport,US,US-AK,Pilot Station,-162.899994,61.934601
2,CSE,Crested Butte Airpark,small_airport,US,US-CO,Crested Butte,-106.928341,38.851918
3,JCY,LBJ Ranch Airport,small_airport,US,US-TX,Johnson City,-98.622498,30.251801
4,PMX,Metropolitan Airport,small_airport,US,US-MA,Palmer,-72.311401,42.223301
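
The split-and-concatenate steps above can also be written more compactly with `Series.str.split(..., expand=True)`, which returns the two parts as columns aligned on the existing index, so no `reset_index` or `concat` is needed. This is a sketch on a hypothetical two-row frame (`airports_demo`) using coordinates from the sample output.

```python
import pandas as pd

airports_demo = pd.DataFrame({
    "iata_code": ["OCA", "PQS"],
    "coordinates": ["-80.274803161621, 25.325399398804", "-162.899994, 61.934601"],
})

# expand=True yields a DataFrame with one column per split part.
parts = airports_demo["coordinates"].str.split(",", expand=True)

# The file stores longitude first, then latitude.
airports_demo["longitude"] = parts[0].str.strip().astype(float)
airports_demo["latitude"] = parts[1].str.strip().astype(float)
airports_demo = airports_demo.drop(columns="coordinates")
```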


* Remove the country prefix from the state code (e.g. US-FL becomes FL)

In [68]:
dim_airport["state"] = dim_airport["state"].apply(lambda x: x.split("-")[1])

In [69]:
dim_airport["country"] = "United States"

In [70]:
dim_airport.columns = ["iata_code", "name", "type", "country", "state_code", "municipality", "longitude", "latitude"]

In [71]:
dim_airport.head()

Unnamed: 0,iata_code,name,type,country,state_code,municipality,longitude,latitude
0,OCA,Ocean Reef Club Airport,small_airport,United States,FL,Key Largo,-80.274803,25.325399
1,PQS,Pilot Station Airport,small_airport,United States,AK,Pilot Station,-162.899994,61.934601
2,CSE,Crested Butte Airpark,small_airport,United States,CO,Crested Butte,-106.928341,38.851918
3,JCY,LBJ Ranch Airport,small_airport,United States,TX,Johnson City,-98.622498,30.251801
4,PMX,Metropolitan Airport,small_airport,United States,MA,Palmer,-72.311401,42.223301


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

Since this data warehouse is intended for OLAP and BI applications, the data sets are modelled as a star schema with one fact table and several dimension tables.
The figure below shows the details:


<img src="img/star_schema.png" style="float: left;" alt="" width="1000"/>
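
As an illustration of the kind of query a star schema supports, the sketch below joins a fact table to a dimension table on `cic_id` and aggregates the result. The two frames are hypothetical three-row subsets built from the sample outputs in Step 2; the real tables are produced with Spark in Step 4.

```python
import pandas as pd

# Hypothetical subset of the fact table
fact_demo = pd.DataFrame({
    "cic_id": [4084316, 4422636, 1195600],
    "state_code": ["HI", "TX", "FL"],
    "visa_code": [2, 2, 2],
})

# Hypothetical subset of the personal dimension
dim_person_demo = pd.DataFrame({
    "cic_id": [4084316, 4422636, 1195600],
    "gender": ["F", "M", "M"],
    "birth_year": [1955, 1990, 1940],
})

# Fact rows pick up descriptive attributes through the shared key.
joined = fact_demo.merge(dim_person_demo, on="cic_id", how="left")

# Example BI-style aggregate: arrivals per visa code and gender.
arrivals = (
    joined.groupby(["visa_code", "gender"])
    .size()
    .reset_index(name="arrivals")
)
```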


#### 3.2 Mapping Out Data Pipelines
1. Although all data sets are provided locally, we assume they are stored in S3 buckets as follows:
    * [Source_S3_Bucket]/immigration/18-83510-I94-Data-2016/*.sas7bdat
    * [Source_S3_Bucket]/I94_SAS_Labels_Descriptions.SAS
    * [Source_S3_Bucket]/temperature/GlobalLandTemperaturesByCity.csv
    * [Source_S3_Bucket]/demography/us-cities-demographics.csv
2. Clean the data sets following the cleaning steps from Step 2:
    * Immigration data
        * Transform the immigration data into 1 fact table and 2 dimension tables; the fact table is partitioned by state
        * Parse the label description file into additional tables that document the codes and names
    * Temperature data
        * Transform the temperature data into a dimension table
        * Keep only the US rows, which are the ones relevant for this data warehouse
    * Demography data
        * Split the demography data into 2 dimension tables
        * Uppercase the state and port names; the code tables created above also help with the matching
        * Depending on the usage, further aggregation may be applied here too
    * Airport data
        * Transform the airport data into a dimension table
        * Keep only US rows with a non-missing iata_code
        * Split the coordinates into longitude and latitude and clean the state code

3. Store the resulting tables in the target S3 bucket

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.
Refer to and run etl.py for the complete, cleaned-up process that writes the tables to the target S3 bucket.

In [72]:
from pyspark.sql.functions import (
    col, lit, monotonically_increasing_id, month, split, to_date, udf, upper, year
)
from pyspark.sql.types import DateType, FloatType, IntegerType, LongType

In [73]:
input_data = "./"   # to be replaced with source S3 bucket if needed
output_data = "/opt/output/" # to be replaced with target S3 bucket in etl.py

In [74]:
def rename_columns(table, new_columns):
    for original, new in zip(table.columns, new_columns):
        table = table.withColumnRenamed(original, new)
    return table

#### Fact table

In [75]:
df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

In [76]:
fact_immi = df_spark.select('cicid', 'i94yr', 'i94mon', 'i94port', 'i94addr', 'arrdate', 'depdate', 'i94mode', 'i94visa').distinct()\
                         .withColumn("immigration_id", monotonically_increasing_id())

new_columns = ['cic_id', 'year', 'month', 'port_code', 'state_code', 'arrive_date', 'departure_date', 'travel_code', 'visa_code']
fact_immi = rename_columns(fact_immi, new_columns)
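
Note that `fact_immi` has ten columns at this point (the nine selected ones plus `immigration_id`) while `new_columns` has nine names. `rename_columns` pairs them positionally with `zip`, which stops at the shorter sequence, so the trailing `immigration_id` column keeps its name. The plain-Python sketch below mirrors that pairing without Spark; `current_columns` is written out by hand here as an assumption about the column order.

```python
# Column order after the select(...).withColumn("immigration_id", ...) call
current_columns = [
    'cicid', 'i94yr', 'i94mon', 'i94port', 'i94addr',
    'arrdate', 'depdate', 'i94mode', 'i94visa', 'immigration_id',
]
new_columns = [
    'cic_id', 'year', 'month', 'port_code', 'state_code',
    'arrive_date', 'departure_date', 'travel_code', 'visa_code',
]

# zip pairs names by position and silently ignores the unmatched tenth column.
mapping = dict(zip(current_columns, new_columns))
renamed = [mapping.get(name, name) for name in current_columns]
```

Because the pairing is positional, the order of `new_columns` has to follow the order of the `select` call exactly.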

In [77]:
fact_immi = fact_immi.withColumn("country", lit("United States"))

In [78]:
def SAS_to_date(date):
    if date is not None:
        return pd.to_timedelta(date, unit='D') + pd.Timestamp('1960-1-1')
    else:
        return None

In [79]:
process_SAS_date_udf = udf(SAS_to_date, DateType())

In [80]:
fact_immi = fact_immi.withColumn('arrive_date',  process_SAS_date_udf(col('arrive_date')))
fact_immi = fact_immi.withColumn('departure_date',  process_SAS_date_udf(col('departure_date')))

In [81]:
fact_immi.show(5)

+------+------+-----+---------+----------+-----------+--------------+-----------+---------+--------------+-------------+
|cic_id|  year|month|port_code|state_code|arrive_date|departure_date|travel_code|visa_code|immigration_id|      country|
+------+------+-----+---------+----------+-----------+--------------+-----------+---------+--------------+-------------+
|  27.0|2016.0|  4.0|      BOS|        MA| 2016-04-01|    2016-04-05|        1.0|      1.0|             0|United States|
| 233.0|2016.0|  4.0|      NYC|        NY| 2016-04-01|    2016-04-07|        1.0|      2.0|             1|United States|
|1103.0|2016.0|  4.0|      NEW|        NY| 2016-04-01|    2016-04-09|        1.0|      2.0|             2|United States|
|1123.0|2016.0|  4.0|      NEW|        PA| 2016-04-01|    2016-04-08|        1.0|      1.0|             3|United States|
|1446.0|2016.0|  4.0|      NYC|        NY| 2016-04-01|    2016-04-07|        1.0|      2.0|             4|United States|
+------+------+-----+---------+----------+-----------+--------------+-----------+---------+--------------+-------------+

In [82]:
fact_immi = fact_immi.withColumn('cic_id',  col('cic_id').cast(IntegerType()))
fact_immi = fact_immi.withColumn('year',  col('year').cast(IntegerType()))
fact_immi = fact_immi.withColumn('month',  col('month').cast(IntegerType()))
fact_immi = fact_immi.withColumn('travel_code',  col('travel_code').cast(IntegerType()))
fact_immi = fact_immi.withColumn('visa_code',  col('visa_code').cast(IntegerType()))

In [83]:
fact_immi.write.mode("overwrite").partitionBy('state_code')\
                    .parquet(path=output_data + 'fact_immi')

#### Dimension personal table

In [84]:
dim_immi_personal = df_spark.select('cicid', 'i94cit', 'i94res', 'biryear', 'gender', 'insnum').distinct()\
                                .withColumn("immi_person_id",  monotonically_increasing_id())
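
The surrogate keys created with `monotonically_increasing_id()` are unique and increasing but not consecutive. According to the Spark documentation, the current implementation puts the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits. The plain-Python sketch below only mirrors that bit layout for illustration; `monotonic_id` is a hypothetical helper, not a Spark API.

```python
def monotonic_id(partition_id, row_in_partition):
    """Mimic the documented layout: partition ID in the upper bits, row number in the lower 33."""
    return (partition_id << 33) + row_in_partition

# Two partitions with three rows each
ids = [monotonic_id(p, r) for p in range(2) for r in range(3)]
```

The first partition yields 0, 1, 2, and the second starts at 8589934592, so the small IDs in the `show(5)` outputs simply come from the first partition. The values can also change between runs if the partitioning changes.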

In [85]:
new_columns = ['cic_id', 'citizen_country', 'residence_country', 'birth_year', 'gender', 'ins_num']
dim_immi_personal = rename_columns(dim_immi_personal, new_columns)

In [86]:
dim_immi_personal = dim_immi_personal.withColumn('cic_id',  col('cic_id').cast(IntegerType()))
dim_immi_personal = dim_immi_personal.withColumn('citizen_country',  col('citizen_country').cast(IntegerType()))
dim_immi_personal = dim_immi_personal.withColumn('residence_country',  col('residence_country').cast(IntegerType()))
dim_immi_personal = dim_immi_personal.withColumn('birth_year',  col('birth_year').cast(IntegerType()))

In [87]:
dim_immi_personal.show(5)

+------+---------------+-----------------+----------+------+-------+--------------+
|cic_id|citizen_country|residence_country|birth_year|gender|ins_num|immi_person_id|
+------+---------------+-----------------+----------+------+-------+--------------+
|    16|            101|              101|      1988|  null|   null|             0|
|    84|            103|              103|      1994|     M|   null|             1|
|   536|            103|              103|      1956|     M|   null|             2|
|   670|            103|              124|      1979|     M|   null|             3|
|   681|            103|              112|      1955|     F|   null|             4|
+------+---------------+-----------------+----------+------+-------+--------------+
only showing top 5 rows



In [88]:
dim_immi_personal.write.mode("overwrite")\
                    .parquet(path=output_data + 'dim_immi_personal')

#### Dimension airline table

In [89]:
dim_immi_airline = df_spark.select('cicid', 'airline', 'admnum', 'fltno', 'visatype','i94port').distinct()\
                                .withColumn("immi_airline_id",  monotonically_increasing_id())

In [90]:
new_columns = ['cic_id', 'airline', 'admin_num', 'flight_number', 'visa_type', 'port_code']
dim_immi_airline = rename_columns(dim_immi_airline, new_columns)

In [91]:
dim_immi_airline = dim_immi_airline.withColumn('cic_id',  col('cic_id').cast(IntegerType()))

In [92]:
dim_immi_airline = dim_immi_airline.withColumn('admin_num',  col('admin_num').cast(LongType()))

In [93]:
dim_immi_airline.show(5)

+------+-------+-----------+-------------+---------+---------+---------------+
|cic_id|airline|  admin_num|flight_number|visa_type|port_code|immi_airline_id|
+------+-------+-----------+-------------+---------+---------+---------------+
|   472|    VES|55408590533|        91285|       WT|      FTL|              0|
|  1019|     UA|55438475733|        02067|       WT|      NEW|              1|
|  1932|     SK|55415788133|        00909|       WB|      NEW|              2|
|  2252|     DY|92512109330|        07031|       B2|      FTL|              3|
|  2358|     LH|92476163130|        00440|       B1|      HOU|              4|
+------+-------+-----------+-------------+---------+---------+---------------+
only showing top 5 rows



In [94]:
dim_immi_airline.write.mode("overwrite")\
                    .parquet(path=output_data + 'dim_immi_airline')

#### Write additional code tables with spark

In [95]:
spark.createDataFrame(country_code.items(), ['code', 'country'])\
         .withColumn('code', col('code').cast(IntegerType()))\
         .write.mode("overwrite")\
         .parquet(path=output_data + 'country_code')

In [96]:
spark.createDataFrame(port_code.items(), ['code', 'port'])\
         .write.mode("overwrite")\
         .parquet(path=output_data + 'port_code')

In [97]:
spark.createDataFrame(state_code.items(), ['code', 'state'])\
         .write.mode("overwrite")\
         .parquet(path=output_data + 'state_code')

In [98]:
spark.createDataFrame(visa_code.items(), ['code', 'visa'])\
         .withColumn('code', col('code').cast(IntegerType()))\
         .write.mode("overwrite")\
         .parquet(path=output_data + 'visa_code')

In [99]:
spark.createDataFrame(travel_code.items(), ['code', 'travel'])\
         .withColumn('code', col('code').cast(IntegerType()))\
         .write.mode("overwrite")\
         .parquet(path=output_data + 'travel_code')

#### Dimension demographic table

In [100]:
import os

In [101]:
path = os.path.join(input_data, 'us-cities-demographics.csv')

In [102]:
df_spark = spark.read.format('csv').options(header=True, delimiter=';').load(path)

In [103]:
dim_demographic_population = df_spark.select('City', 'State Code', 'Male Population', 'Female Population', 'Number of Veterans', 'Foreign-born', 'Race').distinct()\
                                .withColumn("pop_id",  monotonically_increasing_id())

In [104]:
new_columns = ['city', 'state_code', 'male_pop', 'female_pop', 'num_vetarans', 'foreign_born', 'race']
dim_demographic_population = rename_columns(dim_demographic_population, new_columns)

In [105]:
dim_demographic_population.show(5)

+----------+----------+--------+----------+------------+------------+--------------------+------+
|      city|state_code|male_pop|female_pop|num_vetarans|foreign_born|                race|pop_id|
+----------+----------+--------+----------+------------+------------+--------------------+------+
|Providence|        RI|   89090|     90114|        4933|       53532|               Asian|     0|
| Iowa City|        IA|   37089|     37138|        2047|        9202|American Indian a...|     1|
|     Tampa|        FL|  175517|    193511|       20636|       58795|American Indian a...|     2|
|  Richmond|        VA|  104793|    115496|       12538|       15741|American Indian a...|     3|
|  McKinney|        TX|   77339|     85548|        8082|       25760|  Hispanic or Latino|     4|
+----------+----------+--------+----------+------------+------------+--------------------+------+
only showing top 5 rows



In [106]:
dim_demographic_population.write.mode("overwrite").partitionBy('state_code')\
                    .parquet(path=output_data + 'dim_demographic_population')

In [107]:
dim_demographic_statistics = df_spark.select('City', 'State Code', 'Median Age', 'Average Household Size').distinct()\
                                .withColumn("stat_id",  monotonically_increasing_id())

In [108]:
new_columns = ['city', 'state_code', 'median_age', 'avg_household_size']
dim_demographic_statistics = rename_columns(dim_demographic_statistics, new_columns)

In [109]:
dim_demographic_statistics = dim_demographic_statistics.withColumn('city', upper(col('city')))

In [110]:
dim_demographic_statistics.show(5)

+-----------------+----------+----------+------------------+----------+
|             city|state_code|median_age|avg_household_size|   stat_id|
+-----------------+----------+----------+------------------+----------+
|ARLINGTON HEIGHTS|        IL|      45.9|              2.38|         0|
|         GULFPORT|        MS|      35.1|              2.54|         1|
| WEST VALLEY CITY|        UT|      30.7|              3.81|         2|
|       RIO RANCHO|        NM|      38.3|              2.86|8589934592|
|          ANTIOCH|        CA|      34.0|              3.31|8589934593|
+-----------------+----------+----------+------------------+----------+
only showing top 5 rows
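
The jump from 2 to 8589934592 in `stat_id` is expected rather than a data error. Spark documents that `monotonically_increasing_id` currently places the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits, so the IDs are unique and increasing but not consecutive. The sketch below decodes an ID under that layout; `decode_spark_id` is a hypothetical helper name used only for illustration.

```python
# Decode an ID produced by monotonically_increasing_id(), assuming the layout
# documented by Spark: partition ID in the upper 31 bits, record number within
# the partition in the lower 33 bits. decode_spark_id is a hypothetical helper.
RECORD_BITS = 33


def decode_spark_id(generated_id):
    """Return (partition_id, record_number) for a generated 64-bit ID."""
    partition_id = generated_id >> RECORD_BITS
    record_number = generated_id & ((1 << RECORD_BITS) - 1)
    return partition_id, record_number


for stat_id in (0, 2, 8589934592, 8589934593):
    print(stat_id, decode_spark_id(stat_id))
```

Under this layout 8589934592 is simply the first record of partition 1. If consecutive surrogate keys were required, `row_number()` over an ordered window would be one alternative, at the cost of a global sort.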



In [111]:
dim_demographic_statistics.write.mode("overwrite").partitionBy('state_code')\
                    .parquet(path=output_data + 'dim_demographic_statistics')

#### Dimension airport table

In [112]:
path = os.path.join(input_data, 'airport-codes_csv.csv')

In [113]:
df_spark = spark.read.format('csv').options(header=True).load(path)

In [114]:
dim_airport = df_spark.select('iata_code', 'name', 'type', 'iso_country', 'iso_region', 'municipality', 'coordinates').distinct()

In [115]:
new_columns = ['iata_code', 'name', 'type', 'country', 'state_code', 'municipality', 'coordinates']
dim_airport = rename_columns(dim_airport, new_columns)

In [116]:
# After the rename above, the country column is called 'country', not 'iso_country'
dim_airport = dim_airport.filter(col("iata_code").isNotNull()).filter(col('country') == 'US')

In [117]:
dim_airport = dim_airport.withColumn("longitude", split('coordinates', ", ")[0].cast(FloatType()))

In [118]:
dim_airport = dim_airport.withColumn("latitude", split('coordinates', ", ")[1].cast(FloatType()))

In [119]:
dim_airport = dim_airport.drop(col("coordinates"))

In [120]:
dim_airport = dim_airport.withColumn("state_code", split("state_code", "-")[1])

In [121]:
dim_airport = dim_airport.withColumn("country", lit("United States"))

In [122]:
dim_airport.show(5)

+---------+--------------------+--------------+-------------+----------+------------+---------+--------+
|iata_code|                name|          type|      country|state_code|municipality|longitude|latitude|
+---------+--------------------+--------------+-------------+----------+------------+---------+--------+
|      BSW| Boswell Bay Airport| small_airport|United States|        AK| Boswell Bay| -146.146| 60.4231|
|      DGW|Converse County A...| small_airport|United States|        WY|     Douglas| -105.386| 42.7972|
|      DUC|   Halliburton Field| small_airport|United States|        OK|      Duncan| -97.9599| 34.4709|
|      GTG|Grantsburg Munici...| small_airport|United States|        WI|  Grantsburg| -92.6644| 45.7981|
|      ISO|Kinston Regional ...|medium_airport|United States|        NC|     Kinston| -77.6088| 35.3314|
+---------+--------------------+--------------+-------------+----------+------------+---------+--------+
only showing top 5 rows



In [123]:
dim_airport.write.mode("overwrite").partitionBy('state_code')\
                    .parquet(path=output_data + 'dim_airport')

#### Dimension temperature table

In [124]:
df_spark = spark.read.format('csv').options(header=True).load('../../data2/GlobalLandTemperaturesByCity.csv')

In [125]:
dim_temperature = df_spark.filter(col('country')=='United States')

In [126]:
dim_temperature = dim_temperature.select('dt', 'AverageTemperature', 'AverageTemperatureUncertainty', 'City', 'Country')

In [127]:
new_columns = ['dt', 'avg_temp', 'avg_temp_uncertnty', 'city', 'country']
dim_temperature = rename_columns(dim_temperature, new_columns)

In [128]:
dim_temperature = dim_temperature.withColumn('dt', to_date(col('dt')))
dim_temperature = dim_temperature.withColumn('year', year(col('dt')))
dim_temperature = dim_temperature.withColumn('month', month(col('dt')))
dim_temperature = dim_temperature.withColumn("record_id",  monotonically_increasing_id())

In [129]:
dim_temperature = dim_temperature.withColumn('avg_temp', col('avg_temp').cast(FloatType()))
dim_temperature = dim_temperature.withColumn('avg_temp_uncertnty', col('avg_temp_uncertnty').cast(FloatType()))

In [130]:
dim_temperature.show(5)

+----------+--------+------------------+-------+-------------+----+-----+---------+
|        dt|avg_temp|avg_temp_uncertnty|   city|      country|year|month|record_id|
+----------+--------+------------------+-------+-------------+----+-----+---------+
|1820-01-01|   2.101|             3.217|Abilene|United States|1820|    1|        0|
|1820-02-01|   6.926|             2.853|Abilene|United States|1820|    2|        1|
|1820-03-01|  10.767|             2.395|Abilene|United States|1820|    3|        2|
|1820-04-01|  17.989|             2.202|Abilene|United States|1820|    4|        3|
|1820-05-01|  21.809|             2.036|Abilene|United States|1820|    5|        4|
+----------+--------+------------------+-------+-------------+----+-----+---------+
only showing top 5 rows



In [131]:
dim_temperature.count()

687289

In [132]:
dim_temperature.select('dt').distinct().count()

3239

In [133]:
dim_temperature.write.mode("overwrite")\
                .parquet(path=output_data + 'dim_temperature')

#if too slow, without partition

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
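
The checks run below cover schemas and row counts only. As one possible integrity check on a unique key, the sketch below reports key values that occur more than once. It uses plain Python rows so that it is self-contained; `find_duplicate_keys` and the sample rows are hypothetical, and on the Spark tables the same idea could be expressed with `groupBy(key).count()` followed by a filter on `count > 1`.

```python
from collections import Counter


def find_duplicate_keys(rows, key):
    """Return a dict of key value -> occurrences for values seen more than once."""
    counts = Counter(row[key] for row in rows)
    return {value: n for value, n in counts.items() if n > 1}


# Hypothetical sample rows standing in for a table such as dim_immi_personal.
sample_rows = [
    {"cic_id": 16, "gender": None},
    {"cic_id": 84, "gender": "M"},
    {"cic_id": 84, "gender": "M"},
]

duplicates = find_duplicate_keys(sample_rows, "cic_id")
if duplicates:
    print(f"Unique key check failed for cic_id: {duplicates}")
else:
    print("Unique key check passed for cic_id")
```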
 
##### Run Quality Checks (assuming the tables were written to the local output directory; the same checks also apply if the tables are written to S3 after the ETL process)

In [134]:
# Iterate over the table directories in output_data and print each schema
for filename in os.listdir(output_data):
    f = os.path.join(output_data, filename)
    if filename != ".ipynb_checkpoints" and os.path.isdir(f):
        df = spark.read.parquet(f)
        print("Table: " + filename)
        df.printSchema()


Table: country_code
root
 |-- code: integer (nullable = true)
 |-- country: string (nullable = true)

Table: visa_code
root
 |-- code: integer (nullable = true)
 |-- visa: string (nullable = true)

Table: port_code
root
 |-- code: string (nullable = true)
 |-- port: string (nullable = true)

Table: state_code
root
 |-- code: string (nullable = true)
 |-- state: string (nullable = true)

Table: dim_demographic_statistics
root
 |-- city: string (nullable = true)
 |-- median_age: string (nullable = true)
 |-- avg_household_size: string (nullable = true)
 |-- stat_id: long (nullable = true)
 |-- state_code: string (nullable = true)

Table: dim_airport
root
 |-- iata_code: string (nullable = true)
 |-- name: string (nullable = true)
 |-- type: string (nullable = true)
 |-- country: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- longitude: float (nullable = true)
 |-- latitude: float (nullable = true)
 |-- state_code: string (nullable = true)

Table: travel_code
ro

In [135]:
# Fail fast if any written table is empty, naming the offending table
for filename in os.listdir(output_data):
    f = os.path.join(output_data, filename)
    if filename != ".ipynb_checkpoints" and os.path.isdir(f):
        df = spark.read.parquet(f)
        record_num = df.count()
        if record_num <= 0:
            raise ValueError(f"Table {filename} is empty!")
        else:
            print(f"Table: {filename} is not empty: total {record_num} records.")

Table: country_code is not empty: total 289 records.
Table: visa_code is not empty: total 3 records.
Table: port_code is not empty: total 660 records.
Table: state_code is not empty: total 55 records.
Table: dim_demographic_statistics is not empty: total 596 records.
Table: dim_airport is not empty: total 2018 records.
Table: travel_code is not empty: total 4 records.
Table: dim_immi_personal is not empty: total 3096313 records.
Table: fact_immi is not empty: total 3096313 records.
Table: dim_immi_airline is not empty: total 3096313 records.
Table: dim_temperature is not empty: total 687289 records.
Table: dim_demographic_population is not empty: total 2891 records.


#### Check integrity through anti joins

In [136]:
integrity_airline = fact_immi.select(col("cic_id")).distinct() \
                             .join(dim_immi_airline, fact_immi["cic_id"] == dim_immi_airline["cic_id"], "left_anti") \
                             .count()

In [137]:
integrity_airports = fact_immi.select(col("port_code")).distinct() \
                                 .join(dim_airport, fact_immi["port_code"] == dim_airport["iata_code"], "left_anti") \
                                 .count()

In [138]:
integrity_demo_pop = fact_immi.select(col("state_code")).distinct() \
                                 .join(dim_demographic_population, fact_immi["state_code"] == dim_demographic_population["state_code"], "left_anti") \
                                 .count()

In [139]:
integrity_demo_stat = fact_immi.select(col("state_code")).distinct() \
                                 .join(dim_demographic_statistics, fact_immi["state_code"] == dim_demographic_statistics["state_code"], "left_anti") \
                                 .count()

In [140]:
integrity_personal = fact_immi.select(col("cic_id")).distinct() \
                             .join(dim_immi_personal, fact_immi["cic_id"] == dim_immi_personal["cic_id"], "left_anti") \
                             .count()

In [141]:
print(integrity_airline, integrity_airports, integrity_demo_pop, integrity_demo_stat, integrity_personal)

0 167 409 409 0
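
Each number above is the count of distinct fact-table keys that have no match in the corresponding dimension table, so 0 means every key is covered. Conceptually, a `left_anti` join on distinct keys is a set difference. A minimal plain-Python sketch with made-up key values (not taken from the dataset) and a hypothetical helper name:

```python
def count_orphan_keys(fact_keys, dim_keys):
    """Count distinct fact keys that are absent from the dimension keys."""
    return len(set(fact_keys) - set(dim_keys))


# Hypothetical key values for illustration only.
fact_state_codes = ["NY", "NY", "FL", "YH", ".N"]
dim_state_codes = ["NY", "FL", "CA"]

print(count_orphan_keys(fact_state_codes, dim_state_codes))  # 2: "YH" and ".N"
```

One difference to keep in mind: in a Spark equality join a null key never matches anything, so a null key in the fact table counts as unmatched even if the dimension also contains nulls.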


In [142]:
dim_demographic_population_test = dim_demographic_population.select(col("state_code")).distinct()

In [143]:
fact_immi_test = fact_immi.select(col("state_code")).distinct()

In [144]:
fact_immi_test.show(100,False)

+----------+
|state_code|
+----------+
|.N        |
|YH        |
|RG        |
|RF        |
|CI        |
|FT        |
|TC        |
|AZ        |
|SC        |
|IC        |
|FI        |
|UA        |
|PU        |
|EA        |
|NS        |
|KI        |
|RO        |
|SL        |
|PI        |
|LA        |
|YN        |
|NL        |
|MN        |
|BS        |
|FC        |
|BW        |
|11        |
|NK        |
|OI        |
|PS        |
|AM        |
|PL        |
|RE        |
|AA        |
|MK        |
|NJ        |
|DR        |
|RA        |
|73        |
|WC        |
|MX        |
|HW        |
|3         |
|30        |
|JF        |
|F         |
|VL        |
|DC        |
|QL        |
|GL        |
|TV        |
|EE        |
|EX        |
|PD        |
|VG        |
|ZN        |
|CN        |
|4B        |
|S6        |
|OR        |
|JC        |
|FG        |
|RY        |
|UR        |
|UM        |
|TI        |
|AT        |
|85        |
|NW        |
|52        |
|UL        |
|SW        |
|OL        |
|HH        |

In [145]:
fact_immi.filter(fact_immi.state_code=="YH").show()

+-------+----+-----+---------+----------+-----------+--------------+-----------+---------+--------------+-------------+
| cic_id|year|month|port_code|state_code|arrive_date|departure_date|travel_code|visa_code|immigration_id|      country|
+-------+----+-----+---------+----------+-----------+--------------+-----------+---------+--------------+-------------+
|1400855|2016|    4|      DAL|        YH| 2016-04-08|    2016-04-19|          1|        2|   25769807262|United States|
+-------+----+-----+---------+----------+-----------+--------------+-----------+---------+--------------+-------------+



In [146]:
dim_demographic_population_test.show(100,False)

+----------+
|state_code|
+----------+
|AZ        |
|SC        |
|LA        |
|MN        |
|NJ        |
|DC        |
|OR        |
|VA        |
|RI        |
|KY        |
|NH        |
|MI        |
|NV        |
|WI        |
|ID        |
|CA        |
|CT        |
|NE        |
|MT        |
|NC        |
|MD        |
|DE        |
|MO        |
|IL        |
|ME        |
|WA        |
|ND        |
|MS        |
|AL        |
|IN        |
|OH        |
|TN        |
|IA        |
|NM        |
|PA        |
|SD        |
|NY        |
|TX        |
|GA        |
|MA        |
|KS        |
|FL        |
|CO        |
|AK        |
|AR        |
|OK        |
|PR        |
|UT        |
|HI        |
+----------+



##### Some state codes in the fact table do not appear in the demographic dimension tables, so further action may be required (either filtering out or correcting those state codes; I do not know enough about the source data to decide which). cic_id appears to be fine.
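
One possible way to handle this, sketched under assumptions: treat the `state_code` lookup table written earlier as the list of valid codes, keep the rows whose code is in it, and set the rest aside for review instead of silently dropping them. The function name, the sample rows and the sample set of valid codes below are hypothetical; in Spark the same split could be done with a `left_semi` and a `left_anti` join against the lookup table.

```python
def split_by_valid_state(rows, valid_codes):
    """Split rows into (valid, rejected) according to their state_code."""
    valid, rejected = [], []
    for row in rows:
        code = row.get("state_code")
        # Normalise whitespace and case before comparing; None stays invalid.
        normalised = code.strip().upper() if isinstance(code, str) else None
        if normalised in valid_codes:
            valid.append({**row, "state_code": normalised})
        else:
            rejected.append(row)
    return valid, rejected


# Hypothetical rows; "YH" is one of the unmatched codes seen in the fact table.
rows = [
    {"cic_id": 27, "state_code": "MA"},
    {"cic_id": 233, "state_code": " ny"},
    {"cic_id": 1400855, "state_code": "YH"},
    {"cic_id": 999, "state_code": None},
]
valid_rows, rejected_rows = split_by_valid_state(rows, {"MA", "NY", "PA"})
print(len(valid_rows), "valid,", len(rejected_rows), "rejected")
```

Keeping the rejected rows in a separate table makes it possible to decide later whether they should be corrected or discarded.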

#### 4.3 Data dictionary 

<img src="img/metadata_1.png" style="float: left;" alt="" width="1000"/>

<img src="img/metadata_2.png" style="float: left;" alt="" width="1000"/>

#### 4.4 Example analysis
##### Top 5 countries of citizenship of immigrants to the US in April 2016 (the only period covered by the raw dataset), by number of immigrants.

In [153]:
country_code = spark.read.parquet(os.path.join(output_data, 'country_code'))
state_code = spark.read.parquet(os.path.join(output_data, 'state_code'))

In [155]:
from pyspark.sql.functions import isnan, count, when, col, desc, udf, sort_array, asc, avg

In [158]:
dim_immi_personal.join(country_code, dim_immi_personal['citizen_country'] == country_code['code'], "left") \
                .select(country_code.country)\
                .groupBy('country') \
                .agg({'country':'count'}) \
                .withColumnRenamed('count(country)', 'Countrycount') \
                .sort(desc('Countrycount')) \
                .show(5)

+--------------------+------------+
|             country|Countrycount|
+--------------------+------------+
|      UNITED KINGDOM|      360157|
|               JAPAN|      206873|
|          CHINA, PRC|      191425|
|              FRANCE|      188766|
|MEXICO Air Sea, a...|      175781|
+--------------------+------------+
only showing top 5 rows



##### Top 5 destination states in the US in April 2016 (the only period covered by the raw dataset), by number of immigrants.

In [160]:
fact_immi.join(state_code, fact_immi['state_code'] == state_code['code'], "left") \
                .select(state_code.state)\
                .groupBy('state') \
                .agg({'state':'count'}) \
                .withColumnRenamed('count(state)', 'Statecount') \
                .sort(desc('Statecount')) \
                .show(5)

+----------+----------+
|     state|Statecount|
+----------+----------+
|   FLORIDA|    621701|
|  NEW YORK|    553677|
|CALIFORNIA|    470386|
|    HAWAII|    168764|
|     TEXAS|    134321|
+----------+----------+
only showing top 5 rows



##### Distribution of immigrants to the US in April 2016 by birth year: the 5 most common birth years, ordered by count, descending.

In [162]:
dim_immi_personal.groupBy('birth_year') \
                .agg({'birth_year':'count'}) \
                .withColumnRenamed('count(birth_year)', 'BirthYearcount') \
                .sort(desc('BirthYearcount')) \
                .show(5)

+----------+--------------+
|birth_year|BirthYearcount|
+----------+--------------+
|      1986|         71958|
|      1983|         70415|
|      1985|         70409|
|      1982|         70251|
|      1984|         69809|
+----------+--------------+
only showing top 5 rows



In [165]:
dim_immi_personal.join(country_code, dim_immi_personal['citizen_country'] == country_code['code'], "left") \
                .select(dim_immi_personal.gender, country_code.country)\
                .filter((country_code.country=="UNITED KINGDOM") | (country_code.country=="JAPAN") | (country_code.country=="CHINA, PRC")) \
                .groupBy('country', 'gender') \
                .agg({'gender':'count'}) \
                .withColumnRenamed('count(gender)', 'Gendercount') \
                .sort(desc('Gendercount')) \
                .show()

+--------------+------+-----------+
|       country|gender|Gendercount|
+--------------+------+-----------+
|UNITED KINGDOM|     M|     152400|
|UNITED KINGDOM|     F|     131864|
|    CHINA, PRC|     F|      90258|
|         JAPAN|     M|      86481|
|    CHINA, PRC|     M|      84254|
|         JAPAN|     F|      80071|
|    CHINA, PRC|     X|        391|
|    CHINA, PRC|     U|        131|
|         JAPAN|     U|          2|
|UNITED KINGDOM|     U|          2|
|         JAPAN|  null|          0|
|UNITED KINGDOM|  null|          0|
|    CHINA, PRC|  null|          0|
+--------------+------+-----------+
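
The `null` rows show a count of 0 because `count(gender)` follows SQL semantics and skips null values, so travellers with no recorded gender form a group but are not counted. Counting rows instead (`count('*')` in Spark, `COUNT(*)` in SQL) would include them. The sketch below reproduces the effect with Python's built-in `sqlite3` module and made-up rows, purely to illustrate the SQL behaviour; it is not the Spark query above.

```python
import sqlite3

# In-memory table with made-up rows; None is stored as SQL NULL.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE person (country TEXT, gender TEXT)")
conn.executemany(
    "INSERT INTO person VALUES (?, ?)",
    [("JAPAN", "M"), ("JAPAN", "F"), ("JAPAN", None), ("JAPAN", None)],
)

# COUNT(gender) skips NULLs, COUNT(*) counts every row in the group.
result = conn.execute(
    "SELECT gender, COUNT(gender), COUNT(*) FROM person "
    "GROUP BY gender ORDER BY gender"
).fetchall()
conn.close()

for gender, non_null_count, row_count in result:
    print(gender, non_null_count, row_count)
```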



### Step 5: Complete Project Write Up
#### Clearly state the rationale for the choice of tools and technologies for the project.
1. AWS S3 for storage (both source and target)
2. Pandas and NumPy for data exploration and analysis
3. PySpark for processing the large datasets: extracting, transforming and loading the tables

#### Propose how often the data should be updated and why.
1. Tables created from the immigration and temperature datasets should be updated every month, since the raw data is organized by month.
2. Tables created from the demographic dataset could be updated annually, since data collection takes time and conclusions drawn from incomplete data may be less reliable than those drawn from a more complete dataset.
3. All tables should be updated in "append" mode.

#### Suggestions for following scenarios:
 1. The data was increased by 100x.
   If the data grew by 100x and Spark in standalone mode could no longer process it, we could move the processing to Amazon EMR, a managed cluster service for processing large datasets in the cloud.
 
 2. The data populates a dashboard that must be updated on a daily basis by 7am every day.
   This could be handled with Apache Airflow, which is also available as a managed service on AWS. With the argument "schedule_interval" set to '0 7 * * *' (cron fields are ordered minute, then hour), the entire process will be executed at 7am every day. Since the dashboard must be ready by 7am, an earlier hour should be chosen so that the run finishes in time.
   
 3. The database needed to be accessed by 100+ people.
   If the database needed to be accessed by 100+ people, one option is to load the tables into a cloud-based database that can support that many connections. For example, Amazon Redshift can handle up to 500 connections; however, cost needs to be considered, since with such a connection requirement the Redshift cluster has to be available all the time.
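
For the daily dashboard scenario (item 2), a five-field cron expression is ordered minute, hour, day of month, month, day of week. The minimal sketch below uses a hypothetical helper, `daily_run_time`, that only understands fixed minute and hour fields, to show how '0 7 * * *' and '7 0 * * *' differ.

```python
def daily_run_time(cron_expr):
    """Return 'HH:MM' for a daily cron expression with fixed minute and hour.

    Hypothetical helper for illustration: it only supports expressions of the
    form '<minute> <hour> * * *', not ranges, lists or steps.
    """
    fields = cron_expr.split()
    if len(fields) != 5:
        raise ValueError("expected five fields: minute hour day month weekday")
    minute, hour, day, month, weekday = fields
    if (day, month, weekday) != ("*", "*", "*"):
        raise ValueError("only daily schedules are supported in this sketch")
    if not (0 <= int(minute) <= 59 and 0 <= int(hour) <= 23):
        raise ValueError("minute or hour out of range")
    return f"{int(hour):02d}:{int(minute):02d}"


print(daily_run_time("0 7 * * *"))  # 07:00
print(daily_run_time("7 0 * * *"))  # 00:07
```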
   
#### Other thoughts

1. The temperature dataset is not up to date. During exploration it became clear that no data is available for 2016, the year covered by our fact table.

2. During the integrity check, some of the state codes were found not to appear in the label descriptions. Further action might be needed here.

3. Port codes might not be 100% equivalent to city codes. I tried to build a reverse table linking each city to its code in the label descriptions, but many cities do not have a corresponding code, so for the demographic data we have to stay with the full city name.
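
The reverse lookup mentioned in item 3 could look roughly like the sketch below. It assumes the port labels have the form 'CITY, ST', which is an assumption about the label descriptions rather than something verified here; the sample entries and the function name are hypothetical. Cities absent from the reversed table return `None`, which is the situation described above for many demographic cities.

```python
def build_city_to_port(port_labels):
    """Invert {port_code: 'CITY, ST'} into {(city, state): port_code}."""
    city_to_port = {}
    for code, label in port_labels.items():
        city, sep, state = label.rpartition(",")
        if not sep:
            continue  # label has no 'CITY, ST' shape, so it cannot be reversed
        key = (city.strip().upper(), state.strip().upper())
        # Keep the first code seen when several ports map to the same city.
        city_to_port.setdefault(key, code)
    return city_to_port


# Hypothetical sample of the port_code dictionary.
sample_ports = {
    "BOS": "BOSTON, MA",
    "NYC": "NEW YORK, NY",
    "XXX": "NOT REPORTED/UNKNOWN",
}
lookup = build_city_to_port(sample_ports)

print(lookup.get(("BOSTON", "MA")))      # BOS
print(lookup.get(("PROVIDENCE", "RI")))  # None
```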