In [1]:
import pandas as pd
import os, re
import configparser
from datetime import timedelta, datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, when, lower, isnull, year, month, dayofmonth, hour, weekofyear, dayofweek, date_format, to_date
from pyspark.sql.types import StructField, StructType, IntegerType, DoubleType, LongType

In [2]:
# The AWS key id and password are configured in a configuration file "dl.cfg"
config = configparser.ConfigParser()
config.read('dl.cfg')
# Reads and saves the AWS access key information and saves them in a environment variable
os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']
# OUTPUT = config['ETL']['OUTPUT_DATA']

In [3]:
# spark = SparkSession.builder.config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0").getOrCreate()

In [4]:
spark = SparkSession.builder.config("spark.jars.packages",
                                        "saurfang:spark-sas7bdat:2.0.0-s_2.11,org.apache.hadoop:hadoop-aws:2.7.0")\
    .enableHiveSupport().getOrCreate()

In [5]:
# Read US Cities Demo dataset file
demographics=spark.read.csv("us-cities-demographics.csv", sep=';', header=True)

In [6]:
# Verifying Total Number of Records
demographics.count()

2891

In [7]:
# Print Schema to verify that all the columns are in "string" format
demographics.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: string (nullable = true)
 |-- Male Population: string (nullable = true)
 |-- Female Population: string (nullable = true)
 |-- Total Population: string (nullable = true)
 |-- Number of Veterans: string (nullable = true)
 |-- Foreign-born: string (nullable = true)
 |-- Average Household Size: string (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: string (nullable = true)



In [8]:
def cast_type(df, cols):
    """
    Convert the types of the columns according to the configuration supplied in the cols dictionary in the format {"column_name": type}
    
    Args:
        df (:obj:`SparkDataFrame`): Spark dataframe to be processed. 
            Represents the entry point to programming Spark with the Dataset and DataFrame API.
        cols (:obj:`dict`): Dictionary in the format of {"column_name": type} indicating what columns and types they should be converted to
    """
    for k,v in cols.items():
        if k in df.columns:
            df = df.withColumn(k, df[k].cast(v))
    return df

In [9]:
# Convert numeric columns to the proper types: Integer and Double
int_cols = ['Count', 'Male Population', 'Female Population', 'Total Population', 'Number of Veterans', 'Foreign-born']
float_cols = ['Median Age', 'Average Household Size']
demographics = cast_type(demographics, dict(zip(int_cols, len(int_cols)*[IntegerType()])))
demographics = cast_type(demographics, dict(zip(float_cols, len(float_cols)*[DoubleType()])))

In [10]:
# demographics.printSchema()
    
first_agg = {"Median Age": "first", "Male Population": "first", "Female Population": "first", 
            "Total Population": "first", "Number of Veterans": "first", "Foreign-born": "first", "Average Household Size": "first"}

# First aggregation - City
agg_df = demographics.groupby(["City", "State", "State Code"]).agg(first_agg)

# Pivot Table to transform values of the column Race to different columns
piv_df = demographics.groupBy(["City", "State", "State Code"]).pivot("Race").sum("Count")

In [11]:
agg_df.head()

Row(City='Rockville', State='Maryland', State Code='MD', first(Total Population)=66998, first(Female Population)=35793, first(Median Age)=38.1, first(Number of Veterans)=1990, first(Foreign-born)=25047, first(Male Population)=31205, first(Average Household Size)=2.6)

In [12]:
piv_df.head()

Row(City='Delray Beach', State='Florida', State Code='FL', American Indian and Alaska Native=None, Asian=1696, Black or African-American=21138, Hispanic or Latino=6397, White=40980)

In [13]:
# Rename column names removing the spaces to avoid problems when saving to disk (we got errors when trying to save column names with spaces)
demographics = agg_df.join(other=piv_df, on=["City", "State", "State Code"], how="inner")\
    .withColumnRenamed('State Code', 'StateCode')\
    .withColumnRenamed('first(Total Population)', 'TotalPopulation')\
    .withColumnRenamed('first(Female Population)', 'FemalePopulation')\
    .withColumnRenamed('first(Male Population)', 'MalePopulation')\
    .withColumnRenamed('first(Median Age)', 'MedianAge')\
    .withColumnRenamed('first(Number of Veterans)', 'NumberVeterans')\
    .withColumnRenamed('first(Foreign-born)', 'ForeignBorn')\
    .withColumnRenamed('first(Average Household Size)', 'AverageHouseholdSize')\
    .withColumnRenamed('Hispanic or Latino', 'HispanicOrLatino')\
    .withColumnRenamed('Black or African-American', 'BlackOrAfricanAmerican')\
    .withColumnRenamed('American Indian and Alaska Native', 'AmericanIndianAndAlaskaNative')

In [14]:
demographics.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- StateCode: string (nullable = true)
 |-- TotalPopulation: integer (nullable = true)
 |-- FemalePopulation: integer (nullable = true)
 |-- MedianAge: double (nullable = true)
 |-- NumberVeterans: integer (nullable = true)
 |-- ForeignBorn: integer (nullable = true)
 |-- MalePopulation: integer (nullable = true)
 |-- AverageHouseholdSize: double (nullable = true)
 |-- AmericanIndianAndAlaskaNative: long (nullable = true)
 |-- Asian: long (nullable = true)
 |-- BlackOrAfricanAmerican: long (nullable = true)
 |-- HispanicOrLatino: long (nullable = true)
 |-- White: long (nullable = true)



In [15]:
numeric_cols = ['TotalPopulation', 'FemalePopulation', 'MedianAge', 'NumberVeterans', 'ForeignBorn', 'MalePopulation', 
                'AverageHouseholdSize', 'AmericanIndianAndAlaskaNative', 'Asian', 'BlackOrAfricanAmerican', 'HispanicOrLatino', 'White']

# Fill the null values with 0
demographics = demographics.fillna(0, numeric_cols)

In [16]:
demographics.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- StateCode: string (nullable = true)
 |-- TotalPopulation: integer (nullable = true)
 |-- FemalePopulation: integer (nullable = true)
 |-- MedianAge: double (nullable = false)
 |-- NumberVeterans: integer (nullable = true)
 |-- ForeignBorn: integer (nullable = true)
 |-- MalePopulation: integer (nullable = true)
 |-- AverageHouseholdSize: double (nullable = false)
 |-- AmericanIndianAndAlaskaNative: long (nullable = true)
 |-- Asian: long (nullable = true)
 |-- BlackOrAfricanAmerican: long (nullable = true)
 |-- HispanicOrLatino: long (nullable = true)
 |-- White: long (nullable = true)



In [17]:
# Now write (and overwrite) transformed `demographics` dataset onto parquet file
# demographics.write.mode('overwrite').parquet("s3a://udacitycapstone-us-immigration-data-lake/us_cities_demographics.parquet")
demographics.write.mode('overwrite').parquet("us_cities_demographics.parquet")

In [18]:
# Read i94 immigration dataset
immigration=spark.read.parquet("sas_data")

In [19]:
immigration.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [20]:
int_cols = ['cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 
        'arrdate', 'i94mode', 'i94bir', 'i94visa', 'count', 'biryear', 'dtadfile', 'depdate']
    
date_cols = ['arrdate', 'depdate']
    
high_null = ["visapost", "occup", "entdepu", "insnum"]
not_useful_cols = ["count", "entdepa", "entdepd", "matflag", "dtaddto", "biryear", "admnum"]

In [21]:
# Convert columns read as string/double to integer
immigration = cast_type(immigration, dict(zip(int_cols, len(int_cols)*[IntegerType()])))

In [22]:
# The date format string preferred to our work here: YYYY-MM-DD
date_format = "%Y-%m-%d"
convert_sas_udf = udf(lambda x: x if x is None else (timedelta(days=x) + datetime(1960, 1, 1)).strftime(date_format))

In [23]:
def convert_sas_date(df, cols):
    """
    Convert dates in the SAS datatype to a date in a string format YYYY-MM-DD
    
    Args:
        df (:obj:`SparkDataFrame`): Spark dataframe to be processed. 
            Represents the entry point to programming Spark with the Dataset and DataFrame API.
        cols (:obj:`list`): List of columns in the SAS date format to be convert
    """
    for c in [c for c in cols if c in df.columns]:
        df = df.withColumn(c, convert_sas_udf(df[c]))
    return df

In [24]:
 # Convert SAS date to a meaningful string date in the format of YYYY-MM-DD
immigration = convert_sas_date(immigration, date_cols)
    
# Drop high null columns and not useful columns
immigration = immigration.drop(*high_null)
immigration = immigration.drop(*not_useful_cols)

In [25]:
def date_diff(date1, date2):
    '''
    Calculates the difference in days between two dates
    '''
    if date2 is None:
        return None
    else:
        a = datetime.strptime(date1, date_format)
        b = datetime.strptime(date2, date_format)
        delta = b - a
        return delta.days

In [26]:
date_diff_udf = udf(date_diff)

In [27]:
# Create a new columns to store the length of the visitor stay in the US
immigration = immigration.withColumn('stay', date_diff_udf(immigration.arrdate, immigration.depdate))
immigration = cast_type(immigration, {'stay': IntegerType()})

In [28]:
immigration.printSchema()

root
 |-- cicid: integer (nullable = true)
 |-- i94yr: integer (nullable = true)
 |-- i94mon: integer (nullable = true)
 |-- i94cit: integer (nullable = true)
 |-- i94res: integer (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: string (nullable = true)
 |-- i94mode: integer (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: string (nullable = true)
 |-- i94bir: integer (nullable = true)
 |-- i94visa: integer (nullable = true)
 |-- dtadfile: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- fltno: string (nullable = true)
 |-- visatype: string (nullable = true)
 |-- stay: integer (nullable = true)



In [29]:
immigration.write.mode("overwrite").parquet('immigration.parquet')

In [30]:
# Start processing the I9I94_SAS_Labels_Description.SAS to create master i94 code dimensions:

'''
/* I94MODE - There are missing values as well as not reported (9) */
	1 = 'Air'
	2 = 'Sea'
	3 = 'Land'
	9 = 'Not reported' ;
'''
# Create i94mode list
i94mode_data =[[1,'Air'],[2,'Sea'],[3,'Land'],[9,'Not reported']]

# Convert to spark dataframe
i94mode=spark.createDataFrame(i94mode_data)

In [31]:
i94mode.printSchema()

root
 |-- _1: long (nullable = true)
 |-- _2: string (nullable = true)



In [32]:
# Create i94mode parquet file
i94mode.write.mode("overwrite").parquet('i94mode.parquet')

In [33]:
countries = spark.read.format('csv').options(header='true', inferSchema='true').load("../../data2/GlobalLandTemperaturesByCity.csv")

In [34]:
# Aggregates the dataset by Country and rename the name of new columns
countries = countries.groupby(["Country"]).agg({"AverageTemperature": "avg", "Latitude": "first", "Longitude": "first"})\
.withColumnRenamed('avg(AverageTemperature)', 'Temperature')\
.withColumnRenamed('first(Latitude)', 'Latitude')\
.withColumnRenamed('first(Longitude)', 'Longitude')

In [35]:
def change_field_value_condition(df, change_list):
    '''
    Helper function used to rename column values based on condition.
    
    Args:
        df (:obj:`SparkDataFrame`): Spark dataframe to be processed.
        change_list (:obj: `list`): List of tuples in the format (field, old value, new value)
    '''
    for field, old, new in change_list:
        df = df.withColumn(field, when(df[field] == old, new).otherwise(df[field]))
    return df

In [36]:
# Rename specific country names to match the I94CIT_I94RES lookup table when joining them
change_countries = [("Country", "Congo (Democratic Republic Of The)", "Congo"), ("Country", "Côte D'Ivoire", "Ivory Coast")]
countries = change_field_value_condition(countries, change_countries)
countries = countries.withColumn('CountryLower', lower(countries.Country))

In [37]:
  # Rename specific country names to match the demographics dataset when joining them
change_res = [("I94CTRY", "BOSNIA-HERZEGOVINA", "BOSNIA AND HERZEGOVINA"), 
                  ("I94CTRY", "INVALID: CANADA", "CANADA"),
                  ("I94CTRY", "CHINA, PRC", "CHINA"),
                  ("I94CTRY", "GUINEA-BISSAU", "GUINEA BISSAU"),
                  ("I94CTRY", "INVALID: PUERTO RICO", "PUERTO RICO"),
                  ("I94CTRY", "INVALID: UNITED STATES", "UNITED STATES")]

In [38]:
# Loads the lookup table I94CIT_I94RES
res = spark.read.format('csv').options(header='true', inferSchema='true').load("I94CIT_I94RES.csv")

In [39]:
res = cast_type(res, {"Code": IntegerType()})
res = change_field_value_condition(res, change_res)
res = res.withColumn('resCountry_Lower', lower(res.I94CTRY))

In [40]:
capitalize_udf = udf(lambda x: x if x is None else x.title())

In [41]:
# Join the two datasets to create the country dimmension table
res = res.join(countries, res.resCountry_Lower == countries.CountryLower, how="left")
res = res.withColumn("Country", when(isnull(res["Country"]), capitalize_udf(res.I94CTRY)).otherwise(res["Country"]))   
res = res.drop("I94CTRY", "CountryLower")

In [42]:
res = res.drop("resCountry_Lower")
res.printSchema()

root
 |-- Code: integer (nullable = true)
 |-- Country: string (nullable = true)
 |-- Temperature: double (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



In [43]:
# Create i94mode parquet file
res.write.mode("overwrite").parquet('country.parquet')

In [44]:
immigration.show(3)

+-------+-----+------+------+------+-------+----------+-------+-------+----------+------+-------+--------+------+-------+-----+--------+----+
|  cicid|i94yr|i94mon|i94cit|i94res|i94port|   arrdate|i94mode|i94addr|   depdate|i94bir|i94visa|dtadfile|gender|airline|fltno|visatype|stay|
+-------+-----+------+------+------+-------+----------+-------+-------+----------+------+-------+--------+------+-------+-----+--------+----+
|5748517| 2016|     4|   245|   438|    LOS|2016-04-30|      1|     CA|2016-05-08|    40|      1|20160430|     F|     QF|00011|      B1|   8|
|5748518| 2016|     4|   245|   438|    LOS|2016-04-30|      1|     NV|2016-05-17|    32|      1|20160430|     F|     VA|00007|      B1|  17|
|5748519| 2016|     4|   245|   438|    LOS|2016-04-30|      1|     WA|2016-05-08|    29|      1|20160430|     M|     DL|00040|      B1|   8|
+-------+-----+------+------+------+-------+----------+-------+-------+----------+------+-------+--------+------+-------+-----+--------+----+
only s

In [45]:
# Read i94 immigration dataset to create Date Frame
i94_spark=spark.read.parquet("sas_data")

In [46]:
i94_spark.select("i94res","i94port","arrdate","i94mode","depdate","i94bir","i94visa","count" \
                  ,"gender",col("admnum").cast(LongType())).show(3)

+------+-------+-------+-------+-------+------+-------+-----+------+-----------+
|i94res|i94port|arrdate|i94mode|depdate|i94bir|i94visa|count|gender|     admnum|
+------+-------+-------+-------+-------+------+-------+-----+------+-----------+
| 438.0|    LOS|20574.0|    1.0|20582.0|  40.0|    1.0|  1.0|     F|94953870030|
| 438.0|    LOS|20574.0|    1.0|20591.0|  32.0|    1.0|  1.0|     F|94955622830|
| 438.0|    LOS|20574.0|    1.0|20582.0|  29.0|    1.0|  1.0|     M|94956406530|
+------+-------+-------+-------+-------+------+-------+-----+------+-----------+
only showing top 3 rows



In [47]:
i94_spark=i94_spark.select(col("i94res").cast(IntegerType()),col("i94port"),
                           col("arrdate").cast(IntegerType()), \
                           col("i94mode").cast(IntegerType()),col("depdate").cast(IntegerType()),
                           col("i94bir").cast(IntegerType()),col("i94visa").cast(IntegerType()), 
                           col("count").cast(IntegerType()), \
                              "gender",col("admnum").cast(LongType()))

In [48]:
i94_spark.show(3)

+------+-------+-------+-------+-------+------+-------+-----+------+-----------+
|i94res|i94port|arrdate|i94mode|depdate|i94bir|i94visa|count|gender|     admnum|
+------+-------+-------+-------+-------+------+-------+-----+------+-----------+
|   438|    LOS|  20574|      1|  20582|    40|      1|    1|     F|94953870030|
|   438|    LOS|  20574|      1|  20591|    32|      1|    1|     F|94955622830|
|   438|    LOS|  20574|      1|  20582|    29|      1|    1|     M|94956406530|
+------+-------+-------+-------+-------+------+-------+-----+------+-----------+
only showing top 3 rows



In [49]:
i94_spark.count(), i94_spark.dropDuplicates().count()

(3096313, 3096302)

In [50]:
i94_spark.dropDuplicates(['admnum']).count()

3075579

In [51]:
# We will drop duplicate rows and save it as final dataset for i94
i94_spark=i94_spark.dropDuplicates()

In [52]:
i94_spark.show(3)

+------+-------+-------+-------+-------+------+-------+-----+------+-----------+
|i94res|i94port|arrdate|i94mode|depdate|i94bir|i94visa|count|gender|     admnum|
+------+-------+-------+-------+-------+------+-------+-----+------+-----------+
|   582|    XXX|  20557|   null|  20558|    34|      2|    1|  null|91904214530|
|   209|    AGA|  20552|      1|   null|  null|      2|    1|     M|47842155333|
|   209|    ATL|  20571|      1|   null|  null|      2|    1|     M|44537883633|
+------+-------+-------+-------+-------+------+-------+-----+------+-----------+
only showing top 3 rows



In [53]:
i94_spark.printSchema()

root
 |-- i94res: integer (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: integer (nullable = true)
 |-- i94mode: integer (nullable = true)
 |-- depdate: integer (nullable = true)
 |-- i94bir: integer (nullable = true)
 |-- i94visa: integer (nullable = true)
 |-- count: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- admnum: long (nullable = true)



In [54]:
import datetime as dt
# Convert SAS arrival date to datetime format
get_date = udf(lambda x: (dt.datetime(1960, 1, 1).date() + dt.timedelta(x)).isoformat() if x else None)
i94non_immigrant_port_entry = i94_spark.withColumn("arrival_date", get_date(i94_spark.arrdate))

In [55]:
i94non_immigrant_port_entry.show(3)

+------+-------+-------+-------+-------+------+-------+-----+------+-----------+------------+
|i94res|i94port|arrdate|i94mode|depdate|i94bir|i94visa|count|gender|     admnum|arrival_date|
+------+-------+-------+-------+-------+------+-------+-----+------+-----------+------------+
|   582|    XXX|  20557|   null|  20558|    34|      2|    1|  null|91904214530|  2016-04-13|
|   209|    AGA|  20552|      1|   null|  null|      2|    1|     M|47842155333|  2016-04-08|
|   209|    ATL|  20571|      1|   null|  null|      2|    1|     M|44537883633|  2016-04-27|
+------+-------+-------+-------+-------+------+-------+-----+------+-----------+------------+
only showing top 3 rows



In [56]:
i94non_immigrant_port_entry.printSchema()

root
 |-- i94res: integer (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: integer (nullable = true)
 |-- i94mode: integer (nullable = true)
 |-- depdate: integer (nullable = true)
 |-- i94bir: integer (nullable = true)
 |-- i94visa: integer (nullable = true)
 |-- count: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- admnum: long (nullable = true)
 |-- arrival_date: string (nullable = true)



In [57]:
from pyspark.sql import functions as F
i94date= i94non_immigrant_port_entry.withColumn('Darrival_date',F.to_date(i94non_immigrant_port_entry.arrival_date))

In [58]:
i94date.printSchema()

root
 |-- i94res: integer (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: integer (nullable = true)
 |-- i94mode: integer (nullable = true)
 |-- depdate: integer (nullable = true)
 |-- i94bir: integer (nullable = true)
 |-- i94visa: integer (nullable = true)
 |-- count: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- admnum: long (nullable = true)
 |-- arrival_date: string (nullable = true)
 |-- Darrival_date: date (nullable = true)



In [59]:
i94date = i94date.withColumn('arrival_month',month(i94date.Darrival_date))
i94date = i94date.withColumn('arrival_year',year(i94date.Darrival_date))
i94date = i94date.withColumn('arrival_day',dayofmonth(i94date.Darrival_date))
i94date = i94date.withColumn('day_of_week',dayofweek(i94date.Darrival_date))
i94date = i94date.withColumn('arrival_weekofyear',weekofyear(i94date.Darrival_date))

In [60]:
i94date.printSchema()

root
 |-- i94res: integer (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: integer (nullable = true)
 |-- i94mode: integer (nullable = true)
 |-- depdate: integer (nullable = true)
 |-- i94bir: integer (nullable = true)
 |-- i94visa: integer (nullable = true)
 |-- count: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- admnum: long (nullable = true)
 |-- arrival_date: string (nullable = true)
 |-- Darrival_date: date (nullable = true)
 |-- arrival_month: integer (nullable = true)
 |-- arrival_year: integer (nullable = true)
 |-- arrival_day: integer (nullable = true)
 |-- day_of_week: integer (nullable = true)
 |-- arrival_weekofyear: integer (nullable = true)



In [62]:
i94date=i94date.select(col('arrdate').alias('arrival_sasdate'),col('Darrival_date').alias('arrival_iso_date'),'arrival_month','day_of_week','arrival_year','arrival_day','arrival_weekofyear').dropDuplicates()

In [None]:
i94date.printSchema()

In [None]:
# Create temporary sql table
i94date.createOrReplaceTempView("i94date_table")

In [None]:
# Add seasons to i94 date dimension table
i94date_season=spark.sql('''select arrival_sasdate,
                         arrival_iso_date,
                         arrival_month,
                         day_of_week,
                         arrival_year,
                         arrival_day,
                         arrival_weekofyear,
                         CASE WHEN arrival_month IN (12, 1, 2) THEN 'winter' 
                                WHEN arrival_month IN (3, 4, 5) THEN 'spring' 
                                WHEN arrival_month IN (6, 7, 8) THEN 'summer' 
                                ELSE 'autumn' 
                         END AS date_season from i94date_table''')

In [None]:
i94date_season.printSchema()

In [None]:
# Save i94date dimension to parquet file partitioned by year and month:
i94date_season.write.mode("overwrite").partitionBy("arrival_year", "arrival_month").parquet('i94date.parquet')