# Project Title
### Data Engineering Capstone Project

#### Project Summary
* The project aims at creating an ETL flow that extracts raw data from 3 different datasets, wrangles and transforms it so that it is optimized for future applications, and then loads it into a database in the form of structured tables.
* The primary goal is to let users run queries to see whether there is a correlation between immigration statistics, weather conditions, and the modes of transport chosen.
* The data that is analyzed in this project consists of:
    * US I94 Immigration data which comes from US National Tourism and Trade Office
    * World Temperature Data from Kaggle


The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
import configparser
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import re
from pyspark.sql.functions import udf
import datetime as dt

In [2]:
spark = SparkSession.builder.config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0").getOrCreate()
spark

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use?

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

In [3]:
# Read in the data here
fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df_sas = pd.read_sas(fname, 'sas7bdat', encoding="ISO-8859-1")

In [4]:
df_sas.head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


### We will drop the rows containing invalid codes for I94Port: since it is the identifying code for the port, an invalid code (XXX) doesn't make sense.

In [23]:
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
df_temp = pd.read_csv(fname)
df_temp.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


### We'll drop all the rows having null Average Temperature.

In [22]:
df_airport = pd.read_csv('airport-codes_csv.csv')
df_airport.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [8]:
df_city = pd.read_csv('us-cities-demographics.csv', delimiter=';')
df_city.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [2]:

from pyspark.sql import SparkSession
spark = SparkSession.builder\
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11")\
    .enableHiveSupport().getOrCreate()
df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')


In [5]:
#write to parquet
df_spark.write.parquet("sas_data", 'overwrite')
df_immig=spark.read.parquet("sas_data")

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

In [6]:
df_immig.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [7]:
df_immig[['i94mode']].distinct().show()

+-------+
|i94mode|
+-------+
|   null|
|    1.0|
|    3.0|
|    2.0|
|    9.0|
+-------+



In [33]:
# Checking the number of null values in all columns
df_immig.select([F.count(F.when(F.col(c).isNull(),c)).alias(c) for c in df_immig.columns]).show()

+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-------+-------+-------+-------+-------+-------+-------+------+-------+-------+------+-----+--------+
|cicid|i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|  occup|entdepa|entdepd|entdepu|matflag|biryear|dtaddto|gender| insnum|airline|admnum|fltno|visatype|
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-------+-------+-------+-------+-------+-------+-------+------+-------+-------+------+-----+--------+
|    0|    0|     0|     0|     0|      0|      0|    239| 152592| 142457|   802|      0|    0|       1| 1881250|3088187|    238| 138429|3095921| 138429|    802|    477|414269|2982605|  83627|     0|19549|       0|
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-------+-------+-------+---

In [8]:
# Total number of rows in the Immigration dataset
df_immig.count()

3096313

## Performing cleaning tasks here

#### 1 - Data Cleaning for Immigration - Keeping only the valid values for 'i94port' column from the 'I94_SAS_Lables_description' file. Only the valid values for this column are moved to 'Dictionary_port.txt' for easy filtering.

In [10]:
# Creating a dictionary from Dictionary_port.txt to help filter out invalid port codes
re_exp = re.compile(r'\'(.*)\'.*\'(.*)\'')
dict_port = {}
with open('Dictionary_port.txt') as file:
    for line in file:
        match = re_exp.search(line)
        # Skip blank or malformed lines instead of failing on match[1]
        if match:
            dict_port[match[1]] = [match[2]]

In [20]:
dict_port['ALC']

['ALCAN, AK             ']

In [21]:
# Testing out the dictionary
'alcan' in dict_port['ALC'][0].lower()

True
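
The parsing step above can be illustrated without the real file. The sketch below is only an assumption about the line format (`'CODE' = 'NAME'`, separated by tabs or spaces); the `SEA` line is a made-up sample, and `parse_port_lines` is a hypothetical helper, not part of the notebook. It also strips the trailing padding that is visible in `'ALCAN, AK             '`.

```python
import re

# Pattern for lines of the form 'CODE' = 'CITY, STATE' (format assumed here).
PORT_LINE = re.compile(r"'([^']*)'\s*=\s*'([^']*)'")


def parse_port_lines(lines):
    """Return {port_code: port_name}, skipping lines that do not match."""
    ports = {}
    for line in lines:
        match = PORT_LINE.search(line)
        if match is None:
            continue  # blank or malformed line
        code = match.group(1).strip()
        name = match.group(2).strip()  # drop the trailing padding
        ports[code] = name
    return ports


# Hypothetical sample lines; the real file content may differ.
sample = [
    "   'ALC'\t=\t'ALCAN, AK             '",
    "",
    "   'SEA'\t=\t'SEATTLE, WA           '",
]
ports = parse_port_lines(sample)
print(ports)
```

Stripping the names up front keeps later comparisons against city names from depending on padding.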

In [15]:
df_immigration = df_immig.filter(df_immig.i94port.isin(list(dict_port.keys())))

In [16]:
df_immigration.count()
# Count reduced from 3096313 to 3088544

3088544

#### 2 - Cleaning Weather dataset - Removing all rows having null values in the Average Temperature column.
#### Also, many rows contain the weather details of the same city on different days, but a single reading per city is enough for us.

In [4]:
df_weat = spark.read.format("csv").option("header", "true").load("../../data2/GlobalLandTemperaturesByCity.csv")

In [5]:
df_weat.count()

8599212

In [6]:
df_weat.printSchema()

root
 |-- dt: string (nullable = true)
 |-- AverageTemperature: string (nullable = true)
 |-- AverageTemperatureUncertainty: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



In [27]:
df_weat_filter = df_weat.filter(df_weat.AverageTemperature != 'NaN').dropDuplicates(['City', 'Country'])
#df_weat = df_temp.dropDuplicates(['City', 'Country'])
df_weat_filter.count()

3490
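
Note that `dropDuplicates(['City', 'Country'])` keeps one arbitrary reading per city, which may be a very old one. One possible alternative, shown here only as a sketch and not as what this notebook does, is to average all non-missing readings per city. The example uses plain Python on made-up rows so the logic can be checked without a Spark session; `mean_temperature_by_city` is a hypothetical helper.

```python
from collections import defaultdict


def mean_temperature_by_city(rows):
    """Average the non-missing readings for each (City, Country) pair."""
    totals = defaultdict(lambda: [0.0, 0])  # key -> [sum, count]
    for row in rows:
        raw = row.get("AverageTemperature")
        if raw in (None, "", "NaN"):
            continue  # skip missing readings
        key = (row["City"], row["Country"])
        totals[key][0] += float(raw)
        totals[key][1] += 1
    return {key: total / count for key, (total, count) in totals.items()}


# Made-up readings for illustration only.
readings = [
    {"City": "Seattle", "Country": "United States", "AverageTemperature": "4.0"},
    {"City": "Seattle", "Country": "United States", "AverageTemperature": "6.0"},
    {"City": "Seattle", "Country": "United States", "AverageTemperature": ""},
    {"City": "Perth", "Country": "Australia", "AverageTemperature": "15.5"},
]
city_means = mean_temperature_by_city(readings)
print(city_means)
```

In Spark, a comparable aggregation would likely be a `groupBy("City", "Country")` followed by `F.avg` on the temperature column cast to double.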

In [None]:
#### Goal: link the weather and immigration datasets through a shared city/port code

In [30]:
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

In [25]:
@udf()
def port_city(city):
    """Take a city name from the weather dataframe and return its corresponding code in dict_port.

    Returns None when the city is missing or no port name contains it.
    """
    if city is None:
        return None
    for key in dict_port:
        if city.lower() in dict_port[key][0].lower():
            return key
    return None

In [34]:
# Add the i94port code corresponding to the city name in the weather dataframe
df_wea_city = df_weat_filter.withColumn("i94port", port_city(df_weat_filter.City))

# Remove rows without an i94port code (the UDF returns null when no port matches)
df_weatherr = df_wea_city.filter(df_wea_city.i94port.isNotNull())

In [37]:
df_weatherr.show(5)

+----------+------------------+-----------------------------+--------+-------------+--------+---------+-------+
|        dt|AverageTemperature|AverageTemperatureUncertainty|    City|      Country|Latitude|Longitude|i94port|
+----------+------------------+-----------------------------+--------+-------------+--------+---------+-------+
|1852-07-01|            15.488|                        1.395|   Perth|    Australia|  31.35S|  114.97E|    PER|
|1828-01-01|            -1.977|                        2.551| Seattle|United States|  47.42N|  121.97W|    SEA|
|1743-11-01|             2.767|                        1.905|Hamilton|       Canada|  42.59N|   80.73W|    HAM|
|1849-01-01| 7.399999999999999|                        2.699| Ontario|United States|  34.56N|  116.76W|    ONT|
|1821-11-01|             2.322|                        2.375| Spokane|United States|  47.42N|  117.24W|    SPO|
+----------+------------------+-----------------------------+--------+-------------+--------+---------+-
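
The `port_city` UDF matches by substring, so a short city name can match a longer, unrelated port name. A stricter variant, sketched below under the assumption that every port name looks like `'CITY, STATE'`, builds an exact-match index once and looks cities up in it. `build_city_index` and `port_for_city` are hypothetical helpers, and the `PER` and `PTH` entries are made-up samples.

```python
def build_city_index(dict_port):
    """Map a lower-cased city name to its port code.

    Assumes each port name looks like 'CITY, STATE' and that the dictionary
    values are one-element lists, as in dict_port.
    """
    index = {}
    for code, names in dict_port.items():
        city = names[0].split(",")[0].strip().lower()
        index.setdefault(city, code)  # keep the first code seen for a city
    return index


def port_for_city(city, index):
    """Return the port code for an exact city-name match, else None."""
    if city is None:
        return None
    return index.get(city.strip().lower())


# Hypothetical entries; only the 'ALC' value comes from the notebook output.
sample_ports = {
    "ALC": ["ALCAN, AK             "],
    "PER": ["PERTH, AUSTRALIA      "],
    "PTH": ["PERTH AMBOY, NJ       "],
}
index = build_city_index(sample_ports)
print(port_for_city("Perth", index))
```

With substring matching, "Perth" would also be contained in "PERTH AMBOY, NJ"; the exact lookup avoids depending on dictionary order in such cases, at the cost of missing ports whose names are formatted differently.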

In [None]:
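# Convert SAS numeric dates (days since 1960-01-01) to ISO date strings.
# Scratch cell, not executed: df_weatherr has no "Arrival_Date" column; the conversion is applied to the fact table in Step 4.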
get_date = udf(lambda x: (dt.datetime(1960, 1, 1).date() + dt.timedelta(x)).isoformat() if x else None)
df_weatherr = df_weatherr.withColumn("Arrival_Date_1", get_date(F.col("Arrival_Date")))
df_weatherr.show(20)

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

### A Star Schema is used to model the data into fact and dimension tables.
### Fact Table:
<ul>i94port -> 3 character code of destination city</ul>
<ul>arrdate -> arrival date</ul>
<ul>i94visa -> reason for immigration</ul>
<ul>i94mode -> 1 digit travel code</ul>
<ul>AverageTemperature -> average temperature of destination city</ul>

### Dimension Table 1 - Immigration
<ul>i94port -> 3 character code of destination city</ul>
<ul>i94cit -> 3 digit code of the traveller's country of citizenship</ul>
<ul>i94mode -> 1 digit travel code</ul>
<ul>i94visa -> reason for immigration</ul>
<ul>i94bir -> age of the person</ul>
<ul>arrdate -> arrival date</ul>
<ul>i94mon -> numeric month</ul>
<ul>i94yr -> 4 digit year</ul>
<ul>depdate -> departure date</ul>

### Dimension Table 2 - Weather
<ul>i94port -> 3 character code of destination city</ul>
<ul>AverageTemperature -> average temperature</ul>
<ul>City -> city name</ul>
<ul>Country -> country name</ul>
<ul>Latitude -> latitude</ul>
<ul>Longitude -> longitude</ul>

#### 3.2 Mapping Out Data Pipelines
#### List the steps necessary to pipeline the data into the chosen data model.
<ol> 1 - Load the immigration and weather datasets into spark dataframes.</ol>
<ol> 2 - In the immigration dataframe, the i94port column is filtered to keep only valid entries, using the provided data dictionary 'I94_SAS_Label_Description'.</ol>
<ol> 3 - In the Weather dataframe, rows having a null Average Temperature are removed and duplicate city rows are dropped, leaving a single temperature reading per city.</ol>
<ol> 4 - Link immigration and weather by adding an 'i94port' column to the weather dataframe, holding the port code that corresponds to its city.</ol>
    <ul>For example, 'i94port' value for a row in immigration dataframe is 'ALC'. It corresponds to city name 'ALCAN' in the data dictionary. If 'ALCAN' city is present in weather dataframe, newly added 'i94port' column will take value 'ALC'.</ul>
<ol> 5 - Load the data from Immigration and Weather dataframes to Fact and Dimension Tables.</ol>
<ol> 6 - These Fact and Dimension tables are loaded into Amazon S3 / saved as parquet files locally.</ol>
#### (Facing issues with saving files to S3, hence the tables are saved as parquet files locally.)

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
                    .enableHiveSupport().getOrCreate()
spark

In [3]:
# Defining a cleaning function for Immigration data, as done in the data wrangling earlier
def clean_immig(path):
    """Load the immigration data and keep only rows whose i94port is a valid code.

    Relies on the global dict_port dictionary of valid port codes, which must be
    defined before this function is called.
    """
    df_imig = spark.read.format('com.github.saurfang.sas.spark').load(path)
    df_immigration_cleaned = df_imig.filter(df_imig.i94port.isin(list(dict_port.keys())))
    return df_immigration_cleaned

In [4]:
# Creating a dictionary of only valid port codes by parsing Dictionary_port.txt with a regular expression
re_exp = re.compile(r'\'(.*)\'.*\'(.*)\'')
dict_port = {}
with open('Dictionary_port.txt') as file:
    for line in file:
        match = re_exp.search(line)
        # Skip blank or malformed lines instead of failing on match[1]
        if match:
            dict_port[match[1]] = [match[2]]
# dict_port is a dictionary containing all valid port codes; it is used later.

In [5]:
# Reading the Immigration data and cleaning it
df_immigration = clean_immig('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

In [6]:
df_immigration.show(5)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|20551.0|    1.0|     AL|   null|  25.0|    3.0|  1.0|20130811|     SEO| null|      G|   null|      Y|   null| 1991.0|     D/S|     M|  null|   null|  3.73679633E9|00296|      F1|
| 15.0|2016.0|   4.0| 101.0| 101.0|    WAS|20545.0|    1.0|     MI|20691.0|  55.0|    2.0|  1.0|20160401|    nul

In [7]:
# Declaring a cleaning function for the temperature dataset
def clean_weather(path):
    """Load the weather dataset from a csv file, remove rows with a null average temperature,
        and drop duplicates by city and country. There are multiple average temperatures recorded
        for the same city on different days; for our implementation we keep only 1 reading per city."""
    
    df_weat = spark.read.format("csv").option("header", "true").load(path)
    df_weat_filter = df_weat.filter(df_weat.AverageTemperature != 'NaN').dropDuplicates(['City', 'Country'])
    #df_weat_filter.count()
    return df_weat_filter

In [8]:
df_weat_filter = clean_weather("../../data2/GlobalLandTemperaturesByCity.csv")

In [9]:
df_weat_filter.show(2)

+----------+--------------------+-----------------------------+---------+-------------+--------+---------+
|        dt|  AverageTemperature|AverageTemperatureUncertainty|     City|      Country|Latitude|Longitude|
+----------+--------------------+-----------------------------+---------+-------------+--------+---------+
|1743-11-01|               3.264|                        1.665|Allentown|United States|  40.99N|   74.56W|
|1779-11-01|0.011999999999999985|                        2.714|   Atyrau|   Kazakhstan|  47.42N|   50.92E|
+----------+--------------------+-----------------------------+---------+-------------+--------+---------+
only showing top 2 rows



In [10]:
# Creating a function which maps the city from the weather dataset to a port code from the immigration data
@udf()
def port_city(city):
    """Take a city name from the weather dataframe and return its corresponding code in dict_port.

    Returns None when the city is missing or no port name contains it.
    """
    if city is None:
        return None
    for key in dict_port:
        if city.lower() in dict_port[key][0].lower():
            return key
    return None

In [11]:
# Adding the port column to the weather dataset, mapped from the city name; this column is used to join the immigration and weather data
# Tried implementing this in a function, but got an error when trying to pass the dictionary as a parameter to the 'port_city' function
df_wea_city = df_weat_filter.withColumn("i94port", port_city(df_weat_filter.City))
df_weather = df_wea_city.filter(df_wea_city.i94port.isNotNull())

In [12]:
df_weather.show(2)

+----------+------------------+-----------------------------+-------+-------------+--------+---------+-------+
|        dt|AverageTemperature|AverageTemperatureUncertainty|   City|      Country|Latitude|Longitude|i94port|
+----------+------------------+-----------------------------+-------+-------------+--------+---------+-------+
|1852-07-01|            15.488|                        1.395|  Perth|    Australia|  31.35S|  114.97E|    PER|
|1828-01-01|            -1.977|                        2.551|Seattle|United States|  47.42N|  121.97W|    SEA|
+----------+------------------+-----------------------------+-------+-------------+--------+---------+-------+
only showing top 2 rows



In [13]:
### Creating fact table from df_weather and df_immigration
imig_weather_fact = df_immigration.join(df_weather, df_weather.i94port == df_immigration.i94port)\
                            .select(df_immigration.i94port,
                                   F.col('arrdate').alias('Arrival_Date'),
                                   F.col('i94visa').alias('Visa'),
                                   F.col('i94mode').alias('Mode'),
                                   df_weather.AverageTemperature).dropDuplicates().withColumn("Id", F.monotonically_increasing_id())

In [14]:
imig_weather_fact.show(5)

+-------+------------+----+----+------------------+-----------+
|i94port|Arrival_Date|Visa|Mode|AverageTemperature|         Id|
+-------+------------+----+----+------------------+-----------+
|    SNA|     20545.0| 1.0| 1.0| 7.168999999999999|34359738368|
|    SNA|     20545.0| 2.0| 1.0| 7.168999999999999|34359738369|
|    SNA|     20545.0| 3.0| 1.0| 7.168999999999999|34359738370|
|    SNA|     20546.0| 3.0| 2.0| 7.168999999999999|34359738371|
|    SNA|     20546.0| 2.0| 1.0| 7.168999999999999|34359738372|
+-------+------------+----+----+------------------+-----------+
only showing top 5 rows
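
The large `Id` values are expected: `monotonically_increasing_id` guarantees unique, increasing values, not consecutive ones. According to the Spark documentation, the current implementation puts the partition index in the upper 31 bits and the record number within the partition in the lower 33 bits. The sketch below decodes two of the values shown above; `decode_monotonic_id` is a hypothetical helper for illustration.

```python
RECORD_BITS = 33  # lower 33 bits hold the record number within a partition


def decode_monotonic_id(value):
    """Split an Id into (partition_index, record_number)."""
    partition_index = value >> RECORD_BITS
    record_number = value & ((1 << RECORD_BITS) - 1)
    return partition_index, record_number


first = decode_monotonic_id(34359738368)
fifth = decode_monotonic_id(34359738372)
print(first, fifth)
```

Because the values depend on partitioning, the same data can receive different Ids across runs, so they are best treated as surrogate keys within a single run.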



In [15]:
get_date = udf(lambda x: (dt.datetime(1960, 1, 1).date() + dt.timedelta(x)).isoformat() if x else None)
imig_weather_fact = imig_weather_fact.withColumn("Arrival_Date_dt", get_date(F.col("Arrival_Date")))
imig_weather_fact.show(20)

+-------+------------+----+----+------------------+-----------+---------------+
|i94port|Arrival_Date|Visa|Mode|AverageTemperature|         Id|Arrival_Date_dt|
+-------+------------+----+----+------------------+-----------+---------------+
|    SNA|     20545.0| 1.0| 1.0| 7.168999999999999|34359738368|     2016-04-01|
|    SNA|     20545.0| 2.0| 1.0| 7.168999999999999|34359738369|     2016-04-01|
|    SNA|     20545.0| 3.0| 1.0| 7.168999999999999|34359738370|     2016-04-01|
|    SNA|     20546.0| 3.0| 2.0| 7.168999999999999|34359738371|     2016-04-02|
|    SNA|     20546.0| 2.0| 1.0| 7.168999999999999|34359738372|     2016-04-02|
|    SNA|     20546.0| 2.0| 2.0| 7.168999999999999|34359738373|     2016-04-02|
|    SNA|     20546.0| 3.0| 1.0| 7.168999999999999|34359738374|     2016-04-02|
|    SNA|     20546.0| 1.0| 1.0| 7.168999999999999|34359738375|     2016-04-02|
|    SNA|     20547.0| 2.0| 1.0| 7.168999999999999|34359738376|     2016-04-03|
|    SNA|     20547.0| 3.0| 2.0| 7.16899
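
The conversion used above treats `arrdate` as a SAS date, that is, a count of days since 1960-01-01. A minimal plain-Python sketch of the same arithmetic follows; `sas_to_iso` is a hypothetical name. Unlike the lambda above, which uses a truthiness test and therefore maps a value of 0 to None, this version only treats None as missing.

```python
import datetime as dt

SAS_EPOCH = dt.date(1960, 1, 1)  # SAS dates count days from this date


def sas_to_iso(days):
    """Convert a SAS day count to an ISO date string, or None if missing."""
    if days is None:
        return None
    return (SAS_EPOCH + dt.timedelta(days=int(days))).isoformat()


converted = sas_to_iso(20545.0)
print(converted)
```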

In [16]:
immigration_dim = df_immigration.select(["i94port", "i94cit", "i94mode", "i94visa", "i94bir", "arrdate", "i94mon","i94yr", "depdate"])

In [17]:
immigration_dim.show(5)

+-------+------+-------+-------+------+-------+------+------+-------+
|i94port|i94cit|i94mode|i94visa|i94bir|arrdate|i94mon| i94yr|depdate|
+-------+------+-------+-------+------+-------+------+------+-------+
|    ATL| 254.0|    1.0|    3.0|  25.0|20551.0|   4.0|2016.0|   null|
|    WAS| 101.0|    1.0|    2.0|  55.0|20545.0|   4.0|2016.0|20691.0|
|    NYC| 101.0|    1.0|    2.0|  28.0|20545.0|   4.0|2016.0|20567.0|
|    NYC| 101.0|    1.0|    2.0|   4.0|20545.0|   4.0|2016.0|20567.0|
|    NYC| 101.0|    1.0|    1.0|  57.0|20545.0|   4.0|2016.0|20555.0|
+-------+------+-------+-------+------+-------+------+------+-------+
only showing top 5 rows



In [18]:
weather_dim1 = df_weather.select(["i94port", "AverageTemperature", "City", "Country", "Latitude", "Longitude"])

In [19]:
weather_dim1.show(5)

+-------+------------------+--------+-------------+--------+---------+
|i94port|AverageTemperature|    City|      Country|Latitude|Longitude|
+-------+------------------+--------+-------------+--------+---------+
|    PER|            15.488|   Perth|    Australia|  31.35S|  114.97E|
|    SEA|            -1.977| Seattle|United States|  47.42N|  121.97W|
|    HAM|             2.767|Hamilton|       Canada|  42.59N|   80.73W|
|    ONT| 7.399999999999999| Ontario|United States|  34.56N|  116.76W|
|    SPO|             2.322| Spokane|United States|  47.42N|  117.24W|
+-------+------------------+--------+-------------+--------+---------+
only showing top 5 rows



In [21]:
imig_weather_fact.select([F.count(F.when(F.isnan('i94port'), True))]).show()

+---------------------------------------------+
|count(CASE WHEN isnan(i94port) THEN true END)|
+---------------------------------------------+
|                                            0|
+---------------------------------------------+



In [30]:
# createDataFrame() has no inferSchema argument; reuse the fact table's own schema instead
fact_test = spark.createDataFrame(imig_weather_fact.head(100), schema=imig_weather_fact.schema)
#fact_test.show()

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

### 1st Data Quality check: the row count and the number of null and NaN values present in the PORT column

In [24]:
# Check for fact table
imig_weather_fact.count()

10209

In [23]:
# Checking Fact table for any null and nan values in PORT column
imig_weather_fact.select([F.count(F.when(F.isnan('i94port') | F.col('i94port').isNull() , True))]).show()

+--------------------------------------------------------------------+
|count(CASE WHEN (isnan(i94port) OR (i94port IS NULL)) THEN true END)|
+--------------------------------------------------------------------+
|                                                                   0|
+--------------------------------------------------------------------+



In [25]:
# Check for immigration dimension table
immigration_dim.count()

3088544

In [25]:
# Checking immigration dimension table for any null and nan values in PORT column
immigration_dim.select([F.count(F.when(F.col('i94port').isNull() | F.isnan('i94port'), True))]).show()

+--------------------------------------------------------------------+
|count(CASE WHEN ((i94port IS NULL) OR isnan(i94port)) THEN true END)|
+--------------------------------------------------------------------+
|                                                                   0|
+--------------------------------------------------------------------+



In [None]:
# Check for weather dimension table
# It takes a long time but eventually gives the count; printing the head of the table earlier
# can also be considered a quality check
weather_dim1.count()

In [26]:
# Checking weather dimension table for any null and nan values in PORT column
weather_dim1.select([F.count(F.when(F.col('i94port').isNull() | F.isnan('i94port'), True))]).show()

KeyboardInterrupt: 

### 2nd Data Quality check is to check if the PORT column has any invalid code (example - 'XXX')
##### 'XXX' code is invalid as per the data dictionary provided - SAS_labels_description

In [33]:
# The data quality checks take some time to complete, so the logic of the check is demonstrated on a small sample
df = spark.createDataFrame([("XXX", 123),
                           ("ABC", 333) ,
                           ("XXX", 546) ,
                           ("TNT", 444)], ("col1", "col2"))
df.show()

+----+----+
|col1|col2|
+----+----+
| XXX| 123|
| ABC| 333|
| XXX| 546|
| TNT| 444|
+----+----+



In [34]:
df.select([F.count(F.when(F.col('col1') == 'XXX', True))]).show()

+-------------------------------------------+
|count(CASE WHEN (col1 = XXX) THEN true END)|
+-------------------------------------------+
|                                          2|
+-------------------------------------------+



### Applying the same logic to our fact and dimension tables should give a count of 0 for "XXX", because we filtered with a dictionary (dict_port) containing only the valid codes

In [27]:
# This takes some time to run, so it was interrupted; the logic is the same as in the sample check above.
imig_weather_fact.select([F.count(F.when(F.col('i94port') == 'XXX', True))]).show()

KeyboardInterrupt: 

In [None]:
# This takes some time to run, so it was interrupted; the logic is the same as in the sample check above.
immigration_dim.select([F.count(F.when(F.col('i94port') == 'XXX', True))]).show()

In [None]:
# This takes some time to run, so it was interrupted; the logic is the same as in the sample check above.
weather_dim1.select([F.count(F.when(F.col('i94port') == 'XXX', True))]).show()
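
The invalid-code check can also be expressed as a small reusable function. The sketch below is plain Python over a list of dictionaries, so it runs without Spark; it mirrors the toy DataFrame used above. `count_invalid_codes` is a hypothetical helper, and treating missing values as invalid is an assumption made here.

```python
def count_invalid_codes(rows, column, invalid=("XXX",)):
    """Count rows whose `column` is missing or holds a known-invalid code."""
    bad = 0
    for row in rows:
        value = row.get(column)
        if value is None or value in invalid:
            bad += 1
    return bad


# Same values as the toy DataFrame in the sample check.
toy_rows = [
    {"col1": "XXX", "col2": 123},
    {"col1": "ABC", "col2": 333},
    {"col1": "XXX", "col2": 546},
    {"col1": "TNT", "col2": 444},
]
bad_count = count_invalid_codes(toy_rows, "col1")
print(bad_count)
```

A pipeline could fail fast whenever such a count is non-zero for the fact or dimension tables.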

### 3rd Data Quality check: inspect the schema of the fact and dimension tables to confirm the columns retain their expected types.

In [28]:
imig_weather_fact.printSchema()

root
 |-- i94port: string (nullable = true)
 |-- Arrival_Date: double (nullable = true)
 |-- Visa: double (nullable = true)
 |-- Mode: double (nullable = true)
 |-- AverageTemperature: string (nullable = true)
 |-- Id: long (nullable = false)
 |-- Arrival_Date_dt: string (nullable = true)



In [29]:
immigration_dim.printSchema()

root
 |-- i94port: string (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- depdate: double (nullable = true)



In [30]:
weather_dim1.printSchema()

root
 |-- i94port: string (nullable = true)
 |-- AverageTemperature: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
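
Reading the printed schemas by eye can be replaced with an automated comparison. The sketch below compares a mapping of column names to type names against an expected mapping; with PySpark, `dict(df.dtypes)` should produce such a mapping. `schema_mismatches` is a hypothetical helper, and the expected `double` type for `AverageTemperature` is an assumption chosen to show a failing case (the column is a string here because the CSV was read without type inference).

```python
def schema_mismatches(actual, expected):
    """Return {column: (expected_type, actual_type)} for every difference."""
    problems = {}
    for column, expected_type in expected.items():
        actual_type = actual.get(column)  # None when the column is missing
        if actual_type != expected_type:
            problems[column] = (expected_type, actual_type)
    for column in actual.keys() - expected.keys():
        problems[column] = (None, actual[column])  # unexpected extra column
    return problems


# Types as printed for weather_dim1 above.
actual_weather = {
    "i94port": "string",
    "AverageTemperature": "string",
    "City": "string",
    "Country": "string",
    "Latitude": "string",
    "Longitude": "string",
}
# Assumed target schema: temperature stored as a number.
expected_weather = dict(actual_weather, AverageTemperature="double")

problems = schema_mismatches(actual_weather, expected_weather)
print(problems)
```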



#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

#### The Data Dictionary is in txt file with name 'Data_Dictionary.txt'

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

### My Response for Step 5
    1. As we are dealing with datasets having millions of rows, Spark is a good fit for processing the data.
        Spark allows us to wrangle data using Spark DataFrames.
    2. The data can be updated once a fortnight or even once a week, depending on the number of immigrants.
    3. The problem can be handled in the different scenarios as follows:
        * When the data is increased by 100x, the organization would have to spend resources on creating a data management hub, which is not only costly but also rigid to expand if necessary in the future. It's advisable to have resources on the cloud. An ideal solution would be to use an AWS S3 bucket to store the massive data. From S3, data can be pulled into a Redshift cluster for analytics, because it's well optimized for analytics on large datasets.
        * The process can be automated using Apache Airflow, which lets us customize when the dashboard is updated. An Airflow DAG can be created with schedule_interval set to daily and a start time early enough for the run to finish by 7am.
        * The more people accessing the database the more cpu resources are needed to get a fast experience. By using a distributed database, performance can be improved by partitioning to get faster query results for each user. By implementing the entire workflow on AWS cloud, it can be accessed by anyone having an allocated IAM role and