In [4]:
from pyspark.sql import SparkSession, SQLContext, GroupedData, HiveContext
from pyspark.sql.functions import *
from pyspark.sql.functions import date_add as d_add
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import year, month, dayofmonth, weekofyear, date_format
from pyspark.sql import functions as F
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import lit
from pyspark.sql import Row
# Build (or reuse) the Spark session with Hive support enabled.
# The spark-sas7bdat package is presumably kept for reading the original SAS
# files; the data used below is read from its parquet copy in sas_data/.
# The Python worker memory is raised to 15g.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11")
    .config("spark.python.worker.memory", "15g")
    .enableHiveSupport()
    .getOrCreate()
)

# Raw records (i94* columns, see the schema printed further down).
df = spark.read.parquet("sas_data")

In [5]:
# Root directory under which every table produced by this notebook is written.
output_path = "output_data/"

def write_to_parquet(table, file_name, mode="overwrite"):
    """
    Write a Spark DataFrame as parquet under ``output_path``.

    Parameters
    ----------
    table : pyspark.sql.DataFrame
        The table to persist.
    file_name : str
        Output name relative to ``output_path`` (Spark writes a directory
        of part files at that location).
    mode : str, optional
        Spark save mode. Defaults to "overwrite" so re-running a cell
        replaces the previous output instead of failing.
    """
    # Join with exactly one "/" whether or not output_path ends with one;
    # plain concatenation silently produced "output_dataX" when it did not.
    if output_path and not output_path.endswith("/"):
        prefix = output_path + "/"
    else:
        prefix = output_path
    file_output = prefix + file_name
    table.write.mode(mode).parquet(file_output)
    
# Combine two data frames whose column sets may differ.

def customUnion(df1, df2):
    """
    Stack the rows of ``df1`` and ``df2`` into one DataFrame.

    The result carries the union of both column sets, sorted by name. A
    column missing from one input is filled with NULL for that input's
    rows. ``DataFrame.union`` matches columns by position, which is why
    both sides are first projected onto the same ordered column list.
    """
    left_cols = df1.columns
    right_cols = df2.columns
    left_set = set(left_cols)
    all_cols = sorted(left_cols + [name for name in set(right_cols) if name not in left_set])

    def _projection(present, wanted):
        # Keep columns the frame already has; pad the others with NULL literals.
        return [name if name in present else lit(None).alias(name) for name in wanted]

    return df1.select(_projection(left_cols, all_cols)).union(
        df2.select(_projection(right_cols, all_cols))
    )

In [6]:
# Airport reference data; the first CSV line holds the column names.
dfAirport = (
    spark.read
    .options(header="true")
    .format("csv")
    .load("airport-codes_csv.csv")
)

In [7]:
dfAirport.show(n=10)  # first 10 raw airport rows

+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|  00A|     heliport|   Total Rf Heliport|          11|       NA|         US|     US-PA|    Bensalem|     00A|     null|       00A|-74.9336013793945...|
| 00AA|small_airport|Aero B Ranch Airport|        3435|       NA|         US|     US-KS|       Leoti|    00AA|     null|      00AA|-101.473911, 38.7...|
| 00AK|small_airport|        Lowell Field|         450|       NA|         US|     US-AK|Anchor Point|    00AK|     null|      00AK|-151.695999146, 5...|
| 00AL|small_airport|        Epps Airpark|         820|       NA|         US|     

In [8]:
# Keep only US airports (the scope of this project), remove duplicate rows
# and drop the columns the airport dimension does not need.
dfAirport = (
    dfAirport
    .filter(col("iso_country") == "US")
    .dropDuplicates()
    .drop("local_code", "elevation_ft", "continent", "iso_region", "municipality")
)


In [9]:
# Airport dimension: distinct airports. Rows with a NULL in any selected column
# (e.g. airports without an IATA code) are removed by na.drop().
dimAirport = (
    dfAirport
    .select('ident', 'type', 'name', 'gps_code', 'iata_code', 'coordinates')
    .dropDuplicates()
    .na.drop()
)

In [10]:
write_to_parquet(table=dimAirport, file_name="dimAirport.parquet")

In [11]:
dimAirport.show(n=10)  # sample of the airport dimension

+-----+--------------+--------------------+--------+---------+--------------------+
|ident|          type|                name|gps_code|iata_code|         coordinates|
+-----+--------------+--------------------+--------+---------+--------------------+
| KLAR|medium_airport|Laramie Regional ...|    KLAR|      LAR|-105.675003051757...|
| KE24| small_airport|  Whiteriver Airport|     E24|      WTR|-109.9869995, 33....|
| KSYI| small_airport|Bomar Field Shelb...|    KSYI|      SYI|-86.44249725, 35....|
| KGNG| small_airport|Gooding Municipal...|    KGNG|      GNG|-114.7649994, 42....|
| KCSQ| small_airport|Creston Municipal...|    KCSQ|      CSQ|-94.3632965087999...|
| KUNU| small_airport|Dodge County Airport|    KUNU|      UNU|-88.70320129, 43....|
| PAGN| seaplane_base|Angoon Seaplane Base|    PAGN|      AGN|-134.585007, 57.5...|
| PHLI|medium_airport|       Lihue Airport|    PHLI|      LIH|-159.339004516601...|
| PAVL|medium_airport|    Kivalina Airport|    PAVL|      KVL|-164.563003540

In [12]:
# US city demographics: semicolon-separated CSV with a header line.
demo = (
    spark.read
    .options(header="true", delimiter=";")
    .format("csv")
    .load("us-cities-demographics.csv")
)

In [13]:
demo.show(n=10)  # first 10 demographics rows

+----------------+--------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|            City|         State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race|Count|
+----------------+--------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|   Silver Spring|      Maryland|      33.8|          40601|            41862|           82463|              1562|       30908|                   2.6|        MD|  Hispanic or Latino|25924|
|          Quincy| Massachusetts|      41.0|          44129|            49500|           93629|              4147|       32935|                  2.39|        MA|               White|58723|
|          Hoover|       Alabama|      38.5|          3

In [14]:

# Inspect the raw column names and types before cleaning.
print("Print schema:")
df.printSchema()

Print schema:
root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: doubl

In [15]:
# Rename columns, drop unnecessary columns and give them the right data type.
# NOTE: this cell reassigns `df`, so it expects the raw columns read from
# sas_data; re-running it without reloading fails (the i94* columns are gone).
df = (
    df
    .withColumn("cicid", col("cicid").cast("integer"))
    .withColumn("Year", col("i94yr").cast("integer"))
    .drop("i94yr")
    .withColumn("Month", col("i94mon").cast("integer"))
    .drop("i94mon")
    .withColumn("BirthCountry", col("i94cit").cast("integer"))
    .drop("i94cit")
    .withColumn("ResidingCountry", col("i94res").cast("integer"))
    .drop("i94res")
    .withColumnRenamed("i94port", "arrivalPort")
    .withColumn("Mode", col("i94mode").cast("integer"))
    .drop("i94mode")
    .withColumnRenamed("i94addr", "ArrivalAddress")
    .withColumn("Age", col("i94bir").cast("integer"))
    .drop("i94bir")
    .withColumn("VisaCodes", col("i94visa").cast("integer"))
    .drop("i94visa")
    .withColumnRenamed("entdepa", "ArrivalFlag")
    .withColumnRenamed("entdepd", "DepartureFlag")
    .withColumnRenamed("entdepu", "UpdateFlag")
    .withColumnRenamed("matflag", "MatchFlag")
    .withColumn("YearOfBirth", col("biryear").cast("integer"))
    .drop("biryear")
    .withColumnRenamed("fltno", "FlightNumber")
    # Rename from the raw lower-case name (presumably `visatype`; the printed raw
    # schema is truncated). The previous self-rename "VisaType" -> "VisaType" only
    # worked through Spark's case-insensitive resolution.
    .withColumnRenamed("visatype", "VisaType")
    # arrdate/depdate are day offsets from 1960-01-01 (the SAS date epoch).
    .withColumn("sasDate", to_date(lit("01/01/1960"), "MM/dd/yyyy"))
    # The offsets are doubles; cast explicitly because date_add takes an integer
    # day count. An implicit double->int cast may be rejected under stricter
    # (ANSI) type coercion.
    .withColumn("arrivalDate", expr("date_add(sasDate, cast(arrdate as int))"))
    .withColumn("departureDate", expr("date_add(sasDate, cast(depdate as int))"))
    .drop("sasDate", "arrdate", "depdate", "count", "admnum", "dtadfile",
          "visapost", "occup", "dtaddto", "insnum")
)

In [16]:
# Confirm the renamed columns and their new types.
print("Print schema:")
df.printSchema()

Print schema:
root
 |-- cicid: integer (nullable = true)
 |-- arrivalPort: string (nullable = true)
 |-- ArrivalAddress: string (nullable = true)
 |-- ArrivalFlag: string (nullable = true)
 |-- DepartureFlag: string (nullable = true)
 |-- UpdateFlag: string (nullable = true)
 |-- MatchFlag: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- FlightNumber: string (nullable = true)
 |-- VisaType: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- BirthCountry: integer (nullable = true)
 |-- ResidingCountry: integer (nullable = true)
 |-- Mode: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- VisaCodes: integer (nullable = true)
 |-- YearOfBirth: integer (nullable = true)
 |-- arrivalDate: date (nullable = true)
 |-- departureDate: date (nullable = true)



In [17]:
df.take(20)  # first 20 cleaned records as Row objects

[Row(cicid=5748517, arrivalPort='LOS', ArrivalAddress='CA', ArrivalFlag='G', DepartureFlag='O', UpdateFlag=None, MatchFlag='M', gender='F', airline='QF', FlightNumber='00011', VisaType='B1', Year=2016, Month=4, BirthCountry=245, ResidingCountry=438, Mode=1, Age=40, VisaCodes=1, YearOfBirth=1976, arrivalDate=datetime.date(2016, 4, 30), departureDate=datetime.date(2016, 5, 8)),
 Row(cicid=5748518, arrivalPort='LOS', ArrivalAddress='NV', ArrivalFlag='G', DepartureFlag='O', UpdateFlag=None, MatchFlag='M', gender='F', airline='VA', FlightNumber='00007', VisaType='B1', Year=2016, Month=4, BirthCountry=245, ResidingCountry=438, Mode=1, Age=32, VisaCodes=1, YearOfBirth=1984, arrivalDate=datetime.date(2016, 4, 30), departureDate=datetime.date(2016, 5, 17)),
 Row(cicid=5748519, arrivalPort='LOS', ArrivalAddress='WA', ArrivalFlag='G', DepartureFlag='O', UpdateFlag=None, MatchFlag='M', gender='M', airline='DL', FlightNumber='00040', VisaType='B1', Year=2016, Month=4, BirthCountry=245, ResidingCoun

In [18]:
# Time dimension: one row per distinct arrival date, split into calendar parts.
# The date itself is kept in the column 'FullYear'.
# WeekDay comes from dayofweek(): 1 = Sunday ... 7 = Saturday.
# NOTE(review): a NULL arrivalDate, if present, becomes its own row here.
dimTime = (
    df
    .select(col('arrivalDate').alias('FullYear'))
    # Deduplicate first, so the calendar parts are computed once per distinct
    # date instead of once per immigration record.
    .dropDuplicates()
    .withColumn('Day', F.dayofmonth('FullYear'))
    .withColumn('Month', F.month('FullYear'))
    .withColumn('Year', F.year('FullYear'))
    .withColumn('WeekDay', F.dayofweek('FullYear'))
)
write_to_parquet(dimTime, "dimTime.parquet")

In [19]:
dimTime.show(n=10)  # sample of the time dimension

+----------+---+-----+----+-------+
|  FullYear|Day|Month|Year|WeekDay|
+----------+---+-----+----+-------+
|2016-04-28| 28|    4|2016|      5|
|2016-04-14| 14|    4|2016|      5|
|2016-04-11| 11|    4|2016|      2|
|2016-04-01|  1|    4|2016|      6|
|2016-04-26| 26|    4|2016|      3|
|2016-04-27| 27|    4|2016|      4|
|2016-04-16| 16|    4|2016|      7|
|2016-04-20| 20|    4|2016|      4|
|2016-04-24| 24|    4|2016|      1|
|2016-04-30| 30|    4|2016|      7|
+----------+---+-----+----+-------+
only showing top 10 rows



In [20]:
# Immigrant dimension: one row per distinct (YearOfBirth, gender) pair,
# keyed by a surrogate Immigrant_ID.
dimImmigrant = (
    df
    .select('YearOfBirth', 'gender')
    .dropDuplicates()
    .withColumn("Immigrant_ID", monotonically_increasing_id())
)

# monotonically_increasing_id() is unique and increasing but NOT consecutive
# (the partition id occupies the upper bits, hence jumps such as 8589934592).
# Ordering by it with row_number() yields a consecutive 1..N key in `num`.
# The column must be referenced unquoted: in Spark SQL "Immigrant_ID" in double
# quotes is a string literal, which would make the ordering arbitrary.
dimImmigrant.createOrReplaceTempView('dimImmigrant')
new_dimImmigrant = spark.sql(
    'select row_number() over (order by Immigrant_ID) as num, * from dimImmigrant'
)

In [21]:
new_dimImmigrant.show(n=10)  # immigrant dimension with consecutive `num`

+---+-----------+------+------------+
|num|YearOfBirth|gender|Immigrant_ID|
+---+-----------+------+------------+
|  1|       1972|     M|           0|
|  2|       1959|  null|           1|
|  3|       2004|     F|           2|
|  4|       1927|     M|           3|
|  5|       1930|     M|           4|
|  6|       1990|  null|  8589934592|
|  7|       1937|     M|  8589934593|
|  8|       1933|     M|  8589934594|
|  9|       1932|     X|  8589934595|
| 10|       1908|     M| 17179869184|
+---+-----------+------+------------+
only showing top 10 rows



In [22]:
# Use the ".parquet" suffix like every other table written by this notebook.
write_to_parquet(dimImmigrant, "dimImmigrant.parquet")

In [23]:
dimImmigrant.show(n=10)  # immigrant dimension as written to parquet

+-----------+------+------------+
|YearOfBirth|gender|Immigrant_ID|
+-----------+------+------------+
|       1972|     M|           0|
|       1959|  null|           1|
|       2004|     F|           2|
|       1927|     M|           3|
|       1930|     M|           4|
|       1990|  null|  8589934592|
|       1937|     M|  8589934593|
|       1933|     M|  8589934594|
|       1932|     X|  8589934595|
|       1908|     M| 17179869184|
+-----------+------+------------+
only showing top 10 rows



In [24]:
# Country dimension: one row per distinct BirthCountry code (derived from
# i94cit), keyed by a surrogate Country_ID.
dimCountry = (
    df
    .select('BirthCountry')
    .distinct()
    .withColumn("Country_ID", monotonically_increasing_id())
)

# Country_ID is unique and increasing but not consecutive, so a consecutive
# 1..N `num` is derived with row_number(). Reference the column unquoted: in
# Spark SQL "Country_ID" in double quotes is a string literal, which would
# make the ordering arbitrary.
dimCountry.createOrReplaceTempView('dimCountry')
new_dimCountry = spark.sql(
    'select row_number() over (order by Country_ID) as num, * from dimCountry'
)

In [25]:
new_dimCountry.show(n=10)  # country dimension with consecutive `num`

+---+------------+-----------+
|num|BirthCountry| Country_ID|
+---+------------+-----------+
|  1|         148|          0|
|  2|         392| 8589934592|
|  3|         516|17179869184|
|  4|         251|25769803776|
|  5|         580|25769803777|
|  6|         255|42949672960|
|  7|         296|51539607552|
|  8|         133|51539607553|
|  9|         472|51539607554|
| 10|         322|60129542144|
+---+------------+-----------+
only showing top 10 rows



In [26]:
write_to_parquet(table=dimCountry, file_name="dimCountry.parquet")

In [27]:
dimCountry.show(n=10)  # country dimension as written to parquet

+------------+-----------+
|BirthCountry| Country_ID|
+------------+-----------+
|         148|          0|
|         392| 8589934592|
|         516|17179869184|
|         251|25769803776|
|         580|25769803777|
|         255|42949672960|
|         296|51539607552|
|         133|51539607553|
|         472|51539607554|
|         322|60129542144|
+------------+-----------+
only showing top 10 rows



In [29]:
# Port dimension: one row per distinct arrival port, keyed by a surrogate Port_ID.
# distinct() already removes duplicates; the extra dropDuplicates() was redundant.
dimLandingHarbour = (
    df
    .select('arrivalPort')
    .distinct()
    .withColumn("Port_ID", monotonically_increasing_id())
)

# Port_ID is unique and increasing but not consecutive, so a consecutive 1..N
# `num` is derived with row_number(). Reference the column unquoted: in Spark
# SQL "Port_ID" in double quotes is a string literal, which would make the
# ordering arbitrary.
dimLandingHarbour.createOrReplaceTempView('dimLandingHarbour')
new_dimLandingHarbour = spark.sql(
    'select row_number() over (order by Port_ID) as num, * from dimLandingHarbour'
)

In [30]:
new_dimLandingHarbour.show(n=5)  # port dimension with consecutive `num`

+---+-----------+----------+
|num|arrivalPort|   Port_ID|
+---+-----------+----------+
|  1|        FMY|         0|
|  2|        BGM|         1|
|  3|        HEL|8589934592|
|  4|        DNS|8589934593|
|  5|        MOR|8589934594|
+---+-----------+----------+
only showing top 5 rows



In [31]:
write_to_parquet(table=dimLandingHarbour, file_name="dimLandingHarbour.parquet")

In [32]:
dimLandingHarbour.show(n=10)  # port dimension as written to parquet

+-----------+-----------+
|arrivalPort|    Port_ID|
+-----------+-----------+
|        FMY|          0|
|        BGM|          1|
|        HEL| 8589934592|
|        DNS| 8589934593|
|        MOR| 8589934594|
|        FOK|25769803776|
|        HVR|25769803777|
|        SNA|34359738368|
|        PTK|34359738369|
|        CLG|42949672960|
+-----------+-----------+
only showing top 10 rows



In [33]:
# Visa dimension: one row per distinct VisaType, keyed by a surrogate Visa_ID.
# distinct() already removes duplicates; the extra dropDuplicates() was redundant.
dimVisa = (
    df
    .select('VisaType')
    .distinct()
    .withColumn("Visa_ID", monotonically_increasing_id())
)

In [34]:
dimVisa.show(n=10)  # sample of the visa dimension

+--------+------------+
|VisaType|     Visa_ID|
+--------+------------+
|      F2|103079215104|
|     GMB|352187318272|
|      B2|369367187456|
|      F1|498216206336|
|     CPL|601295421440|
|      I1|704374636544|
|      WB|738734374912|
|      M1|747324309504|
|      B1|807453851648|
|      WT|884763262976|
+--------+------------+
only showing top 10 rows

