In [None]:
# Drive Mount (Google Colab only)
from google.colab import drive
drive.mount('/content/gdrive', force_remount=True)

# Open directory
# NOTE(review): hardcoded personal Drive path; the relative paths used later in the
# notebook (config.cfg, the data files) appear to be resolved from here — confirm.
%cd /content/gdrive/My Drive/ONLINE COURSES/UDACITY/DATA ENGINEER NANO-DEGREE

Mounted at /content/gdrive
/content/gdrive/My Drive/ONLINE COURSES/UDACITY/DATA ENGINEER NANO-DEGREE


In [None]:
# Install package(s)
!pip install pyspark==2.3.0



In [None]:
# Import packages
import configparser
import datetime as dt
import os

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.functions import avg
from pyspark.sql.functions import isnan, when, count, col, udf, dayofmonth, dayofweek, month, year, weekofyear
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.types import *

import clean_functions
import create_functions

In [None]:
# Load configuration
config = configparser.ConfigParser()
# ConfigParser.read silently skips files it cannot open and returns the list of
# files it parsed, so fail loudly here instead of with a KeyError('AWS') below.
if not config.read('config.cfg'):
    raise FileNotFoundError("config.cfg not found in the current working directory")

# export the AWS credentials from the [AWS] section as environment variables
for aws_key in ('AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'):
    os.environ[aws_key] = config['AWS'][aws_key]

In [None]:
# Instantiate Spark Session, pulling in the spark-sas7bdat package so that
# .sas7bdat files can be read with format 'com.github.saurfang.sas.spark'
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11")
    .enableHiveSupport()
    .getOrCreate()
)

## Immigration Data
---

In [None]:
# read data (SAS binary file, read through the spark-sas7bdat package)
immi_dataset = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
immigration_df = spark.read.format('com.github.saurfang.sas.spark').load(immi_dataset)

In [None]:
# first five records, collected into pandas for a table display
immigration_df.limit(5).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


In [None]:
# number of records
# print_formatted_float is not defined or imported in the visible notebook,
# so format the count with thousands separators directly.
print("{:,}".format(immigration_df.count()))

3,096,313


In [None]:
# show five of the distinct visapost values (no ordering is applied, so this is
# a sample of the distinct values, not a "top 5")
immigration_df.select("visapost").dropDuplicates().show(5)

+--------+
|visapost|
+--------+
|     CRS|
|     KGL|
|     AKD|
|     BGM|
|     TRK|
+--------+
only showing top 5 rows



#### Data Cleaning:
---
> Drop columns in which most values are N/A; they contain insufficient data to be used for analytics.

> Drop all rows with N/A values. 

###### Drop columns with significant missing values

In [None]:
# remove columns missing most values (too sparse to be useful for analytics)
columns = ['insnum', 'entdepu', 'occup']
cleaned_immi_df = immigration_df.drop(*columns)

In [None]:
# display the new schema to confirm the sparse columns were removed
cleaned_immi_df.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = true)
 |-- fltno: string (nullable = true)
 |-- visatype: string (nullable = true)



In [None]:
# drop duplicates: keep a single row per cicid
cleaned_immi_df = cleaned_immi_df.dropDuplicates(['cicid'])

In [None]:
# count after dropping duplicates
# (print_formatted_float is not defined or imported in the visible notebook)
print("{:,}".format(cleaned_immi_df.count()))

3,096,313


In [None]:
# drop rows whose cicid is null; nulls in the other columns are kept
cleaned_immi_df = cleaned_immi_df.dropna(how='all', subset=['cicid'])

In [None]:
# count after dropping rows
# (print_formatted_float is not defined or imported in the visible notebook)
print("{:,}".format(cleaned_immi_df.count()))

3,096,313


In [None]:
# clean the immigration dataframe
# NOTE(review): this passes the raw `immigration_df`, not `cleaned_immi_df`, so the
# exploratory cleaning cells above are not reused; clean_immigration_spark_df
# presumably applies its own cleaning steps — confirm.
final_immigration_df = clean_functions.clean_immigration_spark_df(immigration_df)

Total records in dataframe: 3,096,313
Total records after cleaning: 3,096,313


## World Temperature Data
---

In [None]:
# load dataset: city temperatures CSV with a header row; column types are inferred
temp_dataset = '../../data2/GlobalLandTemperaturesByCity.csv'
temperature_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(temp_dataset)
)

In [None]:
# schema (column types inferred from the CSV via inferSchema=True)
temperature_df.printSchema()

root
 |-- dt: timestamp (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



#### Data Cleaning:

-  Drop all rows with N/A in average temperature
-  Drop duplicate rows

In [None]:
# clean the temperature data; the function prints record counts before and after
cleaned_temp_df = clean_functions.clean_temperature_spark_df(temperature_df)

Total records in dataframe: 8,599,212
Total records after dropping rows with missing values: 364,130
Rows dropped after accounting for duplicates: 44,299


## Demographics Data 
---

In [None]:
# load dataset: semicolon-separated CSV with a header row; types are inferred
demo_dataset = "us-cities-demographics.csv"
demographics_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .option("sep", ';')
    .csv(demo_dataset)
)

In [None]:
# first 5 rows, collected into pandas for a table display
demographics_df.limit(5).toPandas()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ,White,76402


In [None]:
# number of records
# (print_formatted_float is not defined or imported in the visible notebook)
print("{:,}".format(demographics_df.count()))

2,891


In [None]:
# schema (column types inferred from the CSV via inferSchema=True)
demographics_df.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: double (nullable = true)
 |-- Male Population: integer (nullable = true)
 |-- Female Population: integer (nullable = true)
 |-- Total Population: integer (nullable = true)
 |-- Number of Veterans: integer (nullable = true)
 |-- Foreign-born: integer (nullable = true)
 |-- Average Household Size: double (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: integer (nullable = true)



#### Data Cleaning:

-  Drop rows with N/A values
-  Drop duplicate rows

In [None]:
# clean demographics data; the function reports how many rows it dropped
cleaned_demographics_df = clean_functions.clean_demographics_spark_df(demographics_df)

Rows dropped with missing values: 16
Rows dropped after accounting for duplicates: 0


# Pipelines - Model Data

##### Create the immigration fact table
---

In [None]:
def create_immigration_fact_table(spark, df, output_data):
    """
      Creates the immigration fact table and writes it to parquet.

      Each immigration record is left-joined to the visa type dimension to pick
      up its ``visa_type_id`` key. ``cicid``/``i94res``/``i94addr`` are renamed to
      ``record_id``/``residence_code``/``state_code``. The SAS ``arrdate`` is
      converted to an ISO ``YYYY-MM-DD`` string, and the raw ``visatype`` column
      is dropped.

      Parameters:
        spark: SparkSession used to read the visa dimension and run the join.
        df: cleaned immigration DataFrame with the raw i94 column names.
        output_data: output path prefix (with trailing separator). The visa
          dimension is read from ``output_data + "visatype"``, and the fact
          table is written (overwriting) to ``output_data + "immigration_fact"``.

      Returns:
        The fact table DataFrame that was written.
    """

    # create_visa_dimension_table writes the visa dimension (columns visatype,
    # visa_type_id) to output_data + "visatype"; it must have run first.
    dim_df = spark.read.parquet(output_data + "visatype")

    # create/replace view for visa type
    dim_df.createOrReplaceTempView("visa_type_view")

    def sas_days_to_iso(sas_days):
        # SAS dates count days from 1960-01-01. Compare with None explicitly so
        # that day 0 (1960-01-01) is converted instead of treated as missing.
        if sas_days is None:
            return None
        return (dt.date(1960, 1, 1) + dt.timedelta(days=sas_days)).isoformat()

    # udf that converts a SAS date number to an ISO date string
    get_datetime = udf(sas_days_to_iso)

    # rename columns (the record id column in the source data is 'cicid')
    df = df.withColumnRenamed('cicid', 'record_id') \
           .withColumnRenamed('i94res', 'residence_code') \
           .withColumnRenamed('i94addr', 'state_code')

    # create/replace view for immigration
    df.createOrReplaceTempView("immigration_view")

    # create visa_type key
    df = spark.sql(
        """
        SELECT 
            immigration_view.*, 
            visa_type_view.visa_type_id
        FROM immigration_view
        LEFT JOIN visa_type_view ON visa_type_view.visatype=immigration_view.visatype
        """
    )

    # converts the SAS arrival date into an ISO date string
    df = df.withColumn("arrdate", get_datetime(df.arrdate))

    # drop visatype; visa_type_id now references the dimension
    df = df.drop(df.visatype)

    # write/overwrite fact table to parquet
    df.write.parquet(output_data + "immigration_fact", mode="overwrite")

    return df

In [None]:
# NOTE(review): run this only after `output_data` is defined and the visatype
# dimension has been written; both happen in later cells of this notebook.
immigration_df = create_immigration_fact_table(spark, final_immigration_df, output_data)

##### Create immigration arrivals table
---

In [None]:
def create_immigration_arrivals_dimension(df, output_data):
    """
      Creates the immigration arrivals (date) dimension table.

      Takes the distinct SAS arrival dates (``arrdate``) from the immigration
      data and converts them to ISO ``YYYY-MM-DD`` strings. It then derives day,
      week-of-year, month, year and weekday columns and adds a unique ``id``.
      The table is written (overwriting) to
      ``output_data + "immigration_arrivals"``, partitioned by year, month and
      week.

      Parameters:
        df: immigration DataFrame containing the numeric SAS ``arrdate`` column.
        output_data: output path prefix (with trailing separator).

      Returns:
        The arrivals DataFrame that was written.
    """

    def sas_days_to_iso(sas_days):
        # SAS dates count days from 1960-01-01. Compare with None explicitly so
        # that day 0 (1960-01-01) is converted instead of treated as missing.
        if sas_days is None:
            return None
        return (dt.date(1960, 1, 1) + dt.timedelta(days=sas_days)).isoformat()

    # udf that converts a SAS date number to an ISO date string
    get_datetime = udf(sas_days_to_iso)

    # one row per distinct (converted) arrival date
    arrivals_df = df.select(['arrdate']).withColumn("arrdate", get_datetime(df.arrdate)).distinct()

    # compartmentalize datetime data
    arrivals_df = arrivals_df.withColumn('arrival_day', dayofmonth('arrdate'))
    arrivals_df = arrivals_df.withColumn('arrival_week', weekofyear('arrdate'))
    arrivals_df = arrivals_df.withColumn('arrival_month', month('arrdate'))
    arrivals_df = arrivals_df.withColumn('arrival_year', year('arrdate'))
    arrivals_df = arrivals_df.withColumn('arrival_weekday', dayofweek('arrdate'))

    # create id field (unique, but not guaranteed to be consecutive)
    arrivals_df = arrivals_df.withColumn('id', monotonically_increasing_id())

    # write/overwrite dimension to parquet
    part_columns = ['arrival_year', 'arrival_month', 'arrival_week']
    arrivals_df.write.parquet(output_data + "immigration_arrivals", partitionBy=part_columns, mode="overwrite")

    return arrivals_df

In [None]:
# Output path prefix for all parquet tables. Table names are appended directly,
# so the trailing slash is required.
# NOTE(review): an earlier cell already uses output_data; this cell must run before it.
output_data = "tables/"

In [None]:
# build the arrivals (date) dimension from the cleaned immigration data
arrivals_df = create_immigration_arrivals_dimension(final_immigration_df, output_data)

##### Create the visa type dimension table
---

In [None]:
def create_visa_dimension_table(df, output_data):
    """
      Builds the visa type dimension from the immigration dataset.

      Produces one row per distinct ``visatype`` value, each tagged with a
      ``visa_type_id`` from monotonically_increasing_id(). The ids are unique
      but not necessarily consecutive. The table is written (overwriting) to
      ``output_data + "visatype"``.

      Parameters:
        df: immigration DataFrame with a ``visatype`` column.
        output_data: output path prefix (with trailing separator).

      Returns:
        The visa type dimension DataFrame that was written.
    """
    visa_dim = (
        df.select(['visatype'])
        .distinct()
        .withColumn('visa_type_id', monotonically_increasing_id())
    )

    visa_dim.write.parquet(output_data + "visatype", mode="overwrite")
    return visa_dim

In [None]:
# test create visa_type dimension function
# NOTE(review): the saved output below shows a `visa_type_key` column, but
# create_visa_dimension_table names it `visa_type_id`; that output is stale.
visatype_df = create_visa_dimension_table(final_immigration_df, output_data)
visatype_df.show(n=5)

+--------+-------------+
|visatype|visa_type_key|
+--------+-------------+
|      F2| 103079215104|
|     GMB| 352187318272|
|      B2| 369367187456|
|      F1| 498216206336|
|     CPL| 601295421440|
+--------+-------------+
only showing top 5 rows



##### Create country table
---

In [None]:
def create_country_dimension_table(spark, df, temp_df, output_data, mapping_file):
    """
      Builds the country dimension table and writes it to parquet.

      Each distinct ``i94res`` code in the immigration data is looked up in the
      country-code mapping to get a country name. That name is then
      left-joined to the grouped temperature data to attach
      ``average_temperature``. Both joins are LEFT JOINs, so unmatched codes or
      names are kept with NULLs.

      Parameters:
        spark: SparkSession that runs the SQL queries.
        df: immigration DataFrame that still carries the raw ``i94res`` column.
        temp_df: temperature DataFrame handed to group_temperature_data.
        output_data: output path prefix; the table is written (overwriting) to
          ``output_data + "country"``.
        mapping_file: a DataFrame (not a path) exposing ``code`` and ``Name``
          columns.

      Returns:
        The country dimension DataFrame
        (country_code, country_name, average_temperature).

      Side effects:
        Registers the temp views immigration_view, country_codes_view,
        grouped_temperature_view and country_view.
    """
    codes_sql = """
        SELECT 
            i94res as country_code,
            Name as country_name
        FROM immigration_view
        LEFT JOIN country_codes_view
        ON immigration_view.i94res=country_codes_view.code
        """

    temperature_sql = """
        SELECT 
            country_code,
            country_name,
            average_temperature
        FROM country_view
        LEFT JOIN grouped_temperature_view
        ON country_view.country_name=grouped_temperature_view.Country
        """

    # expose the inputs to Spark SQL
    df.createOrReplaceTempView("immigration_view")
    mapping_file.createOrReplaceTempView("country_codes_view")

    # NOTE(review): group_temperature_data is not defined in the visible notebook.
    # Presumably it returns one row per Country with an average_temperature
    # column; confirm where it comes from.
    grouped_temperatures = group_temperature_data(temp_df)
    grouped_temperatures.createOrReplaceTempView("grouped_temperature_view")

    # step 1: distinct (code, name) pairs seen in the immigration data
    named_countries = spark.sql(codes_sql).distinct()
    named_countries.createOrReplaceTempView("country_view")

    # step 2: attach the average temperature by country name
    countries = spark.sql(temperature_sql).distinct()

    countries.write.parquet(output_data + "country", mode="overwrite")
    return countries

In [None]:
# NOTE(review): `country_codes_df` (i94 country codes with `code` and `Name` columns)
# is not loaded anywhere in this notebook — TODO load it before running this cell.
country_df = create_country_dimension_table(spark, final_immigration_df, cleaned_temp_df, output_data, country_codes_df)

In [None]:
# preview the country dimension built above
country_df.show(5)

+------------+------------+-------------------+
|country_code|country_name|average_temperature|
+------------+------------+-------------------+
|       692.0|     Ecuador|      20.5391705374|
|       299.0|    Mongolia|     -3.36548531952|
|       576.0| El Salvador|      25.2628525509|
|       735.0|  Montenegro|      10.2210401137|
|       206.0|   Hong Kong|      21.4236961538|
+------------+------------+-------------------+
only showing top 5 rows



##### Create the demographics dimension table
---

In [None]:
def create_demographics_dimension_table(df, output_data):
    """
      Creates the demographics dimension table.

      Renames the space/dash separated source columns to snake_case and adds a
      unique ``id`` column. The result is written (overwriting) to
      ``output_data + "demographics"``.

      Parameters:
        df: demographics DataFrame with the original CSV column names.
        output_data: output path prefix (with trailing separator).

      Returns:
        The transformed demographics DataFrame that was written.
    """
    # original column name -> snake_case column name
    renames = {
        'Median Age': 'median_age',
        'Male Population': 'male_population',
        'Female Population': 'female_population',
        'Total Population': 'total_population',
        'Number of Veterans': 'number_of_veterans',
        'Foreign-born': 'foreign_born',
        'Average Household Size': 'average_household_size',
        'State Code': 'state_code',
    }

    dim_df = df
    for old_name, new_name in renames.items():
        dim_df = dim_df.withColumnRenamed(old_name, new_name)

    # add id column (unique, but not guaranteed to be consecutive)
    dim_df = dim_df.withColumn('id', monotonically_increasing_id())

    # write/overwrite dimension to parquet
    dim_df.write.parquet(output_data + "demographics", mode="overwrite")

    # return the frame that was written, not the notebook-level demographics_df
    return dim_df

In [None]:
# Build the dimension from the cleaned demographics data; the raw frame still
# contains the rows with missing values.
demographics_df = create_demographics_dimension_table(cleaned_demographics_df, output_data)
demographics_df.limit(5).toPandas()

Unnamed: 0,City,State,median_age,male_population,female_population,total_population,number_of_veterans,foreign_born,average_household_size,state_code,Race,Count,id
0,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723,0
1,Wilmington,North Carolina,35.5,52346,63601,115947,5908,7401,2.24,NC,Asian,3152,1
2,Tampa,Florida,35.3,175517,193511,369028,20636,58795,2.47,FL,Hispanic or Latino,95154,2
3,Gastonia,North Carolina,36.9,35527,39023,74550,3537,5715,2.67,NC,Asian,2788,3
4,Tyler,Texas,33.9,50422,53283,103705,4813,8225,2.59,TX,American Indian and Alaska Native,1057,4


#### 4.2 Data Quality Checks
The data quality checks ensure that the ETL has created the fact and dimension tables with adequate records.

In [None]:
# table name -> DataFrame produced by the pipeline cells above
tables = {
    'immigration_fact': immigration_df,
    'visatype': visatype_df,
    'immigration_arrivals': arrivals_df,
    'demographics': demographics_df,
    'country': country_df
}
# run the project's data quality check on every table; it prints the record count
for table_name, table_df in tables.items():
    create_functions.data_quality_check(table_df, table_name)

Quality check Successful on immigration_fact - 3,096,313 records. \
Quality check Successful on visatype - 17 records. \
Quality check Successful on immigration_arrivals - 30 records. \
Quality check Successful on demographics - 2,875 records. \
Quality check Successful on country - 229 records. \