# Data Engineering Capstone Project

#### Project Summary
This project builds a data warehouse that integrates data from several sources into a single source of truth, for data analysis and for future backend use.


The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
import pyspark

### Step 1: Scope the Project and Gather Data

#### Scope 
In this project, I94 immigration data, world temperature data, and US demographic data are used to set up a data warehouse that comprises dimension tables and fact tables.

#### Describe and Gather Data 

Data Sets

1. [I94 Immigration Data](https://travel.trade.gov/research/reports/i94/historical/2016.html)
2. [World Temperature Data](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data)
3. [U.S. City Demographic Data](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/)
    
| Data Set | Format | Description |
| ---      | ---    | ---         |
|[I94 Immigration Data](https://travel.trade.gov/research/reports/i94/historical/2016.html)| SAS | This data comes from the US Department of Homeland Security and contains international visitor arrival statistics by world region and select countries (including the top 20), type of visa, mode of transportation, age group, states visited (first intended address only), and the top ports of entry (for select countries).|
|[World Temperature Data](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data)| CSV | This dataset is from Kaggle and contains monthly average temperatures for cities and countries around the world.|
|[U.S. City Demographic Data](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/)| CSV | This dataset contains demographic information for all US cities and census-designated places with a population greater than or equal to 65,000.|

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

1. Perform exploratory analysis on the datasets.
2. Subset the datasets into dimension tables.
3. Run PySpark on the SAS data.
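
The quality issues named above (missing values and duplicate rows) can be profiled with a few pandas calls. The following is a minimal sketch on a hypothetical toy frame, not on the project data; the column names only mimic the immigration sample.

```python
import pandas as pd

# Hypothetical toy frame standing in for one of the project datasets.
df = pd.DataFrame({
    "cicid": [1.0, 2.0, 2.0, 4.0],
    "i94port": ["NYC", "MIA", "MIA", None],
    "depdate": [20573.0, 20568.0, 20568.0, None],
})

# Number of missing values in each column.
missing_per_column = df.isna().sum()

# Number of rows that are exact copies of an earlier row.
duplicate_rows = int(df.duplicated().sum())

print(missing_per_column)
print(f"duplicate rows: {duplicate_rows}")
```

The same two calls can be applied to `df_im`, `df_temp`, and `df_demog` once they are loaded.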

#### Explore immigration dataset

In [2]:
# Read in the data here

df_im = pd.read_csv("immigration_data_sample.csv")
df_im.head()

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,...,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,...,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,...,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,...,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,...,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


In [3]:
df_im.columns

Index(['Unnamed: 0', 'cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port',
       'arrdate', 'i94mode', 'i94addr', 'depdate', 'i94bir', 'i94visa',
       'count', 'dtadfile', 'visapost', 'occup', 'entdepa', 'entdepd',
       'entdepu', 'matflag', 'biryear', 'dtaddto', 'gender', 'insnum',
       'airline', 'admnum', 'fltno', 'visatype'],
      dtype='object')

In [4]:
# create immigration facts table

f_immi = df_im[['cicid', 'i94yr', 'i94mon', 'i94port', 'i94addr', 'arrdate', 'depdate', 'i94mode', 'i94visa']]
f_immi.columns = ['cic_id', 'year', 'month', 'city_code', 'state_code', 'arrival_date', 'departure_date', 'mode', 'visa_type_code']
f_immi.head()

Unnamed: 0,cic_id,year,month,city_code,state_code,arrival_date,departure_date,mode,visa_type_code
0,4084316.0,2016.0,4.0,HHW,HI,20566.0,20573.0,1.0,2.0
1,4422636.0,2016.0,4.0,MCA,TX,20567.0,20568.0,1.0,2.0
2,1195600.0,2016.0,4.0,OGG,FL,20551.0,20571.0,1.0,2.0
3,5291768.0,2016.0,4.0,LOS,CA,20572.0,20581.0,1.0,2.0
4,985523.0,2016.0,4.0,CHM,NY,20550.0,20553.0,3.0,2.0


In [5]:
# create immigration dimensional person table

dim_immi_person = df_im[['cicid', 'i94cit', 'i94res', 'biryear', 'gender', 'insnum', 'visatype']]
dim_immi_person.columns = ['cic_id', 'citizen_country', 'residence_country', 'birth_year', 'gender', 'ins_num', 'visa_type']
dim_immi_person.head()

Unnamed: 0,cic_id,citizen_country,residence_country,birth_year,gender,ins_num,visa_type
0,4084316.0,209.0,209.0,1955.0,F,,WT
1,4422636.0,582.0,582.0,1990.0,M,,B2
2,1195600.0,148.0,112.0,1940.0,M,,WT
3,5291768.0,297.0,297.0,1991.0,M,,B2
4,985523.0,111.0,111.0,1997.0,F,,WT


In [6]:
# create flight dimensional table

dim_immi_flight = df_im[['cicid', 'airline', 'admnum', 'fltno']]
dim_immi_flight.columns = ['cic_id', 'airline', 'admin_num', 'flight_number']
dim_immi_flight.head()

Unnamed: 0,cic_id,airline,admin_num,flight_number
0,4084316.0,JL,56582670000.0,00782
1,4422636.0,*GA,94362000000.0,XBLNG
2,1195600.0,LH,55780470000.0,00464
3,5291768.0,QR,94789700000.0,00739
4,985523.0,,42322570000.0,LAND


In [7]:
f_immi.describe()

Unnamed: 0,cic_id,year,month,arrival_date,departure_date,mode,visa_type_code
count,1000.0,1000.0,1000.0,1000.0,951.0,1000.0,1000.0
mean,3040461.0,2016.0,4.0,20559.68,20575.037855,1.078,1.859
std,1799818.0,0.0,0.0,8.995027,24.211234,0.485955,0.386353
min,13208.0,2016.0,4.0,20545.0,20547.0,1.0,1.0
25%,1412170.0,2016.0,4.0,20552.0,20561.0,1.0,2.0
50%,2941176.0,2016.0,4.0,20560.0,20570.0,1.0,2.0
75%,4694151.0,2016.0,4.0,20567.25,20580.0,1.0,2.0
max,6061994.0,2016.0,4.0,20574.0,20715.0,9.0,3.0


#### Explore temperature data

In [8]:
df_temp = pd.read_csv("../../data2/GlobalLandTemperaturesByCity.csv")
df_temp.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


In [9]:
df_temp_usa = df_temp[df_temp['Country'] == 'United States']
df_temp_usa = df_temp_usa[['dt', 'AverageTemperature', 'AverageTemperatureUncertainty', 'City', 'Country']]
df_temp_usa.columns = ['dt', 'avg_temp', 'avg_temp_uncertnty', 'city', 'country']
df_temp_usa.head(5)

Unnamed: 0,dt,avg_temp,avg_temp_uncertnty,city,country
47555,1820-01-01,2.101,3.217,Abilene,United States
47556,1820-02-01,6.926,2.853,Abilene,United States
47557,1820-03-01,10.767,2.395,Abilene,United States
47558,1820-04-01,17.989,2.202,Abilene,United States
47559,1820-05-01,21.809,2.036,Abilene,United States


In [10]:
df_temp_usa['dt'] = pd.to_datetime(df_temp_usa['dt'])
df_temp_usa['year'] = df_temp_usa['dt'].dt.year
df_temp_usa['month'] = df_temp_usa['dt'].dt.month
df_temp_usa.head()

Unnamed: 0,dt,avg_temp,avg_temp_uncertnty,city,country,year,month
47555,1820-01-01,2.101,3.217,Abilene,United States,1820,1
47556,1820-02-01,6.926,2.853,Abilene,United States,1820,2
47557,1820-03-01,10.767,2.395,Abilene,United States,1820,3
47558,1820-04-01,17.989,2.202,Abilene,United States,1820,4
47559,1820-05-01,21.809,2.036,Abilene,United States,1820,5


In [11]:
df_temp_usa.describe()

Unnamed: 0,avg_temp,avg_temp_uncertnty,year,month
count,661524.0,661524.0,687289.0,687289.0
mean,13.949335,1.08955,1897.201336,6.497065
std,9.173337,1.15068,71.601625,3.450982
min,-25.163,0.04,1743.0,1.0
25%,7.787,0.3,1845.0,3.0
50%,14.922,0.524,1902.0,6.0
75%,21.081,1.646,1958.0,9.0
max,34.379,10.519,2013.0,12.0


#### Explore demographic data

In [12]:
df_demog = pd.read_csv('us-cities-demographics.csv', delimiter=';')
df_demog.head(5)

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [13]:
df_demog[df_demog['City'] == 'Newark']

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402
1770,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,Black or African-American,144961
1967,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,Asian,7349
2168,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,American Indian and Alaska Native,2268
2580,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,Hispanic or Latino,100432


In [14]:
# population dimension table

dim_city_pop = df_demog[['City', 'State', 'State Code', 'Male Population', 'Female Population', 'Number of Veterans', 'Foreign-born']].drop_duplicates()
dim_city_pop.columns = ['city', 'state', 'state_code', 'num_male', 'num_female', 'num_vetarans', 'num_foreign_born']
dim_city_pop.head()

Unnamed: 0,city,state,state_code,num_male,num_female,num_vetarans,num_foreign_born
0,Silver Spring,Maryland,MD,40601.0,41862.0,1562.0,30908.0
1,Quincy,Massachusetts,MA,44129.0,49500.0,4147.0,32935.0
2,Hoover,Alabama,AL,38040.0,46799.0,4819.0,8229.0
3,Rancho Cucamonga,California,CA,88127.0,87105.0,5821.0,33878.0
4,Newark,New Jersey,NJ,138040.0,143873.0,5829.0,86253.0


In [15]:
# race dimension table

# .copy() makes an independent frame, so renaming and sorting do not trigger SettingWithCopyWarning
dim_city_race = df_demog[['City', 'State', 'Race', 'Count']].copy()
dim_city_race.columns = ['city', 'state', 'race', 'num']
dim_city_race = dim_city_race.sort_values(by='city')
dim_city_race.head()

Unnamed: 0,city,state,race,num
2727,Abilene,Texas,Asian,2929
1403,Abilene,Texas,Hispanic or Latino,33222
1533,Abilene,Texas,White,95487
245,Abilene,Texas,American Indian and Alaska Native,1813
2880,Abilene,Texas,Black or African-American,14449


In [16]:
# avg stats dimension table

dim_city_stats = df_demog[['City', 'State', 'Median Age', 'Average Household Size']].drop_duplicates()
dim_city_stats.columns = ['city', 'state', 'median_age', 'avg_household_size']
dim_city_stats.head(5)

Unnamed: 0,city,state,median_age,avg_household_size
0,Silver Spring,Maryland,33.8,2.6
1,Quincy,Massachusetts,41.0,2.39
2,Hoover,Alabama,38.5,2.58
3,Rancho Cucamonga,California,34.5,3.18
4,Newark,New Jersey,34.6,2.73


#### Run SAS Data with Spark

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')


In [30]:
# write to parquet
#df_spark.write.parquet("sas_data")
#df_spark=spark.read.parquet("sas_data")

In [2]:
df_spark.head()

Row(cicid=6.0, i94yr=2016.0, i94mon=4.0, i94cit=692.0, i94res=692.0, i94port='XXX', arrdate=20573.0, i94mode=None, i94addr=None, depdate=None, i94bir=37.0, i94visa=2.0, count=1.0, dtadfile=None, visapost=None, occup=None, entdepa='T', entdepd=None, entdepu='U', matflag=None, biryear=1979.0, dtaddto='10282016', gender=None, insnum=None, airline=None, admnum=1897628485.0, fltno=None, visatype='B2')

#### Data Cleaning

##### 1. Transform datetime

In [19]:
def sas_to_date(date):
    return pd.to_timedelta(date, unit='D') + pd.Timestamp('1960-1-1')
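
SAS stores dates as the number of days since 1960-01-01, which is why the conversion adds a day offset to that epoch. The sketch below restates the same idea as a standalone example; the name `sas_days_to_date` is hypothetical, and the input values are the first `arrdate` and `depdate` from the sample above plus one missing value.

```python
import pandas as pd

SAS_EPOCH = pd.Timestamp("1960-01-01")  # SAS counts days from this date

def sas_days_to_date(days):
    """Convert SAS day counts (scalar or Series) to pandas timestamps."""
    return SAS_EPOCH + pd.to_timedelta(days, unit="D")

# 20566 and 20573 come from the sample rows; None mimics a missing depdate.
sample = pd.Series([20566.0, 20573.0, None])
converted = sas_days_to_date(sample)
print(converted)
```

A missing day count becomes `NaT`, so rows without a departure date survive the conversion.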

In [20]:
# assign() returns a new frame, which avoids SettingWithCopyWarning on the sliced f_immi
f_immi = f_immi.assign(
    arrival_date=sas_to_date(f_immi['arrival_date']),
    departure_date=sas_to_date(f_immi['departure_date']),
)
f_immi.head(5)

Unnamed: 0,cic_id,year,month,city_code,state_code,arrival_date,departure_date,mode,visa_type_code
0,4084316.0,2016.0,4.0,HHW,HI,2016-04-22,2016-04-29,1.0,2.0
1,4422636.0,2016.0,4.0,MCA,TX,2016-04-23,2016-04-24,1.0,2.0
2,1195600.0,2016.0,4.0,OGG,FL,2016-04-07,2016-04-27,1.0,2.0
3,5291768.0,2016.0,4.0,LOS,CA,2016-04-28,2016-05-07,1.0,2.0
4,985523.0,2016.0,4.0,CHM,NY,2016-04-06,2016-04-09,3.0,2.0


##### 2. Parse description

In [21]:
with open("I94_SAS_Labels_Descriptions.SAS") as f:
    contents = f.readlines()

In [22]:
country_code = {}
for countries in contents[10:298]:
    pair = countries.split('=')
    code, country = pair[0].strip(), pair[1].strip().strip("'")
    country_code[code] = country

In [23]:
df_country_code = pd.DataFrame(list(country_code.items()), columns=['code', 'country'])
df_country_code.head(5)

Unnamed: 0,code,country
0,236,AFGHANISTAN
1,101,ALBANIA
2,316,ALGERIA
3,102,ANDORRA
4,324,ANGOLA


In [24]:
city_code = {}
for cities in contents[303:962]:
    pair = cities.split('=')
    code, city = pair[0].strip("\t").strip().strip("'"), pair[1].strip('\t').strip().strip("''")
    city_code[code] = city

In [25]:
df_city_code = pd.DataFrame(list(city_code.items()), columns=['code', 'city'])
df_city_code.head(5)

Unnamed: 0,code,city
0,ANC,"ANCHORAGE, AK"
1,BAR,"BAKER AAF - BAKER ISLAND, AK"
2,DAC,"DALTONS CACHE, AK"
3,PIZ,"DEW STATION PT LAY DEW, AK"
4,DTH,"DUTCH HARBOR, AK"


In [26]:
state_code = {}
for states in contents[982:1036]:
    pair = states.split('=')
    code, state = pair[0].strip('\t').strip("'"), pair[1].strip().strip("'")
    state_code[code] = state
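
The three loops above rely on hard-coded line ranges and slightly different `strip` chains. As a hedged alternative, a single regular expression can pull `code = 'label'` pairs out of any slice of the file. The sketch below is an assumption about the line format, and the sample lines are a hypothetical excerpt written in the style of `I94_SAS_Labels_Descriptions.SAS`; labels that contain an apostrophe would need a different pattern.

```python
import re

# Matches lines of the form  CODE = 'LABEL'  where CODE may or may not be quoted.
PAIR_RE = re.compile(r"^\s*'?([^'=]+?)'?\s*=\s*'([^']*)'")

def parse_label_lines(lines):
    """Return {code: label} for every line that looks like a code/label pair."""
    mapping = {}
    for line in lines:
        match = PAIR_RE.match(line)
        if match:
            code, label = match.group(1).strip(), match.group(2).strip()
            mapping[code] = label
    return mapping

# Hypothetical excerpt; the real file should still be sliced per section,
# because every section uses the same CODE = 'LABEL' layout.
sample_lines = [
    "value i94cntyl\n",
    "   236 =  'AFGHANISTAN'\n",
    "\t'ANC'\t=\t'ANCHORAGE, AK         '\n",
    "\t'AK'='ALASKA'\n",
    ";\n",
]
parsed = parse_label_lines(sample_lines)
print(parsed)
```

Lines that do not match, such as section headers and terminators, are skipped instead of raising an `IndexError` on `pair[1]`.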

In [27]:
df_state_code = pd.DataFrame(list(state_code.items()), columns=['code', 'state'])
df_state_code.head(5)

Unnamed: 0,code,state
0,AK,ALASKA
1,AZ,ARIZONA
2,AR,ARKANSAS
3,CA,CALIFORNIA
4,CO,COLORADO


In [28]:
# change city info to upper case
dim_city_stats['city'] = dim_city_stats['city'].str.upper()
dim_city_stats['state'] = dim_city_stats['state'].str.upper()

In [29]:
dim_city_stats.head(5)

Unnamed: 0,city,state,median_age,avg_household_size
0,SILVER SPRING,MARYLAND,33.8,2.6
1,QUINCY,MASSACHUSETTS,41.0,2.39
2,HOOVER,ALABAMA,38.5,2.58
3,RANCHO CUCAMONGA,CALIFORNIA,34.5,3.18
4,NEWARK,NEW JERSEY,34.6,2.73
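
Upper-casing the city and state names makes them comparable with the labels parsed from the SAS description file. Assuming the intent is to match against `df_city_code` (this notebook does not state it), the port labels would also need to be split into a city part and a state part. A minimal sketch on hypothetical rows shaped like `df_city_code`:

```python
import pandas as pd

# Hypothetical rows in the shape of df_city_code above.
df_city_code = pd.DataFrame({
    "code": ["ANC", "DTH", "XXX"],
    "city": ["ANCHORAGE, AK", "DUTCH HARBOR, AK", "NOT REPORTED/UNKNOWN"],
})

# Split on the last comma only; labels without a comma get a missing state_code.
parts = df_city_code["city"].str.rsplit(",", n=1, expand=True)
df_city_code["city_name"] = parts[0].str.strip()
df_city_code["state_code"] = parts[1].str.strip()
print(df_city_code)
```

Splitting from the right keeps any earlier commas inside the city name.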


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

#### 3.2 Mapping Out Data Pipelines

For Step 3, please refer to the README.

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

For the code, please refer to Step 1 and Step 2.

#### 4.2 Data Quality Checks
 1. The table schemas match those defined in the data model, and the data types are as expected.
 2. The tables are not empty; each check counts the records.
 
 _Sample data quality check with Spark is provided in this notebook_
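
The notebook's sample checks cover uniqueness and completeness. For the schema check in item 1, the following is a minimal pandas sketch rather than the project's actual check; `schema_check` and the expected dtypes are hypothetical, and the toy frame only borrows two column names from `dim_city_stats`.

```python
import pandas as pd

def schema_check(df, expected_dtypes):
    """Raise ValueError if a column is missing or has an unexpected dtype."""
    for column, expected in expected_dtypes.items():
        if column not in df.columns:
            raise ValueError(f"Missing column: {column}")
        actual = str(df[column].dtype)
        if actual != expected:
            raise ValueError(f"Column {column} has dtype {actual}, expected {expected}")
    print("Schema check passed.")

# Hypothetical one-row frame with two numeric columns of dim_city_stats.
dim_city_stats_sample = pd.DataFrame({"median_age": [34.6], "avg_household_size": [2.73]})
schema_check(dim_city_stats_sample, {"median_age": "float64", "avg_household_size": "float64"})
```

For a Spark DataFrame, the same comparison could be made against `df.dtypes`, which returns a list of `(column, type)` pairs.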

#### Uniqueness Check

In [7]:
def uniqueness_check(df):
    """
    Count rows of a data table before and after dropping duplicates
    Compare if there is a difference 
    
    Parameters:
    :param df: data frame to check uniqueness on 
    """
    before = df.count()
    after = df.dropDuplicates().count()
    
    if after < before:
        raise ValueError("Table has duplicate rows.")
    else:
        print("Uniqueness check passed.")
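
Dropping fully duplicated rows does not catch two rows that share a key but differ in another column. As a complement, a key-level check can be sketched as below. It uses pandas on hypothetical data so that it runs without a Spark session, and `key_uniqueness_check` is an illustrative name, not part of the project code.

```python
import pandas as pd

def key_uniqueness_check(df, key_columns):
    """Raise ValueError if any combination of key_columns occurs more than once."""
    duplicated_keys = int(df.duplicated(subset=key_columns).sum())
    if duplicated_keys > 0:
        raise ValueError(f"{duplicated_keys} duplicate key(s) in {key_columns}.")
    print("Key uniqueness check passed.")

# Hypothetical rows: the second and third share a cic_id but differ in mode.
f_immi_sample = pd.DataFrame({"cic_id": [1.0, 2.0, 2.0], "mode": [1.0, 1.0, 3.0]})
try:
    key_uniqueness_check(f_immi_sample, ["cic_id"])
except ValueError as err:
    print(err)
```

In Spark, the equivalent comparison would be between `df.count()` and `df.select(*key_columns).distinct().count()`.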

In [8]:
# Sample Data Quality Check

uniqueness_check(df_spark)

Uniqueness check passed.


#### Completeness Check

In [10]:
def completeness_check(df):
    """
    Count rows of a table to see if the table is empty
    
    Parameters:
    :param df: data frame to check completeness on
    """
    
    record_num = df.count()
    if record_num <= 0:
        raise ValueError("Empty Table.")
    else:
        print(f"Completeness check passed. Table has total {record_num} records")

In [11]:
# Sample data quality check

completeness_check(df_spark)

Completeness check passed. Table has total 3096313 records


#### 4.3 Data dictionary 
Please refer to the README.

#### 4.4 Sample Queries to Answer Questions

In [23]:
# What are the three most popular ports of entry to the US in the loaded 2016 data (April file)?

from pyspark.sql.functions import desc

# i94yr is stored as a double, so compare against a number rather than a string
df_spark.filter(df_spark.i94yr == 2016.0)\
        .select('cicid', 'i94port')\
        .groupBy('i94port')\
        .count()\
        .sort(desc('count'))\
        .head(3)


[Row(i94port='NYC', count=485916),
 Row(i94port='MIA', count=343941),
 Row(i94port='LOS', count=310163)]