# Immigration Data Analytics based on I-94 Forms For Year 2016

#### Project Summary
This project builds a data lake for immigration to the USA during 2016. The source data currently
resides on S3. The ETL pipeline extracts the data from S3, cleans and processes it with Spark,
and loads it back to S3 as a set of tables.  

In [8]:
import os
import configparser
from datetime import datetime, timedelta
from pyspark.sql import SparkSession
from pyspark.sql import types as T
# Explicit imports instead of a wildcard, which would shadow Python builtins such as sum and max
from pyspark.sql.functions import udf, col, trim

In [9]:
# AWS credentials from the config file (dl.cfg)
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']     = config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY'] = config['AWS']['AWS_SECRET_ACCESS_KEY']
    
# Input from local system OR S3
DATA_LOCAL = False

if DATA_LOCAL:
    input_datapath_i94                      = config['LOCAL_DATAPATH']['INPUT_DATA_I94']
    input_datapath_airportcode              = config['LOCAL_DATAPATH']['INPUT_DATA_AIRPORT_CODE']
    input_datapath_citcode                  = config['LOCAL_DATAPATH']['INPUT_DATA_CIT_CODE']
    input_datapath_iatacode                 = config['LOCAL_DATAPATH']['INPUT_DATA_IATA_CITY']
    input_datapath_us_city_demographies     = config['LOCAL_DATAPATH']['INPUT_DATA_US_CITY']
    output_datapath                         = config['LOCAL_DATAPATH']['OUTPUT_DATA']
else:
    input_datapath_i94                      = config['S3_DATAPATH']['INPUT_DATA_I94']
    input_datapath_airportcode              = config['S3_DATAPATH']['INPUT_DATA_AIRPORT_CODE']
    input_datapath_citcode                  = config['S3_DATAPATH']['INPUT_DATA_CIT_CODE']
    input_datapath_iatacode                 = config['S3_DATAPATH']['INPUT_DATA_IATA_CITY']
    input_datapath_us_city_demographies     = config['S3_DATAPATH']['INPUT_DATA_US_CITY']
    output_datapath                         = config['S3_DATAPATH']['OUTPUT_DATA']
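The cell above expects a `dl.cfg` file with `AWS`, `LOCAL_DATAPATH` and `S3_DATAPATH` sections. A minimal sketch of that layout follows. The credential values and local paths are hypothetical placeholders, and the S3 values are assumed to be the paths hardcoded in later cells of this notebook. The helper parses the text with `configparser.read_string` and lists any key the notebook reads that is missing; it uses its own `sample_config` name so it does not overwrite the notebook's `config`.

```python
import configparser

# Hypothetical dl.cfg layout with the sections and keys the notebook reads.
# Credentials and local paths are placeholders; the S3 paths are assumed to be
# the ones hardcoded in later cells of this notebook.
SAMPLE_DL_CFG = """
[AWS]
AWS_ACCESS_KEY_ID = <your-access-key-id>
AWS_SECRET_ACCESS_KEY = <your-secret-access-key>

[LOCAL_DATAPATH]
INPUT_DATA_I94 = data/i94-parquet/*/*.parquet
INPUT_DATA_AIRPORT_CODE = data/airport-codes.csv
INPUT_DATA_CIT_CODE = data/country_code.csv
INPUT_DATA_IATA_CITY = data/iata_code_city.csv
INPUT_DATA_US_CITY = data/us-cities-demographics.csv
OUTPUT_DATA = output/

[S3_DATAPATH]
INPUT_DATA_I94 = s3a://udacity-project-immigration2016-parquet-files/*/*.parquet
INPUT_DATA_AIRPORT_CODE = s3a://udacity-project-csv-files/airport-codes.csv
INPUT_DATA_CIT_CODE = s3a://udacity-project-csv-files/country_code.csv
INPUT_DATA_IATA_CITY = s3a://udacity-project-csv-files/iata_code_city.csv
INPUT_DATA_US_CITY = s3a://udacity-project-csv-files/us-cities-demographics.csv
OUTPUT_DATA = s3a://udacity-project-output/
"""

DATA_KEYS = ["INPUT_DATA_I94", "INPUT_DATA_AIRPORT_CODE", "INPUT_DATA_CIT_CODE",
             "INPUT_DATA_IATA_CITY", "INPUT_DATA_US_CITY", "OUTPUT_DATA"]
REQUIRED = {
    "AWS": ["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"],
    "LOCAL_DATAPATH": DATA_KEYS,
    "S3_DATAPATH": DATA_KEYS,
}

def missing_keys(cfg):
    """Return 'SECTION.KEY' for every entry the notebook reads that cfg lacks."""
    return [f"{section}.{key}"
            for section, keys in REQUIRED.items()
            for key in keys
            if not cfg.has_option(section, key)]

sample_config = configparser.ConfigParser()
sample_config.read_string(SAMPLE_DL_CFG)
print(missing_keys(sample_config))  # []
```

Running `missing_keys` on the real `config` right after `config.read('dl.cfg')` would report a missing key before the first `KeyError` is raised.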

In [10]:
# Spark session
spark = SparkSession.builder\
                    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
                    .config("spark.hadoop.fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem")\
                    .config("spark.hadoop.fs.s3a.access.key", os.environ["AWS_ACCESS_KEY_ID"])\
                    .config("spark.hadoop.fs.s3a.secret.key", os.environ["AWS_SECRET_ACCESS_KEY"])\
                    .enableHiveSupport().getOrCreate()

sc = spark.sparkContext
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", os.environ['AWS_ACCESS_KEY_ID'])
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", os.environ['AWS_SECRET_ACCESS_KEY'])

print(spark)

<pyspark.sql.session.SparkSession object at 0x7f9b401fae48>


### Step 1: Scope the Project and Gather Data

#### Scope 
These tables can be used by BI and analytics teams to study US immigration patterns
for different US cities (airports) and to explore how US city demographics correlate with immigrant arrivals in each city.
The model makes it possible to answer questions that are hard to answer from the raw data alone,
e.g. the mix of visa types at each US city airport.

#### Describe and Gather Data 
    (1) Immigration data: SAS file for each month of 2016.
        This data comes from the US National Tourism and Trade Office; the original
        source is https://travel.trade.gov/research/reports/i94/historical/2016.html

    (2) US City Demographic Data: CSV file with information on US city demographics:
        population (male, female, total), median age, number of veterans, foreign-born residents, etc.
        Source: https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/

    (3) Airport Code: CSV file with a table of airport codes and corresponding cities.
        Source: https://datahub.io/core/airport-codes#data

    (4) IATA code: CSV lookup from IATA code to airport name, city and state.
        Source: https://www.airportcodes.us/us-airports.htm

    (5) Country Code: CSV lookup of the country name for each 3-digit I94CIT code.
        Source: I94_SAS_Lables_Descriptions.SAS.
 
Please see below for examples of this data.

### Step 2: Explore and Assess the Data

##### Identify data quality issues & basic data clean-up
Several columns had missing values; these were replaced with 'NA' or another value appropriate to the column's data type.
Duplicate records were removed.
Airport codes (IATA codes) are the key join element, so a regular expression was used to filter out invalid codes.
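The airport clean-up later in this notebook filters with `rlike(r'^[A-Z]')`, which only requires a code to start with an uppercase letter. A stricter variant, sketched here as a suggestion rather than the pattern the pipeline uses, requires exactly three uppercase letters, the IATA airport-code format. Plain Python `re` shows the difference; `re.search` is used because Spark's `rlike` also matches anywhere in the string unless the pattern is anchored. The same pattern string could be passed to `rlike` unchanged.

```python
import re

LOOSE_IATA = re.compile(r"^[A-Z]")        # pattern used in the notebook's airport clean-up
STRICT_IATA = re.compile(r"^[A-Z]{3}$")   # stricter: exactly three uppercase letters

def classify(code):
    """Return (passes_loose, passes_strict) for one airport code."""
    return bool(LOOSE_IATA.search(code)), bool(STRICT_IATA.search(code))

for code in ["SFO", "UTK", "A1", "ABCD", "0AK"]:
    print(code, classify(code))
```

Codes such as `A1` or `ABCD` pass the loose check but not the strict one, so the stricter pattern removes more malformed rows.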

#### Immigration Dataset

In [11]:
# Convert arrival and departure dates in df_i94 from SAS numeric dates (days since 1960-01-01) to dates
# Reference: https://knowledge.udacity.com/questions/66798

def convert_datetime(x):
    """Return the date x days after 1960-01-01, or None if x is missing or invalid."""
    if x is None:
        return None
    try:
        return (datetime(1960, 1, 1) + timedelta(days=int(x))).date()
    except (TypeError, ValueError, OverflowError):
        return None

udf_datetime_from_sas = udf(convert_datetime, T.DateType())
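Because SAS dates count days from 1960-01-01, every 2016 arrival should have a raw `arrdate` between 20454 (2016-01-01) and 20819 (2016-12-31). The sketch below, which is not part of the original pipeline, computes those bounds and flags raw values outside the year before conversion. As an alternative to the Python UDF above, Spark SQL's built-in `date_add(to_date('1960-01-01'), CAST(arrdate AS INT))` performs the same arithmetic without leaving the JVM.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # day 0 of SAS date values

def sas_to_date(days):
    """Convert a SAS date value (days since 1960-01-01) to a date."""
    return SAS_EPOCH + timedelta(days=int(days))

def date_to_sas(d):
    """Inverse of sas_to_date: number of days from 1960-01-01 to d."""
    return (d - SAS_EPOCH).days

# SAS-value bounds for calendar year 2016
FIRST_2016 = date_to_sas(date(2016, 1, 1))
LAST_2016 = date_to_sas(date(2016, 12, 31))

def outside_2016(values):
    """Return the SAS date values that do not fall in calendar year 2016."""
    return [v for v in values if not FIRST_2016 <= v <= LAST_2016]

print(FIRST_2016, LAST_2016)   # 20454 20819
print(sas_to_date(20659.0))    # 2016-07-24
```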

In [12]:
print(input_datapath_i94)
df_i94 = spark.read.parquet(input_datapath_i94)

s3a://udacity-project-immigration2016-parquet-files/*/*.parquet


In [13]:
df_i94 = df_i94.withColumn('arrdate', udf_datetime_from_sas(df_i94.arrdate))
df_i94 = df_i94.withColumn('depdate', udf_datetime_from_sas(df_i94.depdate))
df_i94.printSchema()
df_i94.show(3)

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: date (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: date (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = true)
 |-- fltno: string (nullable = true)

In [14]:
# Clean up immigration data: trim port codes and replace missing values with 'NA' or 0.
# A string fill such as 'NA' has no effect on date or double columns, so depdate,
# i94bir and biryear are left out here and stay null.
df_i94 = df_i94.withColumn('i94port', trim(col('i94port')))
df_i94 = df_i94.na.fill({
    'i94yr':     0.0,
    'i94addr':   'NA',
    'i94visa':   0.0,
    'count':     0.0,
    'dtadfile':  'NA',
    'visapost':  'NA',
    'occup':     'NA',
    'entdepa':   'NA',
    'entdepd':   'NA',
    'entdepu':   'NA',
    'matflag':   'NA',
    'dtaddto':   'NA',
    'gender':    'NA',
    'insnum':    'NA',
    'airline':   'NA',
    'admnum':    0.0,
    'fltno':     'NA',
    'visatype':  'NA',
})
df_i94.show(5)

+---------+------+------+------+-------+----------+-------+-------+----------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+-------------+-----+--------+
|    cicid| i94yr|i94cit|i94res|i94port|   arrdate|i94mode|i94addr|   depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|       admnum|fltno|visatype|
+---------+------+------+------+-------+----------+-------+-------+----------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+-------------+-----+--------+
|5680949.0|2016.0| 117.0| 117.0|    NYC|2016-07-24|    1.0|     NY|      null|  30.0|    3.0|  1.0|20160724|     NPL|   NA|      G|     NA|     NA|     NA| 1986.0|     D/S|     F|    NA|     IG|2.947450085E9| 3940|      F1|
|5680950.0|2016.0| 245.0| 245.0|    DET|2016-07-24|    1.0|     IL|2016-08-13|  46.0|    2.0|  1.0|20160

#### Airport Information - Name, Type, IATA Code of airport, Country, Region & Municipality of airport location


In [15]:
df_airports = spark.read.format('csv').options(header='true').load('s3a://udacity-project-csv-files/airport-codes.csv')
df_airports.printSchema()
df_airports.show(3)

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)

+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|  00A|     heliport|   Total Rf Heliport|    

In [16]:
# Airport table clean-up: iata_code is the join key, so drop rows without one
# and keep only codes that start with an uppercase letter
expr = r'^[A-Z]'
df_airports = df_airports.filter(df_airports.iata_code.isNotNull())
df_airports = df_airports.filter(df_airports['iata_code'].rlike(expr))
df_airports.show(3)

+-----+-------------+--------------------+------------+---------+-----------+----------+-------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region| municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+-------------+--------+---------+----------+--------------------+
|  03N|small_airport|      Utirik Airport|           4|       OC|         MH|    MH-UTI|Utirik Island|    K03N|      UTK|       03N|  169.852005, 11.222|
| 07FA|small_airport|Ocean Reef Club A...|           8|       NA|         US|     US-FL|    Key Largo|    07FA|      OCA|      07FA|-80.274803161621,...|
|  0AK|small_airport|Pilot Station Air...|         305|       NA|         US|     US-AK|Pilot Station|    null|      PQS|       0AK|-162.899994, 61.9...|
+-----+-------------+--------------------+------------+---------+-----------

#### US City Demographics

In [17]:
df_city = spark.read.format('csv').options(header='true',delimiter=';').load('s3a://udacity-project-csv-files/us-cities-demographics.csv')
df_city.printSchema()
df_city.show(3)

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: string (nullable = true)
 |-- Male Population: string (nullable = true)
 |-- Female Population: string (nullable = true)
 |-- Total Population: string (nullable = true)
 |-- Number of Veterans: string (nullable = true)
 |-- Foreign-born: string (nullable = true)
 |-- Average Household Size: string (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: string (nullable = true)

+-------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+------------------+-----+
|         City|        State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|              Race|Count|
+-------------+-------------+----------+---------------+-----------------+----------------+---------------

In [18]:
# Clean up df_city - removing row where city is null
df_city = df_city.filter(df_city.City.isNotNull())
df_city.show(3)

+-------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+------------------+-----+
|         City|        State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|              Race|Count|
+-------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+------------------+-----+
|Silver Spring|     Maryland|      33.8|          40601|            41862|           82463|              1562|       30908|                   2.6|        MD|Hispanic or Latino|25924|
|       Quincy|Massachusetts|      41.0|          44129|            49500|           93629|              4147|       32935|                  2.39|        MA|             White|58723|
|       Hoover|      Alabama|      38.5|          38040|            46799|           

#####  I94CIT Code Country Lookup

In [19]:
df_i94cit = spark.read.format('csv').options(header='true').load('s3a://udacity-project-csv-files/country_code.csv')
df_i94cit = df_i94cit.filter(df_i94cit.I94CIT.isNotNull())
df_i94cit.printSchema()
df_i94cit.show(3)

root
 |-- I94CIT: string (nullable = true)
 |-- Country: string (nullable = true)

+------+-----------+
|I94CIT|    Country|
+------+-----------+
|   582|     MEXICO|
|   236|AFGHANISTAN|
|   101|    ALBANIA|
+------+-----------+
only showing top 3 rows



##### US City Airport IATA Code

In [20]:
df_iata = spark.read.format('csv').options(header='true').load('s3a://udacity-project-csv-files/iata_code_city.csv')
df_iata = df_iata.filter(df_iata.Code.isNotNull())
df_iata.printSchema()
df_iata.show(3)

root
 |-- Code: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)

+----+--------------------+-------------+-----+
|Code|                Name|         City|State|
+----+--------------------+-------------+-----+
| 0AK|Pilot Station Air...|Pilot Station|   AK|
| 16A| Nunapitchuk Airport|  Nunapitchuk|   AK|
| 1G4|Grand Canyon West...|Peach Springs|   AZ|
+----+--------------------+-------------+-----+
only showing top 3 rows



### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
The data is modeled as a star schema, which organizes the raw data efficiently and supports the intended queries.  
"immigration_table" is the fact table with the most relevant business data, and "immigrant_table", "airport_table" and "city_table" are supporting dimension tables.

![ERD](Immigration-ERD.jpg)
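As an illustration of the kind of query the star schema supports, here is a sketch of the scope's example question, visa types per US city airport. It uses Python's built-in `sqlite3` as a stand-in for Spark SQL so it runs without a cluster. The rows are made up; only the column names (`iata_code`, `visa_type`, `city`, `state_abbr`) come from the tables in this notebook, and it assumes the I94 port codes line up with the airport codes in the city table. The same SELECT could be passed to `spark.sql` once both tables are registered as temp views.

```python
import sqlite3

QUERY = """
SELECT c.city, c.state_abbr, i.visa_type, COUNT(*) AS arrivals
FROM immigration_table AS i
JOIN city_table AS c ON i.iata_code = c.iata_code
GROUP BY c.city, c.state_abbr, i.visa_type
ORDER BY c.city, arrivals DESC, i.visa_type
"""

def visa_types_per_city(conn):
    """Count arrivals per (city, visa type) by joining the fact and city tables."""
    return conn.execute(QUERY).fetchall()

def make_star_demo():
    """In-memory database with a few hypothetical rows."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE immigration_table (cicid INTEGER, iata_code TEXT, visa_type TEXT)")
    conn.execute("CREATE TABLE city_table (city TEXT, state_abbr TEXT, iata_code TEXT)")
    conn.executemany("INSERT INTO immigration_table VALUES (?, ?, ?)", [
        (1, "SFO", "B2"), (2, "SFO", "F1"), (3, "SFO", "B2"), (4, "ATL", "WT"),
    ])
    conn.executemany("INSERT INTO city_table VALUES (?, ?, ?)", [
        ("San Francisco", "CA", "SFO"), ("Atlanta", "GA", "ATL"),
    ])
    return conn

for row in visa_types_per_city(make_star_demo()):
    print(row)
```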

#### 3.2 Mapping Out Data Pipelines

Staging tables for the immigration and airport data were created to pull values from the data sources and load them into the target tables.

Lookup tables were created to link IATA codes (airport codes) to the city table.
The immigration data has only a numeric country code, so a country code table was created from the I94 label descriptions to find
each immigrant's country of origin for the immigrant table.
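How `country_code.csv` (columns `I94CIT` and `Country`) was built from the I94 label descriptions is not shown in this notebook. One hypothetical way to do it is sketched below. It assumes the country section of the labels file starts with a `value i94cntyl` line, lists entries shaped like `582 =  'MEXICO'`, and ends with a `;`. The sample text is illustrative, not copied from the file.

```python
import csv
import io
import re

# Assumed shape of one entry in the country section, e.g.:   582 =  'MEXICO'
CODE_LINE = re.compile(r"^\s*(\d+)\s*=\s*'([^']*)'")
BLOCK_START = re.compile(r"^\s*value\s+i94cntyl\b", re.IGNORECASE)

def parse_country_codes(sas_text):
    """Return (I94CIT, Country) pairs from the assumed 'value i94cntyl' block."""
    pairs = []
    in_block = False
    for line in sas_text.splitlines():
        if BLOCK_START.match(line):
            in_block = True
            continue
        if in_block:
            match = CODE_LINE.match(line)
            if match:
                pairs.append((match.group(1), match.group(2).strip()))
            if line.strip().endswith(";"):
                break  # a SAS value block ends with a semicolon
    return pairs

def to_csv(pairs):
    """Render the pairs with the same header as country_code.csv."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    writer.writerow(["I94CIT", "Country"])
    writer.writerows(pairs)
    return buffer.getvalue()

SAMPLE = """/* I94CIT & I94RES */
value i94cntyl
   582 =  'MEXICO'
   236 =  'AFGHANISTAN'
   101 =  'ALBANIA'
;
value i94model
	1 = 'Air'
;
"""

print(to_csv(parse_country_codes(SAMPLE)))
```

Stopping at the first `;` keeps codes from later sections (such as the mode-of-entry values) out of the country lookup.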

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

##### Create Immigration Table

In [21]:
df_i94.createOrReplaceTempView("staging_immigration_table")
immigration_table = spark.sql("""SELECT 
                                    cast(cicid AS INT)     AS cicid,
                                    cast(i94cit AS INT)    AS country_code,
                                    i94port                AS iata_code,
                                    i94mode                AS i94model,
                                    i94addr                AS State,
                                    arrdate                AS arrival_date,
                                    depdate                AS departure_date,
                                    i94visa                AS visa,
                                    visatype               AS visa_type,
                                    airline                AS airline,
                                    fltno                  AS flight_number,
                                    gender                 AS gender,
                                    cast(biryear AS INT)   AS birth_year,
                                    occup                  AS occupation,
                                    insnum                 AS ins_number,
                                    cast(admnum AS FLOAT)  AS admission_number
                                 FROM staging_immigration_table ORDER BY cicid
                             """).dropDuplicates()
immigration_table.printSchema()
immigration_table.show(5)

root
 |-- cicid: integer (nullable = true)
 |-- country_code: integer (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- i94model: double (nullable = true)
 |-- State: string (nullable = false)
 |-- arrival_date: date (nullable = true)
 |-- departure_date: date (nullable = true)
 |-- visa: double (nullable = false)
 |-- visa_type: string (nullable = false)
 |-- airline: string (nullable = false)
 |-- flight_number: string (nullable = false)
 |-- gender: string (nullable = false)
 |-- birth_year: integer (nullable = true)
 |-- occupation: string (nullable = false)
 |-- ins_number: string (nullable = false)
 |-- admission_number: float (nullable = false)

+-----+------------+---------+--------+-----+------------+--------------+----+---------+-------+-------------+------+----------+----------+----------+----------------+
|cicid|country_code|iata_code|i94model|State|arrival_date|departure_date|visa|visa_type|airline|flight_number|gender|birth_year|occupation|ins_number|admissi
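The query above casts `admnum` to `FLOAT`, a 32-bit type with about seven significant decimal digits, while admission numbers such as 2.947450085E9 in the raw data shown earlier have ten. Round-tripping that value through a 32-bit IEEE 754 float with Python's `struct` module, used here as a stand-in for Spark's `FLOAT`, shows the loss. Casting to `BIGINT` or keeping `DOUBLE` would preserve the full number; this is a suggested check, not a change made in this notebook.

```python
import struct

def as_float32(x):
    """Round-trip x through an IEEE 754 single-precision float (Spark's FLOAT)."""
    return struct.unpack("<f", struct.pack("<f", x))[0]

admnum = 2947450085.0                          # admission number from the raw I94 sample
print(as_float32(admnum))                      # 2947450112.0
print(int(as_float32(admnum)) - int(admnum))   # 27: the value stored is off by 27
```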

##### Create Airport Table

In [22]:
df_airports.createOrReplaceTempView("staging_airport_table")
airport_table = spark.sql("""SELECT 
                                    name         AS airport_name,
                                    iso_region   AS region,
                                    municipality AS muncipality,
                                    iata_code    AS iata_code,
                                    TRIM(SPLIT(coordinates, ',')[0]) AS coordinate_x,  -- longitude
                                    TRIM(SPLIT(coordinates, ',')[1]) AS coordinate_y   -- latitude
                             FROM staging_airport_table ORDER BY region
                            """).dropDuplicates()
airport_table.printSchema()
airport_table.show(5)

root
 |-- airport_name: string (nullable = true)
 |-- region: string (nullable = true)
 |-- muncipality: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- coordinate_x: string (nullable = true)
 |-- coordinate_y: string (nullable = true)

+--------------------+------+----------------+---------+------------------+-------------------+
|        airport_name|region|     muncipality|iata_code|      coordinate_x|       coordinate_y|
+--------------------+------+----------------+---------+------------------+-------------------+
|Andorra la Vella ...| AD-07|Andorra La Vella|      ALV|          1.533551|          42.511174|
|Yas Island Seapla...| AE-AZ|       Abu Dhabi|      AYM|           54.6103|             24.467|
|Abu Dhabi Interna...| AE-AZ|       Abu Dhabi|      AUH|54.651100158691406| 24.433000564575195|
|      Bateen Airport| AE-AZ|            null|      AZI|54.458099365234375| 24.428300857543945|
|Al Ain Internatio...| AE-AZ|          Al Ain|      AAN| 55.60919952

##### Create City Demographics Table, adding airport codes from the IATA lookup table

In [23]:
df_city.createOrReplaceTempView("staging_city")
df_iata.createOrReplaceTempView("staging_iata")
city_table = spark.sql("""SELECT 
                             sc.City               AS city,
                             sc.State              AS state,
                             sc.`State Code`       AS state_abbr,
                             si.Code               AS iata_code,
                             si.Name               AS airport_name,
                             sc.`Median Age`       AS median_age,
                             sc.`Total Population` AS total_population,
                             sc.`Foreign-born`     AS foreign_born
                       FROM staging_city AS sc
                       JOIN staging_iata AS si ON sc.City = si.City 
                       ORDER BY city
                       """).dropDuplicates()
city_table.printSchema()
city_table.show(5)

root
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- state_abbr: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- airport_name: string (nullable = true)
 |-- median_age: string (nullable = true)
 |-- total_population: string (nullable = true)
 |-- foreign_born: string (nullable = true)

+-------+--------+----------+---------+--------------------+----------+----------------+------------+
|   city|   state|state_abbr|iata_code|        airport_name|median_age|total_population|foreign_born|
+-------+--------+----------+---------+--------------------+----------+----------------+------------+
|Abilene|   Texas|        TX|      ABI|Abilene Regional ...|      31.3|          125876|        8129|
| Albany| Georgia|        GA|      ALB|Albany Internatio...|      33.3|           71109|         861|
| Albany| Georgia|        GA|      ABY|Southwest Georgia...|      33.3|           71109|         861|
| Albany|New York|        NY|      ALB|Albany In
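The output above pairs Albany, Georgia with ALB, Albany International Airport, which serves Albany, New York: joining on `City` alone matches every airport in any city of the same name. Adding the state to the join condition, `` sc.`State Code` = si.State `` in the Spark SQL above, would avoid this, assuming `State` in the IATA file holds two-letter codes as in the sample rows shown earlier. The sketch reproduces both joins with `sqlite3` on a few illustrative rows.

```python
import sqlite3

CITY_ONLY = """
SELECT sc.City, sc."State Code", si.Code
FROM staging_city AS sc
JOIN staging_iata AS si ON sc.City = si.City
ORDER BY sc."State Code", si.Code
"""

CITY_AND_STATE = """
SELECT sc.City, sc."State Code", si.Code
FROM staging_city AS sc
JOIN staging_iata AS si ON sc.City = si.City AND sc."State Code" = si.State
ORDER BY sc."State Code", si.Code
"""

def make_albany_demo():
    """Two cities named Albany and their airports (illustrative rows)."""
    conn = sqlite3.connect(":memory:")
    conn.execute('CREATE TABLE staging_city (City TEXT, State TEXT, "State Code" TEXT)')
    conn.execute("CREATE TABLE staging_iata (Code TEXT, Name TEXT, City TEXT, State TEXT)")
    conn.executemany("INSERT INTO staging_city VALUES (?, ?, ?)", [
        ("Albany", "Georgia", "GA"), ("Albany", "New York", "NY"),
    ])
    conn.executemany("INSERT INTO staging_iata VALUES (?, ?, ?, ?)", [
        ("ALB", "Albany International Airport", "Albany", "NY"),
        ("ABY", "Southwest Georgia Regional Airport", "Albany", "GA"),
    ])
    return conn

conn = make_albany_demo()
print(conn.execute(CITY_ONLY).fetchall())       # 4 rows, including Albany GA -> ALB
print(conn.execute(CITY_AND_STATE).fetchall())  # 2 rows: GA -> ABY, NY -> ALB
```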

##### Immigrant Table (Immigration information and add name of country of origin based on i94cit value)

In [24]:
immigration_table.createOrReplaceTempView("staging_immigration")
df_i94cit.createOrReplaceTempView("staging_cit")
immigrant_table = spark.sql("""SELECT 
                                    it.cicid,
                                    cit.Country  AS country_of_origin,
                                    it.iata_code AS port_of_entry,
                                    it.arrival_date,
                                    it.visa,  
                                    CASE it.visa
                                        WHEN 1 THEN 'Business'
                                        WHEN 2 THEN 'Pleasure'
                                        WHEN 3 THEN 'Student'
                                        ELSE 'Unknown'
                                    END AS visa_for,
                                    it.visa_type,             
                                    it.gender,   
                                    it.birth_year,
                                    it.occupation
                                 FROM staging_immigration AS it
                                 JOIN staging_cit         AS cit ON (it.country_code = cit.i94CIT)
                                 ORDER BY cicid
                             """).dropDuplicates()
immigrant_table.printSchema()
immigrant_table.show(5)

root
 |-- cicid: integer (nullable = true)
 |-- country_of_origin: string (nullable = true)
 |-- port_of_entry: string (nullable = true)
 |-- arrival_date: date (nullable = true)
 |-- visa: double (nullable = false)
 |-- visa_for: string (nullable = false)
 |-- visa_type: string (nullable = false)
 |-- gender: string (nullable = false)
 |-- birth_year: integer (nullable = true)
 |-- occupation: string (nullable = false)

+-----+-----------------+-------------+------------+----+--------+---------+------+----------+----------+
|cicid|country_of_origin|port_of_entry|arrival_date|visa|visa_for|visa_type|gender|birth_year|occupation|
+-----+-----------------+-------------+------------+----+--------+---------+------+----------+----------+
|    2|   CZECH REPUBLIC|          NYC|  2016-07-01| 2.0|Pleasure|       WT|     F|      1971|        NA|
|    2|          ALBANIA|          ATL|  2016-02-14| 3.0| Student|       F1|     F|      1995|        NA|
|    2|            INDIA|          XXX|  2016

In [None]:
# Write the dimension tables to S3 as partitioned Parquet files
output_base          = 's3a://udacity-project-output/'
immigrantParquetPath = output_base + 'immigrant.parquet'
airportParquetPath   = output_base + 'airport.parquet'
cityParquetPath      = output_base + 'city.parquet'

immigrant_table.write.mode('overwrite').partitionBy('country_of_origin').parquet(immigrantParquetPath)
airport_table.write.mode('overwrite').partitionBy('iata_code').parquet(airportParquetPath)
city_table.write.mode('overwrite').partitionBy('city').parquet(cityParquetPath)

#### 4.2 Data Quality Checks
Data quality checks ensure that the pipeline ran as expected.
Many of the quality checks were performed earlier: missing values were replaced, duplicates removed, etc.
The airport data frame was filtered with a regular expression to remove invalid codes.
The following are only very basic checks.

In [29]:
# Perform quality checks here

# The immigration key cicid must not be null: the following count should be zero
immigration_table.createOrReplaceTempView("staging_immigration")
cicid_count_check = spark.sql("""SELECT COUNT(*) FROM staging_immigration WHERE cicid IS NULL""")
cicid_count_check.show(1)

# Port-of-entry check. Note: iata_code is a string, so comparing it with 0 relies on
# an implicit cast and does not catch NULL or empty codes.
airport_code_validation = spark.sql("""SELECT COUNT(*) FROM staging_immigration WHERE iata_code = 0""")
airport_code_validation.show(1)

+--------+
|count(1)|
+--------+
|       0|
+--------+

+--------+
|count(1)|
+--------+
|       0|
+--------+
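The checks above print counts that have to be read by eye. A slightly more systematic pattern, sketched here rather than taken from this project, pairs each SQL check with a predicate and collects the failures, so a run can stop when any check fails. It uses `sqlite3` and hypothetical rows so it runs anywhere; with Spark, the same SQL strings could be evaluated with `spark.sql(sql).collect()[0][0]`. The blank-code check is a suggested addition that tests the string column `iata_code` for NULL or empty values directly.

```python
import sqlite3

# (description, SQL returning a single number, predicate the number must satisfy)
CHECKS = [
    ("immigration table is not empty",
     "SELECT COUNT(*) FROM immigration_table",
     lambda n: n > 0),
    ("no null cicid",
     "SELECT COUNT(*) FROM immigration_table WHERE cicid IS NULL",
     lambda n: n == 0),
    ("no null or blank port codes",
     "SELECT COUNT(*) FROM immigration_table WHERE iata_code IS NULL OR TRIM(iata_code) = ''",
     lambda n: n == 0),
]

def run_checks(conn, checks=CHECKS):
    """Run each check and return (description, value) pairs for those that fail."""
    failures = []
    for description, sql, is_ok in checks:
        (value,) = conn.execute(sql).fetchone()
        if not is_ok(value):
            failures.append((description, value))
    return failures

def make_checks_demo(rows):
    """In-memory immigration table holding the given (cicid, iata_code) rows."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE immigration_table (cicid INTEGER, iata_code TEXT)")
    conn.executemany("INSERT INTO immigration_table VALUES (?, ?)", rows)
    return conn

print(run_checks(make_checks_demo([(1, "NYC"), (2, "ATL")])))  # []
print(run_checks(make_checks_demo([(1, "NYC"), (2, "  ")])))   # [('no null or blank port codes', 1)]
```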



#### 4.3 Data dictionary 

    (1) Immigration Table - Created from the I94 SAS files.
        Fields:
    
         cicid: integer - record ID from the I94 data; not unique across the 12 monthly files (e.g. cicid 2 occurs in more than one month)
         country_code: integer - I94CIT field from I94, country the immigrant comes from, e.g. 582 -> Mexico
         iata_code: string - port of entry code, e.g. BOS -> Boston, NYC -> New York City
         i94model: double - mode of entry, e.g. 1 -> Air, 2 -> Sea, 3 -> Land
         State: string - US state of address, e.g. CA -> California, NY -> New York
         arrival_date: date - date of arrival YYYY-MM-DD
         departure_date: date - date of departure YYYY-MM-DD
         visa: double - visa category code: 1 = Business, 2 = Pleasure, 3 = Student
         visa_type: string - visa class, e.g. F1, B2, WT
         airline: string - airline used to arrive in the US
         flight_number: string - flight number of the airline used to arrive in the US
         gender: string - sex of the non-immigrant, e.g. M, F
         birth_year: integer - 4-digit year of birth
         occupation: string - occupation that will be performed in the US
         ins_number: string - INS number
         admission_number: float - admission number
         
    (2) Airport Table - Created from airport code csv file
        Fields:
        
        airport_name: string - e.g. Abu Dhabi International Airport
        region: string - ISO region code of the airport, e.g. AE-AZ
        muncipality: string - municipality served by the airport, e.g. Abu Dhabi
        iata_code: string - 3-letter IATA airport code, e.g. ALV
        coordinate_x: string - longitude of the airport
        coordinate_y: string - latitude of the airport
        
    (3) City Table - created from US city demographics csv file
        Fields:
          
         city: string - city served by the airport, e.g. San Francisco
         state: string - state, e.g. California
         state_abbr: string - state abbreviation, e.g. CA
         iata_code: string - 3-character airport code, e.g. SFO
         airport_name: string - e.g. San Francisco International Airport
         median_age: string - median age of the city's population
         total_population: string - total population of the city
         foreign_born: string - number of foreign-born residents of the city
         
    (4) Immigrant Table - created from immigration table and looking up country of origin based on country code (I94CIT)
        Fields:
        
         cicid: integer - record ID from the I94 form (see the Immigration Table)
         country_of_origin: string - country the immigrant comes from
         port_of_entry: string - airport (city) of entry as a port code, e.g. NYC, ATL
         arrival_date: date - date of arrival YYYY-MM-DD
         visa: double - visa category code, e.g. 1.0, 2.0, 3.0
         visa_for: string - visa category description: Business, Pleasure, Student
         visa_type: string - visa class, e.g. F1, B2
         gender: string - sex: M, F, NA
         birth_year: integer - 4-digit birth year YYYY, can be used to calculate age
         occupation: string - occupation; mostly NA as CIC does not use it
         

### Step 5: Project Write Up

The final goal was achieved: processing a large volume of I-94 records covering all arrivals to the USA in 2016.
There were 12 SAS files, one for each month (more than 1 M rows). This project builds a data lake that the analytics
team can use to study immigration patterns by country of origin and possibly correlate them with US city demographics.
This would not have been possible directly from the raw data, which comes in different file formats such as SAS, CSV, text and JSON.

##### Technology choice
Building the data lake on S3 seemed an appropriate choice: it offers flexibility, efficiency and cost-effectiveness.
This design supports the goal of giving the analytics team data in a form that can be used to find insights and
possibly do predictive analytics.

Python and the Spark framework were appropriate choices for the implementation because of their library support.
The Spark API offers convenient reads and writes to AWS S3, and the course provided several examples and code snippets to help. 

The Spark package 'com.github.saurfang.sas.spark' was used to read the 12 I94 SAS ('sas7bdat') files. However, reading them from S3 with Spark caused a lot of problems:
I could not get past a dependency issue with the Hadoop version in the project workspace environment.
After many days of futile effort, I wrote a Python utility program, **i94-SAS.py**, that reads the I94 SAS file for each month
and writes it back as Parquet files. These Parquet files were copied to AWS S3.
I learned a lesson about development environment issues: how time-consuming they can be.
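The utility program i94-SAS.py is not included in this notebook. One hypothetical way to do the same conversion is sketched below with pandas: `pd.read_sas` reads a `sas7bdat` file and `DataFrame.to_parquet` (which needs `pyarrow` or `fastparquet`) writes it out. The monthly input names (`i94_jan16_sub.sas7bdat` and so on) and the `sas_data`/`parquet_out` directories are assumptions. The output layout, one directory per month, is chosen so the files match the `*/*.parquet` pattern used to read them back earlier. Reading a whole month at once needs plenty of memory; this is a sketch, not the original script.

```python
import os

# Assumed monthly file tags: jan16 ... dec16
MONTHS = ("jan", "feb", "mar", "apr", "may", "jun",
          "jul", "aug", "sep", "oct", "nov", "dec")

def monthly_jobs(sas_dir, out_dir, year_suffix="16"):
    """Pair each (assumed) monthly SAS input path with a Parquet output path.

    Outputs go to <out_dir>/<mon><yy>/i94_<mon><yy>.parquet, one directory per
    month, so a '*/*.parquet' glob over out_dir picks them all up.
    """
    jobs = []
    for month in MONTHS:
        tag = f"{month}{year_suffix}"
        sas_path = os.path.join(sas_dir, f"i94_{tag}_sub.sas7bdat")
        parquet_path = os.path.join(out_dir, tag, f"i94_{tag}.parquet")
        jobs.append((sas_path, parquet_path))
    return jobs

def convert_to_parquet(sas_path, parquet_path):
    """Read one SAS file with pandas and write it as a single Parquet file."""
    import pandas as pd  # imported here so the path helper works without pandas

    # encoding decodes SAS text columns to str instead of raw bytes
    df = pd.read_sas(sas_path, format="sas7bdat", encoding="ISO-8859-1")
    os.makedirs(os.path.dirname(parquet_path), exist_ok=True)
    df.to_parquet(parquet_path, index=False)  # requires pyarrow or fastparquet

if __name__ == "__main__":
    # Hypothetical local directories; months whose input file is missing are skipped
    for sas_path, parquet_path in monthly_jobs("sas_data", "parquet_out"):
        if os.path.exists(sas_path):
            convert_to_parquet(sas_path, parquet_path)
```

The resulting `parquet_out` directory could then be copied to S3, for example with `aws s3 cp --recursive`.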

The ETL pipeline is run with immigration-etl.py. It reads the Parquet files from S3 using a Spark session with 'org.apache.hadoop:hadoop-aws:2.7.0',
joins them with the other data sources mentioned above, and writes the results back to S3 in Parquet format.

With more time, there is a lot of room for automation.

##### Data Refresh
The ETL script should run monthly, because I94 data is published for every month, so that it stays in sync with each new month of I94 data.  
The immigration and immigrant tables will be refreshed monthly. The other data (airport codes, city demographics) is relatively static and can be refreshed
less frequently.

##### How would I approach the problem differently under the following scenarios:

(1) If the data was increased by 100x:
    Use only a small sample to test the ETL pipeline and data flow. I would still use Apache Spark, but run it in distributed mode
    on a cluster, using AWS EMR (a larger cluster with more EC2 nodes) to handle the processing load.
    Run and store results in the cloud (e.g. AWS S3) with the full dataset; the local system would only be used with a small subset of the data.

(2) If the pipelines were run on a daily basis by 7am:
    Consider a separate 'reporting' data server so as not to disrupt production or the operations group; if something breaks in production,
    it can impact the organization's performance. Tools like Apache Airflow can be used for scheduled runs.

(3) If the database needed to be accessed by 100+ people:
    Find out whether all of these people have the same need (a common report) or more specific (custom) needs by division, e.g. Finance, HR, Operations.
    If possible, create different views based on this for best performance.
    Explore Hadoop YARN further from a high-availability perspective.