# US Immigration Summary Datasets

### Data Engineering Capstone Project

#### Project Summary
This project gathers data from multiple sources to build a data model that shows where people immigrated to in the US in 2016, along with details about those areas.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import configparser
import datetime as dt
import os

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import (col, count, date_format, first, isnan,
                                   monotonically_increasing_id, split, udf, upper, when)
# Note: this shadows Python's built-in round() with Spark's column-level round()
from pyspark.sql.functions import round
from pyspark.sql.types import IntegerType, LongType, StringType, StructField, StructType

In [2]:
def check_df(df):
    """Print per-column NaN and null counts, the duplicate-row count and the total row count."""
    print('NaNs:\n')
    df.select([count(when(isnan(c), c)).alias(c) for c in df.columns]).show()
    print('nulls:\n')
    df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()
    print('duplicates: ', df.count() - df.distinct().count())
    print(df.count())

In [3]:
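config = configparser.ConfigParser()
config.read('dl.cfg')
os.environ['AWS_ACCESS_KEY_ID'] = config['default']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY'] = config['default']['AWS_SECRET_ACCESS_KEY']
# Placeholder: replace with the target S3 path. Table names are appended directly
# (e.g. output_data + "Airports"), so the path should end with "/".
output_data = "S3_BUCKET_HERE"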

In [4]:
# Read in the data here
spark = SparkSession.builder\
.config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11,org.apache.hadoop:hadoop-aws:2.7.5")\
.config("spark.hadoop.fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem") \
.config("spark.hadoop.fs.s3a.access.key", os.environ['AWS_ACCESS_KEY_ID']) \
.config("spark.hadoop.fs.s3a.secret.key", os.environ['AWS_SECRET_ACCESS_KEY']) \
.enableHiveSupport().getOrCreate()

### Step 1: Scope the Project and Gather Data

#### Scope 
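This project combines 2016 US immigration data with city demographics, historical temperatures and airport details so that immigration can be analysed by arrival city and state. Spark (PySpark) reads the raw datasets, cleans them, and writes them to S3 as Parquet tables: one immigration fact table plus airport, demographic and weather dimension tables. The job runs on Amazon EMR, with Amazon S3 used for storage.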

#### Describe and Gather Data 
The project combines the following five datasets:

* __us-cities-demographics__ - Demographic breakdown of cities within the USA
* __GlobalLandTemperaturesByCity__ - Worldwide monthly land temperatures by city
* __iata-codes.csv__ - IATA code reference CSV (source: https://www.airportcodes.us/us-airports.htm)
* __airport-codes_csv.csv__ - Airport dataset containing airport and coordinate details
* __18-83510-I94-Data-2016__ - I94 immigration data for 2016 (only the April file is used, to demonstrate a minimum viable product without exceeding AWS tier limits)

ETL Process:

The datasets are processed by Spark on an EMR cluster, which reads the data from S3 and writes the cleaned tables back to S3 in Parquet format.


In [5]:
df_cities = spark.read.csv("us-cities-demographics.csv", sep=';', header=True)
df_temps = spark.read.csv("GlobalLandTemperaturesByCity.csv", sep=',', header=True)
df_iata = spark.read.csv("iata-codes.csv", sep='-', header=True)  # source: https://www.airportcodes.us/us-airports.htm
df_airports = spark.read.csv("airport-codes_csv.csv", sep=',', header=True)
df_immi = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

### Step 2: Explore and Assess the Data
#### Explore the Data 
Each raw dataset is checked for data quality issues such as missing values and duplicate rows.

#### Cleaning Steps

Each dataset is passed through check_df(), which reports the NaN and null counts per column, the number of duplicate rows and the total row count. Columns with a large number of nulls are dropped, and each dataset is then reduced to the columns needed for the final tables.
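The checks that check_df() performs can be illustrated without a Spark session. The sketch below is a hypothetical pure-Python analogue (profile_rows is not part of the project) working on a list of dicts with made-up values; it reports the same four figures: NaNs and nulls per column, duplicate rows, and the total row count.

```python
import math
from typing import Any, Dict, List


def profile_rows(rows: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Pure-Python analogue of check_df(): NaN and null counts per column,
    duplicate rows, and total rows. All rows are assumed to share the same keys."""
    columns = list(rows[0].keys()) if rows else []
    nans = {c: sum(1 for r in rows if isinstance(r[c], float) and math.isnan(r[c]))
            for c in columns}
    nulls = {c: sum(1 for r in rows if r[c] is None) for c in columns}
    # Duplicates = total rows minus distinct rows, like df.count() - df.distinct().count().
    # NaN values never compare equal here, so rows containing NaN are never counted as duplicates.
    distinct_rows = {tuple(r[c] for c in columns) for r in rows}
    return {"nans": nans, "nulls": nulls,
            "duplicates": len(rows) - len(distinct_rows), "rows": len(rows)}


# Made-up sample rows for illustration only
sample = [
    {"City": "Boston", "AverageTemperature": 10.5},
    {"City": "Boston", "AverageTemperature": 10.5},
    {"City": "Atlanta", "AverageTemperature": None},
    {"City": "Austin", "AverageTemperature": float("nan")},
]
report = profile_rows(sample)
print(report["nulls"])                       # {'City': 0, 'AverageTemperature': 1}
print(report["nans"])                        # {'City': 0, 'AverageTemperature': 1}
print(report["duplicates"], report["rows"])  # 1 4
```

Keeping NaN and null counts separate mirrors check_df(): in the temperature data, missing readings surface as nulls rather than NaNs.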

In [7]:
check_df(df_cities)

NaNs:

+----+-----+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+----+-----+
|City|State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|Race|Count|
+----+-----+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+----+-----+
|   0|    0|         0|              0|                0|               0|                 0|           0|                     0|         0|   0|    0|
+----+-----+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+----+-----+

nulls:

+----+-----+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+----+-----+
|City|State|Median Age|Male Population|Female Population|Total Populatio

In [8]:
check_df(df_temps)

NaNs:

+---+------------------+-----------------------------+----+-------+--------+---------+
| dt|AverageTemperature|AverageTemperatureUncertainty|City|Country|Latitude|Longitude|
+---+------------------+-----------------------------+----+-------+--------+---------+
|  0|                 0|                            0|   0|      0|       0|        0|
+---+------------------+-----------------------------+----+-------+--------+---------+

nulls:

+---+------------------+-----------------------------+----+-------+--------+---------+
| dt|AverageTemperature|AverageTemperatureUncertainty|City|Country|Latitude|Longitude|
+---+------------------+-----------------------------+----+-------+--------+---------+
|  0|            364130|                       364130|   0|      0|       0|        0|
+---+------------------+-----------------------------+----+-------+--------+---------+

duplicates:  0
8599212


In [9]:
check_df(df_airports)

NaNs:

+-----+----+----+------------+---------+-----------+----------+------------+--------+---------+----------+-----------+
|ident|type|name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|coordinates|
+-----+----+----+------------+---------+-----------+----------+------------+--------+---------+----------+-----------+
|    0|   0|   0|           0|        0|          0|         0|           0|       0|        0|         0|          0|
+-----+----+----+------------+---------+-----------+----------+------------+--------+---------+----------+-----------+

nulls:

+-----+----+----+------------+---------+-----------+----------+------------+--------+---------+----------+-----------+
|ident|type|name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|coordinates|
+-----+----+----+------------+---------+-----------+----------+------------+--------+---------+----------+-----------+
|    0|   0|   0|        7006|  

In [10]:
check_df(df_immi)

NaNs:

+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+-------+------+------+-------+------+-----+--------+
|cicid|i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear|dtaddto|gender|insnum|airline|admnum|fltno|visatype|
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+-------+------+------+-------+------+-----+--------+
|    0|    0|     0|     0|     0|      0|      0|      0|      0|      0|     0|      0|    0|       0|       0|    0|      0|      0|      0|      0|      0|      0|     0|     0|      0|     0|    0|       0|
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+--

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model


This data model was chosen because it makes the immigration data easy to analyse city by city. In particular, joining the immigration data to the weather data by month lets an analyst see the weather in the month of arrival across previous years.


The tables are arranged as a star schema with one fact table and three dimension tables. The fact table gives an overall view of immigration across the US for summary reporting, while the dimension tables let the analyst drill down to state and city level for more detail.
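The month-of-arrival join described above can be sketched in plain Python. This is a minimal illustration under stated assumptions, not the project's query: the rows are made up, and arrival_month_weather is a hypothetical helper. It highlights one detail of the cleaned tables: i94mon is cast to an integer, while the weather Month column comes out of split() as a zero-padded string such as "04", so one side has to be normalised before the keys match. A real query would also need to restrict the weather table to US cities, since it is worldwide and city names repeat across countries.

```python
from typing import Dict, List, Tuple

# Made-up rows shaped like the cleaned tables: i94mon is an integer, while the
# weather Month column is a zero-padded string produced by split() (e.g. "04").
immigration = [
    {"cicid": 7, "i94mon": 4, "City": "Atlanta"},
    {"cicid": 27, "i94mon": 4, "City": "Boston"},
]
weather = [
    {"City": "Atlanta", "Year": "2012", "Month": "04", "AverageTemperature": 20.0},
    {"City": "Atlanta", "Year": "2013", "Month": "04", "AverageTemperature": 17.0},
    {"City": "Boston", "Year": "2013", "Month": "04", "AverageTemperature": 9.0},
    {"City": "Boston", "Year": "2013", "Month": "05", "AverageTemperature": 14.0},
]


def arrival_month_weather(immigration: List[dict],
                          weather: List[dict]) -> Dict[int, List[Tuple[str, float]]]:
    """Map each cicid to the (Year, AverageTemperature) pairs recorded for its
    arrival city in the same calendar month, across all years."""
    by_city_month: Dict[Tuple[str, int], List[Tuple[str, float]]] = {}
    for w in weather:
        key = (w["City"], int(w["Month"]))  # normalise "04" -> 4 so it matches i94mon
        by_city_month.setdefault(key, []).append((w["Year"], w["AverageTemperature"]))
    return {i["cicid"]: sorted(by_city_month.get((i["City"], i["i94mon"]), []))
            for i in immigration}


result = arrival_month_weather(immigration, weather)
print(result[7])   # [('2012', 20.0), ('2013', 17.0)]
print(result[27])  # [('2013', 9.0)]
```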

Model tables:
#### dim_airport
 ident  
 type  
 name  
 elevation_ft  
 municipality  
 gps_code  
 iata_code   
 local_code  
 coordinates  
 
#### dim_demographic
 City  
 State  
 Median Age  
 Male Population  
 Female Population  
 Total Population  
 Average Household Size  
 State Code  
 Race  
 Count  

#### dim_weather
 City  
 Country  
 Year  
 Month  
 AverageTemperature  
 Latitude  
 Longitude  
 
 
#### fact_immigration
 cicid  
 i94mon  
 city  
 statecode  
 i94port  
 arrdate  
 i94mode  
 i94addr  
 depdate  
 i94bir  
 i94visa  
 visapost  
 entdepd  
 biryear  
 gender  
 airline  
 fltno  
 visatype  



![Relationship Diagram](IMMI_DB_DIA.png)

#### 3.2 Mapping Out Data Pipelines

In [11]:
def clean_immigration(df_immi, df_iata):
    # Drop columns with many nulls or that are not needed in the fact table
    df_immi = df_immi.drop('occup', 'insnum', 'entdepu', 'i94yr', 'admnum', 'dtaddto', 'entdepa',
                           'matflag', 'count', 'dtadfile', 'i94cit', 'i94res')
    # Attach City/State by port code; this inner join drops ports missing from iata-codes.csv
    df_immi = df_immi.join(df_iata, df_immi.i94port == df_iata.Code)
    df_immi = df_immi.drop(df_immi.Code)
    # Cast numeric columns that the SAS reader loads as doubles to integers
    for c in ("biryear", "i94mon", "cicid", "arrdate", "depdate"):
        df_immi = df_immi.withColumn(c, df_immi[c].cast(IntegerType()))
    # SAS dates are day offsets from 1960-01-01; convert them to ISO date strings.
    # date_adding_udf is defined in the next cell, which must run before this function is called.
    df_immi = df_immi.withColumn("arrdate", date_adding_udf('arrdate'))
    df_immi = df_immi.withColumn("depdate", date_adding_udf('depdate'))
    return df_immi
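clean_immigration() uses Spark's default inner join, so any arrival whose i94port has no matching Code in iata-codes.csv is silently dropped from the fact table. Comparing df_immi.count() with immi.count() would show how many rows are lost, and passing how="left" to DataFrame.join would keep them with null City and State. The plain-Python sketch below (attach_city is a hypothetical helper, and the port "XXX" is made up) shows the difference between the two behaviours.

```python
from typing import Dict, List


def attach_city(immi_rows: List[dict], iata_rows: List[dict],
                keep_unmatched: bool = False) -> List[dict]:
    """Join arrivals to the port reference on i94port == Code.
    keep_unmatched=False behaves like an inner join, True like a left join."""
    # Assumes each Code appears once; a Spark join would emit one row per matching Code
    ports: Dict[str, dict] = {r["Code"]: r for r in iata_rows}
    joined = []
    for row in immi_rows:
        port = ports.get(row["i94port"])
        if port is None and not keep_unmatched:
            continue  # inner join: an arrival with an unknown port is dropped
        joined.append({**row,
                       "City": port["City"] if port else None,
                       "State": port["State"] if port else None})
    return joined


iata = [{"Code": "ATL", "City": "Atlanta", "State": "GA"},
        {"Code": "BOS", "City": "Boston", "State": "MA"}]
# "XXX" is a made-up port code with no entry in the reference data
arrivals = [{"cicid": 7, "i94port": "ATL"}, {"cicid": 27, "i94port": "BOS"},
            {"cicid": 99, "i94port": "XXX"}]
print(len(attach_city(arrivals, iata)))                       # 2
print(len(attach_city(arrivals, iata, keep_unmatched=True)))  # 3
```

Whether dropping unmatched ports is acceptable depends on the analysis: the inner join keeps the fact table fully linked to a city, while a left join preserves the total arrival count.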
    

In [12]:
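def date_adding(days):
    # SAS stores dates as the number of days since 1960-01-01.
    # Check for None explicitly so that day 0 (1960-01-01) is not treated as missing.
    if days is None:
        return None
    return (dt.date(1960, 1, 1) + dt.timedelta(days=days)).isoformat()

# Wrap as a Spark UDF that returns ISO date strings
date_adding_udf = udf(date_adding, StringType())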
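As a worked check of the SAS epoch arithmetic: 1960-01-01 to 2016-01-01 spans 56 years, 14 of them leap years, so it is 56 * 365 + 14 = 20454 days; 2016-04-07 is 97 days later (31 + 29 + 31 + 6), so the arrdate 2016-04-07 shown in Step 4 corresponds to a raw value of 20551. The plain-Python sketch below converts in both directions. The names sas_to_iso and iso_to_sas are hypothetical; the reverse direction could be useful for filtering the raw arrdate column by date before any conversion.

```python
import datetime as dt
from typing import Optional

SAS_EPOCH = dt.date(1960, 1, 1)  # SAS day 0


def sas_to_iso(days: Optional[float]) -> Optional[str]:
    """SAS day count -> 'YYYY-MM-DD'; missing values stay None (day 0 is a real date)."""
    if days is None:
        return None
    return (SAS_EPOCH + dt.timedelta(days=int(days))).isoformat()


def iso_to_sas(iso: str) -> int:
    """'YYYY-MM-DD' -> SAS day count."""
    return (dt.date.fromisoformat(iso) - SAS_EPOCH).days


print(sas_to_iso(20551.0))       # 2016-04-07
print(iso_to_sas("2016-04-30"))  # 20574
```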

In [13]:
def clean_weather(df_temps):
    # Keep only rows with a recorded temperature
    df_temps = df_temps.filter(df_temps.AverageTemperature.isNotNull())
    # dt is a 'YYYY-MM-DD' string; split it into Year and Month (both remain strings)
    split_temp = split(df_temps['dt'], '-')
    df_temps = df_temps.withColumn('Year', split_temp.getItem(0))
    df_temps = df_temps.withColumn('Month', split_temp.getItem(1))
    df_temps = df_temps.withColumn('AverageTemperature', round(df_temps["AverageTemperature"], 2))
    df_temps = df_temps.select("City", "Country", "AverageTemperature", "Year", "Month", "Latitude", "Longitude")
    return df_temps


In [14]:
def clean_airports(df_airports):
    # Keep only US airports that have an IATA code, then select the dim_airport columns
    df_airports = df_airports.where("iso_country = 'US' and iata_code is not null")
    df_airports = df_airports.select("ident", "type", "name", "elevation_ft", "municipality",
                                     "gps_code", "iata_code", "local_code", "coordinates")
    return df_airports

In [15]:
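def clean_demographics(df_cities):
    # Add a surrogate key; monotonically_increasing_id() is unique but not necessarily consecutive
    df_cities = df_cities.selectExpr("monotonically_increasing_id() as Id", "*")
    # Replace spaces in column names with underscores (older Spark versions refuse to
    # write Parquet columns whose names contain spaces)
    for c in df_cities.columns:
        df_cities = df_cities.withColumnRenamed(c, c.replace(" ", "_"))
    return df_cities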
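clean_demographics() only replaces spaces, which is enough for this dataset. Older Spark releases (2.x, for example) reject Parquet column names containing any of the characters in " ,;{}()\n\t=", so a slightly more general renaming step is sketched below. sanitize_columns is a hypothetical helper, and the duplicate-name check is an added safeguard that is not in the original code.

```python
from typing import List

# Characters that older Spark releases refuse in Parquet column names
INVALID_PARQUET_CHARS = " ,;{}()\n\t="


def sanitize_columns(columns: List[str]) -> List[str]:
    """Replace every invalid character with '_' and refuse to create duplicate names."""
    cleaned = []
    for name in columns:
        for ch in INVALID_PARQUET_CHARS:
            name = name.replace(ch, "_")
        cleaned.append(name)
    if len(set(cleaned)) != len(cleaned):
        raise ValueError("sanitising produced duplicate column names")
    return cleaned


print(sanitize_columns(["Median Age", "Foreign-born", "State Code"]))
# ['Median_Age', 'Foreign-born', 'State_Code']
```

Hyphens are allowed, which is why Foreign-born keeps its name in the output of Step 4.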

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
The cleaning functions defined above are applied to the raw datasets to build the model tables.

In [16]:
immi = clean_immigration(df_immi,df_iata)
immi.limit(5).show(5)

+-----+------+-------+----------+-------+-------+----------+------+-------+--------+-------+-------+------+-------+-----+--------+-------+-----+
|cicid|i94mon|i94port|   arrdate|i94mode|i94addr|   depdate|i94bir|i94visa|visapost|entdepd|biryear|gender|airline|fltno|visatype|   City|State|
+-----+------+-------+----------+-------+-------+----------+------+-------+--------+-------+-------+------+-------+-----+--------+-------+-----+
|    7|     4|    ATL|2016-04-07|    1.0|     AL|      null|  25.0|    3.0|     SEO|   null|   1991|     M|   null|00296|      F1|Atlanta|   GA|
|   27|     4|    BOS|2016-04-01|    1.0|     MA|2016-04-05|  58.0|    1.0|     TIA|      O|   1958|     M|     LH|00422|      B1| Boston|   MA|
|   28|     4|    ATL|2016-04-01|    1.0|     MA|2016-04-05|  56.0|    1.0|     TIA|      O|   1960|     F|     LH|00422|      B1|Atlanta|   GA|
|   29|     4|    ATL|2016-04-01|    1.0|     MA|2016-04-17|  62.0|    2.0|     TIA|      O|   1954|     M|     AZ|00614|      B2|

In [17]:
temps = clean_weather(df_temps)
temps.limit(5).show(5)

+-----+-------+------------------+----+-----+--------+---------+
| City|Country|AverageTemperature|Year|Month|Latitude|Longitude|
+-----+-------+------------------+----+-----+--------+---------+
|Århus|Denmark|              6.07|1743|   11|  57.05N|   10.33E|
|Århus|Denmark|              5.79|1744|   04|  57.05N|   10.33E|
|Århus|Denmark|             10.64|1744|   05|  57.05N|   10.33E|
|Århus|Denmark|             14.05|1744|   06|  57.05N|   10.33E|
|Århus|Denmark|             16.08|1744|   07|  57.05N|   10.33E|
+-----+-------+------------------+----+-----+--------+---------+



In [18]:
airports = clean_airports(df_airports)
airports.limit(5).show(5)

+-----+-------------+--------------------+------------+-------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft| municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+-------------+--------+---------+----------+--------------------+
| 07FA|small_airport|Ocean Reef Club A...|           8|    Key Largo|    07FA|      OCA|      07FA|-80.274803161621,...|
|  0AK|small_airport|Pilot Station Air...|         305|Pilot Station|    null|      PQS|       0AK|-162.899994, 61.9...|
| 0CO2|small_airport|Crested Butte Air...|        8980|Crested Butte|    0CO2|      CSE|      0CO2|-106.928341, 38.8...|
| 0TE7|small_airport|   LBJ Ranch Airport|        1515| Johnson City|    0TE7|      JCY|      0TE7|-98.6224975585999...|
| 13MA|small_airport|Metropolitan Airport|         418|       Palmer|    13MA|      PMX|      13MA|-72.3114013671999...|
+-----+-------------+-----------

In [19]:
demographics = clean_demographics(df_cities)
demographics.limit(5).show(5)

+---+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
| Id|            City|        State|Median_Age|Male_Population|Female_Population|Total_Population|Number_of_Veterans|Foreign-born|Average_Household_Size|State_Code|                Race|Count|
+---+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|  0|   Silver Spring|     Maryland|      33.8|          40601|            41862|           82463|              1562|       30908|                   2.6|        MD|  Hispanic or Latino|25924|
|  1|          Quincy|Massachusetts|      41.0|          44129|            49500|           93629|              4147|       32935|                  2.39|        MA|               White|58723|
|  2|          Hoover|      Alabama|    

#### 4.2 Data Quality Checks
The final datasets are checked in three ways:
 * Row-count checks on every table, to confirm that each load produced data
 * A null check on the fact table's primary key (cicid)
 * A duplicate check on the fact table's primary key (cicid)
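The same three checks can be packaged as plain functions that raise on failure, so a scheduled run of main.py stops instead of only printing results. This is a minimal sketch: key_checks and run_quality_checks are hypothetical names, and the inputs (row counts per table and the list of fact keys) are assumed to have been collected from Spark beforehand. The duplicate figure counts every row whose key occurs more than once, which is what summing COUNT(*) over groups with HAVING COUNT(*) > 1 produces.

```python
from collections import Counter
from typing import Dict, Hashable, Iterable, Mapping


def key_checks(keys: Iterable[Hashable]) -> Dict[str, int]:
    """Null count and number of rows whose key occurs more than once."""
    counts = Counter(keys)
    return {
        "nulls": counts.get(None, 0),
        "duplicates": sum(n for n in counts.values() if n > 1),
    }


def run_quality_checks(row_counts: Mapping[str, int], fact_keys: Iterable[Hashable]) -> None:
    """Raise ValueError if any table is empty or the fact primary key is not clean."""
    empty = [name for name, n in row_counts.items() if n <= 0]
    if empty:
        raise ValueError(f"empty tables: {empty}")
    result = key_checks(fact_keys)
    if result["nulls"] or result["duplicates"]:
        raise ValueError(f"fact primary key check failed: {result}")


print(key_checks([7, 27, 28, 28, None]))  # {'nulls': 1, 'duplicates': 2}
run_quality_checks({"Airports": 2019, "Immigration": 1069528}, [7, 27, 28])  # passes silently
```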
 

In [20]:
# Perform quality checks here
# Check the fact table's primary key (cicid) for nulls and duplicates
immi.createOrReplaceTempView("immi_view")
fact_nulls = spark.sql("""SELECT COUNT(*) AS nulls FROM immi_view WHERE cicid IS NULL""")
# Number of rows whose cicid appears more than once (0 when there are none)
fact_duplicates = spark.sql("""
    SELECT COALESCE(SUM(n), 0) AS dupes
    FROM (SELECT COUNT(*) AS n FROM immi_view GROUP BY cicid HAVING COUNT(*) > 1) AS A
""")
null_count = fact_nulls.first()['nulls']
dupe_count = fact_duplicates.first()['dupes']
print('Count of Primary Key nulls in Fact Table: ', null_count, ', Count of Primary Key Duplicates in Fact Table: ', dupe_count)

a = temps.count()
b = airports.count()
c = demographics.count()
d = immi.count()
print('Table Row Counts:', 'Temperatures ', a, ', Airports ', b, ', Demographics ', c, ', Immigration ', d)

# Fail loudly instead of only printing the results
assert null_count == 0 and dupe_count == 0, 'fact table primary key check failed'
assert min(a, b, c, d) > 0, 'at least one table is empty'

Count of Primary Key nulls in Fact Table:  0 , Count of Primary Key Duplicates in Fact Table:  0
Table Row Counts: Temperatures  8235082 , Airports  2019 , Demographics  2891 , Immigration  1069528


In [None]:
airports.write.mode("overwrite").parquet(output_data +"Airports")
temps.write.partitionBy("Month").mode("overwrite").parquet(output_data +"Temperatures")
immi.write.mode("overwrite").parquet(output_data +"Immigration")
demographics.write.mode("overwrite").parquet(output_data +"Demographics")
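partitionBy("Month") writes one Hive-style subdirectory per distinct Month value under the Temperatures path, so a query filtering on a single month only has to read that directory. The sketch below (partition_paths is a hypothetical helper, and s3a://my-bucket/ is a placeholder) lists the directories such a write would create. One caveat worth checking: by default Spark infers partition column types when reading the data back, so the zero-padded string Month may be returned as an integer.

```python
from typing import Iterable, List


def partition_paths(base: str, table: str, column: str, values: Iterable[str]) -> List[str]:
    """Directories that partitionBy(column) creates under base + table (Hive-style layout)."""
    return [f"{base}{table}/{column}={v}/" for v in sorted(set(values))]


print(partition_paths("s3a://my-bucket/", "Temperatures", "Month", ["04", "11", "04"]))
# ['s3a://my-bucket/Temperatures/Month=04/', 's3a://my-bucket/Temperatures/Month=11/']
```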

#### 4.3 Data Dictionary
Attached separately.


### Step 5: Complete Project Write Up

Tools used:
 * Amazon S3 - Stores the initial datasets and the final Parquet files. Chosen because every tool in the pipeline can access it easily, and its pricing suits data that is not loaded in and out often.
 * Amazon EMR - Launches the Spark cluster that runs main.py. Running Spark locally or in the workspace is much slower, and EMR is simple to set up again when the job needs to be replicated.

Please see main.py for more information on the functions used.

### Different Scenarios
#### The data was increased by 100x.
 * Scale up the EMR cluster (more or larger nodes) to handle the extra load.
 * Move the data into Redshift, Snowflake, Cassandra or another store designed for large volumes, which would make accessing large amounts of data more efficient.

#### The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * Schedule main.py to run every day, early enough to finish before 7am, using Airflow or a cron job.

#### The database needed to be accessed by 100+ people.
 * Set up Redshift and use concurrency scaling (https://docs.aws.amazon.com/redshift/latest/dg/concurrency-scaling.html) so that as many users as needed can access the data. Note that this can increase costs quickly.


References:

* https://knowledge.udacity.com/
* https://stackoverflow.com/questions/51949414/read-sas-sas7bdat-data-with-spark
* https://www.airportcodes.us/us-airports.htm
* https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data/notebooks
* https://www.bmc.com/blogs/how-to-write-spark-udf-python/