# ETL Pipeline for Immigration Data
## Project Summary
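This project builds an ETL pipeline that combines the I94 immigration data for April 2016 with US city demographic data and world temperature data. The data is explored with pandas, cleaned and joined with PySpark, and modeled as three tables (immigrants_table, cities_us_table and immigration_table) linked by the I94 port-of-entry code, i94port.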

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
import pandas as pd
import os
import re

from pyspark.sql.functions import udf

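### Step 1: Scope the Project and Gather Data

#### Scope of the Project
The project integrates three datasets: the I94 immigration records, US city demographics and global land temperatures by city. Samples are explored with pandas. The full April 2016 I94 file (SAS7BDAT) is read with PySpark through the spark-sas7bdat package. After cleaning, each dataset is keyed on the i94port code, and the results are joined into three tables: immigrants_table (one row per immigrant), cities_us_table (demographics and average temperature per US city) and immigration_table (arrivals enriched with attributes of the destination city).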

#### Dataset Used
The I94 immigration data comes from the US National Tourism and Trade Office. It is provided in SAS7BDAT format, a binary database storage format. Relevant attributes include:

- i94yr: 4-digit year
- i94mon: numeric month
- i94cit: 3-digit code of the immigrant's country of citizenship
- i94port: 3-character code of the US port of entry (destination city)
- arrdate: arrival date in the USA, stored as a SAS date (days since 1960-01-01)
- i94mode: numeric mode of travel (1 = air, 2 = sea, 3 = land, 9 = not reported)
- depdate: departure date from the USA, stored as a SAS date
- i94visa: numeric visa category (1 = business, 2 = pleasure, 3 = student)
- occup: occupation to be performed in the US
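Several of these attributes are stored as codes rather than readable values. The following minimal pure-Python sketch decodes them. It assumes two things: the code tables from the I94 SAS label descriptions that accompany the dataset, and the SAS convention that dates count days since 1960-01-01. The helper names `sas_to_date` and `decode` are hypothetical. In the Spark pipeline, the same logic could be wrapped in a UDF.

```python
from datetime import date, timedelta

# SAS stores dates as the number of days since 1960-01-01
SAS_EPOCH = date(1960, 1, 1)

# Assumed code tables from the I94 SAS label descriptions
I94MODE_LABELS = {1: "Air", 2: "Sea", 3: "Land", 9: "Not reported"}
I94VISA_LABELS = {1: "Business", 2: "Pleasure", 3: "Student"}


def sas_to_date(sas_days):
    """Convert a SAS numeric date (arrdate, depdate) to a datetime.date; missing values give None."""
    if sas_days is None or sas_days != sas_days:  # NaN is the only value not equal to itself
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))


def decode(code, labels):
    """Map a numeric code such as 3.0 to its label; missing or unknown codes give None."""
    if code is None or code != code:
        return None
    return labels.get(int(code))


print(sas_to_date(20566.0))          # 2016-04-22 (arrdate of the first sample row)
print(decode(3.0, I94MODE_LABELS))   # Land
print(decode(2.0, I94VISA_LABELS))   # Pleasure
```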

The "US City Demographic Data" comes from Opensoft. It is provided in CSV format, with one row per city and racial group. Relevant columns include:
- City: city name
- State: state name
- Total Population: total population of the city
- Race: racial group that the row's Count refers to
- Count: number of residents belonging to that racial group
- Average Household Size: average household size in the city
- Foreign-born: number of foreign-born residents in the city
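Each city appears once per racial group. The "dominant racial group" used in the data model must therefore be derived from the Race and Count columns, for example by keeping the race with the largest Count. The following minimal pure-Python sketch applies that rule. The `dominant_race` helper and the rows are hypothetical, and ties keep the first row encountered.

```python
def dominant_race(rows):
    """Return {(City, State): Race}, keeping for each city the row with the largest Count."""
    best = {}
    for row in rows:
        key = (row["City"], row["State"])
        if key not in best or row["Count"] > best[key]["Count"]:
            best[key] = row
    return {key: row["Race"] for key, row in best.items()}


# Hypothetical rows in the layout of us-cities-demographics.csv (one row per city and race)
rows = [
    {"City": "Town A", "State": "State 1", "Race": "White", "Count": 500},
    {"City": "Town A", "State": "State 1", "Race": "Asian", "Count": 120},
    {"City": "Town B", "State": "State 2", "Race": "Hispanic or Latino", "Count": 300},
]
print(dominant_race(rows))
```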

The "World Temperature Data" comes from Kaggle. It is also provided in CSV format, with one row per city and month. Relevant attributes include:
- dt: month of the measurement
- AverageTemperature: average land temperature of the city for that month (°C)
- City: city name
- Country: country name
- Latitude: latitude
- Longitude: longitude
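In the temperature file, Latitude and Longitude are strings with a hemisphere letter (for example 57.05N and 10.33E for Århus in the sample output below). They must be parsed before any numeric use, such as matching cities by location. The following minimal sketch uses a hypothetical `parse_coordinate` helper and the common convention that south and west are negative.

```python
def parse_coordinate(value):
    """Convert a string such as '57.05N' or '10.33W' to signed decimal degrees."""
    value = value.strip()
    hemisphere = value[-1].upper()
    if hemisphere not in ("N", "S", "E", "W"):
        raise ValueError("unexpected coordinate format: {!r}".format(value))
    degrees = float(value[:-1])
    # South and West are negative in the signed convention
    return -degrees if hemisphere in ("S", "W") else degrees


print(parse_coordinate("57.05N"), parse_coordinate("10.33E"))  # 57.05 10.33
```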

#### Reading the Data and Preliminary Check for NaN Values

In [2]:
demographics_data = pd.read_csv("us-cities-demographics.csv",sep=";")
temperature_data = pd.read_csv('../../data2/GlobalLandTemperaturesByCity.csv')
# Read a sample of the immigration data first to see what the dataset contains
immigration_data = pd.read_csv("immigration_data_sample.csv")

In [3]:
print(demographics_data.columns)
print("NAN values containing columns")
print(demographics_data.isna().any()[lambda x: x])
demographics_data.head()

Index(['City', 'State', 'Median Age', 'Male Population', 'Female Population',
       'Total Population', 'Number of Veterans', 'Foreign-born',
       'Average Household Size', 'State Code', 'Race', 'Count'],
      dtype='object')
NAN values containing columns
Male Population           True
Female Population         True
Number of Veterans        True
Foreign-born              True
Average Household Size    True
dtype: bool


|   | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Silver Spring | Maryland | 33.8 | 40601.0 | 41862.0 | 82463 | 1562.0 | 30908.0 | 2.6 | MD | Hispanic or Latino | 25924 |
| 1 | Quincy | Massachusetts | 41.0 | 44129.0 | 49500.0 | 93629 | 4147.0 | 32935.0 | 2.39 | MA | White | 58723 |
| 2 | Hoover | Alabama | 38.5 | 38040.0 | 46799.0 | 84839 | 4819.0 | 8229.0 | 2.58 | AL | Asian | 4759 |
| 3 | Rancho Cucamonga | California | 34.5 | 88127.0 | 87105.0 | 175232 | 5821.0 | 33878.0 | 3.18 | CA | Black or African-American | 24437 |
| 4 | Newark | New Jersey | 34.6 | 138040.0 | 143873.0 | 281913 | 5829.0 | 86253.0 | 2.73 | NJ | White | 76402 |


In [4]:
len(demographics_data.State.unique())

49

In [5]:
print(temperature_data.columns)
print("NAN values containing columns")
print(temperature_data.isna().any()[lambda x: x])
temperature_data.head()

Index(['dt', 'AverageTemperature', 'AverageTemperatureUncertainty', 'City',
       'Country', 'Latitude', 'Longitude'],
      dtype='object')
NAN values containing columns
AverageTemperature               True
AverageTemperatureUncertainty    True
dtype: bool


|   | dt | AverageTemperature | AverageTemperatureUncertainty | City | Country | Latitude | Longitude |
|---|---|---|---|---|---|---|---|
| 0 | 1743-11-01 | 6.068 | 1.737 | Århus | Denmark | 57.05N | 10.33E |
| 1 | 1743-12-01 |  |  | Århus | Denmark | 57.05N | 10.33E |
| 2 | 1744-01-01 |  |  | Århus | Denmark | 57.05N | 10.33E |
| 3 | 1744-02-01 |  |  | Århus | Denmark | 57.05N | 10.33E |
| 4 | 1744-03-01 |  |  | Århus | Denmark | 57.05N | 10.33E |


In [6]:
print(len(temperature_data.Country.unique()))

159


In [7]:
print(immigration_data.columns)
print("NAN values containing columns")
print(immigration_data.isna().any()[lambda x: x])
immigration_data.head()

Index(['Unnamed: 0', 'cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port',
       'arrdate', 'i94mode', 'i94addr', 'depdate', 'i94bir', 'i94visa',
       'count', 'dtadfile', 'visapost', 'occup', 'entdepa', 'entdepd',
       'entdepu', 'matflag', 'biryear', 'dtaddto', 'gender', 'insnum',
       'airline', 'admnum', 'fltno', 'visatype'],
      dtype='object')
NAN values containing columns
i94addr     True
depdate     True
visapost    True
occup       True
entdepd     True
entdepu     True
matflag     True
gender      True
insnum      True
airline     True
fltno       True
dtype: bool


|   | Unnamed: 0 | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2027561 | 4084316.0 | 2016.0 | 4.0 | 209.0 | 209.0 | HHW | 20566.0 | 1.0 | HI | ... |  | M | 1955.0 | 7202016 | F |  | JL | 56582670000.0 | 00782 | WT |
| 1 | 2171295 | 4422636.0 | 2016.0 | 4.0 | 582.0 | 582.0 | MCA | 20567.0 | 1.0 | TX | ... |  | M | 1990.0 | 10222016 | M |  | *GA | 94362000000.0 | XBLNG | B2 |
| 2 | 589494 | 1195600.0 | 2016.0 | 4.0 | 148.0 | 112.0 | OGG | 20551.0 | 1.0 | FL | ... |  | M | 1940.0 | 7052016 | M |  | LH | 55780470000.0 | 00464 | WT |
| 3 | 2631158 | 5291768.0 | 2016.0 | 4.0 | 297.0 | 297.0 | LOS | 20572.0 | 1.0 | CA | ... |  | M | 1991.0 | 10272016 | M |  | QR | 94789700000.0 | 00739 | B2 |
| 4 | 3032257 | 985523.0 | 2016.0 | 4.0 | 111.0 | 111.0 | CHM | 20550.0 | 3.0 | NY | ... |  | M | 1997.0 | 7042016 | F |  |  | 42322570000.0 | LAND | WT |


We plan to use the i94port column to link the datasets, so we first check its values in the sample:

In [8]:
immigration_data.i94port.unique()

array(['HHW', 'MCA', 'OGG', 'LOS', 'CHM', 'ATL', 'SFR', 'NYC', 'CHI',
       'PHI', 'FTL', 'BOS', 'SAI', 'NAS', 'SEA', 'ORL', 'PSP', 'HOU',
       'NEW', 'BAL', 'SNJ', 'DET', 'AGA', 'LVG', 'MIA', 'SDP', 'VCV',
       'DUB', 'PEM', 'TAM', 'BLA', 'WAS', 'KOA', 'DAL', 'SHA', 'SPM',
       'NIA', 'PHR', 'MIL', 'SLC', 'CLT', 'EPI', 'SNA', 'MON', 'DLR',
       'SFB', 'OPF', 'X96', 'CLM', 'LIH', 'DEN', 'PHO', 'POO', 'NOL',
       'WPB', 'PBB', 'TOR', 'MAA', 'RNO', 'FMY', 'HIG', 'OAK', 'OTM',
       'ONT', 'SRQ', 'LLB', 'NCA', 'SUM', 'STR', 'HAM'], dtype=object)

The sample values look like three-character port codes, so we next load the full April 2016 file to check the i94port column at scale.

In [None]:
immigration_data_segment = pd.read_sas('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat', format='sas7bdat', encoding="ISO-8859-1")

In [None]:
immigration_data_segment.i94port.unique()

In [None]:
len(immigration_data_segment)

In [None]:
len(immigration_data_segment.cicid.unique())

The full dataset contains ill-formed i94port values such as XXX, X96, 5T6, ML8 and NC8 (X96 also appears in the sample above). These values must be removed before modeling the data.
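Detecting ill-formed codes amounts to taking the set difference between the observed i94port values and a list of known-good codes. The following minimal pure-Python sketch illustrates this; the `find_invalid_ports` helper and its inputs are hypothetical. On the full data, the Spark `isin` filter in `remove_invalid_ports` applies the same idea.

```python
def find_invalid_ports(observed_ports, valid_ports):
    """Return the sorted distinct i94port values that are missing from valid_ports.

    observed_ports: iterable of port codes seen in the data (None means missing)
    valid_ports: iterable of known-good codes, e.g. code_city_dict.keys()
    """
    valid = set(valid_ports)
    return sorted({port for port in observed_ports if port is not None and port not in valid})


# Hypothetical inputs for illustration only
valid_codes = {"NYC": "NEW YORK, NY", "BOS": "BOSTON, MA"}
observed = ["NYC", "XXX", "BOS", "X96", "XXX", None]
print(find_invalid_ports(observed, valid_codes.keys()))  # ['X96', 'XXX']
```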

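To identify valid codes, we build two dictionaries from portcode_city.txt, a file in which each line pairs a quoted port code with a quoted city name. code_city_dict maps each port code to its city, and city_code_dict maps each city back to its code.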

In [None]:
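# Build lookup dictionaries of valid i94port codes from portcode_city.txt
code_city_dict = {}  # port code -> city description
city_code_dict = {}  # city description -> port code
r_exp = re.compile(r'\'(.*)\'.*\'(.*)\'')
with open('portcode_city.txt') as f:
    for line in f:
        match = r_exp.search(line)
        if match is None:
            # Skip blank or malformed lines instead of failing on match[1]
            continue
        code_city_dict[match[1]] = match[2].strip()
        city_code_dict[match[2].strip()] = match[1]


# Inspect the dictionaries
# code_city_dict
# city_code_dict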
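The pattern above is greedy, so it assumes each line contains exactly four single quotes. If a city name ever contained an apostrophe, the first group would absorb part of the line and the city name would be truncated. One alternative is to make the first group non-greedy. The line format shown (`'BOS' = 'BOSTON, MA'`), the sample `XYZ` line, and the `parse_port_line` helper are assumptions for illustration.

```python
import re

GREEDY = re.compile(r"'(.*)'.*'(.*)'")  # the pattern used in the cell above

# Non-greedy first group: the code ends at the first closing quote,
# while the greedy second group runs to the last quote on the line.
PORT_LINE = re.compile(r"'(.*?)'.*?'(.*)'")


def parse_port_line(line):
    """Return (code, city) for a line like "'BOS' = 'BOSTON, MA'", or None if it does not match."""
    match = PORT_LINE.search(line)
    if match is None:
        return None
    return match[1].strip(), match[2].strip()


print(parse_port_line("\t'BOS'\t=\t'BOSTON, MA'"))        # ('BOS', 'BOSTON, MA')
print(parse_port_line("'XYZ' = 'O'NEILL, NE'"))          # ('XYZ', "O'NEILL, NE")
print(GREEDY.search("'XYZ' = 'O'NEILL, NE'").groups())  # ("XYZ' = ", 'NEILL, NE')
```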

In [None]:
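@udf()
def get_portcode_from_city(city):
    '''
    Input: City name
    Output: Corresponding i94port code, or None if no port matches
    '''
    if city is None:
        return None
    for key in code_city_dict:
        # Substring match against the port's "CITY, STATE" description
        if city.lower() in code_city_dict[key].lower():
            return key
    return None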
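`get_portcode_from_city` returns the first port whose description merely contains the city name. As a result, short or common names can match the wrong port, and several cities can map to one code. The following pure-Python sketch contrasts that substring rule with an exact match on the city part of the description. The sample dictionary and helper names are hypothetical, and exact matching is a swapped-in alternative rather than the notebook's method. Exact matching can miss cities whose port description is spelled differently. It also ignores the state, so cities with the same name still collide.

```python
# Hypothetical excerpt of code_city_dict: port code -> "CITY, STATE"
sample_code_city = {"NYC": "NEW YORK, NY", "BOS": "BOSTON, MA"}


def port_from_city_substring(city, code_city):
    """Same rule as get_portcode_from_city: first code whose description contains city."""
    if city is None:
        return None
    for code, name in code_city.items():
        if city.lower() in name.lower():
            return code
    return None


def build_city_lookup(code_city):
    """Map the lower-cased city part (before the comma) of each description to its code."""
    return {name.split(",")[0].strip().lower(): code for code, name in code_city.items()}


def port_from_city_exact(city, lookup):
    """Return the code only when city equals the city part of a description."""
    if city is None:
        return None
    return lookup.get(city.strip().lower())


lookup = build_city_lookup(sample_code_city)
print(port_from_city_substring("York", sample_code_city))  # NYC (false positive)
print(port_from_city_exact("York", lookup))                # None
print(port_from_city_exact("New York", lookup))            # NYC
```

Building the lookup once also replaces a scan over every port per row with a single dictionary access.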

In [None]:
# Create a Spark session with the spark-sas7bdat package for reading SAS files
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11") \
    .enableHiveSupport() \
    .getOrCreate()
df_spark = spark.read.format('com.github.saurfang.sas.spark') \
    .load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')


In [None]:
#write to parquet
# df_spark.write.parquet("sas_data")
# df_spark=spark.read.parquet("sas_data")

### Step 2: Explore and Assess the Data
#### Cleaning Steps
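The following cleaning steps are applied:
* Immigration data: keep only rows whose i94port is a valid port code, remove duplicate cicid values and drop rows with a missing port.
* Temperature data: drop rows with a missing AverageTemperature, reduce the data to one row per (City, Country), map each city to its i94port code and drop cities without a code.
* Demographics data: reduce the data to one row per (City, State), map each city to its i94port code and drop cities without a code.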

##### Cleaning Immigration data

In [None]:
def remove_invalid_ports(file_path):
    '''
    Args:
        file_path: Path to the I94 immigration data (SAS7BDAT)
    Returns:
        Spark dataframe containing only rows with a valid i94port code
    '''
    imm_dataframe = spark.read.format('com.github.saurfang.sas.spark').load(file_path)
    # Keep only entries whose i94port appears in the dictionary of valid codes
    valid_imm_dataframe = imm_dataframe.filter(imm_dataframe.i94port.isin(list(code_city_dict.keys())))

    return valid_imm_dataframe

immigration_path = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
immigration_df = remove_invalid_ports(immigration_path)
# Keep one record per cicid
immigration_df = immigration_df.dropDuplicates(['cicid'])
# Drop rows with a missing port (isin already excludes nulls; kept as a safeguard)
immigration_df = immigration_df.filter(immigration_df.i94port.isNotNull())
immigration_df.show(3)

In [None]:
from pyspark.sql import functions as F

temperature_df = spark.read.format("csv").option("header", "true").load("../../data2/GlobalLandTemperaturesByCity.csv")
# Drop rows with missing temperature readings (empty CSV fields are read as null)
temperature_df = temperature_df.filter(temperature_df.AverageTemperature.isNotNull())
# Collapse the monthly readings into one average temperature per location
temperature_df = temperature_df.groupBy("City", "Country") \
    .agg(F.avg(F.col("AverageTemperature").cast("double")).alias("AverageTemperature"))
# Get the corresponding i94port code for each city
temperature_df = temperature_df.withColumn("i94port", get_portcode_from_city(temperature_df.City))
# Remove entries with no i94port code
temperature_df = temperature_df.filter(temperature_df.i94port.isNotNull())
# Show results
temperature_df.show(4)

In [None]:
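from pyspark.sql import Window
from pyspark.sql import functions as F

demographics_df = spark.read.format('csv') \
    .options(header='true', inferSchema='true', quote='"', delimiter=';') \
    .load('us-cities-demographics.csv')
# The file has one row per (city, race): keep the race with the largest Count for each city
race_window = Window.partitionBy('City', 'State').orderBy(F.col('Count').desc())
demographics_df = demographics_df.withColumn('race_rank', F.row_number().over(race_window)) \
    .filter(F.col('race_rank') == 1) \
    .drop('race_rank')
# Get the corresponding i94port code for each city
demographics_df = demographics_df.withColumn("i94port", get_portcode_from_city(demographics_df.City))
# Remove entries with no i94port code
demographics_df = demographics_df.filter(demographics_df.i94port.isNotNull())
# Show results
demographics_df.show(4)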

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
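The data is modeled as three tables connected by the I94 port code. immigration_table serves as the fact table, with one row per arrival. immigrants_table describes the individual immigrant (keyed by cicid), and cities_us_table describes the US destination city (keyed by its port code). The i94port code is used as the link because, once city names have been mapped to port codes, it is the only attribute the immigration data shares with the city-level datasets.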

~~~~
cities_us_table
 |-- city: name of city
 |-- state: name of state
 |-- port_code: i94port code of the city
 |-- total_population: total population of the city
 |-- no_of_veterans: number of veterans in the city
 |-- no_of_foreignborns: number of foreign-born residents in the city
 |-- average_household_size: average household size in the city
 |-- race: dominant racial group in the city
 |-- average_temperature: average temperature of the city (joined from temperature data)
~~~~

~~~~
immigrants_table
 |-- cicid: unique identifier for immigration/immigrants
 |-- birthdate: birthdate of immigrant
 |-- gender: gender of immigrant
 |-- occupation: occupation the immigrant intends to perform in the US
 |-- visa_mode: business, pleasure or student
 |-- mode_of_arrival: mode of arrival in the US, e.g. air or land
 |-- arrival_date: arrival date of immigrant in US
~~~~

~~~~
immigration_table
 |-- year: year of immigration
 |-- month: month of immigration
 |-- source_city: source city port
 |-- destination_city: destination city
 |-- mode_of_arrival: mode of arrival in the US, e.g. air or land
 |-- average_temperature: average temperature of the destination city
 |-- race: dominant racial group of the destination city
 |-- foreign_born_no: number of foreign-born residents of the destination city
 
~~~~
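To make the conceptual model concrete, the following is a minimal SQLite sketch using Python's built-in sqlite3 module. The column names follow the schemas above. Several choices are illustrative assumptions, not part of the original design:
* the SQL types and primary keys;
* the extra cicid column on immigration_table (the Spark code in Step 4 selects it);
* destination_city holding the i94port code.

The sample rows are made up.

```python
import sqlite3

# Illustrative DDL; types, keys and immigration_table.cicid are assumptions
DDL = """
CREATE TABLE cities_us_table (
    city TEXT, state TEXT, port_code TEXT,
    total_population INTEGER, no_of_veterans INTEGER, no_of_foreignborns INTEGER,
    average_household_size REAL, race TEXT, average_temperature REAL,
    PRIMARY KEY (city, state)
);
CREATE TABLE immigrants_table (
    cicid INTEGER PRIMARY KEY,
    birthdate INTEGER, gender TEXT, occupation TEXT,
    visa_mode TEXT, mode_of_arrival TEXT, arrival_date TEXT
);
CREATE TABLE immigration_table (
    cicid INTEGER,               -- links to immigrants_table
    year INTEGER, month INTEGER, source_city INTEGER,
    destination_city TEXT,       -- i94port code, links to cities_us_table.port_code
    mode_of_arrival TEXT, average_temperature REAL, race TEXT, foreign_born_no INTEGER
);
"""

conn = sqlite3.connect(":memory:")
conn.executescript(DDL)

# Made-up rows, one per table
conn.execute("INSERT INTO cities_us_table (city, state, port_code) VALUES ('Boston', 'Massachusetts', 'BOS')")
conn.execute("INSERT INTO immigrants_table (cicid, gender, visa_mode) VALUES (1, 'F', 'Student')")
conn.execute("INSERT INTO immigration_table (cicid, year, month, destination_city) VALUES (1, 2016, 4, 'BOS')")

# Join the fact table to both dimensions: who arrived, and in which city
rows = conn.execute("""
    SELECT p.visa_mode, c.city, f.year, f.month
    FROM immigration_table AS f
    JOIN immigrants_table AS p ON f.cicid = p.cicid
    JOIN cities_us_table AS c ON f.destination_city = c.port_code
""").fetchall()
print(rows)  # [('Student', 'Boston', 2016, 4)]
```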

#### 3.2 Mapping Out Data Pipelines
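The data is pipelined into the model in the following steps:
* Load the April 2016 I94 data with spark-sas7bdat, keep only rows whose i94port is a valid port code, and drop duplicate cicid values.
* Load the temperature data, drop rows without an AverageTemperature, reduce it to one row per city and map each city to its i94port code.
* Load the demographics data, reduce it to one row per city and map each city to its i94port code.
* Select the immigrant attributes from the cleaned I94 data to build immigrants_table.
* Join the demographics and temperature data on i94port to build cities_us_table.
* Join the I94 data with the demographics data on i94port to build immigration_table.
* Run the data quality checks.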

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [None]:
# immigrants_table: one row per immigrant
immigrants_table = immigration_df.select(["cicid", "biryear", "gender", "occup", "i94visa", "i94mode", "arrdate"])

# Drop the demographics City column so it does not clash with the temperature City column
demographics_df = demographics_df.drop("City")
# Join demographics and temperature data on the port code to add the average temperature;
# joining on the column name keeps a single i94port column
city_temperature_full = demographics_df.join(temperature_df, "i94port")
# Select the columns for city_us_table (i94port serves as port_code)
city_us_table = city_temperature_full.select(["City", "State", "i94port", "Total Population", "Number of Veterans", "Foreign-born", "Average Household Size", "Race", "AverageTemperature"])

# Join immigration and demographics data on the port code
immigration_demographics_full = immigration_df.join(demographics_df, "i94port")
# Select the relevant columns for immigration_table
immigration_table = immigration_demographics_full.select(["cicid", "i94yr", "i94mon", "i94cit", "i94port", "i94mode", "Average Household Size", "Race", "Foreign-born"])

In [None]:
immigrants_table.show(5)

In [None]:
city_us_table.show(5)

In [None]:
immigration_table.show(5)

#### 4.2 Data Quality Checks
The following checks verify that the pipeline ran as expected:
 * Completeness: each output table must contain at least one record.
 * Missing values: count the null (and, for float columns, NaN) values in each column of an output table.

Run Quality Checks

In [None]:
# Perform quality checks here
def completeness_check(df, description):
    '''
    Input: Spark dataframe, description of the Spark dataframe
    Output: Prints the outcome of the data quality check and returns the record count
    '''
    result = df.count()
    if result == 0:
        print("Data quality check failed for {} with zero records".format(description))
    else:
        print("Data quality check passed for {} with {} records".format(description, result))
    return result

# Perform data quality check
completeness_check(immigration_table, "immigration table")
completeness_check(city_us_table, "city_us table")
completeness_check(immigrants_table, "immigrants table")

In [None]:
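from pyspark.sql.functions import isnan, when, count, col

# Count missing values per column of immigration_table (NaN is only checked for float/double columns)
df = immigration_table
df.select([count(when((col(c).isNull() | isnan(col(c))) if t in ('double', 'float') else col(c).isNull(), c)).alias(c)
           for c, t in df.dtypes]).show()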
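A uniqueness check on cicid is another useful integrity test. immigration_table is produced by joining on i94port, so if several demographics rows map to the same port code, an arrival can appear more than once. The following minimal pure-Python sketch implements the check; `duplicate_keys` is a hypothetical helper and the records are made up. On the Spark table, the same test could be written as `immigration_table.groupBy("cicid").count().filter(col("count") > 1)`, which should return no rows.

```python
from collections import Counter


def duplicate_keys(records, key):
    """Return the sorted key values that occur more than once in records (a list of dicts)."""
    counts = Counter(record[key] for record in records)
    return sorted(value for value, n in counts.items() if n > 1)


# Hypothetical output rows, e.g. collected from immigration_table
records = [{"cicid": 1.0}, {"cicid": 2.0}, {"cicid": 2.0}, {"cicid": 3.0}]
print(duplicate_keys(records, "cicid"))  # [2.0]
```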

#### 4.3 Data dictionary 
~~~~
cities_us_table
 |-- city: name of city
 |-- state: name of state
 |-- port_code: i94port code of the city
 |-- total_population: total population of the city
 |-- no_of_veterans: number of veterans in the city
 |-- no_of_foreignborns: number of foreign-born residents in the city
 |-- average_household_size: average household size in the city
 |-- race: dominant racial group in the city
 |-- average_temperature: average temperature of the city (joined from temperature data)
~~~~

~~~~
immigrants_table
 |-- cicid: unique identifier for immigration/immigrants
 |-- birthdate: birthdate of immigrant
 |-- gender: gender of immigrant
 |-- occupation: occupation the immigrant intends to perform in the US
 |-- visa_mode: business, pleasure or student
 |-- mode_of_arrival: mode of arrival in the US, e.g. air or land
 |-- arrival_date: arrival date of immigrant in US
~~~~

~~~~
immigration_table
 |-- year: year of immigration
 |-- month: month of immigration
 |-- source_city: source city port
 |-- destination_city: destination city
 |-- mode_of_arrival: mode of arrival in the US, e.g. air or land
 |-- average_temperature: average temperature of the destination city
 |-- race: dominant racial group of the destination city
 |-- foreign_born_no: number of foreign-born residents of the destination city
 
~~~~

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
  * The data was increased by 100x.
  * The data populates a dashboard that must be updated on a daily basis by 7am every day.
  * The database needed to be accessed by 100+ people.