In [1]:
import re
import boto3

from pyspark.sql.types import *
from pyspark.sql.functions import udf, col
from pyspark.sql.types import *
from datetime import datetime, timedelta

In [2]:
from pyspark.sql import SparkSession

# Build (or reuse) a Hive-enabled Spark session and request the
# spark-sas7bdat package through spark.jars.packages.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11")
    .enableHiveSupport()
    .getOrCreate()
)

In [3]:
# Name of the S3 bucket used by this notebook.
bucket_name = 'yonglun-udacity-capstone'
# URI form of the same bucket, derived from bucket_name so the two values
# cannot drift apart when the bucket is changed.
s3_bucket_name = 's3://{}'.format(bucket_name)

# Object key of the SAS label-description file inside the bucket.
sas_description_filekey = 'raw/I94_SAS_Labels_Descriptions.SAS'
# Local path the label-description file is downloaded to.
sas_description_filename = '/tmp/I94_SAS_Labels_Descriptions.SAS'

In [4]:
def sas_to_datetime(x):
    """Convert a day offset from 1960-01-01 into a datetime.

    Args:
        x: Number of days since 1960-01-01; anything ``int()`` accepts
            (the fractional part of a float is truncated).

    Returns:
        ``datetime`` at midnight of the resulting day, or ``None`` when ``x``
        is missing, non-numeric, NaN/infinite, or outside the range that
        ``datetime`` can represent.
    """
    base_date = datetime(1960, 1, 1)
    try:
        return base_date + timedelta(days=int(x))
    except (TypeError, ValueError, OverflowError):
        # Only conversion failures mean "no date". A bare except would also
        # swallow KeyboardInterrupt/SystemExit and hide real programming errors.
        return None
# Spark UDF that wraps sas_to_datetime; its declared return type is DateType.
udf_sas_to_datetime = udf(lambda x: sas_to_datetime(x), DateType())

In [5]:
# Parse Data Labels
# S3 client
# Credentials are deliberately not passed as literals: boto3 resolves them
# from its default provider chain (environment variables, the shared
# credentials/config files, or an attached IAM role), which keeps secrets
# out of the notebook and its saved outputs.
s3 = boto3.resource('s3', region_name="us-west-2")

In [6]:
# Get Label Descriptions File
s3.Bucket(bucket_name).download_file(sas_description_filekey, sas_description_filename)


def _parse_label_section(lines, first_line, last_line, regex, key_type=str, strip_chars=None):
    """Build a code -> description dict from one section of the labels file.

    Args:
        lines: All lines of the labels file.
        first_line: 1-based number of the first line of the section.
        last_line: 1-based number of the last line of the section (inclusive).
        regex: Compiled pattern; group 1 is the code, group 3 the description.
        key_type: Callable applied to the matched code before it becomes the key.
        strip_chars: Characters stripped from both ends of the description;
            ``None`` strips whitespace.

    Returns:
        dict mapping each converted code to its stripped description.

    Raises:
        ValueError: If a line inside the section does not match ``regex``.
    """
    labels = {}
    for line_no, line in enumerate(lines[first_line - 1:last_line], start=first_line):
        match_groups = regex.search(line)
        if match_groups is None:
            # Report the offending line instead of the opaque AttributeError
            # that calling .group() on None would raise.
            raise ValueError(
                'Line {} of {} does not match {!r}: {!r}'.format(
                    line_no, sas_description_filename, regex.pattern, line))
        labels[key_type(match_groups.group(1))] = match_groups.group(3).strip(strip_chars)
    return labels


# Read the whole file once; the handle is closed before parsing starts.
with open(sas_description_filename) as header_file:
    lines = header_file.readlines()

# valid_city: Line 10 to 298
# valid_city len: 289
city_regex = re.compile(r'([0-9]+)(.*)(\'.*\')(\s\;)?')
# Keys are converted to int; the surrounding quotes are stripped from the value.
valid_city = _parse_label_section(lines, 10, 298, city_regex, key_type=int, strip_chars='\'')

# valid_port: Line 303 to 962
# valid_port len: 660
port_regex = re.compile(r'\'(.+)\'(.*=.*)\'(.+)\'')
valid_port = _parse_label_section(lines, 303, 962, port_regex)

# valid_addr: line 982 to 1036
# valid_addr len: at most 55 (one entry per line in the range)
addr_regex = re.compile(r'\'(.{2})\'(.*=.*)\'(.+)\'')
valid_addr = _parse_label_section(lines, 982, 1036, addr_regex)

In [7]:
#from pprint import pprint
#print(len(valid_city))
#pprint(valid_city)

#print(len(valid_port))
#pprint(valid_port)

#print(len(valid_addr))
#pprint(valid_addr)

In [8]:
# Month/year tag substituted into the SAS data file name below.
month_year = "apr16"
# Relative path of the SAS data file for the selected month.
filepath = '../../data/18-83510-I94-Data-2016/i94_{}_sub.sas7bdat'.format(month_year)

In [9]:
# Load
# Read the SAS file at `filepath` into a DataFrame using the
# com.github.saurfang.sas.spark data source.
raw_immigration_df = (
    spark.read
    .format('com.github.saurfang.sas.spark')
    .load(filepath)
)

In [10]:
# Clean
# Keep only rows whose port, state and citizenship codes are keys of the
# label dictionaries, and whose i94mode does not compare equal to 'NaN'.
valid_port_codes = list(valid_port.keys())
valid_addr_codes = list(valid_addr.keys())
valid_city_codes = list(valid_city.keys())

# Parentheses replace the backslash continuations: the original ended with a
# dangling backslash, which is a SyntaxError when it is the last line of a cell.
cleaned_immigration_df = (
    raw_immigration_df
    .filter(raw_immigration_df.i94port.isin(valid_port_codes))
    .filter(raw_immigration_df.i94addr.isNotNull() & raw_immigration_df.i94addr.isin(valid_addr_codes))
    .filter(raw_immigration_df.i94cit.isin(valid_city_codes))
    # NOTE(review): i94mode is compared with the string 'NaN'; this presumably
    # relies on Spark's implicit cast to drop missing values. Confirm.
    .filter(raw_immigration_df.i94mode != 'NaN')
)

In [11]:
# Transform
# SQL projections: rename the raw columns and cast the numeric ones.
immigration_projection = [
    "cast(cicid as int) id",
    "cast(i94yr as int) year",
    "cast(i94mon as int) month",
    "cast(i94cit as int) AS city_code",
    "i94port as port_code",
    "i94addr as state_code",
    "i94mode as arrival_mode",
    "cast(i94bir as int) AS age",
    "gender",
    "cast(admnum as long) AS admission_no",
    "visatype",
    "arrdate",
    "depdate",
]

# arrdate/depdate are kept as-is and additionally converted to date columns
# through the sas_to_datetime UDF.
transformed_immigration_df = (
    cleaned_immigration_df
    .selectExpr(*immigration_projection)
    .withColumn("arrival_date", udf_sas_to_datetime("arrdate"))
    .withColumn("departure_date", udf_sas_to_datetime("depdate"))
)

In [12]:
# Write
# The parquet write below is currently disabled (commented out). When enabled
# it appends the transformed rows, partitioned by year and month, under
# "<s3_bucket_name>/transformed/immigration/".
# transformed_immigration_df.write\
#     .partitionBy("year", "month")\
#     .mode("append")\
#     .parquet("{}/transformed/immigration/".format(s3_bucket_name))

# Preview the first 10 transformed rows instead of writing them.
transformed_immigration_df.show(10)

+---+----+-----+---------+---------+----------+------------+---+------+------------+--------+-------+-------+------------+--------------+
| id|year|month|city_code|port_code|state_code|arrival_mode|age|gender|admission_no|visatype|arrdate|depdate|arrival_date|departure_date|
+---+----+-----+---------+---------+----------+------------+---+------+------------+--------+-------+-------+------------+--------------+
| 15|2016|    4|      101|      WAS|        MI|         1.0| 55|     M|   666643185|      B2|20545.0|20691.0|  2016-04-01|    2016-08-25|
| 16|2016|    4|      101|      NYC|        MA|         1.0| 28|  null| 92468461330|      B2|20545.0|20567.0|  2016-04-01|    2016-04-23|
| 17|2016|    4|      101|      NYC|        MA|         1.0|  4|  null| 92468463130|      B2|20545.0|20567.0|  2016-04-01|    2016-04-23|
| 18|2016|    4|      101|      NYC|        MI|         1.0| 57|  null| 92471038030|      B1|20545.0|20555.0|  2016-04-01|    2016-04-11|
| 19|2016|    4|      101|      NY