# Relationship between several factors and US immigration events

## Project Summary
This project builds an analytical table that shows the relationship between several factors and US immigration events.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
import re
from datetime import datetime, timedelta
import pyspark.sql.functions as F
from pyspark.sql.types import DateType, IntegerType, DoubleType

from pyspark.sql import SparkSession
spark = SparkSession.builder.\
            config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
            .enableHiveSupport().getOrCreate()

### Step 1: Scope the Project and Gather Data

#### Scope 
In this project I extracted data from different sources, transformed it with Spark and loaded it into a fact table and several dimension tables.
The fact table holds the U.S. immigration events of 2016, so that their relationship with several factors (visa type, travel mode, port of entry, country of origin, and state demographics and temperature) can be analyzed.

#### Describe and Gather Data
1. **I94 Immigration Data**: This data comes from the US National Tourism and Trade Office.
2. **I94_SAS_Labels_Description**: Description of the data attributes of the I94 Immigration data.
3. **US-Cities-Demographics**: U.S. city demographic data from OpenSoft.
4. **World Temperature Data**: This dataset comes from Kaggle.
5. **Airport Code Table**: A simple table of airport codes and corresponding cities. It comes from datahub.io.

In [2]:
im_data = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
sas_labels = 'I94_SAS_Labels_Descriptions.SAS'
demogr_data = "us-cities-demographics.csv"
temp_data = '../../data2/GlobalLandTemperaturesByCity.csv'
airport_codes = 'airport-codes_csv.csv'

### Step 2: Explore and Assess the Data
#### Explore the Data 
All datasets contain missing values (NaNs) and records outside the scope of this project. Since we study the effect of several factors on the immigration numbers in 2016, the data can be restricted to that year up front.


#### Cleaning Steps

1. **I94 Immigration Data**: 
    - In this example notebook I only selected one month (April 2016), but the pipeline can be scaled up to all months.
    - Filtered on valid ports: the ports from the **I94_SAS_Labels_Description** file whose location ends in a two-letter U.S. state code.
2. **I94_SAS_Labels_Description**: 
    - This file lists the valid cities and ports of the immigration dataset, which is the main data source for the analytical table. It is raw text, so I extracted the necessary fields with regular expressions: the valid ports, the valid cities, the visa codes, the travel modes and the countries.
3. **US-Cities-Demographics**:
    - The key step for this dataset is a pivot: `State Code` as the grouping key, the racial groups from the column `Race` as columns, and the summed `Count` as values.
4. **World Temperature Data**: 
    - Only U.S. cities that match a valid port city are kept, because of the scope.
    - Only measurements after 1 January 2010 are kept, roughly the last three years of the dataset.
5. **Airport Code Table**:
    - The column `iata_code` holds the airport code. To make the join with the port codes possible, I dropped the rows where it is missing.
    - Airports with `type == 'closed'` are filtered out.
    - Duplicates are dropped.
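The three cleaning rules for the Airport Code Table can be sketched in plain Python on a few hypothetical rows. The rows and the `clean_airports` helper are illustrative assumptions, not part of the notebook, which applies the same rules with Spark filters. One difference: this sketch keeps the first row per code, whereas Spark's `dropDuplicates` keeps an arbitrary one.

```python
# Hypothetical rows with two of the columns of airport-codes_csv.csv
rows = [
    {"iata_code": "BOS", "type": "large_airport"},
    {"iata_code": None, "type": "small_airport"},   # no code: cannot match a port_id
    {"iata_code": "QQQ", "type": "closed"},         # closed airport
    {"iata_code": "BOS", "type": "large_airport"},  # duplicate code
    {"iata_code": "MIA", "type": "large_airport"},
]


def clean_airports(rows):
    """Drop rows without an IATA code, drop closed airports, keep one row per code."""
    seen = set()
    cleaned = []
    for row in rows:
        code = row.get("iata_code")
        if not code:                      # rule 1: missing code
            continue
        if row.get("type") == "closed":   # rule 2: closed airport
            continue
        if code in seen:                  # rule 3: duplicate code
            continue
        seen.add(code)
        cleaned.append(row)
    return cleaned


print([r["iata_code"] for r in clean_airports(rows)])
```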

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
![Data-Model](data-model-capstone.PNG "Data Model for Immigration data")

#### 3.2 Mapping Out Data Pipelines

- **DIM_airports**: Get the valid ports from **I94_SAS_Labels_Description** and join them on the airport code (`iata_code`) with the cleaned **Airport Code Table**
- **DIM_visa**: Get the reason for immigration from **I94_SAS_Labels_Description**
- **DIM_mode**: Get the travel mode from **I94_SAS_Labels_Description**
- **DIM_state**: 
    - Get the demographic data per state from **US-Cities-Demographics**
    - Get the valid cities from **I94_SAS_Labels_Description**
    - Get the temperature data per valid city from **World Temperature Data**
    - Aggregate and pivot the data per state
- **DIM_country**: Get the country id and names from **I94_SAS_Labels_Description**
- **FACT_immigration**: 
    - Keep only the valid ports from **I94_SAS_Labels_Description**
    - Cast the double columns to integer so the IDs can be joined with the dimensions, and convert the SAS date fields (days since 1960-01-01) to dates
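To illustrate how these tables are meant to be combined, here is a minimal star-schema sketch that uses Python's built-in `sqlite3` as a stand-in for the real warehouse. It assumes only two dimensions (visa and mode, with the codes from the label file) and reuses the five fact rows from the FACT_immigration sample output further down; the query counts immigration events per visa type and travel mode by joining the fact table's foreign keys to the dimension keys.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.executescript("""
CREATE TABLE dim_visa (visa_id INTEGER PRIMARY KEY, visa TEXT);
CREATE TABLE dim_mode (mode_id INTEGER PRIMARY KEY, mode TEXT);
CREATE TABLE fact_immigration (im_id INTEGER PRIMARY KEY, visa_id INTEGER,
                               mode_id INTEGER, state_id TEXT);
""")
cur.executemany("INSERT INTO dim_visa VALUES (?, ?)",
                [(1, "Business"), (2, "Pleasure"), (3, "Student")])
cur.executemany("INSERT INTO dim_mode VALUES (?, ?)",
                [(1, "Air"), (2, "Sea"), (3, "Land"), (9, "Not reported")])
# (im_id, visa_id, mode_id, state_id) taken from the sample output of FACT_immigration
cur.executemany("INSERT INTO fact_immigration VALUES (?, ?, ?, ?)",
                [(148, 2, 1, "NY"), (463, 2, 1, "FL"), (471, 2, 2, None),
                 (496, 1, 1, "IL"), (833, 2, 1, "NY")])

query = """
SELECT v.visa, m.mode, COUNT(*) AS n
FROM fact_immigration f
JOIN dim_visa v ON f.visa_id = v.visa_id
JOIN dim_mode m ON f.mode_id = m.mode_id
GROUP BY v.visa, m.mode
ORDER BY n DESC, v.visa
"""
result = cur.execute(query).fetchall()
print(result)
```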

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model


#### DIM_mode

In [3]:
with open(sas_labels) as f:
    flist = f.readlines()
# The I94MODE codes sit between the 'value i94model' line and the I94ADDR comment
idx = [k for (k, v) in enumerate(flist) if v.startswith("/* I94ADDR") or v.startswith("value i94model")]
p = re.compile(r"\t(\d) = '(.*)'")
dim_mode = spark.createDataFrame(p.findall(''.join(flist[idx[0]:idx[1]][1:])), schema=['mode_id', 'mode'])
dim_mode = dim_mode.withColumn("mode_id", F.col("mode_id").cast("integer")).dropDuplicates(['mode_id'])
dim_mode.show()

+-------+------------+
|mode_id|        mode|
+-------+------------+
|      1|         Air|
|      3|        Land|
|      9|Not reported|
|      2|         Sea|
+-------+------------+
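All dimensions built from the labels file (mode, visa, airports, country) follow the same pattern: take the lines between two marker lines, then apply a regex. A minimal plain-Python sketch of that pattern as a reusable helper follows; `extract_pairs` and the sample text are hypothetical, and the sample only imitates the layout the notebook's regex expects. Unlike `idx[0]:idx[1]`, the helper looks for the end marker after the start marker, so it does not depend on which marker comes first in the file.

```python
import re

# Hypothetical excerpt imitating the layout of I94_SAS_Labels_Descriptions.SAS
SAMPLE = """/* I94MODE - hypothetical comment */
value i94model
\t1 = 'Air'
\t2 = 'Sea'
\t3 = 'Land'
\t9 = 'Not reported' ;
/* I94ADDR - hypothetical comment */
"""


def extract_pairs(lines, start_prefix, end_prefix, pattern):
    """Return the regex matches between the start marker line and the next end marker line."""
    start = next(i for i, line in enumerate(lines) if line.startswith(start_prefix))
    end = next(i for i, line in enumerate(lines) if i > start and line.startswith(end_prefix))
    # Skip the start marker itself, as the notebook does with [1:]
    section = "".join(lines[start + 1:end])
    return re.findall(pattern, section)


lines = SAMPLE.splitlines(keepends=True)
mode_pairs = extract_pairs(lines, "value i94model", "/* I94ADDR", r"\t(\d) = '(.*)'")
print(mode_pairs)  # [('1', 'Air'), ('2', 'Sea'), ('3', 'Land'), ('9', 'Not reported')]
```

The codes come back as strings, which is why the notebook casts `mode_id` to integer afterwards.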



#### DIM_visa

In [4]:
idx = [k for (k,v) in enumerate(flist) if v.startswith("/* I94VISA") or v.startswith("/* COUNT")]
p = re.compile(r"(\d) = (\w+)\n")
dim_visa = spark.createDataFrame(p.findall(''.join(flist[idx[0]:idx[1]][1:])), schema=['visa_id', 'visa'])
dim_visa = dim_visa.withColumn("visa_id", F.col("visa_id").cast("integer")).dropDuplicates(['visa_id'])
dim_visa.show()

+-------+--------+
|visa_id|    visa|
+-------+--------+
|      1|Business|
|      3| Student|
|      2|Pleasure|
+-------+--------+



#### DIM_airports

In [5]:
idx = [k for (k, v) in enumerate(flist) if v.startswith("/* I94PORT") or v.startswith("/* ARRDATE")]
p = re.compile(r"'(.*)'\t=\t'(.*)'")
# (port_id, 'CITY, STATE') pairs with trailing whitespace stripped from the location
airport_list = [(i[0], i[1].rstrip()) for i in p.findall(''.join(flist[idx[0]:idx[1]][1:]))]
df_valid_ports = spark.createDataFrame(airport_list, schema=['port_id', 'airport'])

df_airport_codes = spark.read.format("csv").option("header", "true").option("delimiter", ',')\
            .load(airport_codes)

df_airport_codes = df_airport_codes\
                        .filter(F.col('iata_code').isNotNull())\
                        .filter(F.col('type')!='closed')\
                        .select('name', 'type', 'continent', 'iso_country', 'iso_region', 'municipality', 'iata_code')\
                        .withColumnRenamed('iata_code', 'port_id')

dim_airports = df_valid_ports.join(df_airport_codes, on='port_id').dropDuplicates(['port_id'])
dim_airports.show(5, truncate=False)

+-------+--------------+-------------------------------------+--------------+---------+-----------+----------+-------------+
|port_id|airport       |name                                 |type          |continent|iso_country|iso_region|municipality |
+-------+--------------+-------------------------------------+--------------+---------+-----------+----------+-------------+
|BGM    |BANGOR, ME    |Greater Binghamton/Edwin A Link field|medium_airport|NA       |US         |US-NY     |Binghamton   |
|FMY    |FORT MYERS, FL|Page Field                           |medium_airport|NA       |US         |US-FL     |Fort Myers   |
|LEB    |LEBANON, NH   |Lebanon Municipal Airport            |medium_airport|NA       |US         |US-NH     |Lebanon      |
|DNS    |DUNSEITH, ND  |Denison Municipal Airport            |small_airport |NA       |US         |US-IA     |Denison      |
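|EGL    |EAGLE, AK     |Negele Airport                       |small_airport |AF       |ET         |ET-OR     |Negele Borana|
+-------+--------------+-------------------------------------+--------------+---------+-----------+----------+-------------+
only showing top 5 rows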


#### DIM_country

In [6]:
idx = [k for (k,v) in enumerate(flist) if v.startswith("/* I94CIT & I94RES") or v.startswith("/* I94PORT")]
p = re.compile(r"(\d{3}) = *'(.*)'")
dim_country = spark.createDataFrame(p.findall(''.join(flist[idx[0]:idx[1]][1:])), schema=['country_id', 'country'])
dim_country = dim_country.withColumn("country_id", F.col("country_id").cast("integer")).dropDuplicates(['country_id'])
dim_country.show(5, truncate=False)

+----------+----------------------------------+
|country_id|country                           |
+----------+----------------------------------+
|471       |INVALID: MARIANA ISLANDS, NORTHERN|
|243       |BURMA                             |
|392       |MALI                              |
|737       |INVALID: MIDWAY ISLANDS           |
|516       |TRINIDAD AND TOBAGO               |
+----------+----------------------------------+
only showing top 5 rows



#### DIM_state

This dimension combines the demographic and the temperature data. For the immigration and temperature data, the ports and cities first have to be restricted to the U.S.

In [7]:
valid_ports_cities = [(i[0], i[1].split(', ')[0].title()) for i in airport_list  if len(i[1].split(', ')[-1])==2]
valid_ports_cities = spark.createDataFrame(valid_ports_cities, schema=['port_id', 'city'])
valid_ports = [i[0] for i in valid_ports_cities.select('port_id').distinct().collect()]
valid_cities = [i[0] for i in valid_ports_cities.select('city').distinct().collect()]
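The U.S. filter above keeps a port only if its location ends in a two-letter code, which is taken to be a U.S. state. A plain-Python sketch of the same rule on hypothetical `(port_id, location)` pairs shaped like the parsed I94PORT entries:

```python
# Hypothetical (port_id, location) pairs in the format parsed from the I94PORT section
airport_list = [
    ("BOS", "BOSTON, MA"),
    ("NYC", "NEW YORK, NY"),
    ("ZZZ", "HYPOTHETICAL PORT, MEXICO"),
    ("XXX", "NOT REPORTED/UNKNOWN"),
]


def us_ports(airport_list):
    """Keep ports whose location ends in a two-letter code and title-case the city."""
    result = []
    for port_id, location in airport_list:
        parts = location.split(", ")
        if len(parts[-1]) == 2:  # e.g. 'MA' is treated as a U.S. state code
            result.append((port_id, parts[0].title()))
    return result


print(us_ports(airport_list))  # [('BOS', 'Boston'), ('NYC', 'New York')]
```

The rule only checks the length of the suffix, so any two-letter suffix passes; comparing against an explicit list of state codes would be stricter.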

In [8]:
df_demo = spark.read.format("csv").option("header", "true").option("delimiter", ';')\
            .load(demogr_data)
df_demo = df_demo.withColumn("Count", df_demo["Count"].cast(IntegerType()))

df_temp = spark.read.format("csv").option("header", "true").option("delimiter", ',')\
            .load(temp_data)

def clean_temp_data(df, from_year, valid_cities):
    """Keep U.S. measurements of the valid port cities taken after from_year."""
    return df\
            .withColumn("dt", F.col("dt").cast(DateType()))\
            .withColumn("AverageTemperature", F.col("AverageTemperature").cast(DoubleType()))\
            .filter(F.col("AverageTemperature").isNotNull())\
            .filter(F.col("Country") == "United States")\
            .filter(F.col("City").isin(valid_cities))\
            .filter(F.col("dt") > from_year)

df_temp_clean = clean_temp_data(df_temp, from_year='2010', valid_cities=valid_cities)

# Average per city first, then average the city means per state
temp_per_state = df_demo.select('City', 'State Code').distinct()\
        .join(
            df_temp_clean.groupby('City').agg(F.round(F.avg("AverageTemperature"), 2).alias("avg_temp")),
            on='City')\
        .groupby('State Code').agg(F.round(F.avg('avg_temp'), 2).alias('avg_temp'))

dim_state = df_demo.dropDuplicates(['State Code', 'City']).groupBy("State Code", "State")\
    .agg(
      F.sum("Total Population").cast('integer').alias("total_population"),
      F.sum("Number of Veterans").cast('integer').alias("num_veterans"),
      F.sum("Foreign-born").cast('integer').alias("foreign_born"),
      F.round(F.avg("Average Household Size"),2).alias("avg_hh_size"))\
    .join(df_demo.groupBy("State Code")\
        .pivot('Race').sum("Count")\
        .select(F.col('State Code'),
                F.col('American Indian and Alaska Native').alias('num_native'),
                F.col('Asian').alias('num_asian'),
                F.col('Black or African-American').alias('num_afr'),
                F.col('Hispanic or Latino').alias('num_lat'),
                F.col('White').alias('num_white')), on='State Code')\
    .join(temp_per_state, on='State Code')\
    .withColumnRenamed("State Code","state_id")\
    .withColumnRenamed("State","state").dropDuplicates(['state_id'])
dim_state.show(5)

+--------+--------------+----------------+------------+------------+-----------+----------+---------+-------+-------+---------+--------+
|state_id|         state|total_population|num_veterans|foreign_born|avg_hh_size|num_native|num_asian|num_afr|num_lat|num_white|avg_temp|
+--------+--------------+----------------+------------+------------+-----------+----------+---------+-------+-------+---------+--------+
|      AZ|       Arizona|         4499542|      264505|      682313|       2.77|    129708|   229183| 296222|1508157|  3591611|   20.47|
|      SC|South Carolina|          533657|       33463|       27744|       2.47|      3705|    13355| 175064|  29863|   343764|   19.74|
|      LA|     Louisiana|         1300595|       69771|       83419|       2.47|      8263|    38739| 602377|  87133|   654578|   21.28|
|      MN|     Minnesota|         1422403|       64894|      215873|        2.5|     25242|   151544| 216731| 103229|  1050239|    9.81|
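+--------+--------------+----------------+------------+------------+-----------+----------+---------+-------+-------+---------+--------+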
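The two aggregations behind DIM_state can be sketched in plain Python: a pivot of the long `(state, race, count)` rows into one column per racial group, and a two-stage temperature average (first per city, then the mean of the city means per state, as the `groupby('City')` followed by `groupby('State Code')` does). All numbers below are hypothetical.

```python
from collections import defaultdict

# Hypothetical long-format rows: (state_code, race, count)
demo_rows = [("AZ", "White", 100), ("AZ", "Asian", 10), ("AZ", "White", 50), ("MN", "Asian", 20)]


def pivot_race(rows):
    """Sum the counts per state and race; each race becomes a 'column' of the state's dict."""
    table = defaultdict(dict)
    for state, race, count in rows:
        table[state][race] = table[state].get(race, 0) + count
    return dict(table)


# Hypothetical per-city averages (already averaged over time) with their state
city_temps = [("AZ", "Phoenix", 24.0), ("AZ", "Flagstaff", 10.0), ("MN", "Minneapolis", 8.0)]


def avg_temp_per_state(city_temps):
    """Average the city means per state, so every city weighs equally."""
    sums = defaultdict(lambda: [0.0, 0])
    for state, _city, temp in city_temps:
        sums[state][0] += temp
        sums[state][1] += 1
    return {state: round(total / n, 2) for state, (total, n) in sums.items()}


print(pivot_race(demo_rows))             # {'AZ': {'White': 150, 'Asian': 10}, 'MN': {'Asian': 20}}
print(avg_temp_per_state(city_temps))    # {'AZ': 17.0, 'MN': 8.0}
```

A race missing for a state is simply absent here; in the Spark pivot it shows up as null.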

#### Fact Immigration

In [9]:
fact_im = spark.read.format('com.github.saurfang.sas.spark')\
            .load(im_data)

# Selecting the right columns from the Data Model
col_sel = ['cicid', 'i94port', 'i94visa', 'i94addr', 'i94mode', 'i94cit', 'i94res', 'i94yr', 'i94mon', 'i94bir', 'gender', 'depdate', 'arrdate']
new_col = ['im_id', 'port_id', 'visa_id', 'state_id', 'mode_id', 'country_id_cit', 'country_id_res', 'im_yr', 'im_mon', 'bir_yr', 'gender', 'depdate', 'arrdate']
fact_im = fact_im.select([F.col(old).alias(new) for old, new in zip(col_sel, new_col)])

# Filter only valid ports
fact_im = fact_im.filter(F.col('port_id').isin(valid_ports))

# Casting 

## the double columns to integer type
for col_name in [i[0] for i in fact_im.dtypes if i[1] =='double']:
    fact_im = fact_im.withColumn(col_name, F.col(col_name).cast('integer'))

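## The two date columns to readable format (SAS dates count days since 1960-01-01)
def get_datetime(sas_days):
    # 0 is a valid SAS date (1960-01-01), so test for None explicitly
    if sas_days is None:
        return None
    return (datetime(1960, 1, 1) + timedelta(days=sas_days)).date()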

f_get_datetime = F.udf(get_datetime, DateType())

fact_im = fact_im.withColumn('depdate', f_get_datetime('depdate'))\
        .withColumn('arrdate', f_get_datetime('arrdate')).dropDuplicates(['im_id'])

fact_im.show(5)

+-----+-------+-------+--------+-------+--------------+--------------+-----+------+------+------+----------+----------+
|im_id|port_id|visa_id|state_id|mode_id|country_id_cit|country_id_res|im_yr|im_mon|bir_yr|gender|   depdate|   arrdate|
+-----+-------+-------+--------+-------+--------------+--------------+-----+------+------+------+----------+----------+
|  148|    NEW|      2|      NY|      1|           103|           103| 2016|     4|    21|     F|2016-04-08|2016-04-01|
|  463|    MIA|      2|      FL|      1|           103|           103| 2016|     4|    25|  null|2016-04-02|2016-04-01|
|  471|    MIA|      2|    null|      2|           103|           103| 2016|     4|    63|     M|2016-04-03|2016-04-01|
|  496|    CHI|      1|      IL|      1|           103|           103| 2016|     4|    64|  null|2016-04-04|2016-04-01|
|  833|    BOS|      2|      NY|      1|           104|           104| 2016|     4|    16|     F|2016-04-09|2016-04-01|
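+-----+-------+-------+--------+-------+--------------+--------------+-----+------+------+------+----------+----------+
only showing top 5 rows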
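The SAS date conversion can be checked outside Spark with the same arithmetic as `get_datetime`: a day count added to the SAS epoch 1960-01-01. The `sas_to_date` name is hypothetical; the sketch also shows why the missing-value test should be `is None`, since day 0 is a real date.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS stores dates as days since 1960-01-01


def sas_to_date(sas_days):
    """Convert a SAS day count (int or float) to a date; None stays None."""
    if sas_days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))


print(sas_to_date(20545))  # 2016-04-01
```

Inside Spark the same conversion could probably be done without a Python UDF through the built-in SQL function `date_add`, for example `F.expr("date_add(to_date('1960-01-01'), arrdate)")`; treat that variant as an untested suggestion.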

#### 4.2 Data Quality Checks
- All dimension tables end with a `dropDuplicates` on their key, so they contain no duplicate keys.
- All tables, including the fact table, must contain at least one row of data. See the check below.

In [10]:
def check_zero_records(spark_df):
    """
    Checks if input DataFrame has 0 records
    
    :param spark_df: Spark DataFrame
    """
    
    n_records = spark_df.count()  # count once: every count() triggers a full Spark job
    check_zero = f"In this DataFrame there are {n_records} records found"
    
    if n_records == 0:
        print(f"Check failed: {check_zero}")
    else:
        print(f"Check passed: {check_zero}")

In [13]:
for table, spark_df in zip(['DIM_air', 'DIM_visa', 'DIM_mode', 'DIM_state', 'DIM_country', 'FACT_Immigration'], [dim_airports, dim_visa, dim_mode, dim_state, dim_country, fact_im]):
    print(f'Table: {table}:')
    check_zero_records(spark_df)
    print('\n')
    

Table: DIM_air:
Check passed: In this DataFrame there are 537 records found


Table: DIM_visa:
Check passed: In this DataFrame there are 3 records found


Table: DIM_mode:
Check passed: In this DataFrame there are 4 records found


Table: DIM_state:
Check passed: In this DataFrame there are 40 records found


Table: DIM_country:
Check passed: In this DataFrame there are 287 records found


Table: FACT_Immigration:
Check passed: In this DataFrame there are 2873793 records found
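The check above only tests for empty tables; the key uniqueness that `dropDuplicates` is meant to guarantee could be asserted as well. A minimal plain-Python sketch on a list of row dicts follows (`check_unique_key` is a hypothetical helper); in PySpark the same idea would be to group by the key and count the groups whose count is greater than 1.

```python
from collections import Counter


def check_unique_key(rows, key):
    """Return the key values that occur more than once; an empty list means the check passes."""
    counts = Counter(row[key] for row in rows)
    return sorted(k for k, n in counts.items() if n > 1)


dim_visa_rows = [
    {"visa_id": 1, "visa": "Business"},
    {"visa_id": 2, "visa": "Pleasure"},
    {"visa_id": 3, "visa": "Student"},
]
print(check_unique_key(dim_visa_rows, "visa_id"))  # []
print(check_unique_key(dim_visa_rows + [{"visa_id": 2, "visa": "Pleasure"}], "visa_id"))  # [2]
```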




#### 4.3 Data dictionary 

- **DIM_airports**:
    - port_id: Identifier for airport (VARCHAR)
    - airport: Name of the airport in format 'City, State' (VARCHAR)
    - name: Official name of the airport (VARCHAR)
    - type: Type of airport (VARCHAR)
    - continent: Continent of airport (VARCHAR)
    - iso_country: Country of airport in iso-format (VARCHAR)
    - iso_region: Region of airport in iso-format (VARCHAR)
    - municipality: Municipality of the airport (VARCHAR)
  
  
- **DIM_visa**:
    - visa_id: Identifier for visa (INT)
    - visa: Reason for immigration (VARCHAR)
    
    
- **DIM_mode**:
    - mode_id: Identifier for mode (INT)
    - mode: Mode of travel (VARCHAR)


- **DIM_state**:
    - state_id: Identifier of U.S. state (VARCHAR)
    - state: Name of the state (VARCHAR)
    - total_population: Total population in state (INT)
    - num_veterans: Number of veterans in state (INT)
    - foreign_born: Number of foreign born people in state (INT)
    - avg_hh_size: Average household size in state (DOUBLE)
    - num_native: Number of American Indian and Alaska Native people in state (INT)
    - num_asian: Number of Asian people in state (INT)
    - num_afr: Number of Black or African-American people in state (INT)
    - num_lat: Number of Hispanic or Latino people in state (INT)
    - num_white: Number of White people in state (INT)
    - avg_temp: Average temperature in state (DOUBLE)


- **DIM_country**: 
    - country_id: Identifier of country (INT)
    - country: Country (VARCHAR)
    
    
- **FACT_immigration**: 
    - im_id: Identifier for the immigration record, from `cicid` (INT)
    - port_id: Identifier for port of entry (VARCHAR)
    - visa_id: Identifier for visa (INT)
    - mode_id: Identifier for mode (INT)
    - state_id: Identifier of the U.S. state of address (VARCHAR)
    - country_id_cit: Identifier of the country of citizenship (INT)
    - country_id_res: Identifier of the country of residence (INT)
    - im_yr: Year of immigration (INT)
    - im_mon: Month of immigration (INT)
    - bir_yr: Age of the immigrant in years, from `i94bir` (INT)
    - gender: Gender of immigrant (VARCHAR)
    - depdate: Departure date (DATE)
    - arrdate: Arrival date (DATE)


#### 4.4 Write the tables to parquet 

In [12]:
# Write the dimensions
for table, spark_df in zip(['DIM_air', 'DIM_visa', 'DIM_mode', 'DIM_state', 'DIM_country'], [dim_airports, dim_visa, dim_mode, dim_state, dim_country]):
    spark_df.write.parquet(f'output_data/{table}/', mode='overwrite')
    
# Partition the fact table by year and month
fact_im.write.partitionBy('im_yr', 'im_mon').parquet('output_data/FACT_immigration/', mode='overwrite')
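`partitionBy('im_yr', 'im_mon')` writes one sub-directory per year/month combination, named with the `column=value` convention. The hypothetical `partition_path` helper below only builds such a path, which can be handy when a scheduled job (such as the Airflow setup described in Step 5) should process or inspect a single month. A reader that filters on these columns, e.g. `spark.read.parquet('output_data/FACT_immigration/').filter('im_mon = 4')`, can skip the other month directories.

```python
from pathlib import PurePosixPath


def partition_path(base, **partitions):
    """Build a Hive-style partition directory such as base/im_yr=2016/im_mon=4."""
    path = PurePosixPath(base)
    for column, value in partitions.items():  # keyword order is preserved
        path /= f"{column}={value}"
    return str(path)


print(partition_path("output_data/FACT_immigration", im_yr=2016, im_mon=4))
# output_data/FACT_immigration/im_yr=2016/im_mon=4
```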

### Step 5: Complete Project Write Up

1. If the data of this project increased by 100x, I would run the pipeline on a larger Spark cluster and distribute the work over more nodes. All scripts in this notebook are written in PySpark, so they can be reused when scaling out.
2. In this notebook I followed the pipeline for one month of immigration data. If the data has to be refreshed regularly (for instance daily) and used in a dashboard, I would recommend scheduling the pipeline with Airflow. In that workflow the different source datasets are extracted, transformed and loaded into dimension tables in, for instance, Amazon Redshift. With the start and end date parameters in Airflow we can load only a time slice of the immigration data, matching the time partitioning.
3. If the data needs to be accessed by 100+ people, I would write the output to Amazon Redshift, since it is a columnar warehouse optimized for aggregations with good read performance.