# Project Title
### Data Engineering Capstone Project


In [50]:
import pandas as pd
import pyspark
import configparser
import os
from datetime import datetime, timedelta
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import DateType, StructType, StructField, TimestampType
from pathlib import Path


In [51]:
# Read in the data here
spark = SparkSession.builder \
            .config("spark.jars.repositories", "https://repos.spark-packages.org/") \
            .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11") \
            .enableHiveSupport().getOrCreate()
input_data = ''
immigration_data_path = input_data + "sas_data"
df = spark.read.parquet(immigration_data_path)
df.count()

3096313

In [52]:
df.head(5)

[Row(cicid=5748517.0, i94yr=2016.0, i94mon=4.0, i94cit=245.0, i94res=438.0, i94port='LOS', arrdate=20574.0, i94mode=1.0, i94addr='CA', depdate=20582.0, i94bir=40.0, i94visa=1.0, count=1.0, dtadfile='20160430', visapost='SYD', occup=None, entdepa='G', entdepd='O', entdepu=None, matflag='M', biryear=1976.0, dtaddto='10292016', gender='F', insnum=None, airline='QF', admnum=94953870030.0, fltno='00011', visatype='B1'),
 Row(cicid=5748518.0, i94yr=2016.0, i94mon=4.0, i94cit=245.0, i94res=438.0, i94port='LOS', arrdate=20574.0, i94mode=1.0, i94addr='NV', depdate=20591.0, i94bir=32.0, i94visa=1.0, count=1.0, dtadfile='20160430', visapost='SYD', occup=None, entdepa='G', entdepd='O', entdepu=None, matflag='M', biryear=1984.0, dtaddto='10292016', gender='F', insnum=None, airline='VA', admnum=94955622830.0, fltno='00007', visatype='B1'),
 Row(cicid=5748519.0, i94yr=2016.0, i94mon=4.0, i94cit=245.0, i94res=438.0, i94port='LOS', arrdate=20574.0, i94mode=1.0, i94addr='WA', depdate=20582.0, i94bir=29.

In [53]:
immigration_data_path = input_data + "sas_data/part-00013-b9542815-7a8d-45fc-9c67-c9c5007ad0d4-c000.snappy.parquet"
df = spark.read.parquet(immigration_data_path)
df = df.filter(df.cicid.isNotNull() & df.arrdate.isNotNull() & df.depdate.isNotNull())

df.count()

203307

In [54]:
def rename_columns(df, new_columns):
    """Rename columns.
    Input:
        Data frame that needs to rename columns.
        List of new columns name.
    Output:
        Data frame with new columns name.
    """
    for original, new in zip(df.columns, new_columns):
        df = df.withColumnRenamed(original, new)
    return df

def to_date_func(date):
    if date is not None:
        return (datetime(1960,1,1) + timedelta(days=int(date)))
to_date_func_udf = udf(to_date_func, DateType())

In [55]:
fact_immigration = df.select('cicid', 'i94yr', 'i94mon', 'i94port', 'i94addr', \
                                 'arrdate', 'depdate', 'i94mode', 'i94visa') \
                        .distinct() \
                         .withColumn("immigration_id", monotonically_increasing_id())
    
new_columns = ['cic_id', 'arrival_year', 'arrival_month', 'city_code', 'state_code',\
               'arrival_date', 'departure_date', 'mode', 'visa']

fact_immigration = rename_columns(fact_immigration, new_columns)

fact_immigration = fact_immigration.withColumn('country', lit('United States'))

fact_immigration = fact_immigration.withColumn('arrival_date', \
                                    to_date_func_udf(col('arrival_date')))

fact_immigration = fact_immigration.withColumn('departure_date', \
                                    to_date_func_udf(col('departure_date')))

# fact_immigration.write.mode("overwrite").partitionBy('state_code')\
#                 .parquet(path=output_data + 'fact_immigration')

In [56]:
arrival_date = fact_immigration.select('arrival_date').rdd.flatMap(lambda x: x).collect()
departure_date = fact_immigration.select('departure_date').rdd.flatMap(lambda x: x).collect()
date = arrival_date + departure_date
date[0]

datetime.date(2016, 4, 29)

In [57]:
len(date)

406614

In [58]:
time = spark.createDataFrame(date, DateType())
time = rename_columns(time, ['date'])
time.printSchema()
time.count()

root
 |-- date: date (nullable = true)



406614

In [98]:
dim_time = time.select('date') \
                .withColumn('hour', hour(time['date'])) \
                .withColumn('day', dayofmonth(time['date'])) \
                .withColumn('week', weekofyear(time['date'])) \
                .withColumn('month', month(time['date'])) \
                .withColumn('year', year(time['date'])) \
                .withColumn('weekday', (dayofweek(time['date'])+5)%7+1) \
                .distinct()
dim_time.show()

+----------+----+---+----+-----+----+-------+
|      date|hour|day|week|month|year|weekday|
+----------+----+---+----+-----+----+-------+
|2016-04-28|   0| 28|  17|    4|2016|      4|
|2016-06-01|   0|  1|  22|    6|2016|      3|
|2016-05-01|   0|  1|  17|    5|2016|      7|
|2016-07-08|   0|  8|  27|    7|2016|      5|
|2015-04-25|   0| 25|  17|    4|2015|      6|
|2016-01-29|   0| 29|   4|    1|2016|      5|
|2015-04-03|   0|  3|  14|    4|2015|      5|
|2016-04-09|   0|  9|  14|    4|2016|      6|
|2016-06-02|   0|  2|  22|    6|2016|      4|
|2016-03-11|   0| 11|  10|    3|2016|      5|
|2016-07-12|   0| 12|  28|    7|2016|      2|
|2016-05-14|   0| 14|  19|    5|2016|      6|
|2016-08-09|   0|  9|  32|    8|2016|      2|
|2016-04-30|   0| 30|  17|    4|2016|      6|
|2016-08-03|   0|  3|  31|    8|2016|      3|
|2016-02-04|   0|  4|   5|    2|2016|      4|
|2016-06-09|   0|  9|  23|    6|2016|      4|
|2016-04-10|   0| 10|  14|    4|2016|      7|
|2016-05-23|   0| 23|  21|    5|20

In [60]:
dim_personal_information = df.select('cicid', 'i94cit', 'i94res', \
                                  'biryear', 'gender').distinct() \
                          .withColumn("immi_personal_id", monotonically_increasing_id())
    
# rename to meaningful columns name
new_columns = ['cic_id', 'citizen_country', 'residence_country', \
               'birth_year', 'gender']
dim_personal_information = rename_columns(dim_personal_information, new_columns)
dim_personal_information.count()

203307

In [61]:
input_data = ''
demographics_data_path = input_data + 'us-cities-demographics.csv'
df = spark.read.options(header=True, delimiter=';').csv(demographics_data_path)
df.head(5)

[Row(City='Silver Spring', State='Maryland', Median Age='33.8', Male Population='40601', Female Population='41862', Total Population='82463', Number of Veterans='1562', Foreign-born='30908', Average Household Size='2.6', State Code='MD', Race='Hispanic or Latino', Count='25924'),
 Row(City='Quincy', State='Massachusetts', Median Age='41.0', Male Population='44129', Female Population='49500', Total Population='93629', Number of Veterans='4147', Foreign-born='32935', Average Household Size='2.39', State Code='MA', Race='White', Count='58723'),
 Row(City='Hoover', State='Alabama', Median Age='38.5', Male Population='38040', Female Population='46799', Total Population='84839', Number of Veterans='4819', Foreign-born='8229', Average Household Size='2.58', State Code='AL', Race='Asian', Count='4759'),
 Row(City='Rancho Cucamonga', State='California', Median Age='34.5', Male Population='88127', Female Population='87105', Total Population='175232', Number of Veterans='5821', Foreign-born='3387

In [62]:
df.count()

2891

In [63]:
dim_state = df.select('State Code', 'State', \
                          'Median Age', 'Male Population', \
                          'Female Population', 'Total Population') \
                    .distinct()
new_columns = ['state_code', 'state_name', 'median_age', 'male_population', 'female_population', 'total_population']
dim_state = rename_columns(dim_state, new_columns)
# dim_state = dim_state.filter(dim_state.state_code.isNotNull())
dim_state.groupBy("state_code", "state_name") \
    .agg(expr('percentile(median_age, array(0.5))')[0].alias('median_age'),
      sum("male_population").alias("male_population"), 
      sum("female_population").alias("female_population"), 
      sum("total_population").alias("total_population"))  \
    .show(5)


+----------+--------------+------------------+---------------+-----------------+----------------+
|state_code|    state_name|        median_age|male_population|female_population|total_population|
+----------+--------------+------------------+---------------+-----------------+----------------+
|        MT|       Montana|              35.5|        87707.0|          93587.0|        181294.0|
|        NC|North Carolina|              35.1|      1466105.0|        1594094.0|       3060199.0|
|        MD|      Maryland|35.900000000000006|       627951.0|         684178.0|       1312129.0|
|        CO|      Colorado|              36.8|      1454619.0|        1481050.0|       2935669.0|
|        CT|   Connecticut|             34.75|       432157.0|         453424.0|        885581.0|
+----------+--------------+------------------+---------------+-----------------+----------------+
only showing top 5 rows



In [64]:
dim_state = df.select('State Code', 'State', \
                          'Median Age', 'Male Population', \
                          'Female Population', 'Total Population') \
                    .distinct()
new_columns = ['state_code', 'state_name', 'median_age', 'male_population', 'female_population', 'total_population']
dim_state = rename_columns(dim_state, new_columns)
dim_state.count()

596

In [65]:
all_df = {'fact_immigration': fact_immigration, \
              'dim_time': dim_time, \
              'dim_personal_information': dim_personal_information, \
              'dim_state': dim_state}
for df_name, df in all_df.items():
    num_records = df.count()
    if num_records <= 0:
        raise ValueError(f"Table {df_name} is empty!")
    else:
        print(f"Table: {df_name} is not empty: total {num_records} records.")
        
    num_null_key_records = df.filter(df[df.columns[0]].isNull()).count()
    if num_null_key_records > 0:
        raise ValueError(f"Table {df_name} have null value for {df.columns[0]} key column !")
    else:
        print(f"Table {df_name} does not have null value for {df.columns[0]} key column .")
    
    print("Schema of this table:")
    df.printSchema()
    

Table: fact_immigration is not empty: total 203307 records.
Table fact_immigration does not have null value for cic_id key column .
Schema of this table:
root
 |-- cic_id: double (nullable = true)
 |-- arrival_year: double (nullable = true)
 |-- arrival_month: double (nullable = true)
 |-- city_code: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- arrival_date: date (nullable = true)
 |-- departure_date: date (nullable = true)
 |-- mode: double (nullable = true)
 |-- visa: double (nullable = true)
 |-- immigration_id: long (nullable = false)
 |-- country: string (nullable = false)

Table: dim_time is not empty: total 228 records.
Table dim_time does not have null value for date key column .
Schema of this table:
root
 |-- date: date (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullabl

In [88]:
fact_immigration.createOrReplaceTempView("fact_immigration_table")
dim_time.createOrReplaceTempView("dim_time_table")
dim_personal_information.createOrReplaceTempView("dim_personal_information_table")
dim_state.createOrReplaceTempView("dim_state_table")

In [99]:
spark.conf.set("spark.sql.crossJoin.enabled", "true")
# Some analysis query example:
# Number of immigrants go to Maryland state:
num_immigrants_of_Maryland_state = spark.sql("""
    SELECT COUNT(*)
    FROM fact_immigration_table AS fit
    JOIN dim_state_table AS dst
    WHERE dst.state_name == "Maryland"
    """)
print(num_immigrants_of_Maryland_state.collect()[0][0])

2033070


In [105]:
# Number of immigrants on Saturday:
num_immigrants_on_Saturday = spark.sql("""
    SELECT COUNT(*)
    FROM fact_immigration_table AS fit
    JOIN dim_time_table AS dt
    WHERE dt.weekday == 6
    """)
print(num_immigrants_on_Saturday.collect()[0][0])

6912438


In [104]:
# Number of immigrants on Monday:
num_immigrants_on_Monday = spark.sql("""
    SELECT COUNT(*)
    FROM fact_immigration_table AS fit
    JOIN dim_time_table AS dt
    WHERE dt.weekday == 1
    """)
print(num_immigrants_on_Monday.collect()[0][0])

5895903
