# Project Title
### Data Engineering Capstone Project

#### Project Summary
This is the capstone project of the Udacity Data Engineering Nanodegree. The aim of the project is to apply the skills learned during the course.

This project shows how to load data from four different data sources into Spark, transform it, apply quality checks, and store the result in a star schema so that it can be used for analysis.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

# Step 1: Scope the Project and Gather Data

### **Scope**

This project pulls data from all the sources and creates fact and dimension tables to show immigration in the US.

### **Describe and Gather Data**

- U.S. City Demographic Data (demog): This contains data by city, state, age, population, veteran status and race.

- I94 Immigration Data (sas_data): This contains data on incoming immigrants and their ports of entry.

- Airport Code Table (airport): This contains airport codes and the corresponding cities.

- Countries (countries): taken from I94_SAS_Labels_Descriptions.sas

- Visas (visa): taken from I94_SAS_Labels_Descriptions.sas

- Immigrant Entry Mode (mode): taken from I94_SAS_Labels_Descriptions.sas

# Step 2: Explore and Assess the Data

In [1]:
import pandas as pd

### Immigrant data

In [2]:
immigrant_dt = 'immigration_data_sample.csv'
df_immigrant = pd.read_csv(immigrant_dt)
df_immigrant.head()

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,...,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,...,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,...,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,...,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,...,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


In [3]:
df_immigrant.columns

Index(['Unnamed: 0', 'cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port',
       'arrdate', 'i94mode', 'i94addr', 'depdate', 'i94bir', 'i94visa',
       'count', 'dtadfile', 'visapost', 'occup', 'entdepa', 'entdepd',
       'entdepu', 'matflag', 'biryear', 'dtaddto', 'gender', 'insnum',
       'airline', 'admnum', 'fltno', 'visatype'],
      dtype='object')

### I94_SAS_Labels_Descriptions.SAS contains these descriptions


| column name| description | type | is used by CIC | personal observations for ETL | 
| --- | --- | --- | --- | --- | 
| cicid |  unique number for the immigrants | int | true | might be primary key  | 
| i94yr | 4 digit year  | float  | true | use int  | 
| i94mon | numeric month | float |true  | use int  | 
| i94cit | 3 digit code of origin country | float | true | use int, if empty remove line, quality check for valid code according to list in .sas file | 
| i94res | 3 digit code of the country from which one has travelled | float | true | use int, if empty remove line, quality check for valid code according to list in .sas file | 
| i94port | 3 character code of destination USA city | varchar | true | keep as string, if empty remove line, quality check for valid code according to list in .sas file | 
| arrdate | date of arrival in the U.S.A. | SAS date numeric | true | SAS numeric date field to which no permanent format has been applied; apply whichever date format works | 
| i94mode | travel code (transportation) | 1 digit | true | 1 = 'Air' 2 = 'Sea' 3 = 'Land' 9 = 'Not reported' | 
| i94addr | ??? | two digit | true | There are many invalid codes in this variable; the list in the .sas file shows the codes found to be valid, everything else goes into 'other'; not sure if we should use it | 
| depdate | departure date from the USA | SAS numeric field | true | apply whichever date format works | 
|i94bir  |Age of Respondent in Years  | float |true  | use int | 
| i94visa | Visa codes collapsed into three categories | 1 char | true | 1 = Business 2 = Pleasure 3 = Student | 
| count|Used for summary statistics | int | true | not sure if needed |
|dtadfile |Character Date Field - Date added to I-94 Files | |false | |
| visapost| Department of State where visa was issued | |false | |
| occup|Occupation that will be performed in U.S. | |false | |
| entdepa| Arrival Flag - admitted or paroled into the U.S. | |false | |
|entdepd |Departure Flag - Departed, lost I-94 or is deceased  | | false| |
|entdepu|Update Flag - Either apprehended, overstayed, adjusted to perm residence||false| |
|matflag|Match flag - Match of arrival and departure records| 1 char|true| not sure if needed |
|biryear | year of birth| 4 digit|true||
|dtaddto|Character Date Field - Date to which admitted to U.S. (allowed to stay until)||false||
|gender| Non-immigrant sex |1 digit|true||
|insnum|INS number|number|true|check how many NaN, and if to skip whole column|
|airline|Airline used to arrive in U.S|varchar|true|check how many NaN, and if to skip whole column|
|admnum| Admission Number ||true| find out what this is|
|fltno| Flight number of Airline used to arrive in U.S|varchar|true||
|visatype| Class of admission legally admitting the non-immigrant to temporarily stay in U.S.||||


"CIC does not use" means that the column has not been used by CIC for analysis. 
This could be interpreted in multiple ways; one plausible interpretation is that the data is not as clean as in the other columns.
So we will not carry these columns over.
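
The code-to-label mappings noted in the table for i94mode and i94visa can be kept as small lookup dictionaries. The following is a minimal sketch in plain Python; the names `I94_MODE`, `I94_VISA`, and `decode` are illustrative and not taken from etl.py, and returning 'Unknown' for missing or unlisted codes is an assumption of this sketch.

```python
# Lookup tables copied from the observations column of the table above.
I94_MODE = {1: "Air", 2: "Sea", 3: "Land", 9: "Not reported"}
I94_VISA = {1: "Business", 2: "Pleasure", 3: "Student"}


def decode(code, labels, default="Unknown"):
    """Translate a numeric code (possibly a float such as 1.0) into its label.

    Missing values (None or NaN) and unlisted codes map to `default`,
    which is an assumption of this sketch.
    """
    if code is None:
        return default
    try:
        return labels.get(int(code), default)
    except (TypeError, ValueError):  # int(NaN) raises ValueError
        return default


print(decode(1.0, I94_MODE))  # Air
print(decode(3.0, I94_MODE))  # Land
print(decode(2, I94_VISA))    # Pleasure
```

The float inputs mirror the sample, where pandas reads the code columns as floats.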

In [4]:
print('number of rows:', len(df_immigrant))

number of rows: 1000


In [5]:
df_immigrant.groupby(['biryear'])[['i94yr']].count()

Unnamed: 0_level_0,i94yr
biryear,Unnamed: 1_level_1
1923.0,1
1928.0,1
1929.0,1
1931.0,1
1932.0,1
1933.0,1
1935.0,1
1936.0,2
1938.0,4
1939.0,5


In [6]:
df_immigrant.groupby(['gender']).count()

Unnamed: 0_level_0,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepd,entdepu,matflag,biryear,dtaddto,insnum,airline,admnum,fltno,visatype
gender,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
F,386,386,386,386,386,386,386,386,386,365,...,366,0,366,386,386,12,374,386,384,386
M,471,471,471,471,471,471,471,471,471,449,...,449,0,449,471,471,21,452,471,467,471
X,2,2,2,2,2,2,2,2,2,0,...,1,0,1,2,2,2,1,2,0,2


### Demography Data

In [8]:
demography_dt = 'us-cities-demographics.csv'
demography_df = pd.read_csv(demography_dt, delimiter=';')

In [9]:
demography_df.head(10)

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402
5,Peoria,Illinois,33.1,56229.0,62432.0,118661,6634.0,7517.0,2.4,IL,American Indian and Alaska Native,1343
6,Avondale,Arizona,29.1,38712.0,41971.0,80683,4815.0,8355.0,3.18,AZ,Black or African-American,11592
7,West Covina,California,39.8,51629.0,56860.0,108489,3800.0,37038.0,3.56,CA,Asian,32716
8,O'Fallon,Missouri,36.0,41762.0,43270.0,85032,5783.0,3269.0,2.77,MO,Hispanic or Latino,2583
9,High Point,North Carolina,35.5,51751.0,58077.0,109828,5204.0,16315.0,2.65,NC,Asian,11060


In [10]:
print(demography_df.columns)

Index(['City', 'State', 'Median Age', 'Male Population', 'Female Population',
       'Total Population', 'Number of Veterans', 'Foreign-born',
       'Average Household Size', 'State Code', 'Race', 'Count'],
      dtype='object')


In [11]:
demography_df.count()

City                      2891
State                     2891
Median Age                2891
Male Population           2888
Female Population         2888
Total Population          2891
Number of Veterans        2878
Foreign-born              2878
Average Household Size    2875
State Code                2891
Race                      2891
Count                     2891
dtype: int64

### Demography data descriptions


| column name| description | type |personal observations for ETL | 
| --- | --- | --- | --- |
| City | name of the city| varchar | | 
| State | name of the state| varchar | | 
| Median Age| median age|float||
| Male population|male population|float| use int|
| Female population|female population| float| use int|
|Total population |total population| int| maybe qa check for sum of male and female|
|Number of Veterans|number of veterans| float| can be int, but not needed for immigration data analytics|
|Foreign-born| foreign born| float| use int|
|Average household size |average household size| float||
|State code | state code| varchar| american states, maybe important for dim_location |
|Race| race| string ||
|Count|???| int| maybe some count from other analysis, can be skipped |
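
The QA idea noted for Total population (male plus female should equal the total) can be sketched as follows. This is a plain-Python illustration using three rows copied from the `demography_df.head(10)` output above; the function name `population_mismatches` is hypothetical, and skipping rows with missing values is an assumption.

```python
# Three rows copied from the demography_df.head(10) output above:
# (city, male population, female population, total population)
rows = [
    ("Silver Spring", 40601.0, 41862.0, 82463),
    ("Quincy", 44129.0, 49500.0, 93629),
    ("Hoover", 38040.0, 46799.0, 84839),
]


def population_mismatches(rows):
    """Return the cities whose male + female population differs from the total."""
    bad = []
    for city, male, female, total in rows:
        # Rows with a missing male or female value cannot be checked; skip them.
        if male is None or female is None:
            continue
        if int(male) + int(female) != total:
            bad.append(city)
    return bad


print(population_mismatches(rows))  # []
```

An empty list means every checked row is consistent.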


### Airport Data

In [12]:
airport_dt = 'airport-codes_csv.csv'
airport_codes_dt = pd.read_csv(airport_dt)
print(airport_codes_dt.head(20))

   ident           type                                name  elevation_ft  \
0    00A       heliport                   Total Rf Heliport          11.0   
1   00AA  small_airport                Aero B Ranch Airport        3435.0   
2   00AK  small_airport                        Lowell Field         450.0   
3   00AL  small_airport                        Epps Airpark         820.0   
4   00AR         closed  Newport Hospital & Clinic Heliport         237.0   
5   00AS  small_airport                      Fulton Airport        1100.0   
6   00AZ  small_airport                      Cordes Airport        3810.0   
7   00CA  small_airport             Goldstone /Gts/ Airport        3038.0   
8   00CL  small_airport                 Williams Ag Airport          87.0   
9   00CN       heliport     Kitchen Creek Helibase Heliport        3350.0   
10  00CO         closed                          Cass Field        4830.0   
11  00FA  small_airport                 Grass Patch Airport          53.0   

### Airport data descriptions

| column name| description | type |personal observations for ETL | 
| --- | --- | --- | --- |
|  ident| unique identifier of airport | Varchar |  |
| type | type of airport  | varchar | could be enum:   heliport, small_airport, medium_airport, closed |
| name | name of airport | varchar |  |
| elevation_ft| elevation in feet | float |  |
| continent | continent | varchar | a lot of NaN, maybe skip this column  |
| iso_country | country iso |varchar  |  2 chars|
| iso_region | region iso | varchar |  pattern XX-XX|
| municipality |  municipality | string |  |
| gps_code | gps code |  short varchar | check for NaN  |
| iata_code | iata code |  | as there are a lot of NaN, we will skip this |
| local_code |  |  |  also a lot of NaN, see if should be skipped|
| coordinates | lon and lat | float, float | longitude and latitude as a tuple |
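
If the coordinates column is kept, it has to be split into two numbers. Below is a minimal sketch that assumes the value is a single comma-separated string with longitude first; both the order and the sample value are assumptions for illustration and should be checked against the actual file. The function name `parse_coordinates` is hypothetical.

```python
def parse_coordinates(value):
    """Split a 'lon, lat' string into a (lon, lat) tuple of floats.

    Returns None when the value is missing or malformed.
    The longitude-first order is an assumption of this sketch.
    """
    if not value:
        return None
    parts = [p.strip() for p in value.split(",")]
    if len(parts) != 2:
        return None
    try:
        lon, lat = float(parts[0]), float(parts[1])
    except ValueError:
        return None
    return (lon, lat)


# Illustrative value, not copied from the dataset.
print(parse_coordinates("-74.93, 40.07"))  # (-74.93, 40.07)
```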

# Step 3: Define the Data Model

#### 3.1 Conceptual Data Model
The data will be used for ad hoc queries and business analytics applications. For that reason, the data is modeled as a star schema.

A star schema is easy to query and easy to understand.

![star schema](schema.png)
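
To illustrate why a star schema is easy to query, here is a small self-contained sketch that uses an in-memory SQLite database instead of Spark. The table names follow this project (fact_immigration, dim_immigrant_person), but only a subset of columns is created, and the setup is an illustration rather than the code in etl.py. The inserted values are the first three rows of the immigration sample shown earlier.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE fact_immigration (cicid INTEGER PRIMARY KEY, i94port TEXT);
    CREATE TABLE dim_immigrant_person (
        cicid INTEGER PRIMARY KEY, biryear INTEGER, gender TEXT
    );
    """
)

# Values taken from the first three rows of the immigration sample.
conn.executemany(
    "INSERT INTO fact_immigration VALUES (?, ?)",
    [(4084316, "HHW"), (4422636, "MCA"), (1195600, "OGG")],
)
conn.executemany(
    "INSERT INTO dim_immigrant_person VALUES (?, ?, ?)",
    [(4084316, 1955, "F"), (4422636, 1990, "M"), (1195600, 1940, "M")],
)

# A single join from the fact table to one dimension answers
# "how many arrivals per gender?".
query = """
    SELECT d.gender, COUNT(*) AS arrivals
    FROM fact_immigration AS f
    JOIN dim_immigrant_person AS d ON d.cicid = f.cicid
    GROUP BY d.gender
    ORDER BY d.gender
"""
result = conn.execute(query).fetchall()
print(result)  # [('F', 1), ('M', 2)]
```

Each analytical question needs at most one join per dimension, which is what keeps queries against this model short.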

#### 3.2 Mapping Out Data Pipelines

* Transformations are done in memory before the data is written to the tables, which is one of the benefits of Spark

* Load only the needed columns from the immigrant files into a Spark dataframe
* Transform the arrival and departure dates to timestamps
* Create the fact_immigration table from the loaded immigrant files and write it to parquet files
* Create the dim_immigrant_person table from the loaded immigrant files and write it to parquet files
* Create the dim_time table from the transformed timestamps
* Load the city data and write it to parquet files as dim_city
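
The date transformation step relies on the fact that SAS numeric dates count days since 1960-01-01. A minimal sketch of that conversion in plain Python follows; the function name `sas_to_date` is hypothetical, and etl.py may implement the step differently (for example with Spark column functions). The inputs are arrdate values from the sample shown earlier.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS date values count days from this date


def sas_to_date(sas_days):
    """Convert a SAS numeric date (days since 1960-01-01) to a datetime.date.

    Returns None for missing values (None or NaN), so that rows without
    a departure date do not raise an error.
    """
    if sas_days is None or sas_days != sas_days:  # second test catches NaN
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))


print(sas_to_date(20566.0))  # 2016-04-22
print(sas_to_date(20550.0))  # 2016-04-06
```

Both results fall in April 2016, which agrees with the i94yr and i94mon values of those rows.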

# Step 4: Run Pipelines to Model the Data
#### 4.1 Create the data model
Build the data pipelines to create the data model. The code can be found in etl.py

#### Configuration to write the Model

In [13]:
%load_ext autoreload
%autoreload 2
import pyspark
from pyspark.sql import SparkSession

In [14]:
#Create SparkSession
spark = SparkSession.builder.appName("Capstone Project").getOrCreate()

#### I.  Loading the needed immigration data from parquet files

In [29]:
 %reload_ext autoreload

In [16]:
import etl

df_immigration = etl.get_df_immigration(spark)
df_immigration.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- fltno: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- arrival_ts: string (nullable = true)
 |-- departure_ts: string (nullable = true)



#### II.  Creating fact table and writing to it

In [17]:
etl.create_fact_immigrant(df_immigration)

#### III.  Creating date dimension table and writing to it

In [24]:
etl.create_dimension_date(df_immigration)

date dimension data validation complete


#### IV.  Creating city dimension table and writing to it

In [41]:
etl.create_city_dimension(spark)

root
 |-- city_name: string (nullable = true)
 |-- state: string (nullable = true)
 |-- median_age: string (nullable = true)
 |-- male_population: string (nullable = true)
 |-- total_population: string (nullable = true)
 |-- foreign_born: string (nullable = true)
 |-- average_householdsize: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- city_code: string (nullable = true)

Info: Current DF contains 2891 rows
Info: 9 columnns for current DF.
date dimension data validation complete


#### V.  Creating immigrant dimension table and writing to it

In [43]:
etl.create_immigrant_dimension(df_immigration)

Info: Current DF contains 219268 rows
Info: 4 columnns for current DF.
immigrant dimension data validation complete


#### 4.2 Data Quality Checks
The following data quality checks are performed to ensure the pipeline ran as expected:
 * Count the number of rows and columns for the output tables. See datacheck.py
 * dropDuplicates() to get rid of duplicate data
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
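
The checks listed above can be sketched in plain Python on a table represented as a list of dictionaries. This is an illustration only, not the content of datacheck.py; the function name `run_quality_checks` is hypothetical, and the i94visa values in the sample rows are made up because that column is not visible in the sample output.

```python
def run_quality_checks(rows, key, expected_columns):
    """Run basic checks on a table given as a list of dicts.

    Returns a list of error messages; an empty list means all checks passed.
    """
    errors = []
    # Row count check: an empty table usually means an upstream step failed.
    if not rows:
        errors.append("table is empty")
        return errors
    # Column check: every row must have exactly the expected columns.
    for i, row in enumerate(rows):
        if set(row) != set(expected_columns):
            errors.append(f"row {i}: unexpected columns {sorted(row)}")
    # Key checks: the key column must be non-null and unique.
    keys = [row.get(key) for row in rows]
    if any(k is None for k in keys):
        errors.append(f"null values in key column '{key}'")
    if len(set(keys)) != len(keys):
        errors.append(f"duplicate values in key column '{key}'")
    return errors


# cicid, biryear and gender come from the sample; i94visa values are made up.
dim_immigrant = [
    {"cicid": 4084316, "biryear": 1955, "gender": "F", "i94visa": 2},
    {"cicid": 4422636, "biryear": 1990, "gender": "M", "i94visa": 2},
]
columns = ["cicid", "biryear", "gender", "i94visa"]

print(run_quality_checks(dim_immigrant, "cicid", columns))  # []
```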

#### 4.3 Data dictionary 
The data dictionary below gives, for each field of the data model, a brief description of the data and where it came from.

#### FACT IMMIGRANT

| column name| description | type | data source | 
| --- | --- | --- | --- |
|cicid | unique number for an immigrant | integer, not nullable | sas_data  immigration |
|arrival_ts | timestamp of the arrival date | timestamp | sas_data  immigration: transformed from field "arrdate": SAS numeric |
|departure_ts | timestamp of the departure date | timestamp | sas_data  immigration: transformed from field "depdate": SAS numeric |
|i94cit | 3 digit code of origin country | short, not nullable | sas_data  immigration |
|i94res | 3 digit code of the country from which one has travelled | short | sas_data  immigration |
|i94port | 3 char code of destination city in USA | string | sas_data  immigration |
|fltno | flight number of the airline used to arrive in the U.S. | string | sas_data  immigration |

#### DIM date

Datasource: all timestamps are taken from the arrival and departure dates of the fact_immigration table

| column name| description | type |
| --- | --- | --- | 
| ts | unix timestamp, not nullable | ts |
| date | date | string |
| year | year | integer |
| month | month | integer |
| weekday | weekday | integer |
| day | day | integer |
| hour | hour | integer |

#### DIM IMMIGRANT
Data Source:  sas_data  immigration

| column name| description | type |
| --- | --- | --- | 
|cicid|  unique number for the immigrants| int |
|biryear| year of birth| int |
|gender| gender of immigrant| string |
|i94visa| Visa codes collapsed into three categories, 1 = Business 2 = Pleasure 3 = Student |int |


#### DIM CITY

Datasource: https://public.opendatasoft.com/explore/dataset/us-cities-demographics

| column name| description | type | 
| --- | --- | --- |
|city_code| code for the city| string |
|city_name| name of the city| string |
|state | state | string |
|male_population | male population| int |
|female_population | female population| int |
|total_population | total population| int |
|foreign_born | number of foreign-born residents | int |
|average_householdsize | average number of people in a household | double |
| state_code | code of the state | string |


# Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
  * The data was increased by 100x.
  * The data populates a dashboard that must be updated on a daily basis by 7am every day.
  * The database needed to be accessed by 100+ people.

1. Apache Spark was used for all data processing and to create the model, because:
- Spark scales to large volumes of data
- It can handle many different file formats
- The spark.sql library has many tools to transform data easily

2. Given the kind of data, it should be updated every day. Airflow can be used to ingest one day of data (by arrival date) per run, because the fact table is partitioned by arrival date.

3. Under the following scenarios, I would approach the problem differently:

If the data was increased by 100x: Spark can handle the volume; we would add more nodes to the cluster.

If the dashboard must be updated daily by 7am: I would use Apache Airflow to schedule a daily run that updates all the data.

If the data needs to be accessed by 100+ people: we can use Hive, Spark SQL temporary views, ...
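
The daily update in point 2 depends on the fact table being partitioned by arrival date, so that each run only writes the directory for one day. The sketch below shows the idea in plain Python. The `column=value` directory naming is the layout Spark produces when writing with `partitionBy`; the base path, the column name `arrival_date`, and the function names are assumptions for illustration and may differ from etl.py. The cicid values come from the sample shown earlier, with dates converted from their arrdate values.

```python
from collections import defaultdict


def partition_path(arrival_date, base_path="fact_immigration"):
    """Build a Hive-style partition directory for one arrival date (ISO string)."""
    return f"{base_path}/arrival_date={arrival_date}"


def group_by_partition(records):
    """Group record ids by the partition directory they would be written to."""
    partitions = defaultdict(list)
    for record in records:
        path = partition_path(record["arrival_date"])
        partitions[path].append(record["cicid"])
    return dict(partitions)


records = [
    {"cicid": 4084316, "arrival_date": "2016-04-22"},
    {"cicid": 4422636, "arrival_date": "2016-04-23"},
    {"cicid": 985523, "arrival_date": "2016-04-06"},
]

for path in sorted(group_by_partition(records)):
    print(path)
# fact_immigration/arrival_date=2016-04-06
# fact_immigration/arrival_date=2016-04-22
# fact_immigration/arrival_date=2016-04-23
```

A scheduled daily job would then only need to write, or overwrite, the single directory for the day it ingests.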