# Project Title
### Data Engineering Capstone Project

#### Project Summary
This project creates a data model for the immigration data and builds a data pipeline that performs the ETL process in Spark and writes Parquet files as output.


The project follows these steps:
* Step 0: Scope the Project
* Step 1: Gather, Explore and Clean the Data
* Step 2: Define the Data Model
* Step 3: Run ETL to Model the Data
* Step 4: Complete Project Write Up

### Step 0: Scope the Project

#### Scope 
##### - Data used:

- immigration data: immigration_data_sample.csv
    - Contains information about immigration details
    - This data comes from the US National Tourism and Trade Office (https://travel.trade.gov/research/reports/i94/historical/2016.html).

- i94 description file for the immigration data: I94_SAS_Labels_Descriptions.sas
    - This file contains the descriptions and references for the codes used in the immigration dataset.

- demographic data: us-cities-demographics.csv
    - Contains demographic data for State-City pairs
    - This data comes from Opendatasoft (https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/).

- downloaded dataset: /downloaded/FCDO_Geographical_Names_Index.csv
    - Contains country names, 2-letter country codes and other information about each country. It complements the i94 description data.
    - This data comes from nationsonline.org (https://www.nationsonline.org/oneworld/country_code_list.htm).

##### - End solution:

A data model with a fact table holding the immigration information and a series of dimension tables with further details.


##### - Tools will be used:

 - One-time pre-processing: Pandas
 - ETL pipeline: Spark
 - Storage: S3 bucket (not used in the submitted version, because writing to and reading from S3 takes an extremely long time; the submitted project stores the parquet files locally)
 


#### Do all imports and installs here

```python
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import desc
from pyspark.sql.functions import asc
from pyspark.sql.functions import sum as Fsum
import configparser
from datetime import datetime
import os
```

### Step 1: Gather, Explore and Clean the Data (This pre-processing step only needs to be run the first time)
#### Explore and clean the Data
Identify data quality issues, like missing values, duplicate data, etc.

1. The immigration data needs more interpretability, so the i94 description file is introduced. The following information is extracted from this file:
    - port
    - visa
    - mode
    
2. The i94 file has limited information about countries. A downloaded file containing 2-letter country codes complements this.


#### Path explanation:

- root path: resources from course

- /downloaded: resources from online

- /processed data: processed data combining different resources, which can be used in the project

- /parquet files: output parquet data for data warehousing

- /quality check: data that needs to be checked and updated
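
Pandas' `to_csv` does not create missing folders, so the relative folders above must exist before the pre-processing cells write to them. A minimal sketch for creating them up front follows. It assumes the folder names listed above are relative to the notebook's working directory. `ensure_dirs` and `PROJECT_DIRS` are hypothetical names and are not part of the project.

```python
import os

# Relative folder names from the path list above (assumed layout)
PROJECT_DIRS = ["downloaded", "processed data", "parquet files", "quality check"]

def ensure_dirs(root=".", dirs=PROJECT_DIRS):
    """Create each project folder under `root` if it is missing; return the paths."""
    paths = []
    for name in dirs:
        path = os.path.join(root, name)
        # exist_ok=True makes re-running the notebook harmless
        os.makedirs(path, exist_ok=True)
        paths.append(path)
    return paths
```

You can call `ensure_dirs()` once at the top of the notebook and repeat it safely, because existing folders are skipped.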
    

```python
# processed data path
ppath='./processed data/'
```

#### 1.1 Read Description File and Load to CSV

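```python
# Read the SAS label description file and strip tab characters
with open('./I94_SAS_Labels_Descriptions.SAS') as f:
    f_content = f.read().replace('\t', '')

def code_mapper(file, idx):
    """Return the {code: description} pairs of the SAS value block named `idx`."""
    # Slice from the block name up to the terminating ';'
    f_content2 = file[file.index(idx):]
    f_content2 = f_content2[:f_content2.index(';')].split('\n')
    # Drop the quotes around codes and descriptions
    f_content2 = [i.replace("'", "") for i in f_content2]
    # Skip the header line and keep only well-formed "code = description" lines
    dic = [i.split('=') for i in f_content2[1:]]
    dic = dict([i[0].strip(), i[1].strip()] for i in dic if len(i) == 2)
    return dic
```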
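
`code_mapper` splits each line on `=`, so it silently drops any line whose description itself contains `=` (`len(i) == 2` fails). An alternative sketch uses a regular expression to pull out `code = 'description'` pairs directly. It assumes each entry sits on its own line with the description in single quotes. `parse_value_block` and `SAMPLE` are hypothetical, and the excerpt is illustrative rather than copied from the file.

```python
import re

# One entry per line: an optionally quoted code, '=', then a quoted description,
# e.g.  1 = 'Air'   or   'ALC' = 'ALCAN, AK'
ENTRY = re.compile(r"^[ \t]*'?([^'=\s][^'=\n]*?)'?[ \t]*=[ \t]*'([^'\n]*)'", re.MULTILINE)

def parse_value_block(sas_text, block_name):
    """Return {code: description} for the SAS value block named `block_name`."""
    start = sas_text.index(block_name)
    end = sas_text.index(";", start)  # each value block is terminated by ';'
    return {code.strip(): desc.strip() for code, desc in ENTRY.findall(sas_text[start:end])}

# Illustrative excerpt in the layout of the SAS description file
SAMPLE = """value i94model
\t1 = 'Air'
\t2 = 'Sea'
\t3 = 'Land'
\t9 = 'Not reported' ;
value i94prtl
\t'ALC'\t=\t'ALCAN, AK'
\t'ANC'\t=\t'ANCHORAGE, AK' ;
"""
```

For lines in this format, `parse_value_block(f_content, "i94model")` should return the same mapping as `code_mapper(f_content, "i94model")`.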

```python
# Build code -> description mappings from the SAS value blocks
i94cit_res = code_mapper(f_content, "i94cntyl")
i94port = code_mapper(f_content, "i94prtl")
i94mode = code_mapper(f_content, "i94model")
#i94addr = code_mapper(f_content, "i94addrl")
# The visa categories are hard-coded
i94visa = {'1': 'Business',
           '2': 'Pleasure',
           '3': 'Student'}
```

```python
# Read code descriptions from the SAS file into Pandas DataFrames
cit_res_df=pd.DataFrame(list(i94cit_res.items()),columns=['Code','Country'])
port_df=pd.DataFrame(list(i94port.items()),columns=['Code','Address'])
mode_df=pd.DataFrame(list(i94mode.items()),columns=['Code','Mode'])
#addr_df=pd.DataFrame(list(i94addr.items()),columns=['Code','State'])
visa_df=pd.DataFrame(list(i94visa.items()),columns=['Code','Visa'])
```

```python
# Split the port address into city and state
port_df['City']=port_df['Address'].apply(lambda x: x.split(',')[0] if len(x.split(','))==2 else x)
port_df['City']=port_df['City'].apply(lambda x: ' '.join([i.lower().capitalize() for i in x.split()]))
# strip() removes the space left after the comma, e.g. 'ALCAN, AK' -> 'AK'
port_df['State']=port_df['Address'].apply(lambda x: x.split(',')[1].strip() if len(x.split(','))==2 else x)
```
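
Written as a plain function, the splitting rule is easy to test on individual addresses. The sketch below assumes the same rule: split only when there is exactly one comma, otherwise reuse the whole address. `split_port_address` is a hypothetical name. The function also strips the whitespace left after the comma.

```python
def split_port_address(address):
    """Split a port address such as 'ALCAN, AK' into (city, state).

    Only an address with exactly one comma is split; anything else is
    returned as both city and state.
    """
    parts = address.split(",")
    if len(parts) == 2:
        city, state = parts
    else:
        city = state = address
    # capitalize() upper-cases the first letter and lower-cases the rest of each word
    city = " ".join(word.capitalize() for word in city.split())
    return city, state.strip()
```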

```python
# Output the descriptions to csv files for reference
port_df.to_csv(ppath+'port.csv',index=False)
mode_df.to_csv(ppath+'mode.csv',index=False)
visa_df.to_csv(ppath+'visa.csv',index=False)
```

#### 1.2 Combine Downloaded Data and Description File for More Informative Country Data

```python
# Read files from the downloaded folder
country = pd.read_csv('./downloaded/FCDO_Geographical_Names_Index.csv',encoding='ISO-8859-1')
airport = pd.read_csv('airport-codes_csv.csv')
```

```python
# Extract continent information from the airport data
continent = airport[['continent','iso_country']].drop_duplicates().reset_index(drop=True)
```

```python
# Merge the i94, country and continent dataframes into one dataframe
cit_res_df['Country_lower']=cit_res_df['Country'].apply(lambda x: x.lower())
country['Country_lower']=country['Name'].apply(lambda x: x.lower())
cit_res_df=cit_res_df.drop('Country',axis=1)
country_df=pd.merge(cit_res_df,country,how='left',on='Country_lower')[['Code','Country','Name','Official name','Citizen names']]
country_df=pd.merge(country_df,continent,how='left',left_on='Country', right_on='iso_country')[['Code','Country','Name','Official name','Citizen names','continent']]
```
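
The first merge is a case-insensitive left join. Every i94 country code is kept, and codes whose name has no match in the downloaded file end up with empty columns. The sketch below strips the matching step down to plain Python, which helps spot unmatched names before merging. The function name and the sample codes and names are illustrative.

```python
def match_country_names(i94_countries, reference_names):
    """Map each i94 code to its reference country name, ignoring case.

    Codes whose name has no case-insensitive match map to None, mirroring
    the rows a pandas how='left' merge leaves empty.
    """
    lookup = {name.lower(): name for name in reference_names}
    return {code: lookup.get(name.lower()) for code, name in i94_countries.items()}
```

Unlike the pandas merge, this sketch collapses a duplicated reference name to one entry instead of producing extra rows.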

```python
country_df.to_csv(ppath+'country.csv',index=False)
```

### Step 2: Define the Data Model

##### Fact table:
 - immigration table

##### Dimension tables:
 - demographic table
 - state_race table
 - temperature table
 - port table
 - time table
 - country table
 - mode table
 - visa table
 
#### 2.1 Assign Filenames


##### Files from course resources

```python
imm_file='immigration_data_sample.csv' # a subset of the immigration data
#imm_file='../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat' # full dataset of the immigration data
demo_file='us-cities-demographics.csv'
temp_file='GlobalLandTemperaturesByCity.csv'
```

##### Files for pre-processed data

```python
port_file=ppath+'port.csv'
mode_file=ppath+'mode.csv'
visa_file=ppath+'visa.csv'
country_file=ppath+'country.csv'
```

#### 2.2 Create Spark Session

```python
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
    .enableHiveSupport() \
    .getOrCreate()
```

#### 2.3 Import Data and Map Out Data Pipelines

##### Import Immigration Data

```python
df_imm = spark.read \
        .format("csv") \
        .option("header", "true") \
        .option("mode", "DROPMALFORMED") \
        .load(imm_file)
```

```python
#df_imm = spark.read \
#        .format("com.github.saurfang.sas.spark") \
#        .option("header", "true") \
#        .option("mode", "DROPMALFORMED") \
#        .load(imm_file)
```

```python
df_imm.printSchema()
```

```
root
 |-- _c0: string (nullable = true)
 |-- cicid: string (nullable = true)
 |-- i94yr: string (nullable = true)
 |-- i94mon: string (nullable = true)
 |-- i94cit: string (nullable = true)
 |-- i94res: string (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: string (nullable = true)
 |-- i94mode: string (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: string (nullable = true)
 |-- i94bir: string (nullable = true)
 |-- i94visa: string (nullable = true)
 |-- count: string (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: string (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
```

```python
# Create a function to convert SAS dates (days since 1960-01-01) to datetime values
import pyspark.sql.functions as f
import pyspark.sql.types as t
from datetime import datetime
from datetime import timedelta
from pyspark.sql.functions import lit

def date_add_(date, days):
    # SAS dates count days from 1960-01-01. Spark passes the start column as a
    # datetime.date (not a datetime), so the epoch is rebuilt here
    if type(date) is not datetime:
        date = datetime.strptime('1960-01-01', "%Y-%m-%d")

    # Day counts read from CSV arrive as strings; convert them to float
    if type(days) is float:
        return date + timedelta(days)
    elif days is not None:
        return date + timedelta(float(days))

    # Return null for a null day count
    return None
```

```python
# Create a UDF to convert date fields from SAS format to DateType
date_add_udf = f.udf(date_add_, t.DateType())
# Add a column containing the SAS start date
df_imm=df_imm.withColumn("sas_date", lit('1960-01-01'))
# Convert the arrival and departure dates
df_imm=df_imm.withColumn('arrdate_d', date_add_udf(f.to_date('sas_date'), 'arrdate'))
df_imm=df_imm.withColumn('depdate_d', date_add_udf(f.to_date('sas_date'), 'depdate'))
```
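
The UDF encodes one rule: a SAS date is a day count from 1960-01-01. The sketch below expresses that rule in plain Python so it can be tested without Spark. `sas_to_date` is a hypothetical name.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS dates count days from this date

def sas_to_date(days):
    """Convert a SAS day count (number or numeric string, e.g. '20545.0') to a date."""
    if days is None or days == "":
        return None  # keep nulls as nulls, like the UDF
    return SAS_EPOCH + timedelta(days=int(float(days)))
```

For example, `sas_to_date("20545.0")` gives `date(2016, 4, 1)`. Spark's built-in `date_add` should also work without a Python UDF, for instance `f.expr("date_add(to_date('1960-01-01'), CAST(CAST(arrdate AS DOUBLE) AS INT))")`. Built-in functions avoid the cost of shipping rows to a Python worker.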

```python
from pyspark.sql.functions import to_date
# dtaddto is stored as MMddyyyy text; in Spark date patterns 'MM' is month, 'mm' is minutes
df_imm=df_imm.withColumn('dtaddto_d',to_date(df_imm['dtaddto'],'MMddyyyy'))
```
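
In Spark's date patterns `MM` means month and `mm` means minute-of-hour, so the pattern letters matter for `dtaddto`. The same MMDDYYYY parse in plain Python is a quick way to check expected values. `parse_dtaddto` is a hypothetical helper that returns None for anything that is not a valid date.

```python
from datetime import datetime

def parse_dtaddto(value):
    """Parse an MMDDYYYY string such as '10292016'; return None if it is not a valid date."""
    try:
        # %m is month, %d day, %Y four-digit year (cf. Spark's 'MMddyyyy')
        return datetime.strptime(value, "%m%d%Y").date()
    except (TypeError, ValueError):
        return None
```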

```python
# Create temp table for immigration data
df_imm.createOrReplaceTempView("imm_table")
```

###### Create immigration table - fact table

```python
# Create immigration table - fact table
immigration=spark.sql('''
    SELECT DISTINCT 
    INT(cicid) as cicid,
    INT(i94cit) AS cit_ctry,
    INT(i94res) AS res_ctry,
    i94port AS port_code,
    INT(i94mode) AS trnps_mode_code,
    i94addr AS address_state,
    arrdate_d AS arrival_date,
    depdate_d AS depart_date,
    i94visa AS visa,
    INT(i94bir) AS age,
    INT(biryear) AS birth_year,
    visapost AS dpmt_visa,
    occup AS occupation,
    dtaddto_d AS visa_expiry_date,
    gender,
    airline,
    visatype AS visa_code
    FROM imm_table
    ''')
```

```python
immigration.count()
```

```
1000
```

```python
# Explore partition options
spark.sql('''
    select i94port,i94visa, count(*) from imm_table
    group by i94port,i94visa''').toPandas().describe()
```

|  | count(1) |
|---|---|
| count | 111.0 |
| mean | 9.009009 |
| std | 18.819958 |
| min | 1.0 |
| 25% | 1.0 |
| 50% | 2.0 |
| 75% | 9.0 |
| max | 128.0 |
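
The `describe()` output summarizes how many rows fall into each (i94port, i94visa) group in the sample: 111 groups, a median of 2 rows and a maximum of 128. Many very small groups suggest that a key would produce many tiny parquet partitions. The sketch below computes the same kind of summary in plain Python, to compare candidate keys on any list of records. `group_size_summary` is a hypothetical helper, and the records in the test are made up.

```python
from collections import Counter
from statistics import mean, median

def group_size_summary(records, key):
    """Count records per combination of the `key` fields and summarize the group sizes."""
    sizes = list(Counter(tuple(r[k] for k in key) for r in records).values())
    return {"groups": len(sizes), "mean": mean(sizes), "median": median(sizes),
            "min": min(sizes), "max": max(sizes)}
```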


###### Create time table

```python
# Create time table - dimension table
time=spark.sql('''
    SELECT DISTINCT
        arrdate_d AS date,
        INT(YEAR(arrdate_d)) AS year,
        INT(MONTH(arrdate_d)) AS month,
        INT(DAY(arrdate_d)) AS day,
        INT(WEEKDAY(arrdate_d)+1) AS weekday
    FROM imm_table
    WHERE arrdate_d is not null
    UNION
    SELECT DISTINCT
        depdate_d AS date,
        INT(YEAR(depdate_d)) AS year,
        INT(MONTH(depdate_d)) AS month,
        INT(DAY(depdate_d)) AS day,
        INT(WEEKDAY(depdate_d)+1) AS weekday
    FROM imm_table
    WHERE depdate_d is not null
    ''')
```
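
The time table unions the distinct arrival and departure dates and derives their calendar parts. Spark's `WEEKDAY` returns 0 for Monday, so `+1` yields 1 = Monday through 7 = Sunday. The sketch below does the same derivation in plain Python, which is handy for checking expected rows. `build_time_rows` is a hypothetical helper.

```python
from datetime import date

def build_time_rows(arrival_dates, departure_dates):
    """Build distinct time-dimension rows from arrival and departure dates.

    None values are skipped, like the WHERE ... IS NOT NULL filters;
    weekday uses 1 = Monday ... 7 = Sunday, like WEEKDAY(d) + 1.
    """
    # A set union removes duplicate dates, like SQL UNION
    dates = {d for d in list(arrival_dates) + list(departure_dates) if d is not None}
    return [
        {"date": d, "year": d.year, "month": d.month, "day": d.day, "weekday": d.weekday() + 1}
        for d in sorted(dates)
    ]
```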

##### Import Demographic Data

```python
df_demo = spark.read \
        .format("csv") \
        .option("header", "true") \
        .option("mode", "DROPMALFORMED") \
        .option("delimiter",";") \
        .load(demo_file)
```

```python
df_demo.printSchema()
```

```
root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: string (nullable = true)
 |-- Male Population: string (nullable = true)
 |-- Female Population: string (nullable = true)
 |-- Total Population: string (nullable = true)
 |-- Number of Veterans: string (nullable = true)
 |-- Foreign-born: string (nullable = true)
 |-- Average Household Size: string (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: string (nullable = true)
```



```python
df_demo.createOrReplaceTempView("demo_table")
```

###### Create race table (granular level of demographic table) - dimension table

```python
# Create race table (granular level of demographic table) - dimension table
state_race = spark.sql('''
    SELECT DISTINCT
        City AS city,
        State AS state,
        `State Code` as state_code,
        Race AS race,
        int(Count) AS count,
        ROW_NUMBER() OVER (PARTITION BY City, State ORDER BY int(Count) desc) order
    FROM demo_table
    ORDER BY City, State, Count desc
    ''')
```

```python
# Explore partition options
spark.sql('''
    select city, state, count(*) from demo_table
    group by city, state''').toPandas().describe()
```

|  | count(1) |
|---|---|
| count | 596.0 |
| mean | 4.850671 |
| std | 0.491456 |
| min | 1.0 |
| 25% | 5.0 |
| 50% | 5.0 |
| 75% | 5.0 |
| max | 5.0 |


```python
state_race.createOrReplaceTempView("race_table")
```

###### Create demographic table (city-state pair) - dimension table

```python
# Create demographic table (city-state pair) - dimension table
demographic = spark.sql('''
    SELECT DISTINCT
        d.City AS city,
        d.State AS state,
        `State Code` as state_code,
        INT(`Median Age`) AS median_age,
        INT(`Male Population`) AS male_pplt,
        INT(`Female Population`) AS female_pplt,
        INT(`Total Population`) AS total_pplt,
        INT(`Number of Veterans`) AS veteran_pplt,
        INT(`Foreign-born`) AS foreign_born,
        INT(`Average Household Size`) AS average_household_size,
        r.race AS largest_race,
        INT(r.count) AS largest_race_pplt
    FROM demo_table d
    JOIN race_table r on d.City = r.city and d.State = r.state and r.order = 1
    ''')
```
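
Together, the `state_race` window (`ROW_NUMBER() ... ORDER BY int(Count) desc`) and the `r.order = 1` join pick the most populous race for each city-state pair. The sketch below does the same selection in plain Python. It assumes rows shaped like the demographic CSV's (City, State, Race, Count) columns. `largest_race_by_city` and the sample rows in the test are hypothetical.

```python
from collections import defaultdict

def largest_race_by_city(rows):
    """Return {(city, state): (race, count)} for the most populous race in each city.

    rows: iterable of (city, state, race, count) tuples; count may be a string, as in the CSV.
    """
    groups = defaultdict(list)
    for city, state, race, count in rows:
        groups[(city, state)].append((race, int(count)))
    # max() keeps the first entry on ties; ROW_NUMBER() breaks ties arbitrarily
    return {key: max(group, key=lambda rc: rc[1]) for key, group in groups.items()}
```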

```python
demographic.limit(5).toPandas()
```

|  | city | state | state_code | median_age | male_pplt | female_pplt | total_pplt | veteran_pplt | foreign_born | average_household_size | largest_race | largest_race_pplt |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Sparks | Nevada | NV | 36 | 47780 | 48318 | 96098 | 7315 | 15690 | 2 | White | 78737 |
| 1 | Allen | Texas | TX | 37 | 51324 | 46814 | 98138 | 3505 | 19649 | 3 | White | 69840 |
| 2 | Peoria | Illinois | IL | 33 | 56229 | 62432 | 118661 | 6634 | 7517 | 2 | White | 77074 |
| 3 | Kissimmee | Florida | FL | 36 | 33283 | 35869 | 69152 | 2449 | 16879 | 3 | White | 51123 |
| 4 | Danbury | Connecticut | CT | 37 | 43435 | 41227 | 84662 | 3752 | 25675 | 2 | White | 55917 |


```python
demographic.printSchema()
```

```
root
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- median_age: integer (nullable = true)
 |-- male_pplt: integer (nullable = true)
 |-- female_pplt: integer (nullable = true)
 |-- total_pplt: integer (nullable = true)
 |-- veteran_pplt: integer (nullable = true)
 |-- foreign_born: integer (nullable = true)
 |-- average_household_size: integer (nullable = true)
 |-- largest_race: string (nullable = true)
 |-- largest_race_pplt: integer (nullable = true)
```



##### Import Temperature Data

```python
df_temp = spark.read \
        .format("csv") \
        .option("header", "true") \
        .option("mode", "DROPMALFORMED") \
        .load(temp_file)
```

```python
df_temp.createOrReplaceTempView("temp_table")
```

###### Create temperature table - dimension table

```python
# Create temperature table - dimension table
# Only keep data from 2004 to 2013, sorted by date
temperature=spark.sql('''
    SELECT * FROM (
        SELECT DISTINCT
            dt AS date,
            City AS city,
            Country AS country,
            AverageTemperature as avg_temperature,
            AverageTemperatureUncertainty AS avg_temperature_uncertainty,
            Latitude AS latitude,
            Longitude AS longitude
        FROM temp_table
        WHERE YEAR(dt) between 2004 and 2013
        ) AS a
    ORDER BY date
    ''')
```

##### Import Other Ref Data for Dimension Tables

```python
# import country ref data
# country table - dimension table
df_country = spark.read \
        .format("csv") \
        .option("header", "true") \
        .option("mode", "DROPMALFORMED") \
        .load(country_file)
```

```python
df_country.createOrReplaceTempView("country_table")
```

###### Create country table

```python
country=spark.sql('''
    SELECT DISTINCT
        INT(Code) AS code,
        Country AS country,
        Name AS name,
        `Official name` AS official_name,
        `Citizen names` AS citizen_name,
        continent
    FROM country_table''')
```

```python
country.printSchema()
```

```
root
 |-- code: integer (nullable = true)
 |-- country: string (nullable = true)
 |-- name: string (nullable = true)
 |-- official_name: string (nullable = true)
 |-- citizen_name: string (nullable = true)
 |-- continent: string (nullable = true)
```



###### Create mode table

```python
# import mode ref data
# mode table - dimension table
df_mode = spark.read \
        .format("csv") \
        .option("header", "true") \
        .option("mode", "DROPMALFORMED") \
        .load(mode_file)
```

```python
df_mode.createOrReplaceTempView("mode_table")
```

```python
mode=spark.sql('''
    SELECT 
        INT(Code) AS code,
        Mode AS mode
    FROM mode_table''')
```

```python
mode.printSchema()
```

```
root
 |-- code: integer (nullable = true)
 |-- mode: string (nullable = true)
```



###### Create visa table

```python
# import visa ref data
# visa table - dimension table
df_visa = spark.read \
        .format("csv") \
        .option("header", "true") \
        .option("mode", "DROPMALFORMED") \
        .load(visa_file)
```

```python
df_visa.createOrReplaceTempView("visa_table")
```

```python
visa=spark.sql('''
    SELECT 
        INT(Code) AS code,
        Visa AS visa
    FROM visa_table''')
```

```python
visa.printSchema()
```

```
root
 |-- code: integer (nullable = true)
 |-- visa: string (nullable = true)
```



###### Create port table

```python
# import port ref data
# port table - dimension table
df_port = spark.read \
        .format("csv") \
        .option("header", "true") \
        .option("mode", "DROPMALFORMED") \
        .load(port_file)
```

```python
df_port.createOrReplaceTempView("port_table")
```

```python
port=spark.sql('''
    SELECT 
        Code AS code,
        Address AS address,
        City AS city,
        State AS state
    FROM port_table''')
```

```python
port.printSchema()
```

```
root
 |-- code: string (nullable = true)
 |-- address: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
```



### Step 3: Data Pipeline

#### 3.1 Write parquet file

```python
output_data = "parquet files"
```

```python
imm_data= os.path.join(output_data, "immigration")
#immigration.write.parquet(imm_data, mode="overwrite")
immigration.write.partitionBy("port_code","visa_code").parquet(imm_data, mode="overwrite")
```

```python
demo_data= os.path.join(output_data, "demographic")
demographic.write.partitionBy("city","state").parquet(demo_data, mode="overwrite")
```

```python
time_data= os.path.join(output_data, "time")
time.write.partitionBy("year").parquet(time_data, mode="overwrite")
```

```python
race_data= os.path.join(output_data, "state_race")
state_race.write.partitionBy("city","state").parquet(race_data, mode="overwrite")
```

```python
temperature_data= os.path.join(output_data, "temperature")
temperature.write.partitionBy("date").parquet(temperature_data, mode="overwrite")
```

```python
country_data= os.path.join(output_data, "country")
country.write.parquet(country_data, mode="overwrite")
```

```python
port_data= os.path.join(output_data, "port")
port.write.parquet(port_data, mode="overwrite")
```

```python
mode_data= os.path.join(output_data, "mode")
mode.write.parquet(mode_data, mode="overwrite")
```

```python
visa_data= os.path.join(output_data, "visa")
visa.write.parquet(visa_data, mode="overwrite")
```

#### 3.2 Data Quality Checks

The following checks are performed:
1. Empty tables
2. Missing values
3. Duplicated keys

```python
# Assign table names
table_name = ['immigration','demographic','state_race','time','mode','visa','port','country','temperature']
```

##### 3.2.1 Check empty tables

```python
# Register each parquet table as a view and fail if any table has no rows
for table in table_name:
    df_table=spark.read.parquet(output_data+"/"+table)
    df_table.createOrReplaceTempView(table)
    count_rows = spark.sql('''
        SELECT count(*) FROM {}'''.format(table))
    if count_rows.collect()[0][0] == 0:
        raise ValueError("Data quality check failed. {} returned no results".format(table))
```

##### 3.2.2 Check missing values

```python
qpath="./quality check/"
```

```python
# Check for country codes in the immigration data with no match in the country table
missing_country = spark.sql('''
    SELECT i.country_code
    FROM
        (SELECT cit_ctry AS country_code
        FROM immigration
        UNION
        SELECT res_ctry AS country_code
        FROM immigration) i
    LEFT JOIN country c on c.code = i.country_code
    WHERE c.code IS NULL
    ORDER BY i.country_code
''')

if missing_country.count() > 0:
    missing_country.toPandas().to_csv(qpath+'missing_country_code.csv',index=False)
    print("Attention required! {count} codes do not have matching {name} information. Please update the reference data for the {name} table." \
          .format(count=missing_country.count(),name="country"))
```

```
Attention required! 8 codes do not have matching country information. Please update the reference data for the country table.
```


```python
# Check for port codes in the immigration data with no match in the port table
missing_port = spark.sql('''
    SELECT DISTINCT i.port_code
    FROM immigration i
    LEFT JOIN port p on p.code = i.port_code
    WHERE p.code IS NULL
    ORDER BY i.port_code
''')

if missing_port.count() > 0:
    missing_port.toPandas().to_csv(qpath+'missing_{}_code.csv'.format("port"),index=False)
    print("Attention required! {count} codes do not have matching {name} information. Please update the reference data for the {name} table." \
          .format(count=missing_port.count(),name="port"))
```
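
The country and port checks above are anti-joins: they keep the fact-table codes that have no partner in the dimension table. The sketch below applies the same idea to plain Python collections. `find_unmatched_codes` and the sample codes in the test are hypothetical. The sketch skips `None`, whereas the SQL `LEFT JOIN ... IS NULL` pattern also reports a null key as unmatched.

```python
def find_unmatched_codes(fact_codes, dimension_codes):
    """Return the sorted distinct fact codes that are missing from the dimension codes."""
    known = set(dimension_codes)
    # A set comprehension de-duplicates, like SELECT DISTINCT
    return sorted({c for c in fact_codes if c is not None and c not in known})
```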

```python
# Check for states in the immigration data with no match in the demographic table
missing_state = spark.sql('''
    SELECT DISTINCT
        i.address_state
    FROM immigration i
    LEFT JOIN demographic d on d.state_code = i.address_state
    WHERE d.state_code IS NULL and i.address_state is not null
    --ORDER BY d.state_code
''')

if missing_state.count() > 0:
    missing_state.toPandas().to_csv(qpath+'missing_{}_code.csv'.format("state"),index=False)
    print("Attention required! {count} codes do not have matching {name} information. Please update the reference data for the {name} table." \
          .format(count=missing_state.count(),name="state"))
```

```
Attention required! 7 codes do not have matching state information. Please update the reference data for the state table.
```


##### 3.2.3 Checking duplicates

```python
# Check that state code and state have a one-to-one relationship in the demographic table
dup_state = spark.sql('''
    SELECT state_code, count(distinct state)
    FROM demographic
    group by state_code
    having count(distinct state) >1
''')

if dup_state.count() > 0:
    dup_state.toPandas().to_csv(qpath+'duplicated_{}.csv'.format("state"),index=False)
    print("Attention required! {count} entries have duplicated {name} information. Please update the reference data for the {name} table." \
          .format(count=dup_state.count(),name="state"))
    
dup_state_code = spark.sql('''
    SELECT state, count(distinct state_code)
    FROM demographic
    group by state
    having count(distinct state_code) >1
''')

if dup_state_code.count() > 0:
    dup_state_code.toPandas().to_csv(qpath+'duplicated_{}.csv'.format("state_code"),index=False)
    print("Attention required! {count} entries have duplicated {name} information. Please update the reference data for the {name} table." \
          .format(count=dup_state_code.count(),name="state_code"))
```
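
The two queries above check that `state_code` and `state` map one-to-one in both directions. The sketch below runs the same check on plain (code, name) pairs. `one_to_one_violations` and the sample pairs in the test are hypothetical.

```python
from collections import defaultdict

def one_to_one_violations(pairs):
    """Return (codes mapped to several names, names mapped to several codes)."""
    names_per_code, codes_per_name = defaultdict(set), defaultdict(set)
    for code, name in pairs:
        names_per_code[code].add(name)
        codes_per_name[name].add(code)
    dup_codes = sorted(c for c, names in names_per_code.items() if len(names) > 1)
    dup_names = sorted(n for n, codes in codes_per_name.items() if len(codes) > 1)
    return dup_codes, dup_names
```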

```python
# Check for duplicated city-state pairs in the demographic table
dup_demo = spark.sql('''
    SELECT city, state
    FROM demographic
    group by city, state
    having count(*) >1
''')

if dup_demo.count() > 0:
    dup_demo.toPandas().to_csv(qpath+'duplicated_{}.csv'.format("city_state"),index=False)
    print("Attention required! {count} entries have duplicated {name} information. Please update the reference data for the {name} table." \
          .format(count=dup_demo.count(),name="city_state"))
```

#### 3.3 Data dictionary 
The data dictionary for the data model is given below, with a brief description of each field.

#### Fact table:
##### immigration
 - cicid: primary key, unique ID for each visiting record
 - cit_ctry: the country code the visitor's/immigrant's citizenship belongs to
 - res_ctry: the country code the visitor's/immigrant's residence belongs to
 - trnps_mode_code: the transport mode code for the visitor/immigrant coming to U.S.
 - address_state: U.S. state of the visitor's/immigrant's address
 - arrival_date: date of arrival in the U.S.
 - depart_date: date of departure from the U.S.
 - age: visitor's/immigrant's age
 - birth_year: visitor's/immigrant's birth year
 - dpmt_visa: department that issued the visa
 - occupation: visitor's/immigrant's occupation in the U.S.
 - visa_expiry_date: visa expiry date
 - gender: visitor's/immigrant's gender
 - airline: the airline the visitor/immigrant took to the U.S.
 - port_code: port of arrival (i94 port code)
 - visa: i94 visa category code (Business, Pleasure or Student)
 - visa_code: type of visa under which the visitor/immigrant was admitted to the U.S.

#### Dimension table:
##### demographic
- city
- state_code: 2-letter State code
- state: state full name
- median_age: median age within the city in the state
- male_pplt: male population
- female_pplt: female population
- total_pplt: total population
- veteran_pplt: number of veterans
- foreign_born: number of foreign-born residents
- average_household_size: average household size in the city in the state
- largest_race: largest race in the city in the state
- largest_race_pplt: the population of the largest race in the city in the state
        
##### state_race: list all the races in each city in each state
- city
- state_code
- state
- race
- count: population for the specific race
- order: the descending order (from the largest to smallest) of the population for the specific race in the city in the state

##### temperature: list the monthly average temperature for each city and country for the years 2004 to 2013
- date
- city
- country
- avg_temperature
- avg_temperature_uncertainty
- latitude
- longitude

##### time
- date
- month
- day
- weekday: day of the week (1 = Monday, 7 = Sunday)
- year

##### country
- code: i94 country code, which is the code used in the immigration data
- country: 2-letter country code
- name: country name
- official_name: the full name of the country
- citizen_name: the citizen's name for that country
- continent

##### port
- code: i94 port code, which is the code used in the immigration data
- address: address (city and state) as given in the SAS description file
- city: parsed city
- state: parsed state

##### mode: transportation code
- code: i94 transportation code being used in the immigration data
- mode

##### visa
- code: i94 visa code being used in the immigration data
- visa


#### 3.4 Improvements needed:

- Temperature table: it has only city and country, but no state information. Some U.S. city names occur in more than one state, and our dataset does contain identical city names in different states. For a more accurate analysis, state information needs to be added to the temperature table.
- The temperature table also needs data for more recent years. As of this project (2021), almost 8 years of data are missing, and since the immigration data is from 2016, the two datasets do not overlap in time either.

```python
# Look up data in the tables above
spark.sql('''select * from temperature''').limit(5).toPandas()
```

|  | city | country | avg_temperature | avg_temperature_uncertainty | latitude | longitude | date |
|---|---|---|---|---|---|---|---|
| 0 | Beawar | India | 19.804 | 0.513 | 26.52N | 73.43E | 2013-02-01 |
| 1 | Catania | Italy | 7.625999999999999 | 0.379 | 37.78N | 14.24E | 2013-02-01 |
| 2 | Hanover | Germany | -0.123 | 0.274 | 52.24N | 10.51E | 2013-02-01 |
| 3 | Liancheng | China | 18.04 | 0.603 | 21.70N | 109.90E | 2013-02-01 |
| 4 | Liverpool | United Kingdom | 4.6610000000000005 | 0.159 | 53.84N | 4.09W | 2013-02-01 |


### Step 4: Complete Project Write Up
##### 1. Clearly state the rationale for the choice of tools and technologies for the project, and propose how often the data should be updated and why.

In this project, we mainly focus on the immigration data and the details related to it. Some of the data, such as the immigration and demographic data, is dynamic and should be updated regularly. Other data can be treated as static (hard-coded), such as the i94 SAS description data, which provides the reference values explaining some of the fields in the immigration data. The reference data does not usually change often, but it should be monitored regularly so that any unmatched records can be detected and updated in a timely manner.

The rationale of the design is to use Python Pandas to pre-process the raw reference data once and save it as static (hard-coded) tables, which are then maintained as-is. Spark, on the other hand, loads the dynamic data, such as the immigration and demographic data, together with the pre-processed reference data into the data model; this is the ETL pipeline. The ETL process runs regularly so that the data warehouse stays up to date.


##### 2. Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
     - Re-design the partitions for a balanced separation of the data and quicker retrieval
     - Only load new immigration records instead of rewriting the entire table every time
     - Move the data pipeline to cloud services such as Redshift or S3, with a configuration suited to quicker uploads and downloads
     
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
     - Embed the data pipeline in Airflow to schedule the updates of both the database and the dashboard
     - Avoid running other large uploads/downloads on the same machine at the same time, so that the update finishes quickly and successfully
     
 * The database needed to be accessed by 100+ people.
     - Upload the parquet data to a private S3 bucket and grant access to the people who need it
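
The "only update new records" idea from the first scenario can be sketched as a key-based filter. The sketch assumes that `cicid` uniquely identifies a visiting record, as the data dictionary states, and that the set of already-loaded keys fits in memory. `select_new_records` is a hypothetical helper. In Spark, the equivalent would presumably be a `left_anti` join of the new batch against the existing table, followed by a parquet write with `mode="append"`.

```python
def select_new_records(batch, loaded_ids, key="cicid"):
    """Return records whose key has not been loaded yet; loaded_ids is updated in place."""
    new_records = []
    for record in batch:
        record_id = record[key]
        if record_id not in loaded_ids:
            # Remember the key so duplicates within the same batch are skipped too
            loaded_ids.add(record_id)
            new_records.append(record)
    return new_records
```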

### Step 5: Analysis

There are two separate files for further analysis based on this data model:

- Analysis in Pandas: exploratory analysis using Pandas on a subset of the dataset
- Analysis in SQL: exploratory analysis using Spark SQL on the full immigration dataset. This file contains fewer analyses.