# US Immigration
## 1. ETL Process

In [39]:
import re

from functools import reduce
from itertools import chain

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import first, count, when, col, create_map, lit, coalesce, year, month

In [40]:
spark = SparkSession \
    .builder \
    .config('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:2.7.0') \
    .config('spark.jars.packages','saurfang:spark-sas7bdat:2.0.0-s_2.11') \
    .getOrCreate()

Enter AWS credentials below and uncomment if data are to be stored in S3:

In [41]:
# spark.sparkContext \
#      ._jsc \
#      .hadoopConfiguration().set('fs.s3a.access.key', <enter access key>)

# spark.sparkContext \
#      ._jsc \
#      .hadoopConfiguration().set('fs.s3a.secret.key', <enter secret key>)
    
# spark.sparkContext \
#      ._jsc \
#      .hadoopConfiguration().set('fs.s3a.endpoint', 's3.amazonaws.com')

# spark.sparkContext \
#      ._jsc \
#      .hadoopConfiguration().set('mapreduce.fileoutputcommitter.algorithm.version', '2')

A function which creates a map between SAS labels and their values is defined.

In [42]:
def code_mapper(label):
    with open('./I94_SAS_Labels_Descriptions.SAS') as f:
        sas_labels_content = f.read()

    sas_labels_content = sas_labels_content.replace('\t', '')

    label_content = sas_labels_content[sas_labels_content.index(label):]
    label_content = label_content[:label_content.index(';')].split('\n')
    label_content = [i.replace("'", "") for i in label_content]

    label_dict = [i.split('=') for i in label_content[1:]]
    label_dict = dict([i[0].strip(), i[1].strip()] for i in label_dict if len(i) == 2)

    return label_dict

Maps are created for all SAS labels.

In [43]:
i94cit_res = code_mapper('i94cntyl')
i94port = code_mapper('i94prtl')
i94mode = code_mapper('i94model')
i94addr = code_mapper('i94addrl')
i94visa = {'1.0':'Business', '2.0': 'Pleasure', '3.0' : 'Student'}

A '.0' is added to cit and res labels to match the dataset 'i94cit' and 'i94res' columns.

Also, all invalid values are grouped together as 'INVALID ENTRY'.

In [44]:
i94cit_res = {f'{k}.0': (v if not re.match('^INVALID:|^Collapsed|^No Country Code', v) else 'INVALID ENTRY') 
                 for k, v in i94cit_res.items()}

A '.0' is added to mode labels to match the dataset 'i94mode' column.

In [45]:
i94mode = {f'{k}.0': v for k, v in i94mode.items()}

The states in map are formatted to comply with state names in other datasets.
- 'DIST. OF' is replaced with 'District of'
- 'S.', 'N.', 'W.' are replaced with 'South', 'North', 'West'
- all states are capitalized

In [46]:
def format_state(s):
    s = s.replace('DIST. OF', 'District of') \
         .replace('S.', 'South') \
         .replace('N.', 'North') \
         .replace('W.', 'West')
    return ' '.join([w.capitalize() if w != 'of' else w for w in s.split() ])

# format addr labels
i94addr = {k: format_state(v) for k, v in i94addr.items()}

If the port code is valid, the state is extracted and a new map, mapping codes to states is created.

All invalid codes and states are marked as 'INVALID ENTRY'.

In [47]:
i94port_state = {}

for k, v in i94port.items():
    if not re.match('^Collapsed|^No PORT Code', v):
        try:
            # extract state part from i94port
            # the state part contains the state and also other words
            state_part = v.rsplit(',', 1)[1]
            
            # create a set of all words in state part
            state_part_set = set(state_part.split())

            # if the state is valid (is in the set(i94addr.keys()), then retrieve state
            state = list(set(i94addr.keys()).intersection(state_part_set))[0]
            
            # add state to dict
            i94port_state[k] = state
            
        except IndexError:
            # no state is specified for Washington DC in labels so it is added here
            if v == 'WASHINGTON DC':
                i94port_state[k] = 'DC'
            else:
                i94port_state[k] = 'INVALID ENTRY'
    else:
        i94port_state[k] = 'INVALID ENTRY'

A separate map is created for port cities.

In [48]:
i94port_city = {}
for k, v in i94port.items():
    if i94port_state[k] == 'INVALID ENTRY':
        i94port_city[k] = 'INVALID ENTRY'
    else:
        # extract city part from i94port
        city_part = v.rsplit(',', 1)[0]
        
        # add city to dict
        i94port_city[k] = city_part

The data for each month are read from .sas7bdat files in Spark DataFrame.

In [49]:
# read US immigration dataset
# path = 'immigration_data_sample.csv'
# df = spark.read.csv(path, header=True)

# months = ['jan', 'feb']
months = ['jan', 'feb', 'mar', 'apr', 'may', 'jul', 'aug', 'sep', 'oct', 'nov', 'dec']

paths = [month.join(['datasets/i94_2016/i94_', '16_sub.sas7bdat']) for month in months]

df = spark.read.format('com.github.saurfang.sas.spark').load(paths[0])
for path in paths[1:]:
    df = df.union(spark.read.format('com.github.saurfang.sas.spark').load(path))

Columns that will not be used are dropped.

In [50]:
cols_to_drop = ['_c0', 'cicid', 'count', 'visapost', 'occup', 'entdepa', 'entdepd', 'entdepu', 'matflag', 'insnum']
df = df.drop(*cols_to_drop)

All invalid state codes are replaced with '99'

In [51]:
df = df.withColumn('i94addr', when(~df['i94addr'].isin(*(i94addr.keys())), '99').otherwise(df['i94addr']))

The maps created from SAS labels are used to replace codes in 'i94cit', 'i94res', 'i94port', 'i94mode', 'i94addr' and 'i94visa' columns with their values.

In [52]:
maps = [(i94cit_res, 'i94cit', 'i94cit'), (i94cit_res, 'i94res', 'i94res'),
        (i94port_state, 'i94port', 'i94port_state'), (i94port_city, 'i94port', 'i94port_city'),
        (i94mode, 'i94mode', 'i94mode'), (i94addr, 'i94addr', 'state'), (i94visa, 'i94visa', 'i94visa')]

for map_dic, from_col, col_name, in maps:
    mapping_expr = create_map([lit(x) for x in chain(*map_dic.items())])
    df = df.withColumn(col_name, mapping_expr.getItem(col(from_col)))

All data are cast from strings to their respective type and the following transformations take place:
- in gender column 'M' is replaced with 'Male' and 'F' with 'Female'
- if the transportation mode  is 'Land', 'Sea', 'Not reported' of NULL and flight number or airline is not NULL, it is supposed that transportation mode entry is wrong and should be changed to 'Air'

In [53]:
df.createOrReplaceTempView('immigration')

In [54]:
df = spark.sql("""
    SELECT CAST(i94yr AS INT) AS arrival_year,
           CAST(i94mon AS INT) AS arrival_month,
           DATE_ADD('1960-01-01', arrdate) AS arrival_date,
           DATE_ADD('1960-01-01', depdate) AS departure_date,

           i94port_city AS port_city,
           i94port_state AS port_state_code,
           
           i94cit AS origin_country,
           i94res AS residence_country,
           CAST(biryear AS INT) AS birth_year,
           CAST(i94bir AS INT) AS age,
           CASE
                WHEN gender = 'M' THEN 'Male'
                WHEN gender = 'F' THEN 'Female'
           END AS gender,
           
           CAST(admnum AS INT) AS admission_num,
           TO_DATE(dtadfile, 'yyyyMMdd') AS admission_date,
           TO_DATE(dtaddto, 'MMddyyyy') AS admitted_until,
           i94visa visa_category,
           visatype AS visa_type,
           
           state,
           i94addr AS state_code,
            
           CASE
                WHEN (i94mode = 'Land' AND ((fltno IS NOT NULL) OR (airline IS NOT NULL))) THEN 'Air'
                WHEN (i94mode = 'Sea' AND ((fltno IS NOT NULL) OR (airline IS NOT NULL))) THEN 'Air'
                WHEN (i94mode = 'Not reported' AND ((fltno IS NOT NULL) OR (airline IS NOT NULL))) THEN 'Air'
                WHEN (i94mode IS NULL AND ((fltno IS NOT NULL) OR (airline IS NOT NULL))) THEN 'Air'
                ELSE i94mode
           END AS transportation_mode,
           airline,
           fltno AS flight_num
      FROM immigration
""")

Remove duplicates.

In [55]:
df = df.distinct()

Save DataFrame as .parquet in s3a://us-immigration-project or locally.

In [18]:
# write to S3 bucket
# output_data = 's3a://us-immigration-project/'
# df.write.parquet(os.path.join(output_data, 'us_immigration'), 'overwrite', partitionBy=['arrival_month', 'port_state_code'])

# for local write
# df.write.parquet('./output/us_immigration', 'overwrite', partitionBy=['arrival_month', 'port_state_code'])

## 2. Dataset Info

DataFrame Schema:

In [19]:
df.printSchema()

root
 |-- arrival_year: integer (nullable = true)
 |-- arrival_month: integer (nullable = true)
 |-- arrival_date: date (nullable = true)
 |-- departure_date: date (nullable = true)
 |-- port_city: string (nullable = true)
 |-- port_state_code: string (nullable = true)
 |-- origin_country: string (nullable = true)
 |-- residence_country: string (nullable = true)
 |-- birth_year: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- admission_num: integer (nullable = true)
 |-- admission_date: date (nullable = true)
 |-- admitted_until: date (nullable = true)
 |-- visa_category: string (nullable = true)
 |-- visa_type: string (nullable = true)
 |-- state: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- transportation_mode: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- flight_num: string (nullable = true)



Sample DataFrame records:

In [20]:
df.show(n=2, truncate=False, vertical=True)

-RECORD 0------------------------------------------------------------------------
 arrival_year        | 2016                                                      
 arrival_month       | 1                                                         
 arrival_date        | 2016-01-08                                                
 departure_date      | null                                                      
 port_city           | TUCSON                                                    
 port_state_code     | AZ                                                        
 origin_country      | null                                                      
 residence_country   | SOUTH KOREA                                               
 birth_year          | 1989                                                      
 age                 | 27                                                        
 gender              | Female                                                    
 admission_num  

DataFrame rows and columns:

In [21]:
print('rows: {}'.format(df.count()), 'columns: {}'.format(len(df.columns)))

rows: 35058752 columns: 21


## 3. Data Quality Checks

Check for nulls in each column:

In [22]:
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show(truncate=False, vertical=True)

-RECORD 0----------------------
 arrival_year        | 0       
 arrival_month       | 0       
 arrival_date        | 0       
 departure_date      | 2806865 
 port_city           | 1       
 port_state_code     | 1       
 origin_country      | 4169024 
 residence_country   | 0       
 birth_year          | 7616    
 age                 | 7616    
 gender              | 3357601 
 admission_num       | 0       
 admission_date      | 131015  
 admitted_until      | 1464128 
 visa_category       | 0       
 visa_type           | 0       
 state               | 2064975 
 state_code          | 1708388 
 transportation_mode | 12603   
 airline             | 1092315 
 flight_num          | 239614  



Check if admission numbers are unique:

In [23]:
df.distinct().count()

35058752

In [24]:
df.count() - df.select('admission_num').distinct().count()

34457281

In [25]:
df.select('admission_num').distinct().count()

601471

It turns out that they are not. Most of them are reused.

Display the record where the port city and state are NULL:

In [26]:
df.filter(reduce(lambda x, y: x & y, (col(c).isNull() for c in ['port_city', 'port_state_code']))).show(truncate=False, vertical=True)

-RECORD 0-------------------------
 arrival_year        | 2016       
 arrival_month       | 8          
 arrival_date        | 2016-08-16 
 departure_date      | null       
 port_city           | null       
 port_state_code     | null       
 origin_country      | ITALY      
 residence_country   | ITALY      
 birth_year          | 1966       
 age                 | 50         
 gender              | Male       
 admission_num       | 2147483647 
 admission_date      | 2016-08-16 
 admitted_until      | 2016-11-13 
 visa_category       | Pleasure   
 visa_type           | WT         
 state               | New York   
 state_code          | NY         
 transportation_mode | Land       
 airline             | null       
 flight_num          | null       



This is still a valid record despite the missing port city and state so it is not discarded.

Check if records with transportation mode 'Land' or 'Sea' also have flight number or airline:

In [27]:
df.filter(df.transportation_mode.isin('Land', 'Sea')).count()

153540

In [28]:
df.filter((df.transportation_mode.isin('Land', 'Sea')) & ((df.flight_num.isNotNull()) | (df.airline.isNotNull()))).count()

0

Check if records with transportation mode 'Not reported' or NULL also have flight number or airline:

In [29]:
df.filter(df.transportation_mode == 'Not reported').count()

56771

In [30]:
df.filter(((df.transportation_mode == 'Not reported') | (df.transportation_mode.isNull())) & ((df.flight_num.isNotNull()) | (df.airline.isNotNull()))).count()

0

Check if the year of arrival is different than '2016':

In [31]:
df.filter(df.arrival_year != 2016).count()

0

Check if a month is not valid:

In [32]:
df.filter((df.arrival_month <= 1) & (df.arrival_month >= 12)).count()

0

Check that the arrival date and year, month columns match:

In [33]:
df.filter(df.arrival_year != year(df.arrival_date)).count()

0

In [34]:
df.filter(df.arrival_month != month(df.arrival_date)).count()

0

## 4. Foreign Keys & Columns Related to Other Datasets

Total port cities in DataFrame:

In [35]:
df.select('port_city', 'port_state_code').distinct().orderBy('port_state_code').count()

341

Display all port cities and the state they belong:

In [36]:
df.select('port_city', 'port_state_code').distinct().orderBy('port_state_code').show(341, truncate=False)

+----------------------------+---------------+
|port_city                   |port_state_code|
+----------------------------+---------------+
|null                        |null           |
|ANCHORAGE                   |AK             |
|KETCHIKAN                   |AK             |
|JUNEAU                      |AK             |
|FAIRBANKS                   |AK             |
|TOKEEN                      |AK             |
|SKAGWAY                     |AK             |
|POKER CREEK                 |AK             |
|ALCAN                       |AK             |
|HOMER                       |AK             |
|DALTONS CACHE               |AK             |
|BIRMINGHAM                  |AL             |
|MADISON COUNTY - HUNTSVILLE |AL             |
|MOBILE                      |AL             |
|ROGERS ARPT                 |AR             |
|LUKEVILLE                   |AZ             |
|TUCSON                      |AZ             |
|NACO                        |AZ             |
|PORTAL      

Total states in DataFrame:

In [56]:
df.select('state', 'state_code').distinct().orderBy('state').count()

56

Display all states and state codes:

In [57]:
df.select('state', 'state_code').distinct().orderBy('state').show(56, truncate=False)

+--------------------+----------+
|state               |state_code|
+--------------------+----------+
|null                |null      |
|Alabama             |AL        |
|Alaska              |AK        |
|All Other Codes     |99        |
|Arizona             |AZ        |
|Arkansas            |AR        |
|California          |CA        |
|Colorado            |CO        |
|Connecticut         |CT        |
|Delaware            |DE        |
|District of Columbia|DC        |
|Florida             |FL        |
|Georgia             |GA        |
|Guam                |GU        |
|Hawaii              |HI        |
|Idaho               |ID        |
|Illinois            |IL        |
|Indiana             |IN        |
|Iowa                |IA        |
|Kansas              |KS        |
|Kentucky            |KY        |
|Louisiana           |LA        |
|Maine               |ME        |
|Maryland            |MD        |
|Massachusetts       |MA        |
|Michigan            |MI        |
|Minnesota    