In [1]:
# Do all imports and installs here
import pandas as pd
import numpy as np
import re
from pyspark.sql.functions import udf, expr, first, col
from pyspark.sql.types import *

### Step 1: Scope the Project and Gather Data

#### Scope 
I am going to create a star schema for this database:

*1 fact table* \
*1 I94 immigration dimension table* \
*1 temperature dimension table* \
*1 airport code dimension table* \
*1 US cities demographics dimension table*

Immigration data and temperature data will be partitioned by port city. Airport code and US cities demographics data will be partitioned by state. I'm using Spark to build my ETL pipeline and process the data into analytic tables.

The final dataset will help the analytics team draw insights on immigration behavior.

#### Describe and Gather Data 
For this project, I use the following four datasets:
* I94 Immigration Data: This data comes from the US National Tourism and Trade Office. A data dictionary is included in the notebook. [Source](https://travel.trade.gov/research/reports/i94/historical/2016.html).
* World Temperature Data: This dataset comes from Kaggle. [Source](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data).
* U.S. City Demographic Data: This dataset comes from OpenDataSoft. [Source](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/).
* Airport Code Table: This is a simple table of airport codes and corresponding cities. [Source](https://datahub.io/core/airport-codes#data).

In [2]:
#Read Immigration data for the month of April, 2016
i94_immigration = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
i94_df = pd.read_sas(i94_immigration, 'sas7bdat', encoding="ISO-8859-1")

In [3]:
i94_df.head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2
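The `arrdate` and `depdate` values in the output above are not calendar dates. They look like SAS numeric dates, which count days since 1960-01-01. Below is a minimal sketch of the conversion, assuming that encoding holds for this file. The helper name `sas_to_date` is illustrative and not part of the pipeline.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS date values count days from this day

def sas_to_date(days):
    """Convert a SAS numeric date to a datetime.date; None and NaN map to None."""
    if days is None or days != days:  # NaN is the only value not equal to itself
        return None
    return SAS_EPOCH + timedelta(days=int(days))

print(sas_to_date(20545.0))  # 2016-04-01
```

The result agrees with the `i94yr` (2016) and `i94mon` (4) columns of the same rows.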


In [4]:
#Read Temperature data
temperature = '../../data2/GlobalLandTemperaturesByCity.csv'
temp_df = pd.read_csv(temperature)

In [5]:
temp_df.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


In [6]:
# Read Cities Demographics data
demographics = 'us-cities-demographics.csv'
demographics_df = pd.read_csv(demographics, sep=';')

In [7]:
demographics_df.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [8]:
# Read airport code data
airport = 'airport-codes_csv.csv'
airport_df = pd.read_csv(airport)

In [9]:
airport_df.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [10]:
#Creating Connection to Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()


In [11]:
#write to parquet
#df_spark.write.parquet("sas_data")
#df_spark=spark.read.parquet("sas_data")

### Step 2: Explore and Assess the Data
#### Explore the Data 

In [12]:
# Exploring data sets
    # Immigration Data

i94_df.i94port.unique()

array(['XXX', 'ATL', 'WAS', 'NYC', 'TOR', 'BOS', 'HOU', 'MIA', 'CHI',
       'LOS', 'CLT', 'DEN', 'DAL', 'DET', 'NEW', 'FTL', 'LVG', 'ORL',
       'NOL', 'PIT', 'SFR', 'SPM', 'POO', 'PHI', 'SEA', 'SLC', 'TAM',
       'HAM', 'NAS', 'VCV', 'MAA', 'AUS', 'HHW', 'OGG', 'PHO', 'SDP',
       'SFB', 'EDA', 'MON', 'CLG', 'DUB', 'FMY', 'YGF', 'SAJ', 'CIN',
       'BAL', 'RDU', 'WPB', 'STT', 'OAK', 'NSV', 'SNA', 'OTT', 'X96',
       '5KE', 'CLE', 'HAR', 'PSP', 'CHR', 'HAL', 'SAA', 'KOA', 'SHA',
       'WIN', 'BGM', 'NCA', 'OPF', 'SAI', 'JFA', 'AGA', 'ONT', 'CLM',
       'STL', 'W55', 'CHS', 'SNJ', 'SRQ', 'ANC', 'LNB', 'LIH', 'MIL',
       'INP', 'KAN', 'ROC', 'SAC', 'BRO', 'LAR', 'RNO', 'SGR', 'ELP',
       'MCA', 'MDT', 'SPE', 'FPR', 'SYR', 'ICT', 'MLB', 'ADS', 'TUC',
       'DLR', 'CAE', 'CHA', 'HSV', 'WIL', 'HPN', 'HEF', 'BRG', 'BED',
       'DAB', 'JAC', 'FRB', 'SWF', 'KEY', 'PTK', 'MWH', 'X44', 'MYR',
       'APF', 'ATW', 'PVD', 'BUF', 'PIE', 'MHT', 'BDL', 'NYL', 'VNY',
       '5T6', 'LEX',

In [13]:
#Temperature Data
temp_df.AverageTemperature.unique()

array([ 6.068,    nan,  5.788, ...,  9.202,  6.875,  6.66 ])

In [14]:
#Demographic data
    #Race count
demographics_df.Race.value_counts()

Hispanic or Latino                   596
White                                589
Black or African-American            584
Asian                                583
American Indian and Alaska Native    539
Name: Race, dtype: int64

In [15]:
    #Unique city
demographics_df.City.unique()

array(['Silver Spring', 'Quincy', 'Hoover', 'Rancho Cucamonga', 'Newark',
       'Peoria', 'Avondale', 'West Covina', "O'Fallon", 'High Point',
       'Folsom', 'Philadelphia', 'Wichita', 'Fort Myers', 'Pittsburgh',
       'Laredo', 'Berkeley', 'Santa Clara', 'Allen', 'Hampton',
       'Bolingbrook', 'Frederick', 'Sparks', 'Rancho Cordova',
       'Westminster', 'Lakewood', 'Flint', 'New Haven', 'Brooklyn Park',
       'Chula Vista', 'Danbury', 'Framingham', 'Saint Petersburg',
       'Miami Gardens', 'Salt Lake City', 'Suffolk', 'North Little Rock',
       'Jurupa Valley', 'Los Angeles', 'Flower Mound', 'Vacaville',
       'Clarksville', 'New Britain', 'Tulsa', 'Seattle', 'Mesa', 'Yonkers',
       'Camden', 'Alexandria', 'Jonesboro', 'El Monte', 'Roswell', 'Omaha',
       'Troy', 'Winston-Salem', 'Corpus Christi', 'Atlanta', 'Bryan',
       'Lynchburg', 'Columbia', 'Killeen', 'Nashville', 'Somerville',
       'Bakersfield', 'Montgomery', 'Huntsville', 'Greensboro', 'Turlock',
       '

In [16]:
#Airport code data
    #country counts
airport_df.iso_country.value_counts()

US    22757
BR     4334
CA     2784
AU     1963
KR     1376
MX     1181
RU     1040
DE      947
GB      911
FR      850
AR      848
CO      706
IT      671
PG      593
VE      592
ZA      489
CL      474
ID      470
ES      416
CN      404
KE      372
IN      341
CD      285
PH      282
PL      278
CZ      269
JP      234
NO      228
SE      224
NZ      212
      ...  
DM        2
SM        2
WF        2
KN        2
RE        2
PM        2
LC        2
MF        2
SH        2
BN        2
AD        2
NU        1
GM        1
BL        1
LI        1
NR        1
MQ        1
GI        1
SX        1
MO        1
CC        1
AI        1
AW        1
CX        1
VA        1
CW        1
IO        1
JE        1
YT        1
NF        1
Name: iso_country, Length: 243, dtype: int64

In [17]:
    #airport type counts
airport_df.type.value_counts()

small_airport     33965
heliport          11287
medium_airport     4550
closed             3606
seaplane_base      1016
large_airport       627
balloonport          24
Name: type, dtype: int64

In [18]:
    #State counts
airport_df.iso_region.value_counts()

US-TX     2277
US-CA     1088
US-FL      967
US-PA      918
BR-SP      907
US-IL      902
US-AK      829
US-OH      799
GB-ENG     726
US-IN      697
CA-ON      695
US-NY      668
BR-MT      635
US-WI      624
US-LA      592
US-WA      578
US-MO      578
US-MN      569
US-MI      549
US-OK      537
BR-MS      527
US-GA      522
AU-QLD     511
US-CO      505
US-VA      505
US-OR      492
US-NC      473
CA-BC      467
US-NJ      442
US-KS      439
          ... 
IR-31        1
RO-AB        1
UG-312       1
SI-174       1
SI-188       1
GD-PA        1
EC-A         1
MV-01        1
TL-CO        1
FM-PNI       1
SM-06        1
AE-SH        1
TR-39        1
PW-350       1
AZ-AGS       1
EE-39        1
MV-08        1
MN-073       1
AZ-FUZ       1
NG-ED        1
JP-10        1
TR-53        1
TR-76        1
MH-LIK       1
AZ-XCI       1
BD-1         1
RS-17        1
GE-GU        1
MA-ERR       1
GR-33        1
Name: iso_region, Length: 2810, dtype: int64

#### Cleaning Steps

In [19]:
## Clean Immigration Data
    #Build a dict of valid i94 port codes from i94port.txt: code -> [port name]

re_obj = re.compile(r'\'(.*)\'.*\'(.*)\'')
i94ports = {}
with open('i94port.txt') as i94:
    for port in i94:
        match = re_obj.search(port)
        if match is None:
            #Skip blank or malformed lines instead of failing on match[1]
            continue
        i94ports[match[1]] = [match[2]]
        
    #Drop entries where i94port value is invalid such as: XXX,...    
df_i94 = spark.read.format('com.github.saurfang.sas.spark').load(i94_immigration)
df_i94 = df_i94.filter(df_i94.i94port.isin(list(i94ports.keys())))
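To show what the regex in the cell above captures, here is a minimal sketch run on two sample lines. The sample lines are hypothetical and assume the `'CODE' = 'CITY, STATE'` layout; the exact spacing in `i94port.txt` may differ. The sketch also strips the padding from the port name, which the cell above does not do.

```python
import re

# Same pattern as in the cleaning cell: two single-quoted groups per line.
re_obj = re.compile(r'\'(.*)\'.*\'(.*)\'')

# Hypothetical sample lines; the second one contains no quoted pair at all.
sample_lines = [
    "   'ATL'\t=\t'ATLANTA, GA            '",
    "/* comment or blank line */",
]

ports = {}
for line in sample_lines:
    match = re_obj.search(line)
    if match is None:  # skip lines the pattern cannot parse
        continue
    ports[match.group(1)] = [match.group(2).strip()]

print(ports)  # {'ATL': ['ATLANTA, GA']}
```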


In [20]:
## Clean temperature Data
    #Drop entries where the temperature value is missing
    #(empty CSV fields are read as null, so test for null rather than the string 'Nan')
df_temperature = spark.read.format('csv').option('header', 'true').load(temperature)
df_temperature = df_temperature.filter(df_temperature.AverageTemperature.isNotNull())

    #Keep one row per (City, Country) pair
df_temperature = df_temperature.dropDuplicates(['City', 'Country'])

In [21]:
##Clean us-cities-demographics data
demographics_df = spark.read.format('csv').option('sep', ';').option('header', 'true').load(demographics)

    #Pivot the values of the 'Race' column into one count column per race
demographic_race_count = (demographics_df.select('City', 'State Code', 'Race', 'Count')
                    .groupBy('City', 'State Code')
                    .pivot('Race').agg(first('Count')))

    #Drop the 'Race' and 'Count' columns from the original dataset
demographics_df = demographics_df.drop('Race', 'Count')

    #Drop duplicates (after the drop above, each city would otherwise appear once per race)
demographics_df = demographics_df.dropDuplicates()

    #Join the 2 datasets
demographics_final = demographics_df.join(demographic_race_count, ['City', 'State Code'])

    #Drop state column
demographics_final = demographics_final.drop('State')

demographics_final.show()
                                          

+--------------------+----------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+---------------------------------+------+-------------------------+------------------+------+
|                City|State Code|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|American Indian and Alaska Native| Asian|Black or African-American|Hispanic or Latino| White|
+--------------------+----------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+---------------------------------+------+-------------------------+------------------+------+
|             Lynwood|        CA|      29.4|          35634|            36371|           72005|               776|       28061|                  4.43|                             null|   994|                     5346|             63377| 48670|
|           Hollywood|  
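The pivot in the cell above turns one row per (city, race) into one row per city with one column per race. Here is a plain-Python sketch of the same reshaping, using the Lynwood, CA counts from the output above rewritten in long format. The function name `pivot_race` is illustrative only.

```python
# Long format: one row per (city, state code, race), as in the raw CSV.
# Values are the Lynwood, CA counts shown in the output above.
long_rows = [
    ("Lynwood", "CA", "Asian", 994),
    ("Lynwood", "CA", "Black or African-American", 5346),
    ("Lynwood", "CA", "Hispanic or Latino", 63377),
    ("Lynwood", "CA", "White", 48670),
]

def pivot_race(rows):
    """Group by (city, state) and spread the race counts into one dict per city."""
    wide = {}
    for city, state, race, count in rows:
        # setdefault keeps the first count seen, mirroring agg(first('Count'))
        wide.setdefault((city, state), {}).setdefault(race, count)
    return wide

wide = pivot_race(long_rows)
print(wide[("Lynwood", "CA")]["White"])  # 48670
```

A race with no row for a city is simply absent from that city's dict, which corresponds to the `null` in the American Indian and Alaska Native column for Lynwood.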

In [22]:
##Clean airport code data
    #Drop entries where airport type is closed
airport_df = spark.read.format('csv').option('header', 'true').load(airport)
airport_df = airport_df.filter(airport_df.type != 'closed')
    
    #Drop entries where iata_code is missing (empty CSV fields are read as null)
airport_df = airport_df.filter(airport_df.iata_code.isNotNull())
    
    #Create a new column for state code: the part of iso_region after the 3-character country prefix
airport_df = airport_df.withColumn('state', expr('substr(iso_region, 4, length(iso_region))'))

    #Drop iso_region column
airport_df = airport_df.drop('iso_region')
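The `substr` call starts at position 4 because Spark SQL counts characters from 1 and the country prefix (for example `US-`) takes three characters. A plain-Python sketch of the same slicing, with an illustrative helper name:

```python
def state_from_region(iso_region):
    """Return the part after the 3-character country prefix, e.g. 'US-PA' -> 'PA'."""
    if iso_region is None:
        return None
    return iso_region[3:]  # Python is 0-indexed, Spark SQL substr is 1-indexed

print(state_from_region("US-PA"))   # PA
print(state_from_region("GB-ENG"))  # ENG
```

As the second call shows, non-US regions also yield a value, so the `state` column only contains US state codes if the rows are additionally filtered on `iso_country`.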

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
(For an explanation of the codes, please see the I94_SAS_Labels_Description.SAS file.)

**FACT TABLE**: quantitative data

* i94yr: year
* i94mon: month
* i94cit: origin country code
* i94port: destination city (port of entry)
* i94mode: travel code
* i94visa: reason for immigration
* AverageTemperature: temperature
* port_state: state code

**FIRST DIMENSION TABLE**: immigration data

* i94yr: year
* i94mon: month
* i94cit: origin country code
* i94port: destination city (port of entry)
* i94mode: travel code
* i94bir: age of respondent in years
* arrdate: arrival date
* depdate: departure date
* i94visa: reason for immigration

**SECOND DIMENSION TABLE**: temperature data

* i94port: destination city
* average temperature: temperature
* city: city name
* country: country name
* latitude: latitude
* longitude: longitude

**THIRD DIMENSION TABLE**: airport code data

* type: type of airport (small, medium,...)
* name: airport name
* continent: continent where the airport is located
* iso_country: country code
* state: state code
* municipality: town name
* local_code: local code
* coordinates: coordinates

**FOURTH DIMENSION TABLE**: U.S. City Demographics data

* City: city name
* State: state code
* median_age: median age
* male_population: male population
* female_population: female population
* total_population: total population
* foreign_born: foreign born
* avg_household_size: Average household size
* asian: asian race
* black: black or African American race
* hispanic: hispanic or latino race
* white: white race

#### 3.2 Mapping Out Data Pipelines
Steps necessary to pipeline the data into the chosen data model:

* Import and install all necessary packages.
* Read the 4 dataset files into dataframes.
* Explore the datasets to identify null, invalid, and duplicate data.
* Clean the datasets as described in Step 2.
* Create the immigration dimension table:
    1. Create a new dataframe df_i94 that includes port city and port state
    2. Extract the relevant columns from df_i94
    3. Write to parquet files partitioned by i94port
* Create the temperature dimension table:
    1. Add an i94port column to df_temperature by mapping each city name to its port code
    2. Extract the relevant columns from df_temperature
    3. Write to parquet files partitioned by i94port
* Create the airport code dimension table:
    1. Extract the relevant columns from airport_df
    2. Write to parquet files partitioned by state
* Create the U.S. Cities Demographics table:
    1. Extract the relevant columns from demographics_final
    2. Write to parquet files partitioned by state
* Create the fact table:
    1. Create 2 temporary SQL views: immigration_table and temperature_table
    2. Join immigration_table and temperature_table on i94port
    3. Write to parquet files partitioned by port_state and port
    

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [23]:
#Read i94port text file
i94port_df = pd.read_csv('i94port.txt', sep = '=', names=['symbol','port_name'])

#Remove whitespace and single quotes
i94port_df['symbol']=i94port_df['symbol'].str.strip().str.replace("'",'')

#Create two columns from the port_name string: port_city and port_state
#(expand=True returns the two pieces as separate columns)
i94port_df[['port_city', 'port_state']]=i94port_df['port_name'].str.strip().str.replace("'", '').str.strip().str.split(',', n=1, expand=True)

#Remove remaining whitespace from port_state
i94port_df['port_state']=i94port_df['port_state'].str.strip()

#Drop port_name column
i94port_df.drop(columns=['port_name'], inplace = True)

#Convert pandas dataframe to list
i94port_list=i94port_df.values.tolist()

#Convert list to spark dataframe
i94port_type = StructType([StructField('id', StringType(), True),
                            StructField('port_city', StringType(), True),
                            StructField('port_state', StringType(), True)
                            ])
i94port = spark.createDataFrame(i94port_list, i94port_type)

In [24]:
#Add columns from i94 port file to dataframe df_i94
#def join(df_i94, i94port, df_i94.id == i94port.id, how='left'):
    #df_i94 = df_i94.join(i94port, df_i94.id == i94port.id, how=how)
    #repeated_columns = [c for c in df_i94.columns if c in i94port.colums]
    #for col in repeated_columns:
        #df_i94 = df_i94.drop(i94port[col])
    #return df_i94

#df_i94.show()
    
df_i94 = df_i94.join(i94port, df_i94.i94port == i94port.id, how = 'left')

#Drop 'id' column
df_i94 = df_i94.drop('id')
df_i94.show()

+--------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+---------+----------+
|   cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|         admnum|fltno|visatype|port_city|port_state|
+--------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+---------+----------+
| 13351.0|2016.0|   4.0| 116.0| 116.0|    BGM|20545.0|    1.0|     ME|20547.0|  68.0|    1.0|  1.0|20160401|     DBL| null|      G|      O|   null|      M| 1948.0|09302016|     M|  null|    *GA| 9.250494923E10|MABIS|      B1|   BANGOR|        ME|
| 26320.0|20

In [25]:
# Add i94 port column to temperature dataset, mapping each city name to its port code
@udf
def port_city(city):
    #Return the first port code whose port name contains the city name, else None
    if city is None:
        return None
    for key in i94ports:
        if city.lower() in i94ports[key][0].lower():
            return key
    return None
        
df_temperature = df_temperature.withColumn('i94port', port_city(df_temperature.City))

    #Drop entries for which no port code was found
df_temperature = df_temperature.filter(df_temperature.i94port.isNotNull())

df_temperature.show()

+----------+-------------------+-----------------------------+---------+--------------------+--------+---------+-------+
|        dt| AverageTemperature|AverageTemperatureUncertainty|     City|             Country|Latitude|Longitude|i94port|
+----------+-------------------+-----------------------------+---------+--------------------+--------+---------+-------+
|1852-07-01|             15.488|                        1.395|    Perth|           Australia|  31.35S|  114.97E|    PER|
|1828-01-01|             -1.977|                        2.551|  Seattle|       United States|  47.42N|  121.97W|    SEA|
|1743-11-01|              2.767|                        1.905| Hamilton|              Canada|  42.59N|   80.73W|    HAM|
|1849-01-01|  7.399999999999999|                        2.699|  Ontario|       United States|  34.56N|  116.76W|    ONT|
|1821-11-01|              2.322|                        2.375|  Spokane|       United States|  47.42N|  117.24W|    SPO|
|1843-01-01| 18.874000000000002|
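The UDF above matches by substring, so a short city name can match a longer port name. The plain-Python sketch below contrasts that rule with a stricter exact match on the city part of the port name. The `ports` dict is a hypothetical excerpt of `i94ports`, and `exact_port` is an alternative for comparison, not what the notebook uses.

```python
# Hypothetical excerpt of the i94ports mapping (code -> [port name]).
ports = {
    "NYC": ["NEW YORK, NY"],
    "ONT": ["ONTARIO, CA"],
}

def substring_port(city, ports):
    """Same rule as the UDF: first port whose name contains the city name."""
    for code, (name,) in ports.items():
        if city.lower() in name.lower():
            return code
    return None

def exact_port(city, ports):
    """Stricter alternative: compare only the text before the first comma."""
    for code, (name,) in ports.items():
        if city.strip().lower() == name.split(",")[0].strip().lower():
            return code
    return None

print(substring_port("York", ports))  # NYC
print(exact_port("York", ports))      # None
print(exact_port("Ontario", ports))   # ONT
```

With the substring rule, a city named York would be assigned the New York port code. The exact rule avoids that, at the cost of missing cities whose spelling differs from the port name.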

In [26]:
#Extract columns to create immigration dimension table
immigration_dim_table = df_i94.select(['i94yr', 'i94mon', 'i94cit', 'i94port', 'i94mode', 'i94bir', 'arrdate', 'depdate', 'i94visa'])

#Write parquet files partitioned by i94port
immigration_dim_table.write.mode('append').partitionBy('i94port').parquet('/tables/i94_dimension.parquet')

In [27]:
#Extract columns to create temperature dimension table
temperature_table = df_temperature.select('i94port', 'AverageTemperature', 'City', 'Country', 'Latitude', 'Longitude')
temperature_table.write.mode('append').partitionBy('i94port').parquet('/tables/temp_dimension.parquet')

In [28]:
#Extract columns to create airport code dimension table
airport_code_table = airport_df.select('type', 'name', 'continent', 'iso_country', 
                                       'state', 'municipality', 'local_code', 'coordinates')
airport_code_table.write.mode('append').partitionBy('state').parquet('/tables/airport_code.parquet')

In [29]:
#Extract columns to create cities demographics table
demographics_table = demographics_final.select('City', col('State Code').alias('state'), col('Median Age').alias('median_age'),
                                               col('Male Population').alias('male_population'),
                                               col('Female Population').alias('female_population'), 
                                               col('Total Population').alias('total_population'),
                                               col('Foreign-born').alias('foreign_born'),
                                               col('Average Household Size').alias('avg_household_size'),
                                               'Asian', col('Black or African-American').alias('black'),
                                               col('Hispanic or Latino').alias('hispanic'), 'White')
demographics_table.write.mode('append').partitionBy('state').parquet('/tables/demographics.parquet')

In [32]:
# Create temporary SQL tables
df_i94.createOrReplaceTempView('immigration_table')
df_temperature.createOrReplaceTempView('temperature_table')

#Create fact table
immigration_fact_table = spark.sql("""
                                    SELECT i.i94yr AS year,
                                        i.i94cit AS city,
                                        i.i94port AS port,
                                        i.i94mode AS travel_code,
                                        i.i94visa AS immigration_reason,
                                        i.i94mon AS month,
                                        t.AverageTemperature AS temperature,
                                        i.port_state
                                    FROM immigration_table AS i
                                    JOIN temperature_table AS t ON i.i94port = t.i94port    
                                    """)

#Write parquet files partitioned by port_state and port
immigration_fact_table.write.mode('append').partitionBy('port_state', 'port').parquet('/tables/fact_table.parquet')
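Because the fact table query uses an inner `JOIN`, immigration rows whose port has no matching temperature row are dropped from the fact table. A plain-Python sketch of that behavior follows. The rows are hypothetical, except for the Seattle temperature, which is taken from the output above.

```python
# Hypothetical rows standing in for immigration_table and temperature_table.
immigration = [
    {"i94port": "SEA", "i94visa": 2.0},
    {"i94port": "ATL", "i94visa": 1.0},
]
temperature = {"SEA": -1.977}  # port -> AverageTemperature; no ATL entry

def inner_join(immigration, temperature):
    """Keep only immigration rows whose port has a temperature value."""
    return [
        {**row, "temperature": temperature[row["i94port"]]}
        for row in immigration
        if row["i94port"] in temperature
    ]

fact = inner_join(immigration, temperature)
print(len(fact))  # 1
```

If such rows should be kept with a null temperature instead, a `LEFT JOIN` would be the usual alternative.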

#### 4.2 Data Quality Checks

Run Quality Checks
 * Count checks to ensure completeness
 

In [None]:
# Data quality check
def data_check(df,name):
    table = df.count()
    if table == 0:
        print("Quality check failed because table {} contains {} records".format(name, table))
    else:
        print("Quality check passed because table {} contains {} records".format(name, table))
        
data_check(df_i94, "i94 immigration table")
data_check(df_temperature, "temperature table")
data_check(airport_df, "airport code table")
data_check(demographics_final, "demographics table")
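The count check above only catches empty tables. A complementary check is that the join key is never null. The sketch below shows the idea in plain Python on a list of dicts. The function name and the sample rows are hypothetical. In Spark the same idea could be expressed with `df.filter(df[column].isNull()).count()`.

```python
def null_key_check(rows, column, name):
    """Return True if no row has a missing value in `column`; print the result."""
    missing = sum(1 for row in rows if row.get(column) is None)
    status = "passed" if missing == 0 else "failed"
    print("Null check {} for table {}: {} rows with null {}".format(
        status, name, missing, column))
    return missing == 0

# Hypothetical rows for illustration: the second one has no port code.
rows = [{"i94port": "SEA"}, {"i94port": None}]
null_key_check(rows, "i94port", "temperature table")
```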
    

#### 4.3 Data dictionary 
Column descriptions for each table:

1. I94 immigration data

* i94yr: 4 digit year
* i94mon: numeric month
* i94cit: 3 digit code of origin country
* i94port: 3 character code of destination city
* i94mode: 1 digit travel code
* i94bir: age of respondent in years
* arrdate: arrival date
* depdate: departure date
* i94visa: reason for immigration

2. Temperature data

* i94port: 3 character code of destination city
* AverageTemperature: average temperature
* city: city name
* country: country name
* latitude: latitude
* longitude: longitude

3. Airport code data

* type: type of airport
* name: airport name
* continent: 2 character code of continent
* iso_country: 2 character code of country
* state: 2 character code of state
* municipality: town name
* local_code: local code
* coordinates: coordinates

4. U.S. Cities Demographics

* city: city name
* state: 2 character code of state
* median_age: numeric age
* male_population: numeric male population
* female_population: numeric female population
* total_population: numeric total population
* foreign_born: numeric foreign born
* avg_household_size: numeric average household size
* asian: numeric count of Asian race
* black: numeric count of Black or African-American race
* hispanic: numeric count of Hispanic or Latino race
* white: numeric count of White race

### Step 5: Complete Project Write Up

* Apache Spark is chosen for this project because it can quickly process large datasets.

* The data should be updated every time the source data is updated. In addition, it should be refreshed monthly.

* Approach for possible scenarios:

 * The data was increased by 100x.
     * Data will no longer be kept in local storage. I will store it in AWS S3 and run the ETL pipeline on AWS EMR.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
     * I will use Apache Airflow to schedule and automate the pipeline.
 * The database needed to be accessed by 100+ people.
     * I can load the database into an AWS Redshift cluster, where many users can access it and run analytical queries.