# Project Title
### Data Engineering Capstone Project

#### Project Summary

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Project Write Up

In [4]:
import pandas as pd

### Step 1: Purpose of the project
The purpose of the project is to study the following:<br />
1- The relation between the temperature of the departure country at the time of travel and the volume of travellers.<br />
2- The relation between the temperature of the departure country and the temperature of the arrival state at the time of travel.<br />
3- The seasonality of travel and its relation to visa type (e.g., which month of the year has the most arrivals on tourist visas).


#### Project Steps
1- Use pandas to analyze a sample of the data from each CSV sample file.<br />
2- Use Spark to load the full data into DataFrames.<br />
3- Analyze the immigration data, city demographics, airport codes and global temperature data to identify missing data and devise a cleaning strategy.<br />
4- Clean the data. <br />
5- Create the tables (fact and dimension tables) <br />
   5.1- "country temperature" dimension table: this table contains each country and its monthly temperature, based on the country temperatures reported in the global temperature data. Together with the next table, it can help explain the flow of tourism/immigration in relation to the temperature of the departure country and of the arrival US state. It links to the fact table through the code of the country of residence.<br />
   5.2- "us state temperature" dimension table: this table will contain the state, the state code and its monthly temperature, based on the temperature reported for each US state in the global temperature data. Together with the previous table, it can help explain the flow of tourism/immigration in relation to the temperature of the departure country and of the arrival US state. It links to the fact table through the state code (i94addr - USA State of arrival).<br />
   5.3- "usa demographics" dimension table, constructed from the demographics data. It links to the fact table through the state code.<br />
   5.4- "travel month" dimension table, constructed directly from the I94 immigration data. It includes the departure country, arrival city and month of travel, and links to the fact table through the unique travel record identifier "cicid".<br />
   5.5- "Visa type" dimension table which maps the shortened 


#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

In [5]:
# Reading the immigration data sample
df_immigration_sample = pd.read_csv('immigration_data_sample.csv')

In [6]:
display(df_immigration_sample.head())
df_immigration_sample.columns

| | Unnamed: 0 | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2027561 | 4084316.0 | 2016.0 | 4.0 | 209.0 | 209.0 | HHW | 20566.0 | 1.0 | HI | ... | | M | 1955.0 | 7202016 | F | | JL | 56582670000.0 | 00782 | WT |
| 1 | 2171295 | 4422636.0 | 2016.0 | 4.0 | 582.0 | 582.0 | MCA | 20567.0 | 1.0 | TX | ... | | M | 1990.0 | 10222016 | M | | \*GA | 94362000000.0 | XBLNG | B2 |
| 2 | 589494 | 1195600.0 | 2016.0 | 4.0 | 148.0 | 112.0 | OGG | 20551.0 | 1.0 | FL | ... | | M | 1940.0 | 7052016 | M | | LH | 55780470000.0 | 00464 | WT |
| 3 | 2631158 | 5291768.0 | 2016.0 | 4.0 | 297.0 | 297.0 | LOS | 20572.0 | 1.0 | CA | ... | | M | 1991.0 | 10272016 | M | | QR | 94789700000.0 | 00739 | B2 |
| 4 | 3032257 | 985523.0 | 2016.0 | 4.0 | 111.0 | 111.0 | CHM | 20550.0 | 3.0 | NY | ... | | M | 1997.0 | 7042016 | F | | | 42322570000.0 | LAND | WT |


Index(['Unnamed: 0', 'cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port',
       'arrdate', 'i94mode', 'i94addr', 'depdate', 'i94bir', 'i94visa',
       'count', 'dtadfile', 'visapost', 'occup', 'entdepa', 'entdepd',
       'entdepu', 'matflag', 'biryear', 'dtaddto', 'gender', 'insnum',
       'airline', 'admnum', 'fltno', 'visatype'],
      dtype='object')

## Now, let's understand what the different columns represent

<b><i>Data dictionary</i></b>

<table class="tg" align="left">
  <tr>
    <th class="tg-0pky">Feature</th>
    <th class="tg-0pky">Description</th>
  </tr>
 <tr><td class="tg-0pky">cicid</td><td class="tg-0pky">Unique record ID</td>
 <tr><td class="tg-0pky">i94yr</td><td class="tg-0pky">year - 4 digits (xyzw)</td>
 <tr><td class="tg-0pky">i94mon</td><td class="tg-0pky">month (1-12)</td>
 <tr><td class="tg-0pky">i94cit</td><td class="tg-0pky">3 digit code for immigrant country of birth</td>
 <tr><td class="tg-0pky">i94res</td><td class="tg-0pky">3 digit code for immigrant country of residence </td>
 <tr><td class="tg-0pky">i94port</td><td class="tg-0pky">Port of admission</td>
 <tr><td class="tg-0pky">arrdate</td><td class="tg-0pky">Arrival Date in the USA</td>
 <tr><td class="tg-0pky">i94mode</td><td class="tg-0pky">Mode of transportation (1 = Air; 2 = Sea; 3 = Land; 9 = Not reported)</td>
 <tr><td class="tg-0pky">i94addr</td><td class="tg-0pky">USA State of arrival</td>
 <tr><td class="tg-0pky">depdate</td><td class="tg-0pky">Departure Date from the USA</td>
 <tr><td class="tg-0pky">i94bir</td><td class="tg-0pky">Age of Respondent in Years</td>
 <tr><td class="tg-0pky">i94visa</td><td class="tg-0pky">Visa codes collapsed into three categories</td>
 <tr><td class="tg-0pky">count</td><td class="tg-0pky">Field used for summary statistics</td>
 <tr><td class="tg-0pky">dtadfile</td><td class="tg-0pky">Character Date Field - Date added to I-94 Files</td>
 <tr><td class="tg-0pky">visapost</td><td class="tg-0pky">Department of State where Visa was issued </td>
 <tr><td class="tg-0pky">occup</td><td class="tg-0pky">Occupation that will be performed in U.S</td>
 <tr><td class="tg-0pky">entdepa</td><td class="tg-0pky">Arrival Flag - admitted or paroled into the U.S.</td>
 <tr><td class="tg-0pky">entdepd</td><td class="tg-0pky">Departure Flag - Departed, lost I-94 or is deceased</td>
 <tr><td class="tg-0pky">entdepu</td><td class="tg-0pky">Update Flag - Either apprehended, overstayed, adjusted to perm residence</td>
 <tr><td class="tg-0pky">matflag</td><td class="tg-0pky">Match flag - Match of arrival and departure records</td>
 <tr><td class="tg-0pky">biryear</td><td class="tg-0pky">4 digit year of birth</td>
 <tr><td class="tg-0pky">dtaddto</td><td class="tg-0pky">Character Date Field - Date to which admitted to U.S. (allowed to stay until)</td>
 <tr><td class="tg-0pky">gender</td><td class="tg-0pky">Non-immigrant sex</td>
 <tr><td class="tg-0pky">insnum</td><td class="tg-0pky">INS number</td>
 <tr><td class="tg-0pky">airline</td><td class="tg-0pky">Airline used to arrive in U.S.</td>
 <tr><td class="tg-0pky">admnum</td><td class="tg-0pky">Admission Number</td>
 <tr><td class="tg-0pky">fltno</td><td class="tg-0pky">Flight number of Airline used to arrive in U.S.</td>
 <tr><td class="tg-0pky">visatype</td><td class="tg-0pky">Class of admission legally admitting the non-immigrant to temporarily stay in U.S.</td>
</table>

In [7]:
df_immigration_sample.visatype.value_counts()

WT     443
B2     356
WB      91
B1      61
GMT     27
F1      10
CP       5
F2       3
E2       3
M1       1
Name: visatype, dtype: int64

In [8]:
df_immigration_sample.i94visa.value_counts()

2.0    831
1.0    155
3.0     14
Name: i94visa, dtype: int64

In [9]:
df_immigration_sample.i94addr.value_counts()

FL    188
CA    163
NY    161
HI     53
TX     42
NV     34
IL     31
GU     27
MA     26
NJ     20
WA     19
GA     19
VA     13
DC     12
NE     12
MD     11
PA     10
MI      9
NC      9
LA      8
TN      7
IN      7
CT      6
AL      5
CO      5
OH      5
AZ      5
SC      3
MN      3
MP      3
OR      2
VT      2
UN      2
MO      2
IA      1
VQ      1
ID      1
AR      1
OK      1
NH      1
RI      1
ME      1
PR      1
SW      1
WI      1
TE      1
KY      1
KS      1
NM      1
MS      1
UT      1
Name: i94addr, dtype: int64

## From here on, we will use cicid, i94mon, i94res, i94addr, i94visa and visatype

In [10]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

df_immigration_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_jan16_sub.sas7bdat')


In [11]:
df_immigration_spark.count()

2847924

In [None]:
# Write to Parquet (overwriting the output of any previous run), then read it back
df_immigration_spark.write.mode("overwrite").parquet("sas_data")
df_immigration_spark=spark.read.parquet("sas_data")

## Now look at the city demographics CSV

In [12]:
df_city_demo = pd.read_csv('us-cities-demographics.csv')
df_city_demo.head()

| | City;State;Median Age;Male Population;Female Population;Total Population;Number of Veterans;Foreign-born;Average Household Size;State Code;Race;Count |
|---|---|
| 0 | Silver Spring;Maryland;33.8;40601;41862;82463;... |
| 1 | Quincy;Massachusetts;41.0;44129;49500;93629;41... |
| 2 | Hoover;Alabama;38.5;38040;46799;84839;4819;822... |
| 3 | Rancho Cucamonga;California;34.5;88127;87105;1... |
| 4 | Newark;New Jersey;34.6;138040;143873;281913;58... |


### Not quite right: the file is semicolon-separated, so set sep=';'

In [13]:
df_city_demo = pd.read_csv('us-cities-demographics.csv', sep=';')
df_city_demo.head()

| | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Silver Spring | Maryland | 33.8 | 40601.0 | 41862.0 | 82463 | 1562.0 | 30908.0 | 2.6 | MD | Hispanic or Latino | 25924 |
| 1 | Quincy | Massachusetts | 41.0 | 44129.0 | 49500.0 | 93629 | 4147.0 | 32935.0 | 2.39 | MA | White | 58723 |
| 2 | Hoover | Alabama | 38.5 | 38040.0 | 46799.0 | 84839 | 4819.0 | 8229.0 | 2.58 | AL | Asian | 4759 |
| 3 | Rancho Cucamonga | California | 34.5 | 88127.0 | 87105.0 | 175232 | 5821.0 | 33878.0 | 3.18 | CA | Black or African-American | 24437 |
| 4 | Newark | New Jersey | 34.6 | 138040.0 | 143873.0 | 281913 | 5829.0 | 86253.0 | 2.73 | NJ | White | 76402 |


## We will use the demographics data to map the state alpha code to the state name

## Now look at the global land temperature CSV

In [14]:
df_temperature = pd.read_csv('../../data2/GlobalLandTemperaturesByCity.csv')
df_temperature.head()

| | dt | AverageTemperature | AverageTemperatureUncertainty | City | Country | Latitude | Longitude |
|---|---|---|---|---|---|---|---|
| 0 | 1743-11-01 | 6.068 | 1.737 | Århus | Denmark | 57.05N | 10.33E |
| 1 | 1743-12-01 | | | Århus | Denmark | 57.05N | 10.33E |
| 2 | 1744-01-01 | | | Århus | Denmark | 57.05N | 10.33E |
| 3 | 1744-02-01 | | | Århus | Denmark | 57.05N | 10.33E |
| 4 | 1744-03-01 | | | Århus | Denmark | 57.05N | 10.33E |


In [15]:
pd.set_option('display.max_rows', 10)
df_temperature[df_temperature['Country'] == 'United States']

| | dt | AverageTemperature | AverageTemperatureUncertainty | City | Country | Latitude | Longitude |
|---|---|---|---|---|---|---|---|
| 47555 | 1820-01-01 | 2.101 | 3.217 | Abilene | United States | 32.95N | 100.53W |
| 47556 | 1820-02-01 | 6.926 | 2.853 | Abilene | United States | 32.95N | 100.53W |
| 47557 | 1820-03-01 | 10.767 | 2.395 | Abilene | United States | 32.95N | 100.53W |
| 47558 | 1820-04-01 | 17.989 | 2.202 | Abilene | United States | 32.95N | 100.53W |
| 47559 | 1820-05-01 | 21.809 | 2.036 | Abilene | United States | 32.95N | 100.53W |
| ... | ... | ... | ... | ... | ... | ... | ... |
| 8439242 | 2013-05-01 | 15.544 | 0.281 | Yonkers | United States | 40.99N | 74.56W |
| 8439243 | 2013-06-01 | 20.892 | 0.273 | Yonkers | United States | 40.99N | 74.56W |
| 8439244 | 2013-07-01 | 24.722 | 0.279 | Yonkers | United States | 40.99N | 74.56W |
| 8439245 | 2013-08-01 | 21.001 | 0.323 | Yonkers | United States | 40.99N | 74.56W |


### We'll use the data to calculate average temperature per country

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

## I'll begin with the temperature data, because it contains much more data than we need

In [16]:
display(df_temperature['dt'].min())
display(df_temperature['dt'].max())

print("The number of rows are", len(df_temperature))

'1743-11-01'

'2013-09-01'

The number of rows are 8599212


### That is far too much: we don't need about 270 years of temperature data to calculate the averages for our task. 50 years is enough.

In [17]:
df_temperature = df_temperature[df_temperature['dt'] >= '1963-09-01']

display(df_temperature['dt'].min())
display(df_temperature['dt'].max())

print("Now the number of rows are", len(df_temperature))

'1963-09-01'

'2013-09-01'

Now the number of rows are 2109510
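The cutoff above is hard-coded as '1963-09-01', which works because ISO-formatted date strings compare correctly as plain text. If one prefers to derive the cutoff from the latest date in the data, a minimal sketch could look like this; `keep_last_n_years` is a hypothetical helper and the sample rows are made up for illustration:

```python
import pandas as pd


def keep_last_n_years(df: pd.DataFrame, date_col: str = "dt", years: int = 50) -> pd.DataFrame:
    """Keep only the rows whose date lies within `years` years of the latest date."""
    dates = pd.to_datetime(df[date_col])
    cutoff = dates.max() - pd.DateOffset(years=years)  # e.g. 2013-09-01 -> 1963-09-01
    return df[dates >= cutoff]


# Tiny synthetic example (not the real temperature file)
sample = pd.DataFrame({
    "dt": ["1743-11-01", "1963-08-01", "1963-09-01", "2013-09-01"],
    "AverageTemperature": [6.068, 10.0, 12.0, None],
})
recent = keep_last_n_years(sample)
print(recent["dt"].tolist())  # ['1963-09-01', '2013-09-01']
```

The original string column is left untouched; only the filtering uses parsed dates.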


### Now we check for missing values

In [18]:
df_temperature.isnull().sum()

dt                                  0
AverageTemperature               3070
AverageTemperatureUncertainty    3070
City                                0
Country                             0
Latitude                            0
Longitude                           0
dtype: int64

In [19]:
df_temperature[df_temperature.AverageTemperature.isnull()]

| | dt | AverageTemperature | AverageTemperatureUncertainty | City | Country | Latitude | Longitude |
|---|---|---|---|---|---|---|---|
| 3238 | 2013-09-01 | | | Århus | Denmark | 57.05N | 10.33E |
| 6477 | 2013-09-01 | | | Çorlu | Turkey | 40.99N | 27.69E |
| 9606 | 2013-09-01 | | | Çorum | Turkey | 40.99N | 34.08E |
| 11924 | 2013-09-01 | | | Öskemen | Kazakhstan | 50.63N | 82.39E |
| 14242 | 2013-09-01 | | | Ürümqi | China | 44.20N | 87.20E |
| ... | ... | ... | ... | ... | ... | ... | ... |
| 8587519 | 2013-09-01 | | | Zouxian | China | 36.17N | 117.35E |
| 8589604 | 2013-09-01 | | | Zunyi | China | 28.13N | 106.36E |
| 8592843 | 2013-09-01 | | | Zurich | Switzerland | 47.42N | 8.29E |
| 8595972 | 2013-09-01 | | | Zuwarah | Libya | 32.95N | 12.45E |


### The missing values seem to be concentrated on 2013-09-01, but let's check further

In [20]:
display(df_temperature[df_temperature.AverageTemperature.isnull()]['dt'].value_counts())

2013-09-01    3070
Name: dt, dtype: int64

### All the missing values fall on the same date, which is also the last date in the dataset. Since we have plenty of data, we simply drop these rows. We could fill them using average values from previous years, but that would have a negligible effect on the long-run average temperatures we calculate.

In [21]:
df_temperature = df_temperature[df_temperature.AverageTemperature.notnull()]
print("Now the number of rows are", len(df_temperature))

Now the number of rows are 2106440


In [22]:
df_temperature['dt']= pd.to_datetime(df_temperature['dt'])

### Now the demographics data

In [23]:
df_city_demo.head(5)

| | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Silver Spring | Maryland | 33.8 | 40601.0 | 41862.0 | 82463 | 1562.0 | 30908.0 | 2.6 | MD | Hispanic or Latino | 25924 |
| 1 | Quincy | Massachusetts | 41.0 | 44129.0 | 49500.0 | 93629 | 4147.0 | 32935.0 | 2.39 | MA | White | 58723 |
| 2 | Hoover | Alabama | 38.5 | 38040.0 | 46799.0 | 84839 | 4819.0 | 8229.0 | 2.58 | AL | Asian | 4759 |
| 3 | Rancho Cucamonga | California | 34.5 | 88127.0 | 87105.0 | 175232 | 5821.0 | 33878.0 | 3.18 | CA | Black or African-American | 24437 |
| 4 | Newark | New Jersey | 34.6 | 138040.0 | 143873.0 | 281913 | 5829.0 | 86253.0 | 2.73 | NJ | White | 76402 |


### For our task, we only need the city and the state (code and name)

In [24]:
# Rename 'State Code' to State_Code and keep only the city and state columns
df_city_demo = df_city_demo.rename(columns={'State Code': 'State_Code'})
df_city_demo = df_city_demo[['City', 'State_Code', 'State']]
df_city_demo.head(5)

| | City | State_Code | State |
|---|---|---|---|
| 0 | Silver Spring | MD | Maryland |
| 1 | Quincy | MA | Massachusetts |
| 2 | Hoover | AL | Alabama |
| 3 | Rancho Cucamonga | CA | California |
| 4 | Newark | NJ | New Jersey |


### Check for null entries in this dataset too

In [25]:
df_city_demo.isnull().sum()

City          0
State_Code    0
State         0
dtype: int64

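### No nulls, which is convenient. Now drop the duplicates: the original file has one row per city and race, so each city appears several times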

In [26]:
df_city_demo = df_city_demo.drop_duplicates()
df_city_demo.head()

| | City | State_Code | State |
|---|---|---|---|
| 0 | Silver Spring | MD | Maryland |
| 1 | Quincy | MA | Massachusetts |
| 2 | Hoover | AL | Alabama |
| 3 | Rancho Cucamonga | CA | California |
| 4 | Newark | NJ | New Jersey |


In [27]:
df_city_demo[df_city_demo[['City']].duplicated()]

| | City | State_Code | State |
|---|---|---|---|
| 177 | Wilmington | DE | Delaware |
| 210 | Lakewood | CA | California |
| 238 | Glendale | CA | California |
| 242 | Peoria | AZ | Arizona |
| 250 | Westminster | CA | California |
| ... | ... | ... | ... |
| 1470 | Albany | NY | New York |
| 1543 | Columbia | SC | South Carolina |
| 1621 | Aurora | CO | Colorado |
| 1629 | Bloomington | MN | Minnesota |


In [28]:
df_city_demo[df_city_demo.City == 'Columbia']


| | City | State_Code | State |
|---|---|---|---|
| 62 | Columbia | MD | Maryland |
| 874 | Columbia | MO | Missouri |
| 1543 | Columbia | SC | South Carolina |


### So these are actually different US cities with the same name, which is confusing. Let's check how they are reported in the temperature DataFrame

### It is therefore hard to tell which Columbia the temperature dataset refers to without checking the latitude and longitude, which is tedious. Instead, we'll load a separate file with temperatures by US state.

In [29]:
df_temperature_cities = df_temperature

df_temperature_usstates = pd.read_csv('GlobalLandTemperaturesByState.csv')
df_temperature_usstates = df_temperature_usstates[df_temperature_usstates['Country'] == "United States"]
df_temperature_usstates.head()

| | dt | AverageTemperature | AverageTemperatureUncertainty | State | Country |
|---|---|---|---|---|---|
| 7458 | 1743-11-01 | 10.722 | 2.898 | Alabama | United States |
| 7459 | 1743-12-01 | | | Alabama | United States |
| 7460 | 1744-01-01 | | | Alabama | United States |
| 7461 | 1744-02-01 | | | Alabama | United States |
| 7462 | 1744-03-01 | | | Alabama | United States |


### Since this file has the same structure, apply the same cleaning steps

In [30]:
df_temperature_usstates = df_temperature_usstates[df_temperature_usstates['dt'] >= '1963-09-01']
df_temperature_usstates = df_temperature_usstates[df_temperature_usstates.AverageTemperature.notnull()]
df_temperature_usstates['dt']= pd.to_datetime(df_temperature_usstates['dt'])
df_temperature_usstates.head()

| | dt | AverageTemperature | AverageTemperatureUncertainty | State | Country |
|---|---|---|---|---|---|
| 10096 | 1963-09-01 | 23.083 | 0.283 | Alabama | United States |
| 10097 | 1963-10-01 | 18.871 | 0.239 | Alabama | United States |
| 10098 | 1963-11-01 | 12.082 | 0.259 | Alabama | United States |
| 10099 | 1963-12-01 | 3.331 | 0.102 | Alabama | United States |
| 10100 | 1964-01-01 | 6.09 | 0.291 | Alabama | United States |


### Now, with the immigration data

### Since we are only interested in the following columns, we'll remove the others to make the subsequent processing faster. The columns are: cicid, i94cit, i94res, arrdate, i94mode, i94addr, i94visa, visatype

In [31]:
df_immigration_spark.createOrReplaceTempView("immigration_table")
df_immigration_spark = spark.sql("""select cicid, i94cit,
i94res, arrdate, i94mode, i94addr, i94visa, visatype
from immigration_table""")

In [32]:
spark.sql("""
select * from immigration_table limit 5
""").show()

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|      admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+------------+-----+--------+
|  7.0|2016.0|   1.0| 101.0| 101.0|    BOS|20465.0|    1.0|     MA|   null|  20.0|    3.0|  1.0|    null|    null| null|      T|   null|   null|   null| 1996.0|     D/S|     M|  null|     LH|3.46608285E8|  424|      F1|
|  8.0|2016.0|   1.0| 101.0| 101.0|    BOS|20465.0|    1.0|     MA|   null|  20.0|    3.0|  1.0|    null|    null| null|

In [33]:
df_immigration_spark.count()

2847924

### Good. Now keep only air travel (i94mode = 1), since that is the type of travel we are interested in, and drop the i94mode column

In [34]:
df_immigration_spark.createOrReplaceTempView("immigration_table")
df_immigration_spark = spark.sql("""select cicid, i94cit,
i94res, arrdate, i94addr, i94visa, visatype
from immigration_table where i94mode = 1""")

In [35]:
df_immigration_spark.createOrReplaceTempView("immigration_table")
spark.sql("""
select * from immigration_table limit 5
""").show()

+-----+------+------+-------+-------+-------+--------+
|cicid|i94cit|i94res|arrdate|i94addr|i94visa|visatype|
+-----+------+------+-------+-------+-------+--------+
|  7.0| 101.0| 101.0|20465.0|     MA|    3.0|      F1|
|  8.0| 101.0| 101.0|20465.0|     MA|    3.0|      F1|
|  9.0| 101.0| 101.0|20469.0|     CT|    2.0|      B2|
| 10.0| 101.0| 101.0|20469.0|     CT|    2.0|      B2|
| 11.0| 101.0| 101.0|20469.0|     CT|    2.0|      B2|
+-----+------+------+-------+-------+-------+--------+



In [36]:
df_immigration_spark.count()

2761448

### Now derive the arrival date from arrdate, which is a SAS date value (the number of days since 1960-01-01). The year and month may be the same as i94yr and i94mon, but since I am not sure, I will compute them from arrdate.

In [37]:
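# arrdate is a SAS date (days since 1960-01-01); cast to int because Spark 3.x date_add only accepts integer day counts
df_immigration_spark = spark.sql("""select *, date_add(to_date('1960-01-01'), cast(arrdate as int)) as arrival_date
from immigration_table""")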
df_immigration_spark.createOrReplaceTempView("immigration_table")
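The Spark expression above treats arrdate as a day count from 1960-01-01. The same arithmetic in plain Python is a quick way to sanity-check individual values; `sas_to_date` is a hypothetical helper, and the two inputs are the arrdate values of cicid 7.0 and 9.0, whose arrival_date appears later in this notebook as 2016-01-12 and 2016-01-16:

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS date values count days from this date


def sas_to_date(sas_days: float) -> date:
    """Convert a SAS date number (days since 1960-01-01) to a Python date."""
    return SAS_EPOCH + timedelta(days=int(sas_days))


print(sas_to_date(20465.0))  # 2016-01-12
print(sas_to_date(20469.0))  # 2016-01-16
```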

### Now, rename and remove unwanted columns

In [38]:
df_immigration_spark = spark.sql("""select cicid, arrival_date, i94cit as immig_country_birth_code,
i94res as immig_country_residence_code, i94addr as arrival_usa_state, i94visa as categories_arrival_visa,
visatype as visa_category
from immigration_table""")

In [39]:
df_immigration_spark.createOrReplaceTempView("immigration_table")
# First we check the uniqueness of the cicid
spark.sql("""
select count (distinct cicid)
from immigration_table
""").show()


+---------------------+
|count(DISTINCT cicid)|
+---------------------+
|              2761448|
+---------------------+



In [40]:
df_immigration_spark.count()

2761448

### As expected, cicid is unique, which makes it a suitable primary key for the fact table!

In [41]:
spark.sql("""
select length (arrival_usa_state) as state_address_length, count(*) as occurence
from immigration_table
group by state_address_length
order by occurence desc
""").show()

+--------------------+---------+
|state_address_length|occurence|
+--------------------+---------+
|                   2|  2620906|
|                null|   140421|
|                   1|      121|
+--------------------+---------+



In [42]:
spark.sql("""
select arrival_usa_state from immigration_table where length (arrival_usa_state) = 1 limit 5
""").show()

+-----------------+
|arrival_usa_state|
+-----------------+
|                C|
|                X|
|                S|
|                N|
|                N|
+-----------------+



In [44]:
df_city_demo['State_Code'].astype(str).map(len).value_counts()

2    596
Name: State_Code, dtype: int64

### As expected, state codes must be exactly 2 characters long, so the null and 1-character values in the immigration table are invalid.

### Just one more thing

In [45]:
spark.sql("""
select count(distinct(arrival_usa_state)) from immigration_table
""").show()

+---------------------------------+
|count(DISTINCT arrival_usa_state)|
+---------------------------------+
|                              410|
+---------------------------------+



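### That is far more distinct values than there are US states and territories, so many of these codes cannot be valid state codes, and this count alone does not show that they appear in the city demographics data frame
### As a first step, let's keep only state codes with 2 characters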

In [46]:
df_immigration_spark = spark.sql("""
select * from immigration_table where length (arrival_usa_state) = 2
""")
# Register the filtered DataFrame before previewing it, so the preview reflects the filter
df_immigration_spark.createOrReplaceTempView("immigration_table")
spark.sql("""
select * from immigration_table limit 10""").show()


+-----+------------+------------------------+----------------------------+-----------------+-----------------------+-------------+
|cicid|arrival_date|immig_country_birth_code|immig_country_residence_code|arrival_usa_state|categories_arrival_visa|visa_category|
+-----+------------+------------------------+----------------------------+-----------------+-----------------------+-------------+
|  7.0|  2016-01-12|                   101.0|                       101.0|               MA|                    3.0|           F1|
|  8.0|  2016-01-12|                   101.0|                       101.0|               MA|                    3.0|           F1|
|  9.0|  2016-01-16|                   101.0|                       101.0|               CT|                    2.0|           B2|
| 10.0|  2016-01-16|                   101.0|                       101.0|               CT|                    2.0|           B2|
| 11.0|  2016-01-16|                   101.0|                       101.0|         
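The length check keeps any 2-character code, including ones that are not US states. A stricter filter could also require the code to appear in the State_Code column of the demographics data. A minimal pandas sketch on toy data; `filter_valid_states` is hypothetical, and it assumes the demographics file covers every state of interest (if it does not, legitimate codes would be dropped):

```python
import pandas as pd


def filter_valid_states(df: pd.DataFrame, valid_codes: set,
                        col: str = "arrival_usa_state") -> pd.DataFrame:
    """Keep rows whose state code is non-null, 2 characters long and in valid_codes."""
    codes = df[col]
    mask = codes.notna() & (codes.str.len() == 2) & codes.isin(valid_codes)
    return df[mask]


# Toy data for illustration only (not taken from the real I94 files)
immigration = pd.DataFrame({
    "cicid": [7.0, 8.0, 9.0, 10.0],
    "arrival_usa_state": ["MA", "C", None, "UN"],
})
city_demo = pd.DataFrame({"State_Code": ["MA", "CT", "MA"]})

valid = set(city_demo["State_Code"].unique())
print(filter_valid_states(immigration, valid)["cicid"].tolist())  # [7.0]
```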

### Since we know what the three numeric visa categories (i94visa) refer to, we can convert them into readable labels

In [47]:
spark.sql("""
select distinct(categories_arrival_visa) from immigration_table
""").show()

+-----------------------+
|categories_arrival_visa|
+-----------------------+
|                    1.0|
|                    3.0|
|                    2.0|
+-----------------------+



In [48]:
df_immigration_spark = spark.sql(""" select cicid, arrival_date,
                        immig_country_birth_code, immig_country_residence_code,
                        arrival_usa_state, case 
                        when categories_arrival_visa = 1.0 then 'Business' 
                        when categories_arrival_visa = 2.0 then 'Leisure'
                        when categories_arrival_visa = 3.0 then 'Study'
                        else 'Unknown' end
                        as visa_type, visa_category 
                        from immigration_table""")

df_immigration_spark.createOrReplaceTempView("immigration_table")
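The CASE expression above is a fixed lookup from the i94visa code to a label, with a fallback for unexpected codes. The same mapping in pandas, for example to label the sample CSV, might look like this; `VISA_TYPES` and `label_visa_type` are hypothetical names:

```python
import pandas as pd

# Same code-to-label mapping as the CASE expression in the Spark SQL cell
VISA_TYPES = {1.0: "Business", 2.0: "Leisure", 3.0: "Study"}


def label_visa_type(codes: pd.Series) -> pd.Series:
    """Map i94visa codes to labels; codes outside the mapping become 'Unknown'."""
    return codes.map(VISA_TYPES).fillna("Unknown")


labels = label_visa_type(pd.Series([3.0, 2.0, 1.0, 9.0]))
print(labels.tolist())  # ['Study', 'Leisure', 'Business', 'Unknown']
```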

### Now let's make sure that each visa_category belongs to a unique visa_type

In [49]:
spark.sql("""
select visa_type, visa_category, count(*) as number_of_occurences
from immigration_table
group by visa_type, visa_category
order by visa_type, visa_category
""").show()


+---------+-------------+--------------------+
|visa_type|visa_category|number_of_occurences|
+---------+-------------+--------------------+
| Business|           B1|              186997|
| Business|           E1|                5237|
| Business|           E2|               26994|
| Business|          GMB|                 556|
| Business|            I|                3350|
| Business|           I1|                 196|
| Business|           WB|              276406|
|  Leisure|           B2|              871889|
|  Leisure|           CP|                 994|
|  Leisure|          CPL|                  28|
|  Leisure|          GMT|               95267|
|  Leisure|          SBP|                   5|
|  Leisure|           WT|              779660|
|    Study|           F1|              362039|
|    Study|           F2|                9270|
|    Study|           M1|                1968|
|    Study|           M2|                  50|
+---------+-------------+--------------------+



### Each visa_category maps to exactly one visa_type. For example, B1 is a business visa category while B2 is a leisure/tourist one, and both are temporary. We will research the other categories further when a specific analysis calls for it.
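The grouped output above has to be checked by eye. A programmatic version of the same check, sketched in pandas on toy data (`categories_with_multiple_types` is a hypothetical helper), counts the distinct visa types per category and reports any category with more than one:

```python
import pandas as pd


def categories_with_multiple_types(df: pd.DataFrame) -> list:
    """Return visa categories mapped to more than one visa type (ideally none)."""
    types_per_category = df.groupby("visa_category")["visa_type"].nunique()
    return sorted(types_per_category[types_per_category > 1].index)


consistent = pd.DataFrame({
    "visa_type": ["Business", "Leisure", "Study", "Business"],
    "visa_category": ["B1", "B2", "F1", "B1"],
})
inconsistent = pd.concat([
    consistent,
    pd.DataFrame({"visa_type": ["Leisure"], "visa_category": ["B1"]}),
])

print(categories_with_multiple_types(consistent))    # []
print(categories_with_multiple_types(inconsistent))  # ['B1']
```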

In [50]:
spark.sql("""select * from immigration_table limit 10""").show()

+-----+------------+------------------------+----------------------------+-----------------+---------+-------------+
|cicid|arrival_date|immig_country_birth_code|immig_country_residence_code|arrival_usa_state|visa_type|visa_category|
+-----+------------+------------------------+----------------------------+-----------------+---------+-------------+
|  7.0|  2016-01-12|                   101.0|                       101.0|               MA|    Study|           F1|
|  8.0|  2016-01-12|                   101.0|                       101.0|               MA|    Study|           F1|
|  9.0|  2016-01-16|                   101.0|                       101.0|               CT|  Leisure|           B2|
| 10.0|  2016-01-16|                   101.0|                       101.0|               CT|  Leisure|           B2|
| 11.0|  2016-01-16|                   101.0|                       101.0|               CT|  Leisure|           B2|
| 12.0|  2016-01-21|                   101.0|                   

### Check for null values in all columns

In [51]:
spark.sql("""select * from immigration_table where arrival_date is NULL limit 5""").show()

+-----+------------+------------------------+----------------------------+-----------------+---------+-------------+
|cicid|arrival_date|immig_country_birth_code|immig_country_residence_code|arrival_usa_state|visa_type|visa_category|
+-----+------------+------------------------+----------------------------+-----------------+---------+-------------+
+-----+------------+------------------------+----------------------------+-----------------+---------+-------------+



In [55]:
spark.sql("""select * from immigration_table where immig_country_birth_code is NULL limit 5""").show()

+-----+------------+------------------------+----------------------------+-----------------+---------+-------------+
|cicid|arrival_date|immig_country_birth_code|immig_country_residence_code|arrival_usa_state|visa_type|visa_category|
+-----+------------+------------------------+----------------------------+-----------------+---------+-------------+
+-----+------------+------------------------+----------------------------+-----------------+---------+-------------+



In [59]:
spark.sql("""select * from immigration_table where immig_country_residence_code is NULL limit 5""").show()

+-----+------------+------------------------+----------------------------+-----------------+---------+-------------+
|cicid|arrival_date|immig_country_birth_code|immig_country_residence_code|arrival_usa_state|visa_type|visa_category|
+-----+------------+------------------------+----------------------------+-----------------+---------+-------------+
+-----+------------+------------------------+----------------------------+-----------------+---------+-------------+



In [61]:
spark.sql("""select * from immigration_table where arrival_usa_state is NULL limit 5""").show()

+-----+------------+------------------------+----------------------------+-----------------+---------+-------------+
|cicid|arrival_date|immig_country_birth_code|immig_country_residence_code|arrival_usa_state|visa_type|visa_category|
+-----+------------+------------------------+----------------------------+-----------------+---------+-------------+
+-----+------------+------------------------+----------------------------+-----------------+---------+-------------+



In [63]:
spark.sql("""select * from immigration_table where visa_type is NULL limit 5""").show()

+-----+------------+------------------------+----------------------------+-----------------+---------+-------------+
|cicid|arrival_date|immig_country_birth_code|immig_country_residence_code|arrival_usa_state|visa_type|visa_category|
+-----+------------+------------------------+----------------------------+-----------------+---------+-------------+
+-----+------------+------------------------+----------------------------+-----------------+---------+-------------+



In [65]:
spark.sql("""select * from immigration_table where visa_category is NULL limit 5""").show()

+-----+------------+------------------------+----------------------------+-----------------+---------+-------------+
|cicid|arrival_date|immig_country_birth_code|immig_country_residence_code|arrival_usa_state|visa_type|visa_category|
+-----+------------+------------------------+----------------------------+-----------------+---------+-------------+
+-----+------------+------------------------+----------------------------+-----------------+---------+-------------+



### Good, no Null values in any of the columns!
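The queries above check one column at a time. One query can count the NULLs in every column at once using `sum(case when ... is null then 1 else 0 end)`. A minimal sketch that only builds the SQL string; `null_count_query` is hypothetical, and in the notebook the result could be passed to `spark.sql(...).show()`:

```python
def null_count_query(table: str, columns: list) -> str:
    """Build one Spark SQL query that counts NULLs in each of the given columns."""
    exprs = ",\n  ".join(
        f"sum(case when {c} is null then 1 else 0 end) as {c}_nulls" for c in columns
    )
    return f"select\n  {exprs}\nfrom {table}"


cols = ["cicid", "arrival_date", "immig_country_birth_code",
        "immig_country_residence_code", "arrival_usa_state",
        "visa_type", "visa_category"]
query = null_count_query("immigration_table", cols)
print(query)
```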

### Finally, the immigrant's country of birth and country of residence are stored as numeric codes, so we need a mapping to make them informative. I found countries.csv, which maps these codes to country names.

In [67]:
df_country_codes = pd.read_csv('countries.csv')
df_country_codes.head()

| | code | country |
|---|---|---|
| 0 | 582 | MEXICO |
| 1 | 236 | AFGHANISTAN |
| 2 | 101 | ALBANIA |
| 3 | 316 | ALGERIA |
| 4 | 102 | ANDORRA |


In [69]:
df_country_codes.isnull().any()

code       False
country    False
dtype: bool
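countries.csv stores the codes as integers (e.g. 582), while the immigration columns hold floats (e.g. 101.0), so the codes need a common type before the lookup. A minimal pandas sketch, assuming the column names used in this notebook; `add_country_names` is hypothetical, and codes missing from countries.csv would come out as NaN:

```python
import pandas as pd


def add_country_names(immig: pd.DataFrame, countries: pd.DataFrame) -> pd.DataFrame:
    """Add country-name columns for the birth and residence codes."""
    lookup = countries.set_index("code")["country"]
    out = immig.copy()
    for code_col in ["immig_country_birth_code", "immig_country_residence_code"]:
        name_col = code_col[: -len("_code")]  # e.g. immig_country_birth
        # Float codes such as 101.0 are cast to int so they match countries.csv
        out[name_col] = out[code_col].astype(int).map(lookup)
    return out


countries = pd.DataFrame({"code": [582, 101], "country": ["MEXICO", "ALBANIA"]})
immig = pd.DataFrame({
    "cicid": [7.0],
    "immig_country_birth_code": [101.0],
    "immig_country_residence_code": [582.0],
})
print(add_country_names(immig, countries))
```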

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
As explained in Step 2, we will use the immigration table to create the fact table "immigration_fact" with the columns below:
- cicid (primary key)
- arrival_date
- immig_country_birth
- immig_country_residence
- arrival_usa_state_alpha_code
- visa_type
- visa_category

Next come the dimension tables, starting with "country_monthly_temperature_dim", which will include the monthly average temperature of each country (used for the temperatures of the departure countries). It links to the fact table through its country column, which matches the residence and birth countries in the fact table. It will include these columns:

- country
- month
- average_temperature
- primary key: (country, month)

Also, "us_state_monthly_temperature_dim", which has the same purpose as the previous table but was kept separate because city names are not unique across US states. It links to the fact table via the state code. The columns are:

- state_alpha_code
- month
- average_temperature
- primary key: (state_alpha_code, month)

"usa_states_dim", derived from the demographics data, is also important and links to the fact table via the state code. The columns are:

- state_alpha_code (primary key)
- state

Finally, "arrival_dates_dim", which links to the fact table via arrival_date, and the columns are:

- date (primary key)
- year
- month
- week
- day
- day_of_week
- season




#### 3.2 Mapping Out Data Pipelines
Here is how the ETL is designed:

1- E (Extraction): Read the data from the SAS data and csv files.

2- TL (Transformation and loading): Do the following to construct each table:

"immigration_fact"

Using the immigration data and countries data (from countries.csv):

- Drop "i94port" because we are not interested in airports in the task we have chosen. (Done)
- Drop "i94yr" and "i94mon", as we only need the arrival date, which is "arrdate". (Done)
- Filter by "i94mode" = 1 (air travel) and then drop the column. (Done)
- Drop "depdate" as we are not interested in the departure date from USA. (Done)
- Drop "count", "dtadfile", "visapost", "occup", "entdepa", "entdepd", "entdepu", "matflag", "biryear", "gender", "insnum", "airline", "admnum", "fltno" as they are not needed for our task. (Done)
- Replace the country codes with their corresponding country names. (Not Yet Done)
- Load the data to the fact table. (Not Yet Done)
- Write to parquet. (Not Yet Done)
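The cleaning cells earlier in the notebook derive `arrival_date` from `arrdate` by treating it as a day offset from 1960-01-01 (the SAS date epoch), and decode the numeric `i94visa` code into a readable visa type (1 = Business, 2 = Leisure, 3 = Study). A plain-Python sketch of those two rules, useful for unit-testing the logic outside Spark (the function names are hypothetical, not part of the notebook):

```python
from datetime import date, timedelta
from typing import Optional

# SAS stores dates as a day count from this epoch
SAS_EPOCH = date(1960, 1, 1)

# i94visa codes, as decoded by the notebook's CASE expression
VISA_TYPES = {1: "Business", 2: "Leisure", 3: "Study"}


def sas_to_date(arrdate: float) -> date:
    """Convert a SAS day count such as 20454.0 to a calendar date.

    int() truncates toward zero, like Spark's cast(arrdate as int).
    """
    return SAS_EPOCH + timedelta(days=int(arrdate))


def visa_type(i94visa: Optional[float]) -> str:
    """Decode i94visa; None or any other code falls back to 'Unknown'."""
    # 2.0 and 2 hash equally, so float codes match the int keys
    return VISA_TYPES.get(i94visa, "Unknown")
```

For example, `sas_to_date(20454.0)` returns `date(2016, 1, 1)` and `visa_type(2.0)` returns `"Leisure"`.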

"country_monthly_temperature_dim"

Using the city_temperature data:
- Drop temperature records older than 50 years, as we don't need them. (Done)
- Handle the null temperatures. (Done)
- Aggregate the city temperatures to calculate the monthly average temperature for each country. (Not Yet Done)
- Load the data to the dimension table. (Not Yet Done)
- Write to parquet. (Not Yet Done)

"us_state_monthly_temperature_dim"
Using the state_temperature data and the city_demographics data:
- Drop temperature records older than 50 years, as we don't need them. (Done)
- Handle the null temperatures. (Done)
- Map each state name to its two-letter state code using city_demographics, so the dimension table stores an alpha code for each state. (Not Yet Done)
- Load the data to the dimension table. (Not Yet Done)
- Write to parquet. (Not Yet Done)

"usa_states_dim"
Using the demographics data (All Not Yet Done):
- Filter and get the wanted columns (the state code and the state name).
- Load the data to the dimension table.
- Write to parquet.

"arrival_dates_dim"
Using the immigration data (All Not Yet Done):
- Extract all the columns from the arrival date (year, month, week, day, day_of_week and season)
- Load the data to the dimension table.
- Write to parquet.

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

### First, the immigration fact table
To be done:

- Replace the country codes with their corresponding country names. 
- Load the data to the fact table.
- Write to parquet.

In [79]:
spark.sql("""select * from immigration_table limit 10""").show()

+-------+------------+-------------------+-----------------------+-----------------+---------+-------------+
|  cicid|arrival_date|immig_country_birth|immig_country_residence|arrival_usa_state|visa_type|visa_category|
+-------+------------+-------------------+-----------------------+-----------------+---------+-------------+
|88402.0|  2016-01-08|           MONGOLIA|               MONGOLIA|               GU|  Leisure|           B2|
|88403.0|  2016-01-21|           MONGOLIA|               MONGOLIA|               MA|    Study|           F1|
|88406.0|  2016-01-02|           MONGOLIA|               MONGOLIA|               MO|    Study|           F1|
|88407.0|  2016-01-02|           MONGOLIA|               MONGOLIA|               UT|    Study|           F1|
|88408.0|  2016-01-04|           MONGOLIA|               MONGOLIA|               IL|  Leisure|           B2|
|88409.0|  2016-01-05|           MONGOLIA|               MONGOLIA|               IL|    Study|           F1|
|88410.0|  2016-01-

In [73]:
df_country_codes.head(2)

Unnamed: 0,code,country
0,582,MEXICO
1,236,AFGHANISTAN


In [75]:
df_country_codes_spark = spark.createDataFrame(df_country_codes) 
df_country_codes_spark.printSchema()
df_country_codes_spark.show(2)
df_country_codes_spark.createOrReplaceTempView("country_codes_table")

root
 |-- code: long (nullable = true)
 |-- country: string (nullable = true)

+----+-----------+
|code|    country|
+----+-----------+
| 582|    MEXICO |
| 236|AFGHANISTAN|
+----+-----------+
only showing top 2 rows



#### Now adding the country of birth from the code

In [77]:
spark.sql("""
select imm_t.*, cc_t.country as immig_country_birth
from immigration_table imm_t
inner join country_codes_table cc_t
on imm_t.immig_country_birth_code = cc_t.code
""").createOrReplaceTempView("immigration_table")

AnalysisException: "cannot resolve '`imm_t.immig_country_birth_code`' given input columns: [imm_t.visa_type, imm_t.immig_country_birth, imm_t.arrival_usa_state, imm_t.cicid, imm_t.immig_country_residence, cc_t.code, cc_t.country, imm_t.visa_category, imm_t.arrival_date]; line 5 pos 3; ..."

This error comes from re-running the cell out of order: the input columns show that immigration_table already contains immig_country_birth and no longer has the code columns, because this join had already run successfully and In [81] then dropped the code columns.

In [None]:
spark.sql("""select * from immigration_table limit 5""").show()

#### Now adding the residence country from the codes

In [80]:
spark.sql("""
select imm_t.*, cc_t.country as immig_country_residence
from immigration_table imm_t
inner join country_codes_table cc_t
on imm_t.immig_country_residence_code = cc_t.code
""").createOrReplaceTempView("immigration_table")

AnalysisException: "cannot resolve '`imm_t.immig_country_residence_code`' given input columns: [imm_t.visa_type, imm_t.immig_country_birth, imm_t.arrival_usa_state, imm_t.cicid, imm_t.immig_country_residence, cc_t.code, cc_t.country, imm_t.visa_category, imm_t.arrival_date]; line 5 pos 3; ..."

As with the country of birth, this is a stale re-run: immig_country_residence is already among the input columns, and the residence code column was dropped in In [81].

#### Now remove the code columns

In [81]:
spark.sql("""select cicid, arrival_date, immig_country_birth, immig_country_residence, arrival_usa_state,
visa_type, visa_category from immigration_table""").createOrReplaceTempView("immigration_table")

In [83]:
spark.sql("""select count(*) from immigration_table""").show()

+--------+
|count(1)|
+--------+
| 2264187|
+--------+



#### Now inserting the data into the fact table

In [84]:
immigration_fact = spark.sql("""select * from immigration_table""")
immigration_fact.createOrReplaceTempView("immigration_fact")

#### Finally for the fact table, write to parquet

In [85]:
immigration_fact.write.parquet("immigration_fact_parquet")

### Second, the "country_monthly_temperature_dim"

Using the city_temperature data:
- Aggregate the city temperatures to calculate the monthly average temperature for each country.
- Load the data to the dimension table.
- Write to parquet.

In [86]:
df_temperature_cities_spark = spark.createDataFrame(df_temperature_cities[['dt',
                                                                           'AverageTemperature',
                                                                           'AverageTemperatureUncertainty',
                                                                           'City',
                                                                           'Country']]) 
df_temperature_cities_spark.printSchema()
df_temperature_cities_spark.show(2)
df_temperature_cities_spark.createOrReplaceTempView("temperature_city_table")

root
 |-- dt: timestamp (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)

+-------------------+------------------+-----------------------------+-----+-------+
|                 dt|AverageTemperature|AverageTemperatureUncertainty| City|Country|
+-------------------+------------------+-----------------------------+-----+-------+
|1963-09-01 00:00:00|            13.164|                         0.29|Århus|Denmark|
|1963-10-01 00:00:00|              9.17|                        0.269|Århus|Denmark|
+-------------------+------------------+-----------------------------+-----+-------+
only showing top 2 rows



#### First, aggregate the average temperature to be per country
#### Then load the data to the dimension table

In [87]:
country_monthly_temperature_dim = \
spark.sql("""select month(dt) as month, Country as country, avg(AverageTemperature) as average_temperature
from temperature_city_table
group by month, country
order by country, month""")

country_monthly_temperature_dim.createOrReplaceTempView("country_monthly_temperature_dim")
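The query weights every city-month record equally and averages across all retained years, so each (country, month) pair gets a single mean; like SQL `avg`, it skips NULL temperatures. A plain-Python sketch of the same aggregation, handy for unit-testing on a few rows (the function name and input shape are hypothetical):

```python
from collections import defaultdict
from datetime import date
from typing import Dict, Iterable, Optional, Tuple


def monthly_country_average(
    rows: Iterable[Tuple[date, Optional[float], str]],
) -> Dict[Tuple[str, int], float]:
    """Average temperature per (country, month) from (dt, temperature, country) rows."""
    sums: Dict[Tuple[str, int], float] = defaultdict(float)
    counts: Dict[Tuple[str, int], int] = defaultdict(int)
    for dt, temperature, country in rows:
        if temperature is None:
            continue  # SQL avg() ignores NULLs
        key = (country, dt.month)
        sums[key] += temperature
        counts[key] += 1
    # Groups with only NULLs are omitted here; SQL would return a NULL average instead
    return {key: sums[key] / counts[key] for key in sums}
```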

In [88]:
spark.sql("""select * from country_monthly_temperature_dim""").show()

+-----+-----------+-------------------+
|month|    country|average_temperature|
+-----+-----------+-------------------+
|    1|Afghanistan| 1.2564550000000008|
|    2|Afghanistan| 3.3597075000000007|
|    3|Afghanistan|  8.879800000000008|
|    4|Afghanistan| 14.820217500000004|
|    5|Afghanistan| 20.322742499999993|
|    6|Afghanistan| 25.062480000000015|
|    7|Afghanistan| 26.469322499999993|
|    8|Afghanistan| 25.051309999999987|
|    9|Afghanistan| 20.835020000000007|
|   10|Afghanistan| 14.701605000000008|
|   11|Afghanistan|            8.77815|
|   12|Afghanistan|  3.517519999999999|
|    1|    Albania|  8.439160000000001|
|    2|    Albania|  8.937439999999997|
|    3|    Albania| 10.927040000000003|
|    4|    Albania| 13.595800000000004|
|    5|    Albania| 18.187219999999996|
|    6|    Albania|            21.9605|
|    7|    Albania|  24.31614000000002|
|    8|    Albania| 24.315879999999996|
+-----+-----------+-------------------+
only showing top 20 rows



#### Finally write this table to parquet

In [89]:
country_monthly_temperature_dim.write.parquet("country_monthly_temperature_dim_parquet")
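Before this dimension can be joined to immigration_fact, the country names need to agree: the fact table uses the upper-case names from countries.csv, some with trailing spaces (e.g. "MEXICO "), while the temperature data uses title case (e.g. "Afghanistan"). A minimal plain-Python sketch of a normalized lookup, assuming trimming and upper-casing is enough for most names (all helper names are hypothetical):

```python
from typing import Dict, Optional, Tuple

TemperatureDim = Dict[Tuple[str, int], float]


def normalize_country(name: str) -> str:
    """Trim surrounding whitespace and upper-case a country name."""
    return name.strip().upper()


def build_temperature_index(temperature_dim: TemperatureDim) -> TemperatureDim:
    """Re-key a (country, month) -> temperature mapping by normalized country name."""
    return {(normalize_country(country), month): temp
            for (country, month), temp in temperature_dim.items()}


def lookup_temperature(index: TemperatureDim, country: str, month: int) -> Optional[float]:
    """Return the average temperature for a fact-table country, or None if unmatched."""
    return index.get((normalize_country(country), month))
```

In Spark SQL the same idea would be to compare `upper(trim(...))` on both sides of the join condition. Names that differ entirely, such as "CHINA, PRC" from countries.csv, would still need an explicit mapping.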

### Third, the "us_state_monthly_temperature_dim"
- Map the states to state_codes using city_demographics to be inserted in the dimension table as alpha codes for each state.
- Load the data to the dimension table.
- Write to parquet.

#### Convert the demographics and state temperature data to Spark DataFrames to be able to join them

In [90]:
df_city_demo_spark = spark.createDataFrame(df_city_demo) 
df_city_demo_spark.printSchema()
df_city_demo_spark.show(2)
df_city_demo_spark.createOrReplaceTempView("city_demo_spark")

df_temperature_usstates_spark = spark.createDataFrame(df_temperature_usstates)
df_temperature_usstates_spark.printSchema()
df_temperature_usstates_spark.show(2)
df_temperature_usstates_spark.createOrReplaceTempView("temperature_state_table")

root
 |-- City: string (nullable = true)
 |-- State_Code: string (nullable = true)
 |-- State: string (nullable = true)

+-------------+----------+-------------+
|         City|State_Code|        State|
+-------------+----------+-------------+
|Silver Spring|        MD|     Maryland|
|       Quincy|        MA|Massachusetts|
+-------------+----------+-------------+
only showing top 2 rows

root
 |-- dt: timestamp (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)
 |-- State: string (nullable = true)
 |-- Country: string (nullable = true)

+-------------------+------------------+-----------------------------+-------+-------------+
|                 dt|AverageTemperature|AverageTemperatureUncertainty|  State|      Country|
+-------------------+------------------+-----------------------------+-------+-------------+
|1963-09-01 00:00:00|            23.083|          0.28300000000000003|Alabama|United States|
|1963-

#### Calculate the average state temperature per month

In [91]:
temperature_state_table = \
spark.sql("""select month(dt) as month, State as state, avg(AverageTemperature) as average_temperature
from temperature_state_table
group by month, state
order by state, month""")

temperature_state_table.createOrReplaceTempView("temperature_state_table")

In [92]:
spark.sql("""select * from temperature_state_table""").show()

+-----+-------+-------------------+
|month|  state|average_temperature|
+-----+-------+-------------------+
|    1|Alabama|            7.02144|
|    2|Alabama|  8.817859999999998|
|    3|Alabama| 13.237700000000002|
|    4|Alabama|           17.42764|
|    5|Alabama| 21.698400000000007|
|    6|Alabama| 25.478219999999997|
|    7|Alabama|           26.99136|
|    8|Alabama| 26.617319999999996|
|    9|Alabama| 23.761980392156865|
|   10|Alabama| 17.646179999999998|
|   11|Alabama| 12.616119999999999|
|   12|Alabama|  8.330459999999995|
|    1| Alaska|-19.071919999999995|
|    2| Alaska|          -16.97936|
|    3| Alaska|-13.393739999999998|
|    4| Alaska|           -5.79734|
|    5| Alaska|            3.31124|
|    6| Alaska|            9.87654|
|    7| Alaska| 12.050160000000004|
|    8| Alaska|            9.97292|
+-----+-------+-------------------+
only showing top 20 rows



In [93]:
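# city_demo_spark has one row per city, so the join repeats each state's rows;
# grouping by every output column (no aggregates) collapses those duplicates
us_state_monthly_temperature_dim = spark.sql("""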
select ts_t.month, cds.State_Code as state_alpha_code, ts_t.average_temperature as average_temperature
from temperature_state_table ts_t
inner join city_demo_spark cds
on ts_t.state = cds.State
group by month, state_alpha_code, average_temperature
order by state_alpha_code, month
""")

us_state_monthly_temperature_dim.createOrReplaceTempView("us_state_monthly_temperature_dim")
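An alternative that avoids duplicate rows in the first place is to join against a distinct (State, State_Code) mapping. A plain-Python sketch of that approach, assuming each state name maps to a single code in the demographics data (the function name and input shapes are hypothetical):

```python
from typing import Dict, Iterable, List, Tuple


def state_monthly_with_codes(
    state_monthly: Dict[Tuple[str, int], float],
    city_rows: Iterable[Tuple[str, str, str]],
) -> List[Tuple[str, int, float]]:
    """Attach state codes to (state, month) averages, one row per state and month.

    city_rows are (City, State_Code, State) tuples, i.e. one row per city.
    """
    # Collapse the city rows to a distinct State -> State_Code mapping first
    code_by_state = {state: code for _city, code, state in city_rows}
    # Inner-join semantics: states missing from the demographics data are dropped
    return sorted(
        (code_by_state[state], month, temperature)
        for (state, month), temperature in state_monthly.items()
        if state in code_by_state
    )
```

In Spark SQL, this corresponds to joining against `select distinct State, State_Code from city_demo_spark` instead of the raw city rows.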

In [94]:
spark.sql("""select * from us_state_monthly_temperature_dim""").show()

+-----+----------------+-------------------+
|month|state_alpha_code|average_temperature|
+-----+----------------+-------------------+
|    1|              AK|-19.071919999999995|
|    2|              AK|          -16.97936|
|    3|              AK|-13.393739999999998|
|    4|              AK|           -5.79734|
|    5|              AK|            3.31124|
|    6|              AK|            9.87654|
|    7|              AK| 12.050160000000004|
|    8|              AK|            9.97292|
|    9|              AK|  4.590639999999999|
|   10|              AK|            -4.8628|
|   11|              AK|-13.426379999999995|
|   12|              AK|          -17.49882|
|    1|              AL|            7.02144|
|    2|              AL|  8.817859999999998|
|    3|              AL| 13.237700000000002|
|    4|              AL|           17.42764|
|    5|              AL| 21.698400000000007|
|    6|              AL| 25.478219999999997|
|    7|              AL|           26.99136|
|    8|   

#### Finally, write us_state_monthly_temperature_dim to parquet

In [95]:
us_state_monthly_temperature_dim.write.parquet("us_state_monthly_temperature_dim_parquet")

### Fourth, "usa_states_dim". The next steps are:
Using the demographics data (All Not Yet Done):
- Filter and get the wanted columns (the state code and the state name).
- Load the data to the dimension table.
- Write to parquet.

### Since the demographics data is already a Spark DataFrame with a temp view, we can select from it directly

In [96]:
# DISTINCT applies to the whole selected row (code and name), not only State_Code
usa_states_dim = \
spark.sql("""select distinct State_Code as state_alpha_code, State as state
from city_demo_spark""")

usa_states_dim.createOrReplaceTempView("usa_states_dim")

In [97]:
spark.sql("""select * from usa_states_dim""").show()

+----------------+--------------------+
|state_alpha_code|               state|
+----------------+--------------------+
|              MT|             Montana|
|              NC|      North Carolina|
|              MD|            Maryland|
|              CO|            Colorado|
|              CT|         Connecticut|
|              IL|            Illinois|
|              NJ|          New Jersey|
|              DE|            Delaware|
|              DC|District of Columbia|
|              AR|            Arkansas|
|              TN|           Tennessee|
|              LA|           Louisiana|
|              AK|              Alaska|
|              CA|          California|
|              NM|          New Mexico|
|              UT|                Utah|
|              MI|            Michigan|
|              NY|            New York|
|              NH|       New Hampshire|
|              WA|          Washington|
+----------------+--------------------+
only showing top 20 rows



In [98]:
usa_states_dim.write.parquet("usa_states_dim_parquet")

### Now, for the last dimension table "arrival_dates_dim"
Using the immigration data
- Extract all the columns from the arrival date (year, month, week, day, day_of_week and season)
- Load the data to the dimension table.
- Write to parquet.

In [99]:
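# DISTINCT applies to the whole row; every other column is derived from arrival_date,
# so this still yields exactly one row per arrival date
arrival_dates_dim = spark.sql("""
select distinct arrival_date as date, year(arrival_date) as year, month(arrival_date) as month,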
weekofyear(arrival_date) as week, day(arrival_date) as day, date_format(arrival_date, 'EEEE') as day_of_week,
case when month(arrival_date) in (12, 1, 2) then 'Winter'
     when month(arrival_date) in (3, 4, 5) then 'Spring'
     when month(arrival_date) in (6, 7, 8) then 'Summer'
     when month(arrival_date) in (9, 10, 11) then 'Autumn'
     else 'date_error'
     end as season
from immigration_fact""")

arrival_dates_dim.createOrReplaceTempView("arrival_dates_dim")

In [100]:
arrival_dates_dim.write.parquet("arrival_dates_dim_parquet")
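Every column of this dimension is derived from a single date, so the logic is easy to check in isolation. A plain-Python sketch of one arrival_dates_dim row (the function name is hypothetical); `isocalendar()` gives the ISO week, which matches how Spark's `weekofyear` numbers weeks (weeks start on Monday, and week 1 is the first week with more than three days):

```python
from datetime import date

# Same month-to-season buckets as the CASE expression above
SEASONS = {12: "Winter", 1: "Winter", 2: "Winter",
           3: "Spring", 4: "Spring", 5: "Spring",
           6: "Summer", 7: "Summer", 8: "Summer",
           9: "Autumn", 10: "Autumn", 11: "Autumn"}


def arrival_date_row(d: date) -> dict:
    """Build one arrival_dates_dim row from a date."""
    return {
        "date": d,
        "year": d.year,
        "month": d.month,
        "week": d.isocalendar()[1],       # ISO week number, like weekofyear
        "day": d.day,
        "day_of_week": d.strftime("%A"),  # full day name, like 'EEEE' (locale-dependent)
        "season": SEASONS[d.month],
    }
```

Note that 2016-01-01 (a Friday) falls in ISO week 53 of 2015, so `arrival_date_row(date(2016, 1, 1))["week"]` is 53; this is worth keeping in mind when grouping January arrivals by week.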

#### 4.2 Data Quality Checks

##### First, we'll check key uniqueness

In [101]:
spark.sql("""select count(*) - count(distinct(cicid)) from immigration_fact""").show()

+----------------------------------+
|(count(1) - count(DISTINCT cicid))|
+----------------------------------+
|                                 0|
+----------------------------------+



In [102]:
spark.sql("""select count(*) - count(distinct(month, country)) from country_monthly_temperature_dim""").show()

+-------------------------------------------------------------------------+
|(count(1) - count(DISTINCT named_struct(month, month, country, country)))|
+-------------------------------------------------------------------------+
|                                                                        0|
+-------------------------------------------------------------------------+



In [103]:
spark.sql("""select count(*) - count(distinct(month, state_alpha_code)) from us_state_monthly_temperature_dim""").show()

+-------------------------------------------------------------------------------------------+
|(count(1) - count(DISTINCT named_struct(month, month, state_alpha_code, state_alpha_code)))|
+-------------------------------------------------------------------------------------------+
|                                                                                          0|
+-------------------------------------------------------------------------------------------+



In [104]:
spark.sql("""select count(*) - count(distinct(state_alpha_code)) from usa_states_dim""").show()

+---------------------------------------------+
|(count(1) - count(DISTINCT state_alpha_code))|
+---------------------------------------------+
|                                            0|
+---------------------------------------------+



In [105]:
spark.sql("""select count(*) - count(distinct(date)) from arrival_dates_dim""").show()

+---------------------------------+
|(count(1) - count(DISTINCT date))|
+---------------------------------+
|                                0|
+---------------------------------+
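The checks above cover key uniqueness. A common complement is to verify that each table is non-empty and that no key component is NULL. A plain-Python sketch of all three checks on in-memory rows (the function name is hypothetical); on the full tables it is better to keep these as Spark aggregations like the queries above rather than collecting rows to the driver:

```python
from typing import List, Mapping, Sequence


def quality_problems(rows: Sequence[Mapping], key_fields: Sequence[str]) -> List[str]:
    """Return human-readable data quality problems; an empty list means all checks pass."""
    if not rows:
        return ["table is empty"]
    problems = []
    # Rows whose key has a missing component cannot be referenced reliably
    null_keys = sum(1 for row in rows if any(row[f] is None for f in key_fields))
    if null_keys:
        problems.append(f"{null_keys} row(s) with a null key")
    # Same idea as count(*) - count(distinct key) in the SQL checks above
    duplicates = len(rows) - len({tuple(row[f] for f in key_fields) for row in rows})
    if duplicates:
        problems.append(f"{duplicates} duplicate key(s) on ({', '.join(key_fields)})")
    return problems
```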



#### Then we run a few functional tests by answering some of the questions within the project's scope:

1- Which nationalities/countries of residence visit the US the most? <br />
2- What are the temperatures of the countries of the 5 most frequently visiting nationalities, and of their destination US states? <br />
3- What are the most popular visa types? <br />
4- Which US states are the most visited? <br />

#### To answer the first question 
1- Which nationalities/countries of residence visit the US the most?

In [115]:
spark.sql("""select immig_country_birth, immig_country_residence, count(immig_country_birth)
from immigration_fact 
group by immig_country_birth, immig_country_residence
order by count(immig_country_birth) desc""").show()

+-------------------+-----------------------+--------------------------+
|immig_country_birth|immig_country_residence|count(immig_country_birth)|
+-------------------+-----------------------+--------------------------+
|         CHINA, PRC|             CHINA, PRC|                    261708|
|              JAPAN|                  JAPAN|                    207688|
|     UNITED KINGDOM|         UNITED KINGDOM|                    198300|
|             BRAZIL|                 BRAZIL|                    180344|
|            MEXICO |                MEXICO |                    149357|
|          AUSTRALIA|              AUSTRALIA|                     91034|
|              INDIA|                  INDIA|                     88506|
|             FRANCE|                 FRANCE|                     70010|
|         ARGENTINA |             ARGENTINA |                     65343|
|              ITALY|                  ITALY|                     54415|
|           COLOMBIA|               COLOMBIA|      

#### It would be interesting to divide these counts by each country's population. China at the top was expected, but seeing the UK, Japan and France this high is interesting!

#### Second question
2- What are the temperatures of the countries of the 5 most frequently visiting nationalities, and of their destination US states?

##### Since the engine kept failing when I used ORDER BY in the SQL query, I used a workaround

#### It looks like people from Japan go to Hawaii to enjoy the sun :D, and people from Brazil go to Florida to enjoy some cold. California is popular too!

#### Third question
3- Most popular visa types?

In [198]:
spark.sql("""select visa_type, visa_category, count(*) from
immigration_fact
group by visa_type, visa_category
order by count(*) desc limit 10""").show()

+---------+-------------+--------+
|visa_type|visa_category|count(1)|
+---------+-------------+--------+
|  Leisure|           B2|  834384|
|  Leisure|           WT|  655625|
|    Study|           F1|  327439|
| Business|           WB|  226348|
| Business|           B1|  175953|
| Business|           E2|   22395|
|    Study|           F2|    8348|
|  Leisure|          GMT|    3967|
| Business|           E1|    3893|
| Business|            I|    2842|
+---------+-------------+--------+



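#### So Leisure (B2, WT) is by far the most popular visa type, and the student visa F1 is the third most common single category, which makes sense. Note, however, that summed over the categories shown, Business (about 431k) outnumbers Study (about 336k). It would be very interesting to see how visa types relate to the destination US state (arrival_usa_state), and even to the departure countries.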
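To compare visa types rather than individual categories, the counts can be summed per `visa_type`. The sketch below does this in plain Python for the ten rows shown above only (categories outside the top 10 are not included), so it is a check on the displayed output rather than a full aggregation; on the full data the equivalent would be grouping `immigration_fact` by `visa_type` alone.

```python
from collections import defaultdict

# (visa_type, visa_category, count) rows copied from the top-10 output above
TOP10 = [
    ("Leisure", "B2", 834384),
    ("Leisure", "WT", 655625),
    ("Study", "F1", 327439),
    ("Business", "WB", 226348),
    ("Business", "B1", 175953),
    ("Business", "E2", 22395),
    ("Study", "F2", 8348),
    ("Leisure", "GMT", 3967),
    ("Business", "E1", 3893),
    ("Business", "I", 2842),
]

def totals_by_visa_type(rows):
    """Sum counts per visa_type and return (visa_type, total), largest first."""
    totals = defaultdict(int)
    for visa_type, _category, count in rows:
        totals[visa_type] += count
    return sorted(totals.items(), key=lambda kv: kv[1], reverse=True)

for visa_type, total in totals_by_visa_type(TOP10):
    print(f"{visa_type:<8} {total:>9,}")
```

This prints Leisure (1,493,976), then Business (431,431), then Study (335,787).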

In [199]:
spark.sql("""select visa_type, visa_category, count(*), arrival_usa_state from
immigration_fact
group by arrival_usa_state, visa_type, visa_category
order by count(*) desc limit 10""").show()

+---------+-------------+--------+-----------------+
|visa_type|visa_category|count(1)|arrival_usa_state|
+---------+-------------+--------+-----------------+
|  Leisure|           B2|  357198|               FL|
|  Leisure|           WT|  149478|               FL|
|  Leisure|           WT|  140001|               HI|
|  Leisure|           B2|  127089|               CA|
|  Leisure|           WT|  114902|               NY|
|  Leisure|           WT|  102351|               CA|
|  Leisure|           B2|   93811|               NY|
|    Study|           F1|   57644|               CA|
| Business|           WB|   51244|               CA|
|  Leisure|           B2|   44874|               TX|
+---------+-------------+--------+-----------------+



In [200]:
spark.sql("""select visa_type, visa_category, count(*), immig_country_residence from
immigration_fact
group by immig_country_residence, visa_type, visa_category
order by count(*) desc limit 10""").show()

+---------+-------------+--------+-----------------------+
|visa_type|visa_category|count(1)|immig_country_residence|
+---------+-------------+--------+-----------------------+
|  Leisure|           B2|  161227|                 BRAZIL|
|  Leisure|           WT|  159931|                  JAPAN|
|  Leisure|           WT|  127095|         UNITED KINGDOM|
|  Leisure|           B2|  116773|                MEXICO |
|  Leisure|           B2|  115571|             CHINA, PRC|
|    Study|           F1|  111072|             CHINA, PRC|
|  Leisure|           WT|   77822|              AUSTRALIA|
| Business|           WB|   62819|         UNITED KINGDOM|
|  Leisure|           B2|   61190|             ARGENTINA |
|    Study|           F1|   44561|                  INDIA|
+---------+-------------+--------+-----------------------+



#### The fourth and final question
4- Which US states are the most visited?


In [208]:
spark.sql("""select us_d.state_alpha_code, us_d.state, count(*) 
from immigration_fact
inner join usa_states_dim us_d on
arrival_usa_state = us_d.state_alpha_code
group by us_d.state_alpha_code, us_d.state
order by count(*) desc""").show()

+----------------+--------------+--------+
|state_alpha_code|         state|count(1)|
+----------------+--------------+--------+
|              FL|       Florida|  576767|
|              CA|    California|  382082|
|              NY|      New York|  301714|
|              HI|        Hawaii|  159077|
|              TX|         Texas|  111657|
|              NV|        Nevada|   94348|
|              MA| Massachusetts|   57573|
|              IL|      Illinois|   46218|
|              NJ|    New Jersey|   37774|
|              GA|       Georgia|   36127|
|              CO|      Colorado|   33795|
|              WA|    Washington|   32391|
|              PA|  Pennsylvania|   28907|
|              MI|      Michigan|   26600|
|              AZ|       Arizona|   22204|
|              VA|      Virginia|   22007|
|              OH|          Ohio|   20667|
|              NE|      Nebraska|   17588|
|              NC|North Carolina|   17439|
|              MD|      Maryland|   15756|
+----------------+--------------+--------+

##### Go Florida, Go California! :D

#### 4.3 Data dictionary 
The data dictionary is detailed in section 3.1; in addition, I will include a data_dictionary.txt file that explains it.

#### Step 5: Complete Project Write Up

##### User persona: This data model and its analytics can be used by people interested in understanding what makes travellers actually travel, such as airlines or transportation companies in general. Knowing which countries send many immigrants or generate a high number of tourists can help foreign affairs ministries build special relations with those countries, and perhaps push for tourist discounts/ads or student scholarships. Also, the cities/states that attract a specific visa type can give insights into a city's diversity, its attractiveness to investment, its touristic nature, and in which season visitors come. I think travel agencies would benefit the most, but a broad variety of institutions can benefit from this type of analytics.

##### Choice of Tools: While working on the project, handling all the datasets was seamless (fast) except for the immigration dataset, because the data for each month is huge. The only logical choice was Spark, as the data will not fit in RAM. I used pandas for quick work with the small datasets and Spark for the big ones.

##### Data storage and update: This data should be stored in something like Amazon S3, and it should be updated in batches, as there is no need to stream the data (it is not time sensitive, unless there is a crisis at hand).
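A minimal sketch of how monthly batches could be laid out on S3, assuming Hive-style `year=/month=` partition folders, which is the layout Spark's `DataFrameWriter.partitionBy("year", "month")` produces. The bucket name and the `immigration_fact` prefix are hypothetical. With this layout each monthly batch only writes its own partition, and readers can skip partitions by year and month.

```python
def partition_path(bucket, table, year, month):
    """Build the S3 prefix for one monthly batch (hypothetical layout).

    Mirrors Spark's partitionBy("year", "month") folder naming, which writes
    integer values without zero padding (month=4, not month=04).
    """
    if not 1 <= month <= 12:
        raise ValueError(f"month must be 1-12, got {month}")
    return f"s3://{bucket}/{table}/year={year}/month={month}/"


# Example with a hypothetical bucket name
print(partition_path("my-capstone-bucket", "immigration_fact", 2016, 4))
```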

##### - Multiple uses and growth in data volume: The same proposed setup applies: data is saved and updated on S3, and Spark is used as the processing platform after the staging tables are constructed, as in previous projects.

##### - For automating the process, Airflow is a good choice: it can trigger the 7 AM run, with this notebook broken up into smaller chunks/tasks. Simpler approaches are also possible, such as Amazon CloudWatch scheduled (cron) rules that trigger smaller functions running on AWS Lambda.
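The idea of breaking the notebook into smaller tasks can be illustrated without Airflow itself: each task lists the tasks it depends on, and a topological sort gives a valid execution order, which is essentially what an Airflow DAG encodes. The task names below are hypothetical labels for the notebook's steps, and `graphlib` requires Python 3.9+. Assuming the 7 AM run is daily, the corresponding cron expression would be `0 7 * * *`.

```python
from graphlib import TopologicalSorter

# Hypothetical task names for the steps of this notebook; each key maps to
# the set of tasks that must finish before it can run.
PIPELINE = {
    "load_immigration": set(),
    "load_temperature": set(),
    "load_demographics": set(),
    "load_airports": set(),
    "clean_immigration": {"load_immigration"},
    "clean_temperature": {"load_temperature"},
    "build_dimensions": {"clean_temperature", "load_demographics", "load_airports"},
    "build_fact": {"clean_immigration", "build_dimensions"},
    "quality_checks": {"build_fact"},
}

def execution_order(graph):
    """Return one valid order in which the tasks can run.

    Raises graphlib.CycleError if the dependencies contain a cycle.
    """
    return list(TopologicalSorter(graph).static_order())

if __name__ == "__main__":
    for task in execution_order(PIPELINE):
        print(task)
```

Tasks without a dependency between them (for example the four load steps) could run in parallel, which is one of the benefits of splitting the notebook this way.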

##### - Finally, if the database needs to be accessed by a lot of people, it can be hosted on an Amazon Redshift cluster, which offers flexible scaling options, so it would be a good fit if we expect usage to increase.