# ETL pipeline to create data warehouse of international student immigration to USA
### Data Engineering Capstone Project

#### Project Summary
This project generates a data warehouse to allow analytics on international students in the USA. Data from the following sources are extracted, transformed and loaded into the data warehouse:
* I94 data from DHS, US-Gov
* Weather data for different cities (US and global) from Kaggle
* Climate data - processed from aforementioned weather data
* US city demographics data from OpenSoft

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
from pyspark.sql import SparkSession, functions
from pyspark.sql.types import DateType
from pyspark.sql.functions import col, isnan, when, count, avg, max, min, round
import requests

### Step 1: Scope the Project and Gather Data

#### Scope 
Many international students come to the USA either for academic studies (F1 visa) or vocational training (M1 visa). Often, dependents accompany these students (F2/M2 visa holders). This project aims to prepare a data warehouse to perform analytics on incoming international students. 

Questions that are likely to be answered by the analytics team:
1. Which cities are preferred by international students?
2. Do international students directly arrive at their destination city or do they prefer traveling through busy hubs (often with cheaper tickets; i94 address vs. location of entry)?
3. Do city demographics play a role in international students' preferences? For example, do international students prefer cities with a larger foreign-born population?
4. Do students prefer cities/localities with weather similar to that of their origin cities?

This analytics pipeline will be utilized by Universities (to tailor their offerings to international students), airlines (for route optimization) and/or real-estate investors (student housing development).

#### Data Sources

The following data will be aggregated in the data warehouse:
1. Student arrivals data. Monthly data snapshot provided by DHS, US gov (from I94 records).
2. Demographics data for each US city. Released during the census.
3. Weather data for US cities (destination cities for international students) and foreign cities (origin cities for international immigrants; inferred data from I94 records).

#### Goal
Develop a data warehouse on international students who arrive in the USA. The data warehouse will allow analytics on international student trends and preferences, such as where most students arrive from and what their destinations are.

#### Data source
* I94 arrival data from US DHS.
* Weather data from Kaggle.
* Visa post data from US gov.
* US city demographics data.
* Student enrolment data from US DoE (not yet decided).

In [2]:
# Read in the data here
import pandas as pd
fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df = pd.read_sas(fname, 'sas7bdat', encoding="ISO-8859-1")

In [3]:
df.head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


##### 1.1: I94 Data Loading

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

i94_df = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

##### 1.2: Weather Data Loading

In [2]:
weather_df = spark.read.option('header', True).csv('../../data2/GlobalLandTemperaturesByCity.csv')

##### 1.3: City data

In [3]:
us_city_df = spark.read.option('header', True).option('sep', ';').csv('us-cities-demographics.csv')

##### 1.4: Visa port code data

In [7]:
import requests

visa_post_code = requests.get('https://fam.state.gov/fam/09FAM/09FAM010205.html')
visa_post_code.status_code

200

In [8]:
import pandas as pd

visa_post_df = pd.read_html(visa_post_code.text)[0]

In [5]:
import pandas as pd

# 0-based line positions 303 to 961 of the labels file hold the i94port code-to-location mappings.
port_line_nos = range(303, 962)

airport_dict = dict()
with open('I94_SAS_Labels_Descriptions.SAS') as file:
    for pos, lne in enumerate(file):
        if pos in port_line_nos:
            lne_split = lne.split('=')
            # Strip quotes and padding; keep only the text before the first comma.
            airport_code = lne_split[0].replace('\'', '').strip()
            airport_city = lne_split[1].replace('\'', '').strip().split(',')[0]
            airport_dict[airport_code] = airport_city

visa_port_table = pd.DataFrame([airport_dict]).T.reset_index()
visa_port_table.columns = ['i94post_code', 'entry_post_location']
visa_port_table.tail()

Unnamed: 0,i94post_code,entry_post_location
654,YSL,YSLETA
655,YUI,YUMA
656,YUM,YUMA
657,YXE,SASKATOON
658,ZZZ,MEXICO Land (Banco de Mexico)
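
The parsing rule used in the loop above can be checked on a single line in isolation. The sketch below assumes that a label line has the shape `'CODE' = 'CITY, STATE'` with optional padding; the helper name `parse_port_line` and the sample lines are illustrative and are not copied from the labels file.

```python
def parse_port_line(line):
    """Split one "'CODE' = 'CITY, STATE'" label line into (code, city)."""
    # Split on the first '=' only, in case the location text contains one.
    code_part, location_part = line.split('=', 1)
    code = code_part.replace("'", '').strip()
    # Keep only the text before the first comma (the city or port name).
    city = location_part.replace("'", '').strip().split(',')[0]
    return code, city


# Hypothetical sample lines written in the assumed format.
print(parse_port_line("   'NYC'\t=\t'NEW YORK, NY          '"))
print(parse_port_line("'ZZZ' = 'MEXICO Land (Banco de Mexico)'"))
```

A location without a comma is kept whole, which matches the `ZZZ` row in the output above.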


In [11]:
visa_port_table[visa_port_table.i94post_code == 'NYC']

Unnamed: 0,i94post_code,entry_post_location
428,NYC,NEW YORK


In [6]:
visa_port_table = spark.createDataFrame(visa_port_table)
visa_port_table.show()

+------------+--------------------+
|i94post_code| entry_post_location|
+------------+--------------------+
|         .GA|  No PORT Code (.GA)|
|         060|   No PORT Code (60)|
|         48Y|PINECREEK BORDER ...|
|         5KE|           KETCHIKAN|
|         5T6|  No PORT Code (5T6)|
|         74S|  No PORT Code (74S)|
|         888|UNIDENTIFED AIR /...|
|         A2A|  No PORT Code (A2A)|
|         ABE|            ABERDEEN|
|         ABG|              ALBURG|
|         ABQ|         ALBUQUERQUE|
|         ABS|      ALBURG SPRINGS|
|         ACY|POMONA FIELD - AT...|
|         ADS|ADDISON AIRPORT- ...|
|         ADT|         AMISTAD DAM|
|         ADU|  No PORT Code (ADU)|
|         ADW|         ANDREWS AFB|
|         AFW| FORT WORTH ALLIANCE|
|          AG|   No PORT Code (AG)|
|         AG0|            MAGNOLIA|
+------------+--------------------+
only showing top 20 rows



### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

##### 2.1: I94 Data Exploration & Cleaning

In [21]:
# Reading i94 data and getting only data slice for immigrant international students.

student_immigration_df = i94_df.filter(i94_df.i94visa == 3)

student_immigration_df.count()

43366

In [22]:
# Checking for columns with Null values

from pyspark.sql.functions import col, isnan, when, count

student_immigration_df_nullCount = student_immigration_df.select(
                    [count(when(isnan(column_title) | col(column_title).isNull(), column_title)).alias(f'NullCount_{column_title}') for column_title in student_immigration_df.columns]
                    ).toPandas()

In [23]:
student_immigration_df_nullCount.T

Unnamed: 0,0
NullCount_cicid,0
NullCount_i94yr,0
NullCount_i94mon,0
NullCount_i94cit,0
NullCount_i94res,0
NullCount_i94port,0
NullCount_arrdate,0
NullCount_i94mode,2
NullCount_i94addr,3126
NullCount_depdate,13057
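
To make explicit what the `isnan(...) | isNull()` expression counts, here is a minimal plain-Python sketch of the same rule: a value is treated as missing when it is `None` or a float NaN. The helper `count_missing` and the two sample rows are hypothetical and only loosely modelled on the `df.head()` output above.

```python
import math


def count_missing(rows, columns):
    """Count, per column, the values that are None or float NaN."""
    counts = {name: 0 for name in columns}
    for row in rows:
        for name in columns:
            value = row.get(name)
            if value is None or (isinstance(value, float) and math.isnan(value)):
                counts[name] += 1
    return counts


# Hypothetical rows: the second one has a NaN entry mode and no address.
rows = [
    {'i94port': 'ATL', 'i94mode': 1.0, 'i94addr': 'AL'},
    {'i94port': 'XXX', 'i94mode': float('nan'), 'i94addr': None},
]
print(count_missing(rows, ['i94port', 'i94mode', 'i94addr']))
```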


In [24]:
# Dropping rows with Null values in  columns critical for analyzing immigrant student statistics

student_immigration_df = student_immigration_df[student_immigration_df.i94mode.isNotNull()]
student_immigration_df = student_immigration_df[student_immigration_df.i94bir.isNotNull()]
student_immigration_df = student_immigration_df[student_immigration_df.entdepa.isNotNull()]
student_immigration_df = student_immigration_df[student_immigration_df.biryear.isNotNull()]
student_immigration_df = student_immigration_df[student_immigration_df.gender.isNotNull()]

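from pyspark.sql import functions

# arrdate and depdate are SAS numeric dates: days elapsed since 1960-01-01.
# The cast to int keeps date_add working on Spark versions that reject a double day count.
student_immigration_df = student_immigration_df.withColumn("arrdate_parsed", functions.expr("date_add('1960-1-1', cast(arrdate as int))"))
student_immigration_df = student_immigration_df.withColumn("depdate_parsed", functions.expr("date_add('1960-1-1', cast(depdate as int))"))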

student_immigration_df.count()

43361
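
The `arrdate` and `depdate` columns are SAS numeric dates, which count days from 1 January 1960; that is why the cell above adds them to `'1960-1-1'`. A minimal sketch of the same conversion with the standard library, using the `arrdate` value 20545.0 from the `df.head()` output (the helper name `sas_days_to_date` is an assumption for illustration):

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)


def sas_days_to_date(days):
    """Convert a SAS numeric date (days since 1960-01-01) to a date; None stays None."""
    if days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(days))


print(sas_days_to_date(20545.0))  # 2016-04-01
```

The result falls in April 2016, consistent with the `i94_apr16_sub` file the data was loaded from.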

In [25]:
student_immigration_df.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

##### 2.2: Weather Data Check

In [17]:
weather_df = weather_df.filter(weather_df.AverageTemperature.isNotNull())
weather_df.count()

8235082

In [18]:
weather_df.printSchema()

root
 |-- dt: string (nullable = true)
 |-- AverageTemperature: string (nullable = true)
 |-- AverageTemperatureUncertainty: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



In [19]:
# Checking for columns with Null values

from pyspark.sql.functions import col, isnan, when, count

weather_df_nullCount = weather_df.select(
                    [count(when(isnan(column_title) | col(column_title).isNull(), column_title)).alias(f'NullCount_{column_title}') for column_title in weather_df.columns]
                    ).toPandas().T

In [20]:
weather_df_nullCount

Unnamed: 0,0
NullCount_dt,0
NullCount_AverageTemperature,0
NullCount_AverageTemperatureUncertainty,0
NullCount_City,0
NullCount_Country,0
NullCount_Latitude,0
NullCount_Longitude,0


##### 2.3: US City Data Cleaning/Check Integrity

In [21]:
us_city_df.filter(us_city_df['Median Age'].isNull()).count()

0

##### 2.4: Visa Post Data Check/Cleaning

In [22]:
visa_port_table.shape

(659, 2)

In [23]:
# visa_post_df.head(5)

In [24]:
# # Converting 4 column table into 2 column table.

# visa_post_df_1 = visa_post_df[[0,1]]
# visa_post_df_2 = visa_post_df[[2,3]]
# column_headers = ['post', 'code']

# visa_post_df_1.columns = column_headers
# visa_post_df_2.columns = column_headers
# visa_post_df = pd.concat([visa_post_df_1, visa_post_df_2], axis=0, ignore_index=True).dropna()

# visa_post_df.drop(labels=0, axis=0, inplace=True)
# visa_post_df.shape

In [25]:
visa_port_table.tail(3)

Unnamed: 0,i94post_code,entry_post_location
656,YUM,YUMA
657,YXE,SASKATOON
658,ZZZ,MEXICO Land (Banco de Mexico)


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model



The data is organized into one fact table and five dimension tables.
1.  **Fact table - Arrivals Table**: This table records all arrivals data for students. This table contains *'cicid'* - immigrant id, *'arrdate_parsed'* - arrival date (parsed), *'i94yr'* - arrival year, *'i94mon'* - arrival month, *'entdepa'* - arrival flag, *'i94port'* - port of arrival (3-letter code), *'entry_post_location'* - port of arrival (city name), *'i94mode'* - entry mode, *'airline'* - carrier for arrival by air, *'fltno'* - flight number, *'admnum'* - admission number.

2. **Dimension table - Student Table**: This table has info about students. Columns of this table: *'cicid'* - immigrant id, *'arrdate_parsed'* - arrival date (parsed), *'i94cit'* - immigrant citizenship, *'i94res'* - immigrant's country of residence, *'i94port'* - port of entry (3-letter code), *'entry_post_location'* - port of entry (city name), *'i94mode'* - mode of entry, *'i94addr'* - immigrant address (US state code), *'depdate_parsed'* - parsed departure date, *'i94bir'* - immigrant age in years, *'i94visa'* - immigrant visa category, *'visapost'* - immigrant visa issuing post, *'dtadfile'* - date added to i94 file, *'entdepa'* - arrival flag, *'entdepd'* - departure flag, *'entdepu'* - update flag, *'matflag'* - match flag, *'biryear'* - birth year, *'dtaddto'* - admitted until (date of expected departure), *'gender'* - immigrant gender, *'insnum'* - INS number, *'visatype'* - visa type.

3. **Dimension table - City Table**: Demographic information about US cities.

4. **Dimension table - Weather Table**: Historical monthly average temperature records of cities.

5. **Dimension table - Climate Table**: Monthly climate of all cities in the Weather Table. Derived by averaging historical data in the Weather Table.

6. **Dimension Table - Visa Post Code Table**: Table mapping the 3-character I94 port codes (*'i94post_code'*) to the location of the port of entry (*'entry_post_location'*), parsed from the I94 SAS labels file.
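
To illustrate how the fact table and a dimension table are meant to be combined, here is a minimal sketch using an in-memory SQLite database in place of the Parquet tables. The arrival rows and port names are hypothetical; the two city rows reuse figures from the city table output shown in this notebook. Port locations appear in upper case while city names are in title case, so the join normalizes case. This is an assumption about how the two would be matched, not something the pipeline does itself.

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.executescript("""
    CREATE TABLE arrivals (cicid REAL PRIMARY KEY, entry_post_location TEXT);
    CREATE TABLE city (City TEXT, State TEXT, Foreign_born INTEGER);
    -- Hypothetical arrivals; the port names are for illustration only.
    INSERT INTO arrivals VALUES (1.0, 'WORCESTER'), (2.0, 'WORCESTER'), (3.0, 'TYLER');
    INSERT INTO city VALUES ('Worcester', 'Massachusetts', 36907), ('Tyler', 'Texas', 8225);
""")

# Count student arrivals per city and show the foreign-born population next to it.
query = """
    SELECT c.City, c.State, c.Foreign_born, COUNT(*) AS student_arrivals
    FROM arrivals AS a
    JOIN city AS c ON UPPER(c.City) = a.entry_post_location
    GROUP BY c.City, c.State, c.Foreign_born
    ORDER BY student_arrivals DESC
"""
result = conn.execute(query).fetchall()
print(result)
```

A query of this shape addresses the first and third analytics questions from the scope: which cities receive the most students, and how that relates to the foreign-born population.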

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

In [19]:
from pyspark.sql.functions import col

city_table = us_city_df.select('City', 'State', 'State Code' ,'Total Population', 'Foreign-born', 'Race', 'Count')
city_table = city_table.withColumn('Count', city_table['Count'].cast('int')). \
                        withColumn('Total Population', city_table['Total Population'].cast('int')). \
                        withColumn('Foreign-born', city_table['Foreign-born'].cast('int'))
city_table.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Total Population: integer (nullable = true)
 |-- Foreign-born: integer (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: integer (nullable = true)



In [20]:
from pyspark.sql import functions

pv_table_city = city_table.groupBy('City').pivot('Race').sum('Count')

In [21]:
city_table_pv = pv_table_city.join(city_table.select('City', 'State', 'Total Population', 'Foreign-born').drop_duplicates(), 
             on=['City'], 
             how='left')
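
The pivot above turns the long demographics layout (one row per city and race) into a wide layout (one row per city, one column per race). A minimal plain-Python sketch of that reshaping follows, using the Asian and White counts for Worcester and Tyler from the output below. The helper name is hypothetical, and keying on both city and state is an assumption made here because city names can repeat across states; the notebook cell groups by `City` only.

```python
from collections import defaultdict


def pivot_race_counts(rows):
    """Turn (city, state, race, count) rows into race counts per (city, state)."""
    wide = defaultdict(lambda: defaultdict(int))
    for city, state, race, count in rows:
        # Sum the counts per race, mirroring pivot('Race').sum('Count').
        wide[(city, state)][race] += count
    return {key: dict(races) for key, races in wide.items()}


long_rows = [
    ('Worcester', 'Massachusetts', 'Asian', 15932),
    ('Worcester', 'Massachusetts', 'White', 131898),
    ('Tyler', 'Texas', 'Asian', 2543),
    ('Tyler', 'Texas', 'White', 72728),
]
for key, races in pivot_race_counts(long_rows).items():
    print(key, races)
```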

In [22]:
city_table_pv = city_table_pv.select('City', 
                     'State',
                      city_table_pv['American Indian and Alaska Native'].alias('American_Indian_and_Alaska_Native'),
                      'Asian',
                      city_table_pv['Black or African-American'].alias('Black_or_African-American'),
                      city_table_pv['Hispanic or Latino'].alias('Hispanic_or_Latino'),
                      'White',
                      city_table_pv['Total Population'].alias('Total_Population'),
                      city_table_pv['Foreign-born'].alias('Foreign_born')
                    )
city_table_pv.show(4)

+------------+--------------+---------------------------------+-----+-------------------------+------------------+------+----------------+------------+
|        City|         State|American_Indian_and_Alaska_Native|Asian|Black_or_African-American|Hispanic_or_Latino| White|Total_Population|Foreign_born|
+------------+--------------+---------------------------------+-----+-------------------------+------------------+------+----------------+------------+
|   Worcester| Massachusetts|                             1917|15932|                    27491|             39000|131898|          184806|       36907|
|       Tyler|         Texas|                             1057| 2543|                    26156|             21536| 72728|          103705|        8225|
|Saint George|          Utah|                             2406| 1649|                     1376|             10829| 71915|           80207|        4824|
|  Charleston|South Carolina|                              633| 2773|                   

In [23]:
city_table_pv.columns

['City',
 'State',
 'American_Indian_and_Alaska_Native',
 'Asian',
 'Black_or_African-American',
 'Hispanic_or_Latino',
 'White',
 'Total_Population',
 'Foreign_born']

In [24]:
weather_df.printSchema()

root
 |-- dt: string (nullable = true)
 |-- AverageTemperature: string (nullable = true)
 |-- AverageTemperatureUncertainty: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



In [25]:
from pyspark.sql.types import DateType

weather_df = weather_df.withColumn('date', weather_df.dt.cast(DateType()))

In [26]:
from pyspark.sql.functions import year, month, desc

weather_df = weather_df.withColumn('year', year('date')). \
                        withColumn('month', month('date'))

In [27]:
weather_df.printSchema()

root
 |-- dt: string (nullable = true)
 |-- AverageTemperature: string (nullable = true)
 |-- AverageTemperatureUncertainty: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- date: date (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)



In [28]:
from pyspark.sql.functions import avg, col, max, min, round

# AverageTemperature was read from CSV as a string; cast it to double so that
# max/min compare numerically rather than lexicographically.
avg_temp = col('AverageTemperature').cast('double')
climate_table = weather_df.groupby('city', 'month', 'country').agg(round(avg(avg_temp), 2).alias('AvgTempByMonth'),
                               round(max(avg_temp), 2).alias('AvgMaxTempInMonth'),
                               round(min(avg_temp), 2).alias('AvgMinTempInMonth')).\
                                sort(['city', 'month'], ascending=True)
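
The schema above lists `AverageTemperature` as a string column, and string ordering differs from numeric ordering, so maximum and minimum aggregates are only meaningful on numeric values. A small plain-Python sketch with illustrative readings shows the difference; string comparison in Spark is lexicographic in the same way.

```python
def string_vs_numeric_max(values):
    """Return the maximum of the raw strings and of the same values parsed as floats."""
    return max(values), max(float(v) for v in values)


# Illustrative temperatures as they would arrive from a CSV file (all strings).
readings = ['9.5', '25.3', '-3.1', '10.0']
as_text, as_number = string_vs_numeric_max(readings)
print(as_text)    # '9.5'  (compared character by character)
print(as_number)  # 25.3   (compared as numbers)
```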

In [29]:
climate_table.filter(climate_table.country == 'Pakistan').select('city').drop_duplicates().count()

59

In [27]:
student_immigration_df.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [26]:
student_immigration_df = student_immigration_df.join(visa_port_table, 
                            student_immigration_df.i94port == visa_port_table.i94post_code,
                           how = 'left')

In [30]:
# student_immigration_df.show()

In [31]:
arrivals_table = student_immigration_df.select('cicid', 'arrdate_parsed', 'i94yr', 'i94mon', 'entdepa', 'i94port', 'entry_post_location',
                                              'i94mode', 'airline', 'fltno', 'admnum')
arrivals_table.show()

+--------+--------------+------+------+-------+-------+-------------------+-------+-------+-----+--------------+
|   cicid|arrdate_parsed| i94yr|i94mon|entdepa|i94port|entry_post_location|i94mode|airline|fltno|        admnum|
+--------+--------------+------+------+-------+-------+-------------------+-------+-------+-----+--------------+
| 79772.0|    2016-04-01|2016.0|   4.0|      G|    FMY|         FORT MYERS|    1.0|     PR|00102|9.251260993E10|
| 82086.0|    2016-04-01|2016.0|   4.0|      G|    FMY|         FORT MYERS|    1.0|     CI|00006|9.249853903E10|
| 82087.0|    2016-04-01|2016.0|   4.0|      G|    FMY|         FORT MYERS|    1.0|     DL|00938|9.250404273E10|
| 82266.0|    2016-04-01|2016.0|   4.0|      G|    FMY|         FORT MYERS|    1.0|     CX|00880|9.242060593E10|
| 82267.0|    2016-04-01|2016.0|   4.0|      G|    FMY|         FORT MYERS|    1.0|     CX|00880|9.242061403E10|
|241744.0|    2016-04-02|2016.0|   4.0|      T|    FMY|         FORT MYERS|    1.0|     DL|  109

In [32]:
student_table = student_immigration_df.select('cicid', 'arrdate_parsed', 'i94cit', 'i94res', 'i94port', 'entry_post_location',
                                              'i94mode', 'i94addr', 'depdate_parsed', 'i94bir', 'i94visa', 
                                              'visapost', 'dtadfile', 'entdepa','entdepd', 'entdepu', 
                                              'matflag', 'biryear', 'dtaddto', 'gender', 'insnum', 'visatype')

In [34]:
student_table.show()

+--------+--------------+------+------+-------+-------------------+-------+-------+--------------+------+-------+--------+--------+-------+-------+-------+-------+-------+-------+------+------+--------+
|   cicid|arrdate_parsed|i94cit|i94res|i94port|entry_post_location|i94mode|i94addr|depdate_parsed|i94bir|i94visa|visapost|dtadfile|entdepa|entdepd|entdepu|matflag|biryear|dtaddto|gender|insnum|visatype|
+--------+--------------+------+------+-------+-------------------+-------+-------+--------------+------+-------+--------+--------+-------+-------+-------+-------+-------+-------+------+------+--------+
| 79772.0|    2016-04-01| 260.0| 260.0|    FMY|         FORT MYERS|    1.0|     CA|    2016-04-28|  18.0|    3.0|     MNL|20160401|      G|      O|   null|      M| 1998.0|    D/S|     F|  null|      F1|
| 82086.0|    2016-04-01| 263.0| 263.0|    FMY|         FORT MYERS|    1.0|     CA|    2016-07-04|  21.0|    3.0|     BNK|20160401|      G|      O|   null|      M| 1995.0|    D/S|     F|  

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [33]:
# Checking table sizes.
    
print('Student table size is:', student_table.count(), 'x' , len(student_table.columns))
print('Arrivals table size is: ', arrivals_table.count(), 'x' , len(arrivals_table.columns))
print('Weather table size is:', weather_df.count(), 'x' , len(weather_df.columns))
print('Climate table size is: :', climate_table.count(), 'x' , len(climate_table.columns))
print('City table size is: :', city_table_pv.count(), 'x' , len(city_table_pv.columns))

Student table size is: 43361 x 21
Arrivals table size is:  43361 x 10
Weather table size is: 8235082 x 10
Climate table size is: : 41880 x 6
City table size is: : 596 x 9


In [35]:
output_dir = 'analytics_data/'

In [35]:
weather_df.write.mode('append').parquet(output_dir + 'weather_table.parquet')

In [36]:
climate_table.write.mode('append').parquet(output_dir + 'climate_table.parquet')

In [37]:
city_table_pv.write.mode('append').parquet(output_dir + 'city_table.parquet')

In [36]:
student_table.write.mode('append').parquet(output_dir + 'student_table.parquet')

In [37]:
arrivals_table.write.mode('append').parquet(output_dir + 'arrivals_table.parquet')

In [31]:
# visa_port_table = spark.createDataFrame(visa_port_table)
visa_port_table.write.mode('append').parquet(output_dir + 'visa_post_table.parquet')

#### 4.2 Data Quality Checks

##### 4.2.1 Checking data quality - Weather table

In [64]:
# Read from parquet

spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

weather_read = spark.read.parquet('analytics_data/weather_table.parquet')

In [65]:
# Check schema for proper column types.

weather_read.printSchema()

root
 |-- dt: string (nullable = true)
 |-- AverageTemperature: string (nullable = true)
 |-- AverageTemperatureUncertainty: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- date: date (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)



##### *Defining functions to check table(s)*

In [66]:
# Functions to check tables.
from pyspark.sql.functions import col, isnan, when, count

def check_table_pk_entry_presence(pk_col, dataframe):
    """ Check that the table contains at least one entry. ValueError is raised if it is empty.
    Input:
        pk_col: primary key column
        dataframe: pyspark dataframe
    """
    if dataframe.select(pk_col).count() < 1:
        raise ValueError('No entries present in the table.')

def check_key_null_presence(col_of_interest, dataframe):
    """ Check the column of interest for null or NaN entries. ValueError is raised if any are present.
    Input:
        col_of_interest: name of the column to check for null entries
        dataframe: pyspark dataframe
    """
    # Count the rows where the column is NaN or null.
    num_entries = dataframe.select(
                    count(when(isnan(col_of_interest) | col(col_of_interest).isNull(), col_of_interest)).alias(f'NullCount_{col_of_interest}')
                    ).collect()[0][0]
    if num_entries > 0:
        raise ValueError(f'Null entries are present in the table: {dataframe}, column: {col_of_interest}')
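
The two functions above test for empty tables and for nulls. A third check that fits the same pattern is key uniqueness. The sketch below shows the idea on plain Python rows; the helper name and the sample rows are hypothetical. On a Spark dataframe the equivalent would be along the lines of `dataframe.groupBy(key_cols).count().filter(col('count') > 1)`.

```python
from collections import Counter


def find_duplicate_keys(rows, key_cols):
    """Return the key tuples that occur more than once in rows (a list of dicts)."""
    counts = Counter(tuple(row[c] for c in key_cols) for row in rows)
    return sorted(key for key, n in counts.items() if n > 1)


# Hypothetical rows: the same city name in two different states.
rows = [
    {'City': 'Springfield', 'State': 'Illinois'},
    {'City': 'Springfield', 'State': 'Missouri'},
    {'City': 'Tyler', 'State': 'Texas'},
]
print(find_duplicate_keys(rows, ['City']))           # [('Springfield',)]
print(find_duplicate_keys(rows, ['City', 'State']))  # []
```

This matters for the city table, where a city name alone may not identify a single row.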

In [67]:
# Checking weather table

print('Checking table for entries.')
check_table_pk_entry_presence(pk_col= 'dt', dataframe= weather_read)
print('Checking table for the presence of Null entries in columns where null is not expected.')
check_key_null_presence(col_of_interest='AverageTemperature', dataframe=weather_read)

Checking table for entries.
Checking table for the presence of Null entries in columns where null is not expected.


##### 4.2.2 Check data quality - climate table

In [5]:
climate_read = spark.read.parquet('analytics_data/climate_table.parquet/')

In [6]:
climate_read.printSchema()

root
 |-- city: string (nullable = true)
 |-- month: integer (nullable = true)
 |-- country: string (nullable = true)
 |-- AvgTempByMonth: double (nullable = true)
 |-- AvgMaxTempInMonth: double (nullable = true)
 |-- AvgMinTempInMonth: double (nullable = true)



In [59]:
# Check for entries.

print('Checking table for entries.')
check_table_pk_entry_presence(pk_col= 'city', dataframe= climate_read)
print('Checking table for the presence of Null entries in columns where null is not expected.')
check_key_null_presence(col_of_interest='City', dataframe=climate_read)

Checking table for entries.
Checking table for the presence of Null entries in columns where null is not expected.


##### 4.2.3 Checking data quality - city table

In [7]:
city_read = spark.read.parquet('analytics_data/city_table.parquet/')

In [8]:
# Check schema

city_read.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- American_Indian_and_Alaska_Native: long (nullable = true)
 |-- Asian: long (nullable = true)
 |-- Black_or_African-American: long (nullable = true)
 |-- Hispanic_or_Latino: long (nullable = true)
 |-- White: long (nullable = true)
 |-- Total_Population: integer (nullable = true)
 |-- Foreign_born: integer (nullable = true)



In [63]:
# Check for entries.

print('Checking table for entries.')
check_table_pk_entry_presence(pk_col= 'city', dataframe= city_read)
print('Checking table for the presence of Null entries in columns where null is not expected.')
check_key_null_presence(col_of_interest='City', dataframe=city_read)

Checking table for entries.
Checking table for the presence of Null entries in columns where null is not expected.


##### 4.2.4 Check data quality - arrivals table

In [9]:
arrivals_read = spark.read.parquet('analytics_data/arrivals_table.parquet/')

In [10]:
# Check schema

arrivals_read.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- arrdate_parsed: date (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- i94port: string (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- airline: string (nullable = true)
 |-- fltno: string (nullable = true)
 |-- admnum: double (nullable = true)



In [68]:
# Check for entries.

print('Checking table for entries.')
check_table_pk_entry_presence(pk_col= 'cicid', dataframe= arrivals_read)
print('Checking table for the presence of Null entries in columns where null is not expected.')
check_key_null_presence(col_of_interest='cicid', dataframe=arrivals_read)

Checking table for entries.
Checking table for the presence of Null entries in columns where null is not expected.


##### 4.2.5 Check data quality - students table

In [69]:
students_read = spark.read.parquet('analytics_data/student_table.parquet/')

In [70]:
# Check schema

students_read.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- arrdate_parsed: date (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- entry_post_location: string (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate_parsed: date (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- visapost: string (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- visatype: string (nullable = true)



In [72]:
# Check for entries.

print('Checking table for entries.')
check_table_pk_entry_presence(pk_col= 'cicid', dataframe= students_read)
print('Checking table for the presence of Null entries in columns where null is not expected.')
check_key_null_presence(col_of_interest='cicid', dataframe=students_read)

Checking table for entries.
Checking table for the presence of Null entries in columns where null is not expected.


##### 4.2.6 Check data quality - visa post table

In [13]:
visa_post_read = spark.read.parquet('analytics_data/visa_post_table.parquet/')

In [14]:
visa_post_read.printSchema()

root
 |-- i94post_code: string (nullable = true)
 |-- entry_post_location: string (nullable = true)



#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

In [68]:
%%html
<style>
  table {margin-left: 0 !important;}
</style>
# Defining table display style


**Arrivals Table** - Source of Data: i94 record from DHS, US-gov

| Column | Data Type | Description | Key type |
| --- | --- | --- | --- |
| cicid | double | arrival id | primary key |
| arrdate_parsed | date | date of arrival | |
| i94yr | double | year of arrival | |
| i94mon | double | month of arrival | |
| entdepa | string | arrival flag | |
| i94port | string | port of entry; 3-letter code | |
| entry_post_location | string | port of entry; full city name | foreign key to city table, weather table |
| i94mode | double | mode of entry | |
| airline | string | arrival airline | |
| fltno | string | arrival airline flight number | |
| admnum | double | admission number | |


**Student Table** - Source of Data: i94 record from DHS, US-gov

| Column | Data Type | Description| key type |
| ---    | ---       | ---        | |
| cicid  | double    | arrival id  | primary key, also foreign key for Arrivals table|
| arrdate_parsed   | date    | date of arrival  | |
| i94cit  | double    | Code for immigrant citizenship country  | |
| i94res  | double    | Code for immigrant residency country  | |
| i94port  | string    | immigrant port of entry; 3-letter code  | |
| entry_port_location  | string    | port of entry; full city name  | foreign-key to city table, weather table |
| i94mode  | double    | mode of entry  | |
| i94addr  | string    | immigrant us address  | |
| depdate_parsed   | date    | date of departure  | |
| i94bir  | double    | age in years  | |
| i94visa  | double    | visa category code (1 = business, 2 = pleasure, 3 = student)  | |
| visapost  | string    | visa issuing post  | foreign-key visa post table |
| dtadfile  | string    | date added to i94  | |
| entdepa  | string    | arrival flag  | |
| entdepd  | string    | departure flag  | |
| entdepu  | string    | update flag  | |
| matflag  | string    | match flag  | |
| biryear  | double    | birth year  | |
| dtaddto  | string    | expected departure date  | |
| gender  | string    | gender  | |
| insnum  | string    | INS number | |
| visatype  | string    | visa class of admission (e.g. F1, M1)  | |


**City Table** - Source of Data: City demographics data from OpenSoft.

| Column | Data Type | Description | Key type |
| ---    | ---       | ---         | ---      |
| City  | string    | city name  | primary key |
| State   | string    | state name  |  |
| American_Indian_and_Alaska_Native  | long    | Number of American Indian and Alaska Native residents  |  |
| Asian  | long    |  Number of Asian residents   |  |
| Black_or_African-American | long | Number of Black or African-American residents |  |
| Hispanic_or_Latino | long | Number of Hispanic or Latino residents |  |
| White  | string    | Number of White residents  |  |
| Total_Population  | integer    | total population of city  |  |
| Foreign_born  | integer    | Foreign_born population of city  |  |

 
**Weather Table** - Source of Data: City weather data from Kaggle

| Column | Data Type | Description | Key type |
| ---    | ---       | ---         | ---      |
| dt  | string    | date of record  | primary key, foreign key to arrivals table (arrdate_parsed) |
| AverageTemperature  | string    | Average daily temperature |  |
| City  | string    | city name  |  |
| Country  | string    | country name  |  |
| Latitude  | string    | city latitude  |  |
| Longitude  | string    | city longitude  |  |
| date  | date    | date of record, date object  |  |
| year  | integer    |  year   |  |
| month  | integer    |  month   |  |

 
**Climate Table** - Source of Data: Data from processing Weather table

| Column | Data Type | Description | Key type |
| ---    | ---       | ---         | ---      |
| city  | string    | city name  | primary key, foreign key for student table, visa post table, weather table |
| month  | integer    |  month   |  |
| country   | string    | country name  |  |
| AvgTempByMonth  | double    | Average temperature (of daily average temp.) by month  |  |
| AvgMaxTempInMonth  | double    | Average temperature (of daily maximum temp.) by month  |  |
| AvgMinTempInMonth  | double    | Average temperature (of daily minimum temp.) by month  |  |


**Visa post table**  - Source of Data: From US gov Dept. of State website.

| Column | Data Type | Description | Key type |
| ---    | ---       | ---         | ---      |
| entry_post_location   | string    | city name  | foreign-key for city table |
| i94post_code   | string    | 3-letter code for the city of issuing  | primary key, foreign key for students and arrival tables |


#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

##### 5.1 Rationale for choice of tools and technologies

Data was loaded using PySpark and/or pandas.
* ETL of small datasets was performed using pandas because of its flexibility (especially when the dataset fits easily in RAM).
* Larger datasets were loaded and transformed in PySpark, which can use distributed computing to process large datasets.
* After ETL, data was stored in Parquet format, a compressed columnar format that is read by most commonly used big-data analytics tools.

##### 5.2 Data update frequency

* I94 data: monthly (or at whatever frequency the US government releases I94 data).
* Weather data: on the same schedule as the I94 data.
* Climate data: yearly.
* Visa post data: whenever a new I94 entry carries a code that is missing from the current visa post table.
* City demographics data: whenever new census data is released.
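
The visa post refresh rule in the list above can be sketched as an anti-join: any `visapost` code in the student table with no match in `i94post_code` signals that the visa post table needs updating. `find_unknown_post_codes` is a hypothetical helper, not part of the notebook; it assumes Spark DataFrames with the column names given in the data dictionary.

```python
def find_unknown_post_codes(students_df, visa_post_df):
    """Return distinct visapost codes that have no match in the visa post table."""
    student_codes = (
        students_df
        .select('visapost')
        .where(students_df['visapost'].isNotNull())
        .distinct()
    )
    # left_anti keeps only the left-side rows that find no join partner
    return student_codes.join(
        visa_post_df,
        student_codes['visapost'] == visa_post_df['i94post_code'],
        how='left_anti',
    )


# Possible usage in the notebook:
# unknown = find_unknown_post_codes(students_read, visa_post_read)
# if unknown.count() > 0:
#     unknown.show()  # these codes would trigger a visa post table refresh
```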

##### 5.2 Future scenario planning

*Data increased by 100x*

A proper data partitioning scheme has to be implemented. The I94-derived tables (arrivals and student) can be partitioned by year of arrival, since most analytics are likely to look at student intake on a year-by-year basis.

If partitioning alone is not sufficient, the pipeline can run on a Hadoop/Amazon EMR cluster, where the data is partitioned and distributed across multiple nodes. Each node processes its partitions in parallel, and the results are aggregated using PySpark.
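
A minimal sketch of the year-based partitioning, assuming a Spark DataFrame that already has an `i94yr` column (the arrivals table does; the student table would first need a year column derived from `arrdate_parsed`). `write_partitioned` and the output path in the usage comment are hypothetical names chosen for illustration.

```python
def write_partitioned(dataframe, output_path, partition_col='i94yr'):
    """Write a Spark DataFrame as parquet, one sub-directory per partition value."""
    (
        dataframe.write
        .mode('overwrite')            # replace any previous export at output_path
        .partitionBy(partition_col)   # creates directories such as i94yr=<value>/
        .parquet(output_path)
    )


# Possible usage in the notebook (the path is a placeholder):
# write_partitioned(arrivals_read, 'analytics_data/arrivals_by_year.parquet')
```

With this layout, a query that filters on the partition column only needs to read the matching sub-directories rather than the whole table.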

*Daily update by 7am every day.*

The ETL pipeline can be implemented as an Apache Airflow DAG scheduled to run daily, early enough to finish before 7am.

*Database needed to be accessed by 100+ people*

The data lake can be loaded into an Amazon Redshift database. Redshift can easily manage and scale to such loads. Also, if different client groups have different business requirements, the database design can be optimized for each group.


##### 5.3 Example queries

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

students_read = spark.read.parquet('analytics_data/student_table.parquet/')
arrivals_read = spark.read.parquet('analytics_data/arrivals_table.parquet/')
weather_read = spark.read.parquet('analytics_data/weather_table.parquet')
city_read = spark.read.parquet('analytics_data/city_table.parquet/')
climate_read = spark.read.parquet('analytics_data/climate_table.parquet/')
visa_post_read = spark.read.parquet('analytics_data/visa_post_table.parquet/')

In [4]:
students_read.createOrReplaceTempView('student_table')
arrivals_read.createOrReplaceTempView('arrivals_table')
weather_read.createOrReplaceTempView('weather_table')
city_read.createOrReplaceTempView('city_table')
climate_read.createOrReplaceTempView('climate_table')
visa_post_read.createOrReplaceTempView('visa_post_table')

##### 5.3.1 International students and arrival city demographics.

In [62]:
# All city avg foreign born percentage

spark.sql("select round(avg(pct_foreign_born),2) as city_avg_foreign_born_percentage from \
          (select (Foreign_born/total_population*100) as pct_foreign_born from city_table)").show()

+--------------------------------+
|city_avg_foreign_born_percentage|
+--------------------------------+
|                           18.25|
+--------------------------------+



In [60]:
spark.sql(" SELECT l.entry_post_location, l.num_students, round((r.Foreign_born/r.total_population*100),2) as pct_foreign_born \
            FROM \
              (SELECT entry_post_location, count(cicid) as num_students \
              FROM arrivals_table \
              GROUP BY entry_post_location) as l \
            INNER JOIN city_table as r \
            on upper(l.entry_post_location)= upper(r.City) \
            ORDER BY num_students desc \
            LIMIT 20").show()

+-------------------+------------+----------------+
|entry_post_location|num_students|pct_foreign_born|
+-------------------+------------+----------------+
|           NEW YORK|        6122|           37.57|
|        LOS ANGELES|        5549|            37.4|
|            CHICAGO|        2800|           21.08|
|              MIAMI|        2686|           59.14|
|      SAN FRANCISCO|        2414|           34.37|
|             BOSTON|        2241|            28.4|
|            SEATTLE|        1699|           17.51|
|            HOUSTON|        1318|           30.29|
|    FORT LAUDERDALE|        1201|           26.64|
|            ATLANTA|        1162|             6.9|
|             DALLAS|        1139|           25.14|
|            DETROIT|        1009|            5.89|
|            ORLANDO|         983|           18.66|
|            PHOENIX|         574|           19.24|
|          LAS VEGAS|         563|           20.46|
|       PHILADELPHIA|         431|            13.1|
|          V

Summary statistics from the city table indicate that, on average, ~18.25% of the population of a major US city is foreign-born.

A join query on arrivals_table and city_table shows that most of the top arrival cities have a foreign-born share above 18.25% (12 of the 16 cities fully listed above; Seattle, Atlanta, Detroit and Philadelphia fall below it). This suggests that international students often prefer cities with a large foreign-born population.

*Point to note*: This enrichment may also be due to the high number of educational institutes in these cities. Data on educational institutes and student enrolment (if available) would shed light on this.
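
The comparison against the all-city average can be checked directly from the rows printed above. The sketch below hard-codes the 16 complete rows of the query output (the truncated last row is left out) and counts how many cities exceed 18.25%; the variable names are illustrative only.

```python
# (city, pct_foreign_born) copied from the complete rows of the query output
top_cities = [
    ('NEW YORK', 37.57), ('LOS ANGELES', 37.4), ('CHICAGO', 21.08),
    ('MIAMI', 59.14), ('SAN FRANCISCO', 34.37), ('BOSTON', 28.4),
    ('SEATTLE', 17.51), ('HOUSTON', 30.29), ('FORT LAUDERDALE', 26.64),
    ('ATLANTA', 6.9), ('DALLAS', 25.14), ('DETROIT', 5.89),
    ('ORLANDO', 18.66), ('PHOENIX', 19.24), ('LAS VEGAS', 20.46),
    ('PHILADELPHIA', 13.1),
]
CITY_AVG_PCT = 18.25  # all-city average from the first query

above = [city for city, pct in top_cities if pct > CITY_AVG_PCT]
below = [city for city, pct in top_cities if pct <= CITY_AVG_PCT]

print(f'{len(above)} of {len(top_cities)} cities are above the all-city average')
print('At or below average:', ', '.join(below))
```

This prints that 12 of 16 cities are above the average, with Seattle, Atlanta, Detroit and Philadelphia at or below it.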