# Project Title
### Data Engineering Capstone Project

#### Project Summary

This project will combine data from various sources, clean, combine, and store them in a way that can be used for either analysis or source of truth database.  The project will make use of AWS S3 and EMR cluster for storage and computing.  The raw data will be stored on S3 where the EMR cluster will pull data and save a datalake and data that can be used for a data warehouse.  All the steps  and reasoning will be outlined in this notebook.  

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
from pyspark.sql import SparkSession
import numpy as np
import os
import configparser
import logging
import boto3
from botocore.exceptions import ClientError
from pyspark.sql.functions import split
import pyspark.sql
from pyspark.sql.functions import isnan, when, count, col, regexp_replace, lower,trim, to_date
from pyspark.sql.types import DoubleType,IntegerType


### Step 1: Scope the Project and Gather Data

#### Scope 

- In this project we will combine airport, immigration, city demographic, and temperature data to provide immigrants and officials a source of truth database or analysis. 

- Here we will work with a subset of the immigration data set to plan our project due to the size of the data.  The full data set will be used when running the ETL on the EMR cluster.

- In order to reduce hardware requirements to work with the large data set we will use spark rather than something like pandas which would require more resources.

- We want a solution that we can work with that is not confined to a network or personal computer so any raw data that is on the udacity workspace, or minor table that is created for category conversions, will be moved to AWS S3.

- There will be two end solutions, a data lake and a data warehouse.
    - Data lake: This will contain data based on each immigrant with their associated state demographic/temperature data and airport code data if any connection exists. 
    - Data warehouse: This will contain 3 different data sets.  State data containing demographic and temperature data for each state.  Airport code data containing some general information on each airport.  Immigraiton data will contain I-94 arrivals data collected by the NTTO(National Travel and Tourism Office)


#### Describe and Gather Data 

- Airport code data
    - This data contains information on airports in the United States mostly around location.
    - The data comes from: https://datahub.io/core/airport-codes#data
- Immigration data
    - This data comes from the US National Tourism and Trade Office, and 
    - The data comes from: https://travel.trade.gov/research/reports/i94/historical/2016.html
- Immigration supporting data:
    - All supporting data comes from the `i94_SAS_Labels_Descriptions.SAS` document included.
    - i94cit_res_codes
        - This contains codes for city and city of residence conversion: 3 digits to city
    - i94port_codes
        - This contains the port conversion codes: 3 letter to city/state
    - Visa data
        - This data contains conversions from numeric to categorical visa (business, pleasure, student).
    - Mode data
        - This data contains conversions from numeric to categorical mode of entry(land, sea, air).
    - i94addr
        - This contains state abbreviations to state names.        
            - All data comes from the `i94_SAS_Labels_Descriptions.SAS` document included.

- City demographic data
    - This data contails demographic information on cities in the United States.
    - This data comes from: https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/
- Temperature data
    - This data contains temperature data from around the world on each city as far back as 1750.
    - This is a kaggle data set and comes from: https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data


In [2]:
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()

In [6]:
### This cell does not need to be run, All relevent files have been moved to S3
# Uncomment to run 

#config = configparser.ConfigParser()
#config.read_file(open('dl.cfg'))

# Set AWS key and secret key
#os.environ['AWS_ACCESS_KEY_ID'] = config.get('AWS','AWS_ACCESS_KEY_ID')
#os.environ['AWS_SECRET_ACCESS_KEY'] = config.get('AWS','AWS_SECRET_ACCESS_KEY')

#KEY = config.get('AWS','AWS_ACCESS_KEY_ID')
#SECRET = config.get('AWS','AWS_SECRET_ACCESS_KEY')

# upload files to s3
def upload_file(file_name, bucket, object_name=None):
    """Upload a file to an S3 bucket

    :param file_name: File to upload
    :param bucket: Bucket to upload to
    :param object_name: S3 object name. If not specified then file_name is used
    :return: True if file was uploaded, else False
    """

    # If S3 object_name was not specified, use file_name
    if object_name is None:
        object_name = file_name

    # Upload the file
    s3_client = boto3.client('s3')
    try:
        response = s3_client.upload_file(file_name, bucket, object_name)
    except ClientError as e:
        logging.error(e)
        return False
    return True
#upload_file(file_name='airport-codes_csv.csv' ,bucket='de-capstone',object_name='raw_data/airport-codes_csv.csv')
#upload_file(file_name='i94addr.csv' ,bucket='de-capstone',object_name='raw_data/i94addr.csv')
#upload_file(file_name='i94cit_res_codes.csv' ,bucket='de-capstone',object_name='raw_data/i94cit_res_codes.csv')
#upload_file(file_name='i94port_code.csv' ,bucket='de-capstone',object_name='raw_data/i94port_code.csv')
#upload_file(file_name='us-cities-demographics.csv' ,bucket='de-capstone',object_name='raw_data/us-cities-demographics.csv')
#upload_file(file_name='../../data2/GlobalLandTemperaturesByCity.csv' ,bucket='de-capstone',object_name='raw_data/temp_data.csv')

In [11]:
# This cell is a test column for reading and writing the immigration data

#df_spark =spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
#write to parquet
#df_spark.write.parquet("sas_data")
#df_spark=spark.read.parquet("sas_data")

### Step 2: Explore and Assess the Data
#### Explore the Data 
- Identify data quality issues, like missing values, duplicate data, etc.
- Set correct data types for relevant fields
- Transform/string split necessary fields depending on use case.
- Condense tables to usable scale.

#### Data Cleaning



#### Airport Code data
- Airport code data will be used to create a table about airports in the US


In [7]:
# Read in the data here
df_apt_codes = spark.read.csv('airport-codes_csv.csv',header=True)

# pull just the airports in the US
df_apt_codes = df_apt_codes.filter(df_apt_codes.iso_country=='US')

# remove iata code == null
df_apt_codes = df_apt_codes.filter(df_apt_codes.iata_code.isNotNull())

#check data
df_apt_codes.show(5)

+-----+-------------+--------------------+------------+---------+-----------+----------+-------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region| municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+-------------+--------+---------+----------+--------------------+
| 07FA|small_airport|Ocean Reef Club A...|           8|       NA|         US|     US-FL|    Key Largo|    07FA|      OCA|      07FA|-80.274803161621,...|
|  0AK|small_airport|Pilot Station Air...|         305|       NA|         US|     US-AK|Pilot Station|    null|      PQS|       0AK|-162.899994, 61.9...|
| 0CO2|small_airport|Crested Butte Air...|        8980|       NA|         US|     US-CO|Crested Butte|    0CO2|      CSE|      0CO2|-106.928341, 38.8...|
| 0TE7|small_airport|   LBJ Ranch Airport|        1515|       NA|         US

In [8]:
# check a IATA code
df_apt_codes.where(df_apt_codes.iata_code == 'ESP').show()

+-----+-------------+--------------------+------------+---------+-----------+----------+----------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|    municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+----------------+--------+---------+----------+--------------------+
|  ESP|       closed|Birchwood-Pocono ...|         965|       NA|         US|     US-PA|East Stroudsburg|    null|      ESP|      null|   -75.2521, 41.0643|
| KN53|small_airport|Stroudsburg Pocon...|         480|       NA|         US|     US-PA|East Stroudsburg|    KN53|      ESP|       N53|-75.1605987549, 4...|
+-----+-------------+--------------------+------------+---------+-----------+----------+----------------+--------+---------+----------+--------------------+



In [9]:
# Get state code from iso_region
df_apt_codes = df_apt_codes.withColumn('State_Code',split(df_apt_codes.iso_region,'-').getItem(1))

In [10]:
df_apt_codes.show(5)

+-----+-------------+--------------------+------------+---------+-----------+----------+-------------+--------+---------+----------+--------------------+----------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region| municipality|gps_code|iata_code|local_code|         coordinates|State_Code|
+-----+-------------+--------------------+------------+---------+-----------+----------+-------------+--------+---------+----------+--------------------+----------+
| 07FA|small_airport|Ocean Reef Club A...|           8|       NA|         US|     US-FL|    Key Largo|    07FA|      OCA|      07FA|-80.274803161621,...|        FL|
|  0AK|small_airport|Pilot Station Air...|         305|       NA|         US|     US-AK|Pilot Station|    null|      PQS|       0AK|-162.899994, 61.9...|        AK|
| 0CO2|small_airport|Crested Butte Air...|        8980|       NA|         US|     US-CO|Crested Butte|    0CO2|      CSE|      0CO2|-106.928341, 38.8...|        CO|
| 0TE7|sma

In [11]:
# Check for nulls
df_apt_codes.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_apt_codes.columns]).show()

+-----+----+----+------------+---------+-----------+----------+------------+--------+---------+----------+-----------+----------+
|ident|type|name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|coordinates|State_Code|
+-----+----+----+------------+---------+-----------+----------+------------+--------+---------+----------+-----------+----------+
|    0|   0|   0|          34|        0|          0|         0|           6|      81|        0|        50|          0|         0|
+-----+----+----+------------+---------+-----------+----------+------------+--------+---------+----------+-----------+----------+



#### Temperature Data

In [12]:
# Check data
df_temp = spark.read.csv('../../data2/GlobalLandTemperaturesByCity.csv',header=True)
df_temp.show(5)

+----------+------------------+-----------------------------+-----+-------+--------+---------+
|        dt|AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+----------+------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01|             6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-01-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-02-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-03-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
+----------+------------------+-----------------------------+-----+-------+--------+---------+
only showing top 5 rows



In [13]:
# check amount of rows
df_temp.count()

8599212

In [14]:
# Check data types
df_temp.dtypes

[('dt', 'string'),
 ('AverageTemperature', 'string'),
 ('AverageTemperatureUncertainty', 'string'),
 ('City', 'string'),
 ('Country', 'string'),
 ('Latitude', 'string'),
 ('Longitude', 'string')]

In [15]:
# Change AverageTemperature to Double
df_temp = df_temp.withColumn("AverageTemperature", df_temp["AverageTemperature"].cast("double"))

In [16]:
# check amount for United states
df_temp.filter(df_temp.Country == 'United States').count()

687289

In [17]:
# filter just United states
df_temp = df_temp.filter(df_temp.Country == 'United States')

In [18]:
# check for nulls
df_temp.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_temp.columns]).show()

+---+------------------+-----------------------------+----+-------+--------+---------+
| dt|AverageTemperature|AverageTemperatureUncertainty|City|Country|Latitude|Longitude|
+---+------------------+-----------------------------+----+-------+--------+---------+
|  0|             25765|                        25765|   0|      0|       0|        0|
+---+------------------+-----------------------------+----+-------+--------+---------+



In [19]:
# drop nulls
df_temp = df_temp.where(col('AverageTemperature').isNotNull())

In [20]:
df_temp.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_temp.columns]).show()

+---+------------------+-----------------------------+----+-------+--------+---------+
| dt|AverageTemperature|AverageTemperatureUncertainty|City|Country|Latitude|Longitude|
+---+------------------+-----------------------------+----+-------+--------+---------+
|  0|                 0|                            0|   0|      0|       0|        0|
+---+------------------+-----------------------------+----+-------+--------+---------+



In [21]:
# get average temp for each city
avg_temp = df_temp.cube(['City','Latitude','Longitude']).agg({'AverageTemperature':'avg'})
avg_temp = avg_temp.na.drop()
avg_temp = avg_temp.withColumnRenamed('avg(AverageTemperature)', 'AverageTemperature')
avg_temp.show()

+----------------+--------+---------+------------------+
|            City|Latitude|Longitude|AverageTemperature|
+----------------+--------+---------+------------------+
|      Saint Paul|  45.81N|   93.46W| 4.946022285896202|
|        Torrance|  34.56N|  118.70W|15.878038442083941|
|     Clarksville|  36.17N|   87.51W|14.281885540237276|
|          Rialto|  34.56N|  116.76W| 17.05575872534143|
|          Denver|  39.38N|  104.05W| 8.777836262323191|
|     Little Rock|  34.56N|   91.46W|16.382053970436562|
|      Providence|  42.59N|   72.00W| 7.341440525809558|
|       Arlington|  32.95N|   96.70W|18.062719999999995|
|       Arlington|  39.38N|   76.99W|11.918474511061234|
|         Madison|  42.59N|   89.45W| 7.999159821712816|
|          Fresno|  36.17N|  119.34W|15.817692463328274|
|Port Saint Lucie|  26.52N|   80.60W| 23.06892444289695|
|          Pueblo|  37.78N|  103.73W|10.730368330464712|
|      Chesapeake|  36.17N|   75.58W|15.678856043603744|
|      Des Moines|  40.99N|   9

- Because Immigration data is from 2016 and the latest temperature data is 2013 we will take the average temp of each city.

#### Demographic Data
We will match demographic data to an individuals `i94addr`. This is a state value so we will need to condense the demographic data down to the state level.  While doing this we will also introduce average temperature data.  We will match temperature on city, then collapse it down to the state level.

In [22]:
# Read Data
df_demo = spark.read.csv('us-cities-demographics.csv',sep = ';',header=True)

# Combine temp data
df_demo = df_demo.join(avg_temp,avg_temp.City==df_demo.City).select(df_demo["*"],avg_temp["AverageTemperature"])
df_demo.show(5)

+----------+---------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+------+------------------+
|      City|    State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race| Count|AverageTemperature|
+----------+---------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+------+------------------+
|Saint Paul|Minnesota|      31.5|         149547|           151293|          300840|             10548|       56514|                  2.58|        MN|American Indian a...|  6858| 4.946022285896202|
|Saint Paul|Minnesota|      31.5|         149547|           151293|          300840|             10548|       56514|                  2.58|        MN|Black or African-...| 54665| 4.946022285896202|
|Saint Pau

In [23]:
# Break data down to state level
state_df = df_demo.cube('State','State Code').agg({'Male Population':'sum',
                                     'Female Population':'sum',
                                     'Total Population':'sum',
                                     'Number of Veterans':'sum',
                                     'Foreign-born':'sum',
                                     'Average Household Size':'avg',
                                     'AverageTemperature':'avg'})
state_df = state_df.na.drop()
state_df = state_df.withColumnRenamed('avg(AverageTemperature)', 'AverageTemperature')
state_df = state_df.withColumnRenamed('avg(Average Household Size)', 'Average Household Size')
state_df = state_df.withColumnRenamed('sum(Total Population)', 'Total Population')
state_df = state_df.withColumnRenamed('sum(Female Population)', 'Female Population')
state_df = state_df.withColumnRenamed('sum(Male Population)', 'Male Population')
state_df = state_df.withColumnRenamed('sum(Number of Veterans)', 'Number of Veterans')
state_df = state_df.withColumnRenamed('sum(Foreign-born)', 'Foreign-born')



state_df.show(5)

+-----------+----------+------------------+----------------+-----------------+------------------+------------+---------------+----------------------+
|      State|State Code|AverageTemperature|Total Population|Female Population|Number of Veterans|Foreign-born|Male Population|Average Household Size|
+-----------+----------+------------------+----------------+-----------------+------------------+------------+---------------+----------------------+
|  Wisconsin|        WI| 8.394430860660089|       4771655.0|        2464515.0|          179205.0|    491120.0|      2307140.0|    2.3699999999999997|
|   Oklahoma|        OK|15.616847364286048|       5773255.0|        2942520.0|          360465.0|    663105.0|      2830735.0|    2.4733333333333336|
|Connecticut|        CT|10.896842198400124|       3640550.0|        1883690.0|           94910.0|    925555.0|      1756860.0|    2.6733333333333333|
|   Colorado|        CO| 8.425446029673148|      1.372057E7|        6923450.0|          914575.0|   

In [24]:
df_demo.select('Race').distinct().show()

+--------------------+
|                Race|
+--------------------+
|Black or African-...|
|  Hispanic or Latino|
|               White|
|               Asian|
|American Indian a...|
+--------------------+



In [25]:
# Sum up Races for each state
white_df = df_demo.where(df_demo.Race =='White').cube('State').agg({'Count':'sum'})
white_df = white_df.withColumnRenamed('sum(Count)', 'White')

asian_df = df_demo.where(df_demo.Race =='Asian').cube('State').agg({'Count':'sum'})
asian_df = asian_df.withColumnRenamed('sum(Count)', 'Asian')

black_df = df_demo.where(df_demo.Race =='Black or African-American').cube('State').agg({'Count':'sum'})
black_df = black_df.withColumnRenamed('sum(Count)', 'Black or African-American')

hispanic_df = df_demo.where(df_demo.Race =='Hispanic or Latino').cube('State').agg({'Count':'sum'})
hispanic_df = hispanic_df.withColumnRenamed('sum(Count)', 'Hispanic or Latino')

native_df = df_demo.where(df_demo.Race =='American Indian and Alaska Native').cube('State').agg({'Count':'sum'})
native_df = native_df.withColumnRenamed('sum(Count)', 'American Indian and Alaska Native')

white_df.collect()
asian_df.collect()
black_df.collect()
hispanic_df.collect()
native_df.collect()

[Row(State='Rhode Island', American Indian and Alaska Native=4171.0),
 Row(State='District of Columbia', American Indian and Alaska Native=6130.0),
 Row(State='Connecticut', American Indian and Alaska Native=9643.0),
 Row(State='Missouri', American Indian and Alaska Native=20469.0),
 Row(State='Utah', American Indian and Alaska Native=11466.0),
 Row(State='Arizona', American Indian and Alaska Native=120750.0),
 Row(State=None, American Indian and Alaska Native=1397283.0),
 Row(State='Tennessee', American Indian and Alaska Native=15899.0),
 Row(State='New Jersey', American Indian and Alaska Native=7755.0),
 Row(State='Maine', American Indian and Alaska Native=662.0),
 Row(State='Wisconsin', American Indian and Alaska Native=19656.0),
 Row(State='Oklahoma', American Indian and Alaska Native=94626.0),
 Row(State='Massachusetts', American Indian and Alaska Native=15532.0),
 Row(State='Illinois', American Indian and Alaska Native=36446.0),
 Row(State='Kansas', American Indian and Alaska Nat

In [26]:
# Combine race data
state_df = state_df.join(white_df,white_df.State==state_df.State).select(state_df["*"],white_df["White"])
state_df = state_df.join(asian_df,asian_df.State==state_df.State).select(state_df["*"],asian_df["Asian"])
state_df = state_df.join(black_df,black_df.State==state_df.State).select(state_df["*"],black_df["Black or African-American"])
state_df = state_df.join(hispanic_df,hispanic_df.State==state_df.State).select(state_df["*"],hispanic_df["Hispanic or Latino"])
state_df = state_df.join(native_df,native_df.State==state_df.State).select(state_df["*"],native_df["American Indian and Alaska Native"])

In [27]:
state_df.show()

+--------------------+----------+-------------------+----------------+-----------------+------------------+------------+---------------+----------------------+---------+--------+-------------------------+------------------+---------------------------------+
|               State|State Code| AverageTemperature|Total Population|Female Population|Number of Veterans|Foreign-born|Male Population|Average Household Size|    White|   Asian|Black or African-American|Hispanic or Latino|American Indian and Alaska Native|
+--------------------+----------+-------------------+----------------+-----------------+------------------+------------+---------------+----------------------+---------+--------+-------------------------+------------------+---------------------------------+
|                Utah|        UT|  9.898601558734654|       2780420.0|        1376780.0|           87315.0|    426475.0|      1403640.0|    3.2325000000000004| 447902.0| 30346.0|                  12828.0|          122622.0|   

#### i94cit_res_codes Data

In [28]:
# pull in data
i94cit_res_codes = spark.read.csv('i94cit_res_codes.csv')
i94cit_res_codes = i94cit_res_codes.withColumnRenamed('_c0', 'i94cit')
i94cit_res_codes = i94cit_res_codes.withColumnRenamed('_c1', 'i94_City')

#Copy df,rename columns, and join
res_codes = i94cit_res_codes
res_codes = res_codes.withColumnRenamed('i94cit','i94res')
res_codes = res_codes.withColumnRenamed('i94_City','i94_Resident')

#Join
i94cit_res_codes = i94cit_res_codes.join(res_codes,i94cit_res_codes.i94cit==res_codes.i94res)
#change data type
i94cit_res_codes = i94cit_res_codes.withColumn("i94cit", i94cit_res_codes["i94cit"].cast("double"))
i94cit_res_codes = i94cit_res_codes.withColumn("i94res", i94cit_res_codes["i94res"].cast("double"))


i94cit_res_codes.show()

+------+---------------+------+---------------+
|i94cit|       i94_City|i94res|   i94_Resident|
+------+---------------+------+---------------+
| 582.0|         MEXICO| 582.0|         MEXICO|
| 236.0|    AFGHANISTAN| 236.0|    AFGHANISTAN|
| 101.0|        ALBANIA| 101.0|        ALBANIA|
| 316.0|        ALGERIA| 316.0|        ALGERIA|
| 102.0|        ANDORRA| 102.0|        ANDORRA|
| 324.0|         ANGOLA| 324.0|         ANGOLA|
| 529.0|       ANGUILLA| 529.0|       ANGUILLA|
| 518.0|ANTIGUA-BARBUDA| 518.0|ANTIGUA-BARBUDA|
| 687.0|     ARGENTINA | 687.0|     ARGENTINA |
| 151.0|        ARMENIA| 151.0|        ARMENIA|
| 532.0|          ARUBA| 532.0|          ARUBA|
| 438.0|      AUSTRALIA| 438.0|      AUSTRALIA|
| 103.0|        AUSTRIA| 103.0|        AUSTRIA|
| 152.0|     AZERBAIJAN| 152.0|     AZERBAIJAN|
| 512.0|        BAHAMAS| 512.0|        BAHAMAS|
| 298.0|        BAHRAIN| 298.0|        BAHRAIN|
| 274.0|     BANGLADESH| 274.0|     BANGLADESH|
| 513.0|       BARBADOS| 513.0|       BA

In [29]:
# Check for nulls
i94cit_res_codes.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in i94cit_res_codes.columns]).show()

+------+--------+------+------------+
|i94cit|i94_City|i94res|i94_Resident|
+------+--------+------+------------+
|     0|       0|     0|           0|
+------+--------+------+------------+



#### i94port_code Data

In [34]:
# Read data
i94port_df = spark.read.csv('i94port_code.csv') 

# Drop empty column
i94port_df = i94port_df.drop('_c2')

# Remove spaces
i94port_df = i94port_df.select('_c0',trim(col('_c1')))

# Change column names
i94port_df = i94port_df.withColumnRenamed('_c0','i94port')
i94port_df = i94port_df.withColumnRenamed('trim(_c1)','location')

# get port city and state
i94port_df = i94port_df.withColumn('Port City',split(i94port_df.location,',').getItem(0))
i94port_df = i94port_df.withColumn('Port State',split(i94port_df.location,',').getItem(1))



i94port_df.show()

+-------+--------------------+--------------------+----------+
|i94port|            location|           Port City|Port State|
+-------+--------------------+--------------------+----------+
|    ALC|           ALCAN, AK|               ALCAN|        AK|
|    ANC|       ANCHORAGE, AK|           ANCHORAGE|        AK|
|    BAR|BAKER AAF - BAKER...|BAKER AAF - BAKER...|        AK|
|    DAC|   DALTONS CACHE, AK|       DALTONS CACHE|        AK|
|    PIZ|DEW STATION PT LA...|DEW STATION PT LA...|        AK|
|    DTH|    DUTCH HARBOR, AK|        DUTCH HARBOR|        AK|
|    EGL|           EAGLE, AK|               EAGLE|        AK|
|    FRB|       FAIRBANKS, AK|           FAIRBANKS|        AK|
|    HOM|           HOMER, AK|               HOMER|        AK|
|    HYD|           HYDER, AK|               HYDER|        AK|
|    JUN|          JUNEAU, AK|              JUNEAU|        AK|
|    5KE|       KETCHIKAN, AK|           KETCHIKAN|        AK|
|    KET|       KETCHIKAN, AK|           KETCHIKAN|    

In [35]:
# Check for nulls
i94port_df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in i94port_df.columns]).show()

+-------+--------+---------+----------+
|i94port|location|Port City|Port State|
+-------+--------+---------+----------+
|      0|       1|        1|        76|
+-------+--------+---------+----------+



In [36]:
#Check what null states are
i94port_df.where(col('Port State').isNull()).show()

+-------+--------------------+--------------------+----------+
|i94port|            location|           Port City|Port State|
+-------+--------------------+--------------------+----------+
|    WAS|       WASHINGTON DC|       WASHINGTON DC|      null|
|    XXX|NOT REPORTED/UNKNOWN|NOT REPORTED/UNKNOWN|      null|
|    888|UNIDENTIFED AIR /...|UNIDENTIFED AIR /...|      null|
|    UNK|         UNKNOWN POE|         UNKNOWN POE|      null|
|    ZZZ|MEXICO Land (Banc...|MEXICO Land (Banc...|      null|
|    CHN|  No PORT Code (CHN)|  No PORT Code (CHN)|      null|
|    MAA|           Abu Dhabi|           Abu Dhabi|      null|
|    FRG|Collapsed (FOK) 0...|Collapsed (FOK) 0...|      null|
|    HRL|Collapsed (HLG) 0...|Collapsed (HLG) 0...|      null|
|    ISP|Collapsed (FOK) 0...|Collapsed (FOK) 0...|      null|
|    JSJ|Collapsed (SAJ) 0...|Collapsed (SAJ) 0...|      null|
|    BUS|Collapsed (BUF) 0...|Collapsed (BUF) 0...|      null|
|    IAG|Collapsed (NIA) 0...|Collapsed (NIA) 0...|    

#### i94addr Data

In [37]:
#read data
i94addr = spark.read.csv('i94addr.csv')

# Change column names
i94addr = i94addr.withColumnRenamed('_c0','State Code')
i94addr = i94addr.withColumnRenamed('_c1','State')

i94addr.show()

+----------+-----------------+
|State Code|            State|
+----------+-----------------+
|        AL|          ALABAMA|
|        AK|           ALASKA|
|        AZ|          ARIZONA|
|        AR|         ARKANSAS|
|        CA|       CALIFORNIA|
|        CO|         COLORADO|
|        CT|      CONNECTICUT|
|        DE|         DELAWARE|
|        DC|DIST. OF COLUMBIA|
|        FL|          FLORIDA|
|        GA|          GEORGIA|
|        GU|             GUAM|
|        HI|           HAWAII|
|        ID|            IDAHO|
|        IL|         ILLINOIS|
|        IN|          INDIANA|
|        IA|             IOWA|
|        KS|           KANSAS|
|        KY|         KENTUCKY|
|        LA|        LOUISIANA|
+----------+-----------------+
only showing top 20 rows



#### Visa data

In [38]:
# Create df to merge i94visa and write to csv
df_visa = pd.DataFrame({
    'i94visa': [1,2,3],
    'Visa': ['Business','Pleasure','Student']
})

# write to csv
df_visa.to_csv('visa_data.csv',header=True)
# move to s3
upload_file(file_name='visa_data.csv' ,bucket='de-capstone',object_name='raw_data/visa_data.csv')

True

In [39]:
# read in as spark df
df_visa = spark.read.csv('visa_data.csv',header=True)
# Drop empty column
df_visa = df_visa.drop('_c0')
# change data type
df_visa = df_visa.withColumn("i94visa", df_visa["i94visa"].cast("double"))

df_visa.show()

+-------+--------+
|i94visa|    Visa|
+-------+--------+
|    1.0|Business|
|    2.0|Pleasure|
|    3.0| Student|
+-------+--------+



#### Mode data

In [40]:
# Create df to merge i94mode and write to csv
df_mode = pd.DataFrame({
    'i94mode': [1,2,3,9],
    'Mode': ['Air','Sea','Land','Not reported']
})

#write to csv
df_mode.to_csv('mode_data.csv',header=True)
#move to s3
upload_file(file_name='mode_data.csv' ,bucket='de-capstone',object_name='raw_data/mode_data.csv')

True

In [41]:
# read in as spark df
df_mode = spark.read.csv('mode_data.csv',header=True)
# Drop empty column
df_mode = df_mode.drop('_c0')
# change data type
df_mode = df_mode.withColumn("i94mode", df_mode["i94mode"].cast("double"))


df_mode.show()

+-------+------------+
|i94mode|        Mode|
+-------+------------+
|    1.0|         Air|
|    2.0|         Sea|
|    3.0|        Land|
|    9.0|Not reported|
+-------+------------+



#### Immigration Data

In [53]:
# read data
df_imm = spark.read.csv('immigration_data_sample.csv', header=True)

# Change data types to Double
double_type = [
    '_c0','cicid','i94yr','i94mon','admnum'
]
for c in double_type:
    df_imm = df_imm.withColumn(c, df_imm[c].cast(DoubleType()))

# change data types to integer
int_type = [
    'i94yr','i94mon','i94bir','biryear'
]
for c in int_type:
    df_imm = df_imm.withColumn(c, df_imm[c].cast(IntegerType()))
    
#change data to date
date_type = [
    'dtadfile','dtaddto'
]
for c in date_type:
    df_imm = df_imm.withColumn(c, to_date(df_imm[c],'yyyyMMdd'))

df_imm.show()

+---------+---------+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+----------+--------+-----+-------+-------+-------+-------+-------+-------+------+------+-------+---------------+-----+--------+
|      _c0|    cicid|i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|  dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear|dtaddto|gender|insnum|airline|         admnum|fltno|visatype|
+---------+---------+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+----------+--------+-----+-------+-------+-------+-------+-------+-------+------+------+-------+---------------+-----+--------+
|2027561.0|4084316.0| 2016|     4| 209.0| 209.0|    HHW|20566.0|    1.0|     HI|20573.0|    61|    2.0|  1.0|2016-04-22|    null| null|      G|      O|   null|      M|   1955|   null|     F|  null|     JL|5.6582674633E10|00782|      WT|
|2171295.0|4422636.0| 2016|     4| 582.0| 582.0|    

In [54]:
df_imm.dtypes

[('_c0', 'double'),
 ('cicid', 'double'),
 ('i94yr', 'int'),
 ('i94mon', 'int'),
 ('i94cit', 'string'),
 ('i94res', 'string'),
 ('i94port', 'string'),
 ('arrdate', 'string'),
 ('i94mode', 'string'),
 ('i94addr', 'string'),
 ('depdate', 'string'),
 ('i94bir', 'int'),
 ('i94visa', 'string'),
 ('count', 'string'),
 ('dtadfile', 'date'),
 ('visapost', 'string'),
 ('occup', 'string'),
 ('entdepa', 'string'),
 ('entdepd', 'string'),
 ('entdepu', 'string'),
 ('matflag', 'string'),
 ('biryear', 'int'),
 ('dtaddto', 'date'),
 ('gender', 'string'),
 ('insnum', 'string'),
 ('airline', 'string'),
 ('admnum', 'double'),
 ('fltno', 'string'),
 ('visatype', 'string')]

#### Combining Data
Here we will combine the data sets for the data lake.  
- We will start with the immigration data to combine it on other sets.
- We will combine imigration IATA and airport data on IATA code.
- We will then combine the demographics/temp data on state.  

In [44]:
df_imm.show(5)

+---------+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+
|      _c0|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|         admnum|fltno|visatype|
+---------+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+
|2027561.0|4084316.0|2016.0|   4.0| 209.0| 209.0|    HHW|20566.0|    1.0|     HI|20573.0|  61.0|    2.0|  1.0|20160422|    null| null|      G|      O|   null|      M| 1955.0|07202016|     F|  null|     JL|5.6582674633E10|00782|      WT|
|2171295.0|4422636.0|2016.0|   4.0| 582.0| 582.0|   

In [45]:
df_visa.show()

+-------+--------+
|i94visa|    Visa|
+-------+--------+
|    1.0|Business|
|    2.0|Pleasure|
|    3.0| Student|
+-------+--------+



In [16]:
# merge immigration and df_visa data
staging_df = df_imm.join(df_visa,df_visa.i94visa==df_imm.i94visa,how='left').select(df_imm["*"],df_visa["Visa"])

# merge immigration and df_mode data
staging_df = staging_df.join(df_mode,df_mode.i94mode==staging_df.i94mode,how='left').select(staging_df["*"],df_mode["Mode"])

# merge immigration and i94port_df data
staging_df = staging_df.join(i94port_df,i94port_df.i94port==staging_df.i94port,how='left').select(staging_df["*"],i94port_df["Port City"])
staging_df = staging_df.join(i94port_df,i94port_df.i94port==staging_df.i94port,how='left').select(staging_df["*"],i94port_df["Port State"])


# merge immigration and i94cit_res_codes data
staging_df = staging_df.join(i94cit_res_codes,i94cit_res_codes.i94cit==staging_df.i94cit,how='left').select(staging_df["*"],i94cit_res_codes["i94_City"])
staging_df = staging_df.join(i94cit_res_codes,i94cit_res_codes.i94res==staging_df.i94res,how='left').select(staging_df["*"],i94cit_res_codes["i94_Resident"])

# merge immigration and i94addr data
staging_df = staging_df.join(i94addr,i94addr["State Code"]==staging_df["i94addr"],how='left').select(staging_df["*"],i94addr["State"])




In [17]:
# merge immigration and mode data
staging_df.show()

+---------+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+--------+----+---------------+----------+-----------+--------------+-------------+
|      _c0|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|         admnum|fltno|visatype|    Visa|Mode|      Port City|Port State|   i94_City|  i94_Resident|        State|
+---------+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+--------+----+---------------+----------+-----------+--------------+-------------+
|2027561.0|4084316.0|2016.0|   4.0| 209.0| 

In [18]:
# Check for nulls
staging_df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in staging_df.columns]).show()

+---+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+-------+------+------+-------+------+-----+--------+----+----+---------+----------+--------+------------+-----+
|_c0|cicid|i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear|dtaddto|gender|insnum|airline|admnum|fltno|visatype|Visa|Mode|Port City|Port State|i94_City|i94_Resident|State|
+---+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+-------+------+------+-------+------+-----+--------+----+----+---------+----------+--------+------------+-----+
|  0|    0|    0|     0|     0|     0|      0|      0|      0|     59|     49|     0|      0|    0|       0|     618|  996|      0|     46|   1000|     46|      0|      0|   1

In [19]:
# Combine staging, demo, and airport code data
final_df = staging_df.join(state_df,lower(state_df["State"])==lower(staging_df["State"]),how='left').drop(state_df["State Code"]).drop(state_df["State"])
final_df = final_df.join(df_apt_codes,df_apt_codes["iata_code"]==final_df["i94port"],how='left')

In [20]:
final_df.show(5)

+---------+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+--------+----+-------------+----------+--------+------------+-------------+------------------+----------------+-----------------+------------------+------------+---------------+----------------------+--------+-------+-------------------------+------------------+---------------------------------+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+----------+
|      _c0|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|         admnum|fltno|visatype|    Visa|Mode|    Port City|Port State|i94_City|i94_Resident|        

In [23]:
# Check for nulls
final_df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in final_df.columns]).show()

+---+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+-------+------+------+-------+------+-----+--------+----+----+---------+----------+--------+------------+-----+------------------+----------------+-----------------+------------------+------------+---------------+----------------------+-----+-----+-------------------------+------------------+---------------------------------+-----+----+----+------------+---------+-----------+----------+------------+--------+---------+----------+-----------+----------+
|_c0|cicid|i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear|dtaddto|gender|insnum|airline|admnum|fltno|visatype|Visa|Mode|Port City|Port State|i94_City|i94_Resident|State|AverageTemperature|Total Population|Female Population|Number of Veterans|Foreign-born|Male Population|Avera

In [22]:
final_df.count()

1000

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

![](Immigration_star_schema.png)

The Purpose of the data sets are to provide information about to to the individual immigrants.  The only field that tells us about where the individual will be in the United States is the `i94add` field.  This field tells us which U.S. State they are in.  We will use this information to guide how we plan our tables.
- Immigration and demographic data will we joined on `State`.
- Immigration and airport codes will be joined on `fltno`.
- Every individual does not have a matching `fltno` because of land and sea entries.
- Every individual does not have a documented `i94addr`, which means they won't have a matching `State`.


Map out the conceptual data model and explain why you chose that model

#### 3.2 Mapping Out Data Pipelines

##### EMR cluster
- Set up an AWS EMR cluster
    - EMR cluster used: emr-5.20.0
    - Cluster needs to have spark and yarn
- Move `de-capstone-etl.py` to the EMR cluster
- Run `de-capstone-etl.py`
    - /usr/bin/spark-submit --master yarn de-capstone-etl.py


![](de-pipeline-model.png)




### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [1]:
# Do all imports and installs here
from pyspark.sql import SparkSession
import numpy as np
import os
import configparser
import logging
from pyspark.sql.functions import split
import pyspark.sql
from pyspark.sql.functions import isnan, when, count, col, regexp_replace, lower, trim, to_date
from pyspark.sql.types import DoubleType,IntegerType

In [2]:
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()

In [3]:
########  Airport data
# Read in the data here
df_apt_codes = spark.read.csv('airport-codes_csv.csv',header=True)

# pull just the airports in the US
df_apt_codes = df_apt_codes.filter(df_apt_codes.iso_country=='US')

# remove iata code == null
df_apt_codes = df_apt_codes.filter(df_apt_codes.iata_code.isNotNull())

# Get state code from iso_region
df_apt_codes = df_apt_codes.withColumn('State_Code',split(df_apt_codes.iso_region,'-').getItem(1))

############### Temp data
# Check data
df_temp = spark.read.csv('../../data2/GlobalLandTemperaturesByCity.csv',header=True)
# Change AverageTemperature to Double
df_temp = df_temp.withColumn("AverageTemperature", df_temp["AverageTemperature"].cast("double"))
# filter just United states
df_temp = df_temp.filter(df_temp.Country == 'United States')
# drop nulls
df_temp = df_temp.where(col('AverageTemperature').isNotNull())

# get average temp for each city
avg_temp = df_temp.cube(['City','Latitude','Longitude']).agg({'AverageTemperature':'avg'})
avg_temp = avg_temp.na.drop()
avg_temp = avg_temp.withColumnRenamed('avg(AverageTemperature)', 'AverageTemperature')

################ Demographic data
# Read Data
df_demo = spark.read.csv('us-cities-demographics.csv',sep = ';',header=True)

# Combine temp data
df_demo = df_demo.join(avg_temp,avg_temp.City==df_demo.City).select(df_demo["*"],avg_temp["AverageTemperature"])

# Break data down to state level
state_df = df_demo.cube('State','State Code').agg({'Male Population':'sum',
                                     'Female Population':'sum',
                                     'Total Population':'sum',
                                     'Number of Veterans':'sum',
                                     'Foreign-born':'sum',
                                     'Average Household Size':'avg',
                                     'AverageTemperature':'avg'})
state_df = state_df.na.drop()
state_df = state_df.withColumnRenamed('avg(AverageTemperature)', 'AverageTemperature')
state_df = state_df.withColumnRenamed('avg(Average Household Size)', 'Average Household Size')
state_df = state_df.withColumnRenamed('sum(Total Population)', 'Total Population')
state_df = state_df.withColumnRenamed('sum(Female Population)', 'Female Population')
state_df = state_df.withColumnRenamed('sum(Male Population)', 'Male Population')
state_df = state_df.withColumnRenamed('sum(Number of Veterans)', 'Number of Veterans')
state_df = state_df.withColumnRenamed('sum(Foreign-born)', 'Foreign-born')

# Sum up Races for each state
white_df = df_demo.where(df_demo.Race =='White').cube('State').agg({'Count':'sum'})
white_df = white_df.withColumnRenamed('sum(Count)', 'White')

asian_df = df_demo.where(df_demo.Race =='Asian').cube('State').agg({'Count':'sum'})
asian_df = asian_df.withColumnRenamed('sum(Count)', 'Asian')

black_df = df_demo.where(df_demo.Race =='Black or African-American').cube('State').agg({'Count':'sum'})
black_df = black_df.withColumnRenamed('sum(Count)', 'Black or African-American')

hispanic_df = df_demo.where(df_demo.Race =='Hispanic or Latino').cube('State').agg({'Count':'sum'})
hispanic_df = hispanic_df.withColumnRenamed('sum(Count)', 'Hispanic or Latino')

native_df = df_demo.where(df_demo.Race =='American Indian and Alaska Native').cube('State').agg({'Count':'sum'})
native_df = native_df.withColumnRenamed('sum(Count)', 'American Indian and Alaska Native')

# Combine race data
state_df = state_df.join(white_df,white_df.State==state_df.State).select(state_df["*"],white_df["White"])
state_df = state_df.join(asian_df,asian_df.State==state_df.State).select(state_df["*"],asian_df["Asian"])
state_df = state_df.join(black_df,black_df.State==state_df.State).select(state_df["*"],black_df["Black or African-American"])
state_df = state_df.join(hispanic_df,hispanic_df.State==state_df.State).select(state_df["*"],hispanic_df["Hispanic or Latino"])
state_df = state_df.join(native_df,native_df.State==state_df.State).select(state_df["*"],native_df["American Indian and Alaska Native"])

#########i94cit_res_codes
# pull in data
i94cit_res_codes = spark.read.csv('i94cit_res_codes.csv')
i94cit_res_codes = i94cit_res_codes.withColumnRenamed('_c0', 'i94cit')
i94cit_res_codes = i94cit_res_codes.withColumnRenamed('_c1', 'i94_City')

#Copy df,rename columns, and join
res_codes = i94cit_res_codes
res_codes = res_codes.withColumnRenamed('i94cit','i94res')
res_codes = res_codes.withColumnRenamed('i94_City','i94_Resident')

#Join
i94cit_res_codes = i94cit_res_codes.join(res_codes,i94cit_res_codes.i94cit==res_codes.i94res)
#change data type
i94cit_res_codes = i94cit_res_codes.withColumn("i94cit", i94cit_res_codes["i94cit"].cast("double"))
i94cit_res_codes = i94cit_res_codes.withColumn("i94res", i94cit_res_codes["i94res"].cast("double"))

################ i94port_code
# Read data
i94port_df = spark.read.csv('i94port_code.csv') 

# Drop empty column
i94port_df = i94port_df.drop('_c2')

# Remove spaces
i94port_df = i94port_df.select('_c0',trim(col('_c1')))

# Change column names
i94port_df = i94port_df.withColumnRenamed('_c0','i94port')
i94port_df = i94port_df.withColumnRenamed('trim(_c1)','location')

# get port city and state
i94port_df = i94port_df.withColumn('Port City',split(i94port_df.location,',').getItem(0))
i94port_df = i94port_df.withColumn('Port State',split(i94port_df.location,',').getItem(1))

############## i94addr data
#read data
i94addr = spark.read.csv('i94addr.csv')

# Change column names
i94addr = i94addr.withColumnRenamed('_c0','State Code')
i94addr = i94addr.withColumnRenamed('_c1','State')

############  Visa
# read in as spark df
df_visa = spark.read.csv('visa_data.csv',header=True)
# Drop empty column
df_visa = df_visa.drop('_c0')
# change data type
df_visa = df_visa.withColumn("i94visa", df_visa["i94visa"].cast("double"))
############ Mode
# read in as spark df
df_mode = spark.read.csv('mode_data.csv',header=True)
# Drop empty column
df_mode = df_mode.drop('_c0')
# change data type
df_mode = df_mode.withColumn("i94mode", df_mode["i94mode"].cast("double"))

############# Immigration data
# read data
df_imm = spark.read.csv('immigration_data_sample.csv', header=True)

# Change data types to Double
double_type = [
    '_c0','cicid','i94yr','i94mon','admnum'
]
for c in double_type:
    df_imm = df_imm.withColumn(c, df_imm[c].cast(DoubleType()))

# change data types to integer
int_type = [
    'i94yr','i94mon','i94bir','biryear'
]
for c in int_type:
    df_imm = df_imm.withColumn(c, df_imm[c].cast(IntegerType()))
    
#change data to date
date_type = [
    'dtadfile','dtaddto'
]
for c in date_type:
    df_imm = df_imm.withColumn(c, to_date(df_imm[c],'yyyyMMdd'))

########### Combining data
# merge immigration and df_visa data
staging_df = df_imm.join(df_visa,df_visa.i94visa==df_imm.i94visa,how='left').select(df_imm["*"],df_visa["Visa"])

# merge immigration and df_mode data
staging_df = staging_df.join(df_mode,df_mode.i94mode==staging_df.i94mode,how='left').select(staging_df["*"],df_mode["Mode"])

# merge immigration and i94port_df data
staging_df = staging_df.join(i94port_df,i94port_df.i94port==staging_df.i94port,how='left').select(staging_df["*"],i94port_df["Port City"])
staging_df = staging_df.join(i94port_df,i94port_df.i94port==staging_df.i94port,how='left').select(staging_df["*"],i94port_df["Port State"])


# merge immigration and i94cit_res_codes data
staging_df = staging_df.join(i94cit_res_codes,i94cit_res_codes.i94cit==staging_df.i94cit,how='left').select(staging_df["*"],i94cit_res_codes["i94_City"])
staging_df = staging_df.join(i94cit_res_codes,i94cit_res_codes.i94res==staging_df.i94res,how='left').select(staging_df["*"],i94cit_res_codes["i94_Resident"])

# merge immigration and i94addr data
staging_df = staging_df.join(i94addr,i94addr["State Code"]==staging_df["i94addr"],how='left').select(staging_df["*"],i94addr["State"])


# Combine staging, demo, and airport code data
final_df = staging_df.join(state_df,lower(state_df["State"])==lower(staging_df["State"]),how='left').drop(state_df["State Code"]).drop(state_df["State"])
final_df = final_df.join(df_apt_codes,df_apt_codes["iata_code"]==final_df["i94port"],how='left')




In [4]:
df_imm.show(5)

+---------+---------+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+----------+--------+-----+-------+-------+-------+-------+-------+-------+------+------+-------+---------------+-----+--------+
|      _c0|    cicid|i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|  dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear|dtaddto|gender|insnum|airline|         admnum|fltno|visatype|
+---------+---------+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+----------+--------+-----+-------+-------+-------+-------+-------+-------+------+------+-------+---------------+-----+--------+
|2027561.0|4084316.0| 2016|     4| 209.0| 209.0|    HHW|20566.0|    1.0|     HI|20573.0|    61|    2.0|  1.0|2016-04-22|    null| null|      G|      O|   null|      M|   1955|   null|     F|  null|     JL|5.6582674633E10|00782|      WT|
|2171295.0|4422636.0| 2016|     4| 582.0| 582.0|    

In [5]:
state_df.show(5)

+---------+----------+------------------+----------------+-----------------+------------------+------------+---------------+----------------------+---------+--------+-------------------------+------------------+---------------------------------+
|    State|State Code|AverageTemperature|Total Population|Female Population|Number of Veterans|Foreign-born|Male Population|Average Household Size|    White|   Asian|Black or African-American|Hispanic or Latino|American Indian and Alaska Native|
+---------+----------+------------------+----------------+-----------------+------------------+------------+---------------+----------------------+---------+--------+-------------------------+------------------+---------------------------------+
|     Utah|        UT| 9.898601558734654|       2780420.0|        1376780.0|           87315.0|    426475.0|      1403640.0|    3.2325000000000004| 447902.0| 30346.0|                  12828.0|          122622.0|                          11466.0|
|Minnesota|     

In [6]:
df_apt_codes.show(5)

+-----+-------------+--------------------+------------+---------+-----------+----------+-------------+--------+---------+----------+--------------------+----------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region| municipality|gps_code|iata_code|local_code|         coordinates|State_Code|
+-----+-------------+--------------------+------------+---------+-----------+----------+-------------+--------+---------+----------+--------------------+----------+
| 07FA|small_airport|Ocean Reef Club A...|           8|       NA|         US|     US-FL|    Key Largo|    07FA|      OCA|      07FA|-80.274803161621,...|        FL|
|  0AK|small_airport|Pilot Station Air...|         305|       NA|         US|     US-AK|Pilot Station|    null|      PQS|       0AK|-162.899994, 61.9...|        AK|
| 0CO2|small_airport|Crested Butte Air...|        8980|       NA|         US|     US-CO|Crested Butte|    0CO2|      CSE|      0CO2|-106.928341, 38.8...|        CO|
| 0TE7|sma

In [7]:
final_df.show(5)

+---------+---------+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+----------+--------+-----+-------+-------+-------+-------+-------+-------+------+------+-------+---------------+-----+--------+--------+----+-------------+----------+--------+------------+-------------+------------------+----------------+-----------------+------------------+------------+---------------+----------------------+--------+-------+-------------------------+------------------+---------------------------------+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+----------+
|      _c0|    cicid|i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|  dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear|dtaddto|gender|insnum|airline|         admnum|fltno|visatype|    Visa|Mode|    Port City|Port State|i94_City|i94_Resident|        

In [9]:
# Check for nulls
final_df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in final_df.columns if c not in {'dtadfile','dtaddto'}]).show()

+---+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+-----+-------+-------+-------+-------+-------+------+------+-------+------+-----+--------+----+----+---------+----------+--------+------------+-----+------------------+----------------+-----------------+------------------+------------+---------------+----------------------+-----+-----+-------------------------+------------------+---------------------------------+-----+----+----+------------+---------+-----------+----------+------------+--------+---------+----------+-----------+----------+
|_c0|cicid|i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear|gender|insnum|airline|admnum|fltno|visatype|Visa|Mode|Port City|Port State|i94_City|i94_Resident|State|AverageTemperature|Total Population|Female Population|Number of Veterans|Foreign-born|Male Population|Average Household Size|White|Asian|Blac

#### 4.2 Data Quality Checks

Data quality checks will include the following:

- Check primary key for nulls.
    - We want to make sure we are able to match the data on primary keys.
    - `state_df.State`
    - `staging_df.admnum`
    - `df_apt_codes.ident`
- Check number of entries in staging_df and final_df are greater than 1 million.
    - We want to make sure we have enough data to pass the 1 million records mark.

Not all immigrants have a US address or a US i94port so not everyone will have state data or US airport info attached to the final data frame.  Data quality checks will be performed on both the final data frame as well as the three main data frames prior to joining them.  

#### Run Quality Checks

In [10]:
# Perform quality checks here

assert state_df.where(col('State').isNull()).count()==0,'Null values in state_df.State'
assert df_imm.where(col('admnum').isNull()).count()==0,'Null values in df_imm.admnum'
assert df_apt_codes.where(col('ident').isNull()).count()==0,'Null values in df_apt_codes.ident'
assert final_df.count()>1000000,'final_df has less than 1 million entries'
assert df_imm.count()>1000000,'df_imm has less than 1 million entries'

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

In [3]:
# Airport dictionary
apt_dict = {
    'ident':'Identity code of airport',
    'type':'Type of airport(size/type)',
    'name':'Name of airport',
    'elevation_ft':'elevation of airport in feet',
    'continent':'Continent of airport',
    'iso_country':'Country of airport',
    'iso_region':'Region, country and state',
    'municipality':'Municipality/location',
    'gps_code':'GPS code',
    'iata_code':'3 Letter abbreviation of airport',
    'local_code':'Local code',
    'coordinates':'Longitude and latitude location',
    'State_Code':'State abbreviation'
}
apt_dict

{'ident': 'Identity code of airport',
 'type': 'Type of airport(size/type)',
 'name': 'Name of airport',
 'elevation_ft': 'elevation of airport in feet',
 'continent': 'Continent of airport',
 'iso_country': 'Country of airport',
 'iso_region': 'Region, country and state',
 'municipality': 'Municipality/location',
 'gps_code': 'GPS code',
 'iata_code': '3 Letter abbreviation of airport',
 'local_code': 'Local code',
 'coordinates': 'Longitude and latitude location',
 'State_Code': 'State abbreviation'}

In [5]:
# Immigation dictionary
imm_dict = {
    '_c0':'Indenifier',
    'cicid':'ID of some sort',
    'i94yr':'Year of immigration application',
    'i94mon':'Month of immigration application',
    'i94cit':'City of application code',
    'i94res':'City of residence code',
    'i94port':'Port of entry',
    'arrdate':'Arrival date',
    'i94mode':'Mode of entry code',
    'i94addr':'US address state of expected residence',
    'depdate':'Departure date',
    'i94bir':'Age',
    'i94visa':'Visa type code',
    'count':'Number of people on application',
    'dtadfile':'Date application recieved',
    'visapost':'Visa post of application',
    'occup':'Occupation',
    'entdepa':'Admitted date ',
    'entdepd':'Departed date',
    'entdepu':'Either apprehended, overstayed, adjusted to perm residence',
    'matflag':'Match of arrival and departure records',
    'biryear':'4 digit birth year',
    'dtaddto':'Date to which admitted to U.S.',
    'gender':'Gender',
    'insnum':'INS number',
    'airline':'Airline used to arrive in U.S.',
    'admnum':'Admission Number',
    'fltno':'Flight number of Airline used to arrive in U.S.',
    'visatype':'Class of admission legally admitting the non-immigrant to temporarily stay in U.S.',
    'Visa':'Type of visa',
    'Mode':'Mode of entry',
    'Port City':'Port city',
    'Port State':'Port state',
    'i94_City':'City of application',
    'i94_Resident':'City of residence',
    'State':'US address state of expected residence'
}
imm_dict

{'_c0': 'Indenifier',
 'cicid': 'ID of some sort',
 'i94yr': 'Year of immigration application',
 'i94mon': 'Month of immigration application',
 'i94cit': 'City of application code',
 'i94res': 'City of residence code',
 'i94port': 'Port of entry',
 'arrdate': 'Arrival date',
 'i94mode': 'Mode of entry code',
 'i94addr': 'US address state of expected residence',
 'depdate': 'Departure date',
 'i94bir': 'Age',
 'i94visa': 'Visa type code',
 'count': 'Number of people on application',
 'dtadfile': 'Date application recieved',
 'visapost': 'Visa post of application',
 'occup': 'Occupation',
 'entdepa': 'Admitted date ',
 'entdepd': 'Departed date',
 'entdepu': 'Either apprehended, overstayed, adjusted to perm residence',
 'matflag': 'Match of arrival and departure records',
 'biryear': '4 digit birth year',
 'dtaddto': 'Date to which admitted to U.S.',
 'gender': 'Gender',
 'insnum': 'INS number',
 'airline': 'Airline used to arrive in U.S.',
 'admnum': 'Admission Number',
 'fltno': 'Fligh

In [6]:
# State dictionary
state_dict = {
    'State':'State',
    'State Code':'State abbreviation',
    'AverageTemperature':'Average temperature of state',
    'Total Population':'Total population',
    'Female Population':'Female population',
    'Number of Veterans':'Number of veterans',
    'Foreign-born':'Number of foreign born',
    'Male Population':'Male population',
    'Average Household Size':'Average household size',
    'White':'White population',
    'Asian':'Asian population',
    'Black or African-American':'Black population',
    'Hispanic or Latino':'Hispanic population',
    'American Indian and Alaska Native':'Indian and alaskan population'
}
state_dict

{'State': 'State',
 'State Code': 'State abbreviation',
 'AverageTemperature': 'Average temperature of state',
 'Total Population': 'Total population',
 'Female Population': 'Female population',
 'Number of Veterans': 'Number of veterans',
 'Foreign-born': 'Number of foreign born',
 'Male Population': 'Male population',
 'Average Household Size': 'Average household size',
 'White': 'White population',
 'Asian': 'Asian population',
 'Black or African-American': 'Black population',
 'Hispanic or Latino': 'Hispanic population',
 'American Indian and Alaska Native': 'Indian and alaskan population'}

#### Step 5: Complete Project Write Up

Rational for tools and technologies:
- We used an EMR cluster and spark to be able to handle the data.  Immigration data had over 3 million records.
- Spark requires less resources, as compared to pandas, and allows us to deal with the amount of data.

Updating methods:
- The immigration data in this project is from April 2016.  The data is then partitioned by State when being written to S3.
- If we need to update the data with new data for different months/years we would partition the data by year and month as well.
- If we need to update the month with new data we would need to read the data in and overwrite the data after is has been cleaned.

Potential problems in different scenarios:
- The data was increased by 100x.
    - We could easily add additional nodes to the EMR cluster and use optimized nodes for RAM.
- The data populates a dashboard that must be updated on a daily basis by 7am every day.
    - We could add airflow integration to automate and schedule the program.
    - We could also use other AWS data pipeline solutions ex: Lambda functions.
- The database needed to be accessed by 100+ people.
    - The current s3 solution can be accessed by 100+ people.  The data stored for a data warehouse can be moved to a AWS Redshift cluster for people to query if needed.

* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.