In [1]:
import configparser
import os
from os import listdir
from os.path import isfile, join
import logging
import pandas as pd
from datetime import timedelta, datetime
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, monotonically_increasing_id, lit, substring
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, dayofweek

In [2]:
# logging setup
logger = logging.getLogger()
logger.setLevel(logging.INFO)

In [3]:
# configuration
config = configparser.ConfigParser()
config.read('dl.cfg', encoding='utf-8-sig')

['dl.cfg']

In [4]:
os.environ['AWS_ACCESS_KEY_ID']=config['S3']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['S3']['AWS_SECRET_ACCESS_KEY']

In [5]:
def create_spark_session():
    spark = SparkSession.builder.\
    config("spark.jars.repositories", "https://repos.spark-packages.org/").\
    config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
    enableHiveSupport().getOrCreate()
    return spark

In [6]:
spark = create_spark_session()

In [7]:
input_data = ""
sas_input_dir  = '../../data/18-83510-I94-Data-2016/'
sas_input_file = 'i94_apr16_sub.sas7bdat'
output_data = "sas_data2/"

In [8]:
print('Apache Spark Version :'+spark.version)
print('Apache Spark Version :'+spark.sparkContext.version)

Apache Spark Version :2.4.3
Apache Spark Version :2.4.3


In [9]:
def load_demo(spark, input_data):
    
    
    
    # start logging message
    logging.info("Started loading demographics data")
    
    # load the file
    f_demo = os.path.join(input_data + 'us-cities-demographics.csv')
    df = spark.read.format('csv').options(header=True, delimiter=';').load(f_demo)
    
    # Get only selected columns and de-dupe them
    df_demo = df.select("City", "Median Age", "Male Population", "Female Population", "Total Population", \
                        "Number of Veterans", "Foreign-born", "Average Household Size", "State Code")\
                .drop_duplicates()
    
    # end logging message and return the dataframe
    logging.info("Completed loading US Cities data")
    return df_demo

In [13]:
df_demo = load_demo(spark, input_data)
df_demo.show()

INFO:root:Started loading demographics data
INFO:root:Completed loading US Cities data


+----------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+
|            City|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|
+----------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+
|      Providence|      29.9|          89090|            90114|          179204|              4933|       53532|                  2.72|        RI|
|      Lewisville|      31.6|          52776|            52032|          104808|              4211|       24865|                  2.78|        TX|
|          Layton|      29.5|          37748|            36394|           74142|              3811|        4268|                  3.24|        UT|
|        San Juan|      41.4|         155408|           186829|          342237|              null|        null|      

In [14]:
def load_us_cities(spark, input_data):
    # start logging message
    logging.info("Started loading US Cities data")
    
    # load file
    f_uscit = os.path.join(input_data + 'uscities.csv')
    df = spark.read.format('csv').options(header=True).load(f_uscit)
    
    # select only the relevant columns
    df_cit = df.select("id", "city", "state_id", "state_name", "lat","lng")\
                .drop_duplicates()
    
    # end logging and return dataframe
    logging.info("Completed loading US Cities data")
    return df_cit

In [15]:
df_cit = load_us_cities(spark, input_data)
df_cit.show()

INFO:root:Started loading US Cities data
INFO:root:Completed loading US Cities data


+----------+------------------+--------+--------------+-------+---------+
|        id|              city|state_id|    state_name|    lat|      lng|
+----------+------------------+--------+--------------+-------+---------+
|1840020578|  Huntington Beach|      CA|    California|33.6960|-118.0018|
|1840008192|            Joliet|      IL|      Illinois|41.5188| -88.1499|
|1840019612|          Beaumont|      TX|         Texas|30.0849| -94.1451|
|1840011319|        Schaumburg|      IL|      Illinois|42.0308| -88.0838|
|1840015633|          Marietta|      GA|       Georgia|33.9533| -84.5422|
|1840020295|            Newark|      CA|    California|37.5201|-122.0307|
|1840021532|          Danville|      CA|    California|37.8121|-121.9698|
|1840020173|    Pleasant Grove|      UT|          Utah|40.3716|-111.7412|
|1840024507|      South Riding|      VA|      Virginia|38.9120| -77.5132|
|1840021561|        Washington|      UT|          Utah|37.1303|-113.4878|
|1840008132|        Oak Forest|      I

In [16]:
def load_airport(spark, input_data):
    # start logging message
    logging.info("Started loading airport codes data")

    # load file
    f_air = os.path.join(input_data + 'airport-codes_csv.csv')
    df = spark.read.format('csv').options(header=True).load(f_air)

    # transform and de-dupe data
    df_air = df.where(df['iso_country']=="US")\
            .select("municipality", "iata_code", "local_code", "iso_region") \
            .withColumn("country", substring("iso_region",1,2))\
            .withColumn("state_code", substring("iso_region",4,2)) \
            .na.drop(subset=["iata_code","local_code"]) \
            .drop_duplicates()
    df_air = df_air.select("municipality", "iata_code", "local_code", "state_code")

    # end logging message and return data frame
    logging.info("Completed loading airport codes data")
    return df_air

In [None]:
df_airport = load_airport(spark, input_data)
df_airport.show()

In [11]:
def process_city_demo_airport(df_airport, df_cit, df_demo):

    # Join city and airport data
    logging.info("Joining airport codes and city data")
    df_ap_cit = df_airport.join(df_cit, (df_airport.municipality==df_cit.city) \
                           & (df_airport.state_code==df_cit.state_id))\
            .select("id", "city", "iata_code", "state_id", "state_name", "lat", "lng" )

    # Join demographics data to it
    logging.info("Joining demographics data")
    df_ap_cit_demo = df_ap_cit.join(df_demo, (df_ap_cit.city==df_demo.City) \
                        & (df_ap_cit.state_id==df_demo["State Code"]))\
                    .select("id", df_ap_cit["city"], "iata_code", "state_id", "state_name", "lat", "lng"\
                        ,col("Median Age").alias("median_age")\
                        ,col("Male Population").alias("male_population")\
                        ,col("Female Population").alias("female_population")\
                        ,col("Total Population").alias("total_population")\
                        ,col("Number of Veterans").alias("no_of_veterans")\
                        ,col("Foreign-born").alias("foreign_born")\
                        ,col("Average Household Size").alias("avg_household_size"))
    
    # write out the data
    df_ap_cit_demo.write.parquet(os.path.join(output_data, "cit_demo_air/"), mode="overwrite")
    logging.info("Processed city, demographics and airport data")

In [None]:
process_city_demo_airport(df_airport, df_cit, df_demo)

In [19]:
df_cit_dem_air=spark.read.parquet("sas_data2/cit_demo_air")
df_cit_dem_air.show()

+----------+------------+---------+--------+--------------------+-------+---------+----------+---------------+-----------------+----------------+--------------+------------+------------------+
|        id|        city|iata_code|state_id|          state_name|    lat|      lng|median_age|male_population|female_population|total_population|no_of_veterans|foreign_born|avg_household_size|
+----------+------------+---------+--------+--------------------+-------+---------+----------+---------------+-----------------+----------------+--------------+------------+------------------+
|1840000494|     Chicago|      ORD|      IL|            Illinois|41.8375| -87.6866|      34.2|        1320015|          1400541|         2720556|         72042|      573463|              2.53|
|1840000494|     Chicago|      MDW|      IL|            Illinois|41.8375| -87.6866|      34.2|        1320015|          1400541|         2720556|         72042|      573463|              2.53|
|1840004773|    Hartford|      HFD|

In [141]:
# test code
df2 = df_n \
    .groupby(['id','city', 'state_code']) \
    .count() \
    .where('count > 1') \
    .sort('count', ascending=False) 

df2.show()
    

+----------+------------+----------+-----+
|        id|        city|state_code|count|
+----------+------------+----------+-----+
|1840020925|     Houston|        TX|    9|
|1840034016|    New York|        NY|    6|
|1840015031|Jacksonville|        FL|    5|
|1840015149|       Miami|        FL|    5|
|1840021990|   San Diego|        CA|    5|
|1840006060|  Washington|        DC|    4|
|1840020696|  Fort Worth|        TX|    4|
|1840022220| San Antonio|        TX|    4|
|1840034249|      Dayton|        OH|    4|
|1840020364|   Las Vegas|        NV|    4|
|1840021491|  Sacramento|        CA|    4|
|1840001686|     Wichita|        KS|    4|
|1840015982|       Tampa|        FL|    4|
|1840015099|     Orlando|        FL|    4|
|1840019941|    Portland|        OR|    3|
|1840003971|     Detroit|        MI|    3|
|1840019440|      Dallas|        TX|    3|
|1840021117|     Seattle|        WA|    3|
|1840000596|   Cleveland|        OH|    3|
|1840000673|Philadelphia|        PA|    3|
+----------

In [143]:
# test code
df2.groupby(['id']) \
    .count() \
    .where('count > 1') \
    .sort('count', ascending=False).show()

+---+-----+
| id|count|
+---+-----+
+---+-----+



In [11]:
# test code
from os import listdir
from os.path import isfile, join
mypath = '../../data/18-83510-I94-Data-2016/'
onlyfiles = [f for f in listdir(mypath) if isfile(join(mypath, f))]
tf = 'i94_jan16_sub.sas7bdat'
j=0
for i in onlyfiles:
    j+=1 
    if i!=tf and j<3:
        print(i)

i94_apr16_sub.sas7bdat
i94_sep16_sub.sas7bdat


In [None]:
# test code
sas_input_data_dir  = '../../data/18-83510-I94-Data-2016/'
immi_data1 = os.path.join(sas_input_data_dir + 'i94_apr16_sub.sas7bdat')
immi_data2 = os.path.join(sas_input_data_dir + 'i94_mar16_sub.sas7bdat')
#     spark.read.format("com.github.saurfang.sas.spark").load("/users/shobhana/sas_files", pathGlobFilter="*.sas7bdat")
df1 = spark.read.format("com.github.saurfang.sas.spark").load(immi_data1)
df2 = spark.read.format("com.github.saurfang.sas.spark").load(immi_data2)
df3 = df1.union(df2)
df3.show()

In [12]:
def load_immi(spark, sas_file):
    logging.info("Started loading immigration data")
    df = spark.read.format('com.github.saurfang.sas.spark')\
            .load(sas_file)
    
    # function to convert sas date
    date_format = "%Y-%m-%d"
    convert_sas_udf = udf(lambda x: x if x is None else (timedelta(days=x) + datetime(1960, 1, 1)).strftime(date_format))

    # process immigration data
    df_immi = df.select("cicid", "arrdate", "i94yr", "i94mon", "i94visa", "i94port", "airline", "fltno", "visatype")\
                .na.drop(subset=["arrdate", "i94yr", "i94mon", "i94visa", "i94port", "airline", "fltno", "visatype"]) \
                .withColumn("cicid", col("cicid").cast("int")) \
                .withColumn("i94mon", col("i94mon").cast("int")) \
                .withColumn("i94yr", col("i94yr").cast("int")) \
                .withColumn("i94visa", col("i94visa").cast("int")) \
                .withColumn("arrdate", convert_sas_udf("arrdate")) \
                .withColumn("mon", col("i94mon").cast("int")) \
                .withColumn("yr", col("i94yr").cast("int")) \
                .drop_duplicates()
    
    logging.info("Finished loading immigration data")
    return df_immi
    

In [26]:
# test code
sas_input_file = 'i94_dec16_sub.sas7bdat'
sas_file = join(sas_input_dir, sas_input_file)
sas_file
df_immi = load_immi(spark, sas_file)
df_immi.show()


INFO:root:Started loading immigration data
INFO:root:Finished loading immigration data


+-----+----------+-----+------+-------+-------+-------+-----+--------+---+----+
|cicid|   arrdate|i94yr|i94mon|i94visa|i94port|airline|fltno|visatype|mon|  yr|
+-----+----------+-----+------+-------+-------+-------+-----+--------+---+----+
|  778|2016-12-01| 2016|    12|      2|    XXX|     TK|00009|      B2| 12|2016|
| 1585|2016-12-01| 2016|    12|      2|    NYC|     WS| 1216|      WT| 12|2016|
| 1630|2016-12-01| 2016|    12|      2|    AGA|     UA|00136|     GMT| 12|2016|
| 1839|2016-12-01| 2016|    12|      2|    LOS|     LA|00600|      WT| 12|2016|
| 2003|2016-12-01| 2016|    12|      2|    AGA|     UA|00827|     GMT| 12|2016|
| 2166|2016-12-01| 2016|    12|      2|    AGA|     UA|00827|     GMT| 12|2016|
| 2243|2016-12-01| 2016|    12|      2|    AGA|     UA|00827|     GMT| 12|2016|
| 4865|2016-12-01| 2016|    12|      2|    AGA|     7C|03154|     GMT| 12|2016|
| 5388|2016-12-01| 2016|    12|      2|    AGA|     UA|00178|     GMT| 12|2016|
| 5586|2016-12-01| 2016|    12|      2| 

In [34]:
onlyfiles = [f for f in listdir(sas_input_dir) if isfile(join(sas_input_dir, f))]
df_immi = None
for i in onlyfiles:
    logging.info("Started processing immigration file {}".format(i))
    if df_immi is None:
        print(i)
#         df_nnn = load_demo(spark, input_data)
    else:
        print(i)

INFO:root:Started processing immigration file i94_apr16_sub.sas7bdat
INFO:root:Started processing immigration file i94_sep16_sub.sas7bdat
INFO:root:Started processing immigration file i94_nov16_sub.sas7bdat
INFO:root:Started processing immigration file i94_mar16_sub.sas7bdat
INFO:root:Started processing immigration file i94_jun16_sub.sas7bdat
INFO:root:Started processing immigration file i94_aug16_sub.sas7bdat
INFO:root:Started processing immigration file i94_may16_sub.sas7bdat
INFO:root:Started processing immigration file i94_jan16_sub.sas7bdat
INFO:root:Started processing immigration file i94_oct16_sub.sas7bdat
INFO:root:Started processing immigration file i94_jul16_sub.sas7bdat
INFO:root:Started processing immigration file i94_feb16_sub.sas7bdat
INFO:root:Started processing immigration file i94_dec16_sub.sas7bdat


i94_apr16_sub.sas7bdat
i94_sep16_sub.sas7bdat
i94_nov16_sub.sas7bdat
i94_mar16_sub.sas7bdat
i94_jun16_sub.sas7bdat
i94_aug16_sub.sas7bdat
i94_may16_sub.sas7bdat
i94_jan16_sub.sas7bdat
i94_oct16_sub.sas7bdat
i94_jul16_sub.sas7bdat
i94_feb16_sub.sas7bdat
i94_dec16_sub.sas7bdat


In [20]:
def process_immi(spark, sas_input_dir, output_data): 

    # process all files
    df_immi = None
    df_t = None
    onlyfiles = [f for f in listdir(sas_input_dir) if isfile(join(sas_input_dir, f))]
    for i in onlyfiles:
        logging.info("Started processing immigration file {}".format(i))
        sas_file = join(sas_input_dir, i)
        if df_immi is None:
            df_immi = load_immi(spark, sas_file)
        else:
            df_t = load_immi(spark, sas_file)
            df_immi = df_immi.union(df_t)
        logging.info("Finished processing immigration file {}".format(i))
    
    logging.info("Completed processing immigration data")
    logging.info("Writing out immigration data")
    
    # write immi data 
    df_immi.write.parquet(os.path.join(output_data, "immigration/"), mode="overwrite", partitionBy=["yr","mon"])
    logging.info("Writing immigration data completed")
    
    
    logging.info("Processing time data")
    # process time data
    time_table = df_immi.withColumn("day",dayofmonth("arrdate"))\
                    .withColumn("week",weekofyear("arrdate"))\
                    .withColumn("month",month("arrdate"))\
                    .withColumn("year",year("arrdate"))\
                    .withColumn("weekday",dayofweek("arrdate")) \
                    .select("arrdate", "day", "week", "month", "year", "weekday").drop_duplicates()
    
    # write time data 
    time_table.write.parquet(os.path.join(output_data, "time/"), mode="overwrite")
    logging.info("Time data processed")
    
    logging.info("Processing flights data")
    
    # process flights data
    flights = df_immi.select("airline", "fltno").drop_duplicates()
                
    # write flights data 
    flights.write.parquet(os.path.join(output_data, "flights/"), mode="overwrite")
    logging.info("Flights data processed")
    
    logging.info("Processing visa data")
    
    # process visa data
    visas = df_immi.select("i94visa", "visatype").drop_duplicates()
                    
    # write visa data 
    visas.write.parquet(os.path.join(output_data, "visas/"), mode="overwrite")
    logging.info("Visa data processed")

In [21]:
process_immi(spark, sas_input_dir, output_data)

INFO:root:Started processing immigration file i94_apr16_sub.sas7bdat
INFO:root:Started loading immigration data
INFO:root:Finished loading immigration data
INFO:root:Finished processing immigration file i94_apr16_sub.sas7bdat
INFO:root:Started processing immigration file i94_sep16_sub.sas7bdat
INFO:root:Started loading immigration data
INFO:root:Finished loading immigration data
INFO:root:Finished processing immigration file i94_sep16_sub.sas7bdat
INFO:root:Started processing immigration file i94_nov16_sub.sas7bdat
INFO:root:Started loading immigration data
INFO:root:Finished loading immigration data
INFO:root:Finished processing immigration file i94_nov16_sub.sas7bdat
INFO:root:Started processing immigration file i94_mar16_sub.sas7bdat
INFO:root:Started loading immigration data
INFO:root:Finished loading immigration data
INFO:root:Finished processing immigration file i94_mar16_sub.sas7bdat
INFO:root:Started processing immigration file i94_jun16_sub.sas7bdat
INFO:root:Started loading i

In [24]:
time_table = spark.read.parquet("sas_data2/time")                 
time_table.show()

+----------+---+----+-----+----+-------+
|   arrdate|day|week|month|year|weekday|
+----------+---+----+-----+----+-------+
|2016-09-07|  7|  36|    9|2016|      4|
|2016-09-19| 19|  38|    9|2016|      2|
|2016-09-21| 21|  38|    9|2016|      4|
|2016-01-21| 21|   3|    1|2016|      5|
|2016-02-13| 13|   6|    2|2016|      7|
|2016-02-11| 11|   6|    2|2016|      5|
|2016-11-12| 12|  45|   11|2016|      7|
|2016-03-20| 20|  11|    3|2016|      1|
|2016-05-05|  5|  18|    5|2016|      5|
|2016-05-31| 31|  22|    5|2016|      3|
|2016-07-21| 21|  29|    7|2016|      5|
|2016-07-28| 28|  30|    7|2016|      5|
|2016-04-02|  2|  13|    4|2016|      7|
|2016-08-09|  9|  32|    8|2016|      3|
|2016-08-16| 16|  33|    8|2016|      3|
|2016-08-17| 17|  33|    8|2016|      4|
|2016-01-07|  7|   1|    1|2016|      5|
|2016-10-20| 20|  42|   10|2016|      5|
|2016-03-18| 18|  11|    3|2016|      6|
|2016-06-15| 15|  24|    6|2016|      4|
+----------+---+----+-----+----+-------+
only showing top

In [23]:
flights = spark.read.parquet("sas_data2/flights")
flights.show()


+-------+-----+
|airline|fltno|
+-------+-----+
|     AF|00077|
|     BA|00271|
|     UA|00008|
|     AA|01489|
|     OB|00766|
|     DL|00469|
|     UA|00037|
|     NH|00172|
|     2D|00412|
|     KE|   62|
|     DL|00749|
|     AC|00757|
|     AV|00042|
|     WS|01118|
|     UA|05547|
|     UA|01691|
|     UA|03928|
|     AA|01739|
|     LH|  464|
|     AA| 1526|
+-------+-----+
only showing top 20 rows



In [22]:
visa = spark.read.parquet("sas_data2/visas")
visa.show()

+-------+--------+
|i94visa|visatype|
+-------+--------+
|      2|      CP|
|      2|     SBP|
|      1|     GMB|
|      2|     CPL|
|      2|     GMT|
|      3|      F2|
|      1|      E2|
|      3|      M1|
|      1|      B1|
|      1|      I1|
|      1|      WB|
|      2|      B2|
|      3|      F1|
|      1|      E1|
|      2|      WT|
|      3|      M2|
|      1|       I|
+-------+--------+



In [12]:
time = spark.read.parquet("sas_data2/time")
time.show()

+----------+---+----+-----+----+-------+
|   arrdate|day|week|month|year|weekday|
+----------+---+----+-----+----+-------+
|2016-04-10| 10|  14|    4|2016|      1|
|2016-04-24| 24|  16|    4|2016|      1|
|2016-04-11| 11|  15|    4|2016|      2|
|2016-04-25| 25|  17|    4|2016|      2|
|2016-04-15| 15|  15|    4|2016|      6|
|2016-04-21| 21|  16|    4|2016|      5|
|2016-04-03|  3|  13|    4|2016|      1|
|2016-04-04|  4|  14|    4|2016|      2|
|2016-04-14| 14|  15|    4|2016|      5|
|2016-04-26| 26|  17|    4|2016|      3|
|2016-04-13| 13|  15|    4|2016|      4|
|2016-04-09|  9|  14|    4|2016|      7|
|2016-04-02|  2|  13|    4|2016|      7|
|2016-04-16| 16|  15|    4|2016|      7|
|2016-04-06|  6|  14|    4|2016|      4|
|2016-04-08|  8|  14|    4|2016|      6|
|2016-04-23| 23|  16|    4|2016|      7|
|2016-04-30| 30|  17|    4|2016|      7|
|2016-04-01|  1|  13|    4|2016|      6|
|2016-04-29| 29|  17|    4|2016|      6|
+----------+---+----+-----+----+-------+
only showing top

In [13]:
df_imm = spark.read.parquet("sas_data2/immigration")
df_imm.show()

+-----+----------+-----+------+-------+-------+-------+-----+--------+----+---+
|cicid|   arrdate|i94yr|i94mon|i94visa|i94port|airline|fltno|visatype|  yr|mon|
+-----+----------+-----+------+-------+-------+-------+-----+--------+----+---+
|  225|2016-04-01| 2016|     4|      2|    NYC|     OS|00087|      WT|2016|  4|
|  598|2016-04-01| 2016|     4|      2|    NAS|     UP|00221|      WT|2016|  4|
|  604|2016-04-01| 2016|     4|      1|    TOR|     AA|01259|      WB|2016|  4|
| 1053|2016-04-01| 2016|     4|      2|    NEW|     DL|00021|      WT|2016|  4|
| 1164|2016-04-01| 2016|     4|      2|    WAS|     UA|00947|      WT|2016|  4|
| 1391|2016-04-01| 2016|     4|      2|    NYC|     SN|01401|      WT|2016|  4|
| 1405|2016-04-01| 2016|     4|      2|    NYC|     AF|00012|      WT|2016|  4|
| 1463|2016-04-01| 2016|     4|      2|    NYC|     DL|00049|      WT|2016|  4|
| 1695|2016-04-01| 2016|     4|      2|    MIA|     UX|00097|      WT|2016|  4|
| 2222|2016-04-01| 2016|     4|      2| 