In [1]:
# https://spark.apache.org/docs/latest/api/python/pyspark.sql.html
# http://spark.apache.org/docs/1.2.1/configuration.html
from pyspark.sql import SparkSession, SQLContext, GroupedData, HiveContext
from pyspark.sql.functions import *
from pyspark.sql.functions import date_add as d_add
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import year, month, dayofmonth, weekofyear, date_format
from pyspark.sql import functions as F
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import lit
from pyspark.sql import Row
# Increased memory
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.config("spark.python.worker.memory", "15g") \
.enableHiveSupport().getOrCreate()
df=spark.read.parquet("sas_data")

In [2]:
# function to output the file as parquet
output_path = "output_data/"
def write_to_parquet(table, file_name):
    """
    Write the table as parquet file
    
    Parameters:
        table
        Name of the file
    Returns:
        Outputs the table as parquet file to a folder
    """
    # write artists table to parquet files
    file_output = output_path + file_name
    table.write.mode("overwrite").parquet(file_output)
    
# https://stackoverflow.com/questions/39758045/how-to-perform-union-on-two-dataframes-with-different-amounts-of-columns-in-spar
def customUnion(df1, df2):
    """
    Combines two dataframes 
    
    Parameters:
         dataframe1
         dataframe2
     
     Returns:
         A combined dataframe
    """
    cols1 = df1.columns
    cols2 = df2.columns
    total_cols = sorted(cols1 + list(set(cols2) - set(cols1)))
    def expr(mycols, allcols):
        def processCols(colname):
            if colname in mycols:
                return colname
            else:
                return lit(None).alias(colname)
        cols = map(processCols, allcols)
        return list(cols)
    appended = df1.select(expr(cols1, total_cols)).union(df2.select(expr(cols2, total_cols)))
    return appended

In [3]:
print("Immigration_table schema:")
df.printSchema()

Immigration_table schema:
root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- a

### Spark has imported the values as double. Changed to integer

In [4]:
df = df \
.withColumn("cicid", col("cicid").cast("integer")) \
.withColumn("year", col("i94yr").cast("integer")) \
.drop("i94yr") \
.withColumn("month", col("i94mon").cast("integer")) \
.drop("i94mon") \
.withColumn("bornCountry", col("i94cit").cast("integer")) \
.drop("i94cit") \
.withColumn("residentCountry", col("i94res").cast("integer")) \
.drop("i94res") \
.withColumnRenamed("i94port", "arrivalPort") \
.withColumn("mode", col("i94mode").cast("integer")) \
.drop("i94mode") \
.withColumnRenamed("i94addr", "arrivalAddress") \
.withColumn("age", col("i94bir").cast("integer")) \
.drop("i94bir") \
.withColumn("visa", col("i94visa").cast("integer")) \
.drop("i94visa") \
.withColumnRenamed("entdepa", "arrivalFlag") \
.withColumnRenamed("entdepd", "departureFlag") \
.withColumnRenamed("entdepu", "updateFlag") \
.withColumnRenamed("matflag", "matchFlag") \
.withColumn("birthYear", col("biryear").cast("integer")) \
.drop("biryear") \
.withColumnRenamed("fltno", "flightNumber") \
.withColumnRenamed("visaType", "visaType") \
.withColumn("sasDate", to_date(lit("01/01/1960"), "MM/dd/yyyy")) \
.withColumn("arrivalDate", expr("date_add(sasDate, arrdate)")) \
.withColumn("departureDate", expr("date_add(sasDate, depdate)")) \
.drop("sasDate", "arrdate", "depdate", "count", "admnum", "dtadfile", "visapost", "occup", "dtaddto", "insnum")


In [5]:
print("New Immigration data schema:")
df.printSchema()

New Immigration data schema:
root
 |-- cicid: integer (nullable = true)
 |-- arrivalPort: string (nullable = true)
 |-- arrivalAddress: string (nullable = true)
 |-- arrivalFlag: string (nullable = true)
 |-- departureFlag: string (nullable = true)
 |-- updateFlag: string (nullable = true)
 |-- matchFlag: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- flightNumber: string (nullable = true)
 |-- visaType: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- bornCountry: integer (nullable = true)
 |-- residentCountry: integer (nullable = true)
 |-- mode: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- visa: integer (nullable = true)
 |-- birthYear: integer (nullable = true)
 |-- arrivalDate: date (nullable = true)
 |-- departureDate: date (nullable = true)



### Select where age is not negative, cicid is available and drop duplicates

In [6]:
df = df.where((col("age") >= 0) & (col("cicid").isNotNull())) \
     .dropDuplicates(['cicid'])

In [7]:
df.show(2)

+-----+-----------+--------------+-----------+-------------+----------+---------+------+-------+------------+--------+----+-----+-----------+---------------+----+---+----+---------+-----------+-------------+
|cicid|arrivalPort|arrivalAddress|arrivalFlag|departureFlag|updateFlag|matchFlag|gender|airline|flightNumber|visaType|year|month|bornCountry|residentCountry|mode|age|visa|birthYear|arrivalDate|departureDate|
+-----+-----------+--------------+-----------+-------------+----------+---------+------+-------+------------+--------+----+-----+-----------+---------------+----+---+----+---------+-----------+-------------+
|  148|        NEW|            NY|          G|            O|      null|        M|     F|     OS|       00089|      WT|2016|    4|        103|            103|   1| 21|   2|     1995| 2016-04-01|   2016-04-08|
|  463|        MIA|            FL|          O|            R|      null|        M|  null|     OS|       00097|      WT|2016|    4|        103|            103|   1| 25|  

In [8]:
print((df.count(), len(df.columns)))

(3095510, 21)


### Dimensions table

In [9]:
time_table = df.select(['arrivalDate'])\
                    .withColumnRenamed('arrivalDate','time') 

time_table = time_table \
             .withColumn('day', F.dayofmonth('time')) \
             .withColumn('month', F.month('time')) \
             .withColumn('year', F.year('time')) \
             .withColumn('week', F.weekofyear('time')) \
             .withColumn('weekday', F.dayofweek('time'))\
             .dropDuplicates()
write_to_parquet(time_table,"time_table.parquet" )

In [10]:
time_table.show(2)

+----------+---+-----+----+----+-------+
|      time|day|month|year|week|weekday|
+----------+---+-----+----+----+-------+
|2016-04-01|  1|    4|2016|  13|      6|
|2016-04-06|  6|    4|2016|  14|      4|
+----------+---+-----+----+----+-------+
only showing top 2 rows



In [11]:
print("Time data schema:")
time_table.printSchema()

Time data schema:
root
 |-- time: date (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- weekday: integer (nullable = true)



In [12]:
print((time_table.count(), len(time_table.columns)))

(30, 6)


#### Person

In [13]:
person_table = df.select(['birthYear','gender']) \
                 .dropDuplicates() \
                 .withColumn("personId", \
                        monotonically_increasing_id())
print((person_table.count(), len(person_table.columns)))

(475, 3)


In [14]:
write_to_parquet(person_table,"person_table.parquet" )

In [15]:
print("Person data schema:")
person_table.printSchema()

Person data schema:
root
 |-- birthYear: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- personId: long (nullable = false)



In [16]:
person_table.show(2)

+---------+------+--------+
|birthYear|gender|personId|
+---------+------+--------+
|     1959|  null|       0|
|     2004|     F|       1|
+---------+------+--------+
only showing top 2 rows



In [17]:
country_table = df.select('bornCountry').distinct().withColumnRenamed("bornCountry", "country") \
                  .withColumn("countryId", \
                        monotonically_increasing_id())
                 
print((country_table.count(), len(country_table.columns)))

(243, 2)


In [18]:
write_to_parquet(country_table,"country_table.parquet" )

In [19]:
print("Country data schema:")
country_table.printSchema()

Country data schema:
root
 |-- country: integer (nullable = true)
 |-- countryId: long (nullable = false)



In [20]:
country_table.show(2)

+-------+----------+
|country| countryId|
+-------+----------+
|    148|         0|
|    392|8589934592|
+-------+----------+
only showing top 2 rows



### Arrival Port

In [21]:
arrival_port_table = df.groupby('arrivalPort') \
                     .agg(F.countDistinct("arrivalAddress")).withColumnRenamed("count(DISTINCT arrivalAddress)","numArrivalAddress") \
                     .dropna() \
                     .dropDuplicates()\
                     .withColumn("arrivalPortId", \
                        monotonically_increasing_id())
print((arrival_port_table.count(), len(arrival_port_table.columns)))

(299, 3)


In [22]:
write_to_parquet(arrival_port_table,"arrival_port_table.parquet" )

In [23]:
arrival_port_table.show(2)

+-----------+-----------------+-------------+
|arrivalPort|numArrivalAddress|arrivalPortId|
+-----------+-----------------+-------------+
|        FMY|               74|            0|
|        BGM|               15|            1|
+-----------+-----------------+-------------+
only showing top 2 rows



In [24]:
print("Arrival Port data schema:")
arrival_port_table.printSchema()

Arrival Port data schema:
root
 |-- arrivalPort: string (nullable = true)
 |-- numArrivalAddress: long (nullable = false)
 |-- arrivalPortId: long (nullable = false)



#### Status Flag

In [25]:
status_table = df.select(['arrivalFlag','departureFlag', 'updateFlag','matchFlag']) \
                 .dropna() \
               .dropDuplicates() \
               .withColumn("statusFlagId", \
                        monotonically_increasing_id()) 
print((status_table.count(), len(status_table.columns))) 

(16, 5)


In [26]:
write_to_parquet(status_table,"status_table.parquet" )

In [27]:
status_table.show(2)

+-----------+-------------+----------+---------+------------+
|arrivalFlag|departureFlag|updateFlag|matchFlag|statusFlagId|
+-----------+-------------+----------+---------+------------+
|          H|            O|         U|        M|120259084288|
|          T|            O|         U|        M|146028888064|
+-----------+-------------+----------+---------+------------+
only showing top 2 rows



In [28]:
print("Status Flag data schema:")
status_table.printSchema()


Status Flag data schema:
root
 |-- arrivalFlag: string (nullable = true)
 |-- departureFlag: string (nullable = true)
 |-- updateFlag: string (nullable = true)
 |-- matchFlag: string (nullable = true)
 |-- statusFlagId: long (nullable = false)



### Airport

In [29]:
airport = spark.read.format("csv").option("header", "true").option("delimiter", ",").load("airport-codes_csv.csv")

In [30]:
airport.show(2)

+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|  00A|     heliport|   Total Rf Heliport|          11|       NA|         US|     US-PA|    Bensalem|     00A|     null|       00A|-74.9336013793945...|
| 00AA|small_airport|Aero B Ranch Airport|        3435|       NA|         US|     US-KS|       Leoti|    00AA|     null|      00AA|-101.473911, 38.7...|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
only showing top 2 rows



#### Select only US, where iata_code is available and valid airport

In [31]:
airport =   airport \
            .where(
            (col("iso_country") == "US") & (col("iata_code").isNotNull()) & (col("type").isin("large_airport", "medium_airport", "small_airport"))) \
            .withColumn("isoRegion", substring(col("iso_region"), 4, 2)) \
            .drop("local_code", "elevation_ft", "iso_region", 'continent') \
            .dropDuplicates()

In [32]:
print("Airport data schema:")
airport.printSchema()

Airport data schema:
root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- coordinates: string (nullable = true)
 |-- isoRegion: string (nullable = true)



In [33]:
airport_table = airport.select(['ident', 'type', 'name', 'isoRegion', 'municipality','gps_code', 'iata_code','iso_country', 'coordinates']) \
               .dropDuplicates().dropna()
write_to_parquet(airport_table,"airport_table.parquet" )

In [34]:
airport_table.show(2)

+-----+--------------+--------------------+---------+------------+--------+---------+-----------+--------------------+
|ident|          type|                name|isoRegion|municipality|gps_code|iata_code|iso_country|         coordinates|
+-----+--------------+--------------------+---------+------------+--------+---------+-----------+--------------------+
| KBOI| large_airport|Boise Air Termina...|       ID|       Boise|    KBOI|      BOI|         US|   -116.223, 43.5644|
| KEVV|medium_airport|Evansville Region...|       IN|  Evansville|    KEVV|      EVV|         US|-87.5324020386, 3...|
+-----+--------------+--------------------+---------+------------+--------+---------+-----------+--------------------+
only showing top 2 rows



In [35]:
print("Airport data schema:")
airport_table.printSchema()

Airport data schema:
root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- isoRegion: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- coordinates: string (nullable = true)



### Demographics Dataset

In [36]:
dem = spark.read.format("csv").option("header", "true").option("delimiter", ";").load("us-cities-demographics.csv")

In [37]:
print("Demographics data schema:")
dem.printSchema()

Demographics data schema:
root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: string (nullable = true)
 |-- Male Population: string (nullable = true)
 |-- Female Population: string (nullable = true)
 |-- Total Population: string (nullable = true)
 |-- Number of Veterans: string (nullable = true)
 |-- Foreign-born: string (nullable = true)
 |-- Average Household Size: string (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: string (nullable = true)



### Groupby and aggregrate columns

In [38]:
dem_table = dem \
.groupBy(col("State Code").alias("stateCode"), col("State").alias("state")).agg(
round(mean('Median Age'), 2).alias("medianAge"),\
sum("Total Population").alias("totalPopulation"),\
sum("Male Population").alias("malePopulation"), \
sum("Female Population").alias("femalePopulation"),\
sum("Number of Veterans").alias("numberOfVeterans"),\
sum("Foreign-born").alias("foreignBorn"), \
round(mean("Average Household Size"),2).alias("averageHouseholdSize")
).dropna()

In [39]:
dem_table.show(2)

+---------+--------------+---------+---------------+--------------+----------------+----------------+-----------+--------------------+
|stateCode|         state|medianAge|totalPopulation|malePopulation|femalePopulation|numberOfVeterans|foreignBorn|averageHouseholdSize|
+---------+--------------+---------+---------------+--------------+----------------+----------------+-----------+--------------------+
|       MT|       Montana|     35.5|       906470.0|      438535.0|        467935.0|         69270.0|    29885.0|                2.27|
|       NC|North Carolina|    33.79|    1.5300995E7|     7330525.0|       7970470.0|        830730.0|  1896635.0|                2.48|
+---------+--------------+---------+---------------+--------------+----------------+----------------+-----------+--------------------+
only showing top 2 rows



In [40]:
print("New Demographics data schema:")
dem_table.printSchema()

New Demographics data schema:
root
 |-- stateCode: string (nullable = true)
 |-- state: string (nullable = true)
 |-- medianAge: double (nullable = true)
 |-- totalPopulation: double (nullable = true)
 |-- malePopulation: double (nullable = true)
 |-- femalePopulation: double (nullable = true)
 |-- numberOfVeterans: double (nullable = true)
 |-- foreignBorn: double (nullable = true)
 |-- averageHouseholdSize: double (nullable = true)



In [41]:
write_to_parquet(dem_table,"dem_table.parquet" )

In [42]:
small_table = df.select(['cicid', 'arrivalDate','departureDate','mode','bornCountry', 'airline','flightNumber','visa','visaType',
                         'gender','arrivalPort', 'matchFlag']) \
                .dropna() \
                .dropDuplicates(['cicid'])
small_table = small_table.withColumnRenamed("arrivalDate", "time")                            

In [43]:
small_table = small_table.withColumn('stateCode', F.array( F.lit("MT"), F.lit("NY"),F.lit("SC"),).getItem(
    (F.rand()*3).cast("int"))) \
                     .withColumn("ident", F.array( F.lit("KBOI"), F.lit("KVEE"),F.lit("STOB"),).getItem(
    (F.rand()*3).cast("int"))) \
                .dropDuplicates()
small_table.show()

+-----+----------+-------------+----+-----------+-------+------------+----+--------+------+-----------+---------+---------+-----+
|cicid|      time|departureDate|mode|bornCountry|airline|flightNumber|visa|visaType|gender|arrivalPort|matchFlag|stateCode|ident|
+-----+----------+-------------+----+-----------+-------+------------+----+--------+------+-----------+---------+---------+-----+
|  148|2016-04-01|   2016-04-08|   1|        103|     OS|       00089|   2|      WT|     F|        NEW|        M|       MT| KBOI|
|  471|2016-04-01|   2016-04-03|   2|        103|    VES|       91285|   2|      WT|     M|        MIA|        M|       SC| STOB|
|  833|2016-04-01|   2016-04-09|   1|        104|     FI|       00631|   2|      WT|     F|        BOS|        M|       MT| KVEE|
| 1088|2016-04-01|   2016-04-08|   1|        104|     DL|       00149|   2|      WT|     F|        NEW|        M|       NY| KBOI|
| 1238|2016-04-01|   2016-04-09|   1|        104|     AF|       00006|   2|      WT|     M

In [44]:
print((small_table.count(), len(small_table.columns))) 

(2482042, 14)


In [45]:
small_table = small_table.join(status_table.select(['statusFlagId', 'matchFlag']), (small_table.matchFlag == status_table.matchFlag) , how = 'inner') \
                       .drop('matchFlag') \
                       .dropna() \
                       .dropDuplicates() 
small_table.show()

+-----+----------+-------------+----+-----------+-------+------------+----+--------+------+-----------+---------+-----+-------------+
|cicid|      time|departureDate|mode|bornCountry|airline|flightNumber|visa|visaType|gender|arrivalPort|stateCode|ident| statusFlagId|
+-----+----------+-------------+----+-----------+-------+------------+----+--------+------+-----------+---------+-----+-------------+
|  148|2016-04-01|   2016-04-08|   1|        103|     OS|       00089|   2|      WT|     F|        NEW|       MT| KBOI|1675037245440|
|  148|2016-04-01|   2016-04-08|   1|        103|     OS|       00089|   2|      WT|     F|        NEW|       MT| KBOI|1640677507072|
|  148|2016-04-01|   2016-04-08|   1|        103|     OS|       00089|   2|      WT|     F|        NEW|       MT| KBOI|1632087572480|
|  148|2016-04-01|   2016-04-08|   1|        103|     OS|       00089|   2|      WT|     F|        NEW|       MT| KBOI|1623497637888|
|  148|2016-04-01|   2016-04-08|   1|        103|     OS|     

In [46]:
#print((small_table.count(), len(small_table.columns)))

In [47]:
small_table = small_table.join(arrival_port_table.select(['arrivalPortId', 'arrivalPort']), (small_table.arrivalPort == arrival_port_table.arrivalPort) , how = 'inner') \
                       .drop('arrivalPort') \
                       .dropna() \
                       .dropDuplicates()
small_table.show(2)

+-----+----------+-------------+----+-----------+-------+------------+----+--------+------+---------+-----+-------------+-------------+
|cicid|      time|departureDate|mode|bornCountry|airline|flightNumber|visa|visaType|gender|stateCode|ident| statusFlagId|arrivalPortId|
+-----+----------+-------------+----+-----------+-------+------------+----+--------+------+---------+-----+-------------+-------------+
|  148|2016-04-01|   2016-04-08|   1|        103|     OS|       00089|   2|      WT|     F|       MT| KBOI|1675037245440| 747324309504|
|  148|2016-04-01|   2016-04-08|   1|        103|     OS|       00089|   2|      WT|     F|       MT| KBOI|1640677507072| 747324309504|
+-----+----------+-------------+----+-----------+-------+------------+----+--------+------+---------+-----+-------------+-------------+
only showing top 2 rows



In [48]:
small_table = small_table.join(country_table, (small_table.bornCountry == country_table.country) , how = 'inner') \
                         .dropna() \
                        .drop('bornCountry', 'country') \
                       .dropDuplicates()
small_table.show(2)

+-----+----------+-------------+----+-------+------------+----+--------+------+---------+-----+-------------+-------------+------------+
|cicid|      time|departureDate|mode|airline|flightNumber|visa|visaType|gender|stateCode|ident| statusFlagId|arrivalPortId|   countryId|
+-----+----------+-------------+----+-------+------------+----+--------+------+---------+-----+-------------+-------------+------------+
|  148|2016-04-01|   2016-04-08|   1|     OS|       00089|   2|      WT|     F|       MT| KBOI|1675037245440| 747324309504|197568495616|
|  148|2016-04-01|   2016-04-08|   1|     OS|       00089|   2|      WT|     F|       MT| KBOI|1640677507072| 747324309504|197568495616|
+-----+----------+-------------+----+-------+------------+----+--------+------+---------+-----+-------------+-------------+------------+
only showing top 2 rows



In [49]:
i94_table = small_table.join(person_table.select(['personId', 'gender']), (small_table.gender == person_table.gender) , how = 'inner') \
                      .drop('gender') \
                      .dropna() \
                      .dropDuplicates()

In [50]:
print("Fact Table schema:")
i94_table.printSchema()

Fact Table schema:
root
 |-- cicid: integer (nullable = true)
 |-- time: date (nullable = true)
 |-- departureDate: date (nullable = true)
 |-- mode: integer (nullable = true)
 |-- airline: string (nullable = true)
 |-- flightNumber: string (nullable = true)
 |-- visa: integer (nullable = true)
 |-- visaType: string (nullable = true)
 |-- stateCode: string (nullable = true)
 |-- ident: string (nullable = true)
 |-- statusFlagId: long (nullable = false)
 |-- arrivalPortId: long (nullable = false)
 |-- countryId: long (nullable = false)
 |-- personId: long (nullable = false)



In [51]:
i94_table.show()

+-----+----------+-------------+----+-------+------------+----+--------+---------+-----+-------------+-------------+-------------+-----------+
|cicid|      time|departureDate|mode|airline|flightNumber|visa|visaType|stateCode|ident| statusFlagId|arrivalPortId|    countryId|   personId|
+-----+----------+-------------+----+-------+------------+----+--------+---------+-----+-------------+-------------+-------------+-----------+
|69637|2016-04-01|   2016-04-02|   1|     MU|       00577|   2|      B2|       MT| STOB| 120259084288|            0|1133871366144|          1|
|69637|2016-04-01|   2016-04-02|   1|     MU|       00577|   2|      B2|       MT| STOB| 146028888064|            0|1133871366144|          1|
|69637|2016-04-01|   2016-04-02|   1|     MU|       00577|   2|      B2|       MT| STOB| 214748364800|            0|1133871366144|          1|
|69637|2016-04-01|   2016-04-02|   1|     MU|       00577|   2|      B2|       MT| STOB| 523986010112|            0|1133871366144|          1|

#### Partition by day

In [52]:
i94_table =i94_table \
        .withColumn("arrival_date-split", split(col("time"), "-")) \
        .withColumn("arrivalDay", col("arrival_date-split")[2]) \
        .drop("arrival_date-split")

In [53]:
i94_table.printSchema

<bound method DataFrame.printSchema of DataFrame[cicid: int, time: date, departureDate: date, mode: int, airline: string, flightNumber: string, visa: int, visaType: string, stateCode: string, ident: string, statusFlagId: bigint, arrivalPortId: bigint, countryId: bigint, personId: bigint, arrivalDay: string]>

In [None]:
i94_file_name = "i94_table.parquet"
file_output = output_path + i94_file_name
i94_table.write.mode("overwrite").partitionBy("arrivalDay")\
            .parquet(file_output)  