# Project Title
### Data Engineering Capstone Project

#### Project Summary
In the project we will create a ETL pipeline to  Data Lake using US I94 Immigration data. We will extract, process, clean and store data that will later on be used to analyse tourist and immigration flow to US through different airports. We could set some expectations on the inflow of the tourist, as well the necessary exchange currencies for tourists. We could extract seasonalities and prepare accordingly.

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
from pyspark.sql import SparkSession 
from pyspark.sql.functions import isnan, when, count, col, upper, split, year, month, avg, isnull
from mappings import country_codes_mapping, i94visa_mapping

In [2]:
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()

### Step 1: Scope the Project and Gather Data

#### Scope 
We will create a fact table and dimension tables using five sources described below. We will use Apache Sparke to explore, load, transform and save data to parquet files. We will create a fact table that would allow analyzing the arrivals and seasonality of arrivals in the U.S. cities. This fact table could be used by some turist officers, or companies offering services to tourists iin the peak of the season.

#### Describe and Gather Data

The following datasets are included in the project workspace. We purposely did not include a lot of detail about the data and instead point you to the sources. This is to help you get experience doing a self-guided project and researching the data yourself. If something about the data is unclear, make an assumption, document it, and move on. Feel free to enrich your project by gathering and including additional data sources.

- I94 Immigration Data: This data comes from the US National Tourism and Trade Office. A data dictionary is included in the workspace. [This](https://travel.trade.gov/research/reports/i94/historical/2016.html) is where the data comes from. There's a sample file so you can take a look at the data in csv format before reading it all in. You do not have to use the entire dataset, just use what you need to accomplish the goal you set at the beginning of the project.
- World Temperature Data: This dataset came from Kaggle. You can read more about it [here](https://www.kaggle.com/selfishgene/historical-hourly-weather-data).
- U.S. City Demographic Data: This data comes from OpenSoft. You can read more about it [here](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/).
- Airport Code Table: This is a simple table of airport codes and corresponding cities. It comes from [here](https://datahub.io/core/airport-codes#data).
- Countries Table: This is a simple table of country codes and additional information about countries. It comes from [here](https://datahub.io/JohnSnowLabs/iso-3166-country-codes-itu-dialing-codes-iso-4217-currency-codes/r/iso-3166-country-codes-itu-dialing-codes-iso-4217-currency-codes-csv.csv).

In [None]:
#!wget https://datahub.io/JohnSnowLabs/iso-3166-country-codes-itu-dialing-codes-iso-4217-currency-codes/r/iso-3166-country-codes-itu-dialing-codes-iso-4217-currency-codes-csv.csv


In [3]:
demographics_data = spark.read.format("csv").option("delimiter", ";").option("header", "true").load("us-cities-demographics.csv")
airport_data = spark.read.format("csv").option("delimiter", ",").option("header", "true").load("airport-codes_csv.csv")
country_data = spark.read.format("csv").option("delimiter", ",").option("header", "true").load("iso-3166-country-codes-itu-dialing-codes-iso-4217-currency-codes-csv.csv")
city_data = spark.read.format("csv").option("delimiter", ";").option("header", "true").load("city_codes.csv")
temperature_data = spark.read.format("csv").option("delimiter", ",").option("header", "true").load("../../data2/GlobalLandTemperaturesByCity.csv")

In [4]:
demographics_data.head()

Row(City='Silver Spring', State='Maryland', Median Age='33.8', Male Population='40601', Female Population='41862', Total Population='82463', Number of Veterans='1562', Foreign-born='30908', Average Household Size='2.6', State Code='MD', Race='Hispanic or Latino', Count='25924')

In [5]:
airport_data.head()

Row(ident='00A', type='heliport', name='Total Rf Heliport', elevation_ft='11', continent='NA', iso_country='US', iso_region='US-PA', municipality='Bensalem', gps_code='00A', iata_code=None, local_code='00A', coordinates='-74.93360137939453, 40.07080078125')

In [6]:
country_data.head()

Row(Country_Name=None, Official_Name_English='Channel Islands', ISO3166_1_Alpha_2=None, ISO3166_1_Alpha_3=None, M49='830', ITU=None, MARC=None, WMO=None, DS=None, Dial=None, FIFA=None, FIPS=None, GAUL=None, IOC=None, ISO4217_Currency_Alphabetic_Code=None, ISO4217_Currency_Country_Name=None, ISO4217_Currency_Minor_Unit=None, ISO4217_Currency_Name=None, ISO4217_Currency_Numeric_Code=None, Is_Independent=None, Capital=None, Continent=None, TLD=None, Languages=None, Geo_Name_ID=None, EDGAR=None)

In [7]:
city_data.head()

Row(city_code='ALC', city_name='ALCAN')

In [8]:
temperature_data.head()

Row(dt='1743-11-01', AverageTemperature='6.068', AverageTemperatureUncertainty='1.7369999999999999', City='Århus', Country='Denmark', Latitude='57.05N', Longitude='10.33E')

In [9]:
IMMIGRATION_DATA_FILENAMES = [
    'i94_jan16_sub.sas7bdat',
    'i94_feb16_sub.sas7bdat',
    'i94_mar16_sub.sas7bdat',
    'i94_apr16_sub.sas7bdat',
    'i94_may16_sub.sas7bdat',
    'i94_jun16_sub.sas7bdat',
    'i94_jul16_sub.sas7bdat',
    'i94_aug16_sub.sas7bdat',
    'i94_sep16_sub.sas7bdat',
    'i94_oct16_sub.sas7bdat',
    'i94_nov16_sub.sas7bdat',
    'i94_dec16_sub.sas7bdat'
]

In [10]:
for i, file_name in enumerate(IMMIGRATION_DATA_FILENAMES):
    if i==0:
        immigration_data = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/' + file_name)
        if file_name=='i94_jun16_sub.sas7bdat':
            immigration_data = immigration_data.drop('validres','delete_days','delete_mexl','delete_dup','delete_recdup','delete_visa')
    else:
        immigration_data_subset = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/' + file_name)
        if file_name=='i94_jun16_sub.sas7bdat':
            immigration_data_subset = immigration_data_subset.drop('validres','delete_days','delete_mexl','delete_dup','delete_recdup','delete_visa')
        immigration_data.union(immigration_data_subset)

In [11]:
immigration_data.head()

Row(cicid=7.0, i94yr=2016.0, i94mon=1.0, i94cit=101.0, i94res=101.0, i94port='BOS', arrdate=20465.0, i94mode=1.0, i94addr='MA', depdate=None, i94bir=20.0, i94visa=3.0, count=1.0, dtadfile=None, visapost=None, occup=None, entdepa='T', entdepd=None, entdepu=None, matflag=None, biryear=1996.0, dtaddto='D/S', gender='M', insnum=None, airline='LH', admnum=346608285.0, fltno='424', visatype='F1')

In [12]:
immigration_data.write.parquet("sas_data")
immigration_data = spark.read.parquet("sas_data")

### Step 2: Explore and Assess the Data
#### Explore the Data 
In this step we will try to identify data quality issues, like missing values, duplicate data, etc. For that purpouse we will check for NaN and null values in our dataframes, and compare duplicate and deduplicated counts. In the future we could create a function that would check out the check we did in all the sample datasets, but for the sake of show and tell we will keep this in iPython cells. We will explore our datasets seperately.

##### Airport dataset exploration

In [13]:
airport_data.show(1, vertical=True)

-RECORD 0----------------------------
 ident        | 00A                  
 type         | heliport             
 name         | Total Rf Heliport    
 elevation_ft | 11                   
 continent    | NA                   
 iso_country  | US                   
 iso_region   | US-PA                
 municipality | Bensalem             
 gps_code     | 00A                  
 iata_code    | null                 
 local_code   | 00A                  
 coordinates  | -74.9336013793945... 
only showing top 1 row



In [14]:
airport_data.groupBy("type").count().orderBy("count", ascending=False).show()
airport_data.groupBy("iso_country").count().orderBy("count", ascending=False).show()
airport_data.groupBy("iso_region").count().orderBy("count", ascending=False).show()
airport_data.groupBy("continent").count().orderBy("count", ascending=False).show()
airport_data.groupBy("iata_code").count().orderBy("count", ascending=False).show()

+--------------+-----+
|          type|count|
+--------------+-----+
| small_airport|33965|
|      heliport|11287|
|medium_airport| 4550|
|        closed| 3606|
| seaplane_base| 1016|
| large_airport|  627|
|   balloonport|   24|
+--------------+-----+

+-----------+-----+
|iso_country|count|
+-----------+-----+
|         US|22757|
|         BR| 4334|
|         CA| 2784|
|         AU| 1963|
|         KR| 1376|
|         MX| 1181|
|         RU| 1040|
|         DE|  947|
|         GB|  911|
|         FR|  850|
|         AR|  848|
|         CO|  706|
|         IT|  671|
|         PG|  593|
|         VE|  592|
|         ZA|  489|
|         CL|  474|
|         ID|  470|
|         ES|  416|
|         CN|  404|
+-----------+-----+
only showing top 20 rows

+----------+-----+
|iso_region|count|
+----------+-----+
|     US-TX| 2277|
|     US-CA| 1088|
|     US-FL|  967|
|     US-PA|  918|
|     BR-SP|  907|
|     US-IL|  902|
|     US-AK|  829|
|     US-OH|  799|
|    GB-ENG|  726|
|     US-IN|

In [15]:
airport_data.select([count(when(isnan(column), column)).alias(column) for column in airport_data.columns]).show()
airport_data.select([count(when(isnan(column) | col(column).isNull(), column)).alias(column) for column in airport_data.columns]).show()

+-----+----+----+------------+---------+-----------+----------+------------+--------+---------+----------+-----------+
|ident|type|name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|coordinates|
+-----+----+----+------------+---------+-----------+----------+------------+--------+---------+----------+-----------+
|    0|   0|   0|           0|        0|          0|         0|           0|       0|        0|         0|          0|
+-----+----+----+------------+---------+-----------+----------+------------+--------+---------+----------+-----------+

+-----+----+----+------------+---------+-----------+----------+------------+--------+---------+----------+-----------+
|ident|type|name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|coordinates|
+-----+----+----+------------+---------+-----------+----------+------------+--------+---------+----------+-----------+
|    0|   0|   0|        7006|        0|       

In [16]:
deduplicated_count = airport_data.groupBy(airport_data.columns).agg((count("*")>1).cast("int")).count()
dataframe_count =  airport_data.count()
if deduplicated_count == dataframe_count:
    print("Data doesn't contain duplicates")
else:
    print("Data contains duplicates")

Data doesn't contain duplicates


##### Country dataset exploration

In [17]:
country_data.show(1, vertical=True)

-RECORD 0-------------------------------------------
 Country_Name                     | null            
 Official_Name_English            | Channel Islands 
 ISO3166_1_Alpha_2                | null            
 ISO3166_1_Alpha_3                | null            
 M49                              | 830             
 ITU                              | null            
 MARC                             | null            
 WMO                              | null            
 DS                               | null            
 Dial                             | null            
 FIFA                             | null            
 FIPS                             | null            
 GAUL                             | null            
 IOC                              | null            
 ISO4217_Currency_Alphabetic_Code | null            
 ISO4217_Currency_Country_Name    | null            
 ISO4217_Currency_Minor_Unit      | null            
 ISO4217_Currency_Name            | null      

In [18]:
country_data.groupBy("Country_Name").count().orderBy("count", ascending=False).show()
country_data.groupBy("Official_Name_English").count().orderBy("count", ascending=False).show()
country_data.groupBy("Dial").count().orderBy("count", ascending=False).show()
country_data.groupBy("M49").count().orderBy("count", ascending=False).show()

+--------------------+-----+
|        Country_Name|count|
+--------------------+-----+
|                null|    2|
|               Tonga|    1|
|                Chad|    1|
|            Anguilla|    1|
|            Paraguay|    1|
|              Russia|    1|
|British Indian Oc...|    1|
| U.S. Virgin Islands|    1|
|               Yemen|    1|
|Heard & McDonald ...|    1|
|             Senegal|    1|
|              Sweden|    1|
|             Tokelau|    1|
|French Southern T...|    1|
|            Kiribati|    1|
|              Guyana|    1|
|             Eritrea|    1|
|              Jersey|    1|
|         Philippines|    1|
|            Djibouti|    1|
+--------------------+-----+
only showing top 20 rows

+---------------------+-----+
|Official_Name_English|count|
+---------------------+-----+
|                 null|   10|
|                Tonga|    1|
|                 Chad|    1|
| Micronesia (Feder...|    1|
|             Anguilla|    1|
|             Paraguay|    1|
| The fo

In [19]:
country_data.select([count(when(isnan(column), column)).alias(column) for column in country_data.columns]).show()
country_data.select([count(when(isnan(column) | col(column).isNull(), column)).alias(column) for column in country_data.columns]).show()

+------------+---------------------+-----------------+-----------------+---+---+----+---+---+----+----+----+----+---+--------------------------------+-----------------------------+---------------------------+---------------------+-----------------------------+--------------+-------+---------+---+---------+-----------+-----+
|Country_Name|Official_Name_English|ISO3166_1_Alpha_2|ISO3166_1_Alpha_3|M49|ITU|MARC|WMO| DS|Dial|FIFA|FIPS|GAUL|IOC|ISO4217_Currency_Alphabetic_Code|ISO4217_Currency_Country_Name|ISO4217_Currency_Minor_Unit|ISO4217_Currency_Name|ISO4217_Currency_Numeric_Code|Is_Independent|Capital|Continent|TLD|Languages|Geo_Name_ID|EDGAR|
+------------+---------------------+-----------------+-----------------+---+---+----+---+---+----+----+----+----+---+--------------------------------+-----------------------------+---------------------------+---------------------+-----------------------------+--------------+-------+---------+---+---------+-----------+-----+
|           0|        

In [20]:
deduplicated_count = country_data.groupBy(country_data.columns).agg((count("*")>1).cast("int")).count()
dataframe_count =  country_data.count()
if deduplicated_count == dataframe_count:
    print("Data doesn't contain duplicates")
else:
    print("Data contains duplicates")

Data doesn't contain duplicates


##### Demographic dataset exploration

In [21]:
demographics_data.show(1, vertical=True)

-RECORD 0------------------------------------
 City                   | Silver Spring      
 State                  | Maryland           
 Median Age             | 33.8               
 Male Population        | 40601              
 Female Population      | 41862              
 Total Population       | 82463              
 Number of Veterans     | 1562               
 Foreign-born           | 30908              
 Average Household Size | 2.6                
 State Code             | MD                 
 Race                   | Hispanic or Latino 
 Count                  | 25924              
only showing top 1 row



In [22]:
demographics_data.groupBy("City").count().orderBy("count", ascending=False).show()
demographics_data.groupBy("State").count().orderBy("count", ascending=False).show()
demographics_data.groupBy("Male Population").count().orderBy("count", ascending=False).show()
demographics_data.groupBy("Race").count().orderBy("count", ascending=False).show()

+------------+-----+
|        City|count|
+------------+-----+
| Springfield|   15|
| Bloomington|   15|
|    Columbia|   15|
|     Norwalk|   10|
|Jacksonville|   10|
|      Peoria|   10|
|      Albany|   10|
|    Pasadena|   10|
|     Jackson|   10|
| Kansas City|   10|
|Fayetteville|   10|
|    Columbus|   10|
| Westminster|   10|
|  Wilmington|   10|
|    Lakewood|   10|
|   Arlington|   10|
|   Rochester|   10|
|       Allen|   10|
|    Portland|   10|
|      Aurora|   10|
+------------+-----+
only showing top 20 rows

+--------------+-----+
|         State|count|
+--------------+-----+
|    California|  676|
|         Texas|  273|
|       Florida|  222|
|      Illinois|   91|
|    Washington|   85|
|       Arizona|   80|
|      Colorado|   80|
|      Michigan|   79|
|North Carolina|   70|
|      Virginia|   70|
| Massachusetts|   69|
|    New Jersey|   57|
|       Georgia|   55|
|     Minnesota|   54|
|      New York|   54|
|       Indiana|   51|
|      Maryland|   50|
|         

In [23]:
demographics_data.select([count(when(isnan(column), column)).alias(column) for column in demographics_data.columns]).show()
demographics_data.select([count(when(isnan(column) | col(column).isNull(), column)).alias(column) for column in demographics_data.columns]).show()

+----+-----+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+----+-----+
|City|State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|Race|Count|
+----+-----+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+----+-----+
|   0|    0|         0|              0|                0|               0|                 0|           0|                     0|         0|   0|    0|
+----+-----+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+----+-----+

+----+-----+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+----+-----+
|City|State|Median Age|Male Population|Female Population|Total Population|Number of Vet

In [24]:
deduplicated_count = demographics_data.groupBy(demographics_data.columns).agg((count("*")>1).cast("int")).count()
dataframe_count =  demographics_data.count()
if deduplicated_count == dataframe_count:
    print("Data doesn't contain duplicates")
else:
    print("Data contains duplicates")

Data doesn't contain duplicates


##### Temperature dataset exploration

In [25]:
temperature_data.show(1, vertical=True)

-RECORD 0-------------------------------------------
 dt                            | 1743-11-01         
 AverageTemperature            | 6.068              
 AverageTemperatureUncertainty | 1.7369999999999999 
 City                          | Århus              
 Country                       | Denmark            
 Latitude                      | 57.05N             
 Longitude                     | 10.33E             
only showing top 1 row



In [27]:
temperature_data.groupBy("City").count().orderBy("count", ascending=False).show()
temperature_data.groupBy("Country").count().orderBy("count", ascending=False).show()
temperature_data.groupBy("AverageTemperature").count().orderBy("count", ascending=False).show()
temperature_data.groupBy("AverageTemperatureUncertainty").count().orderBy("count", ascending=False).show()

+----------------+-----+
|            City|count|
+----------------+-----+
|     Springfield| 9545|
|       Worcester| 8359|
|            León| 7469|
|       Rongcheng| 6526|
|           Brest| 6478|
|        Columbus| 6478|
|       Cambridge| 6478|
|Saint Petersburg| 6478|
|          London| 6478|
|      Birmingham| 6478|
|        Syracuse| 6478|
|      Manchester| 6478|
|        Santiago| 6203|
|          Aurora| 6101|
|      Alexandria| 5908|
|       Arlington| 5564|
|       Barcelona| 5516|
|        Kingston| 5516|
|        Valencia| 5516|
|       Cartagena| 5516|
+----------------+-----+
only showing top 20 rows

+--------------+-------+
|       Country|  count|
+--------------+-------+
|         India|1014906|
|         China| 827802|
| United States| 687289|
|        Brazil| 475580|
|        Russia| 461234|
|         Japan| 358669|
|     Indonesia| 323255|
|       Germany| 262359|
|United Kingdom| 220252|
|        Mexico| 209560|
|       Nigeria| 172347|
|         Spain| 159594|

In [28]:
temperature_data.select([count(when(isnan(column), column)).alias(column) for column in temperature_data.columns]).show()
temperature_data.select([count(when(isnan(column) | col(column).isNull(), column)).alias(column) for column in temperature_data.columns]).show()

+---+------------------+-----------------------------+----+-------+--------+---------+
| dt|AverageTemperature|AverageTemperatureUncertainty|City|Country|Latitude|Longitude|
+---+------------------+-----------------------------+----+-------+--------+---------+
|  0|                 0|                            0|   0|      0|       0|        0|
+---+------------------+-----------------------------+----+-------+--------+---------+

+---+------------------+-----------------------------+----+-------+--------+---------+
| dt|AverageTemperature|AverageTemperatureUncertainty|City|Country|Latitude|Longitude|
+---+------------------+-----------------------------+----+-------+--------+---------+
|  0|            364130|                       364130|   0|      0|       0|        0|
+---+------------------+-----------------------------+----+-------+--------+---------+



In [29]:
deduplicated_count = temperature_data.groupBy(temperature_data.columns).agg((count("*")>1).cast("int")).count()
dataframe_count =  temperature_data.count()
if deduplicated_count == dataframe_count:
    print("Data doesn't contain duplicates")
else:
    print("Data contains duplicates")

Data doesn't contain duplicates


##### Immigration dataset exploration

In [30]:
immigration_data.show(1, vertical=True)

-RECORD 0----------------
 cicid    | 7.0          
 i94yr    | 2016.0       
 i94mon   | 1.0          
 i94cit   | 101.0        
 i94res   | 101.0        
 i94port  | BOS          
 arrdate  | 20465.0      
 i94mode  | 1.0          
 i94addr  | MA           
 depdate  | null         
 i94bir   | 20.0         
 i94visa  | 3.0          
 count    | 1.0          
 dtadfile | null         
 visapost | null         
 occup    | null         
 entdepa  | T            
 entdepd  | null         
 entdepu  | null         
 matflag  | null         
 biryear  | 1996.0       
 dtaddto  | D/S          
 gender   | M            
 insnum   | null         
 airline  | LH           
 admnum   | 3.46608285E8 
 fltno    | 424          
 visatype | F1           
only showing top 1 row



In [31]:
immigration_data.groupBy("i94port").count().orderBy("count", ascending=False).show()
immigration_data.groupBy("visapost").count().orderBy("count", ascending=False).show()
immigration_data.groupBy("gender").count().orderBy("count", ascending=False).show()
immigration_data.groupBy("visatype").count().orderBy("count", ascending=False).show()

+-------+------+
|i94port| count|
+-------+------+
|    MIA|420690|
|    NYC|382771|
|    LOS|339125|
|    HHW|180162|
|    SFR|165661|
|    ORL|130865|
|    CHI|120888|
|    AGA|118170|
|    NEW|107767|
|    HOU|106536|
|    ATL| 89865|
|    DAL| 78220|
|    FTL| 73761|
|    WAS| 64601|
|    BOS| 54270|
|    LVG| 49865|
|    SEA| 47086|
|    SAI| 42317|
|    DET| 40404|
|    PHI| 21478|
+-------+------+
only showing top 20 rows

+--------+-------+
|visapost|  count|
+--------+-------+
|    null|1386375|
|     GUZ|  99955|
|     SPL|  98234|
|     MEX|  87723|
|     BEJ|  68257|
|     BNS|  66180|
|     SHG|  58280|
|     RDJ|  51798|
|     BGT|  49826|
|     CRS|  34854|
|     SEO|  33793|
|     SNJ|  29331|
|     BMB|  23389|
|     MDR|  22828|
|     GDL|  21751|
|     LMA|  19742|
|     TLV|  18745|
|     LND|  18539|
|     BRA|  17625|
|     SDO|  17177|
+--------+-------+
only showing top 20 rows

+------+-------+
|gender|  count|
+------+-------+
|     M|1415249|
|     F|1214106|

In [32]:
immigration_data.select([count(when(isnan(column), column)).alias(column) for column in immigration_data.columns]).show()
immigration_data.select([count(when(isnan(column) | col(column).isNull(), column)).alias(column) for column in immigration_data.columns]).show()

+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+-------+------+------+-------+------+-----+--------+
|cicid|i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear|dtaddto|gender|insnum|airline|admnum|fltno|visatype|
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+-------+------+------+-------+------+-----+--------+
|    0|    0|     0|     0|     0|      0|      0|      0|      0|      0|     0|      0|    0|       0|       0|    0|      0|      0|      0|      0|      0|      0|     0|     0|      0|     0|    0|       0|
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-

In [33]:
deduplicated_count = immigration_data.groupBy(immigration_data.columns).agg((count("*")>1).cast("int")).count()
dataframe_count =  immigration_data.count()
if deduplicated_count == dataframe_count:
    print("Data doesn't contain duplicates")
else:
    print("Data contains duplicates")

Data doesn't contain duplicates


#### Explore the Data conclusions
We are working with five datasets: Demographic, Airport , Temperature, Country and Immigration data. None of the datasets contain duplicated data, so the datasets deduplication will not be needed. However, all datasets contain columns that contain null values, so we will need to be careful how we filter and map the data from that columns.

#### Cleaning Steps
We now proceed to cleaning the data. We will seperate the process per dataset for the sake of show and tell here.

##### Airport dataset cleaning

Since we are having a demographic dataset that contains data for the USA, we will filter the U.S. locations from the airport data. Also, we will transform city name to upper case to have a uniform case in the city name column. Furthermore, airport data contains the iso_region column that could be parsed to extract state code, and we will drop duplicates. We will filter out closed airports and ballon ports since they are probbably not used for the immigration. Last, but not least we will filter only columns for our dimensions table and give them more meaningful names.

In [34]:
airport_data = airport_data.filter(airport_data["iso_country"]=="US").withColumn("municipality", upper(col("municipality")))

airport_data = airport_data.withColumn("state_code", split(airport_data["iso_region"], '-').getItem(1))

airport_data = airport_data.filter(~airport_data["type"].isin({"closed", "balloonport"})).drop_duplicates()

airport_data = airport_data.select(\
                        col("ident").alias("airport_id"),\
                        upper(col("municipality")).alias("city_name"),\
                        "state_code",\
                        col("name").alias("airport_name"),\
                        col("type").alias("airport_type")).drop_duplicates()
airport_data.show(1, vertical=True)

-RECORD 0----------------------------
 airport_id   | K7M4                 
 city_name    | OSCEOLA              
 state_code   | AR                   
 airport_name | Osceola Municipal... 
 airport_type | small_airport        
only showing top 1 row



##### Country dataset cleaning

For country dataset, we will transform the country name into the uppercase for consistency, and extract and rename interesting columns. Also, we had noticed that some of the currencies were missing so we will add them manually.

In [35]:
country_data = country_data.withColumn("Country_Name", upper(col("Country_Name")))


country_data = country_data.select(\
                        col("Country_Name").alias("country_name"),\
                        col("ISO4217_Currency_Name").alias("currency_name"),\
                        col("Capital").alias("capital"),\
                        col("Continent").alias("continent")).drop_duplicates()

country_data = country_data.withColumn("currency_name",when(col("country_name") == "CZECH REPUBLIC","Czech crown")\
                        .otherwise(col("currency_name")))

country_data = country_data.withColumn("currency_name",when(col("country_name") == "HONG KONG","Hong Kong dollar")\
                        .otherwise(col("currency_name")))


country_data = country_data.withColumn("currency_name",when(col("country_name") == "TAIWAN","Taiwan New dollar")\
                        .otherwise(col("currency_name")))

country_data.show(1, vertical=True)

-RECORD 0-----------------------------
 country_name  | HEARD & MCDONALD ... 
 currency_name | null                 
 capital       | null                 
 continent     | AN                   
only showing top 1 row



##### Demographic dataset cleaning

As in the Airport data we will transform the city name to upper case for the uniformity sake. We will give the columns more meaningfull names, cast float and integer types in columns where it makes sence, and drop eventual duplicates from the tables.

In [36]:
demographics_data = demographics_data.withColumn("median_age",col("Median Age").cast("float"))\
                        .withColumn("City", upper(col("City")))\
                        .withColumn("Total Population",col("Total Population").cast("integer"))\
                        .withColumn("Male Population",col("Male Population").cast("integer"))\
                        .withColumn("Female Population",col("Female Population").cast("integer"))\
                        .withColumn("Foreign-born",col("Foreign-born").cast("integer"))

demographics_data = demographics_data.select(\
    col("City").alias("city_name"),\
    col("State Code").alias("state_code"),\
    col("State").alias("state_name"),\
    col("Total Population").alias("total_population"),\
    col("Male Population").alias("male_population"),\
    col("Female Population").alias("female_population"),\
    col("Foreign-born").alias("immigrants_population"),\
    "median_age").drop_duplicates()

demographics_data.show(1, vertical=True)

-RECORD 0------------------------
 city_name             | ROSWELL 
 state_code            | GA      
 state_name            | Georgia 
 total_population      | 94496   
 male_population       | 48637   
 female_population     | 45859   
 immigrants_population | 16501   
 median_age            | 38.8    
only showing top 1 row



##### Temperature dataset cleaning

In [37]:
temperature_data = temperature_data.withColumn("year",year(temperature_data["dt"]))\
                        .withColumn("month",month(temperature_data["dt"]))

temperature_data = temperature_data.filter(temperature_data["country"]=="United States")\
                        .withColumn('City', upper(col('City')))

temperature_data=temperature_data.groupBy('City','month').agg({'AverageTemperature':'avg',
                                                 'AverageTemperatureUncertainty':'avg'})

temperature_data = temperature_data.select(\
                        col("City").alias("city"),\
                        "month",\
                        col("avg(AverageTemperature)").alias("average_temperature"),\
                        col("avg(AverageTemperatureUncertainty)").alias("average_temperature_uncertainty")).drop_duplicates()

temperature_data.show(1, vertical=True)

-RECORD 0---------------------------------------------
 city                            | RALEIGH            
 month                           | 12                 
 average_temperature             | 4.4194827586206875 
 average_temperature_uncertainty | 1.5283026819923373 
only showing top 1 row



##### Immigration dataset cleaning

In the Immigration dataset we will filter interesting columns and give them a more descriptabale name. Namely, we will consider fields that contain information about year, month, origin country, destination city, destination state, visa type, age, and gender fields We will filter only arrivals by air because we want to compare them with our airport dataset. If the origin country code or destination code is null we will consider that as invaluable data. Lastly, we will aggregate table by our subset of observed fields.

In [38]:
immigration_data = immigration_data.filter(immigration_data["i94mode"]==1)\
                    .filter(immigration_data.i94res.isNotNull())\
                    .filter(immigration_data.i94port.isNotNull())\
                    .filter(immigration_data.i94visa.isNotNull())\
                    .withColumn("I94MON",col("I94MON").cast("integer"))\
                    .withColumn("I94RES",col("I94RES").cast("integer"))\
                    .withColumn("I94YR",col("I94YR").cast("integer"))\
                    .withColumn("I94BIR",col("I94BIR").cast("integer"))\
                    .withColumn("I94VISA",col("I94VISA").cast("integer"))\
                    .withColumn("count",col("count").cast("integer"))

immigration_data = immigration_data.select(
    "cicid",
    col("I94YR").alias("year"),
    col("I94MON").alias("month"),
    col("I94RES").alias("origin_country_code"),
    col("I94PORT").alias("destination_city_code"),
    col("I94ADDR").alias("destination_state"),
    col("I94BIR").alias("age"),
    col("GENDER").alias("gender"),
    col("visatype").alias("visa_type"),
    col("I94VISA").alias("i94visa"),
    "count").drop_duplicates()

immigration_data = immigration_data.join(city_data, immigration_data.destination_city_code == city_data.city_code)

immigration_data = immigration_data.withColumn('visa_purpose', i94visa_mapping(immigration_data["i94visa"]))

immigration_data = immigration_data.withColumn('origin_country', country_codes_mapping(immigration_data["origin_country_code"]))

immigration_data.show(1, vertical=True)

-RECORD 0-------------------------
 cicid                 | 149.0    
 year                  | 2016     
 month                 | 1        
 origin_country_code   | 103      
 destination_city_code | TUC      
 destination_state     | NY       
 age                   | 35       
 gender                | F        
 visa_type             | WT       
 i94visa               | 2        
 count                 | 1        
 city_code             | TUC      
 city_name             | TUCSON   
 visa_purpose          | Pleasure 
 origin_country        | AUSTRIA  
only showing top 1 row



In [39]:
immigration_data=immigration_data.groupBy(
    "year",\
    "month",\
    "origin_country_code",\
    "origin_country",\
    "destination_city_code",\
    col("city_name").alias("destination_city_name"),\
    "destination_state",\
    "age",\
    "gender",\
    "visa_type",\
    "visa_purpose"\
).agg({"count":"sum"})\
.withColumnRenamed("sum(count)","count")

immigration_data.show(1, vertical=True)

-RECORD 0-----------------------------
 year                  | 2016         
 month                 | 1            
 origin_country_code   | 261          
 origin_country        | SAUDI ARABIA 
 destination_city_code | DAL          
 destination_city_name | DALLAS       
 destination_state     | TX           
 age                   | 29           
 gender                | F            
 visa_type             | F1           
 visa_purpose          | Student      
 count                 | 7            
only showing top 1 row



### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
We will create a star schema with following dimension and fact tables.

##### Dimension Tables

Airport dimension ---> contains following information about airports:
  * airport_id
  * city_name
  * state_code
  * airport_name
  * airport_type
    
Country dimension ---> contains following information about countries:
  * country_name
  * currency_name
  * capital
  * continent

Demographics dimension ---> contains following information about demographics of U.S. cities:
  * city_name
  * state_code
  * state_name
  * total_population
  * male_population
  * female_population
  * immigrants_population
  * median_age

Temperature dimension ---> contains following information about temperatures in U.S. cities:
  * city
  * month
  * average_temperature
  * average_temperature_uncertainty

Immigration dimension ---> contains following information about immigration in U.S. cities:
  * year
  * month
  * origin_country_code
  * destination_city_code
  * destination_state
  * age
  * gender
  * visa_type
  * i94visa
  * destination_city
  * origin_country
  * visa_purpose
  * count

Fact Table will contain following data:
  * year
  * month
  * destination_city_name
  * destination_state
  * number_of_airports
  * total_population
  * immigrants_population
  * origin_country
  * origin_continent
  * origin_currency
  * average_temperature
  * visa_purpose
  * count

#### 3.2 Mapping Out Data Pipelines
We will generate the dimension tables from the transformed dataframes. Using Spark SQL we will join the dimension tables to create a fact table and write a fact table to parquet files. 

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [40]:
airport_data.createOrReplaceTempView("airport_data")
country_data.createOrReplaceTempView("country_data")
demographics_data.createOrReplaceTempView("demographics_data")
temperature_data.createOrReplaceTempView("temperature_data")
immigration_data.createOrReplaceTempView("immigration_data")

In [41]:
fact_table = spark.sql("""
SELECT 
    i.year, 
    i.month, 
    i.destination_city_name, 
    d.state_name as destination_state,
    a.number_of_airports,
    d.total_population,
    d.immigrants_population,
    i.origin_country,
    c.continent as origin_continent,
    c.currency_name as origin_currency,
    t.average_temperature,
    i.visa_purpose,
    SUM(i.count) as count
FROM immigration_data i
JOIN country_data c ON i.origin_country = c.country_name
JOIN demographics_data d ON i.destination_city_name = d.city_name
JOIN temperature_data t ON i.destination_city_name = t.city AND i.month=t.month
JOIN (
    SELECT 
        state_code, 
        city_name, 
        COUNT(airport_id) as number_of_airports 
    FROM airport_data 
    GROUP BY 
    state_code, 
    city_name
    ) a ON i.destination_city_name = a.city_name AND i.destination_state = a.state_code
GROUP BY 
    i.year, 
    i.month, 
    i.destination_city_name, 
    d.state_name,
    a.number_of_airports,
    d.total_population,
    d.immigrants_population,
    i.origin_country,
    c.continent,
    c.currency_name,
    t.average_temperature,
    i.visa_purpose
ORDER BY 
    d.state_name, 
    i.destination_city_name
""")

fact_table.show(1, vertical=True)

-RECORD 0----------------------------------
 year                  | 2016              
 month                 | 1                 
 destination_city_name | MOBILE            
 destination_state     | Alabama           
 number_of_airports    | 6                 
 total_population      | 194305            
 immigrants_population | 7234              
 origin_country        | GREECE            
 origin_continent      | EU                
 origin_currency       | Euro              
 average_temperature   | 9.150419087136928 
 visa_purpose          | Business          
 count                 | 1                 
only showing top 1 row



In [42]:
# write fact table to parquet
print('Writing {} rows to parquet'.format(fact_table.count()))
fact_table.write.partitionBy('destination_state','destination_city_name').option('compression','snappy')\
.parquet("data_tables/fact_table",mode='overwrite')



Writing 9038 rows to parquet


#### 4.2 Data Quality Checks

Run Quality Checks

First we will check if any of the rows in the fact table contains null values.

In [43]:
fact_table.select([count(when(isnan(column), column)).alias(column) for column in fact_table.columns]).show(vertical=True)
fact_table.select([count(when(isnan(column) | col(column).isNull(), column)).alias(column) for column in fact_table.columns]).show(vertical=True)

-RECORD 0--------------------
 year                  | 0   
 month                 | 0   
 destination_city_name | 0   
 destination_state     | 0   
 number_of_airports    | 0   
 total_population      | 0   
 immigrants_population | 0   
 origin_country        | 0   
 origin_continent      | 0   
 origin_currency       | 0   
 average_temperature   | 0   
 visa_purpose          | 0   
 count                 | 0   

-RECORD 0--------------------
 year                  | 0   
 month                 | 0   
 destination_city_name | 0   
 destination_state     | 0   
 number_of_airports    | 0   
 total_population      | 0   
 immigrants_population | 0   
 origin_country        | 0   
 origin_continent      | 0   
 origin_currency       | 0   
 average_temperature   | 0   
 visa_purpose          | 0   
 count                 | 0   



Next we will check if the rows have been populated and what is the shape of the table.

In [44]:
print("Number of records: {r}\nColumns: {c}".format(
    r=fact_table.count(),c=len(fact_table.columns)
))

Number of records: 9038
Columns: 13


We will also check if we had filtered the data correctly so the aggregations on the fact table are lower than immigration data aggregations.

In [45]:
fact_table.agg({"count":"sum"}).show(1,vertical=True)
immigration_data.agg({"count":"sum"}).show(1,vertical=True)

-RECORD 0-------------
 sum(count) | 1050485 

-RECORD 0-------------
 sum(count) | 2761287 



Last, we will check if we had succesfully added currencies data to the dataset, as we had some null values there before.

In [None]:
fact_table.filter(fact_table["origin_currency"].isNull()).groupBy('origin_country').agg({'count':'sum'}).show(10)

Also, we were careful about data quality in the whole process of setting up this pipeline, as we had done some basic checks before, and always tried to show at least one row of our data at each step of the way.

#### 4.3 Data dictionary 

##### airport_data - data was extracted from airport data downloaded from Datahub
  * airport_id(string) - Unique airport ID code
  * city_name(string) - City airport is located in
  * state_code(string) - Abbriviation of state airport is located in
  * airport_name(string) - Airport name
  * airport_type(string) - Airport type

##### country_data - data was extracted from country data downloaded from Datahub
  * country_name(string) - Country name
  * currency_name(string) - Country currency
  * capital(string) - Country capital
  * continent(string) - Continent country is located in

##### demographics_data - data was extracted from demographics data downloaded from OpenSoft
  * city_name(string) - City name
  * state_code(string) - Abbriviation of state
  * state_name(string) - State name
  * total_population(integer) - Total population of the city
  * male_population(integer) - Male population of the city
  * female_population(integer) - Female population of the city
  * immigrants_population(integer) - Immigrants population of the city
  * median_age(float) - Median age of the population

##### temperature_data - data was extracted from temperature data downloaded from Kaggle
  * city(string) - City name
  * month(integer) - Month
  * average_temperature(double) - Average temperature in degrees Celsius
  * average_temperature_uncertainty(double) - Average temperature uncertanty in degrees Celsius


##### immigration_data - data was extracted from original I94 Immigrations data
  * year(integer) - Year
  * month(integer) - Month
  * origin_country_code(integer) - Country of origin code
  * origin_country(string) - Country of origin name calculated from origin_country_code using SAS Labels Descriptions
  * destination_city_code(string) - Destination city code
  * destination_city_name(string) - Destination city name calculated from destination_city_code using SAS Labels Descriptions
  * destination_state(string) - Abbriviation of destination state
  * age(integer) - Age of traveler listed on I94
  * gender(string) - Gender of traveler listed on I94 
  * visa_type(string) - Visa type
  * visa_purpose(string) - Visa purpose calculated from origin code using SAS Labels Descriptions
  * count(long) Number of arrivals


##### fact_table 
  * year(integer) - Year
  * month(integer) - Month
  * destination_city_name(string) - Destination city name
  * destination_state(string) - Destination state name
  * number_of_airports(long) - Number of airports in the city
  * total_population(integer) - Total population of the city
  * immigrants_population(integer) - Immigrants population of the city
  * origin_country(string) - Country of origin name
  * origin_continent(string) - Continent of origin
  * origin_currency(string) - Currency in country of origin
  * average_temperature(double) - Average temperature in degrees Celsius
  * visa_purpose(string) - Visa purpose (1-Business 2-Pleasure 3-Student)
  * count(long) - Number of arrivals

#### Step 5: Complete Project Write Up

We had used Python and Apache Spark for their speed and usabillity on this smaller dataset. These tools contained all necesary libraries we used to extract, read, clean, process and store tables. Due to limited dataset available, we had locally stored the fact table using the Spark SQL in parquet files partitioned by city and state. The data could be updated at monthly basis as that is level of aggregation we used here, and the level of aggregation the imigration data is provided.

If the data would increase by 100 times, we would store the input data in the cloud storage AWS S3. To process the data clustered Spark would be used as it would allow parallel processing of the data. We would consider using AWS Redshift to store the staging and the final tables, while the output data would be stored bask to AWS S3.

If the data populates a dashboard that must be updated on a daily basis by 7am every day, we would use Airflow to schedule and run the data pipeline. 

If the database needed to be accessed by 100+ people, we could replicate the data to different nodes used by different users. Moreover, we could store the data in the AWS and use the web app to access the data. If the usage would increase further more we could build a dashboard using some BI tool, e.g. Tableu.
