# Project Title
### Data Engineering Capstone Project

#### Project Summary
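This project builds an ETL pipeline with pandas, pyarrow and PySpark. It cleans the I94 immigration data and the global land temperature by city data, links them through the I94 port of entry code (`i94port`), and writes the results as Parquet tables partitioned by `i94port`.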

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [23]:
# pyarrow is needed for pyarrow.parquet, which is imported below
!pip install pyspark pyarrow


In [28]:
# Do all imports and installs here
import configparser
import os
import re
import sys
from datetime import datetime
from pprint import pprint

import pandas as pd
import pyarrow.parquet as pq
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lower, udf

pd.set_option('display.width', 170, 'display.max_rows', 200, 'display.max_columns', 900)

### Step 1: Scope the Project and Gather Data

#### Scope 
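The project combines the I94 immigration records with city-level temperature data by joining them on the port of entry (`i94port`). The immigration data is loaded from Parquet with pyarrow, explored and cleaned in pandas, and then processed in PySpark; the temperature data is read directly with PySpark. The end solution is a set of Parquet tables (immigration, temperature, and a fact table that joins them), each partitioned by `i94port`.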

#### Describe and Gather Data 
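The immigration data comes from the US National Travel and Tourism Office. The other inputs are the airport codes table (`airport-codes_csv.csv`), the US city demographics table (`us-cities-demographics.csv`), the global land temperature by city table (`weather/GlobalLandTemperaturesByCity.csv`), and the I94 label descriptions file (`I94_SAS_Labels_Descriptions.SAS`).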

In [51]:
# Data sources
air_data = "airport-codes_csv.csv"
city_data = "us-cities-demographics.csv"
temp_file = "weather/GlobalLandTemperaturesByCity.csv"
sas = "I94_SAS_Labels_Descriptions.SAS"

In [30]:
spark = SparkSession.builder \
    .config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11") \
    .enableHiveSupport() \
    .getOrCreate()

In [31]:
# Get the immigration data
df_img = pq.ParquetDataset('sas_data').read_pandas().to_pandas(split_blocks=True, self_destruct=True)

In [32]:
# Explore the data
df_img.head(2)

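|  | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | i94bir | i94visa | count | dtadfile | visapost | occup | entdepa | entdepd | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 6.0 | 2016.0 | 4.0 | 692.0 | 692.0 | XXX | 20573.0 |  |  |  | 37.0 | 2.0 | 1.0 |  |  |  | T |  | U |  | 1979.0 | 10282016 |  |  |  | 1897628000.0 |  | B2 |
| 1 | 7.0 | 2016.0 | 4.0 | 254.0 | 276.0 | ATL | 20551.0 | 1.0 | AL |  | 25.0 | 3.0 | 1.0 | 20130811.0 | SEO |  | G |  | Y |  | 1991.0 | D/S | M |  |  | 3736796000.0 | 296.0 | F1 |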


#### Data dictionary for the columns used
##### Immigration Data
- cicid - float64 - Unique record ID
- i94yr - float64 - 4 digit year
- i94cit - float64 - 3 digit code of the immigrant's country of birth
- i94res - float64 - 3 digit code of the immigrant's country of residence
- arrdate - float64 - Arrival date in the USA, stored as a SAS date number (days since 1960-01-01)
- i94port - object - 3 character code of the US port of entry
- i94mode - float64 - Mode of transportation (1 = Air; 2 = Sea; 3 = Land; 9 = Not reported)
- biryear - float64 - 4 digit year of birth
- i94visa - float64 - Visa codes collapsed into three categories (1 = Business; 2 = Pleasure; 3 = Student)
- gender - object - Immigrant sex (M, F, X, U)
- fltno - object - Flight number of the airline used to arrive in the U.S.
- visatype - object - Class of admission legally admitting the non-immigrant to temporarily stay in the U.S.; regrouped in this project into Business (B1, E2), Pleasure (B2), Student (F1), 3 Months Visa (WB, WT) and Others
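Here is a minimal sketch for decoding `arrdate`. It assumes the value is a SAS date, that is, a count of days since 1960-01-01. Under that assumption the sample values 20573 and 20551 decode to 2016-04-29 and 2016-04-07, which agrees with `i94mon` = 4 in the rows shown above. The helper name `sas_days_to_date` is hypothetical.

```python
import math
from datetime import date, timedelta

# Assumption: arrdate counts days since the SAS epoch, 1960-01-01.
SAS_EPOCH = date(1960, 1, 1)


def sas_days_to_date(days):
    """Convert a SAS day count such as arrdate (float64) to a datetime.date.

    Returns None for missing values (None or NaN, which pandas uses for
    empty numeric cells).
    """
    if days is None or (isinstance(days, float) and math.isnan(days)):
        return None
    return SAS_EPOCH + timedelta(days=int(days))


# arrdate values from the two sample rows shown above
print(sas_days_to_date(20573.0))  # 2016-04-29
print(sas_days_to_date(20551.0))  # 2016-04-07
```

To convert a whole pandas column, `pd.to_datetime(df_img['arrdate'], unit='D', origin='1960-01-01')` performs the same conversion in vectorized form and turns NaN into NaT.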

In [33]:
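# Select the columns of interest and show the percentage of missing values per column.
# .copy() makes new_df_img an independent frame, so the column assignments below
# do not trigger pandas' SettingWithCopyWarning.
new_df_img = df_img[['cicid', 'i94yr', 'i94cit', 'i94port', 'i94res', 'arrdate', 'i94mode', 'biryear', 'i94visa', 'gender', 'fltno', 'visatype']].copy()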
(new_df_img.isna().sum() /len(new_df_img)) * 100


cicid        0.000000
i94yr        0.000000
i94cit       0.000000
i94port      0.000000
i94res       0.000000
arrdate      0.000000
i94mode      0.007719
biryear      0.025902
i94visa      0.000000
gender      13.379429
fltno        0.631364
visatype     0.000000
dtype: float64

In [34]:
new_df_img[['i94cit', 'i94port','i94res', 'arrdate', 'i94mode', 'biryear','i94visa', 'gender']].head(2)

|  | i94cit | i94port | i94res | arrdate | i94mode | biryear | i94visa | gender |
|---|---|---|---|---|---|---|---|---|
| 0 | 692.0 | XXX | 692.0 | 20573.0 |  | 1979.0 | 2.0 |  |
| 1 | 254.0 | ATL | 276.0 | 20551.0 | 1.0 | 1991.0 | 3.0 | M |


In [35]:
# Cleaning: group visatype into five categories:
# B1, E2 = Business; B2 = Pleasure; F1 = Student; WB, WT = 3 Months Visa; the listed rest = Others
new_df_img['visatype'] = new_df_img['visatype'].replace(['B1', 'E2'], 'Business')
new_df_img['visatype'] = new_df_img['visatype'].replace('B2', 'Pleasure')
new_df_img['visatype'] = new_df_img['visatype'].replace('F1', 'Student')
new_df_img['visatype'] = new_df_img['visatype'].replace(['WB', 'WT'], '3 Months Visa')
new_df_img['visatype'] = new_df_img['visatype'].replace(['GMT', 'CP', 'E1', 'I', 'F2', 'M1', 'I1', 'GMB', 'M2', 'SBP', 'CPL'], 'Others')

new_df_img.visatype.value_counts()

3 Months Visa    1592042
Pleasure         1117897
Business          231793
Others            115565
Student            39016
Name: visatype, dtype: int64
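The five counts above add up to 3,096,313, which is the number of entries that `new_df_img.info()` reports. Every `visatype` code in this extract was therefore covered by the explicit lists. A dictionary lookup with a default is a compact alternative. It also sends codes missing from the lists, such as codes that only appear in other months, to `Others`. This is a sketch, and `VISA_GROUPS` and `group_visatype` are hypothetical names.

```python
# Hypothetical mapping from raw visatype codes to the project's groups.
VISA_GROUPS = {
    "B1": "Business",
    "E2": "Business",
    "B2": "Pleasure",
    "F1": "Student",
    "WB": "3 Months Visa",
    "WT": "3 Months Visa",
}


def group_visatype(code):
    """Return the group for a visatype code; any code not in VISA_GROUPS becomes 'Others'."""
    return VISA_GROUPS.get(code, "Others")


for code in ["B1", "B2", "F1", "WT", "GMT", "NEW"]:
    print(code, "->", group_visatype(code))
```

You can apply it with `new_df_img['visatype'].map(group_visatype)`. Missing values would also become `Others` in that case, because `Series.map` passes NaN to the function by default.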

In [36]:
new_df_img.gender.value_counts()

M    1377224
F    1302743
X       1610
U        467
Name: gender, dtype: int64

In [37]:
new_df_img.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3096313 entries, 0 to 3096312
Data columns (total 12 columns):
 #   Column    Dtype  
---  ------    -----  
 0   cicid     float64
 1   i94yr     float64
 2   i94cit    float64
 3   i94port   object 
 4   i94res    float64
 5   arrdate   float64
 6   i94mode   float64
 7   biryear   float64
 8   i94visa   float64
 9   gender    object 
 10  fltno     object 
 11  visatype  object 
dtypes: float64(8), object(4)
memory usage: 283.5+ MB


In [38]:
# Parse the column descriptions from the comment lines of the SAS labels file
with open(sas) as f:
    lines = f.readlines()

comments = [line for line in lines if '/*' in line and '*/\n' in line]
regexp = re.compile(r'^/\*\s+(?P<code>.+?)\s+-\s+(?P<description>.+)\s+\*/$')
matches = [regexp.match(c) for c in comments]

for m in matches:
    if m:  # skip comment lines that do not follow the "/* CODE - description */" layout
        print(m.group("code"), ":", m.group("description"))

I94YR : 4 digit year
I94MON : Numeric month
I94CIT & I94RES : This format shows all the valid and invalid codes for processing
I94PORT : This format shows all the valid and invalid codes for processing
I94MODE : There are missing values as well as not reported (9)
I94BIR : Age of Respondent in Years
COUNT : Used for summary statistics
DTADFILE : Character Date Field - Date added to I-94 Files - CIC does not use
VISAPOST : Department of State where where Visa was issued - CIC does not use
OCCUP : Occupation that will be performed in U.S. - CIC does not use
ENTDEPA : Arrival Flag - admitted or paroled into the U.S. - CIC does not use
ENTDEPD : Departure Flag - Departed, lost I-94 or is deceased - CIC does not use
ENTDEPU : Update Flag - Either apprehended, overstayed, adjusted to perm residence - CIC does not use
MATFLAG : Match flag - Match of arrival and departure records
BIRYEAR : 4 digit year of birth
DTADDTO : Character Date Field - Date to which admitted to U.S. (allowed to stay un

In [46]:
# Port definitions are on lines 303-962 of the SAS file, i.e. the 0-based slice [302:962]
re_obj = re.compile(r'\'(.*)\'.*\'(.*)\'')
valid_ports = {}
for line in lines[302:962]:
    match = re_obj.search(line)
    if match:
        valid_ports[match.group(1)] = [match.group(2)]
pprint(valid_ports)


 'BUS': ['Collapsed (BUF) 06/15'],
 'BWA': ['BOUNDARY, WA          '],
 'BWM': ['BRIDGEWATER, ME       '],
 'BYO': ['BAYONNE, NJ           '],
 'BZN': ['GALLATIN FIELD - BOZEMAN, MT'],
 'CAE': ['COLUMBIA, SC #ARPT'],
 'CAK': ['AKRON, OH'],
 'CAL': ['CALEXICO, CA          '],
 'CAN': ['CAPE CANAVERAL, FL    '],
 'CAO': ['CAMPO, CA             '],
 'CAP': ['CAPE VINCENT, NY      '],
 'CAR': ['CARIBOU MUNICIPAL AIRPORT, MN'],
 'CDD': ['CRANE LAKE - ST. LOUIS CNTY, NM'],
 'CHA': ['CHARLOTTE AMALIE, VI  '],
 'CHF': ['CHIEF MT, MT          '],
 'CHI': ['CHICAGO, IL           '],
 'CHL': ['CHARLESTON, SC        '],
 'CHM': ['CHAMPLAIN, NY         '],
 'CHN': ['No PORT Code (CHN)'],
 'CHO': ['ALBEMARLE CHARLOTTESVILLE, VA'],
 'CHR': ['CHRISTIANSTED, VI     '],
 'CHS': ['CHARLESTON, WV        '],
 'CHT': ['CHATEAUGAY, NY        '],
 'CID': ['CEDAR RAPIDS/IOWA CITY, IA'],
 'CIN': ['CINCINNATI, OH        '],
 'CLA': ['CLAYTON, NY           '],
 'CLE': ['CLEVELAND, OH         '],
 'CLG': ['CALGAR

In [39]:
# Get port locations (code, city, state) from the SAS labels file
with open(sas) as f:
    content = f.readlines()
content = [x.strip() for x in content]
ports = content[302:962]
splitted_ports = [port.split("=") for port in ports]
port_codes = [x[0].replace("'", "").strip() for x in splitted_ports]
port_locations = [x[1].replace("'", "").strip() for x in splitted_ports]
port_cities = [x.split(",")[0].strip() for x in port_locations]
port_states = [x.split(",")[-1].strip() for x in port_locations]
df_port_locations = pd.DataFrame({"port_code": port_codes, "port_city": port_cities, "port_state": port_states})
df_port_locations.head(5)


|  | port_code | port_city | port_state |
|---|---|---|---|
| 0 | ALC | ALCAN | AK |
| 1 | ANC | ANCHORAGE | AK |
| 2 | BAR | BAKER AAF - BAKER ISLAND | AK |
| 3 | DAC | DALTONS CACHE | AK |
| 4 | PIZ | DEW STATION PT LAY DEW | AK |
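The split-based parsing above assumes that every location contains a comma. For entries such as `No PORT Code (CHN)`, it repeats the whole text as both city and state. A regex-based parser is one way to make the line format explicit. This sketch assumes that port lines look like `'ALC' = 'ALCAN, AK'`: the code and the location are each in single quotes, separated by `=`, and may have tabs and padding around them. `parse_port_line` is a hypothetical helper.

```python
import re

# Assumed layout of a port line in I94_SAS_Labels_Descriptions.SAS:
#     'ALC'<tab>=<tab>'ALCAN, AK             '
PORT_LINE = re.compile(r"^\s*'(?P<code>[^']*)'\s*=\s*'(?P<location>[^']*)'")


def parse_port_line(line):
    """Return (port_code, port_city, port_state), or None if the line does not match.

    The city is the text before the last comma and the state is the text after it.
    Locations without a comma get an empty state instead of repeating the city.
    """
    m = PORT_LINE.match(line)
    if m is None:
        return None
    code = m.group("code").strip()
    location = m.group("location").strip()
    city, sep, state = location.rpartition(",")
    if not sep:  # no comma: treat the whole location as the city
        return code, location, ""
    return code, city.strip(), state.strip()


print(parse_port_line("\t'ALC'\t=\t'ALCAN, AK             '"))  # ('ALC', 'ALCAN', 'AK')
print(parse_port_line("\t'CHN'\t=\t'No PORT Code (CHN)'"))      # ('CHN', 'No PORT Code (CHN)', '')
```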


In [40]:
# Read the temperature file and count the records per city
temp_df = pd.read_csv(temp_file)
temp_df['City'].value_counts()

Springfield     9545
Worcester       8359
León            7469
Rongcheng       6526
Manchester      6478
                ... 
Machala         1591
Trujillo        1584
Chimbote        1584
Chiclayo        1584
Port Moresby    1581
Name: City, Length: 3448, dtype: int64

### Step 2: Explore and Assess the Data
#### Explore the Data 
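The immigration extract has 3,096,313 rows. Among the selected columns, `gender` is missing in about 13.4% of rows, `fltno` in 0.63%, `biryear` in 0.03% and `i94mode` in 0.008%. `i94port` contains placeholder codes such as `XXX`, and the port list in the SAS labels file includes non-port entries such as `No PORT Code (CHN)` and `Collapsed (BUF) 06/15`. The temperature file has missing `AverageTemperature` values and many monthly records per city (for example, 9,545 for Springfield).

#### Cleaning Steps
* Immigration: group `visatype` into Business, Pleasure, Student, 3 Months Visa and Others, then keep only records whose `i94port` appears in the port list parsed from `I94_SAS_Labels_Descriptions.SAS`.
* Temperature: drop rows with a missing `AverageTemperature`, keep one row per (`City`, `Country`), map each city to an `i94port` code, and drop cities without a matching port.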

In [41]:
# Performing cleaning tasks here
df1 = spark.createDataFrame(new_df_img)

In [44]:
def clean_data(df):
    """Keep only immigration records whose i94port is in the parsed port list."""
    return df.filter(df.i94port.isin(list(valid_ports.keys())))

In [75]:
img_df=clean_data(df1)

In [37]:
img_df.show()

+-----+------+------+-------+------+-------+-------+-------+-------+------+-----+--------+
|cicid| i94yr|i94cit|i94port|i94res|arrdate|i94mode|biryear|i94visa|gender|fltno|visatype|
+-----+------+------+-------+------+-------+-------+-------+-------+------+-----+--------+
|  6.0|2016.0| 692.0|    XXX| 692.0|20573.0|    NaN| 1979.0|    2.0|  null| null|Pleasure|
|  7.0|2016.0| 254.0|    ATL| 276.0|20551.0|    1.0| 1991.0|    3.0|     M|00296| Student|
| 15.0|2016.0| 101.0|    WAS| 101.0|20545.0|    1.0| 1961.0|    2.0|     M|   93|Pleasure|
| 16.0|2016.0| 101.0|    NYC| 101.0|20545.0|    1.0| 1988.0|    2.0|  null|00199|Pleasure|
| 17.0|2016.0| 101.0|    NYC| 101.0|20545.0|    1.0| 2012.0|    2.0|  null|00199|Pleasure|
| 18.0|2016.0| 101.0|    NYC| 101.0|20545.0|    1.0| 1959.0|    1.0|  null|00602|Business|
| 19.0|2016.0| 101.0|    NYC| 101.0|20545.0|    1.0| 1953.0|    2.0|  null|00602|Pleasure|
| 20.0|2016.0| 101.0|    NYC| 101.0|20545.0|    1.0| 1959.0|    2.0|  null|00602|Pleasure|

In [57]:
df_temp = spark.read.format("csv").option("header", "true").load(temp_file)

In [53]:
# Create a dictionary of valid i94port codes from i94port_valid.txt
re_obj = re.compile(r'\'(.*)\'.*\'(.*)\'')
i94port_valid = {}
with open('i94port_valid.txt') as f:
    for line in f:
        match = re_obj.search(line)
        if match:  # skip blank or malformed lines
            i94port_valid[match[1]] = [match[2]]

In [73]:
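@udf()
def get_i94port(city):
    """Return the first i94port code whose description contains the city name, else None."""
    if city is None:
        return None
    for key in i94port_valid:
        if city.lower() in i94port_valid[key][0].lower():
            return key
    return None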
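`get_i94port` uses a substring test. As a result, a short city name can match a longer port description (`'york'` is contained in `'new york, ny'`), and every call scans all port codes. One alternative is to build an exact city-to-code index once and look cities up in it. The sketch below assumes that a port description has the form `CITY, STATE`, and `build_city_index` and `lookup_port` are hypothetical names.

```python
def build_city_index(ports):
    """Map lower-cased port city -> port code.

    `ports` has the same shape as i94port_valid: {code: [description]}, where a
    description is assumed to look like 'CITY, ST'. The text before the last
    comma is taken as the city; the first code seen for a city wins.
    """
    index = {}
    for code, descriptions in ports.items():
        description = descriptions[0]
        city = description.rpartition(",")[0] or description
        index.setdefault(city.strip().lower(), code)
    return index


def lookup_port(city, index):
    """Return the port code whose city equals `city` (case-insensitive), else None."""
    if city is None:
        return None
    return index.get(city.strip().lower())


ports = {"NYC": ["NEW YORK, NY           "], "CHI": ["CHICAGO, IL           "]}
index = build_city_index(ports)
print(lookup_port("New York", index))  # NYC
print(lookup_port("York", index))      # None (the substring test would return NYC)
```

In Spark, the lookup could be wrapped as `udf(lambda c: lookup_port(c, index))`. Exact matching trades recall for precision. For example, a temperature city named `Cedar Rapids` would no longer match the port description `CEDAR RAPIDS/IOWA CITY, IA`.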

In [61]:
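# Cleaning the temperature data

def clean_data(temp_df):
    """Drop missing temperatures, keep one row per city and attach its i94port code."""
    # Missing readings may arrive as NULL (empty CSV field) or as the string 'NaN'
    temp_df = temp_df.filter(temp_df.AverageTemperature.isNotNull() & (temp_df.AverageTemperature != 'NaN'))
    temp_df = temp_df.dropDuplicates(['City', 'Country'])
    temp_df = temp_df.withColumn("i94port", get_i94port(temp_df.City))
    # The UDF returns None (SQL NULL) for unmatched cities, so test with isNotNull()
    return temp_df.filter(temp_df.i94port.isNotNull())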
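`dropDuplicates(['City', 'Country'])` keeps one reading per city, but it gives no control over which month that reading comes from. If one representative value per city is wanted, averaging is an alternative. In PySpark, this would be `temp_df.groupBy('City', 'Country').agg(F.avg('AverageTemperature'))`. The plain-Python sketch below shows the same aggregation on a list of row dictionaries. `mean_temperature_by_city` is a hypothetical helper. Values are converted with `float()` because Spark's CSV reader loads every column as a string when no schema is given.

```python
import math
from collections import defaultdict


def mean_temperature_by_city(rows):
    """Average AverageTemperature per (City, Country), skipping missing readings.

    `rows` is a list of dicts shaped like the temperature CSV; values may be
    strings (as Spark reads them without a schema) or floats.
    """
    sums = defaultdict(float)
    counts = defaultdict(int)
    for row in rows:
        value = row.get("AverageTemperature")
        # Treat None and empty strings as missing before converting
        if value is None or value == "":
            continue
        value = float(value)
        if math.isnan(value):  # covers float NaN and the string 'NaN'
            continue
        key = (row["City"], row["Country"])
        sums[key] += value
        counts[key] += 1
    return {key: sums[key] / counts[key] for key in counts}


# Made-up rows for illustration only
rows = [
    {"City": "A", "Country": "X", "AverageTemperature": "10.0"},
    {"City": "A", "Country": "X", "AverageTemperature": "20.0"},
    {"City": "B", "Country": "X", "AverageTemperature": ""},
]
print(mean_temperature_by_city(rows))
```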


In [62]:
temp_df = clean_data(df_temp)

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [74]:
# Conceptual data model
# Immigration dimension table
img_df.write.mode("append").partitionBy("i94port").parquet("/results/immigration.parquet")

In [None]:
# Temperature dimension table
temp_df.write.mode("append").partitionBy("i94port").parquet("/results/temperature.parquet")

In [None]:
# Create temporary views of the immigration and temperature data
img_df.createOrReplaceTempView("new_df_img")
temp_df.createOrReplaceTempView("temp_view")

In [None]:
# Create the fact table by joining the immigration and temperature views on i94port.
# Only columns present in img_df (the 12 selected earlier) and in temp_view can be used,
# so i94mon and depdate are not available, and the temperature view is temp_view.
fact_table = spark.sql('''
SELECT new_df_img.i94yr              AS year,
       new_df_img.i94cit             AS birth_country_code,
       new_df_img.i94port            AS i94port,
       new_df_img.arrdate            AS arrival_date,
       new_df_img.i94visa            AS reason,
       temp_view.AverageTemperature  AS temperature,
       temp_view.Latitude            AS latitude,
       temp_view.Longitude           AS longitude
FROM new_df_img
JOIN temp_view ON (new_df_img.i94port = temp_view.i94port)
''')
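The `JOIN` above is an inner join on `i94port`. Immigration rows whose port has no temperature row are dropped. An immigration row is also repeated once for every temperature row that shares its port, which can happen when several cities map to the same code. The plain-Python sketch below mimics that behaviour on small hypothetical rows. `inner_join` is a hypothetical helper, not a Spark API.

```python
from collections import defaultdict


def inner_join(left, right, key):
    """Inner-join two lists of dicts on `key`, like SQL JOIN ... ON (l.key = r.key).

    Left rows without a match are dropped; a left row is emitted once per
    matching right row. As in SQL, None keys never match.
    """
    right_by_key = defaultdict(list)
    for row in right:
        if row[key] is not None:
            right_by_key[row[key]].append(row)
    joined = []
    for l_row in left:
        for r_row in right_by_key.get(l_row[key], []):
            joined.append({**l_row, **r_row})
    return joined


# Hypothetical rows: two temperature cities mapped to the same port code
immigration = [
    {"cicid": 7.0, "i94port": "ATL"},
    {"cicid": 16.0, "i94port": "NYC"},
    {"cicid": 6.0, "i94port": "XXX"},
]
temperature = [
    {"i94port": "ATL", "City": "Atlanta"},
    {"i94port": "NYC", "City": "New York"},
    {"i94port": "NYC", "City": "Newark"},
]
for row in inner_join(immigration, temperature, "i94port"):
    print(row)
```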

In [None]:
# Write fact table to parquet files partitioned by i94port
fact_table.write.mode("append").partitionBy("i94port").parquet("/results/fact.parquet")

#### 4.2 Data Quality Checks
Data quality checks that can confirm the pipeline ran as expected include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/count checks to ensure completeness

The `quality_check` function implements the count check: a table passes if it contains at least one record.
 
Run Quality Checks

In [None]:
# Perform quality checks here
def quality_check(df, description):
    """Count check: fail if the DataFrame has no records. Returns True if the check passed."""
    result = df.count()

    if result == 0:
        print("Data quality check failed for {} with zero records".format(description))
        return False
    print("Data quality check passed for {} with {} records".format(description, result))
    return True

In [None]:
# Perform data quality check
quality_check(img_df, "immigration table")
quality_check(temp_df, "temperature table")
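The unique-key constraint from the list of checks can be made concrete for the immigration table, whose `cicid` is described as a unique ID. The minimal sketch below works on row dictionaries and uses the hypothetical helper `check_unique_not_null` to report null and duplicate key values. In PySpark, `df.filter(col('cicid').isNull()).count()` counts the nulls. Once nulls are ruled out, comparing `df.count()` with `df.select('cicid').distinct().count()` detects duplicates.

```python
def check_unique_not_null(rows, key):
    """Return a list of problems found for `key`: null values and duplicate values.

    An empty list means the key is present in every row and unique.
    """
    problems = []
    values = [row.get(key) for row in rows]
    missing = sum(1 for v in values if v is None)
    if missing:
        problems.append(f"{missing} row(s) with null {key}")
    present = [v for v in values if v is not None]
    duplicates = len(present) - len(set(present))
    if duplicates:
        problems.append(f"{duplicates} duplicate {key} value(s)")
    return problems


print(check_unique_not_null([{"cicid": 6.0}, {"cicid": 7.0}], "cicid"))  # []
```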