## Udacity Capstone Project

#### ProjectSummary.md has the writeup for this project. 

In [5]:
import pandas as pd
import os
import configparser
import datetime as dt

from pyspark.sql import SparkSession, SQLContext, GroupedData, HiveContext
from pyspark.sql.functions import *
from pyspark.sql.functions import date_add as d_add
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import year, month, dayofmonth, weekofyear, date_format
from pyspark.sql import functions as func
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import lit
from pyspark.sql import Row

#### Function definitions to create data models

In [6]:
def load_immigration_data(df, spark):
    # df = spark.read.parquet(input_path)
    
    df = df \
    .withColumn("cicid", col("cicid").cast("integer")) \
    .withColumn("year", col("i94yr").cast("integer")) \
    .drop("i94yr") \
    .withColumn("month", col("i94mon").cast("integer")) \
    .drop("i94mon") \
    .withColumn("bornCountry", col("i94cit").cast("integer")) \
    .drop("i94cit") \
    .withColumn("residentCountry", col("i94res").cast("integer")) \
    .drop("i94res") \
    .withColumnRenamed("i94port", "arrivalPort") \
    .withColumn("mode", col("i94mode").cast("integer")) \
    .drop("i94mode") \
    .withColumnRenamed("i94addr", "state_code") \
    .withColumn("age", col("i94bir").cast("integer")) \
    .drop("i94bir") \
    .withColumn("visa", col("i94visa").cast("integer")) \
    .drop("i94visa") \
    .withColumnRenamed("entdepa", "arrivalFlag") \
    .withColumnRenamed("entdepd", "departureFlag") \
    .withColumnRenamed("entdepu", "updateFlag") \
    .withColumnRenamed("matflag", "matchFlag") \
    .withColumn("birthYear", col("biryear").cast("integer")) \
    .drop("biryear") \
    .withColumnRenamed("fltno", "flightNumber") \
    .withColumn("sasDate", to_date(lit("01/01/1960"), "MM/dd/yyyy")) \
    .withColumn("arrivalDate", expr("date_add(sasDate, arrdate)")) \
    .withColumn("departureDate", expr("date_add(sasDate, depdate)")) \
    .drop("sasDate", "arrdate", "depdate", "count", "admnum", "dtadfile", "visapost", "occup", "dtaddto", "insnum")
    return df

#### Function definition to load data

In [44]:
def create_airport_table(spark, input_df , output_path):
    '''
    '''
    # airport = spark.read.format("csv").option("header", "true").option("delimiter", ",").load(filename)
    airport = input_df   

    # filtering only US airports, data splitting country and state by "-" from the iso_region column & dropping old iso_region column  
    airport =   airport.where(
            (col("iso_country") == "US") & (col("iata_code").isNotNull()) & (col("type").isin("large_airport", "medium_airport", "small_airport"))) \
            .withColumn("state", split(col("iso_region"), "-")[1]) \
            .drop("local_code", "elevation_ft", "iso_region", 'continent') \
            .dropDuplicates()
    
    airport = airport.select(['ident', 'type', 'name', 'state', 'municipality','gps_code', 'iata_code','iso_country', 'coordinates']) \
               .dropDuplicates().dropna()
    
    airport.write.mode("overwrite").parquet(output_path+"airport.parquet")
    
  
    
    return airport


def create_visa_type(df, output_path):
    '''
    '''
    # create visatype df from visatype column
    visatype_df = df.select(['visatype']).distinct()

    # add an id column
    visatype_df = visatype_df.withColumn('visa_type_key', monotonically_increasing_id())

    visatype_df.write.mode("overwrite").parquet(output_path+"visa_type.parquet")

    return visatype_df


def create_state_demographics(spark,input_df, output_path):
    """This function creates a us demographics dimension table from the us cities demographics data.
    :param df: spark dataframe of us demographics survey data
    :param output_data: path to write dimension dataframe to
    :return: spark dataframe representing demographics dimension
    """
    #cities = spark.read.format("csv").option("header", "true").option("delimiter", ";").load(input_path)
    cities = input_df
    
    state_demographics = cities \
        .groupBy(col("State Code").alias("stateCode"), col("State").alias("state")).agg(
        round(mean('Median Age'), 2).alias("medianAge"),\
        sum("Total Population").alias("totalPopulation"),\
        sum("Male Population").alias("malePopulation"), \
        sum("Female Population").alias("femalePopulation"),\
        sum("Number of Veterans").alias("numberOfVeterans"),\
        sum("Foreign-born").alias("foreignBorn"), \
        round(mean("Average Household Size"),2).alias("averageHouseholdSize")
        ).dropna()
    # lets add an id column
    state_demographics = state_demographics.withColumn('id', monotonically_increasing_id())

    # write dimension to parquet file
    state_demographics.write.parquet(output_path + "demographics.parquet", mode="overwrite")

    return state_demographics

def create_time(df, output_path):
    time_table = df.select(['arrivalDate'])\
                    .withColumnRenamed('arrivalDate','time') 

    time_table = time_table \
                 .withColumn('arrival_day', func.dayofmonth('time')) \
                 .withColumn('arrival_month', func.month('time')) \
                 .withColumn('arrival_year', func.year('time')) \
                 .withColumn('arrival_week', func.weekofyear('time')) \
                 .withColumn('arrival_weekday', func.dayofweek('time'))\
    
    time_table = time_table.dropDuplicates()

    # write the calendar dimension to parquet file
    partition_columns = ['arrival_year', 'arrival_month', 'arrival_week']
    time_table.write.parquet(output_path + "immigration_calendar.parquet", partitionBy=partition_columns, mode="overwrite")

    return time_table

def create_immigration_facts(df, visatype_df, state_demographics_df, time_df, airport_df, outpath):

    immigration = df.select(['cicid', 'arrivalDate','mode','bornCountry', 'airline','flightNumber','visa','visaType',
                         'gender','arrivalPort', 'matchFlag', 'year', 'month', 'birthyear', 'residentCountry', 'state_code']) \
                .dropna() \
                .dropDuplicates(['cicid'])

    immigration = immigration.join(visatype_df, (visatype_df.visatype == immigration.visaType) , how = 'inner') \
                      .drop('visatype') \
                      .drop('visa') \
                      .dropna() \
                      .dropDuplicates()

    immigration = immigration.join(state_demographics_df.select(['stateCode','id']), (state_demographics_df.stateCode == immigration.state_code) , how = 'left') \
                      .drop('state_code').drop('stateCode').drop('state').withColumnRenamed("id", "state_code_id").dropna().dropDuplicates()
    '''
    immigration = immigration.join(select(['time']), (time_df.time == immigration.arrivalDate), how = 'inner') \
                      .drop('arrdate') \
                      .dropna() \
                      .dropDuplicates()
    '''

    immigration = immigration.join(airport_df.select(['iata_code', 'ident']), (airport_df.iata_code == immigration.arrivalPort), how = 'left') \
                      .drop('iata_code').withColumnRenamed("ident", "airport_id").dropna().dropDuplicates()
    
    
    # write dimension to parquet file
    partition_columns = ['year', 'month']
    immigration.write.parquet(outpath + "immigration", partitionBy=partition_columns, mode="overwrite")
    
    return immigration

#### Utility Functions

In [27]:
def create_spark_session():
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11") \
        .enableHiveSupport() \
        .getOrCreate()
    return spark

#### Data Cleaning Functions

In [28]:
def visualize_missing_values(df):
    """Visualize missing values in a spark dataframe
    
    :param df: spark dataframe
    """
    # create a dataframe with missing values count per column
    df = df.toPandas()
    
    nulls_df = pd.DataFrame(data= df.isnull().sum(), columns=['values'])
    nulls_df = nulls_df.reset_index()
    nulls_df.columns = ['cols', 'values']

    # calculate % missing values
    nulls_df['% missing values'] = 100*nulls_df['values']/df.shape[0]
    
    #Convert to spark before sending
    print(nulls_df)
    
def clean_immigration_data(df):
    """Clean immigration dataframe
    :param df: spark dataframe with monthly immigration data
    :return: clean dataframe
    """
    total_records = df.count()
    
    print(f'Total records in dataframe: {total_records:,}')
    
    # EDA has shown these columns to exhibit over 90% missing values, and hence we drop them
    drop_columns = ['occup', 'entdepu','insnum']
    df = df.drop(*drop_columns)
    
    # drop rows where all elements are missing
    df = df.dropna(how='all')

    new_total_records = df.count()
    
    print(f'Total records after cleaning: {new_total_records:,}')
    
    return df


def clean_demographics_data(df):
    """Clean the US demographics dataset
    
    :param df: spark dataframe of US demographics dataset
    :return: clean dataframe
    """
    # drop rows with missing values
    subset_cols = [
        'Male Population',
        'Female Population',
        'Number of Veterans',
        'Foreign-born',
        'Average Household Size'
    ]
    new_df = df.dropna(subset=subset_cols)
    
    rows_dropped = df.count()-new_df.count()
    print("Rows dropped with missing values: {}".format(rows_dropped))
    
    # drop duplicate columns
    new_df2 = new_df.dropDuplicates(subset=['City', 'State', 'State Code', 'Race'])
    
    rows_dropped_with_duplicates = new_df.count()-new_df2.count()
    print(f"Rows dropped after accounting for duplicates: {rows_dropped_with_duplicates}")
    
    return new_df2


### 3.2 Mapping Out Data Pipelines
The pipeline steps are as follows:

- Load the datasets
- Clean the I94 Immigration data
- Create visa_type dimension table
- Create time dimension table
- Extract clean airport data
- Create airport table
- Load demographics data
- Clean demographics data
- Create demographic dimension table
- Create immigration fact table

In [29]:
spark = create_spark_session()

# Load Immigration data
df = spark.read.parquet("sas_data")

# Create a smaller sample for quick run
df = df.sample(True, 0.0001)

# Explore Immigration data
visualize_missing_values(df)

# Clean data
df = clean_immigration_data(df)

# rename and load.
df = load_immigration_data(df, spark)

        cols  values  % missing values
0      cicid       0          0.000000
1      i94yr       0          0.000000
2     i94mon       0          0.000000
3     i94cit       0          0.000000
4     i94res       0          0.000000
5    i94port       0          0.000000
6    arrdate       0          0.000000
7    i94mode       0          0.000000
8    i94addr      13          4.012346
9    depdate      13          4.012346
10    i94bir       0          0.000000
11   i94visa       0          0.000000
12     count       0          0.000000
13  dtadfile       0          0.000000
14  visapost     200         61.728395
15     occup     324        100.000000
16   entdepa       0          0.000000
17   entdepd      13          4.012346
18   entdepu     324        100.000000
19   matflag      13          4.012346
20   biryear       0          0.000000
21   dtaddto       0          0.000000
22    gender      44         13.580247
23    insnum     319         98.456790
24   airline       8     

In [30]:
airport_df = spark.read.format("csv").option("header", "true").option("delimiter", ",").load("airport-codes_csv.csv")

visualize_missing_values(airport_df.where(
            (col("iso_country") == "US")))

# Between iata_code and local_code, the airports can be uniquely identified.

            cols  values  % missing values
0          ident       0          0.000000
1           type       0          0.000000
2           name       0          0.000000
3   elevation_ft     239          1.050226
4      continent       0          0.000000
5    iso_country       0          0.000000
6     iso_region       0          0.000000
7   municipality     102          0.448214
8       gps_code    1773          7.791009
9      iata_code   20738         91.128005
10    local_code    1521          6.683658
11   coordinates       0          0.000000


In [32]:
demographics_df = spark.read.format("csv").option("header", "true").option("delimiter", ";").load("us-cities-demographics.csv")

visualize_missing_values(demographics_df)

demographics_df = clean_demographics_data(demographics_df)

                      cols  values  % missing values
0                     City       0          0.000000
1                    State       0          0.000000
2               Median Age       0          0.000000
3          Male Population       3          0.103770
4        Female Population       3          0.103770
5         Total Population       0          0.000000
6       Number of Veterans      13          0.449671
7             Foreign-born      13          0.449671
8   Average Household Size      16          0.553442
9               State Code       0          0.000000
10                    Race       0          0.000000
11                   Count       0          0.000000
Rows dropped with missing values: 16
Rows dropped after accounting for duplicates: 0


### Create data models

#### Create the dimension tables and write them as parquet file

In [33]:

output_path = "./sas_data_out/"

airports_df = create_airport_table(spark, airport_df, output_path)

visa_type_df = create_visa_type(df, output_path)

state_demographics_df = create_state_demographics(spark, demographics_df, output_path)

time_df = create_time(df, output_path)

#### Create the fact tables and write them as parquet file, partitioned by year and month.

In [45]:
immigration_facts_df = create_immigration_facts(df=df, airport_df=airports_df, 
                                                visatype_df=visa_type_df, 
                                                state_demographics_df=state_demographics_df, time_df=time_df, outpath=output_path) 

### Check Data Models - Data Quality check

#### Data Quality check for dimension tables

In [35]:
def data_len_check(df):
    '''
    This function checks if the number of the columns in the dataframe is 0 or not. 
    This function also chceks if the number of records in the dataframe is 0 or not.
    '''
    df_len = df.count()
    df_col_len = len(df.columns)
    return df_col_len <= 0 or df_len <= 0

def count_missings(spark_df,sort=True):
    """
    Counts number of nulls and nans in each column
    """
    df = spark_df.select([func.count(func.when(func.isnan(c) | func.isnull(c), c)).alias(c) for (c,c_type) in spark_df.dtypes 
                          if c_type not in ('timestamp', 'string', 'date')]).toPandas()

    if len(df) == 0:
        print("There are no any missing values!")
        return None

    if sort:
        return df.rename(index={0: 'count'}).T.sort_values("count",ascending=False)

    return df
    
    

In [36]:
# Print a warning if any of the dimension table(s) has no data

if data_len_check(airports_df):
    print("Airports Table data check failed")
elif data_len_check(visa_type_df):
    print("Visa Type Table data check failed")
elif data_len_check(state_demographics_df):
    print("State Demographic Table data check failed")
elif data_len_check(time_df):
    print("Time Table data check failed")

#### Data Quality check for fact tables

In [37]:
# Print a warning if any of the fact table(s) has no data

if data_len_check(immigration_facts_df):
    print("Immigration facts data check failed")

In [38]:
# print a summary of missing data on the fact table so the user can see if the 

print(count_missings(immigration_facts_df))

                 count
cicid                0
mode                 0
bornCountry          0
year                 0
month                0
birthyear            0
residentCountry      0
visa_type_key        0
state_code_id        0


In [None]:
##### Thank you #######