# Capstone Project
### Data Engineering Capstone Project

#### Project Summary
The objective of this project is to build a data warehouse that combines several datasets: weather, travel, and tourism information.
The warehouse is intended to let business analysts identify trends, understand the seasonality of visits to the United States, and answer business questions.
The project applies material covered in the course, such as Spark and data modeling.


The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [3]:
import pandas as pd
import os
from pyspark.sql import SparkSession

In [108]:
# declare path of dataset

project_dir = os.path.abspath("./../data")

full_path_immigration  = os.path.join(project_dir , "immigration_data_sample.csv")
full_path_temperature  = os.path.join(project_dir , 'GlobalTemperature/GlobalLandTemperaturesByCity.csv')
full_path_demographics = os.path.join(project_dir , "us-cities-demographics.csv")
full_path_i94_sas      = os.path.join(project_dir , "sas_data")

### Step 1: Scope the Project and Gather Data

#### Scope 
In this project, I explore data on US temperature, demographics, and tourism, and store it in a data warehouse modeled as a star schema. Pandas and Spark are used to explore the datasets.

#### Describe and Gather Data 

- **I94 Immigration Data**: This data comes from the US National Tourism and Trade Office;
- **World Temperature Data**: This dataset comes from Kaggle. It contains average temperatures for countries and cities;
- **U.S. City Demographic Data**: This dataset describes each city's population, including median age, population by gender, and number of foreign-born residents, among others.



### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.
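
A minimal sketch of such a check, assuming pandas is available. The helper name `summarize_quality` and the toy frame are illustrative and not part of the project code.

```python
import pandas as pd


def summarize_quality(df: pd.DataFrame) -> dict:
    """Return basic data quality indicators for a DataFrame."""
    return {
        "rows": len(df),
        # number of missing values in each column
        "missing_per_column": df.isna().sum().to_dict(),
        # rows that are exact copies of an earlier row
        "duplicate_rows": int(df.duplicated().sum()),
    }


# Hypothetical toy data, only to show the shape of the result
toy = pd.DataFrame({
    "cicid": [1.0, 2.0, 2.0, 3.0],
    "i94addr": ["HI", "TX", "TX", None],
})
report = summarize_quality(toy)
print(report)
```

Running the same helper on each source frame gives a quick, comparable overview before any cleaning is done.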

#### Cleaning Steps
- Transform arrival_date and departure_date in the immigration data from SAS date format to pandas datetime format
- Parse the I94_SAS_Labels_Descriptions.SAS file to get the auxiliary dimension tables: country_code, city_code, state_code
- Transform city and state in the demographics data to upper case to match the city_code and state_code tables
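
The first cleaning step relies on SAS storing dates as the number of days since 1960-01-01. The sketch below shows the conversion for a single value using only the standard library; the function name `sas_to_date` is an illustrative choice, not part of the notebook.

```python
from datetime import date, timedelta
from typing import Optional

SAS_EPOCH = date(1960, 1, 1)  # SAS date values count days from this day


def sas_to_date(sas_days: Optional[float]) -> Optional[date]:
    """Convert a SAS numeric date to a Python date; missing values stay None."""
    if sas_days is None or sas_days != sas_days:  # NaN is not equal to itself
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))


# 20566.0 is the arrdate of the first row in the immigration sample
print(sas_to_date(20566.0))
```

The result falls in April 2016, which agrees with the `i94yr` and `i94mon` values of that row.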

#### Immigration data

**I94 Immigration Data**: This data comes from the US National Tourism and Trade Office.

In [97]:
df_immigration = pd.read_csv(full_path_immigration)
df_immigration.head(3)

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,...,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,...,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,...,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT


In [98]:
df_immigration.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000 entries, 0 to 999
Data columns (total 29 columns):
 #   Column      Non-Null Count  Dtype  
---  ------      --------------  -----  
 0   Unnamed: 0  1000 non-null   int64  
 1   cicid       1000 non-null   float64
 2   i94yr       1000 non-null   float64
 3   i94mon      1000 non-null   float64
 4   i94cit      1000 non-null   float64
 5   i94res      1000 non-null   float64
 6   i94port     1000 non-null   object 
 7   arrdate     1000 non-null   float64
 8   i94mode     1000 non-null   float64
 9   i94addr     941 non-null    object 
 10  depdate     951 non-null    float64
 11  i94bir      1000 non-null   float64
 12  i94visa     1000 non-null   float64
 13  count       1000 non-null   float64
 14  dtadfile    1000 non-null   int64  
 15  visapost    382 non-null    object 
 16  occup       4 non-null      object 
 17  entdepa     1000 non-null   object 
 18  entdepd     954 non-null    object 
 19  entdepu     0 non-null      

In [101]:
df_fact_immigration = df_immigration[['cicid', 'i94yr', 'i94mon', 'i94port', 'i94addr', 'arrdate', 'depdate', 'i94mode', 'i94visa']].copy()
df_fact_immigration.columns = ['cic_id', 'year', 'month', 'cod_port', 'cod_state', 'arrival_date', 'departure_date', 'mode', 'visa']
df_fact_immigration['country'] = 'United States'
df_fact_immigration.head(3)

Unnamed: 0,cic_id,year,month,cod_port,cod_state,arrival_date,departure_date,mode,visa,country
0,4084316.0,2016.0,4.0,HHW,HI,20566.0,20573.0,1.0,2.0,United States
1,4422636.0,2016.0,4.0,MCA,TX,20567.0,20568.0,1.0,2.0,United States
2,1195600.0,2016.0,4.0,OGG,FL,20551.0,20571.0,1.0,2.0,United States


In [None]:
# change the data types of the immigration fact table
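
The type change noted in this cell could be sketched as follows. The two-row frame is a hypothetical stand-in for `df_fact_immigration`, and the choice of target types is an assumption rather than the author's final schema.

```python
import pandas as pd

# Hypothetical two-row stand-in for df_fact_immigration
fact = pd.DataFrame({
    "cic_id": [4084316.0, 4422636.0],
    "year": [2016.0, 2016.0],
    "month": [4.0, 4.0],
    "mode": [1.0, None],
})

# Columns without missing values can use plain integers; columns that may
# contain missing values need pandas' nullable Int64 type.
fact_typed = fact.astype(
    {"cic_id": "int64", "year": "int64", "month": "int64", "mode": "Int64"}
)
print(fact_typed.dtypes)
```

Using the nullable `Int64` type avoids having to drop or fill rows only to get rid of the float representation.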

In [102]:
df_dim_immigration_person = df_immigration[['cicid', 'i94cit', 'i94res', 'biryear', 'gender', 'insnum']].copy()
# a flat list keeps a regular column index; a nested list would create a MultiIndex
df_dim_immigration_person.columns = ['cic_id', 'citizen_country', 'residence_country', 'birth_year', 'gender', 'ins_num']
df_dim_immigration_person.head(5)

Unnamed: 0,cic_id,citizen_country,residence_country,birth_year,gender,ins_num
0,4084316.0,209.0,209.0,1955.0,F,
1,4422636.0,582.0,582.0,1990.0,M,
2,1195600.0,148.0,112.0,1940.0,M,
3,5291768.0,297.0,297.0,1991.0,M,
4,985523.0,111.0,111.0,1997.0,F,


In [None]:
# change the data types of the immigration person dimension

In [103]:
df_dim_immigration_airline = df_immigration[['cicid', 'airline', 'admnum', 'fltno', 'visatype']].copy()
df_dim_immigration_airline.columns = ['cic_id', 'airline', 'admin_num', 'flight_number', 'visa_type']
df_dim_immigration_airline.head(5)

Unnamed: 0,cic_id,airline,admin_num,flight_number,visa_type
0,4084316.0,JL,56582670000.0,00782,WT
1,4422636.0,*GA,94362000000.0,XBLNG,B2
2,1195600.0,LH,55780470000.0,00464,WT
3,5291768.0,QR,94789700000.0,00739,B2
4,985523.0,,42322570000.0,LAND,WT


In [None]:
# change the data types of the immigration airline dimension

#### Temperature Dataset

**World Temperature Data**: This dataset comes from Kaggle. It contains average temperatures for countries and cities.

In [109]:
# Read temperature's dataset
df_temperature = pd.read_csv(full_path_temperature)

# Print temperature dataset
df_temperature.head(3)

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E


In [110]:
df_temperature.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 8599212 entries, 0 to 8599211
Data columns (total 7 columns):
 #   Column                         Dtype  
---  ------                         -----  
 0   dt                             object 
 1   AverageTemperature             float64
 2   AverageTemperatureUncertainty  float64
 3   City                           object 
 4   Country                        object 
 5   Latitude                       object 
 6   Longitude                      object 
dtypes: float64(2), object(5)
memory usage: 459.2+ MB


In [111]:
df_fact_temperature = df_temperature.copy()
df_fact_temperature.columns = ['measurement_date', 'average_temp', 'average_temperature_uncertainty', 'city', 'country','latitude', 'longitude']
df_fact_temperature.head(5)

Unnamed: 0,measurement_date,average_temp,average_temperature_uncertainty,city,country,latitude,longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


In [None]:
# change the data types of the temperature fact table

In [112]:
# .copy() avoids modifying a view of df_temperature when renaming columns
df_dim_temperature_region = df_temperature[['City', 'Country', 'Latitude', 'Longitude']].copy()

df_dim_temperature_region.columns = ['city', 'country','latitude', 'longitude']
df_dim_temperature_region.head(5)

Unnamed: 0,city,country,latitude,longitude
0,Århus,Denmark,57.05N,10.33E
1,Århus,Denmark,57.05N,10.33E
2,Århus,Denmark,57.05N,10.33E
3,Århus,Denmark,57.05N,10.33E
4,Århus,Denmark,57.05N,10.33E


In [None]:
# change the data types of the temperature dimension
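
One type change that seems useful for this dimension is turning the `57.05N` / `10.33E` strings into signed decimal degrees. The sketch below assumes every value ends in a single hemisphere letter; the function name `parse_coordinate` is illustrative.

```python
def parse_coordinate(value: str) -> float:
    """Convert '57.05N' / '10.33E' style strings to signed decimal degrees."""
    value = value.strip()
    magnitude, hemisphere = float(value[:-1]), value[-1].upper()
    if hemisphere not in ("N", "S", "E", "W"):
        raise ValueError(f"unexpected hemisphere in {value!r}")
    # South and West are negative by the usual convention
    return -magnitude if hemisphere in ("S", "W") else magnitude


print(parse_coordinate("57.05N"), parse_coordinate("10.33E"))
```

With pandas, the same function could be applied to a column with `Series.map(parse_coordinate)`.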

In [113]:
# add the month and year of each measurement

df_fact_temperature['measurement_date'] = pd.to_datetime(df_fact_temperature['measurement_date'])
df_fact_temperature['measurement_year'] = df_fact_temperature['measurement_date'].dt.year
df_fact_temperature['measurement_month'] = df_fact_temperature['measurement_date'].dt.month
df_fact_temperature.head()

Unnamed: 0,measurement_date,average_temp,average_temperature_uncertainty,city,country,latitude,longitude,measurement_year,measurement_month
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E,1743,11
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E,1743,12
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E,1744,1
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E,1744,2
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E,1744,3


In [61]:
spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

#df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')



In [94]:
#write to parquet
#df_spark.write.parquet("sas_data")
df_spark=spark.read.parquet(full_path_i94_sas)
df_spark.limit(10).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,5748517.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,CA,20582.0,...,,M,1976.0,10292016,F,,QF,94953870000.0,11,B1
1,5748518.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,NV,20591.0,...,,M,1984.0,10292016,F,,VA,94955620000.0,7,B1
2,5748519.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20582.0,...,,M,1987.0,10292016,M,,DL,94956410000.0,40,B1
3,5748520.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20588.0,...,,M,1987.0,10292016,F,,DL,94956450000.0,40,B1
4,5748521.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20588.0,...,,M,1988.0,10292016,M,,DL,94956390000.0,40,B1
5,5748522.0,2016.0,4.0,245.0,464.0,HHW,20574.0,1.0,HI,20579.0,...,,M,1959.0,10292016,M,,NZ,94981800000.0,10,B2
6,5748523.0,2016.0,4.0,245.0,464.0,HHW,20574.0,1.0,HI,20586.0,...,,M,1950.0,10292016,F,,NZ,94979690000.0,10,B2
7,5748524.0,2016.0,4.0,245.0,464.0,HHW,20574.0,1.0,HI,20586.0,...,,M,1975.0,10292016,F,,NZ,94979750000.0,10,B2
8,5748525.0,2016.0,4.0,245.0,464.0,HOU,20574.0,1.0,FL,20581.0,...,,M,1989.0,10292016,M,,NZ,94973250000.0,28,B2
9,5748526.0,2016.0,4.0,245.0,464.0,LOS,20574.0,1.0,CA,20581.0,...,,M,1990.0,10292016,F,,NZ,95013550000.0,2,B2


In [63]:
df_spark.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [None]:
# change the data types of the immigration fact table (Spark)

#### U.S. City Demographic Data

**U.S. City Demographic Data**: This dataset describes each city's population, including median age, population by gender, and number of foreign-born residents, among others.

In [87]:
df_demographics = pd.read_csv(full_path_demographics, delimiter=";")
df_demographics.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [88]:
df_demographics.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2891 entries, 0 to 2890
Data columns (total 12 columns):
 #   Column                  Non-Null Count  Dtype  
---  ------                  --------------  -----  
 0   City                    2891 non-null   object 
 1   State                   2891 non-null   object 
 2   Median Age              2891 non-null   float64
 3   Male Population         2888 non-null   float64
 4   Female Population       2888 non-null   float64
 5   Total Population        2891 non-null   int64  
 6   Number of Veterans      2878 non-null   float64
 7   Foreign-born            2878 non-null   float64
 8   Average Household Size  2875 non-null   float64
 9   State Code              2891 non-null   object 
 10  Race                    2891 non-null   object 
 11  Count                   2891 non-null   int64  
dtypes: float64(6), int64(2), object(4)
memory usage: 271.2+ KB


In [114]:
df_fact_demographics = df_demographics[['City', 'State', 'Median Age', 'Male Population', 'Female Population',
                                        'Total Population', 'Number of Veterans', 'Foreign-born',
                                        'Average Household Size', 'State Code', 'Race', 'Count']
                                   ].copy()
df_fact_demographics.columns = ['city', 'state','median_age', 'male_population', 'female_population', 'total_population', 'number_veterans', 'foreign_born', 'average_household_size', 'cod_state', 'race', 'count']
df_fact_demographics.head(3)

Unnamed: 0,city,state,median_age,male_population,female_population,total_population,number_veterans,foreign_born,average_household_size,cod_state,race,count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759


In [None]:
# change the data types of the demographics fact table

In [115]:
df_dim_city_location = df_demographics[['City', 'State']].copy()
df_dim_city_location.columns = ['city', 'state']
df_dim_city_location.head(5)

Unnamed: 0,city,state
0,Silver Spring,Maryland
1,Quincy,Massachusetts
2,Hoover,Alabama
3,Rancho Cucamonga,California
4,Newark,New Jersey
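
The `Race` and `Count` columns suggest that the demographics file holds one row per city and race, so a city dimension built by selecting columns alone would probably repeat each city. A possible way to keep one row per city, shown on a hypothetical three-row stand-in:

```python
import pandas as pd

# Hypothetical stand-in: the same city appears once per race category
demo = pd.DataFrame({
    "City": ["Quincy", "Quincy", "Hoover"],
    "State": ["Massachusetts", "Massachusetts", "Alabama"],
    "Race": ["White", "Asian", "Asian"],
})

dim_city = (
    demo[["City", "State"]]
    .drop_duplicates()                      # keep the first row of each city/state pair
    .rename(columns={"City": "city", "State": "state"})
    .reset_index(drop=True)
)
print(dim_city)
```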




In [None]:
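# The port codes come from the SAS labels file, not from the parquet directory.
# Assumption: I94_SAS_Labels_Descriptions.SAS is stored in project_dir.
full_path_i94_labels = os.path.join(project_dir, "I94_SAS_Labels_Descriptions.SAS")

with open(full_path_i94_labels) as f:
    content = [line.strip() for line in f]

# slice kept from the original notebook; it is expected to cover the port entries
ports = content[302:962]
split_ports = [port.split("=") for port in ports]
port_codes = [x[0].replace("'","").strip() for x in split_ports]
port_locations = [x[1].replace("'","").strip() for x in split_ports]
port_cities = [x.split(",")[0] for x in port_locations]
# strip the space that follows the comma in "CITY, ST"
port_states = [x.split(",")[-1].strip() for x in port_locations]
df_port_locations = pd.DataFrame({"port_code" : port_codes, "port_city": port_cities, "port_state": port_states})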


In [None]:
df_port_locations.head(3)
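
The per-line parsing can also be isolated in a small function, which makes it easier to test. This sketch assumes each line has the form `'CODE' = 'CITY, ST'`; the function name and the sample line are illustrative.

```python
from typing import Tuple


def parse_port_line(line: str) -> Tuple[str, str, str]:
    """Split one "'CODE' = 'CITY, ST'" line into (code, city, state)."""
    code_part, location_part = line.split("=", 1)
    code = code_part.replace("'", "").strip()
    location = location_part.replace("'", "").strip()
    # Split on the last comma so city names containing commas stay intact
    city, sep, state = location.rpartition(",")
    if not sep:  # no comma: keep the whole text as the city
        return code, location, ""
    return code, city.strip(), state.strip()


# Illustrative line in the assumed format
print(parse_port_line("   'ALC'\t=\t'ALCAN, AK             '"))
```

Entries without a comma are returned with an empty state instead of repeating the location text in both fields.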

Transform the columns arrival_date and departure_date from SAS time to datetime.

In [None]:
df_fact_immigration['arrival_date'] = pd.to_timedelta(df_fact_immigration['arrival_date'], unit='D') + pd.Timestamp('1960-1-1')
df_fact_immigration['departure_date'] = pd.to_timedelta(df_fact_immigration['departure_date'], unit='D') + pd.Timestamp('1960-1-1')
df_fact_immigration.head(5)

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [None]:
# Write code here

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [None]:
# Perform quality checks here
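
A minimal sketch of the completeness and key checks listed above, assuming the tables are held as pandas DataFrames. The helper `check_table` and the sample frame are hypothetical.

```python
import pandas as pd


def check_table(df: pd.DataFrame, key: str, name: str) -> None:
    """Raise ValueError if the table is empty or its key is null or repeated."""
    if df.empty:
        raise ValueError(f"{name}: table has no rows")
    if df[key].isna().any():
        raise ValueError(f"{name}: null values in key column {key!r}")
    if df[key].duplicated().any():
        raise ValueError(f"{name}: duplicate values in key column {key!r}")


# Hypothetical sample standing in for the immigration fact table
sample = pd.DataFrame({"cic_id": [4084316, 4422636, 1195600]})
check_table(sample, key="cic_id", name="fact_immigration")
print("fact_immigration passed")
```

Raising an exception, rather than printing a warning, lets a scheduled pipeline stop as soon as a check fails.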

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.
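
One way to start the data dictionary is to generate its skeleton from a table's columns and types, then fill in the descriptions by hand. The descriptions and the one-row frame below are hypothetical placeholders.

```python
import pandas as pd

# Hypothetical descriptions; the wording is illustrative, not official
descriptions = {
    "city": "City name from the U.S. City Demographic Data",
    "state": "Full state name from the U.S. City Demographic Data",
}

# Hypothetical one-row stand-in for a table of the data model
table = pd.DataFrame(
    {"city": ["Quincy"], "state": ["Massachusetts"], "median_age": [41.0]}
)

data_dictionary = pd.DataFrame({
    "column": list(table.columns),
    "dtype": [str(t) for t in table.dtypes],
    # columns without an entry are flagged so they are easy to spot
    "description": [descriptions.get(c, "TODO: describe") for c in table.columns],
})
print(data_dictionary.to_string(index=False))
```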

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.