# Study of Immigration Data in the United States
### Data Engineering Capstone Project

#### Project Summary

This is the capstone project for the Udacity Data Engineering Nanodegree program. The idea is to take multiple disparate data sources, clean the data, and process it through an ETL pipeline to produce a usable data set for analytics.

We will be looking at the immigration data for the U.S. ports (I94 immigration data). The plan is to enrich this data using the other data sources suggested, build an ETL pipeline to process the raw data and create a data warehouse which can be used for analytics. 


The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

#### Import Libraries

In [1]:
# Do all imports and installs here
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import os
import configparser
import datetime
import re

from pyspark.sql import SparkSession
from pyspark.sql.functions import avg
from pyspark.sql import SQLContext
from pyspark.sql.functions import isnan, when, count, col, udf, dayofmonth, dayofweek, month, year, weekofyear, quarter
from pyspark.sql.functions import monotonically_increasing_id, round, substring, upper
from pyspark.sql.types import *
#import chart_studio.plotly as py
import psycopg2

# import sys  
# sys.path.insert(0, "../src/")
from etl import check_data_quality

#### Load Configuration Data

In [2]:
# config = configparser.ConfigParser()
# config.read('config.cfg')

# os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
# os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

#### Create a Spark Session

In [3]:
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()

In [4]:
spark

## Step 1: Scope the Project and Gather Data

### Project Scope
Explain what you plan to do in the project in more detail. What data do you use? What is your end solution look like? What tools did you use? etc>

In this project, we will be gathering, assessing, cleaning, creating data models, setup and push data to a data warehouse where it will be analyzed to produce informative reports.

At a high level:

- Data is extracted from the immigration SAS data, partitioned by year, month, and day, and stored in a data lake on Amazon S3 as Parquet files.
- The partitioned data is loaded into Redshift into staging tables
- Design fact and dimension tables
- The staging data is combined with other staged data sources to produce the final fact and dimension records in the Redshift warehouse.

Example questions we could explore with the final data set:

+ For a given port city, how many immigrants enter from which countries?
+ Is there any relationship between the average temperature of the country of origin and average temperature of the port city of entry?
+ Is there any relationship between the connection between the volume of travel and the number of entry ports (ie airports)
+ The effects of temperature on the volume of travellers
+ What time of year or month sees more immigration for certain areas?

#### Technical Overview
Project uses following technologies,

+ AWS S3 Storage : To store inputs & outputs
+ AWS Redshift as Date Warehouse for Analytics
+ Juypter Notebooks
+ Python
+ Spark
+ Libraries : Pandas & Pyspark

### Project Datasets

**I94 Immigration Data (immigration)** 

The i94 data contains information about visitors to the US via an i94 form that all visitors must complete. It comes from the US National Tourism and Trade Office and includes details on incoming immigrants and their ports of entry. - [source](https://www.trade.gov/national-travel-and-tourism-office). 

It is provided in SAS7BDAT format which is a binary database storage format. It is created by Statistical Analysis System (SAS) software to store data.

The immigration data is partitioned into monthly SAS files. Each file is around 300 to 700 MB. The data provided represents 12 months of data for the year 2016. This is the bulk of the data used in the project.

A data dictionary ```I94_SAS_Labels_Descriptions.SAS``` is provided for the immigration data. In addition to descriptions of the various fields, the port and country codes used were listed in table format. Two csv files are extracted from the dictionary: 

* ```i94_countries.csv```: Table containing country codes used in the dataset.
* ```i94_ports.csv```: Table containing city codes used in the dataset.

These files will be used as a lookup when extracting the immigration data.

**World Temparature data (temperature)** 

This dataset comes from Kaggle. - [source](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data).

It contains average temperature data for countries and cities around the world between 1743-11-01 and 2013-09-01.

The data is stored in ```GlobalLandTemperaturesByCity.csv``` (508 MB).

**U.S. City Demographic Data (demographics)**

This data comes from OpenSoft and contains demographic data for U.S. cities and states, such as age, population, veteran status and race. - [source](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/)

This data placed in the data lake as a single CSV file (246 KB). This data will be combined with port city data to provide ancillary demographic info for port cities.

**Airport Code Table (airports)**

This is a simple table of airport codes and corresponding cities. It comes from datahub.io. - [source](https://datahub.io/core/airport-codes#data)

This data is a single CSV file (5.8 KB). It provides additional information for airports and can be combined with the immigration port city info.


Below is a table of datasets used in the project:
<table>
<thead>
<tr>
<th>Source name</th>
<th>Filename</th>
<th>Format</th>
<th>Description</th>
</tr>
</thead>
<tbody>
<tr>
<td>I94 Immigration Sample Data</td>
<td>immigration_data_sample.csv</td>
<td>csv</td>
<td>This is a sample data which is from the US National Tourism and Trade Office.</td>
</tr>
<tr>
<td><a href="https://travel.trade.gov/research/reports/i94/historical/2016.html">I94 Immigration Data</a></td>
<td>data/18-83510-I94-Data-2016/i94_***16_sub.sas7bdat</td>
<td>SAS</td>
<td>This data comes from the US National Tourism and Trade Office.</td>
</tr>
<tr>
<td><a href="https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data">World Temperature Data</a></td>
<td>world_temperature.csv</td>
<td>csv</td>
<td>This dataset contains temperature data of various cities from 1700&#39;s - 2013. This dataset came from Kaggle.</td>
</tr>
<tr>
<td><a href="https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/">U.S. City Demographic Data</a></td>
<td>us-cities-demographics.csv</td>
<td>csv</td>
<td>This dataset contains population details of all US Cities and census-designated places includes gender &amp; race informatoin. This data came from OpenSoft.</td>
</tr>
<tr>
<td><a href="https://datahub.io/core/airport-codes#data">Airport Codes</a></td>
<td>airport-codes_csv.csv</td>
<td>csv</td>
<td>This is a simple table of airport codes and corresponding cities.</td>
</tr>
<tr>
<td>I94_country</td>
<td>i94_countries.csv</td>
<td>csv</td>
<td>Shows corresponding i94 Country of Citizenship &amp; Country of Residence codes. Source : I94_SAS_Labels_Descriptions.SAS</td>
</tr>
<tr>
<td>I94_port</td>
<td>i94_ports.csv</td>
<td>csv</td>
<td>Shows US Port of Entry city names and their corresponding codes. Source : I94_SAS_Labels_Descriptions.SAS</td>
</tr>
</tbody>
</table>

## Step 2: Explore and Assess the Data

#### 1. Immigration Data

In [9]:
immigration_data_path = '../Data_Engineering_Capstone_Project_local2/data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df_immigration_spark =spark.read.format('com.github.saurfang.sas.spark').load(immigration_data_path)

In [10]:
df_immigration_spark.limit(5).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


In [11]:
#df_immigration.info
df_immigration_spark.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [12]:
df_immigration_spark.describe().show()

+-------+-----------------+--------------------+-------+------------------+------------------+-------+-----------------+------------------+------------------+------------------+------------------+-------------------+-------+--------------------+--------+-----------------+-------+-------+-------+-------+------------------+------------------+-------+-----------------+------------------+--------------------+------------------+--------+
|summary|            cicid|               i94yr| i94mon|            i94cit|            i94res|i94port|          arrdate|           i94mode|           i94addr|           depdate|            i94bir|            i94visa|  count|            dtadfile|visapost|            occup|entdepa|entdepd|entdepu|matflag|           biryear|           dtaddto| gender|           insnum|           airline|              admnum|             fltno|visatype|
+-------+-----------------+--------------------+-------+------------------+------------------+-------+-----------------+------

Read the whole sad7bdat files sperated by month

In [13]:
# columns with over 90% missing values and with no sense 
# df_immigration.drop(columns = ['occup', 'entdepu','insnum', 'count', 'dtaddto', 'entdepd', 'admnum', 'matflag'])
df_immigration_spark.drop('occup', 'entdepu','insnum', 'count', 'dtaddto', 'entdepd', 'admnum', 'matflag')

DataFrame[cicid: double, i94yr: double, i94mon: double, i94cit: double, i94res: double, i94port: string, arrdate: double, i94mode: double, i94addr: string, depdate: double, i94bir: double, i94visa: double, dtadfile: string, visapost: string, entdepa: string, biryear: double, gender: string, airline: string, fltno: string, visatype: string]

In [14]:
# Create udf to convert SAS date to PySpark date 
# @udf(StringType())
# def convert_datetime(x):
#     if x:
#         return (datetime.datetime(1960, 1, 1).date() + datetime.timedelta(x)).day
#     return None
# # return (datetime.datetime(1960, 1, 1).date() + datetime.timedelta(x)).isoformat()
df_immigration_cleaned = df_immigration_spark.dropDuplicates(['cicid'])
df_immigration_cleaned = df_immigration_spark.dropna(how="any", subset=["i94port", "i94addr", "gender"])
#df_immigration_cleaned = df_immigration_cleaned.withColumn("arrdate", convert_datetime(df_immigration_cleaned.arrdate))

#### Exact ports info from I94_SAS_Labels_Descriptions.SAS
i94_ports.csv: Table containing city codes used in the dataset

In [15]:
# Create UDF for process date, mode, visa
def convert_sas_date(days):
    """
    Converts SAS date stored as days since 1/1/1960 to datetime
    :param days: Days since 1/1/1960
    :return: datetime
    """
    if days is None:
        return None
    return datetime.date(1960, 1, 1) + datetime.timedelta(days=days)


def get_sas_day(days):
    """
    Converts SAS date stored as days since 1/1/1960 to day of month
    :param days: Days since 1/1/1960
    :return: Day of month value as integer
    """
    if days is None:
        return None
    return (datetime.date(1960, 1, 1) + datetime.timedelta(days=days)).day


def convert_i94mode(mode):
    """
    Converts i94 travel mode code to a description
    :param mode: int i94 mode as integer
    :return: i94 mode description
    """
    if mode == 1:
        return "Air"
    elif mode == 2:
        return "Sea"
    elif mode == 3:
        return "Land"
    else:
        return "Not Reported"


def convert_visa(visa):
    """
    Converts visa numeric code to description
    :param visa: str
    :return: Visa description: str
    """
    if visa is None:
        return "Not Reported"
    elif visa == 1:
        return "Business"
    elif visa == 2:
        return "Pleasure"
    elif visa == 3:
        return "Student"
    else:
        return "Not Reported"


def format_state(s):
    """
    Format state column
    :param s:
    :return:
    """
    s = s.replace('DIST. OF', 'District of') \
         .replace('S.', 'South') \
         .replace('N.', 'North') \
         .replace('W.', 'West')
    return ' '.join([w.capitalize() if w != 'of' else w for w in s.split()])


def covert_i94port(i94port, i94addr):
    """
    Process i94port dictionary
    :param i94port:
    :param i94addr:
    :return: i94port_split
    """
    i94port_split = {}
    index = 0
    for k, v in i94port.items():
        if not re.match('^Collapsed|^No PORT Code', v):
            try:
                # extract state part from i94port
                # the state part contains the state and also other words
                state_part = v.rsplit(',', 1)[1]
                city_part = v.rsplit(',', 1)[0]
                # create a set of all words in state part
                state_part_set = set(state_part.split())
                # if the state is valid (is in the set(i94addr.keys()), then retrieve state
                state = list(set(i94addr.keys()).intersection(state_part_set))[0]
                # add state to dict
                i94port_split[index] = [k, city_part, state]
            except IndexError:
                # no state is specified for Washington DC in labels so it is added here
                # 'MARIPOSA AZ' is not split by ","
                if v == 'WASHINGTON DC':
                    i94port_split[index] = [k, 'WASHINGTON DC', 'DC']
                elif v == 'MARIPOSA AZ':
                    i94port_split[index] = [k, 'MARIPOSA', 'AZ']
                else:
                    i94port_split[index] = [k, 'NULL', 'NULL']

        else:
            i94port_split[index] = [k, 'NULL', 'NULL']
        index += 1
    return i94port_split

In [16]:
def process_airport_data(spark, input_data, output_data):
    """
    Description: Process the airport data files, transform it to airport table
    :param spark: spark session
    :param input_data: input file path
    :param output_data: output file path
   """
    print('Processing airport codes dataset...')

    # get filepath to airport data file
    # airport_data_path = os.path.join(input_data, 'airport-codes_csv.csv')
    airport_data_path = input_data + 'airport-codes_csv.csv'
    df_airport_spark = spark.read\
        .format('csv')\
        .option("inferSchema", "true")\
        .option("header", "true")\
        .load(airport_data_path)\
        .drop("coordinates", "gps_code", "continent", "elevation_ft")

    dim_airport = df_airport_spark\
        .filter((col("type") == "small_airport") |
                (col("type") == "large_airport") |
                (col("type") == "medium_airport"))\
        .filter(df_airport_spark["iso_country"] == "US")\
        .filter(df_airport_spark['local_code'].isNotNull())\
        .withColumn("state", substring(df_airport_spark["iso_region"], 4, 2))
#         .filter(df_airport_spark['iata_code'].isNotNull()) \

    # airport_out_path = os.path.join(output_data, 'dim_airport/')
    airport_out_path = output_data + 'dim_airport/'
    dim_airport.write.parquet(airport_out_path)

    print('Finished processing airport codes data.')
    # return dim_airport

In [17]:
convert_i94mode_udf = udf(convert_i94mode, StringType())
convert_sas_date_udf = udf(convert_sas_date, DateType())
convert_visa_udf = udf(convert_visa, StringType())
get_sas_day_udf = udf(get_sas_day, IntegerType())

In [18]:
# Create dim_time spark dataframe
df_time_spark = df_immigration_spark.select(col("arrdate").alias("time")).union(df_immigration_spark.select(col("depdate").alias("time"))).distinct()
# df_time_spark.show()
time_table = df_time_spark.withColumn('date', convert_sas_date_udf(df_time_spark['time']))
time_table = time_table.select(
    col('time').alias('time_id').cast(IntegerType())\
    , col('date').alias('date') \
    , year('date').alias('year') \
    , quarter('date').alias('quarter')\
    , month('date').alias('month') \
    , dayofmonth('date').alias('day') \
    , dayofweek('date').alias('weekday')\
    , weekofyear('date').alias('week'))
time_table.show()

+-------+----------+----+-------+-----+---+-------+----+
|time_id|      date|year|quarter|month|day|weekday|week|
+-------+----------+----+-------+-----+---+-------+----+
|  20593|2016-05-19|2016|      2|    5| 19|      5|  20|
|  20689|2016-08-23|2016|      3|    8| 23|      3|  34|
|  20673|2016-08-07|2016|      3|    8|  7|      1|  31|
|  20467|2016-01-14|2016|      1|    1| 14|      5|   2|
|  20652|2016-07-17|2016|      3|    7| 17|      1|  28|
|  20196|2015-04-18|2015|      2|    4| 18|      7|  16|
|  20705|2016-09-08|2016|      3|    9|  8|      5|  36|
|  20621|2016-06-16|2016|      2|    6| 16|      5|  24|
|  20550|2016-04-06|2016|      2|    4|  6|      4|  14|
|  20682|2016-08-16|2016|      3|    8| 16|      3|  33|
|  20203|2015-04-25|2015|      2|    4| 25|      7|  17|
|  20592|2016-05-18|2016|      2|    5| 18|      4|  20|
|  20614|2016-06-09|2016|      2|    6|  9|      5|  23|
|  20556|2016-04-12|2016|      2|    4| 12|      3|  15|
|  20683|2016-08-17|2016|      

In [19]:
time_table.printSchema()

root
 |-- time_id: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- year: integer (nullable = true)
 |-- quarter: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: integer (nullable = true)
 |-- week: integer (nullable = true)



In [20]:
df_immigration_spark = df_immigration_spark \
        .withColumn('arrival_date', convert_sas_date_udf(df_immigration_spark['arrdate'])) \
        .withColumn('departure_date', convert_sas_date_udf(df_immigration_spark['depdate'])) \
        .withColumn('arrival_year', df_immigration_spark['i94yr'].cast(IntegerType())) \
        .withColumn('arrival_month', df_immigration_spark['i94mon'].cast(IntegerType())) \
        .withColumn('arrival_day', get_sas_day_udf(df_immigration_spark['arrdate'])) \
        .withColumn('age', df_immigration_spark['i94bir'].cast(IntegerType())) \
        .withColumn('country_of_bir', df_immigration_spark['i94cit'].cast(IntegerType())) \
        .withColumn('country_of_res', df_immigration_spark['i94res'].cast(IntegerType())) \
        .withColumn('port_of_admission', df_immigration_spark['i94port'].cast(StringType())) \
        .withColumn('birth_year', df_immigration_spark['biryear'].cast(IntegerType())) \
        .withColumn('mode', convert_i94mode_udf(df_immigration_spark['i94mode'])) \
        .withColumn('visa_category', convert_visa_udf(df_immigration_spark['i94visa']))

In [21]:
dim_immigration = df_immigration_spark \
            .filter(df_immigration_spark['country_of_bir'].isNotNull()) \
            .filter(df_immigration_spark['country_of_res'].isNotNull()) \
            .filter(df_immigration_spark['arrdate'].isNotNull()) \
            .filter(df_immigration_spark['depdate'].isNotNull()) \
            .filter(df_immigration_spark['port_of_admission'].isNotNull()) \
            .select(
                col('cicid').alias('id'),
                col('arrdate').alias('arr_ts'),
                'arrival_year',
                'arrival_month',
                'country_of_bir',
                'country_of_res',
                'port_of_admission',
                'mode',
                'visa_category',
                'visatype',
                col('depdate').alias('dep_ts')) \
            .dropDuplicates()

In [22]:
dim_immigration = df_immigration_spark.select(
        col('cicid').alias('id').cast(IntegerType()),
        col('arrdate').alias('arrdate_id').cast(IntegerType()),
        col('depdate').alias('depdate_id').cast(IntegerType()),
        'country_of_bir',
        'country_of_res',
        'port_of_admission',
        'mode',
        'visa_category',
        'visatype') \
        .dropDuplicates()

In [23]:
dim_immigration.printSchema()


root
 |-- id: integer (nullable = true)
 |-- arrdate_id: integer (nullable = true)
 |-- depdate_id: integer (nullable = true)
 |-- country_of_bir: integer (nullable = true)
 |-- country_of_res: integer (nullable = true)
 |-- port_of_admission: string (nullable = true)
 |-- mode: string (nullable = true)
 |-- visa_category: string (nullable = true)
 |-- visatype: string (nullable = true)



In [24]:
dim_immigration.limit(5).toPandas()

Unnamed: 0,id,arrdate_id,depdate_id,country_of_bir,country_of_res,port_of_admission,mode,visa_category,visatype
0,212,20545,,103,103,NYC,Air,Pleasure,WT
1,218,20545,20548.0,103,103,NYC,Air,Pleasure,WT
2,420,20545,20601.0,103,103,NYC,Air,Pleasure,WT
3,1153,20545,20554.0,104,104,OGG,Air,Pleasure,WT
4,1221,20545,20546.0,104,104,NYC,Air,Pleasure,WT


In [38]:
df_immigration_cleaned.createOrReplaceTempView("immigration")

In [39]:
spark.sql("""
    SELECT COUNT(DISTINCT(admnum))
    FROM immigration
""").show()

+----------------------+
|count(DISTINCT admnum)|
+----------------------+
|               2540211|
+----------------------+



In [40]:
spark.sql("""
    SELECT COUNT(DISTINCT(cicid))
    FROM immigration
""").show()

+---------------------+
|count(DISTINCT cicid)|
+---------------------+
|              2551402|
+---------------------+



In [37]:
dim_immigration.count()

2953856

All invalid values are grouped together as 'INVALID ENTRY'.

In [57]:
i94cntyl = { k: (v if not re.match('^INVALID:|^Collapsed|^No Country Code', v) else 'INVALID ENTRY') 
                 for k, v in i94cntyl.items()}
#i94cntyl

The states in map are formatted to comply with state names in other datasets.

+ 'DIST. OF' is replaced with 'District of'
+ 'S.', 'N.', 'W.' are replaced with 'South', 'North', 'West'
+ all states are capitalized

In [58]:
def format_state(s):
    s = s.replace('DIST. OF', 'District of') \
         .replace('S.', 'South') \
         .replace('N.', 'North') \
         .replace('W.', 'West')
    return ' '.join([w.capitalize() if w != 'of' else w for w in s.split() ])

# format addr labels
i94addr = {k: format_state(v) for k, v in i94addr.items()}
# i94addr

In [59]:
i94port_split = {}
index = 0 
for k, v in i94port.items():
    if not re.match('^Collapsed|^No PORT Code', v):
        try:
            # extract state part from i94port
            # the state part contains the state and also other words
            state_part = v.rsplit(',', 1)[1]
            city_part = v.rsplit(',', 1)[0] 
            # create a set of all words in state part
            state_part_set = set(state_part.split())
            # if the state is valid (is in the set(i94addr.keys()), then retrieve state
            state = list(set(i94addr.keys()).intersection(state_part_set))[0]
            # add state to dict
            i94port_split[index] = [k, city_part, state]
        except IndexError:
            # no state is specified for Washington DC in labels so it is added here
            # 'MARIPOSA AZ' is not split by ","
            if v == 'WASHINGTON DC':
                i94port_split[index] = [k, 'WASHINGTON DC','DC']
            elif v == 'MARIPOSA AZ':
                i94port_split[index] = [k,  'MARIPOSA', 'AZ']
            else:
                i94port_split[index] = [k, 'INVALID ENTRY', 'INVALID ENTRY']
            
    else:
        i94port_split[index] = [k, 'INVALID ENTRY', 'INVALID ENTRY']
    index += 1

In [60]:
df_i94cntyl = pd.DataFrame(i94cntyl.items(), columns=['country_code', 'country'])
#df_i94port = pd.DataFrame(i94port.items(), columns=['port_code', 'port'])
df_i94port = pd.DataFrame(i94port_split.values(), columns=['port_code', 'city_name', 'state_code'])
df_i94mode = pd.DataFrame(i94mode.items(), columns=['mode_num', 'mode'])
df_i94addr = pd.DataFrame(i94addr.items(), columns=['state_id', 'state_name'])
df_i94visa = pd.DataFrame(i94visa.items(), columns=['visa_type_num', 'visa_type'])

In [61]:
df_i94port_spark = spark.createDataFrame(df_i94port)
df_i94addr_spark = spark.createDataFrame(df_i94addr)

In [62]:
df_i94port_dict = df_i94port_spark\
    .join(df_i94addr_spark, df_i94port_spark['state_code'] == df_i94addr_spark['state_id'], 'left')

In [63]:
df_i94port_dict = df_i94port_dict.select("port_code", upper(col("city_name")).alias("city_name"), "state_code", upper(col("state_name")).alias("state_name"))\
                 .filter(df_i94port_dict["state_name"].isNotNull())

In [64]:
df_i94port_dict.show()

+---------+------------+----------+--------------+
|port_code|   city_name|state_code|    state_name|
+---------+------------+----------+--------------+
|      DOU|     DOUGLAS|        AZ|       ARIZONA|
|      LUK|   LUKEVILLE|        AZ|       ARIZONA|
|      MAP|    MARIPOSA|        AZ|       ARIZONA|
|      NAC|        NACO|        AZ|       ARIZONA|
|      NOG|     NOGALES|        AZ|       ARIZONA|
|      PHO|     PHOENIX|        AZ|       ARIZONA|
|      POR|      PORTAL|        AZ|       ARIZONA|
|      SLU|    SAN LUIS|        AZ|       ARIZONA|
|      SAS|      SASABE|        AZ|       ARIZONA|
|      TUC|      TUCSON|        AZ|       ARIZONA|
|      YUI|        YUMA|        AZ|       ARIZONA|
|      YUM|        YUMA|        AZ|       ARIZONA|
|      CHL|  CHARLESTON|        SC|SOUTH CAROLINA|
|      CAE|    COLUMBIA|        SC|SOUTH CAROLINA|
|      GEO|  GEORGETOWN|        SC|SOUTH CAROLINA|
|      GSP|  GREENVILLE|        SC|SOUTH CAROLINA|
|      GRR|       GREER|       

In [65]:
df_i94port_dict.count()

525

In [67]:
# Dictionary of valid i94port codes is created
i94_sas_label_descriptions_fname = "data/I94_SAS_Labels_Descriptions.SAS"
with open(i94_sas_label_descriptions_fname) as f:
    lines = f.readlines()

re_compiled = re.compile(r"\'(.*)\'.*\'(.*)\'")
valid_ports = {}
for line in lines[302:961]:
    results = re_compiled.search(line)
    valid_ports[results.group(1)] = results.group(2)
print(len(valid_ports))

659


In [68]:
dim_immigration = dim_immigration.join(df_i94port_dict, 
          df_i94port_dict['port_code'] == dim_immigration['port_of_admission'], 'inner')

In [69]:
dim_immigration.limit(5).toPandas()

Unnamed: 0,id,arrdate_id,depdate_id,country_of_bir,country_of_res,port_of_admission,mode,visa_category,visatype,port_code,city_name,state_code,state_name
0,3606841,20563,20575,582,582,BGM,Air,Pleasure,B2,BGM,BANGOR,ME,MAINE
1,3599392,20563,20575,582,582,BGM,Air,Pleasure,B2,BGM,BANGOR,ME,MAINE
2,2177010,20556,20585,135,135,BGM,Air,Business,B1,BGM,BANGOR,ME,MAINE
3,4671460,20569,20571,108,108,BGM,Air,Business,B1,BGM,BANGOR,ME,MAINE
4,4040457,20566,20581,111,111,BGM,Air,Business,B1,BGM,BANGOR,ME,MAINE


### 2. World Temparature Data 

In [71]:
temperature_data_path = 'data/GlobalLandTemperaturesByCity.csv'

In [72]:
df_temperature = pd.read_csv(temperature_data_path)

In [73]:
df_temperature.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


In [74]:
df_temperature.dtypes

dt                                object
AverageTemperature               float64
AverageTemperatureUncertainty    float64
City                              object
Country                           object
Latitude                          object
Longitude                         object
dtype: object

In [75]:
df_temperature_spark = spark.read.csv(temperature_data_path, header=True, inferSchema=True)

In [76]:
df_temperature_spark.dtypes

[('dt', 'timestamp'),
 ('AverageTemperature', 'double'),
 ('AverageTemperatureUncertainty', 'double'),
 ('City', 'string'),
 ('Country', 'string'),
 ('Latitude', 'string'),
 ('Longitude', 'string')]

In [77]:
df_temperature_spark = df_temperature_spark \
    .filter(df_temperature_spark["country"] == "United States") \
    .filter(df_temperature_spark.AverageTemperature.isNotNull())\
    .filter(year(df_temperature_spark["dt"]) == 2012)\
    .withColumn("year", year(df_temperature_spark["dt"])) \
    .withColumn("month", month(df_temperature_spark["dt"]))

In [78]:
df_temperature_spark.dtypes

[('dt', 'timestamp'),
 ('AverageTemperature', 'double'),
 ('AverageTemperatureUncertainty', 'double'),
 ('City', 'string'),
 ('Country', 'string'),
 ('Latitude', 'string'),
 ('Longitude', 'string'),
 ('year', 'int'),
 ('month', 'int')]

In [79]:
temperature_table = df_temperature_spark.select(
    "year",
    "month",
    "AverageTemperature",
    upper(col("City")).alias('city'),
    "Country")\
    .dropDuplicates()

In [80]:
temperature_table.limit(5).toPandas()

Unnamed: 0,year,month,AverageTemperature,city,Country
0,2012,1,2.382,ALEXANDRIA,United States
1,2012,9,26.349,AUSTIN,United States
2,2012,2,7.129,CARY,United States
3,2012,7,27.427,CHATTANOOGA,United States
4,2012,1,1.023,CINCINNATI,United States


In [81]:
temperature_table = temperature_table.groupby("year", "city").avg("AverageTemperature")

In [82]:
temperature_table.limit(5).toPandas()

Unnamed: 0,year,city,avg(AverageTemperature)
0,2012,TORRANCE,17.089583
1,2012,INGLEWOOD,17.089583
2,2012,FORT WAYNE,12.203833
3,2012,FORT COLLINS,10.561417
4,2012,OAKLAND,15.051583


In [83]:
temperature_by_city = temperature_table.select("year", "city", round(col("avg(AverageTemperature)"),2).alias("avg_temp"))

In [84]:
temperature_by_city.limit(5).toPandas()

Unnamed: 0,year,city,avg_temp
0,2012,TORRANCE,17.09
1,2012,INGLEWOOD,17.09
2,2012,FORT WAYNE,12.2
3,2012,FORT COLLINS,10.56
4,2012,OAKLAND,15.05


In [173]:
temperature_by_city.count()

248

In [85]:
df_i94port_dict.limit(5).toPandas()

Unnamed: 0,port_code,city_name,state_code,state_name
0,DOU,DOUGLAS,AZ,ARIZONA
1,LUK,LUKEVILLE,AZ,ARIZONA
2,MAP,MARIPOSA,AZ,ARIZONA
3,NAC,NACO,AZ,ARIZONA
4,NOG,NOGALES,AZ,ARIZONA


In [86]:
df_i94port_dict_with_temp = df_i94port_dict\
    .join(temperature_by_city, df_i94port_dict['city_name'] == temperature_by_city['City'], 'left')

In [87]:
df_i94port_dict_with_temp.count()

525

In [88]:
df_i94port_dict_with_temp.limit(5).toPandas()

Unnamed: 0,port_code,city_name,state_code,state_name,year,city,avg_temp
0,ADW,ANDREWS AFB,MD,MARYLAND,,,
1,ANZ,ANZALDUAS,TX,TEXAS,,,
2,BAR,BAKER AAF - BAKER ISLAND,AK,ALASKA,,,
3,SAV,SAVANNAH,GA,GEORGIA,2012.0,SAVANNAH,20.78
4,CAR,CARIBOU MUNICIPAL AIRPORT,MN,MINNESOTA,,,


### 3. US City Demographic Data

In [89]:
demographics_data_path = "data/us-cities-demographics.csv"

In [90]:
df_demographics_spark = spark.read.csv(demographics_data_path, inferSchema=True, header=True, sep=';')

In [91]:
df_demographics_spark.limit(5).toPandas()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ,White,76402


In [92]:
df_demographics_spark.count()

2891

In [93]:
#Calculate percentages of each numeric column and create new columns.
df_demographics_spark=df_demographics_spark\
.withColumn("Median Age",col("Median Age").cast("float"))\
.withColumn("pct_male_pop",df_demographics_spark["Male Population"]/df_demographics_spark["Total Population"]*100)\
.withColumn("pct_female_pop",df_demographics_spark["Female Population"]/df_demographics_spark["Total Population"]*100)\
.withColumn("pct_veterans",df_demographics_spark["Number of Veterans"]/df_demographics_spark["Total Population"]*100)\
.withColumn("pct_foreign_born",df_demographics_spark["Foreign-born"]/df_demographics_spark["Total Population"]*100)\
.withColumn("pct_race",df_demographics_spark["Count"]/df_demographics_spark["Total Population"]*100)\
.orderBy("State")

In [94]:
#Select columns with new calculated percentages.
df_demographics_spark_select = df_demographics_spark\
    .select(col("City").alias("city"),\
            col("State").alias("state"), \
            col("Median Age").alias("median_age"),\
            "pct_male_pop",\
            "pct_female_pop",\
            "pct_veterans",\
            "pct_foreign_born",\
            "Race",\
            "pct_race",\
            "Total Population")

In [95]:
#pivot the Race column
df_demographics_spark_pivot = df_demographics_spark_select.groupBy("city", "state", "median_age", "pct_male_pop",\
                                    "pct_female_pop","pct_veterans",\
                                    "pct_foreign_born", "Total Population").pivot("Race").avg("pct_race")

In [96]:
#change the header name of the race fields for spark compatibility.
df_demographics_spark_pivot=df_demographics_spark_pivot\
    .select(upper(col("city")).alias("city"),
            upper(col("state")).alias("state"),
            round(col("median_age")).alias("median_age"),
             round(col("pct_male_pop"), 1).alias("perc_of_male_pop"),
             round(col("pct_female_pop"), 1).alias("perc_of_female_pop"),
             round(col("pct_veterans"), 1).alias("perc_of_veterans"),
             round(col("pct_foreign_born"), 1).alias("perc_of_born"),\
             round(col("American Indian and Alaska Native"), 1).alias("perc_of_native_american"),\
             round(col("Asian"), 1).alias("perc_of_asian"),\
             round(col("Black or African-American"), 1).alias("perc_of_black"),\
             round(col("Hispanic or Latino"), 1).alias("perc_of_latino"),\
             round(col("White"), 1).alias("perc_of_white"),\
             col("Total Population").alias("total_pop"))\
    .dropDuplicates(['city'])

In [97]:
df_demographics_spark_pivot.count()

567

In [98]:
df_demographics_spark_pivot = df_demographics_spark_pivot.withColumn("city_id", monotonically_increasing_id())

In [99]:
df_demographics_spark_pivot.limit(5).toPandas()

Unnamed: 0,city,state,median_age,perc_of_male_pop,perc_of_female_pop,perc_of_veterans,perc_of_born,perc_of_native_american,perc_of_asian,perc_of_black,perc_of_latino,perc_of_white,total_pop,city_id
0,CHINO,CALIFORNIA,37.0,59.6,40.4,4.9,21.8,5.6,15.1,8.0,53.1,52.0,85599,8589934592
1,STAMFORD,CONNECTICUT,35.0,50.4,49.6,1.8,34.1,1.1,8.5,18.9,25.8,66.4,128877,8589934593
2,YAKIMA,WASHINGTON,34.0,48.2,51.8,5.0,16.8,2.2,2.4,1.0,47.1,84.6,93700,8589934594
3,AMARILLO,TEXAS,34.0,49.8,50.2,5.5,10.6,2.1,4.3,7.0,32.8,87.3,199651,17179869184
4,MUNCIE,INDIANA,27.0,46.6,53.4,4.3,1.5,0.3,,15.0,3.6,86.0,69701,17179869185


In [100]:
dim_demographics= df_i94port_dict\
    .join(df_demographics_spark_pivot, 
          (df_i94port_dict['city_name'] == df_demographics_spark_pivot['city']) & (df_i94port_dict['state_name'] == df_demographics_spark_pivot['state']), "inner")\
    .drop("city_name", "state_name")

In [101]:
dim_demographics.limit(5).toPandas()

Unnamed: 0,port_code,state_code,city,state,median_age,perc_of_male_pop,perc_of_female_pop,perc_of_veterans,perc_of_born,perc_of_native_american,perc_of_asian,perc_of_black,perc_of_latino,perc_of_white,total_pop,city_id
0,PHO,AZ,PHOENIX,ARIZONA,34.0,50.3,49.7,4.6,19.2,2.7,4.2,8.5,42.9,74.3,1563001,326417514497
1,TUC,AZ,TUCSON,ARIZONA,34.0,49.8,50.2,7.2,15.5,4.6,4.6,6.4,43.5,76.1,531674,472446402561
2,YUI,AZ,YUMA,ARIZONA,33.0,51.3,48.7,7.6,20.5,1.3,1.3,4.0,60.6,74.0,94145,25769803777
3,YUM,AZ,YUMA,ARIZONA,33.0,51.3,48.7,7.6,20.5,1.3,1.3,4.0,60.6,74.0,94145,25769803777
4,CHL,SC,CHARLESTON,SOUTH CAROLINA,35.0,47.2,52.8,6.9,4.3,0.5,2.0,22.1,2.9,76.8,135524,850403524608


In [102]:
dim_demographics.count()

111

In [267]:
# df_demographics_spark = df_demographics_spark.filter(df_demographics_spark['Count'].isNotNull())

In [103]:
df_demographics_spark_pivot.dtypes

[('city', 'string'),
 ('state', 'string'),
 ('median_age', 'float'),
 ('perc_of_male_pop', 'double'),
 ('perc_of_female_pop', 'double'),
 ('perc_of_veterans', 'double'),
 ('perc_of_born', 'double'),
 ('perc_of_native_american', 'double'),
 ('perc_of_asian', 'double'),
 ('perc_of_black', 'double'),
 ('perc_of_latino', 'double'),
 ('perc_of_white', 'double'),
 ('total_pop', 'int'),
 ('city_id', 'bigint')]

In [104]:
df_demographics_spark_pivot.createOrReplaceTempView("demographics")

In [108]:
spark.sql("select city, state, perc_of_male_pop, perc_of_asian, total_pop from demographics where state = 'ARIZONA' order by city ").show()

+--------------+-------+----------------+-------------+---------+
|          city|  state|perc_of_male_pop|perc_of_asian|total_pop|
+--------------+-------+----------------+-------------+---------+
|      AVONDALE|ARIZONA|            48.0|          3.5|    80683|
|  CASAS ADOBES|ARIZONA|            47.3|          4.6|    65265|
|      CHANDLER|ARIZONA|            49.4|         12.7|   260833|
|     FLAGSTAFF|ARIZONA|            47.2|          3.8|    70317|
|       GILBERT|ARIZONA|            47.2|          8.0|   247523|
|      GLENDALE|ARIZONA|            48.6|          6.3|   240114|
|      GOODYEAR|ARIZONA|            46.5|          7.2|    79003|
|          MESA|ARIZONA|            49.8|          3.1|   471833|
|       PHOENIX|ARIZONA|            50.3|          4.2|  1563001|
|SAN TAN VALLEY|ARIZONA|            48.8|          1.7|    82797|
|    SCOTTSDALE|ARIZONA|            48.9|          4.9|   236844|
|      SURPRISE|ARIZONA|            49.0|          2.5|   128442|
|         

In [109]:
spark.sql("select state,  sum(total_pop) as state_pop from demographics group by state order by state").show()

+--------------------+---------+
|               state|state_pop|
+--------------------+---------+
|             ALABAMA|  1049629|
|              ALASKA|   298695|
|             ARIZONA|  4328300|
|            ARKANSAS|   507047|
|          CALIFORNIA| 24301460|
|            COLORADO|  2463682|
|         CONNECTICUT|   797098|
|DISTRICT OF COLUMBIA|   672228|
|             FLORIDA|  5928707|
|             GEORGIA|  1711032|
|              HAWAII|   352766|
|               IDAHO|   398883|
|            ILLINOIS|  4484017|
|             INDIANA|  1882753|
|                IOWA|   733811|
|              KANSAS|   997013|
|            KENTUCKY|   929877|
|           LOUISIANA|  1172934|
|            MARYLAND|  1208662|
|       MASSACHUSETTS|  1780881|
+--------------------+---------+
only showing top 20 rows



### 4. Airport Data

In [25]:
df_airport = pd.read_csv('data/airport-codes_csv.csv')

In [26]:
df_airport.columns

Index(['ident', 'type', 'name', 'elevation_ft', 'continent', 'iso_country',
       'iso_region', 'municipality', 'gps_code', 'iata_code', 'local_code',
       'coordinates'],
      dtype='object')

In [113]:
df_airport.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [114]:
df_airport['municipality'].nunique()

27133

In [115]:
df_airport['municipality'].count()

49399

In [116]:
df_airport['type'].count()

55075

In [117]:
df_airport.dtypes

ident            object
type             object
name             object
elevation_ft    float64
continent        object
iso_country      object
iso_region       object
municipality     object
gps_code         object
iata_code        object
local_code       object
coordinates      object
dtype: object

In [118]:
df_airport.isna().sum()

ident               0
type                0
name                0
elevation_ft     7006
continent       27719
iso_country       247
iso_region          0
municipality     5676
gps_code        14045
iata_code       45886
local_code      26389
coordinates         0
dtype: int64

In [40]:
df_airport_spark = spark.read \
  .option("inferSchema", "true") \
  .option("header",True) \
  .csv("data/airport-codes_csv.csv") \
  .drop("coordinates", "gps_code", "continent", "elevation_ft")

In [41]:
df_airport_spark.filter(df_airport_spark["iata_code"].isNull()).count()

45886

In [42]:
df_airport_spark.show()

+-----+-------------+--------------------+-----------+----------+------------+---------+----------+
|ident|         type|                name|iso_country|iso_region|municipality|iata_code|local_code|
+-----+-------------+--------------------+-----------+----------+------------+---------+----------+
|  00A|     heliport|   Total Rf Heliport|         US|     US-PA|    Bensalem|     null|       00A|
| 00AA|small_airport|Aero B Ranch Airport|         US|     US-KS|       Leoti|     null|      00AA|
| 00AK|small_airport|        Lowell Field|         US|     US-AK|Anchor Point|     null|      00AK|
| 00AL|small_airport|        Epps Airpark|         US|     US-AL|     Harvest|     null|      00AL|
| 00AR|       closed|Newport Hospital ...|         US|     US-AR|     Newport|     null|      null|
| 00AS|small_airport|      Fulton Airport|         US|     US-OK|        Alex|     null|      00AS|
| 00AZ|small_airport|      Cordes Airport|         US|     US-AZ|      Cordes|     null|      00AZ|


In [43]:
df_airport_spark.select("ident","type").distinct().count()

55075

In [44]:
df_airport_spark=df_airport_spark\
    .filter((col("type")=="small_airport") | (col("type")=="large_airport") | (col("type")=="medium_airport"))\
    .filter(df_airport_spark["iso_country"]=="US")\
    .filter(df_airport_spark['local_code'].isNotNull())\
    .withColumn("state",substring(df_airport_spark["iso_region"],4,2))

# df_airport_spark=df_airport_spark\
#     .filter(df_airport_spark["type"]=="small_airport")\
#     .filter(df_airport_spark["iso_country"]=="US")\
#     .filter(df_airport_spark['iata_code'].isNotNull())\
#     .withColumn("state",substring(df_airport_spark["iso_region"],4,2))

In [45]:
df_airport_spark.toPandas().isna().sum()

ident               0
type                0
name                0
iso_country         0
iso_region          0
municipality       12
iata_code       12522
local_code          0
state               0
dtype: int64

In [46]:
df_airport_spark.show()

+-----+-------------+--------------------+-----------+----------+------------+---------+----------+-----+
|ident|         type|                name|iso_country|iso_region|municipality|iata_code|local_code|state|
+-----+-------------+--------------------+-----------+----------+------------+---------+----------+-----+
| 00AA|small_airport|Aero B Ranch Airport|         US|     US-KS|       Leoti|     null|      00AA|   KS|
| 00AK|small_airport|        Lowell Field|         US|     US-AK|Anchor Point|     null|      00AK|   AK|
| 00AL|small_airport|        Epps Airpark|         US|     US-AL|     Harvest|     null|      00AL|   AL|
| 00AS|small_airport|      Fulton Airport|         US|     US-OK|        Alex|     null|      00AS|   OK|
| 00AZ|small_airport|      Cordes Airport|         US|     US-AZ|      Cordes|     null|      00AZ|   AZ|
| 00CA|small_airport|Goldstone /Gts/ A...|         US|     US-CA|     Barstow|     null|      00CA|   CA|
| 00CL|small_airport| Williams Ag Airport|    

In [47]:
df_airport_spark=df_airport_spark\
.select("local_code", "type", "name", upper(col("municipality")).alias("city"), col("state").alias("state_code"), "iso_country", "iata_code")

In [48]:
df_airport_spark.limit(5).toPandas()

Unnamed: 0,local_code,type,name,city,state_code,iso_country,iata_code
0,00AA,small_airport,Aero B Ranch Airport,LEOTI,KS,US,
1,00AK,small_airport,Lowell Field,ANCHOR POINT,AK,US,
2,00AL,small_airport,Epps Airpark,HARVEST,AL,US,
3,00AS,small_airport,Fulton Airport,ALEX,OK,US,
4,00AZ,small_airport,Cordes Airport,CORDES,AZ,US,


In [36]:
df_airport_spark.createOrReplaceTempView("airport")

In [129]:
df_i94port_dict_with_temp.createOrReplaceTempView("ports")

In [130]:
spark.sql("select count(distinct(iata_code)) from airport").show()

+-------------------------+
|count(DISTINCT iata_code)|
+-------------------------+
|                     1861|
+-------------------------+



In [131]:
spark.sql("select count(distinct(port_code)) from ports").show()

+-------------------------+
|count(DISTINCT port_code)|
+-------------------------+
|                      525|
+-------------------------+



In [132]:
spark.sql("select count(*) from ports join airport on ports.port_code == airport.iata_code").show()

+--------+
|count(1)|
+--------+
|     224|
+--------+



In [133]:
spark.sql("""SELECT * from airport WHERE iata_code ='FMY'""").show()

+-----+--------------+----------+----------+----------+-----------+---------+
|ident|          type|      name|      city|state_code|iso_country|iata_code|
+-----+--------------+----------+----------+----------+-----------+---------+
| KFMY|medium_airport|Page Field|FORT MYERS|        FL|         US|      FMY|
+-----+--------------+----------+----------+----------+-----------+---------+



In [134]:
spark.sql("""SELECT * from airport WHERE ident ='FMY'""").show()

+-----+----+----+----+----------+-----------+---------+
|ident|type|name|city|state_code|iso_country|iata_code|
+-----+----+----+----+----------+-----------+---------+
+-----+----+----+----+----------+-----------+---------+



In [135]:
spark.sql("""
     SELECT a.ident, a.name, p.port_code,  p.city, iso_country
     FROM airport a
     JOIN ports p ON a.iata_code = p.port_code
""").show()

+-----+--------------------+---------+--------+-----------+
|ident|                name|port_code|    city|iso_country|
+-----+--------------------+---------+--------+-----------+
| KL06|Furnace Creek Air...|      DTH|    null|         US|
| KPIE|St Petersburg Cle...|      PIE|    null|         US|
| KADW|  Joint Base Andrews|      ADW|    null|         US|
| KNYL|Yuma MCAS/Yuma In...|      YUM|    null|         US|
| KORL|Orlando Executive...|      ORL| ORLANDO|         US|
| KSAV|Savannah Hilton H...|      SAV|SAVANNAH|         US|
| KCAR|Caribou Municipal...|      CAR|    null|         US|
| KUGN|Waukegan National...|      UGN|    null|         US|
| KPGR|          Kirk Field|      PGR|    null|         US|
| KSAR|Sparta Community ...|      SAR|    null|         US|
| KATL|Hartsfield Jackso...|      ATL| ATLANTA|         US|
| KMIA|Miami Internation...|      MIA|   MIAMI|         US|
| PAHO|       Homer Airport|      HOM|    null|         US|
| KMEM|Memphis Internati...|      MEM| M

In [136]:
spark.sql("""
     SELECT count(DISTINCT(a.ident)) as count
     FROM airport a
     JOIN ports p ON a.iata_code = p.port_code
     WHERE p.city != "INVALID ENTRY" and iso_country = "US"
""").show()

+-----+
|count|
+-----+
|   57|
+-----+



In [137]:
spark.sql("""
     SELECT count(DISTINCT(a.ident)) as count
     FROM airport a
     JOIN ports p ON a.city = p.city_name
     WHERE p.city != "INVALID ENTRY" and iso_country = "US" 
""").show()

+-----+
|count|
+-----+
|  584|
+-----+



In [138]:
df = spark.sql("""
     SELECT a.ident, a.name, p.port_code,  p.city, iso_country, iata_code
     FROM airport a
     JOIN ports p ON a.city = p.city_name
     WHERE iso_country = "US" 
     order by p.port_code
""")

In [139]:
df.repartition(1)\
.write.format("csv")\
.option("header", "true")\
.save("../data/output/mydata2.csv")\

In [140]:
df1 = spark.sql("""
     SELECT *
     FROM ports 
     order by port_code
""")

In [141]:
df1.repartition(1)\
.write.format("csv")\
.option("header", "true")\
.save("../data/output/mydata3.csv")\

In [142]:
spark.sql("""
     SELECT DISTINCT(type) 
     FROM airport
     WHERE city != "INVALID ENTRY" and iso_country = "US"
""").show()

+--------------+
|          type|
+--------------+
| large_airport|
|medium_airport|
| small_airport|
+--------------+



In [143]:
spark.sql("""
     SELECT count(DISTINCT(a.ident)) as count
     FROM airport a
     JOIN ports p ON a.ident = p.port_code
     WHERE p.city != "INVALID ENTRY" and iso_country = "US"
""").show()

+-----+
|count|
+-----+
|    0|
+-----+



In [144]:
spark.sql("""
     SELECT ident, p.city, iso_country, port_code, iata_code
     FROM airport a
     JOIN ports p ON a.ident = p.port_code
     WHERE iso_country = "US"
     order by 1
""").show()

+-----+----+-----------+---------+---------+
|ident|city|iso_country|port_code|iata_code|
+-----+----+-----------+---------+---------+
|  48Y|null|         US|      48Y|     null|
|  MOS|null|         US|      MOS|     null|
|  ORI|null|         US|      ORI|     null|
|  SAS|null|         US|      SAS|      SAS|
+-----+----+-----------+---------+---------+



In [145]:
spark.sql("""
     SELECT ident, p.city, iso_country, port_code, iata_code
     FROM airport a
     JOIN ports p ON a.iata_code = p.port_code
     WHERE  iso_country = "US"
     order by 1
""").show()

+-----+-----------+-----------+---------+---------+
|ident|       city|iso_country|port_code|iata_code|
+-----+-----------+-----------+---------+---------+
| 89NY|       null|         US|      AXB|      AXB|
| K5T9|       null|         US|      EGP|      EGP|
| KABE|       null|         US|      ABE|      ABE|
| KABQ|ALBUQUERQUE|         US|      ABQ|      ABQ|
| KACY|       null|         US|      ACY|      ACY|
| KADH|       null|         US|      ADT|      ADT|
| KADS|       null|         US|      ADS|      ADS|
| KADW|       null|         US|      ADW|      ADW|
| KAFW|       null|         US|      AFW|      AFW|
| KAGS|       null|         US|      AGS|      AGS|
| KALB|       null|         US|      ALB|      ALB|
| KAND|       null|         US|      AND|      AND|
| KANP|       null|         US|      ANP|      ANP|
| KAPA|       null|         US|      APA|      APA|
| KAPF|       null|         US|      APF|      APF|
| KASE|       null|         US|      ASE|      ASE|
| KAST|     

### Step 3: Define the Data Model

#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

The star schema is chosen as the data model because it is simple and yet effective. users can write simple queries by joing fact and dimension tables to analyze the data.

Here are the tables of the schema:

**Fact Table: fact_immigration**
- id                  (PK)
- arr_ts              (FK)
- arrival_year         
- arrival_month
- country_of_bir      (FK)
- country_of_res      (FK)
- port_of_admission   (FK)
- mode
- visa_category
- visatype
- dep_ts              (FK)

**Dimension Tables: dim_immigrant**   
- immigrant_id         (PK)
- gender
- age
- birth_year

**Dimension Tables: dim_country**  
- country_code         (PK)
- country_name

**Dimension Tables: dim_time**  
- time_id              (PK)
- date  
- year
- quarter
- month
- day
- weekday
- week
**Dimension Tables: dim_demographics**   
- port_code            (PK)
- state_code
- city
- state
- median_age
- perc_of_male_pop
- perc_of_female_pop
- perc_of_veterans
- perc_of_foreign_born
- perc_of_native_american
- perc_of_asian
- perc_of_black
- perc_of_latin
- perc_of_white
- total_pop
**Dimension Tables: dim_airport**   
- local_code            (PK)
- airport_name
- airport_type
- state_code
- state
- iso_country
- iata_code

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

In [147]:
def process_immigration_data(spark, input_data, output_data):
    """
    Description: This function processes the songs data files.
    1. Read and load immigration_data from S3
    2. Transform them to create immigration
    3. Write them to partioned parquet files in table directories on S3

    :param spark: spark session
    :param input_data: input file path
    :param output_data: output file path
    :return: dim_immigrant, dim_time, dim_immigration
    """
    print('Processing US immigration dataset...')

    # paths of input datasets
    months = ['jun']
    # months = ['jan', 'feb', 'mar', 'apr', 'may', 'jun', 'jul', 'aug', 'sep', 'oct', 'nov', 'dec']
    paths = [month.join(['18-83510-I94-Data-2016/i94_', '16_sub.sas7bdat']) for month in months]
    # immigration_data_paths = [os.path.join(input_data, path) for path in paths]
    immigration_data_paths = [input_data + path for path in paths]

    # Defining User Defined Function (UDF) to convert log timestamp (seconds since epoch) to \
    # actual Datetime Type timestamp
    # create timestamp column from original timestamp column

    # convert_sas_date = udf(lambda days: (datetime.date(1960, 1, 1) + datetime.timedelta(days=days)), T.DateType())
    # df = df.withColumn('timestamp', get_timestamp(df.ts))

    convert_i94mode_udf = udf(convert_i94mode, StringType())
    convert_sas_date_udf = udf(convert_sas_date, DateType())
    convert_visa_udf = udf(convert_visa, StringType())
    # get_sas_day_udf = udf(get_sas_day, IntegerType())

    # immigration_data_paths = os.path.join(input_data, '18-83510-I94-Data-2016/i94_feb16_sub.sas7bdat')
    for immigration_data_paths in immigration_data_paths:
        # get filepath to song data file
        print("Processing fact_immigration spark dataframe")
        df_immigration_spark = spark.read\
            .format('com.github.saurfang.sas.spark')\
            .option("header", "true") \
            .option("inferSchema", "true")\
            .load(immigration_data_paths)

    #    .schema(schema['immigration_schema'])

        df_immigration_spark = df_immigration_spark \
            .withColumn('arrival_date', convert_sas_date_udf(df_immigration_spark['arrdate'])) \
            .withColumn('departure_date', convert_sas_date_udf(df_immigration_spark['depdate'])) \
            .withColumn('arrival_year', df_immigration_spark['i94yr'].cast(IntegerType())) \
            .withColumn('arrival_month', df_immigration_spark['i94mon'].cast(IntegerType())) \
            .withColumn('age', df_immigration_spark['i94bir'].cast(IntegerType())) \
            .withColumn('country_of_bir', df_immigration_spark['i94cit'].cast(IntegerType())) \
            .withColumn('country_of_res', df_immigration_spark['i94res'].cast(IntegerType())) \
            .withColumn('port_of_admission', df_immigration_spark['i94port'].cast(StringType())) \
            .withColumn('birth_year', df_immigration_spark['biryear'].cast(IntegerType())) \
            .withColumn('mode', convert_i94mode_udf(df_immigration_spark['i94mode'])) \
            .withColumn('visa_category', convert_visa_udf(df_immigration_spark['i94visa']))

    #    .withColumn('arrival_day', get_sas_day_udf(df_immigration_spark['arrdate'])) \

        dim_immigration = df_immigration_spark.select(
            col('cicid').alias('id'),
            col('arrdate').alias('arr_ts'),
            'arrival_year',
            'arrival_month',
            'country_of_bir',
            'country_of_res',
            'port_of_admission',
            'mode',
            'visa_category',
            'visatype',
            col('depdate').alias('dep_ts')) \
            .dropDuplicates()

        # immigration_out_path = os.path.join(output_data, 'fact_immigration/')
        immigration_out_path = output_data + 'fact_immigration/'

        dim_immigration.write.parquet(immigration_out_path, mode='append', partitionBy=('arrival_year',
                                                                                        'arrival_month'))

        df_time_spark = df_immigration_spark.select(col("arrdate").alias("time")).union(
            df_immigration_spark.select(col("depdate").alias("time"))).distinct().na.drop()

        # Create time dimension spark dateframe
        print("Processing time dimension spark dataframe")
        dim_time = df_time_spark.withColumn('date', convert_sas_date_udf(df_time_spark['time']))

        dim_time = dim_time\
            .select(
                col('time').alias('time_id')
                , col('date').alias('date')
                , year('date').alias('year')
                , quarter('date').alias('quarter')
                , month('date').alias('month')
                , dayofmonth('date').alias('day')
                , dayofweek('date').alias('weekday')
                , weekofyear('date').alias('week'))
        # time_out_path = os.path.join(output_data, 'dim_time/')
        time_out_path = output_data + 'dim_time/'
        dim_time.write.parquet(time_out_path, mode='append', partitionBy='year')

        # Create immigrant dimension spark dateframe
        print("Processing immigrant dimension spark dataframe")
        dim_immigrant = df_immigration_spark \
            .filter(col('birth_year') >= 1900) \
            .filter(col('birth_year') <= 2020) \
            .select(
                col('cicid').alias('immigrant_id'),
                'gender',
                'age',
                'birth_year') \
            .dropDuplicates()
        # immigrant_out_path = os.path.join(output_data, 'dim_immigrant/')
        immigrant_out_path = output_data + 'dim_immigrant/'
        dim_immigrant.write.parquet(immigrant_out_path, mode='append', partitionBy='birth_year')

    print('Finished processing us immigration data.')
    # return dim_immigrant, dim_time, dim_immigration

In [148]:
def process_demographics_data(spark, input_data, output_data):
    """
    Description: Process the demographics data files, transform it to demographics table
    :param spark: spark session
    :param input_data: input file path
    :param output_data: output file path
    """
    # dim_country, dim_port
    dim_port_dict = process_label_data(spark, output_data)
    print('Processing us cities demographics dataset...')
    # get filepath to demographics data file
    # demographics_data_path = os.path.join(input_data, 'us-cities-demographics.csv')
    demographics_data_path = input_data + 'us-cities-demographics.csv'
    df_demographics_spark = spark.read \
        .format('csv') \
        .option("header", "true") \
        .option("delimiter", ";")\
        .load(demographics_data_path)

    # Calculate percentages of each numeric column and create new columns.
    df_demographics_spark = df_demographics_spark \
        .withColumn("Median Age", col("Median Age").cast("float")) \
        .withColumn("pct_male_pop",
                    df_demographics_spark["Male Population"] / df_demographics_spark["Total Population"] * 100) \
        .withColumn("pct_female_pop",
                    df_demographics_spark["Female Population"] / df_demographics_spark["Total Population"] * 100) \
        .withColumn("pct_veterans",
                    df_demographics_spark["Number of Veterans"] / df_demographics_spark["Total Population"] * 100) \
        .withColumn("pct_foreign_born",
                    df_demographics_spark["Foreign-born"] / df_demographics_spark["Total Population"] * 100) \
        .withColumn("pct_race", df_demographics_spark["Count"] / df_demographics_spark["Total Population"] * 100) \
        .orderBy("State")

    # Select columns with new calculated percentages.
    df_demographics_spark_select = df_demographics_spark \
        .select(col("City").alias("city"),
                col("State").alias("state"),
                col("Median Age").alias("median_age"),
                "pct_male_pop",
                "pct_female_pop",
                "pct_veterans",
                "pct_foreign_born",
                "Race",
                "pct_race",
                "Total Population")

    # pivot the Race column
    df_demographics_spark_pivot = df_demographics_spark_select\
        .groupBy("city", "state", "median_age", "pct_male_pop", "pct_female_pop", "pct_veterans",
                "pct_foreign_born", "Total Population")\
        .pivot("Race")\
        .avg("pct_race")

    # change the header name of the race fields for spark compatibility and round the percentage.
    df_demographics_spark_pivot = df_demographics_spark_pivot \
        .select(upper(col("city")).alias("city"),
                upper(col("state")).alias("state"),
                round(col("median_age")).alias("median_age"),
                round(col("pct_male_pop"), 1).alias("perc_of_male_pop"),
                round(col("pct_female_pop"), 1).alias("perc_of_female_pop"),
                round(col("pct_veterans"), 1).alias("perc_of_veterans"),
                round(col("pct_foreign_born"), 1).alias("perc_of_foreign_born"),
                round(col("American Indian and Alaska Native"), 1).alias("perc_of_native_american"),
                round(col("Asian"), 1).alias("perc_of_asian"),
                round(col("Black or African-American"), 1).alias("perc_of_black"),
                round(col("Hispanic or Latino"), 1).alias("perc_of_latino"),
                round(col("White"), 1).alias("perc_of_white"),
                col("Total Population").alias("total_pop"))\
        .dropDuplicates(['city'])

    # dim_demographics = df_demographics_spark_pivot.withColumn("city_id", monotonically_increasing_id())
    dim_demographics = dim_port_dict\
        .join(df_demographics_spark_pivot,
              (dim_port_dict['city_name'] == df_demographics_spark_pivot['city']) & (
                          dim_port_dict['state_name'] == df_demographics_spark_pivot['state']), "inner") \
        .drop("city_name", "state_name")

    # demographics_out_path = os.path.join(output_data, 'dim_demographics/')
    demographics_out_path = output_data + 'dim_demographics/'
    dim_demographics.write.parquet(demographics_out_path, mode='overwrite', partitionBy='state')
    # dim_demographics.write.format("csv").save("data/demographics_table/demo.csv")
    print('Finished processing us cities demographics data.')
    # return dim_demographics
    # port_code/state_code/city/state/median_age/perc_of_male_pop/perc_of_femal......

In [149]:
def process_airport_data(spark, input_data, output_data):
    """
    Description: Process the airport data files, transform it to airport table
    :param spark: spark session
    :param input_data: input file path
    :param output_data: output file path
   """
    print('Processing airport codes dataset...')

    # get filepath to airport data file
    # airport_data_path = os.path.join(input_data, 'airport-codes_csv.csv')
    airport_data_path = input_data + 'airport-codes_csv.csv'
    df_airport_spark = spark.read\
        .format('csv')\
        .option("inferSchema", "true")\
        .option("header", "true")\
        .load(airport_data_path)\
        .drop("coordinates", "gps_code", "continent", "elevation_ft")

    dim_airport = df_airport_spark\
        .filter((col("type") == "small_airport") |
                (col("type") == "large_airport") |
                (col("type") == "medium_airport"))\
        .filter(df_airport_spark["iso_country"] == "US")\
        .filter(df_airport_spark['local_code'].isNotNull())\
        .withColumn("state", substring(df_airport_spark["iso_region"], 4, 2))
#         .filter(df_airport_spark['iata_code'].isNotNull()) \

    # airport_out_path = os.path.join(output_data, 'dim_airport/')
    airport_out_path = output_data + 'dim_airport/'
    dim_airport.write.parquet(airport_out_path)

    print('Finished processing airport codes data.')
    # return dim_airport

In [150]:
def process_temperature_data(spark, input_data, output_data):
    """
    Description: Process the temperatureData data files, transform it to temperature table
    :param spark: spark session
    :param input_data: input file path
    :param output_data: output file path
    """
    print('Processing cities temperature dataset...')
    # get filepath to temperatureData data file
    # temperature_data_path = os.path.join(input_data, 'GlobalLandTemperaturesByCity.csv')
    temperature_data_path = input_data + 'GlobalLandTemperaturesByCity.csv'
    df_temperature_spark = spark.read \
        .format('csv') \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .load(temperature_data_path)

    df_temperature_spark = df_temperature_spark \
        .filter(df_temperature_spark["country"] == "United States") \
        .filter(df_temperature_spark.AverageTemperature.isNotNull())\
        .withColumn("year", year(df_temperature_spark["dt"])) \
        .withColumn("month", month(df_temperature_spark["dt"]))

    dim_temperature = df_temperature_spark.select(
        "year",
        "month",
        "AverageTemperature",
        "City",
        "Country")\
        .dropDuplicates()

    # temperature_out_path = os.path.join(output_data, 'dim_temperature/')
    temperature_out_path = output_data + 'dim_temperature/'
    dim_temperature.write.parquet(temperature_out_path, mode='overwrite', partitionBy='year')

    print('Finished processing cities temperature data.')
    # return dim_temperature

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [7]:
input_data = 'data/'
output_data = '../data_folder/output/'

In [152]:
process_demographics_data(spark, input_data, output_data) # dim_demographics, dim_country

Processing i94 SAS dictionary...
mapper function
data/output1/i94cntyl.csv
mapper function
data/output1/i94addr.csv
mapper function
data/output1/i94port.csv
Finished processing i94 SAS dictionary
Processing us cities demographics dataset...
Finished processing us cities demographics data.


In [153]:
process_airport_data(spark, input_data, output_data) # dim_airport

Processing airport codes dataset...
Finished processing airport codes data.


In [154]:
process_immigration_data(spark, input_data, output_data)

Processing US immigration dataset...
Processing fact_immigration spark dataframe
Processing time dimension spark dataframe
Processing immigrant dimension spark dataframe
Finished processing us immigration data.


#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness

In [8]:
# Run Quality Checks. Records for 2016-Jun
check_data_quality(spark, output_data)

Checking data quality...
Checking fact_immigration records...
Data quality check passed for fact_immigration with 3,287,405 records.
Performing key_not_null check on table fact_immigration...
Data quality check on fact_immigration passed. All key columns have no null value
Finished key_not_null check on table fact_immigration...
Checking dim_time records...
Data quality check passed for dim_time with 161 records.
Performing key_not_null check on table dim_time...
Data quality check on dim_time passed. All key columns have no null value
Finished key_not_null check on table dim_time...
Checking dim_immigrant records...
Data quality check passed for dim_immigrant with 3,574,350 records.
Checking dim_country records...
Data quality check passed for dim_country with 289 records.
Checking dim_demographics records...
Data quality check passed for dim_demographics with 112 records.
Performing key_not_null check on table dim_demographics...
Data quality check on dim_demographics passed. All key

0

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

<table>
<caption><strong>I94 Immigration Data dictionary</strong></caption>
<thead>
<th>Feature</th>
<th>Description</th>
</thead>
<tbody>
<tr><td>cicid</td><td>Unique record ID</td>
<tr><td>i94yr</td><td>4 digit year</td>
<tr><td>i94mon</td><td>Numeric month</td>
<tr><td>i94cit</td><td>3 digit code for immigrant country of birth</td>
<tr><td>i94res</td><td>3 digit code for immigrant country of residence </td>
<tr><td>i94port</td><td>Port of admission</td>
<tr><td>arrdate</td><td>Arrival Date in the USA</td>
<tr><td>i94mode</td><td>Mode of transportation (1 = Air; 2 = Sea; 3 = Land; 9 = Not reported)</td>
<tr><td>i94addr</td><td>USA State of arrival</td>
<tr><td>depdate</td><td>Departure Date from the USA</td>
<tr><td>i94bir</td><td>Age of Respondent in Years</td>
<tr><td>i94visa</td><td>Visa codes collapsed into three categories</td>
<tr><td>count</td><td>Field used for summary statistics</td>
<tr><td>dtadfile</td><td>Character Date Field - Date added to I-94 Files</td>
<tr><td>visapost</td><td>Department of State where where Visa was issued </td>
<tr><td>occup</td><td>Occupation that will be performed in U.S</td>
<tr><td>entdepa</td><td>Arrival Flag - admitted or paroled into the U.S.</td>
<tr><td>entdepd</td><td>Departure Flag - Departed lost I-94 or is deceased</td>
<tr><td>entdepu</td><td>Update Flag - Either apprehended overstayed adjusted to perm residence</td>
<tr><td>matflag</td><td>Match flag - Match of arrival and departure records</td>
<tr><td>biryear</td><td>4 digit year of birth</td>
<tr><td>dtaddto</td><td>Character Date Field - Date to which admitted to U.S. (allowed to stay until)</td>
<tr><td>gender</td><td>Non-immigrant sex</td>
<tr><td>insnum</td><td>INS number</td>
<tr><td>airline</td><td>Airline used to arrive in U.S.</td>
<tr><td>admnum</td><td>Admission Number</td>
<tr><td>fltno</td><td>Flight number of Airline used to arrive in U.S.</td>
<tr><td>visatype</td><td>Class of admission legally admitting the non-immigrant to temporarily stay in U.S.</td>
</tbody>
</table>

<table>
<caption><strong>Fact_immigration Data Dictionary</strong></caption>
<thead>
<th>Feature</th>
<th>Description</th>
</thead>
<tbody>
<tr><td>id</td><td>Unique record ID (PK)(FK for dim_immigrant)</td>
<tr><td>arr_ts</td><td>Arrival date timestamp (FK for dim_time)</td>
<tr><td>arrival_year</td><td>4 digit year (used for partition)</td>
<tr><td>arrival_month</td><td>Numeric month (used for partition)</td>
<tr><td>country_of_bir</td><td>3 digit code for immigrant country of birth (FK for dim_country)</td>
<tr><td>country_of_res</td><td>3 digit code for immigrant country of residence (FK for dim_country) </td>
<tr><td>port_of_admission</td><td>I94 port of admission (FK for dim_demographics) </td>
<tr><td>mode</td><td>Mode of transportation (1 = Air; 2 = Sea; 3 = Land; 9 = Not reported)</td>
<tr><td>dep_ts</td><td>Departure date timestamp (FK for dim_time)</td>
<tr><td>visa_category</td><td>Visa codes collapsed into three categories</td>
<tr><td>visatype</td><td>Class of admission legally admitting the non-immigrant to temporarily stay in U.S.</td>
</tbody>
</table>

<table>
<caption><strong>Dim_immigrant Data dictionary</strong></caption>
<thead>
<th>Feature</th>
<th>Description</th>
</thead>
<tbody>
<tr><td>immigrant_id</td><td>immigrant id (PK)</td>
<tr><td>gender</td><td>Non-immigrant sex</td>
<tr><td>age</td><td>Age of Respondent in Years</td>
<tr><td>bir_year</td><td>4 digit year of birth</td>
</tbody>
</table>

<table>
<caption><strong>Dim_country Data dictionary</strong></caption>
<thead>
<th>Feature</th>
<th>Description</th>
</thead>
<tbody>
<tr><td>country_code</td><td>3 digit code for country in the world (PK)</td>
<tr><td>country_name</td><td>Name of country</td>
</tbody>
</table>

<table>
<caption><strong>Dim_time Data dictionary</strong></caption>
<thead>
<th>Feature</th>
<th>Description</th>
</thead>
<tbody>
<tr><td>time_id</td><td>Timestamp as id (PK)</td>
<tr><td>date</td><td>Date type of the timestamp</td>
<tr><td>year</td><td>Year of the date</td>
<tr><td>quarter</td><td>Quarter of the year</td>
<tr><td>month</td><td>Month of the year</td>
<tr><td>day</td><td>Day of the month</td>
<tr><td>weekday</td><td>Day of the week</td>
<tr><td>week</td><td>Week of the year</td>
</tbody>
</table>

<table>
<caption><strong>Dim_time Data dictionary</strong></caption>
<thead>
<th>Feature</th>
<th>Description</th>
</thead>
<tbody>
<tr><td>port_code</td><td>City code as id (PK)</td>
<tr><td>state_code</td><td>Two-letter code of the state</td>
<tr><td>city</td><td>Year of the date</td>
<tr><td>state</td><td>Quarter of the year</td>
<tr><td>median_age</td><td>	Median age in the city (estimation)</td>
<tr><td>perc_of_male_pop</td><td>Percentage of male citizens</td>
<tr><td>perc_of_female_pop</td><td>Percentage of female citizens</td>
<tr><td>perc_of_veterans</td><td>Percentage of veteran citizens</td>
<tr><td>perc_of_foreign_born</td><td>Percentage of citizens born outside of US</td>
<tr><td>perc_of_native_american</td><td>Percentage of citizens belonging to this ethnic group</td>
<tr><td>perc_of_asian</td><td>Percentage of citizens belonging to this ethnic group</td>
<tr><td>perc_of_black</td><td>Percentage of citizens belonging to this ethnic group</td>
<tr><td>perc_of_latin</td><td>Percentage of citizens belonging to this ethnic group</td>
<tr><td>perc_of_white</td><td>Percentage of citizens belonging to this ethnic group</td>
<tr><td>total_pop</td><td>Total number of citizens in the given city</td>
</tbody>
</table>

<table>
<caption><strong>Dim_airport Data dictionary</strong></caption>
<thead>
<th>Feature</th>
<th>Description</th>
</thead>
<tbody>
<tr><td>local_code</td><td>Unique local code of the airport (PK)</td>
<tr><td>airport_name</td><td>Name of the Airport</td>
<tr><td>airport_type</td><td>Type of the airport</td>
<tr><td>state_code</td><td>Two-letter code of the state</td>
<tr><td>state</td><td>State where the airport is located</td>
<tr><td>iso_country</td><td>Country where the airport is located</td>
<tr><td>iata_code</td><td>Code of the airport assigned by International Air Transport Association (PK)</td>
</tbody>
</table>

#### Step 5: Complete Project Write Up
##### Clearly state the rationale for the choice of tools and technologies for the project.

This project aims to use technologies that allow to run it both locally and in the cloud. Spark allows us to run this project locally and can be easily promoted to an EMR cluster running on AWS. The parquet files used by Spark provide performance improvement over raw data formats and promise scalability up to many terabytes of data. The following tools are used in my project:
- **PySpark/Python/Pandas** - PySpark allows you to interact with the Apache Spark data processing framework by writing Python code. In paricular, PySpark allows you to express data as "DataFrames," which allows you to concentrate on data transformations and other tasks without managing how the dataset is distributed over nodes in the computing cluster.
- **Amazon EMR Cluster(Spark/Hadoop)** - The python script is submittted to Amazon's EMR Cluster (1 Master and 2 Core nodes), where it is configured with Spark 2.4.5 and Haddop 2.7 to process the spark job.
- **S3** - The raw data and cleaned data is stored on Amazon's S3 bucket. 
- **Jupyter Notebook** - Jupyter notebook is used to both explore the data and analyze the data. 

##### Propose how often the data should be updated and why.

Since we receive one file per month it seems reasonable to update the model monthly. However, we can also partition the immigration data daily so that one could update the dataset everyday and process the data daily as new data coming in.

##### Write a description of how you would approach the problem differently under the following scenarios:
 - The data was increased by 100x.

If the data were increased 100x, then we should be able to easily handle it using AWS EMR.  
The solution can be moved to AWS EMR cluster and the cluster could scale horizontally by adding new nodes. Or the data can be transferred to RedShift over Airflow. Both solutions can scale way above the 300 million rows.

 - The data populates a dashboard that must be updated on a daily basis by 7am every day.

The entire ETL process takes up to 1h. Processing new data every morning would not cause any problems. The data could be stored on S3, processed by EMR and deposited to Redshift. Entire process should not exceed 1 hour for the current size of data.
We could also create pipeline using Airflow and modify the scheduling to a specific time of day and introduce an SLA in Airflow to ensure jobs are completed in a timely manner and adjust accordingly.

 - The database needed to be accessed by 100+ people.

The data stored on S3 can be loaded to RedShift on AWS, which could then support a wide array of BI tools. Amazon Redshift as a data warehouse can be scaled as needed to support increased number of read requests per unit of time. 