# Project Title
### Data Engineering Capstone Project --- Spark
This notebook shows the dimensions of the immigration dataset.

### Step 1: Scope the Project and Gather Data

#### Scope 
This project's main goal is to deliver data about immigration to the US for further analysis by the marketing department of the company I work for as a data engineer. Stakeholders are interested not only in the airports and nearby cities that US visitors travel to, but also in the demographics of these cities and in temperature data.

#### I94 Immigration Data
The dataset comes from the US National Tourism and Trade Office.

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, DateType
from pyspark.sql.functions import udf, month

# Spark session with the spark-sas7bdat package, needed to read the SAS files
spark = (SparkSession.builder
         .config("spark.jars.repositories", "https://repos.spark-packages.org/")
         .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11")
         .enableHiveSupport()
         .getOrCreate())

In [5]:
dir_immi_data = '../../data/18-83510-I94-Data-2016/*'

In [6]:
import glob
import re
immi_files = glob.glob(dir_immi_data)

for file in immi_files:
    # month abbreviation from file names like i94_apr16_sub.sas7bdat; not stored as
    # `month`, which would shadow pyspark.sql.functions.month imported above
    month_match = re.search(r'.*/i94_(.*)16', file)
    raw_df = spark.read.format('com.github.saurfang.sas.spark').load(file)
    df_immigration_shape = (raw_df.count(), len(raw_df.columns))  # (rows, columns)
    print(f'--> Shape for {month_match.group(1)}:  columns: {df_immigration_shape[1]} rows: {df_immigration_shape[0]} <-- filename: {file}')

--> Shape for apr:  columns: 28 rows: 3096313 <-- filename: ../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat
--> Shape for sep:  columns: 28 rows: 3733786 <-- filename: ../../data/18-83510-I94-Data-2016/i94_sep16_sub.sas7bdat
--> Shape for nov:  columns: 28 rows: 2914926 <-- filename: ../../data/18-83510-I94-Data-2016/i94_nov16_sub.sas7bdat
--> Shape for mar:  columns: 28 rows: 3157072 <-- filename: ../../data/18-83510-I94-Data-2016/i94_mar16_sub.sas7bdat
--> Shape for jun:  columns: 34 rows: 3574989 <-- filename: ../../data/18-83510-I94-Data-2016/i94_jun16_sub.sas7bdat
--> Shape for aug:  columns: 28 rows: 4103570 <-- filename: ../../data/18-83510-I94-Data-2016/i94_aug16_sub.sas7bdat
--> Shape for may:  columns: 28 rows: 3444249 <-- filename: ../../data/18-83510-I94-Data-2016/i94_may16_sub.sas7bdat
--> Shape for jan:  columns: 28 rows: 2847924 <-- filename: ../../data/18-83510-I94-Data-2016/i94_jan16_sub.sas7bdat
--> Shape for oct:  columns: 28 rows: 3649136 <-- filename: ../.

#### Assessing the data
Based on the shapes listed above, each file holds roughly 3 million rows (between about 2.8 and 4.1 million, about 3.4 million on average for the files listed). Extrapolated to all 12 monthly files, on the order of 40 million rows must be analyzed in order to extract valuable information.
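The estimate can be reproduced from the row counts printed above. This small sketch hard-codes the nine counts that are listed (the output is truncated, so the remaining months are missing) and extrapolates to 12 files under the assumption that the missing months are of similar size:

```python
# Row counts copied from the shape output above (the remaining months are cut off there)
row_counts = {
    'apr': 3096313, 'sep': 3733786, 'nov': 2914926,
    'mar': 3157072, 'jun': 3574989, 'aug': 4103570,
    'may': 3444249, 'jan': 2847924, 'oct': 3649136,
}

total_listed = sum(row_counts.values())
average = total_listed / len(row_counts)
# Assumption: the months that are not listed have roughly the average size
estimated_total = average * 12

print(f'listed files: {len(row_counts)}, rows: {total_listed:,}')
print(f'average rows per file: {average:,.0f}')
print(f'estimated rows for 12 files: {estimated_total:,.0f}')
```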

#### Issue
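* the jun file has 34 columns, whereas all other files have only 28. Spark cannot combine DataFrames with a different number of columns (for example with `union`), so the six extra columns of the jun file have to be removed before the data of all months can be extracted and transformed the same way; this is what the function `remove_extra` is for
* the regex match in the shape-check loop must not be stored in a variable called `month`: that name shadows the `month` function imported from `pyspark.sql.functions`, which is needed later to derive the month column (the `re` module itself is not the problem)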
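Instead of hard-coding the extra June columns, the columns that are not shared by every file could also be derived from the schemas. A minimal pure-Python sketch; `extra_columns` is a hypothetical helper, and the column lists are shortened for illustration (with Spark they would come from `df.columns` of each monthly DataFrame):

```python
def extra_columns(schemas):
    """Return, per file, the columns that are not present in every file.

    schemas: dict mapping a file label to its list of column names
    """
    # Columns shared by all files form the common schema
    common = set.intersection(*(set(cols) for cols in schemas.values()))
    return {label: sorted(set(cols) - common) for label, cols in schemas.items()}


# Hypothetical, shortened column lists; the real files have 28 and 34 columns
schemas = {
    'may': ['cicid', 'i94yr', 'arrdate', 'depdate'],
    'jun': ['cicid', 'i94yr', 'arrdate', 'depdate',
            'validres', 'delete_days', 'delete_mexl',
            'delete_dup', 'delete_visa', 'delete_recdup'],
}

extras = extra_columns(schemas)
print(extras)
```

With a Spark DataFrame `df` for a given month, the result could then be used as `df.drop(*extras[label])` before combining the months.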

#### Goal
Data manipulations usually produce changes to the dataset that are easy to follow. Here I process each monthly file and compare the shape of the raw dataset with that of the processed one.

In [2]:
import pandas as pd
import datetime as dt

pd.set_option('display.max_columns', None)
pd.set_option('display.max_colwidth', 200)
pd.set_option('display.max_rows', 100)

In [None]:
# lookup tables used to filter the immigration data
fname_visatype = 'input/visatype.csv'   # visa types ('|'-delimited)
fname_country_code = 'i94_cit.csv'      # I94 country codes
fname_airports = 'airports.csv'         # airports used as US ports of entry

dir_immi_data = '../../data/18-83510-I94-Data-2016/*'

# read the raw lookup tables
raw_visatype = pd.read_csv(fname_visatype, delimiter='|')
raw_country_code = pd.read_csv(fname_country_code)
raw_airports = pd.read_csv(fname_airports)

# work on copies so the raw tables stay untouched
df_visatype = raw_visatype.copy()
df_country_code = raw_country_code.copy()
df_airports = raw_airports.copy()

#### To extract only the valuable data from the files, a number of manipulations are made (see code block below):
* matflag indicates that arrival and departure records match. Rows where it is null (no match) are filtered out first
* visatype is matched against the table df_visatype. This comes in handy, because by modifying this table one can keep more or less data
* column i94mode contains the mode of transportation: 1 -> Air, 2 -> Sea, 3 -> Land. Stakeholders are only interested in analyzing airports, so only mode 1 is kept
* gender contains null values. These rows will be removed
* a list of valid country codes (country_code_list, built from df_country_code) is used to filter data by Country of Citizenship & Country of Residence (i94cit, i94res)
* the remaining rows are filtered using a list of US international airports (df_airports)
* the immigration data stores dates (arrdate, depdate) in SAS format, i.e. as the number of days since 1960-01-01. These are converted into regular dates
* data is further filtered by comparing arrival and departure dates: departure cannot be earlier than arrival (rows without a departure date are kept)
* a month column is generated in order to partition the data more efficiently
* columns that are not needed for further analysis will be removed
* multiple columns will be cast to integer
* for better communication multiple columns will be renamed (see the renaming step at the end of `process_immi_data` for details)
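The row filters listed above can be read as a single predicate per record. This is a pure-Python sketch that runs without Spark; `keep_record` and the lookup sets are hypothetical stand-ins for the values that the notebook takes from df_visatype, df_country_code and df_airports:

```python
import datetime as dt

# Hypothetical lookup values; in the notebook they come from
# df_visatype, df_country_code and df_airports
VISATYPES = {'B1', 'B2', 'WT'}
COUNTRY_CODES = {101, 582}
US_AIRPORTS = {'NYC', 'LOS'}
AIR = 1  # i94mode: 1 = Air, 2 = Sea, 3 = Land


def keep_record(row):
    """Return True if a raw I94 record passes the row filters listed above."""
    if row.get('matflag') is None or row.get('gender') is None:
        return False
    if row.get('visatype') not in VISATYPES or row.get('i94mode') != AIR:
        return False
    if row.get('i94cit') not in COUNTRY_CODES or row.get('i94res') not in COUNTRY_CODES:
        return False
    if row.get('i94port') not in US_AIRPORTS:
        return False
    arrival, departure = row.get('dt_arrival'), row.get('dt_departure')
    # Mirrors the Spark filter: a missing departure date is kept,
    # otherwise departure must not be earlier than arrival
    if departure is None:
        return True
    return arrival is not None and departure >= arrival


records = [
    {'matflag': 'M', 'gender': 'F', 'visatype': 'B2', 'i94mode': 1,
     'i94cit': 101, 'i94res': 101, 'i94port': 'NYC',
     'dt_arrival': dt.date(2016, 4, 1), 'dt_departure': dt.date(2016, 4, 10)},
    # arrived by sea: removed
    {'matflag': 'M', 'gender': 'M', 'visatype': 'B1', 'i94mode': 2,
     'i94cit': 582, 'i94res': 582, 'i94port': 'LOS',
     'dt_arrival': dt.date(2016, 4, 2), 'dt_departure': None},
]
kept = [r for r in records if keep_record(r)]
print(len(kept))  # 1
```

Spark evaluates the same conditions column-wise with `filter`, which is why the notebook expresses them as DataFrame filters rather than a Python function.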

In [3]:
import glob
immi_files = glob.glob(dir_immi_data)
    
def remove_extra(df):
    """This method drops the pre-defined extra columns (only present in the jun file); missing columns are ignored by Spark
    
    Params:
        df: dataframe
        
    Returns: cleaned dataframe
    """
    cols_2_remove = ['validres', 'delete_days', 'delete_mexl', 'delete_dup', 'delete_visa', 'delete_recdup']
    df = df.drop(*cols_2_remove)
    return df

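def to_datetime(x):
    """This method takes in a SAS coded date and converts it to a normal one

    Params:
        x: SAS encoded date (days since 1960-01-01)

    Returns: datetime.date, or None if x is missing or invalid
    """
    try:
        start = dt.datetime(1960, 1, 1).date()
        return start + dt.timedelta(days=int(x))
    except (TypeError, ValueError, OverflowError):
        # None, NaN or out-of-range values
        return None

# wrap the conversion as a Spark UDF returning DateType
udf_to_datetime_sas = udf(to_datetime, DateType())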

def process_immi_data(spark, df_raw, df_visatype, df_country_code, df_airports):
    """This method cleans an immigration dataset
    
    Params:
        spark: spark session
        df_raw: dataset to manipulate
        df_visatype: additional dataset for mapping purposes
        df_country_code: additional dataset for mapping purposes
        df_airports: additional dataset for mapping purposes
    
    Returns: df, cleaned immigration dataset
    """
    # matflag is null
    df_raw = df_raw.filter(df_raw.matflag.isNotNull())
    # keep only visa types listed in df_visatype
    visatype_list = df_visatype.visatype.tolist()
    df_raw = df_raw.filter( df_raw.visatype.isin(visatype_list) )
    # keep only air travel; i94mode: 1 = Air, 2 = Sea, 3 = Land
    travel_mode = [1]
    df_raw = df_raw.filter( df_raw.i94mode.isin(travel_mode) )
    # gender is null
    df_raw = df_raw.filter(df_raw.gender.isNotNull())
    # Remove rows having invalid CoC & CoR
    country_code_list = df_country_code.I94_country_code.astype('int').tolist()
    df_raw = df_raw.filter( df_raw.i94cit.isin(country_code_list) )
    df_raw = df_raw.filter( df_raw.i94res.isin(country_code_list) )
    # filter only US international Airports
    airports_us_list = df_airports.I94_port_code.tolist()
    df_raw = df_raw.filter( df_raw.i94port.isin(airports_us_list) )
    # Conversion of SAS encoded dates(arrdate & depdate)
    df_raw = df_raw.withColumn("dt_arrival", udf_to_datetime_sas(df_raw.arrdate))
    df_raw = df_raw.withColumn("dt_departure", udf_to_datetime_sas(df_raw.depdate))
    # departure date can't be before arrival date (missing departure dates are kept)
    df_raw = df_raw.filter(~(df_raw.dt_arrival > df_raw.dt_departure) | (df_raw.dt_departure.isNull()))
    # add month column, used when saving the parquet files partitioned by month & landing state
    df_raw = df_raw.withColumn("month", month("dt_arrival"))
    # dropping columns
    drop_cols = ['cicid', 'i94yr', 'i94mon', 'i94mode', 'i94visa', 'arrdate', 'depdate', 'count', 'dtadfile', 'entdepa', 'entdepd', 'entdepu', 'matflag', 'dtaddto', 'insnum', 'admnum']
    df_raw = df_raw.drop(*drop_cols)
    # change column type to integer
    cols_2_integer = ['i94cit', 'i94res', 'i94bir', 'biryear']
    for col in cols_2_integer:
        df_raw = df_raw.na.fill(0, subset=[col])
        df_raw = df_raw.withColumn(col, df_raw[col].cast(IntegerType()))
    # Columns Rename
    df_raw = (df_raw
                .withColumnRenamed("i94cit",  "CoC")
                .withColumnRenamed("i94res", "CoR")
                .withColumnRenamed("i94port", "PoE")
                .withColumnRenamed("i94addr", "state_landing")
                .withColumnRenamed("i94bir", "age")
                .withColumnRenamed("biryear", "year_birth")
                .withColumnRenamed("airline", "airline_used")
                .withColumnRenamed("fltno", "num_flight"))
    return df_raw
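The epoch arithmetic behind `to_datetime` can be checked without Spark. A standalone sketch with a few worked values; `sas_to_date` and `date_to_sas` are illustrative names, not functions used elsewhere in this notebook:

```python
import datetime as dt

SAS_EPOCH = dt.date(1960, 1, 1)  # SAS dates count days since this date


def sas_to_date(days):
    """Convert a SAS date (days since 1960-01-01) to a datetime.date; None if invalid."""
    try:
        return SAS_EPOCH + dt.timedelta(days=int(days))
    except (TypeError, ValueError, OverflowError):
        return None


def date_to_sas(day):
    """Inverse conversion, handy for building test records."""
    return (day - SAS_EPOCH).days


print(sas_to_date(0))                     # 1960-01-01
print(sas_to_date(20545.0))               # 2016-04-01
print(sas_to_date(None))                  # None
print(date_to_sas(dt.date(2016, 1, 1)))   # 20454
```

Inside Spark, a built-in expression such as `expr("date_add(to_date('1960-01-01'), cast(arrdate as int))")` could probably replace the Python UDF and avoid moving rows between the JVM and Python; this is a suggestion that has not been tested in this notebook.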

In [4]:
for count,immi_file in enumerate(immi_files):
    print(f'--- Processing: {count+1} | {len(immi_files)} ---> {immi_file}')
    raw_df = spark.read.format('com.github.saurfang.sas.spark').load(immi_file)
    # drop the extra jun-only columns first (a no-op for the other months)
    df_processed = process_immi_data(spark, remove_extra(raw_df), df_visatype, df_country_code, df_airports)
    raw_shape = (raw_df.count(), len(raw_df.columns))
    processed_shape = (df_processed.count(), len(df_processed.columns))
    print(f'--- Number of rows --- RAW DATA -[> {raw_shape[0]} VS {processed_shape[0]} <]-- PROCESSED DATA --- filename: {immi_file}' )

--- Processing: 1 | 12 ---> ../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat
--- Number of rows --- RAW DATA -[> 3096313 VS 1835551 <]-- PROCESSED DATA --- filename: ../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat
--- Processing: 2 | 12 ---> ../../data/18-83510-I94-Data-2016/i94_sep16_sub.sas7bdat
--- Number of rows --- RAW DATA -[> 3733786 VS 2569913 <]-- PROCESSED DATA --- filename: ../../data/18-83510-I94-Data-2016/i94_sep16_sub.sas7bdat
--- Processing: 3 | 12 ---> ../../data/18-83510-I94-Data-2016/i94_nov16_sub.sas7bdat
--- Number of rows --- RAW DATA -[> 2914926 VS 1806168 <]-- PROCESSED DATA --- filename: ../../data/18-83510-I94-Data-2016/i94_nov16_sub.sas7bdat
--- Processing: 4 | 12 ---> ../../data/18-83510-I94-Data-2016/i94_mar16_sub.sas7bdat
--- Number of rows --- RAW DATA -[> 3157072 VS 1927600 <]-- PROCESSED DATA --- filename: ../../data/18-83510-I94-Data-2016/i94_mar16_sub.sas7bdat
--- Processing: 5 | 12 ---> ../../data/18-83510-I94-Data-2016/i94_jun16_sub.

#### Results
After processing the data and extracting only what is valuable for this particular use case, it can be stated that:
* the amount of data decreased significantly: for the four months shown above, between about 31% and 41% of the rows were removed (roughly 59% to 69% kept)
* keeping only the rows and columns that are needed makes the data easier to explore in further analysis
* transforming data is essential, because the source files store some values in an encoded form (for example, SAS dates stored as the number of days since 1960-01-01)
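The share of rows kept per month follows directly from the counts printed during processing. A short sketch that hard-codes the four months visible in the (truncated) output:

```python
# Raw vs. processed row counts copied from the processing output above
counts = {
    'apr': (3096313, 1835551),
    'sep': (3733786, 2569913),
    'nov': (2914926, 1806168),
    'mar': (3157072, 1927600),
}

# fraction of raw rows that survive processing, per month
retention = {m: processed / raw for m, (raw, processed) in counts.items()}
for m, share in retention.items():
    print(f'{m}: {share:.1%} of rows kept, {1 - share:.1%} removed')
```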