# Project Title
### US Demographic and Immigration Data

#### Project Summary

The goal of this project is to construct a pipeline for building a data lake in S3 with Spark for immigration data in the US. Three data sources are used:

* I94 Immigration Data
* USA Airport Codes
* Country Code Data


# Imports and setup

In [2]:
import os
import configparser
import pandas
from pyspark.sql import SparkSession
from datetime import datetime

In [42]:
import pyspark.sql.functions as f
from pyspark.sql.functions import udf, monotonically_increasing_id
from pyspark.sql import types as t

In [86]:
config = configparser.ConfigParser()

config.read_file(open('config.cfg'))

os.environ['AWS_ACCESS_KEY_ID'] = config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY'] = config['AWS']['AWS_SECRET_ACCESS_KEY']

bucket = config['S3']['BUCKET']
immigration_path = bucket + config['S3']['IMMIGRATION_DATA']
immigration_labels = bucket + config['S3']['IMMIGRATION_LABELS']
airport_path = bucket + config['S3']['AIRPORT_CODES_DATA']
city_path = bucket + config['S3']['CITY_DATA']
output_path = bucket + config['S3']['OUTPUT']
country_i94_path = bucket + config['S3']['COUNTRY_I94_DATA']
country_path = bucket + config['S3']['COUNTRY_DATA']

print(f'{immigration_path}, {immigration_labels}')
print(f'{airport_path}')
print(f'{city_path}')
print(f'{output_path}')
print(f'{country_i94_path}, {country_path}')

s3a://udacity-data/capstone/immigration/sas_data/, s3a://udacity-data/capstone/immigration/I94_SAS_LABELS_Descriptions.SAS
s3a://udacity-data/capstone/airport/airport-codes_csv.csv
s3a://udacity-data/capstone/demographic/us-cities-demographics.csv
s3a://udacity-data/capstone/output/
s3a://udacity-data/capstone/country_codes/country_codes_i94_clean.csv, s3a://udacity-data/capstone/country_codes/country_codes.csv


In [6]:
os.environ['PYSPARK_PYTHON']='/usr/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON']='/usr/bin/python3'

In [9]:
spark = SparkSession.builder.config('spark.jars.packages', 
                                    'org.apache.hadoop:hadoop-aws:2.7.0').getOrCreate()
spark

In [10]:
spark._jsc.hadoopConfiguration().set("fs.s3a.access.key",
                                     os.environ['AWS_ACCESS_KEY_ID'])
spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", 
                                     os.environ['AWS_SECRET_ACCESS_KEY'])
spark._jsc.hadoopConfiguration().set("fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem")

#### Helper functions

In [52]:
def get_time():
    n = datetime.now()

    timestamp = f"""
    {n.year}-{n.month}-{n.day}_{n.hour}-{n.minute}-{n.second}-{n.microsecond}
    """.strip()
    
    return timestamp

# Load data from buckets

In [12]:
immigration_data = spark.read.parquet(immigration_path)

In [17]:
immigration_data.limit(2).show(truncate=False, vertical=True)

-RECORD 0------------------
 cicid    | 5748517.0      
 i94yr    | 2016.0         
 i94mon   | 4.0            
 i94cit   | 245.0          
 i94res   | 438.0          
 i94port  | LOS            
 arrdate  | 20574.0        
 i94mode  | 1.0            
 i94addr  | CA             
 depdate  | 20582.0        
 i94bir   | 40.0           
 i94visa  | 1.0            
 count    | 1.0            
 dtadfile | 20160430       
 visapost | SYD            
 occup    | null           
 entdepa  | G              
 entdepd  | O              
 entdepu  | null           
 matflag  | M              
 biryear  | 1976.0         
 dtaddto  | 10292016       
 gender   | F              
 insnum   | null           
 airline  | QF             
 admnum   | 9.495387003E10 
 fltno    | 00011          
 visatype | B1             
-RECORD 1------------------
 cicid    | 5748518.0      
 i94yr    | 2016.0         
 i94mon   | 4.0            
 i94cit   | 245.0          
 i94res   | 438.0          
 i94port  | LOS     

In [21]:
airport_data = spark.read.csv(airport_path, header=True)

In [22]:
airport_data.limit(2).show(truncate=False, vertical=True)

-RECORD 0------------------------------------------
 ident        | 00A                                
 type         | heliport                           
 name         | Total Rf Heliport                  
 elevation_ft | 11                                 
 continent    | NA                                 
 iso_country  | US                                 
 iso_region   | US-PA                              
 municipality | Bensalem                           
 gps_code     | 00A                                
 iata_code    | null                               
 local_code   | 00A                                
 coordinates  | -74.93360137939453, 40.07080078125 
-RECORD 1------------------------------------------
 ident        | 00AA                               
 type         | small_airport                      
 name         | Aero B Ranch Airport               
 elevation_ft | 3435                               
 continent    | NA                                 
 iso_country

In [33]:
country_i94_data = spark.read.csv(country_i94_path, header=True)

In [34]:
country_i94_data.limit(2).show(truncate=False, vertical=True)

-RECORD 0-----------------------------
 _c0                    | 0           
 i94cit_clean           | 582         
 i94_country_name_clean | MEXICO      
 iso_country_code_clean | 484         
-RECORD 1-----------------------------
 _c0                    | 1           
 i94cit_clean           | 236         
 i94_country_name_clean | AFGHANISTAN 
 iso_country_code_clean | 4           



In [68]:
country_data = spark.read.csv(country_path, header=True)

In [69]:
country_data.limit(2).show(truncate=False, vertical=True)

-RECORD 0-----------------------------------
 name                     | Afghanistan     
 alpha-2                  | AF              
 alpha-3                  | AFG             
 country-code             | 004             
 iso_3166-2               | ISO 3166-2:AF   
 region                   | Asia            
 sub-region               | Southern Asia   
 intermediate-region      | null            
 region-code              | 142             
 sub-region-code          | 034             
 intermediate-region-code | null            
-RECORD 1-----------------------------------
 name                     | Åland Islands   
 alpha-2                  | AX              
 alpha-3                  | ALA             
 country-code             | 248             
 iso_3166-2               | ISO 3166-2:AX   
 region                   | Europe          
 sub-region               | Northern Europe 
 intermediate-region      | null            
 region-code              | 150             
 sub-regio

In [87]:
city_data = spark.read.option('delimiter', ';').csv(city_path, header=True)

In [88]:
city_data.limit(2).show(truncate=False, vertical=True)

-RECORD 0------------------------------------
 City                   | Silver Spring      
 State                  | Maryland           
 Median Age             | 33.8               
 Male Population        | 40601              
 Female Population      | 41862              
 Total Population       | 82463              
 Number of Veterans     | 1562               
 Foreign-born           | 30908              
 Average Household Size | 2.6                
 State Code             | MD                 
 Race                   | Hispanic or Latino 
 Count                  | 25924              
-RECORD 1------------------------------------
 City                   | Quincy             
 State                  | Massachusetts      
 Median Age             | 41.0               
 Male Population        | 44129              
 Female Population      | 49500              
 Total Population       | 93629              
 Number of Veterans     | 4147               
 Foreign-born           | 32935   


# Define the Data Model
### Conceptual Data Model
##### Map out the conceptual data model and explain why you chose that model

The idea behind this pipeline is to combine different sources of data for the following purposes:

* Track immigration by airport in the USA
* Store details about each of those airports
* Store details about the countries that USA gets immigrants from
* Parse cities/states from airport data and store demographic data for those cities

As such, the model would have a fact table and multiple dimension tables, with the fact table being the immigration data itself. This data would hold records to:

* Uniquely identify each arrival
* Hold mappings to dimensions for airport and country
* Hold additional data about the entry like visa type

The airpot, city and country dimension tables would contain additional information, not limited to the state/city the airports are in.

For further reference, a schema diagram is included in the documentation.


##### Mapping Out Data Pipelines


1. Save initial datasets/sources to S3 buckets
2. Load datasets into memory with Spark
3. Create dimension tables for airports, countries, states
4. Create the fact table for immigration, hence creating a mapping between each immigration record and its airport and country
5. Perform data quality checks
6. Save data as .parquet to S3 buckets

# Building out the pipeline

## Build `countries` table

In [58]:
country_i94_data.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- i94cit_clean: string (nullable = true)
 |-- i94_country_name_clean: string (nullable = true)
 |-- iso_country_code_clean: string (nullable = true)



In [59]:
country_data.printSchema()

root
 |-- name: string (nullable = true)
 |-- alpha-2: string (nullable = true)
 |-- alpha-3: string (nullable = true)
 |-- country-code: string (nullable = true)
 |-- iso_3166-2: string (nullable = true)
 |-- region: string (nullable = true)
 |-- sub-region: string (nullable = true)
 |-- intermediate-region: string (nullable = true)
 |-- region-code: string (nullable = true)
 |-- sub-region-code: string (nullable = true)
 |-- intermediate-region-code: string (nullable = true)



In [62]:
country_i94_data.createOrReplaceTempView('country_i94_data')
country_data.createOrReplaceTempView('country_data')

In [75]:
countries_table = spark.sql("""
    SELECT 
    i.i94cit_clean AS i94_country_id,
    i.iso_country_code_clean AS iso_country_id,
    c.name AS country_name,
    c.region AS region,
    c.`sub-region` AS sub_region,
    c.`alpha-2` AS iso_country_alpha
    FROM country_data AS c
    JOIN country_i94_data AS i ON
        i.iso_country_code_clean = c.`country-code`
""")

In [76]:
countries_table.limit(2).show()

+--------------+--------------+------------+--------+--------------------+-----------------+
|i94_country_id|iso_country_id|country_name|  region|          sub_region|iso_country_alpha|
+--------------+--------------+------------+--------+--------------------+-----------------+
|           529|           660|    Anguilla|Americas|Latin America and...|               AI|
|           532|           533|       Aruba|Americas|Latin America and...|               AW|
+--------------+--------------+------------+--------+--------------------+-----------------+



In [78]:
countries_table.createOrReplaceTempView('countries_table')

In [84]:
countries_table_path = f'{output_path}countries_table_{get_time()}'
print(f'Writing airports_table to {countries_table_path}..')
# countries_table.write.parquet(countries_table_path)

Writing airports_table to s3a://udacity-data/capstone/output/countries_table_2020-6-20_20-29-37-832176..


## Build `cities` table

In [95]:
@udf
def make_iso_region(state):
    return f'US-{state}'

In [94]:
city_data.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: string (nullable = true)
 |-- Male Population: string (nullable = true)
 |-- Female Population: string (nullable = true)
 |-- Total Population: string (nullable = true)
 |-- Number of Veterans: string (nullable = true)
 |-- Foreign-born: string (nullable = true)
 |-- Average Household Size: string (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: string (nullable = true)



In [96]:
city_data.createOrReplaceTempView('city_data')

In [99]:
cities_table = spark.sql("""
    SELECT
        City AS city,
        `State Code` AS state,
        `Median Age` AS median_age,
        `Male Population` AS population_m,
        `Female Population` AS population_f,
        `Total Population` AS population_total,
        `Race` AS race
    FROM city_data
""")

In [102]:
cities_table = cities_table.withColumn('iso_region', make_iso_region('state'))

In [None]:
cities_table_path = f'{output_path}cities_table_{get_time()}'
print(f'Writing airports_table to {cities_table_path}..')
# cities_table.write.parquet(cities_table_path)

## Build `airports` table

In [41]:
airport_data.printSchema()

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)



In [44]:
airport_data.createOrReplaceTempView('airport_data')

In [81]:
airport_table = spark.sql("""
    SELECT
        a.name AS airport_name,
        c.iso_country_id AS iso_country_id,
        a.local_code AS local_code,
        a.Iata_code AS iata_code,
        a.coordinates AS coordinates
    FROM airport_data AS a
    JOIN countries_table AS c
    ON (c.iso_country_alpha = a.iso_country)
""")

In [82]:
airport_table = airport_table.withColumn('airport_id', monotonically_increasing_id())

In [83]:
airport_table.limit(2).show()

+--------------------+--------------+----------+---------+-------------------+----------+
|        airport_name|iso_country_id|local_code|iata_code|        coordinates|airport_id|
+--------------------+--------------+----------+---------+-------------------+----------+
|Clayton J Lloyd I...|           660|      null|      AXA|-63.055099, 18.2048|         0|
|Queen Beatrix Int...|           533|      null|      AUA|-70.015198, 12.5014|         1|
+--------------------+--------------+----------+---------+-------------------+----------+



In [56]:
airport_table_path = f'{output_path}airport_table_{get_time()}'
print(f'Writing airports_table to {airport_table_path}..')
# airport_table.write.parquet(airport_table_path)

Writing airports_table to s3a://udacity-data/capstone/output/airport_table_2020-6-20_19-57-49-239188..


# Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness


In [None]:
# Perform quality checks here

# Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

# Project write-up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

**Credit to the following authors for additional datasets on countries and country codes:**

* https://github.com/lukes/

**Credit to the following authors for inspiration for the project and additional datasets:**

* https://github.com/jukkakansanaho