# Immigration Data Warehouse
### Data Engineering Capstone Project
## 2 Mapping & Implementing the Data Pipelines
This notebook implements the ETL data pipelines for the immigration data warehouse. The pipelines are based on the data model identified in `explore_data.ipynb` and described in `data_dictionary.md`. ETL plan:

1. Fact: Immigration Data
1. Dimension: Date
1. Dimension: Countries
1. Dimensions: States & State Race Counts
1. Dimension: Airports
1. Data Quality Checks

The project write-up beyond the data pipelines is in `README.md`.

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, DateType
import pyspark.sql.functions as f
import helper as h

In [2]:
spark = SparkSession.builder \
        .config('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:2.7.5') \
        .getOrCreate()
output_folder = 'output_data/'

### 2.1 Fact: Immigration Data

This data comes from the [US National Travel and Tourism Office](https://travel.trade.gov/research/reports/i94/historical/2016.html). A data dictionary is included in the workspace.

In [3]:
i94data = spark.read.parquet('i94_data')

In [4]:
i94data = i94data \
    .withColumnRenamed('cicid',    'i94_id') \
    .withColumnRenamed('arrdate',  'arrive_date') \
    .withColumnRenamed('i94yr',    'arrive_year') \
    .withColumnRenamed('i94mon',   'arrive_month') \
    .withColumnRenamed('i94port',  'arrive_port') \
    .withColumnRenamed('i94mode',  'arrive_by') \
    .withColumnRenamed('airline',  'arrive_airline') \
    .withColumnRenamed('fltno',    'arrive_flight') \
    .withColumnRenamed('entdepa',  'arrive_flag') \
    .withColumnRenamed('i94addr',  'arrive_to_state') \
    .withColumnRenamed('i94cit',   'pers_country_birth') \
    .withColumnRenamed('i94res',   'pers_country_resid') \
    .withColumnRenamed('biryear',  'pers_birth_year') \
    .withColumnRenamed('i94bir',   'pers_age') \
    .withColumnRenamed('occup',    'pers_occupation') \
    .withColumnRenamed('gender',   'pers_gender') \
    .withColumnRenamed('insnum',   'pers_ins_number')  \
    .withColumnRenamed('i94visa',  'visa') \
    .withColumnRenamed('visapost', 'visa_issued') \
    .withColumnRenamed('visatype', 'visa_type') \
    .withColumnRenamed('dtaddto',  'allow_stay_until') \
    .withColumnRenamed('entdepd',  'depart_flag') \
    .withColumnRenamed('depdate',  'depart_date') \
    .withColumnRenamed('count',    'cnt') \
    .withColumnRenamed('dtadfile', 'char_date') \
    .withColumnRenamed('entdepu',  'update_flag') \
    .withColumnRenamed('matflag',  'match_flag') \
    .withColumnRenamed('admnum',   'admission_number')

Drop columns

In [5]:
i94data = i94data.drop('cnt', 'pers_ins_number', 'pers_occupation', 'update_flag', 'char_date')

Type conversions

In [7]:
i94data = i94data \
    .withColumn('i94_id',             i94data.i94_id.cast(IntegerType())) \
    .withColumn('arrive_date',        h.udf_date(i94data.arrive_date)) \
    .withColumn('arrive_month',       i94data.arrive_month.cast(IntegerType())) \
    .withColumn('arrive_year',        i94data.arrive_year.cast(IntegerType())) \
    .withColumn('pers_age',           i94data.pers_age.cast(IntegerType())) \
    .withColumn('pers_birth_year',    i94data.pers_birth_year.cast(IntegerType())) \
    .withColumn('pers_country_birth', i94data.pers_country_birth.cast(IntegerType())) \
    .withColumn('pers_country_resid', i94data.pers_country_resid.cast(IntegerType())) \
    .withColumn('depart_date',        h.udf_date(i94data.depart_date)) \
    .withColumn('arrive_by',          h.udf_arrive_by(i94data.arrive_by)) \
    .withColumn('visa',               h.udf_visa(i94data.visa))
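
The helper module `helper.py` is not shown in this notebook. As a rough sketch of what the conversions above might do, assuming `arrdate` and `depdate` are SAS numeric dates (days since 1960-01-01) and that the mode and visa codes follow the labels in the data dictionary, the underlying plain-Python functions could look like this. The function names and the wrapping shown in the comments are illustrative, not the actual contents of `helper.py`.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS numeric dates count days from this day

# Assumed code-to-label mappings, taken from the I94 label descriptions.
ARRIVE_BY = {1: 'Air', 2: 'Sea', 3: 'Land', 9: 'Not reported'}
VISA = {1: 'Business', 2: 'Pleasure', 3: 'Student'}

def sas_to_date(days):
    """Convert a SAS numeric date (possibly a float) to a datetime.date."""
    if days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(days))

def code_to_label(code, mapping):
    """Map a numeric code to its label; unknown or missing codes become None."""
    if code is None:
        return None
    return mapping.get(int(code))

# In Spark these could be wrapped as UDFs, for example:
#   udf_date = f.udf(sas_to_date, DateType())
#   udf_arrive_by = f.udf(lambda c: code_to_label(c, ARRIVE_BY))
print(sas_to_date(20545.0))            # 2016-04-01
print(code_to_label(1.0, ARRIVE_BY))   # Air
```

Returning `None` for missing input keeps nulls as nulls in the resulting column instead of raising inside the UDF.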

Write

In [8]:
i94data.write.parquet(output_folder + 'i94data',
                      partitionBy = ['arrive_year', 'arrive_month'],
                      mode = 'overwrite')

### 2.2 Dimension: Date
Build a separate `date` dimension from both `arrive_date` *and* `depart_date`. The columns `arrive_year` and `arrive_month` stay in the fact table for partitioning.

In [18]:
date = i94data \
    .selectExpr('arrive_date as date').distinct() \
    .union(i94data.selectExpr('depart_date as date').distinct()) \
    .distinct() \
    .where('date is not null') \
    .withColumn('year',    f.year('date')) \
    .withColumn('month',   f.month('date')) \
    .withColumn('week',    f.weekofyear('date')) \
    .withColumn('day',     f.dayofmonth('date')) \
    .withColumn('weekday', f.dayofweek('date'))
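
Two conventions in the columns above are easy to misread: `f.weekofyear` follows ISO 8601 week numbering, and `f.dayofweek` counts from 1 = Sunday to 7 = Saturday. The sketch below reproduces the same attributes for a single date in plain Python, purely to illustrate the conventions; `date_attributes` is a hypothetical name and is not part of the pipeline.

```python
from datetime import date

def date_attributes(d):
    """Return the date-dimension attributes for one date, using Spark's conventions."""
    return {
        'date': d,
        'year': d.year,
        'month': d.month,
        'week': d.isocalendar()[1],          # ISO 8601 week number, like f.weekofyear
        'day': d.day,
        'weekday': d.isoweekday() % 7 + 1,   # 1 = Sunday ... 7 = Saturday, like f.dayofweek
    }

row = date_attributes(date(2016, 4, 1))  # a Friday
print(row['week'], row['weekday'])       # 13 6
```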

Write

In [19]:
date.write.parquet(output_folder + 'date',
                   partitionBy = ['year', 'month'],
                   mode = 'overwrite')

### 2.3 Dimension: Countries
Integer country ids alone are not sufficient, so I created `I94Countries.csv` from the label descriptions.

In [9]:
countries = spark.read.csv('dimension_data/I94Countries.csv',
                           sep = ';',
                           inferSchema = True, header = True)

Strip quotes from names

In [10]:
countries = countries.withColumn('name', h.udf_strip_quotes('name'))

Country ids that appear in the fact data (as country of birth) but are missing from `I94Countries.csv`:

In [12]:
ne_countries = i94data \
    .join(countries,
          (i94data.pers_country_birth == countries.id),
          how = 'left_anti') \
    .groupBy('pers_country_birth') \
    .count().sort(f.desc('count'))

In [13]:
print(f'{countries.count()} countries original, append with')
print(f'{ne_countries.count()} countries nonexistent:')

countries = countries \
    .union(ne_countries \
           .withColumnRenamed('pers_country_birth', 'id') \
           .withColumn('name', h.udf_name_ne('id')) \
           .drop('count'))

print(f'{countries.count()} countries total.')

289 countries original, append with
30 countries nonexistent:
319 countries total.
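
Neither `h.udf_strip_quotes` nor `h.udf_name_ne` is shown in this notebook. A minimal sketch of the plain-Python logic they might wrap, assuming names in the CSV are enclosed in single or double quotes and that unknown ids get a placeholder name built from the id (the placeholder format is invented here for illustration):

```python
def strip_quotes(name):
    """Remove surrounding whitespace and quote characters from a country name."""
    if name is None:
        return None
    return name.strip().strip('\'"').strip()

def name_nonexistent(country_id):
    """Build a placeholder name for an id that has no label (hypothetical format)."""
    return f'Unknown country ({country_id})'

print(strip_quotes("'MEXICO'"))   # MEXICO
print(name_nonexistent(999))      # Unknown country (999)
```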


Write

In [14]:
countries.write.parquet(output_folder + 'countries', mode = 'overwrite')

### 2.4 Dimensions: States & State Race Counts
This data comes [from OpenDataSoft](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/).

In [20]:
citydem = spark.read.csv('dimension_data/us-cities-demographics.csv',
                         sep = ';',
                         inferSchema = True, header = True)

In the target DWH model, demographics will be normalized by splitting races and race counts into a separate table.

Let's rename columns to remove spaces:

In [21]:
citydem = citydem \
    .withColumnRenamed('City',                   'city') \
    .withColumnRenamed('State',                  'state') \
    .withColumnRenamed('Median Age',             'age_median') \
    .withColumnRenamed('Male Population',        'pop_male') \
    .withColumnRenamed('Female Population',      'pop_female') \
    .withColumnRenamed('Total Population',       'pop_total') \
    .withColumnRenamed('Number of Veterans',     'pop_veteran') \
    .withColumnRenamed('Foreign-born',           'pop_foreign_born') \
    .withColumnRenamed('Average Household Size', 'avg_household_size') \
    .withColumnRenamed('State Code',             'state_code') \
    .withColumnRenamed('Race',                   'race') \
    .withColumnRenamed('Count',                  'race_count')

#### 2.4.1 States
The fact table records the arrival destination only at the state level (see `arrive_to_state`), so demographics at the state level rather than the city level are sufficient. First, aggregate by state, ignoring race:

In [22]:
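# The source has one row per city and race, with the city-level population
# columns repeated on each row, so keep one row per city before summing.
states = citydem \
    .dropDuplicates(['city', 'state_code']) \
    .groupBy('state', 'state_code') \
    .agg(f.sum('pop_male').alias('pop_male'),
         f.sum('pop_female').alias('pop_female'),
         f.sum('pop_total').alias('pop_total'),
         f.sum('pop_veteran').alias('pop_veteran'),
         f.sum('pop_foreign_born').alias('pop_foreign_born'))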
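
Because each city appears once per race in the source CSV, with the city-level population columns repeated on every row, a state total is only correct if each city is counted once. The sketch below, with made-up numbers and city names, illustrates the difference between summing all rows and summing one row per city.

```python
# Made-up rows in the shape of the demographics file: (city, state_code, race, pop_total)
rows = [
    ('Springfield', 'XX', 'White', 1000),
    ('Springfield', 'XX', 'Asian', 1000),
    ('Shelbyville', 'XX', 'White', 500),
]

# Naive sum over all rows counts Springfield twice.
naive_total = sum(pop for _, _, _, pop in rows)

# Keep one row per (city, state_code), then sum.
per_city = {(city, state): pop for city, state, _, pop in rows}
dedup_total = sum(per_city.values())

print(naive_total)  # 2500
print(dedup_total)  # 1500
```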

Write

In [23]:
states.write.parquet(output_folder + 'states', mode = 'overwrite')

#### 2.4.2 State Race Counts
The state name is kept here as well, so queries on race counts do not need a join with `states`.

In [24]:
state_race_counts = citydem \
    .groupBy('state', 'state_code', 'race') \
    .agg(f.sum('race_count').alias('race_count'))

Write

In [25]:
state_race_counts.write.parquet(output_folder + 'state_race_counts', mode = 'overwrite')

### 2.5 Dimension: Airports
This is a simple table of airport codes and corresponding cities. It comes from [DataHub](https://datahub.io/core/airport-codes#data).

In [27]:
airports = spark.read.csv('dimension_data/airport-codes.csv',
                          sep = ',',
                          inferSchema = True, header = True)            

Renaming and value conversions

In [28]:
airports = airports \
    .where('iso_country = "US"') \
    .withColumnRenamed('ident',        'id') \
    .withColumn(       'state_code',   h.udf_state('iso_region')) \
    .withColumnRenamed('municipality', 'city') \
    .withColumn(       'lat',          h.udf_lat('coordinates')) \
    .withColumn(       'long',         h.udf_long('coordinates')) \
    .drop('continent', 'iso_country', 'iata_code', 'coordinates', 'iso_region')   
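
The three helper UDFs used here are not shown. A hedged sketch of the parsing they might perform, assuming `iso_region` has the form `US-CA` and `coordinates` is a string of two comma-separated numbers in longitude, latitude order (check the order against the file before relying on it); the function names are illustrative:

```python
def region_to_state(iso_region):
    """'US-CA' -> 'CA'; returns None when the value is missing or has no dash."""
    if not iso_region or '-' not in iso_region:
        return None
    return iso_region.split('-', 1)[1]

def parse_coordinates(coordinates):
    """Split 'lon, lat' into a (lat, long) tuple of floats (order is an assumption)."""
    if not coordinates:
        return (None, None)
    lon_text, lat_text = coordinates.split(',')
    return (float(lat_text), float(lon_text))

print(region_to_state('US-CA'))              # CA
print(parse_coordinates('-74.93, 40.07'))    # (40.07, -74.93)
```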

Write

In [29]:
airports.write.parquet(output_folder + 'airports', mode = 'overwrite')

### 2.6 Data Quality Checks
#### 2.6.1 Fact: Immigration

Check missing and duplicate IDs

In [31]:
i94data.groupBy('i94_id').count().where('count > 1').show(10)

+------+-----+
|i94_id|count|
+------+-----+
+------+-----+



In [32]:
i94data.where('i94_id is null').count()

0

There are no missing or duplicate `i94_id` values.

Check date boundaries

In [33]:
i94data.select(f.min('arrive_date'), f.max('arrive_date')).show()

+----------------+----------------+
|min(arrive_date)|max(arrive_date)|
+----------------+----------------+
|      2016-04-01|      2016-04-30|
+----------------+----------------+



In [34]:
i94data.select(f.min('depart_date'), f.max('depart_date')).show()

+----------------+----------------+
|min(depart_date)|max(depart_date)|
+----------------+----------------+
|      2001-07-20|      2084-05-16|
+----------------+----------------+
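
The departure range extends far beyond the April 2016 arrivals on both sides, which suggests that some departure dates are unreliable. A possible follow-up check, sketched here in plain Python with hypothetical bounds, is to flag a departure that precedes the arrival or lies implausibly far after it. The 10-year limit is an arbitrary assumption, not something derived from the data.

```python
from datetime import date, timedelta

MAX_STAY = timedelta(days=3650)  # arbitrary upper bound, roughly 10 years

def depart_date_is_plausible(arrive, depart):
    """True when depart is missing or lies within [arrive, arrive + MAX_STAY]."""
    if depart is None:
        return True
    return arrive <= depart <= arrive + MAX_STAY

print(depart_date_is_plausible(date(2016, 4, 1), date(2016, 4, 15)))  # True
print(depart_date_is_plausible(date(2016, 4, 1), date(2001, 7, 20)))  # False
print(depart_date_is_plausible(date(2016, 4, 1), date(2084, 5, 16)))  # False
```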



#### 2.6.2 Dimension: Date
Check duplicates:

In [35]:
date.groupBy('date').count().where('count > 1').show(10)

+----+-----+
|date|count|
+----+-----+
+----+-----+



Check boundaries; note that departure dates are included:

In [36]:
date.select(f.min('date'), f.max('date')).show()

+----------+----------+
| min(date)| max(date)|
+----------+----------+
|2001-07-20|2084-05-16|
+----------+----------+



#### 2.6.3 Dimension: Countries
Are there country ids in the immigration data that are missing from `countries`?

In [38]:
i94data.join(countries,
              (i94data.pers_country_resid == countries.id),
              how = 'left_anti') \
        .groupBy('pers_country_resid') \
        .count().sort(f.desc('count')).show()

+------------------+-----+
|pers_country_resid|count|
+------------------+-----+
+------------------+-----+



None for the country of residence.

In [39]:
i94data.join(countries,
              (i94data.pers_country_birth == countries.id),
              how = 'left_anti') \
        .groupBy('pers_country_birth') \
        .count().sort(f.desc('count')).show()

+------------------+-----+
|pers_country_birth|count|
+------------------+-----+
+------------------+-----+



None for the country of birth.

#### 2.6.4 Dimension: States
Check missing and duplicate IDs

In [40]:
states.groupBy('state', 'state_code').count().where('count > 1').show(10)

+-----+----------+-----+
|state|state_code|count|
+-----+----------+-----+
+-----+----------+-----+



In [41]:
states.where('state is null or state_code is null').count()

0

There are no missing or duplicate rows.

#### 2.6.5 Dimension: State Race Counts
Check missing and duplicate IDs

In [42]:
state_race_counts.groupBy('state', 'state_code', 'race').count().where('count > 1').show(10)

+-----+----------+----+-----+
|state|state_code|race|count|
+-----+----------+----+-----+
+-----+----------+----+-----+



In [43]:
state_race_counts.where('state is null or state_code is null or race is null').count()

0

There are no missing or duplicate rows.

#### 2.6.6 Dimension: Airports
Limit the immigration data to arrivals by air and collect the distinct arrival ports:

In [45]:
arrive_ports = i94data \
    .where('arrive_by = "Air"') \
    .select('arrive_port').distinct()

Evaluate how many of these ports match an airport by `local_code`:

In [46]:
arrive_ports_m = arrive_ports \
    .join(airports.where('local_code is not null'),
          (arrive_ports.arrive_port == airports.local_code),
          how = 'left_semi').distinct()

apc = arrive_ports.count()
apc_m = arrive_ports_m.count()

print(f'{apc_m} out of {apc} arrive|air ports ({(100 * apc_m / apc):.2f}%) match airports by the local code.')

109 out of 181 arrive|air ports (60.22%) match airports by the local code.

