# Create the fact table

In [1]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, IntegerType, FloatType, TimestampType, LongType, DateType, NullType
from pyspark.sql.functions import udf, lit, when, col
import datetime as dt

spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()

In [21]:
df_idf=spark.read.parquet("sas_data")

In [22]:
df_idf.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [23]:
df_addr = pd.read_csv("dimensions/us_states.csv")
df_port_locations = pd.read_csv("dimensions/us_ports.csv")
df_cit_res = pd.read_csv("dimensions/countries.csv")

In [24]:
# Remove invalid port_codes
temp = df_port_locations.port_code.tolist()
df_idf_filtered = df_idf.filter( df_idf.i94port.isin(temp) )

In [25]:
# Remove rows having invalid CoC & CoR
temp = df_cit_res.country_code.astype('int').tolist()
df_idf_filtered = df_idf_filtered.filter( df_idf_filtered.i94cit.isin(temp) )
df_idf_filtered = df_idf_filtered.filter( df_idf_filtered.i94res.isin(temp) )

In [26]:
# Remove invalid i94addr
temp = df_addr.state_code.tolist()
df_idf_filtered = df_idf_filtered.filter( df_idf_filtered.i94addr.isin(temp) )

In [27]:
# Convert floats to ints
cols_to_convert_float_to_integer = ['i94cit', 'i94res', 'arrdate', 'i94mode', 'depdate', 'i94bir'
                                , 'i94visa', 'biryear', 'admnum', 'cicid']
for colu in cols_to_convert_float_to_integer:    
    df_idf_filtered = df_idf_filtered.na.fill(0, subset=[colu])
    df_idf_filtered = df_idf_filtered.withColumn(colu, df_idf_filtered[colu].cast(IntegerType()))

In [28]:
# Dropping unused columns
keep_columns = set(['i94cit', 'i94res', 'i94port', 'arrdate', 'i94mode', 'i94addr', 'depdate'
                , 'i94bir', 'i94visa', 'dtadfile', 'visapost', 'occup', 'biryear'
                , 'dtaddto', 'gender', 'airline', 'admnum', 'fltno', 'visatype', 'cicid'])
all_colls = set(df_idf_filtered.columns)
drop_cols = list(all_colls.difference(keep_columns))
print(drop_cols)

['i94mon', 'entdepd', 'insnum', 'entdepu', 'matflag', 'entdepa', 'count', 'i94yr']


In [29]:
df_idf_filtered = df_idf_filtered.drop(*drop_cols)

In [30]:
df_idf_filtered = (df_idf_filtered
            .withColumnRenamed("i94bir",  "age")
            .withColumnRenamed("i94cit", "coc")
            .withColumnRenamed("i94res", "cor")
            .withColumnRenamed("i94port", "port_code")
            .withColumnRenamed("i94addr", "landing_state")
            .withColumnRenamed("visapost", "visa_issued_in")
            .withColumnRenamed("cicid", "id"))

In [31]:
def cdf_mdY_to_mmddYYYY(x):
    try:
        return dt.datetime.strptime(x, '%m%d%Y')
    except:
        return None
    
def cdf_Ymd_to_mmddYYYY(x):
    try:
        return dt.datetime.strptime(x, '%Y%m%d')
    except:
        return None

def to_datetime(x):
    try:
        start = dt.datetime(1960, 1, 1).date()
        return start + dt.timedelta(days=int(x))
    except:
        return None

udf_to_datetime_sas = udf(lambda x: to_datetime(x), DateType())
udf_cdf_Ymd_to_mmddYYYY = udf(lambda x: cdf_Ymd_to_mmddYYYY(x), DateType())
udf_cdf_mdY_to_mmddYYYY = udf(lambda x: cdf_mdY_to_mmddYYYY(x), DateType())

In [32]:
# Conversion dates

df_idf_filtered = df_idf_filtered.withColumn("arrival_dt", udf_to_datetime_sas(df_idf_filtered.arrdate))
df_idf_filtered = df_idf_filtered.withColumn("arrival_dt",when(col("arrival_dt")=="1960-01-01",lit(None)).otherwise(col("arrival_dt")))

df_idf_filtered = df_idf_filtered.withColumn("departure_dt", udf_to_datetime_sas(df_idf_filtered.depdate))
df_idf_filtered = df_idf_filtered.withColumn("departure_dt",when(col("departure_dt")=="1960-01-01",lit(None)).otherwise(col("departure_dt")))

# Departure date can't before Arrival date 
df_idf_filtered = df_idf_filtered.filter(~(df_idf_filtered.arrival_dt > df_idf_filtered.departure_dt) | (df_idf_filtered.departure_dt.isNull()))

df_idf_filtered = df_idf_filtered.withColumn("added_to_i94", udf_cdf_Ymd_to_mmddYYYY(df_idf_filtered.dtadfile))
df_idf_filtered = df_idf_filtered.withColumn("allowed_until", udf_cdf_mdY_to_mmddYYYY(df_idf_filtered.dtaddto))

drop_cols = ['arrdate', 'depdate', 'dtadfile', 'dtaddto']
df_idf_filtered = df_idf_filtered.drop(*drop_cols)

In [33]:
#temp = {'1' : 'Air', '2' : 'Sea', '3' : 'Land', '9' : 'Not reported'}
temp = [["1", "Air"], ["2", "Sea"], ["3","Land"], ["9", "Not reported"]]
i94mode = spark.sparkContext.parallelize(temp).toDF(["code", "arrival_mode"])
df_idf_filtered = df_idf_filtered.join(i94mode, df_idf_filtered.i94mode == i94mode.code).select(df_idf_filtered["*"], i94mode["arrival_mode"])

temp = [["1", "Business"], ["2", "Pleasure"], ["3", "Student"]]
i94visa = spark.sparkContext.parallelize(temp).toDF(["code", "visit_purpose"])
df_idf_filtered = df_idf_filtered.join(i94visa, df_idf_filtered.i94visa == i94visa.code).select(df_idf_filtered["*"], i94visa["visit_purpose"])

drop_cols = ['i94mode', 'i94visa']
df_idf_filtered = df_idf_filtered.drop(*drop_cols)

In [34]:
df_idf_filtered.printSchema()

root
 |-- id: integer (nullable = true)
 |-- coc: integer (nullable = true)
 |-- cor: integer (nullable = true)
 |-- port_code: string (nullable = true)
 |-- landing_state: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- visa_issued_in: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- biryear: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: integer (nullable = true)
 |-- fltno: string (nullable = true)
 |-- visatype: string (nullable = true)
 |-- arrival_dt: date (nullable = true)
 |-- departure_dt: date (nullable = true)
 |-- added_to_i94: date (nullable = true)
 |-- allowed_until: date (nullable = true)
 |-- arrival_mode: string (nullable = true)
 |-- visit_purpose: string (nullable = true)



In [35]:
df_idf_filtered.show(5)

+-------+---+---+---------+-------------+---+--------------+-----+-------+------+-------+----------+-----+--------+----------+------------+------------+-------------+------------+-------------+
|     id|coc|cor|port_code|landing_state|age|visa_issued_in|occup|biryear|gender|airline|    admnum|fltno|visatype|arrival_dt|departure_dt|added_to_i94|allowed_until|arrival_mode|visit_purpose|
+-------+---+---+---------+-------------+---+--------------+-----+-------+------+-------+----------+-----+--------+----------+------------+------------+-------------+------------+-------------+
|5748517|245|438|      LOS|           CA| 40|           SYD| null|   1976|     F|     QF|2147483647|00011|      B1|2016-04-30|  2016-05-08|  2016-04-30|   2016-10-29|         Air|     Business|
|5748518|245|438|      LOS|           NV| 32|           SYD| null|   1984|     F|     VA|2147483647|00007|      B1|2016-04-30|  2016-05-17|  2016-04-30|   2016-10-29|         Air|     Business|
|5748519|245|438|      LOS|   

In [36]:
df_idf_shape = (df_idf_filtered.count(), len(df_idf_filtered.columns))
print(df_idf_shape)

(2215678, 20)


In [37]:
df_idf_filtered.write.partitionBy('landing_state').mode('overwrite').format("csv").option("header","true").save('fact/')