In [1]:
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import boto3
import configparser
import os
from pyspark.sql.types import LongType, IntegerType, FloatType
from pyspark.sql.functions import udf, col

### Initial EDA

### Immigration Dataset

In [2]:
# Read in the data here
fname = 'data/i94_apr16_sub.sas7bdat'
i94_df = pd.read_sas(fname, 'sas7bdat', encoding="ISO-8859-1", chunksize=1000)

In [3]:
i94_df = next(i94_df)

In [4]:
i94_df.head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


In [5]:
i94_df.columns

Index(['cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port', 'arrdate',
       'i94mode', 'i94addr', 'depdate', 'i94bir', 'i94visa', 'count',
       'dtadfile', 'visapost', 'occup', 'entdepa', 'entdepd', 'entdepu',
       'matflag', 'biryear', 'dtaddto', 'gender', 'insnum', 'airline',
       'admnum', 'fltno', 'visatype'],
      dtype='object')

In [6]:
# Drop columns with little analytic value
staging_i94_df = i94_df.drop(['cicid', 'count', 'dtadfile', 'visapost', 'occup', 'entdepa', 'entdepd', 'entdepu', 'matflag', 'biryear', 'dtaddto', 'insnum'], axis=1)
# Data types
staging_i94_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000 entries, 0 to 999
Data columns (total 16 columns):
i94yr       1000 non-null float64
i94mon      1000 non-null float64
i94cit      1000 non-null float64
i94res      1000 non-null float64
i94port     1000 non-null object
arrdate     1000 non-null float64
i94mode     999 non-null float64
i94addr     972 non-null object
depdate     958 non-null float64
i94bir      1000 non-null float64
i94visa     1000 non-null float64
gender      810 non-null object
airline     998 non-null object
admnum      1000 non-null float64
fltno       999 non-null object
visatype    1000 non-null object
dtypes: float64(10), object(6)
memory usage: 125.1+ KB


In [7]:
# Drop missing data across all columns
staging_i94_df.dropna(inplace=True)
print(f'{staging_i94_df.shape[0]} rows in staging table, {round(staging_i94_df.shape[0]/i94_df.shape[0]*100, 1)} % of original rows extracted')

749 rows in staging table, 74.9 % of original rows extracted


### City demographics Dataset

In [8]:
fname = 'data/us-cities-demographics.csv'
cities_df = pd.read_csv(fname, delimiter=';')

In [9]:
cities_df.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [10]:
staging_cities_df = cities_df.dropna()

In [11]:
# Drop missing data across all columns
print(f'{staging_cities_df.shape[0]} rows in staging table, {round(staging_cities_df.shape[0]/cities_df.shape[0]*100, 1)} % of original rows extracted')

2875 rows in staging table, 99.4 % of original rows extracted


### Data Wrangling and staging to AWS S3 with Spark

In [12]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

In [13]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
    .config("spark.hadoop.fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.awsAccessKeyId", os.environ['AWS_ACCESS_KEY_ID']) \
    .config("spark.hadoop.fs.s3a.awsSecretAccessKey", os.environ['AWS_SECRET_ACCESS_KEY']) \
    .enableHiveSupport() \
    .getOrCreate()

In [14]:
# Some useful UDFs to convert codes to text and timestamp to MM/DD/YYYY 
udf_to_datetime = udf(lambda ts: datetime.strftime(datetime(1960, 1, 1) + timedelta(days=int(ts)), '%m/%d/%Y'))
visa_codes = { 1 : 'business', 2 : 'pleasure', 3 : 'student' }
arrival_mode_codes = { 1 : 'air', 2 : 'sea' , 3 : 'land', 9 : 'not reported' }
udf_vida_code_to_text = udf(lambda code: visa_codes[code])
udf_arrival_mode_to_text = udf(lambda mode: arrival_mode_codes[mode])

In [15]:
# df_spark =spark.read.format('com.github.saurfang.sas.spark').load('data/i94_apr16_sub.sas7bdat')
visits_df = spark.read.parquet("sas_data")\
        .drop('cicid', 'count', 'dtadfile', 'visapost', 'occup', 'entdepa', 'entdepd', 'entdepu', 'matflag', 'dtaddto', 'insnum', 'airline')\
        .dropna()\
        .selectExpr("cast(i94yr as int) as year", "cast(i94mon as int) as month", 
                    "cast(i94cit as int) as citizenship", "cast(i94res as int) as residency", 
                    "i94port as airport", "i94mode", 
                    "i94addr as state_code", "cast(i94bir as int) as age",
                    "gender", "cast(admnum as int) as visitor_id", "visatype as visa_type",
                    "arrdate", "depdate", "visatype", "i94visa")\
        .withColumn("arrival_date", udf_to_datetime("arrdate"))\
        .withColumn("departure_date", udf_to_datetime("depdate"))\
        .withColumn("visa_category", udf_vida_code_to_text("i94visa"))\
        .withColumn("arrival_mode", udf_arrival_mode_to_text("i94mode"))\
        .drop("arrdate", "depdate", "i94visa", "visatype", "i94mode")\
        .dropna()

In [16]:
visits_df.head()

Row(year=2016, month=4, citizenship=245, residency=438, airport='LOS', state_code='CA', age=40, gender='F', visitor_id=2147483647, visa_type='B1', arrival_date='04/30/2016', departure_date='05/08/2016', visa_category='business', arrival_mode='air')

In [17]:
visits_df.write.mode('overwrite').csv("s3a://data-eng-capstone-lugomes/staging_visits")

In [18]:
cities_df = spark.read.format('csv').load('data/us-cities-demographics.csv', header=True, sep=';')\
        .dropna()\
        .withColumnRenamed("City", "city")\
        .withColumnRenamed("State", "state_name")\
        .withColumn("median_age", col("Median Age").cast(LongType()))\
        .withColumn("male_pop", col("Male Population").cast(LongType()))\
        .withColumn("female_pop", col("Female Population").cast(LongType()))\
        .withColumn("total_pop", col("Total Population").cast(LongType()))\
        .withColumn("veteran_pop", col("Number of Veterans").cast(LongType()))\
        .withColumn("foreign_born_pop", col("Foreign-born").cast(LongType()))\
        .withColumnRenamed("Average Household Size", "avg_house_size")\
        .withColumnRenamed("State Code", "state_code")\
        .withColumnRenamed("Race","race")\
        .withColumn("race_count",col("Count").cast(LongType()))\
        .drop('Count', 'Median Age', 'Male Population', 'Female Population', 'Total Population', 'Number of Veterans', "Average Household Size", "Foreign-born")

In [19]:
cities_df.head()

Row(city='Silver Spring', state_name='Maryland', avg_house_size='2.6', state_code='MD', race='Hispanic or Latino', median_age=33, male_pop=40601, female_pop=41862, total_pop=82463, veteran_pop=1562, foreign_born_pop=30908, race_count=25924)

In [20]:
cities_df.write.mode('overwrite').csv("s3a://data-eng-capstone-lugomes/staging_cities")

In [24]:
airports_df = spark.read.format('csv').load('data/airport-codes.csv', header=True, sep=',')\
        .selectExpr("name as airport_name", "municipality as city", "type", "ident as airport_code")\
        .where("iso_country == 'US'")\
        .dropna()

In [25]:
airports_df.head()

Row(airport_name='Total Rf Heliport', city='Bensalem', type='heliport', airport_code='00A')

In [26]:
airports_df.write.mode('overwrite').csv("s3a://data-eng-capstone-lugomes/staging_airports")