# Project Title
### Data Engineering Capstone Project

#### Project Summary
--describe your project at a high level--

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
import psycopg2
from pyspark.sql import SparkSession
import os
import configparser
from insert_data import insert_fact, insert_dim_imm_per, insert_dim_imm_air, insert_dim_demo_info, insert_dim_demo_stat, insert_dim_temp

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use?

The purpose of this project is to examine how immigration relates to the temperature of the destination cities. The data also shows which destination cities immigrants preferred in a given immigration year. The resulting tables are intended to feed a BI app.

__Data Used:__
- I94 Immigration Data
- Temperature Data
- Demographics Data

__Tools Used:__
- AWS
    - S3
    - Redshift
- Python
    - PySpark
- Airflow for Pipeline

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

#### I94 Immigration Data
This data comes from the [National Travel and Tourism Office (NTTO)](https://www.trade.gov/national-travel-and-tourism-office). It covers immigrants arriving in the U.S., and the information it gives includes where they come from, birth year, gender, visa type, etc.

In [2]:
# Read in the data here
fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df = pd.read_sas(fname, 'sas7bdat', encoding="ISO-8859-1")

In [3]:
df.head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


In [4]:
df.columns

Index(['cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port', 'arrdate',
       'i94mode', 'i94addr', 'depdate', 'i94bir', 'i94visa', 'count',
       'dtadfile', 'visapost', 'occup', 'entdepa', 'entdepd', 'entdepu',
       'matflag', 'biryear', 'dtaddto', 'gender', 'insnum', 'airline',
       'admnum', 'fltno', 'visatype'],
      dtype='object')

In [5]:
# .copy() makes an independent DataFrame, so adding a column does not raise SettingWithCopyWarning
fact_immigration = df[['cicid', 'i94yr', 'i94mon', 'i94port', 'arrdate', 'depdate', 'i94mode', 'i94visa']].copy()
fact_immigration.columns = ['cic_id', 'year', 'month', 'dep_city', 'arrival_date', 'dep_date', 'travel_code', 'visa']
fact_immigration['country'] = 'United States'
fact_immigration.head()


Unnamed: 0,cic_id,year,month,dep_city,arrival_date,dep_date,travel_code,visa,country
0,6.0,2016.0,4.0,XXX,20573.0,,,2.0,United States
1,7.0,2016.0,4.0,ATL,20551.0,,1.0,3.0,United States
2,15.0,2016.0,4.0,WAS,20545.0,20691.0,1.0,2.0,United States
3,16.0,2016.0,4.0,NYC,20545.0,20567.0,1.0,2.0,United States
4,17.0,2016.0,4.0,NYC,20545.0,20567.0,1.0,2.0,United States


In [6]:
dim_immigration_personal = df[['cicid', 'i94cit', 'i94res', 'biryear', 'gender', 'insnum']]
dim_immigration_personal.columns = ['cic_id', 'citizen_country', 'resident_country', 'birthyear', 'gender', 'ins_number']
dim_immigration_personal.head()

Unnamed: 0,cic_id,citizen_country,resident_country,birthyear,gender,ins_number
0,6.0,692.0,692.0,1979.0,,
1,7.0,254.0,276.0,1991.0,M,
2,15.0,101.0,101.0,1961.0,M,
3,16.0,101.0,101.0,1988.0,,
4,17.0,101.0,101.0,2012.0,,


In [7]:
dim_immigration_air = df[['cicid', 'airline', 'admnum', 'fltno', 'i94visa', 'visatype']]
dim_immigration_air.columns = ['cic_id', 'airline', 'admin_number', 'flight_number', 'visa', 'visa_type']
dim_immigration_air.head()

Unnamed: 0,cic_id,airline,admin_number,flight_number,visa,visa_type
0,6.0,,1897628000.0,,2.0,B2
1,7.0,,3736796000.0,296.0,3.0,F1
2,15.0,OS,666643200.0,93.0,2.0,B2
3,16.0,AA,92468460000.0,199.0,2.0,B2
4,17.0,AA,92468460000.0,199.0,2.0,B2


In [8]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')


In [6]:
# write to parquet; overwrite so the cell can be re-run when sas_data already exists
df_spark.write.mode("overwrite").parquet("sas_data")
df_spark = spark.read.parquet("sas_data")

#### Temperature Data
This data comes from [Kaggle](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data), and it shows the temperature in different cities around the world. It's recorded monthly, and the values are the average temperature of that month.

In [8]:
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
df_temp = pd.read_csv(fname)

In [9]:
df_temp.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


In [10]:
df_temp_us = df_temp[df_temp['Country'] == 'United States']
df_temp_us.columns = ['dt', 'avg_temp', 'avg_temp_uncertainty', 'city', 'country', 'latitude', 'longitude']
df_temp_us.head()

Unnamed: 0,dt,avg_temp,avg_temp_uncertainty,city,country,latitude,longitude
47555,1820-01-01,2.101,3.217,Abilene,United States,32.95N,100.53W
47556,1820-02-01,6.926,2.853,Abilene,United States,32.95N,100.53W
47557,1820-03-01,10.767,2.395,Abilene,United States,32.95N,100.53W
47558,1820-04-01,17.989,2.202,Abilene,United States,32.95N,100.53W
47559,1820-05-01,21.809,2.036,Abilene,United States,32.95N,100.53W


In [11]:
# assign() returns a new DataFrame, which avoids SettingWithCopyWarning on the filtered slice
df_temp_us = df_temp_us.assign(dt=pd.to_datetime(df_temp_us['dt']))
df_temp_us = df_temp_us.assign(month=df_temp_us['dt'].dt.month, year=df_temp_us['dt'].dt.year)
df_temp_us.head()


Unnamed: 0,dt,avg_temp,avg_temp_uncertainty,city,country,latitude,longitude,month,year
47555,1820-01-01,2.101,3.217,Abilene,United States,32.95N,100.53W,1,1820
47556,1820-02-01,6.926,2.853,Abilene,United States,32.95N,100.53W,2,1820
47557,1820-03-01,10.767,2.395,Abilene,United States,32.95N,100.53W,3,1820
47558,1820-04-01,17.989,2.202,Abilene,United States,32.95N,100.53W,4,1820
47559,1820-05-01,21.809,2.036,Abilene,United States,32.95N,100.53W,5,1820


#### US Cities Demographics Data
This data comes from [OpenDataSoft](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/), and it shows the demographics of cities in the U.S., including the population, median age, the state the city belongs to, etc.

In [12]:
fname = './us-cities-demographics.csv'
df_demo = pd.read_csv(fname, delimiter=';')

In [13]:
df_demo.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [14]:
df_demo_info = df_demo[['City', 'State', 'Male Population', 'Female Population', 'Total Population', 'Number of Veterans', 'Foreign-born', 'State Code', 'Race']]
df_demo_info.columns = ['city', 'state', 'm_population', 'f_population', 'total_population', 'num_of_veterans', 'foreign_born', 'state_code', 'race']
df_demo_info.head()

Unnamed: 0,city,state,m_population,f_population,total_population,num_of_veterans,foreign_born,state_code,race
0,Silver Spring,Maryland,40601.0,41862.0,82463,1562.0,30908.0,MD,Hispanic or Latino
1,Quincy,Massachusetts,44129.0,49500.0,93629,4147.0,32935.0,MA,White
2,Hoover,Alabama,38040.0,46799.0,84839,4819.0,8229.0,AL,Asian
3,Rancho Cucamonga,California,88127.0,87105.0,175232,5821.0,33878.0,CA,Black or African-American
4,Newark,New Jersey,138040.0,143873.0,281913,5829.0,86253.0,NJ,White


In [15]:
df_demo_stat = df_demo[['City', 'State', 'Median Age', 'Average Household Size', 'State Code']]
df_demo_stat.columns = ['city', 'state', 'median_age', 'avg_household_size', 'state_code']
df_demo_stat.head()

Unnamed: 0,city,state,median_age,avg_household_size,state_code
0,Silver Spring,Maryland,33.8,2.6,MD
1,Quincy,Massachusetts,41.0,2.39,MA
2,Hoover,Alabama,38.5,2.58,AL
3,Rancho Cucamonga,California,34.5,3.18,CA
4,Newark,New Jersey,34.6,2.73,NJ


### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

#### Converting floats into datetime format for fact_immigration table

In [16]:
fact_immigration.dtypes

cic_id          float64
year            float64
month           float64
dep_city         object
arrival_date    float64
dep_date        float64
travel_code     float64
visa            float64
country          object
dtype: object

In [17]:
# SAS stores dates as day counts from 1960-01-01; assign() returns a new DataFrame and avoids SettingWithCopyWarning
sas_epoch = pd.Timestamp('1960-01-01')
fact_immigration = fact_immigration.assign(
    arrival_date=pd.to_datetime(fact_immigration['arrival_date'], unit='D', origin=sas_epoch),
    dep_date=pd.to_datetime(fact_immigration['dep_date'], unit='D', origin=sas_epoch),
)
fact_immigration.head()


Unnamed: 0,cic_id,year,month,dep_city,arrival_date,dep_date,travel_code,visa,country
0,6.0,2016.0,4.0,XXX,2016-04-29,NaT,,2.0,United States
1,7.0,2016.0,4.0,ATL,2016-04-07,NaT,1.0,3.0,United States
2,15.0,2016.0,4.0,WAS,2016-04-01,2016-08-25,1.0,2.0,United States
3,16.0,2016.0,4.0,NYC,2016-04-01,2016-04-23,1.0,2.0,United States
4,17.0,2016.0,4.0,NYC,2016-04-01,2016-04-23,1.0,2.0,United States


In [34]:
fact_immigration = fact_immigration[pd.notnull(fact_immigration['arrival_date'])]
fact_immigration = fact_immigration[pd.notnull(fact_immigration['dep_date'])]
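
The `arrdate` and `depdate` columns hold SAS date values, which count whole days from 1960-01-01; that is why the conversion above uses that date as its origin. The snippet below is a minimal, standalone check of the offset using two day counts that appear in the first rows of the table. The helper name `sas_days_to_datetime` is illustrative and is not part of the notebook.

```python
import pandas as pd

# SAS dates count whole days from the SAS epoch, 1960-01-01.
SAS_EPOCH = pd.Timestamp("1960-01-01")


def sas_days_to_datetime(days: pd.Series) -> pd.Series:
    """Convert a Series of SAS day counts to datetimes; missing values become NaT."""
    return pd.to_datetime(days, unit="D", origin=SAS_EPOCH)


# 20545 and 20573 are taken from the arrdate column shown earlier; NaN stands for a missing date.
sample = pd.Series([20545.0, 20573.0, float("nan")])
converted = sas_days_to_datetime(sample)
print(converted)
```

Missing departure dates survive the conversion as `NaT`, which is what the `pd.notnull` filter then removes.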

#### Extracting necessary information from i94 file

In [19]:
with open("I94_SAS_Labels_Descriptions.SAS") as f:
    contents = f.readlines()

In [20]:
countries = contents[9:298]
ports = contents[302:962]
modes = contents[972:976]
states = contents[981:1036]
visa = contents[1046:1049]

In [21]:
country = [x.strip().split('=') for x in countries]
country_code = [x[0].replace("'","") for x in country]
country_name = [x[1].replace("'","") for x in country]
df_country = pd.DataFrame({'country_code':country_code, 'country_name':country_name})
df_country.head()

Unnamed: 0,country_code,country_name
0,582,"MEXICO Air Sea, and Not Reported (I-94, no l..."
1,236,AFGHANISTAN
2,101,ALBANIA
3,316,ALGERIA
4,102,ANDORRA


In [22]:
port = [x.strip().split('=') for x in ports]
port_code = [x[0].replace("'","").strip('\t') for x in port]
port_loc = [x[1].replace("'","").strip('\t') for x in port]
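# strip() removes any padding left around the city and state after splitting on the comma
port_loc_c = [x.split(',')[0].strip() for x in port_loc]
port_loc_s = [x.split(',')[-1].strip() for x in port_loc]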
df_port = pd.DataFrame({'port_code':port_code, 'port_loc_city':port_loc_c, 'port_loc_state':port_loc_s})
df_port.head()

Unnamed: 0,port_code,port_loc_city,port_loc_state
0,ALC,ALCAN,AK
1,ANC,ANCHORAGE,AK
2,BAR,BAKER AAF - BAKER ISLAND,AK
3,DAC,DALTONS CACHE,AK
4,PIZ,DEW STATION PT LAY DEW,AK
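
One way the port table could be used, sketched here under assumptions: the `dep_city` column of `fact_immigration` holds I94 port codes, so a left merge on `port_code` would attach a city name that can then be matched against city-level data. The small frames below are illustrative stand-ins, not rows read from the real files.

```python
import pandas as pd

# Illustrative stand-in for df_port (values assumed, not read from the label file).
ports = pd.DataFrame({
    "port_code": ["NYC", "ATL"],
    "port_loc_city": ["NEW YORK", "ATLANTA"],
    "port_loc_state": ["NY", "GA"],
})

# Illustrative stand-in for fact_immigration; "XXX" represents a code with no known port.
arrivals = pd.DataFrame({"cic_id": [16.0, 7.0, 6.0], "dep_city": ["NYC", "ATL", "XXX"]})

# A left merge keeps every arrival and leaves NaN where the port code is unknown.
merged = arrivals.merge(ports, how="left", left_on="dep_city", right_on="port_code")
unmatched = int(merged["port_loc_city"].isna().sum())

print(merged[["cic_id", "dep_city", "port_loc_city", "port_loc_state"]])
print(f"arrivals without a matching port: {unmatched}")
```

Counting the unmatched rows is a cheap way to see how many arrivals would be lost if the join were an inner join instead.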


In [23]:
mode = [x.strip().split('=') for x in modes]
code = [x[0] for x in mode]
mode_name = [x[1].replace("'", "").strip(";") for x in mode]
df_mode = pd.DataFrame({'code':code, 'mode':mode_name})
df_mode.head()

Unnamed: 0,code,mode
0,1,Air
1,2,Sea
2,3,Land
3,9,Not reported


In [24]:
state = [x.strip().split('=') for x in states]
code = [x[0].replace("'", "") for x in state]
state_name = [x[1].replace("'", "") for x in state]
df_state = pd.DataFrame({'state_code':code, 'state':state_name})
df_state.head()

Unnamed: 0,state_code,state
0,AL,ALABAMA
1,AK,ALASKA
2,AZ,ARIZONA
3,AR,ARKANSAS
4,CA,CALIFORNIA


In [25]:
visa_list = [x.strip().split('=') for x in visa]
code = [x[0] for x in visa_list]
visa_name = [x[1] for x in visa_list]
df_visa = pd.DataFrame({'code':code, 'visa':visa_name})
df_visa.head()

Unnamed: 0,code,visa
0,1,Business
1,2,Pleasure
2,3,Student
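
The slices above depend on fixed line numbers such as `contents[9:298]`, so they would silently shift if the label file changed. A hedged alternative is to pick out every `code = 'label'` pair with a regular expression. The sample lines below are assumed to resemble the layout of `I94_SAS_Labels_Descriptions.SAS`; they are not copied from it, and `parse_pairs` is a hypothetical helper.

```python
import re

# Matches lines such as  582 = 'MEXICO'  or  'ALC' = 'ALCAN, AK'  (quotes around the code are optional).
PAIR = re.compile(r"^\s*'?([^'=]+?)'?\s*=\s*'([^']*)'")


def parse_pairs(lines):
    """Return (code, label) tuples for every line that looks like a SAS value mapping."""
    pairs = []
    for line in lines:
        match = PAIR.match(line)
        if match:
            pairs.append((match.group(1).strip(), match.group(2).strip()))
    return pairs


# Illustrative lines only; the real file is assumed to use a similar layout.
sample = [
    "/* I94PORT: illustrative lines only */\n",
    "   'ALC'\t=\t'ALCAN, AK             '\n",
    "   582 =  'MEXICO'\n",
    "   1 = 'Air'\n",
    ";\n",
]

parsed = parse_pairs(sample)
print(parsed)
```

Lines that do not contain a mapping, such as comments and the closing `;`, are skipped rather than raising an `IndexError` on `x[1]`.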


#### Dropping missing values and duplicate data

In [26]:
fact_immigration.duplicated().any()

False

In [27]:
dim_immigration_personal.duplicated().any()

False

In [28]:
dim_immigration_air.duplicated().any()

False

In [29]:
df_temp_us.duplicated().any()

False

In [30]:
df_demo_info.duplicated().any()

False

In [31]:
df_demo_stat.duplicated().any()

True
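
`df_demo_stat` is the only table that reports duplicates. A likely reason, judging from the `Race` and `Count` columns in the source file, is that the demographics data holds one row per city and race, so selecting only the city-level columns repeats each city. The sketch below reproduces the effect with made-up rows.

```python
import pandas as pd

# Made-up rows: one row per city and race, as the Race/Count columns suggest.
demo = pd.DataFrame({
    "city": ["Springfield", "Springfield", "Shelbyville"],
    "state_code": ["IL", "IL", "IL"],
    "median_age": [35.0, 35.0, 40.1],
    "race": ["White", "Asian", "White"],
})

# Dropping the race column leaves identical city-level rows behind.
stat = demo[["city", "state_code", "median_age"]]
has_duplicates = bool(stat.duplicated().any())

# Keying on city and state keeps same-named cities in different states apart.
stat_unique = stat.drop_duplicates(subset=["city", "state_code"], keep="first")

print(has_duplicates)
print(len(stat_unique))
```

Using both `city` and `state_code` as the key is a design choice worth considering here, since a city name alone may not be unique across states.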

In [35]:
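# immigration data: dropna returns a new DataFrame, so the result is assigned back
fact_immigration = fact_immigration.dropna(subset=['cic_id'])
dim_immigration_personal = dim_immigration_personal.dropna(subset=['cic_id'])
dim_immigration_air = dim_immigration_air.dropna(subset=['cic_id'])

# temperature data
df_temp_us = df_temp_us.dropna()

# demography data
df_demo_info = df_demo_info.dropna()
df_demo_stat = df_demo_stat.dropna()
df_demo_stat = df_demo_stat.drop_duplicates(subset='city', keep='first')
df_demo_stat.head(10)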

Unnamed: 0,city,state,median_age,avg_household_size,state_code
0,Silver Spring,Maryland,33.8,2.60,MD
1,Quincy,Massachusetts,41.0,2.39,MA
2,Hoover,Alabama,38.5,2.58,AL
3,Rancho Cucamonga,California,34.5,3.18,CA
4,Newark,New Jersey,34.6,2.73,NJ
5,Peoria,Illinois,33.1,2.40,IL
6,Avondale,Arizona,29.1,3.18,AZ
7,West Covina,California,39.8,3.56,CA
8,O'Fallon,Missouri,36.0,2.77,MO
9,High Point,North Carolina,35.5,2.65,NC


In [37]:
fact_immigration.head()

Unnamed: 0,cic_id,year,month,dep_city,arrival_date,dep_date,travel_code,visa,country
2,15.0,2016.0,4.0,WAS,2016-04-01,2016-08-25,1.0,2.0,United States
3,16.0,2016.0,4.0,NYC,2016-04-01,2016-04-23,1.0,2.0,United States
4,17.0,2016.0,4.0,NYC,2016-04-01,2016-04-23,1.0,2.0,United States
5,18.0,2016.0,4.0,NYC,2016-04-01,2016-04-11,1.0,1.0,United States
6,19.0,2016.0,4.0,NYC,2016-04-01,2016-04-14,1.0,2.0,United States


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
<img width="802" alt="Screen Shot 2021-09-09 at 10 46 07 PM" src="https://user-images.githubusercontent.com/79597984/132707777-a124e5d3-03d0-45c6-9bd5-61a8fc6f5618.png">

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model
1. Create the data model by copying the drop and create statements from the `create_tables.sql` file into the query editor in Redshift.
2. Run Steps 1 and 2 above to clean the data.
3. Insert the data into the tables created by running the below code in step `4.1`.

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [50]:
# Write code here
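# The password is read from an environment variable (REDSHIFT_PASSWORD is an assumed name) instead of being hard-coded.
conn = psycopg2.connect(host="redshift-cluster-1.cgy0semihpac.us-west-2.redshift.amazonaws.com", dbname="dev",
                        user="awsuser", password=os.environ["REDSHIFT_PASSWORD"], port=5439)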
cur = conn.cursor()
conn.autocommit = True

In [42]:
for index, row in fact_immigration.iterrows():
    cur.execute(insert_fact, list(row.values))

KeyboardInterrupt: 

In [44]:
for index, row in dim_immigration_personal.iterrows():
    cur.execute(insert_dim_imm_per, list(row.values))

KeyboardInterrupt: 

In [None]:
for index, row in dim_immigration_air.iterrows():
    cur.execute(insert_dim_imm_air, list(row.values))

In [None]:
for index, row in df_demo_info.iterrows():
    cur.execute(insert_dim_demo_info, list(row.values))

In [51]:
for index, row in df_demo_stat.iterrows():
    cur.execute(insert_dim_demo_stat, list(row.values))

In [None]:
for index, row in df_temp_us.iterrows():
    cur.execute(insert_dim_temp, list(row.values))
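
The loops above issue one `INSERT` per row, which is slow against Redshift; two of the cells were interrupted before finishing. A hedged sketch of batching the inserts with `psycopg2.extras.execute_values` follows. The helper names `frame_to_rows` and `bulk_insert` are hypothetical, and the generated `INSERT` statement is an assumption, since the statements in `insert_data` are not shown in this notebook.

```python
import pandas as pd


def frame_to_rows(frame: pd.DataFrame) -> list:
    """Convert a DataFrame to a list of tuples, replacing NaN/NaT with None so they load as SQL NULL."""
    as_objects = frame.astype(object).where(frame.notna(), None)
    return list(as_objects.itertuples(index=False, name=None))


def bulk_insert(cur, table: str, frame: pd.DataFrame, page_size: int = 1000) -> int:
    """Insert every row of `frame` into `table` in batches and return the number of rows sent."""
    # Imported here so frame_to_rows can be used and tested without a database driver installed.
    from psycopg2.extras import execute_values

    rows = frame_to_rows(frame)
    columns = ", ".join(frame.columns)
    # execute_values expands the single %s placeholder into batches of value tuples.
    execute_values(cur, f"INSERT INTO {table} ({columns}) VALUES %s", rows, page_size=page_size)
    return len(rows)


# Small made-up frame to show the NaN -> None conversion.
sample = pd.DataFrame({"cic_id": [15.0, 16.0], "gender": ["M", float("nan")]})
rows = frame_to_rows(sample)
print(rows)
```

With an open cursor, a call such as `bulk_insert(cur, "dim_demo_stat", df_demo_stat)` would replace one of the loops, assuming the DataFrame columns match the table columns. For the largest tables, loading files from S3 with Redshift's `COPY` command is generally the faster route.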

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
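
The count and uniqueness checks listed above can be sketched against any DB-API cursor. The example uses an in-memory SQLite database purely as a stand-in for Redshift so that it runs anywhere; the function names and the sample rows are illustrative assumptions. An explicit duplicate-key query matters on Redshift in particular, because it treats primary-key and unique constraints as informational and does not enforce them.

```python
import sqlite3


def count_rows(cur, table):
    """Return the number of rows in `table`, read from the single row that COUNT(*) returns."""
    cur.execute(f"SELECT COUNT(*) FROM {table}")
    return cur.fetchone()[0]


def count_duplicate_keys(cur, table, key):
    """Return how many distinct `key` values occur more than once in `table`."""
    cur.execute(
        f"SELECT COUNT(*) FROM (SELECT {key} FROM {table} GROUP BY {key} HAVING COUNT(*) > 1) AS dup"
    )
    return cur.fetchone()[0]


# SQLite stands in for Redshift here; the table contents are made up.
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE fact_immigration (cic_id REAL, dep_city TEXT)")
cur.executemany(
    "INSERT INTO fact_immigration VALUES (?, ?)",
    [(15.0, "WAS"), (16.0, "NYC"), (16.0, "NYC")],
)

n_rows = count_rows(cur, "fact_immigration")
n_dupes = count_duplicate_keys(cur, "fact_immigration", "cic_id")
print(f"rows: {n_rows}, duplicated keys: {n_dupes}")
```

A completeness check can then compare `n_rows` with `len(fact_immigration)` from the cleaned DataFrame.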
 
Run Quality Checks

In [None]:
fact_immigration.dtypes

#### Checking if the rows exist after inserting data

In [52]:
# COUNT(*) always returns exactly one row, so read the count from that row
# instead of relying on cur.rowcount.
tables = ["fact_immigration", "dim_immigration_personal", "dim_immigration_air",
          "dim_demo_info", "dim_demo_stat", "dim_temp"]

for table in tables:
    cur.execute(f"SELECT COUNT(*) FROM {table};")
    n_rows = cur.fetchone()[0]
    if n_rows < 1:
        print(f"No data found in table {table}")
    else:
        print(f"Successfully executed with {n_rows} records in the table {table}!")

ProgrammingError: relation "df_demo_info" does not exist


In [62]:
query = "SELECT COUNT(*) FROM dim_demo_stat;"
cur.execute(query)
n_rows = cur.fetchone()[0]  # COUNT(*) returns a single row holding the count

if n_rows < 1:
    print("No data found in table dim_demo_stat")
else:
    print(f"Successfully executed with {n_rows} records!")


#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.