# Project Title
### Data Engineering Capstone Project

#### Project Summary
The goal of this project is to build an ETL pipeline that combines two data sources: I94 immigration records and city temperature data. The resulting database is ready for analysts to explore any relationship between immigration and temperature.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [5]:
# !pip install pyspark
#pip install pyarrow

### Step 1: Scope the Project and Gather Data

#### Scope 
This project uses pandas to explore and clean the I94 immigration data and the city temperature data. Spark then loads the cleaned tables, joins them on `i94port`, and writes a fact table and two dimension tables to parquet files.

#### Describe and Gather Data 
##### The immigration data comes from the US National Tourism and Trade Office.


##### Immigration Data
- cicid - float64 - Unique ID of each record
- i94yr - float64 - 4-digit year
- i94cit - float64 - 3-digit code of the immigrant's country of birth
- i94res - float64 - 3-digit code of the immigrant's country of residence
- arrdate - float64 - Arrival date in the USA
- i94port - object - 3-character code of the destination USA city
- i94mode - float64 - Mode of transportation (1 = Air; 2 = Sea; 3 = Land; 9 = Not reported)
- biryear - float64 - 4-digit year of birth
- i94visa - float64 - Numeric visa category code (1 = Business; 2 = Pleasure; 3 = Student)
- gender - object - Gender (M, F, X and U appear in the raw data)
- fltno - object - Flight number of the airline used to arrive in the U.S.
- visatype - object - Class of admission legally admitting the non-immigrant to temporarily stay in the U.S.; recoded in this project into (B1, E2 = Business; B2 = Pleasure; F1 = Student; WB, WT = Three months visit; others = the rest)
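
The `arrdate` values that appear later in this notebook (for example `20573.0`) look like SAS numeric dates, which count days from 1960-01-01. Assuming that encoding, a minimal sketch of the conversion is shown here; the helper name `sas_days_to_date` is hypothetical and not part of the original pipeline.

```python
from datetime import date, timedelta

# Assumption: arrdate is a SAS date, i.e. days elapsed since 1960-01-01.
SAS_EPOCH = date(1960, 1, 1)

def sas_days_to_date(days):
    """Convert a SAS day count (possibly a float such as 20573.0) to a date."""
    return SAS_EPOCH + timedelta(days=int(days))

print(sas_days_to_date(20573.0))  # 2016-04-29
```

Under this assumption the sample rows fall in April 2016, which agrees with the single `i94yr` value of 2016 found in the data.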


##### The temperature data set comes from Kaggle in CSV format.

* AverageTemperature = average temperature
* City = city name
* Country = country name
* Latitude = latitude
* Longitude = longitude



In [12]:
# Do all imports and installs here
import configparser
import os, sys
from datetime import datetime
from pprint import pprint

import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lower, udf

# Widen the pandas display so wide frames print without wrapping
pd.set_option('display.width',170, 'display.max_rows',200, 'display.max_columns',900)

In [6]:
spark = SparkSession.builder \
    .config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11") \
    .enableHiveSupport() \
    .getOrCreate()

In [33]:
df_img=pd.read_csv('img_org_data.csv')



In [35]:
# Explore the data
df_img.shape

(3096313, 29)

In [36]:
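# Select the columns of interest and show the percentage of missing values per column.
# .copy() makes new_df_img an independent frame, so later column assignments do not write to a slice of df_img.
new_df_img=df_img[['cicid', 'i94yr', 'i94cit', 'i94port','i94res', 'arrdate', 'i94mode', 'biryear','i94visa', 'gender', 'fltno', 'visatype']].copy()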
(new_df_img.isna().sum() /len(new_df_img)) * 100

cicid        0.000000
i94yr        0.000000
i94cit       0.000000
i94port      0.000000
i94res       0.000000
arrdate      0.000000
i94mode      0.007719
biryear      0.025902
i94visa      0.000000
gender      13.379429
fltno        0.631364
visatype     0.000000
dtype: float64

### Step 2: Explore and Assess the Data
#### Explore the Data and Cleaning Steps
* I94 immigration data - parse I94_SAS_Labels_Descriptions.SAS into a DataFrame to find the city of each port, then drop null or irrelevant records.

* Temperature data - drop rows where AverageTemperature is NaN, drop duplicate cities, and add the i94port of each city.


* Build a list of the cities present in the immigration data, keep only the temperature rows for those cities, and add the port code by merging the tables.
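
The cleaning steps above can be sketched on toy data. This is only an illustration under assumed values: the two small frames and the name `temps_with_port` are made up, and the real tables are far larger.

```python
import pandas as pd

# Toy stand-ins for the real tables; all values are made up for illustration.
ports = pd.DataFrame({"i94port": ["ATL", "BOS"], "City": ["ATLANTA", "BOSTON"]})
temps = pd.DataFrame({
    "City": ["Atlanta", "Boston", "Paris"],
    "AverageTemperature": [19.9, None, 11.2],
})

# Normalize city names so both tables use the same key
ports["City"] = ports["City"].str.lower()
temps["City"] = temps["City"].str.lower()

# Drop missing temperatures, then keep only cities known to the port table
temps = temps.dropna(subset=["AverageTemperature"])
temps = temps[temps["City"].isin(ports["City"])]

# Attach the port code to each remaining city
temps_with_port = temps.merge(ports, on="City", how="inner")
print(temps_with_port)
```

Only the Atlanta row survives: Boston has no temperature and Paris has no port code.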

In [37]:
new_df_img[['i94cit', 'i94port','i94res', 'arrdate', 'i94mode', 'biryear','i94visa', 'gender']].head(2)

Unnamed: 0,i94cit,i94port,i94res,arrdate,i94mode,biryear,i94visa,gender
0,692.0,XXX,692.0,20573.0,,1979.0,2.0,
1,254.0,ATL,276.0,20551.0,1.0,1991.0,3.0,M


In [38]:
# Cleaning: collapse visatype into five categories
# (B1, E2 = Business; B2 = Pleasure; F1 = Student; WB, WT = 3 Months Visa; the remaining listed codes = Others)
visa_map = {'B1': 'Business', 'E2': 'Business', 'B2': 'Pleasure', 'F1': 'Student',
            'WB': '3 Months Visa', 'WT': '3 Months Visa'}
other_codes = ['GMT','CP', 'E1','I', 'F2','M1', 'I1', 'GMB', 'M2', 'SBP', 'CPL']
visa_map.update({code: 'Others' for code in other_codes})
new_df_img['visatype'] = new_df_img['visatype'].replace(visa_map)

new_df_img.visatype.value_counts()

3 Months Visa    1592042
Pleasure         1117897
Business          231793
Others            115565
Student            39016
Name: visatype, dtype: int64

In [39]:
new_df_img.gender.value_counts()

M    1377224
F    1302743
X       1610
U        467
Name: gender, dtype: int64

In [40]:
new_df_img.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3096313 entries, 0 to 3096312
Data columns (total 12 columns):
cicid       float64
i94yr       float64
i94cit      float64
i94port     object
i94res      float64
arrdate     float64
i94mode     float64
biryear     float64
i94visa     float64
gender      object
fltno       object
visatype    object
dtypes: float64(8), object(4)
memory usage: 283.5+ MB


In [41]:
new_df_img.i94yr.value_counts()

2016.0    3096313
Name: i94yr, dtype: int64

In [42]:
# Get port locations from the SAS labels file
with open("./I94_SAS_Labels_Descriptions.SAS") as f:
    content = [line.strip() for line in f]
# This fixed slice holds the port entries, each written as 'CODE' = 'CITY, STATE'
ports = content[302:962]
split_ports = [port.split("=") for port in ports]
port_codes = [x[0].replace("'","").strip() for x in split_ports]
port_locations = [x[1].replace("'","").strip() for x in split_ports]
# City is the text before the first comma; State is the text after the last comma
port_cities = [x.split(",")[0] for x in port_locations]
port_states = [x.split(",")[-1].strip() for x in port_locations]
df_port_locations = pd.DataFrame({"i94port" : port_codes, "City": port_cities, "State": port_states})
df_port_locations.head(5)


Unnamed: 0,i94port,City,State
0,ALC,ALCAN,AK
1,ANC,ANCHORAGE,AK
2,BAR,BAKER AAF - BAKER ISLAND,AK
3,DAC,DALTONS CACHE,AK
4,PIZ,DEW STATION PT LAY DEW,AK
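
The slice `content[302:962]` depends on the exact line numbers of the labels file. A possible alternative is to recognise port lines by their shape instead. The sketch below assumes each line looks like `'CODE' = 'CITY, STATE'`, which is inferred from the parsing code rather than from the file itself; `parse_port_line` and the sample lines are hypothetical.

```python
import re

# Assumed line shape, inferred from the parsing code above: 'CODE' = 'CITY, STATE'
PORT_LINE = re.compile(r"^\s*'([^']+)'\s*=\s*'([^']*)'")

def parse_port_line(line):
    """Return (code, city, state) for a port line, or None if it does not match."""
    match = PORT_LINE.match(line)
    if match is None:
        return None
    code, location = match.group(1).strip(), match.group(2).strip()
    # Split on the first comma; entries without a comma get an empty state
    city, _, state = location.partition(",")
    return code, city.strip(), state.strip()

# Made-up sample lines in the assumed format
sample = ["   'ALC'\t=\t'ALCAN, AK             '", "/* a comment line */"]
print([parse_port_line(line) for line in sample])
# [('ALC', 'ALCAN', 'AK'), None]
```

Unlike the notebook code, this sketch splits on the first comma only, so a location with several commas keeps everything after the first one in the state field.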


In [43]:
df_port_locations.City.value_counts()

NEWPORT                        3
VANCOUVER                      2
COLUMBUS                       2
PITTSBURG                      2
WILMINGTON                     2
Collapsed (FOK) 06/15          2
Collapsed into INT             2
LOS INDIOS                     2
YUMA                           2
PORTLAND                       2
MARFA                          2
DERBY LINE                     2
AKRON                          2
KETCHIKAN                      2
FREEPORT                       2
CANNON CORNERS                 2
MORRISTOWN                     2
CHARLESTON                     2
LEWISTON                       2
SAN LUIS OBISPO                2
LAKE CHARLES                   2
ROCHESTER                      2
ROME                           2
BELLINGHAM                     2
EASTPORT                       2
BOCAGRANDE                     1
FARGO                          1
MID-CONTINENT - WITCHITA       1
ST LOUIS                       1
NIGHTHAWK                      1
BRIDGEPORT

In [44]:
# Add the City and State of each port to the immigration table by merging on i94port
img_full=pd.merge(new_df_img, df_port_locations, how="outer", on=['i94port'] )

In [45]:
# drop selected value where City = NOT REPORTED/UNKNOWN
indexNames = img_full[(img_full['City'] == 'NOT REPORTED/UNKNOWN')].index 
img_full.drop(indexNames , inplace=True)

In [46]:
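# Drop rows whose City text marks a missing port code (na=False treats a missing City as no match)
x='no port code'
ind=img_full[img_full.City.str.contains(x, na=False)].index
img_full.drop(ind , inplace=True)

# Drop rows with gender X or U. The string 'NaN' never matches a real missing value,
# so it is left out of the list; rows with a missing gender are kept, as before.
g=['X','U']
ind=img_full[img_full.gender.isin(g)].index
img_full.drop(ind , inplace=True)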

In [47]:
img_full.gender.value_counts()

M    1375596
F    1301121
Name: gender, dtype: int64
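
When filtering on gender, note that `isin` only matches literal values, so the string `'NaN'` does not match a missing gender, and `value_counts` leaves missing values out by default. The sketch below illustrates this on a made-up column; it is not the notebook's data.

```python
import pandas as pd

# Made-up gender column with one real missing value
gender = pd.Series(["M", "F", "X", None, "U"])

# isin compares against the literal string 'NaN', which no row contains
mask = gender.isin(["X", "U", "NaN"])
print(mask.tolist())           # [False, False, True, False, True]

# Missing values need an explicit null test
print(gender.isna().tolist())  # [False, False, False, True, False]

# Keep only rows with a known M/F gender
kept = gender[~mask & gender.notna()]
print(kept.tolist())           # ['M', 'F']
```

If rows with a missing gender should also be removed, `dropna(subset=['gender'])` on the DataFrame is one way to do it.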

In [48]:
# check null value and drop
index=img_full[img_full.cicid.isnull()].index
img_full.drop(index , inplace=True)
img_full[img_full.cicid.isnull()]

Unnamed: 0,cicid,i94yr,i94cit,i94port,i94res,arrdate,i94mode,biryear,i94visa,gender,fltno,visatype,City,State


In [49]:
# Check that the primary key has no duplicates
len(img_full) - len(img_full.cicid.unique())

0

In [50]:
# Only 292 cities are of interest:
img_full['City']=img_full.City.str.lower()
city_list=img_full.City.unique()
len(city_list)

292

In [30]:
#read weather file
temp_df=pd.read_csv('GlobalLandTemperaturesByCity.csv')
temp_df.tail(1)
temp_df['City']=temp_df['City'].str.lower()
len(temp_df['City'].unique())

3448

In [51]:
us_temp=temp_df[temp_df.Country == 'United States']
len(us_temp.City.unique())

248

In [62]:
# Keep only the US cities that also appear in the immigration data; all other cities are ignored
us_temp=us_temp[us_temp['City'].isin(city_list)]
temp_city=us_temp.City.unique()
len(us_temp.City.unique())

69

In [54]:
# drop null value
index=us_temp[us_temp.AverageTemperature.isnull()].index
us_temp.drop(index , inplace=True)
us_temp[us_temp.AverageTemperature.isnull()]

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude


In [56]:
# Load the US cities demographics data to check its city names
us_city=pd.read_csv('us-cities-demographics.csv', sep=';')
us_city['City']=us_city.City.str.lower()


In [57]:
# Rename 'washington' to 'washington dc'
us_city.replace(to_replace ="washington", 
                 value ="washington dc", inplace=True) 

In [58]:
# check the cities
us_city_img_city=us_city[us_city['City'].isin(city_list)]
test=us_city_img_city.City.unique()
len(us_city_img_city.City.unique())


81

In [59]:
us_city_img_city.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 439 entries, 12 to 2888
Data columns (total 12 columns):
City                      439 non-null object
State                     439 non-null object
Median Age                439 non-null float64
Male Population           439 non-null float64
Female Population         439 non-null float64
Total Population          439 non-null int64
Number of Veterans        436 non-null float64
Foreign-born              436 non-null float64
Average Household Size    436 non-null float64
State Code                439 non-null object
Race                      439 non-null object
Count                     439 non-null int64
dtypes: float64(6), int64(2), object(4)
memory usage: 44.6+ KB


In [65]:
# Keep the immigration rows whose city also appears in the temperature data
new_im_data=img_full[img_full.City.isin(temp_city)]
# Select only the City and i94port columns
new_im_data=new_im_data[['City', 'i94port']]
new_im_data=new_im_data.reset_index(drop=True)
# Drop duplicate (City, i94port) pairs
new_im_data=new_im_data.drop_duplicates(subset=['City', 'i94port'], keep='last')
new_im_data=new_im_data.reset_index(drop=True)

new_im_data

Unnamed: 0,City,i94port
0,atlanta,ATL
1,new york,NYC
2,boston,BOS
3,houston,HOU
4,miami,MIA
5,chicago,CHI
6,los angeles,LOS
7,charlotte,CLT
8,denver,DEN
9,dallas,DAL


In [67]:
# Keep a single row per city: after sorting by City, keep='last' retains one month's reading, not an average
us_temp=us_temp.sort_values('City').drop_duplicates(subset=['City', 'Country'], keep='last')

us_temp_new=pd.merge(us_temp, new_im_data, how="outer", on="City")
us_temp_new


Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude,i94port
0,1885-12-01,0.768,1.095,albuquerque,United States,34.56N,107.03W,ABQ
1,1893-05-01,6.884,1.725,anchorage,United States,61.88N,151.13W,ANC
2,1840-09-01,19.963,2.114,atlanta,United States,34.56N,83.68W,ATL
3,1885-02-01,10.883,1.353,austin,United States,29.74N,97.85W,AUS
4,1840-12-01,-0.987,1.688,baltimore,United States,39.38N,76.99W,BAL
5,1875-09-01,23.95,0.623,baton rouge,United States,29.74N,90.46W,BTN
6,1840-07-01,25.855,1.705,birmingham,United States,32.95N,87.13W,BHX
7,1840-04-01,7.766,2.863,boston,United States,42.59N,72.00W,BOS
8,1886-10-01,24.531,0.944,brownsville,United States,26.52N,96.72W,BRO
9,1839-06-01,15.859,1.508,buffalo,United States,42.59N,78.55W,BUF
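
Because the deduplication keeps a single reading per city, each temperature shown above belongs to one particular month. If a value that is representative of the whole series is preferred, one option is to average all readings per city. The sketch below uses made-up readings and is an alternative, not what the notebook does.

```python
import pandas as pd

# Made-up monthly readings for two cities
readings = pd.DataFrame({
    "City": ["atlanta", "atlanta", "boston", "boston"],
    "AverageTemperature": [10.0, 20.0, 4.0, 8.0],
})

# One row per city, holding the mean over all available months
city_mean = readings.groupby("City", as_index=False)["AverageTemperature"].mean()
print(city_mean)
```

The result has one row per city (15.0 for atlanta and 6.0 for boston) and can be merged with the port codes in the same way as the deduplicated table.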


In [36]:
# df_img

### Step 3: Define the Data Model


## Data Dictionary:
##### Fact table - contains events from the I94 immigration data.
* cicid: unique ID of each record
* i94cit: 3-digit code of the immigrant's country of birth
* i94res: 3-digit code of the immigrant's country of residence
* i94port: 3-character code of the destination USA city
* i94mode: mode of transportation
* i94visa: visa category code
* gender: M, F


##### Dim temperature:

* i94port = 3 character code of destination city 
* AverageTemperature = average temperature
* City = Name of the City
* Latitude= latitude
* Longitude = longitude
* Country = country name


##### Dim Immigration:

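 * cicid: unique ID of each record
 * i94yr: 4-digit year
 * i94cit: 3-digit code of the immigrant's country of birth
 * i94port: 3-character code of the destination USA city
 * i94res: 3-digit code of the immigrant's country of residence
 * arrdate: arrival date in the USA
 * i94mode: mode of transportation
 * biryear: year of birth
 * i94visa: visa category code
 * gender: M, F
 * fltno: flight number of the airline
 * visatype: Business, Pleasure, Student, 3 Months Visa, Others
 * City: city of the port
 * State: state of the port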



### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [77]:
img_full.to_csv('img_full.csv')
us_temp_new.to_csv('us_temp_new.csv')

In [78]:
# Load the cleaned CSV files into Spark and register them as temporary views
df_img=spark.read.format("csv").option("header", "true").load('img_full.csv')
df_img.createOrReplaceTempView('imgration')

us_temp_new=spark.read.format("csv").option("header", "true").load('us_temp_new.csv')
us_temp_new.createOrReplaceTempView('temp')

In [84]:
us_temp_new.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- dt: string (nullable = true)
 |-- AverageTemperature: string (nullable = true)
 |-- AverageTemperatureUncertainty: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- i94port: string (nullable = true)
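
The `_c0` column in this schema is most likely the pandas index, which `to_csv` writes as an unnamed first column by default. A minimal sketch of the difference follows; the tiny frame is made up for illustration.

```python
import io
import pandas as pd

df = pd.DataFrame({"City": ["atlanta"], "i94port": ["ATL"]})

with_index = io.StringIO()
df.to_csv(with_index)                   # default: the index becomes an unnamed first column
without_index = io.StringIO()
df.to_csv(without_index, index=False)   # index omitted

print(with_index.getvalue().splitlines()[0])     # ,City,i94port
print(without_index.getvalue().splitlines()[0])  # City,i94port
```

Every column is typed as string because Spark's CSV reader does not infer types unless a schema is supplied or the `inferSchema` option is enabled.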



In [87]:
# Immigration dimension: write to parquet files partitioned by i94port
df_img.write.mode("append").partitionBy("i94port").parquet("immigration/")

In [89]:
#dim_temperature
us_temp_new.write.mode("append").partitionBy("i94port").parquet("temperature/")

In [86]:
df_img.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- cicid: string (nullable = true)
 |-- i94yr: string (nullable = true)
 |-- i94cit: string (nullable = true)
 |-- i94port: string (nullable = true)
 |-- i94res: string (nullable = true)
 |-- arrdate: string (nullable = true)
 |-- i94mode: string (nullable = true)
 |-- biryear: string (nullable = true)
 |-- i94visa: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- fltno: string (nullable = true)
 |-- visatype: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)



In [97]:
# Create the fact table by joining the immigration and temperature views
fact_table = spark.sql('''
SELECT imgration.cicid as id,
       imgration.i94yr as year,
       imgration.i94cit as city,
       imgration.i94port as i94port,
       imgration.i94visa as reason,
       imgration.i94mode as type_of_trip,
       temp.AverageTemperature as temperature
FROM imgration
JOIN temp ON (imgration.i94port = temp.i94port)
''')

In [98]:
#Write fact table to parquet files partitioned by i94port
fact_table.write.mode("append").partitionBy("i94port").parquet("fact/")
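
The behaviour of the join that builds the fact table can be tried on toy data. The sketch below uses Python's built-in `sqlite3` instead of Spark, with made-up rows and simplified table names (`immigration`, `temperature`), so it only illustrates the join semantics and not the pipeline itself.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE immigration (cicid TEXT, i94port TEXT, i94visa TEXT);
CREATE TABLE temperature (i94port TEXT, AverageTemperature REAL);
INSERT INTO immigration VALUES ('1', 'ATL', '2'), ('2', 'ATL', '1'), ('3', 'ZZZ', '2');
INSERT INTO temperature VALUES ('ATL', 19.963);
""")

# Inner join on the port code, mirroring the structure of the fact-table query
rows = conn.execute("""
SELECT i.cicid AS id, i.i94port, i.i94visa AS reason, t.AverageTemperature AS temperature
FROM immigration AS i
JOIN temperature AS t ON i.i94port = t.i94port
ORDER BY i.cicid
""").fetchall()
print(rows)  # [('1', 'ATL', '2', 19.963), ('2', 'ATL', '1', 19.963)]
conn.close()
```

The row for the made-up port `ZZZ` is dropped because an inner join keeps only immigration events whose port has a temperature record.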

#### 4.2 Data Quality Checks
The pipeline is checked by counting the records of each table and reporting a failure when a table is empty. Further checks that could be added include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [99]:
# Perform quality checks here
def quality_check(df, description):
    result = df.count()
    
    if result == 0:
        print("Data quality check failed for {} with zero records".format(description))
    else:
        print("Data quality check passed for {} with {} records".format(description, result))
    return 0

In [100]:
# Perform data quality check
quality_check(df_img, "immigration table")
quality_check(us_temp_new, "temperature table")

Data quality check passed for immigration table with 3090769 records
Data quality check passed for temperature table with 72 records


0
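
The count check can be extended with the integrity constraints listed above. The following is a minimal sketch on plain Python rows, assuming each record is available as a dictionary; `check_table` is a hypothetical helper and the rows are made up.

```python
def check_table(rows, key, name):
    """Raise ValueError if the table is empty, or if its key has nulls or duplicates."""
    if not rows:
        raise ValueError(f"{name}: no records")
    keys = [row.get(key) for row in rows]
    if any(k is None for k in keys):
        raise ValueError(f"{name}: null values in {key}")
    if len(set(keys)) != len(keys):
        raise ValueError(f"{name}: duplicate values in {key}")
    return len(rows)

# Made-up rows standing in for the immigration table
rows = [{"cicid": "1", "i94port": "ATL"}, {"cicid": "2", "i94port": "BOS"}]
print(check_table(rows, "cicid", "immigration table"))  # 2
```

Raising an exception, rather than printing a message, lets a scheduled run stop as soon as a check fails.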

### Complete Project Write Up:
Technologies used in this project:
* Spark
* pandas
* SQL
* SAS and CSV source files

ETL - I extracted the data from different sources and used DataFrames to clean and modify it so that it can be combined and used as a data lake. Finally, the data can be extracted and joined using the i94port code.

Answer questions:


### If the data was increased by 100x.
I would keep the process the same because Spark is designed for processing large datasets. However, running it on an EMR cluster would distribute the work and handle 100x more data.

### If the pipelines were run on a daily basis by 7am.
To run at 7am each day as a production-level pipeline, I would convert the Jupyter notebook to a Python script and submit it to the EMR cluster on a daily 7am schedule. I only used Jupyter to display the data quickly, as an analyst would.

### If the database needed to be accessed by 100+ people.
If the dataset needed to be accessed by 100+ people, pulling it from S3 should not be a problem.

A further improvement to consider is copying the data to HDFS on the EMR cluster to make reads a bit quicker. Alternatively, the data could be loaded into Redshift so that many people can query it efficiently.




### Why I chose this data model
The ETL I built produces two dimension tables and a fact table.
I cleaned the datasets by city to create the two dimension tables (immigration and temperature), and I created the fact table by joining them on the city's port code (`i94port`).
Finally, analysts can query the immigration events to find out whether temperature affected immigrants' choice of city.
I used Spark because it is free, open source, and efficient at extracting and manipulating data in a data lake.
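
The kind of question this model is meant to answer can be sketched as an aggregation over the fact table. The example below uses Python's built-in `sqlite3` with made-up rows in place of the parquet output, so the table contents and column subset are assumptions for illustration only.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE fact (id TEXT, i94port TEXT, temperature REAL)")
conn.executemany(
    "INSERT INTO fact VALUES (?, ?, ?)",
    [("1", "ATL", 19.963), ("2", "ATL", 19.963), ("3", "BOS", 7.766)],
)

# Count arrivals per port next to that port's temperature
rows = conn.execute("""
SELECT i94port, temperature, COUNT(*) AS arrivals
FROM fact
GROUP BY i94port, temperature
ORDER BY arrivals DESC
""").fetchall()
print(rows)  # [('ATL', 19.963, 2), ('BOS', 7.766, 1)]
conn.close()
```

An analyst could then compare the arrival counts with the temperatures across ports.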
 