# Project Title
### Data Engineering Capstone Project

#### Project Summary
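This project integrates I94 immigration data, world temperature data, and U.S. city demographic data into a data warehouse of fact and dimension tables, using pandas and PySpark for processing and AWS S3 for storage.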

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [37]:
import pandas as pd
import pyspark
from IPython.display import Image

### Step 1: Scope the Project and Gather Data

#### Scope 
This project integrates I94 immigration data, world temperature data, and U.S. demographic data to set up a data warehouse with fact and dimension tables.

#### Data Sets 
1. I94 Immigration Data
2. World Temperature Data
3. U.S. City Demographic Data

#### Tools
- AWS S3: data storage
- Python: data processing
    - pandas: exploratory data analysis on small data sets
    - PySpark: data processing on large data sets

#### Describe and Gather Data 

| Data Set | Format | Description |
| --- | --- | --- |
| I94 Immigration Data | SAS | Data contains international visitor arrival statistics by world regions and select countries (including top 20), type of visa, mode of transportation, age groups, states visited (first intended address only), and the top ports of entry (for select countries). |
| World Temperature Data | CSV | This dataset is from Kaggle and contains monthly average temperatures for cities in countries around the world. |
| U.S. City Demographic Data | CSV | This dataset contains information about the demographics of all US cities and census-designated places with a population greater than or equal to 65,000. |


### Step 2: Explore and Assess the Data

#### Data Examination
- Employ pandas for initial data analysis to gain insights into these datasets.
- Break down datasets into dimensional tables and modify column names for enhanced clarity.
- Leverage PySpark on one of the SAS datasets to validate the logic of the ETL data pipeline.

#### Immigration Data Investigation
- Import the I94 Immigration dataset into a pandas DataFrame.
- Conduct exploratory data analysis to comprehend the data's structure and content.
- Spot any data that is missing or inconsistent.
- Partition the dataset into dimensional tables for simplified analysis and comprehension.
- Alter column names to more meaningful ones for enhanced understanding.
- Verify the ETL data pipeline logic using PySpark on a data subset.
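
One way to spot missing data is to compute the share of missing values per column. The following is a minimal sketch, assuming pandas is available; the helper name `missing_share` is hypothetical, and the three sample rows only mirror values visible in the immigration sample's `head(5)` output.

```python
import pandas as pd


def missing_share(df: pd.DataFrame) -> pd.Series:
    """Return the fraction of missing values per column, largest first."""
    return df.isna().mean().sort_values(ascending=False)


# Illustrative rows only: insnum is empty in every sampled row,
# airline is empty for the land crossing.
sample = pd.DataFrame({
    "cicid": [4084316.0, 4422636.0, 985523.0],
    "airline": ["JL", "*GA", None],
    "insnum": [None, None, None],
})

shares = missing_share(sample)
print(shares)
```

Columns with a share close to 1.0 are candidates for dropping or for a separate, sparsely populated dimension table.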


In [2]:
# Read in the data here
df_immi = pd.read_csv("immigration_data_sample.csv")

In [3]:
df_immi.head(5)

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,...,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,...,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,...,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,...,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,...,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


In [4]:
df_immi.columns

Index(['Unnamed: 0', 'cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port',
       'arrdate', 'i94mode', 'i94addr', 'depdate', 'i94bir', 'i94visa',
       'count', 'dtadfile', 'visapost', 'occup', 'entdepa', 'entdepd',
       'entdepu', 'matflag', 'biryear', 'dtaddto', 'gender', 'insnum',
       'airline', 'admnum', 'fltno', 'visatype'],
      dtype='object')

In [5]:
fact_immigration = df_immi[['cicid', 'i94yr', 'i94mon', 'i94port', 'i94addr', 'arrdate', 'depdate', 'i94mode', 'i94visa']]
fact_immigration.columns = ['cic_id', 'year', 'month', 'city_code', 'state_code', 'arrive_date', 'departure_date', 'mode', 'visa']
fact_immigration.head(5)

Unnamed: 0,cic_id,year,month,city_code,state_code,arrive_date,departure_date,mode,visa
0,4084316.0,2016.0,4.0,HHW,HI,20566.0,20573.0,1.0,2.0
1,4422636.0,2016.0,4.0,MCA,TX,20567.0,20568.0,1.0,2.0
2,1195600.0,2016.0,4.0,OGG,FL,20551.0,20571.0,1.0,2.0
3,5291768.0,2016.0,4.0,LOS,CA,20572.0,20581.0,1.0,2.0
4,985523.0,2016.0,4.0,CHM,NY,20550.0,20553.0,3.0,2.0


In [6]:
# assign() returns a new DataFrame, which avoids pandas' SettingWithCopyWarning
fact_immigration = fact_immigration.assign(country='United States')
fact_immigration.head(5)


Unnamed: 0,cic_id,year,month,city_code,state_code,arrive_date,departure_date,mode,visa,country
0,4084316.0,2016.0,4.0,HHW,HI,20566.0,20573.0,1.0,2.0,United States
1,4422636.0,2016.0,4.0,MCA,TX,20567.0,20568.0,1.0,2.0,United States
2,1195600.0,2016.0,4.0,OGG,FL,20551.0,20571.0,1.0,2.0,United States
3,5291768.0,2016.0,4.0,LOS,CA,20572.0,20581.0,1.0,2.0,United States
4,985523.0,2016.0,4.0,CHM,NY,20550.0,20553.0,3.0,2.0,United States


In [7]:
dim_immi_personal = df_immi[['cicid', 'i94cit', 'i94res', 'biryear', 'gender', 'insnum']]
dim_immi_personal.columns = ['cic_id', 'citizen_country', 'residence_country', 'birth_year', 'gender', 'ins_num']
dim_immi_personal.head(5)

Unnamed: 0,cic_id,citizen_country,residence_country,birth_year,gender,ins_num
0,4084316.0,209.0,209.0,1955.0,F,
1,4422636.0,582.0,582.0,1990.0,M,
2,1195600.0,148.0,112.0,1940.0,M,
3,5291768.0,297.0,297.0,1991.0,M,
4,985523.0,111.0,111.0,1997.0,F,


In [8]:
dim_immi_airline = df_immi[['cicid', 'airline', 'admnum', 'fltno', 'visatype']]
dim_immi_airline.columns = ['cic_id', 'airline', 'admin_num', 'flight_number', 'visa_type']
dim_immi_airline.head(5)

Unnamed: 0,cic_id,airline,admin_num,flight_number,visa_type
0,4084316.0,JL,56582670000.0,00782,WT
1,4422636.0,*GA,94362000000.0,XBLNG,B2
2,1195600.0,LH,55780470000.0,00464,WT
3,5291768.0,QR,94789700000.0,00739,B2
4,985523.0,,42322570000.0,LAND,WT


In [9]:
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
df_temp = pd.read_csv(fname)

In [10]:
df_temp.head(5)

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


In [11]:
df_temp_usa = df_temp[df_temp['Country'] == 'United States']
df_temp_usa = df_temp_usa[['dt', 'AverageTemperature', 'AverageTemperatureUncertainty', 'City', 'Country']]
df_temp_usa.columns = ['dt', 'avg_temp', 'avg_temp_uncertnty', 'city', 'country']
df_temp_usa.head(5)

Unnamed: 0,dt,avg_temp,avg_temp_uncertnty,city,country
47555,1820-01-01,2.101,3.217,Abilene,United States
47556,1820-02-01,6.926,2.853,Abilene,United States
47557,1820-03-01,10.767,2.395,Abilene,United States
47558,1820-04-01,17.989,2.202,Abilene,United States
47559,1820-05-01,21.809,2.036,Abilene,United States


In [12]:
df_temp_usa['dt'] = pd.to_datetime(df_temp_usa['dt'])
df_temp_usa['year'] = df_temp_usa['dt'].dt.year
df_temp_usa['month'] = df_temp_usa['dt'].dt.month
df_temp_usa.head()

Unnamed: 0,dt,avg_temp,avg_temp_uncertnty,city,country,year,month
47555,1820-01-01,2.101,3.217,Abilene,United States,1820,1
47556,1820-02-01,6.926,2.853,Abilene,United States,1820,2
47557,1820-03-01,10.767,2.395,Abilene,United States,1820,3
47558,1820-04-01,17.989,2.202,Abilene,United States,1820,4
47559,1820-05-01,21.809,2.036,Abilene,United States,1820,5


In [13]:
df_demog = pd.read_csv('us-cities-demographics.csv', delimiter=';')
df_demog.head(5)

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [14]:
dim_city_population = df_demog[['City', 'State', 'Male Population', 'Female Population', 'Number of Veterans', 'Foreign-born', 'Race']]
dim_city_population.columns = ['city', 'state', 'male_pop', 'female_pop', 'num_veterans', 'foreign_born', 'race']
dim_city_population.head(5)

Unnamed: 0,city,state,male_pop,female_pop,num_veterans,foreign_born,race
0,Silver Spring,Maryland,40601.0,41862.0,1562.0,30908.0,Hispanic or Latino
1,Quincy,Massachusetts,44129.0,49500.0,4147.0,32935.0,White
2,Hoover,Alabama,38040.0,46799.0,4819.0,8229.0,Asian
3,Rancho Cucamonga,California,88127.0,87105.0,5821.0,33878.0,Black or African-American
4,Newark,New Jersey,138040.0,143873.0,5829.0,86253.0,White


In [15]:
dim_city_statistics = df_demog[['City', 'State', 'Median Age', 'Average Household Size']]
dim_city_statistics.columns = ['city', 'state', 'median_age', 'avg_household_size']
dim_city_statistics.head(5)

Unnamed: 0,city,state,median_age,avg_household_size
0,Silver Spring,Maryland,33.8,2.6
1,Quincy,Massachusetts,41.0,2.39
2,Hoover,Alabama,38.5,2.58
3,Rancho Cucamonga,California,34.5,3.18
4,Newark,New Jersey,34.6,2.73


In [16]:
import configparser
import os
import logging
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import upper, col, monotonically_increasing_id
# Set up logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# Point the environment at the local Java, Spark, and Hadoop installations
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = "/opt/conda/bin:/opt/spark-2.4.3-bin-hadoop2.7/bin:/opt/conda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/lib/jvm/java-8-openjdk-amd64/bin"
os.environ["SPARK_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"
os.environ["HADOOP_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"
# Read AWS configurations
config = configparser.ConfigParser()
config.read('config.cfg')
# Create a Spark session with the hadoop-aws and spark-sas7bdat packages
spark = SparkSession.builder\
        .config("spark.jars.repositories", "https://repos.spark-packages.org/")\
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0,saurfang:spark-sas7bdat:2.0.0-s_2.11")\
        .enableHiveSupport().getOrCreate()
# Load the April 2016 I94 SAS file
df_spark = spark.read.format('com.github.saurfang.sas.spark')\
        .load("../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat")


In [18]:
#write to parquet
#df_spark.write.parquet("sas_data")
df_spark=spark.read.parquet("sas_data")
df_spark.head(1)

[Row(cicid=5748517.0, i94yr=2016.0, i94mon=4.0, i94cit=245.0, i94res=438.0, i94port='LOS', arrdate=20574.0, i94mode=1.0, i94addr='CA', depdate=20582.0, i94bir=40.0, i94visa=1.0, count=1.0, dtadfile='20160430', visapost='SYD', occup=None, entdepa='G', entdepd='O', entdepu=None, matflag='M', biryear=1976.0, dtaddto='10292016', gender='F', insnum=None, airline='QF', admnum=94953870030.0, fltno='00011', visatype='B1')]

### Cleaning Steps
1. **Transform Arrival and Departure Dates**: Convert `arrdate` and `depdate` in the immigration data from SAS time format to pandas datetime format.
2. **Parse I94_SAS_Labels_Descriptions.SAS File**: Extract auxiliary dimension tables such as `country_code`, `city_code`, and `state_code` from the `I94_SAS_Labels_Descriptions.SAS` file.
3. **Transform City and State Data**: Convert `city` and `state` fields in the demographic data to upper case to align with the `city_code` and `state_code` tables.


1. **Transform Arrival and Departure Dates**: Convert `arrdate` and `depdate` in the immigration data from SAS time format to pandas datetime format.

In [19]:
# Cleaning tasks

def SAS_to_datetime(date):
    """Convert SAS date values (days since 1960-01-01) to pandas datetimes."""
    return pd.to_timedelta(date, unit='D') + pd.Timestamp('1960-01-01')



In [20]:
# assign() returns a new DataFrame, which avoids pandas' SettingWithCopyWarning
fact_immigration = fact_immigration.assign(
    arrive_date=SAS_to_datetime(fact_immigration['arrive_date']),
    departure_date=SAS_to_datetime(fact_immigration['departure_date']),
)
fact_immigration.head(5)


Unnamed: 0,cic_id,year,month,city_code,state_code,arrive_date,departure_date,mode,visa,country
0,4084316.0,2016.0,4.0,HHW,HI,2016-04-22,2016-04-29,1.0,2.0,United States
1,4422636.0,2016.0,4.0,MCA,TX,2016-04-23,2016-04-24,1.0,2.0,United States
2,1195600.0,2016.0,4.0,OGG,FL,2016-04-07,2016-04-27,1.0,2.0,United States
3,5291768.0,2016.0,4.0,LOS,CA,2016-04-28,2016-05-07,1.0,2.0,United States
4,985523.0,2016.0,4.0,CHM,NY,2016-04-06,2016-04-09,3.0,2.0,United States
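
As a sanity check on the conversion, the same arithmetic can be reproduced with the standard library alone: a SAS date is a count of days since 1960-01-01. The sketch below is illustrative and not part of the notebook's pipeline; `sas_days_to_date` is a hypothetical helper, and it handles `None` but not `NaN`.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS counts dates as days since this day


def sas_days_to_date(days):
    """Convert a SAS day count to a datetime.date; None stays None."""
    if days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(days))


# Values taken from the first sample row (arrdate and depdate).
print(sas_days_to_date(20566.0))  # 2016-04-22
print(sas_days_to_date(20573.0))  # 2016-04-29
```

Both results match the `arrive_date` and `departure_date` values of the first row in the output above.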


2. **Parse I94_SAS_Labels_Descriptions.SAS File**: Extract auxiliary dimension tables such as `country_code`, `city_code`, and `state_code` from the `I94_SAS_Labels_Descriptions.SAS` file.

In [21]:
with open("I94_SAS_Labels_Descriptions.SAS") as f:
    contents = f.readlines()

In [22]:
country_code = {}
for countries in contents[10:298]:
    pair = countries.split('=')
    code, country = pair[0].strip(), pair[1].strip().strip("'")
    country_code[code] = country

In [23]:
df_country_code = pd.DataFrame(list(country_code.items()), columns=['code', 'country'])
df_country_code.head(5)

Unnamed: 0,code,country
0,236,AFGHANISTAN
1,101,ALBANIA
2,316,ALGERIA
3,102,ANDORRA
4,324,ANGOLA


In [24]:
city_code = {}
for cities in contents[303:962]:
    pair = cities.split('=')
    code, city = pair[0].strip().strip("'"), pair[1].strip().strip("'")
    city_code[code] = city

In [25]:
df_city_code = pd.DataFrame(list(city_code.items()), columns=['code', 'city'])
df_city_code.head(5)

Unnamed: 0,code,city
0,ANC,"ANCHORAGE, AK"
1,BAR,"BAKER AAF - BAKER ISLAND, AK"
2,DAC,"DALTONS CACHE, AK"
3,PIZ,"DEW STATION PT LAY DEW, AK"
4,DTH,"DUTCH HARBOR, AK"


In [26]:
state_code = {}
for states in contents[982:1036]:
    pair = states.split('=')
    code, state = pair[0].strip('\t').strip("'"), pair[1].strip().strip("'")
    state_code[code] = state

In [27]:
df_state_code = pd.DataFrame(list(state_code.items()), columns=['code', 'state'])
df_state_code.head(5)

Unnamed: 0,code,state
0,AK,ALASKA
1,AZ,ARIZONA
2,AR,ARKANSAS
3,CA,CALIFORNIA
4,CO,COLORADO


3. **Transform City and State Data**: Convert `city` and `state` fields in the demographic data to upper case to align with the `city_code` and `state_code` tables.

In [28]:
# assign() returns a new DataFrame, which avoids pandas' SettingWithCopyWarning
dim_city_statistics = dim_city_statistics.assign(
    city=dim_city_statistics['city'].str.upper(),
    state=dim_city_statistics['state'].str.upper(),
)
dim_city_statistics.head(5)


Unnamed: 0,city,state,median_age,avg_household_size
0,SILVER SPRING,MARYLAND,33.8,2.6
1,QUINCY,MASSACHUSETTS,41.0,2.39
2,HOOVER,ALABAMA,38.5,2.58
3,RANCHO CUCAMONGA,CALIFORNIA,34.5,3.18
4,NEWARK,NEW JERSEY,34.6,2.73


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Because this data warehouse is intended for OLAP and BI applications, we model the data sets with a star schema.

Star Schema

![conceptual_data_model.png](img/conceptual_data_model.png)
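
To illustrate how a star schema is queried, the sketch below joins a fact table to one dimension table and aggregates the result. It uses an in-memory SQLite database purely as a stand-in for the warehouse; the table and column names are assumptions loosely based on the DataFrames in Step 2, and the actual model is the one shown in the diagram.

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # stand-in for the real warehouse
conn.executescript("""
    CREATE TABLE fact_immigration (cic_id INTEGER PRIMARY KEY, state_code TEXT, visa INTEGER);
    CREATE TABLE dim_state_code (code TEXT PRIMARY KEY, state TEXT);
""")

# A few illustrative rows; HI has no matching dimension row on purpose.
conn.executemany(
    "INSERT INTO fact_immigration VALUES (?, ?, ?)",
    [(5291768, "CA", 2), (4084316, "HI", 2)],
)
conn.executemany(
    "INSERT INTO dim_state_code VALUES (?, ?)",
    [("CA", "CALIFORNIA"), ("AK", "ALASKA")],
)

# Typical star-schema query: join the fact table to a dimension, then aggregate.
rows = conn.execute("""
    SELECT d.state, COUNT(*) AS arrivals
    FROM fact_immigration AS f
    JOIN dim_state_code AS d ON d.code = f.state_code
    GROUP BY d.state
    ORDER BY d.state
""").fetchall()
print(rows)
conn.close()
```

Because the join is an inner join, fact rows whose `state_code` has no dimension entry drop out of the result, which is one reason complete auxiliary tables matter.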

#### 3.2 Mapping Out Data Pipelines
##### Data Pipeline Build Up Steps

Assume all data sets are stored in S3 buckets as below:
- `[Source_S3_Bucket]/immigration/18-83510-I94-Data-2016/*.sas7bdat`
- `[Source_S3_Bucket]/I94_SAS_Labels_Descriptions.SAS`
- `[Source_S3_Bucket]/temperature/GlobalLandTemperaturesByCity.csv`
- `[Source_S3_Bucket]/demography/us-cities-demographics.csv`

##### Cleaning Step
Follow the cleaning steps outlined in Step 2 to clean up the data sets.

##### Data Transformation
- Transform the immigration data into one fact table and two dimension tables. The fact table will be partitioned by state.
- Parse the label description file to get auxiliary tables.
- Transform the temperature data into a dimension table.
- Split the demography data into two dimension tables.

##### Data Storage
Store these tables back to the target S3 bucket.


### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
The data processing and the data model were implemented with Spark.

Please refer to etl.ipynb.


#### 4.2 Data Quality Checks
Data quality checks include:

1. No table is empty after running the ETL data pipeline.
2. The schema of every dimensional table matches the data model.
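
Both checks can be expressed as one small function that fails loudly. The sketch below is deliberately independent of Spark so it can run anywhere; with Spark, `row_count` would come from `df.count()` and `actual_columns` from `df.columns`. The function name `check_table` and the `EXPECTED` dictionary are hypothetical, and the expected columns are taken from the `dim_city_statistics` DataFrame in Step 2.

```python
def check_table(name, row_count, actual_columns, expected_columns):
    """Raise ValueError if a table is empty or its columns differ from the model."""
    if row_count <= 0:
        raise ValueError(f"Table {name} is empty")
    missing = sorted(set(expected_columns) - set(actual_columns))
    extra = sorted(set(actual_columns) - set(expected_columns))
    if missing or extra:
        raise ValueError(
            f"Table {name} schema mismatch: missing={missing}, extra={extra}"
        )
    return True


# Hypothetical expected schema for one table of the data model.
EXPECTED = {
    "dim_city_statistics": ["city", "state", "median_age", "avg_household_size"],
}

ok = check_table(
    "dim_city_statistics",
    5,
    ["city", "state", "median_age", "avg_household_size"],
    EXPECTED["dim_city_statistics"],
)
print(ok)
```

Comparing column sets rather than printing the schema turns the second check into an automatic pass or fail instead of a manual inspection.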

 


In [29]:
# Perform quality checks here
import os
import configparser
from pathlib import Path
from pyspark.sql import SparkSession

In [33]:
config = configparser.ConfigParser()
config.read('capstone.cfg', encoding='utf-8-sig')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']
SOURCE_S3_BUCKET = config['S3']['SOURCE_S3_BUCKET']
DEST_S3_BUCKET = config['S3']['DEST_S3_BUCKET']
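
The cell above expects `capstone.cfg` to contain an `[AWS]` and an `[S3]` section. The file itself is not shown in this notebook, so the following is only a sketch of a layout that would satisfy those lookups, with placeholder values and a small check for missing keys. The names `SAMPLE_CFG`, `sample_config`, and `required` are hypothetical.

```python
import configparser

# Placeholder values only; never commit real credentials.
SAMPLE_CFG = """
[AWS]
AWS_ACCESS_KEY_ID = <your-access-key-id>
AWS_SECRET_ACCESS_KEY = <your-secret-access-key>

[S3]
SOURCE_S3_BUCKET = <source-bucket-path>
DEST_S3_BUCKET = <destination-bucket-path>
"""

sample_config = configparser.ConfigParser()
sample_config.read_string(SAMPLE_CFG)

# Keys the notebook cell reads, grouped by section.
required = {
    "AWS": ["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"],
    "S3": ["SOURCE_S3_BUCKET", "DEST_S3_BUCKET"],
}
missing = [
    f"{section}.{key}"
    for section, keys in required.items()
    for key in keys
    if not sample_config.has_option(section, key)
]
print(missing)
```

An empty `missing` list means every key the notebook reads is present.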

In [31]:
spark = SparkSession.builder\
                    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")\
                    .enableHiveSupport().getOrCreate()

In [34]:
s3_bucket = Path(SOURCE_S3_BUCKET)

In [None]:
for file_dir in s3_bucket.iterdir():
    if file_dir.is_dir():
        path = str(file_dir)
        df = spark.read.parquet(path)
        print("Table: " + path.split('/')[-1])
        df.printSchema()  # prints the schema; it returns None, so there is nothing to assign

In [None]:
for file_dir in s3_bucket.iterdir():
    if file_dir.is_dir():
        path = str(file_dir)
        df = spark.read.parquet(path)
        record_num = df.count()
        table = path.split('/')[-1]
        if record_num <= 0:
            raise ValueError(f"Table {table} is empty!")
        print(f"Table: {table} is not empty: total {record_num} records.")

#### 4.3 Data Dictionary
Please refer to `imgage/data_dictionary`.

### Step 5: Project Completion Write-Up

#### Tools and Technologies
- **AWS S3**: Used as the primary data storage solution.
- **Pandas**: Deployed for exploratory data analysis on smaller data sets.
- **PySpark**: Leveraged for processing larger data sets and transforming staging tables into dimensional tables.

#### Data Update Frequency
- **Immigration and Temperature Data**: Given that the raw data sets are updated monthly, the corresponding tables should also be updated monthly.
- **Demography Data**: As demographic data collection is time-consuming and potentially costly, these tables could be updated annually.
- All tables should be updated in an append-only manner.

#### Future Design Considerations
- **Data Increase**: If the data grows by 100x and Spark in standalone mode can no longer keep up, consider moving the processing to AWS EMR, a managed cluster service for processing large data sets in the cloud.
- **Daily Dashboard Updates**: Apache Airflow could be utilized to construct an ETL data pipeline for regular data updates and report generation. Apache Airflow integrates well with Python and AWS, allowing for powerful task automation.
- **Database Access**: If the database needs to be accessed by 100+ people, consider migrating the database to AWS Redshift, which can handle up to 500 connections. A cost/benefit analysis would be necessary for this implementation.

#### Future Improvements
- **Data Completeness**: There are several gaps within these data sets. More data collection is needed to build a more comprehensive single source of truth (SSOT) database.
- **Data Consistency**: The immigration data set is based on 2016 data, but the temperature data set only goes up to 2013. More recent temperature data is needed for a more accurate comparison.
- **Data Integration**: The label description file is missing state and city information, making it difficult to join immigration tables and demography tables.
