In [2]:
import pandas as pd
import os
import re
import pyspark

In [3]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import udf

In [4]:
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()

# Step 1: Scope the Project and Gather Data

## Scope 
- The goal is to create an ETL pipeline with given data sets, so that users can make queries to see if there is relationship between immigration and demographics of the US state. 

## Data Model
- Resulting schema will follow ERD below

![ERD](./ERD.PNG)

## Describe and Gather Data 
### I94 Immigration Data
- This data comes from the US National Tourism and Trade Office. A data dictionary is included in the workspace. [This](https://travel.trade.gov/research/reports/i94/historical/2016.html) is where the data comes from. There's a sample file so you can take a look at the data in csv format before reading it all in. 

In [5]:
# Reading the sample data 
df_immigration = spark.read.parquet("sas_data")

In [6]:
# Printing schema
df_immigration.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

- You can see what each column represents in I94_SAS_Labels_Descriptions.SAS file 

In [7]:
# Printing first 5 rows of the data set 
df_immigration.limit(5).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,5748517.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,CA,20582.0,...,,M,1976.0,10292016,F,,QF,94953870000.0,11,B1
1,5748518.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,NV,20591.0,...,,M,1984.0,10292016,F,,VA,94955620000.0,7,B1
2,5748519.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20582.0,...,,M,1987.0,10292016,M,,DL,94956410000.0,40,B1
3,5748520.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20588.0,...,,M,1987.0,10292016,F,,DL,94956450000.0,40,B1
4,5748521.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20588.0,...,,M,1988.0,10292016,M,,DL,94956390000.0,40,B1


### U.S City Demographic Data 
- U.S City Demographic Data : This data comes from OpenSoft. You can read more about it [here](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/).

In [39]:
# Reading the demographic data
df_demographic = spark.read.option("delimiter",";").option("header", True).csv("us-cities-demographics.csv")

In [32]:
# Printing schema
df_demographic.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: string (nullable = true)
 |-- Male Population: string (nullable = true)
 |-- Female Population: string (nullable = true)
 |-- Total Population: string (nullable = true)
 |-- Number of Veterans: string (nullable = true)
 |-- Foreign-born: string (nullable = true)
 |-- Average Household Size: string (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: string (nullable = true)



In [10]:
df_demographic.limit(5).toPandas()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ,White,76402


In [207]:
df_demographic.select('*').filter(col('City') == 'Newark').toPandas()

Unnamed: 0,City,State,Male Population,Female Population,Total Population,State Code,Race,Count
0,Newark,New Jersey,138040,143873,281913,NJ,White,76402
1,Newark,New Jersey,138040,143873,281913,NJ,Black or African-American,144961
2,Newark,New Jersey,138040,143873,281913,NJ,Asian,7349
3,Newark,New Jersey,138040,143873,281913,NJ,American Indian and Alaska Native,2268
4,Newark,New Jersey,138040,143873,281913,NJ,Hispanic or Latino,100432


### Foreign Country Code Data
- This data set is extracted from I94_SAS_Labels_Description.SAS file. 

In [58]:
foreign_country = 'foreign_country.txt'
with open(foreign_country) as f:
    lines = f.readlines()
lines[0]

"582 =  'MEXICO Air Sea, and Not Reported (I-94, no land arrivals)'\n"

In [67]:
lines = [line.replace('"','') for line in lines]
lines[0]

"582 =  'MEXICO Air Sea, and Not Reported (I-94, no land arrivals)'\n"

In [84]:
regexp = re.compile(r"^(\d{3})\s*=\s*\'(.*)\'.*")
matches = [regexp.match(line) for line in lines]
country_dict = {
    'country_code' : list(),
    'country_name' : list()
}

for m in matches:
    if m is None:
        continue
    country_dict['country_code'].append(int(m.group(1)))
    country_dict['country_name'].append(m.group(2))

pdf_foreign_country = pd.DataFrame.from_dict(country_dict)

In [85]:
pdf_foreign_country.head(5)

Unnamed: 0,country_code,country_name
0,582,"MEXICO Air Sea, and Not Reported (I-94, no lan..."
1,236,AFGHANISTAN
2,101,ALBANIA
3,316,ALGERIA
4,102,ANDORRA


In [86]:
df_foreign_country = spark.createDataFrame(pdf_foreign_country)

In [87]:
df_foreign_country.head(5)

[Row(country_code=582, country_name='MEXICO Air Sea, and Not Reported (I-94, no land arrivals)'),
 Row(country_code=236, country_name='AFGHANISTAN'),
 Row(country_code=101, country_name='ALBANIA'),
 Row(country_code=316, country_name='ALGERIA'),
 Row(country_code=102, country_name='ANDORRA')]

### US City to State Code Data 
- This data set is also extracted from I94_SAS_Labels_Description.SAS file.

In [88]:
us_city_state = 'us_city_codes.txt'
with open(us_city_state) as f:
    lines = f.readlines()
lines[0]

"'ALC'\t=\t'ALCAN, AK             '\n"

In [91]:
lines = [line.replace("\t","").replace("'","").replace(" ","").replace("\n","") for line in lines]
lines[0]

'ALC=ALCAN,AK'

In [153]:
regexp = re.compile(r"^(\w{3})=(\w*),(\w{2})")
matches = [regexp.match(line) for line in lines]
city_state_dict = {
    'city_code' : list(),
    'state_name' : list(),
    'state_code' : list()
}

for m in matches:
    if m is None:
        continue
    city_state_dict['city_code'].append(m.group(1))
    city_state_dict['state_name'].append(m.group(2))
    city_state_dict['state_code'].append(m.group(3))

pdf_us_city_state = pd.DataFrame.from_dict(city_state_dict)

In [154]:
pdf_us_city_state.head(5)

Unnamed: 0,city_code,state_name,state_code
0,ALC,ALCAN,AK
1,ANC,ANCHORAGE,AK
2,DAC,DALTONSCACHE,AK
3,PIZ,DEWSTATIONPTLAYDEW,AK
4,DTH,DUTCHHARBOR,AK


In [97]:
df_us_city_state = spark.createDataFrame(pdf_us_city_state)

### US State Code Data (Testing Purpose)

In [155]:
us_city_state = 'us_state_codes.txt'
with open(us_city_state) as f:
    lines = f.readlines()
lines[0]

"'AL'='ALABAMA'\n"

In [156]:
lines = [line.replace("'",'').replace("\n","") for line in lines]
lines[0]

'AL=ALABAMA'

In [159]:
regexp = re.compile(r'^(.{2})=(\w*)')
matches = [regexp.match(line) for line in lines]
state_dict = {
    'state_code' : list(),
    'state_name' : list(),
}

for m in matches:
    if m is None:
        continue
    state_dict['state_code'].append(m.group(1))
    state_dict['state_name'].append(m.group(2))

pdf_us_state = pd.DataFrame.from_dict(state_dict)
df_us_state = spark.createDataFrame(pdf_us_state)
df_us_state.select('*').count()

55

In [160]:
df_us_state.createOrReplaceTempView("us_state_table")

# Step 2: Explore and Clean the Data

## Demographics Data Set

In [11]:
from pyspark.sql.types import IntegerType, DoubleType

### Handling Missing Values

- First, let's see if there are missing values in the data set. Since demographics data set is relatively small (less than 3000 rows), I'll use pandas to explore and clean the data.  

In [33]:
df = df_demographic.toPandas()
for column in df.columns:
    print(f"Null data in {column}? : {any(df[column].isnull())}")

Null data in City? : False
Null data in State? : False
Null data in Median Age? : False
Null data in Male Population? : True
Null data in Female Population? : True
Null data in Total Population? : False
Null data in Number of Veterans? : True
Null data in Foreign-born? : True
Null data in Average Household Size? : True
Null data in State Code? : False
Null data in Race? : False
Null data in Count? : False


- We can see that there are missing values in Male Population, Female Population, Number of Veterans and Average Household Size columns. Let's take a look at how those rows with missing values look like. 

In [13]:
df[df["Male Population"].isnull() == True]

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
333,The Villages,Florida,70.5,,,72590,15231,4034,,FL,Hispanic or Latino,1066
449,The Villages,Florida,70.5,,,72590,15231,4034,,FL,Black or African-American,331
1437,The Villages,Florida,70.5,,,72590,15231,4034,,FL,White,72211


In [14]:
df[df["Female Population"].isnull() == True]

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
333,The Villages,Florida,70.5,,,72590,15231,4034,,FL,Hispanic or Latino,1066
449,The Villages,Florida,70.5,,,72590,15231,4034,,FL,Black or African-American,331
1437,The Villages,Florida,70.5,,,72590,15231,4034,,FL,White,72211


- There are 3 rows from The Villages Florida, having missing values on Male/Female populations. 
- Let's take a look other rows from The Villages, Florida.

In [15]:
df[df["City"] == "The Villages"]

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
333,The Villages,Florida,70.5,,,72590,15231,4034,,FL,Hispanic or Latino,1066
449,The Villages,Florida,70.5,,,72590,15231,4034,,FL,Black or African-American,331
1437,The Villages,Florida,70.5,,,72590,15231,4034,,FL,White,72211


- Now we can see that there's no valid data on Male/Female population for The Villages, FL. 
- But, these rows still contains data on Total Populations and Race/ Count. 
- These valid columns are still useful to us, so let's just keep these rows.

In [16]:
df[df["Number of Veterans"].isnull() == True]

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
111,San Juan,Puerto Rico,41.4,155408,186829,342237,,,,PR,Hispanic or Latino,335559
155,Caguas,Puerto Rico,40.4,34743,42265,77008,,,,PR,Hispanic or Latino,76349
258,Carolina,Puerto Rico,42.0,64758,77308,142066,,,,PR,American Indian and Alaska Native,12143
637,Carolina,Puerto Rico,42.0,64758,77308,142066,,,,PR,Hispanic or Latino,139967
1747,San Juan,Puerto Rico,41.4,155408,186829,342237,,,,PR,American Indian and Alaska Native,4031
1748,Mayagüez,Puerto Rico,38.1,30799,35782,66581,,,,PR,Asian,235
1995,Ponce,Puerto Rico,40.5,56968,64615,121583,,,,PR,Hispanic or Latino,120705
2004,Bayamón,Puerto Rico,39.4,80128,90131,170259,,,,PR,Hispanic or Latino,169155
2441,San Juan,Puerto Rico,41.4,155408,186829,342237,,,,PR,Asian,2452
2589,Guaynabo,Puerto Rico,42.2,33066,37426,70492,,,,PR,Hispanic or Latino,69936


In [17]:
df[df["Average Household Size"].isnull() == True]

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
111,San Juan,Puerto Rico,41.4,155408.0,186829.0,342237,,,,PR,Hispanic or Latino,335559
155,Caguas,Puerto Rico,40.4,34743.0,42265.0,77008,,,,PR,Hispanic or Latino,76349
258,Carolina,Puerto Rico,42.0,64758.0,77308.0,142066,,,,PR,American Indian and Alaska Native,12143
333,The Villages,Florida,70.5,,,72590,15231.0,4034.0,,FL,Hispanic or Latino,1066
449,The Villages,Florida,70.5,,,72590,15231.0,4034.0,,FL,Black or African-American,331
637,Carolina,Puerto Rico,42.0,64758.0,77308.0,142066,,,,PR,Hispanic or Latino,139967
1437,The Villages,Florida,70.5,,,72590,15231.0,4034.0,,FL,White,72211
1747,San Juan,Puerto Rico,41.4,155408.0,186829.0,342237,,,,PR,American Indian and Alaska Native,4031
1748,Mayagüez,Puerto Rico,38.1,30799.0,35782.0,66581,,,,PR,Asian,235
1995,Ponce,Puerto Rico,40.5,56968.0,64615.0,121583,,,,PR,Hispanic or Latino,120705


- There are lots of missing values in Number of Veterans and Average Household Size columns
- But, we can see that other columns have valid data(except for The Villages rows) 
- Since we are not going to use Number of Veterans and Average Household Size columns, let's skip to the next step.

### Handling Incorrect Column Types

- printSchema() call on demographics dataframe shows that some of the columns have incorrect data types
- Specifically, columns that are supposed to be numeric types, are now string types
- Let's correct this before move on

In [40]:
# first, replace None values with "0"
df_demographic = df_demographic.na.fill("0")

In [41]:
df_demographic.select("*").where(df_demographic["City"] == "The Villages").toPandas()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,The Villages,Florida,70.5,0,0,72590,15231,4034,0,FL,Hispanic or Latino,1066
1,The Villages,Florida,70.5,0,0,72590,15231,4034,0,FL,Black or African-American,331
2,The Villages,Florida,70.5,0,0,72590,15231,4034,0,FL,White,72211


In [42]:
# let's change column types 
df_demographic = df_demographic.withColumn("Median Age", df_demographic["Median Age"].cast(DoubleType()))\
                                .withColumn("Male Population", df_demographic["Male Population"].cast(IntegerType()))\
                                .withColumn("Female Population", df_demographic["Female Population"].cast(IntegerType()))\
                                .withColumn("Total Population", df_demographic["Total Population"].cast(IntegerType()))\
                                .withColumn("Count", df_demographic["Count"].cast(IntegerType()))

In [43]:
df_demographic.printSchema()

root
 |-- City: string (nullable = false)
 |-- State: string (nullable = false)
 |-- Median Age: double (nullable = true)
 |-- Male Population: integer (nullable = true)
 |-- Female Population: integer (nullable = true)
 |-- Total Population: integer (nullable = true)
 |-- Number of Veterans: string (nullable = false)
 |-- Foreign-born: string (nullable = false)
 |-- Average Household Size: string (nullable = false)
 |-- State Code: string (nullable = false)
 |-- Race: string (nullable = false)
 |-- Count: integer (nullable = true)



- Now, let's leave irrelevant columns behind. 

In [44]:
df_demographic = df_demographic.select("City","State","Male Population","Female Population","Total Population","State Code","Race","Count")

In [45]:
df_demographic.printSchema()

root
 |-- City: string (nullable = false)
 |-- State: string (nullable = false)
 |-- Male Population: integer (nullable = true)
 |-- Female Population: integer (nullable = true)
 |-- Total Population: integer (nullable = true)
 |-- State Code: string (nullable = false)
 |-- Race: string (nullable = false)
 |-- Count: integer (nullable = true)



## Immigrations Data Set

### Handling Invalid Values

In [49]:
df_immigration.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

- From immigrations data set, we are only going to use i94yr, i94mon, i94cit, i94port, arrdate and visa.
- Among those columns, i94port column will be used to join tables.
- So let's drop rows with invalid i94port column value

In [106]:
valid_city_codes = df_us_city_state.select("city_code").distinct().collect()
valid_city_codes = [row.city_code for row in valid_city_codes]

In [104]:
# printing rows before dropping rows with invalid city_codes 
df_immigration.select("*").count()

3096313

In [107]:
# Dropping rows with invalid city_codes (I94ports)
df_immigration = df_immigration.filter(df_immigration.i94port.isin(valid_city_codes))

In [108]:
# printing rows after dropping rows with invalid city_codes 
df_immigration.select("*").count()

2834543

- 261,770 rows with invalid I94port were dropped

# Step 3: Define the Data Model
## 3.1 Conceptual Data Model

- The problem we want to solve here, is to check if there's relationship between immigration and demographic of a state.
    - Specfically, we want to find out correlation between ethnic distribution of a state and immigration.
- Therefore, we are going to use immigration data as fact table and US demographics data as dimension table.
- Below is an Entity Relationship Diagram.

![ERD](./ERD.PNG)

## 3.2 Mapping Out Data Pipelines
### fact_immigrations
- Extracting relevant columns from immigrations data set
    - year : immigration.i94yr
    - month : immigration.i94mon
    - origin_city : immigration.i94cit
    - destination_city : immigration.i94port
    - arrival_date : immigartion.arrdate
    - visa : immigration.visatype
    
### dim_foreign_country 
- Extracting relevant data from I94_SAS_Labels_Description.SAS file 
- Done in previous cells. (df_foreign_country)

### dim_us_city 
- Extracting relevant data from I94_SAS_Labels_Description.SAS file 
- Done in previous cells. (df_us_city_state)

### dim_demographics
- Aggregate population data (male/female, total) over rows with same 'state'
    - There's multiple rows with same city/state/population value and only differs in 'race' and 'count'
    - Duplicate rows with same city/state/population should be dropped before aggregation
- Aggregate race counts over rows with same 'state' and 'race'
    - This aggregated data forms race_counts column. race_counts column is type of map(string, int) 
    - Keys are race and values are population of the race for corresponding state 

# Step 4: Run Pipelines to Model the Data 
## 4.1 Create the data model
- Since dim_foreign_country and dim_us_city are already made, there's only fact_immigrations and dim_demographics table left to be made.

### fact_immigrations 

In [None]:
df_immigration.createOrReplaceTempView('immigrations_table')
df_fact_immigrations = spark.sql("""
    SELECT 
        i94yr as year,
        i94mon as month,
        i94cit as origin_city,
        i94port as destination_city,
        arrdate as arrival_date,
        (CASE WHEN visatype = '1' 
            THEN 'business'
            ELSE
                CASE WHEN visatype = '3' THEN 'student' ELSE "pleasure" END
        END) as visa
    FROM immigrations_table
""")

In [117]:
df_fact_immigrations.limit(20).toPandas()

Unnamed: 0,year,month,origin_city,destination_city,arrival_date,visa
0,2016.0,4.0,245.0,LOS,20574.0,pleasure
1,2016.0,4.0,245.0,LOS,20574.0,pleasure
2,2016.0,4.0,245.0,LOS,20574.0,pleasure
3,2016.0,4.0,245.0,LOS,20574.0,pleasure
4,2016.0,4.0,245.0,LOS,20574.0,pleasure
5,2016.0,4.0,245.0,HHW,20574.0,pleasure
6,2016.0,4.0,245.0,HHW,20574.0,pleasure
7,2016.0,4.0,245.0,HHW,20574.0,pleasure
8,2016.0,4.0,245.0,HOU,20574.0,pleasure
9,2016.0,4.0,245.0,LOS,20574.0,pleasure


### dim_demographics

- Aggregate population data (male/female, total) over rows with same 'state'
    - There's multiple rows with same city/state/population value and only differs in 'race' and 'count'
    - Duplicate rows with same city/state/population should be dropped before aggregation
- Aggregate race counts over rows with same 'state' and 'race'
    - This aggregated data forms race_counts column. race_counts column is type of map(string, int)
    - Keys are race and values are population of the race for corresponding state

#### Aggregating Population Data

In [126]:
# Double checking that there's duplicate rows with same city/state/population values
df_demographic.filter(df_demographic['City'] == 'Quincy').show()

+------+-------------+---------------+-----------------+----------------+----------+--------------------+-----+
|  City|        State|Male Population|Female Population|Total Population|State Code|                Race|Count|
+------+-------------+---------------+-----------------+----------------+----------+--------------------+-----+
|Quincy|Massachusetts|          44129|            49500|           93629|        MA|               White|58723|
|Quincy|Massachusetts|          44129|            49500|           93629|        MA|  Hispanic or Latino| 2566|
|Quincy|Massachusetts|          44129|            49500|           93629|        MA|American Indian a...|  351|
|Quincy|Massachusetts|          44129|            49500|           93629|        MA|Black or African-...| 3917|
|Quincy|Massachusetts|          44129|            49500|           93629|        MA|               Asian|30473|
+------+-------------+---------------+-----------------+----------------+----------+--------------------

In [128]:
# Selecting city/state/population columns to drop duplicates and 
# change column names so they does not include spaces
df_demographic.createOrReplaceTempView('immigrations')
df_demo_populations = spark.sql("""
    SELECT City as city,
           State as state,
           `Male Population` as male_population,
           `Female Population` as female_population,
           `Total Population`as total_population,
           `State Code` as state_code
    FROM immigrations
""")

df_demo_populations.show(5)

+----------------+-------------+---------------+-----------------+----------------+----------+
|            city|        state|male_population|female_population|total_population|state_code|
+----------------+-------------+---------------+-----------------+----------------+----------+
|   Silver Spring|     Maryland|          40601|            41862|           82463|        MD|
|          Quincy|Massachusetts|          44129|            49500|           93629|        MA|
|          Hoover|      Alabama|          38040|            46799|           84839|        AL|
|Rancho Cucamonga|   California|          88127|            87105|          175232|        CA|
|          Newark|   New Jersey|         138040|           143873|          281913|        NJ|
+----------------+-------------+---------------+-----------------+----------------+----------+
only showing top 5 rows



In [130]:
# Drop duplicate rows from df_demo_populations 
df_demo_populations = df_demo_populations.dropDuplicates()

In [138]:
# Check if duplicate rows were dropped 
rows_count = df_demo_populations.select('*').count()
cities_count = df_demo_populations.select('city', 'state').distinct().count()
rows_count == cities_count

True

In [147]:
# Aggregate population info over states 
df_demo_populations.createOrReplaceTempView("demo_populations")
df_state_populations = spark.sql("""
    SELECT state,
           state_code, 
           SUM(male_population) as male_population,
           SUM(female_population) as female_population,
           SUM(total_population) as total_population
    FROM demo_populations
    GROUP BY state, state_code
""")
df_state_populations.show(5)

+------------+----------+---------------+-----------------+----------------+
|       state|state_code|male_population|female_population|total_population|
+------------+----------+---------------+-----------------+----------------+
| Mississippi|        MS|         112147|           130536|          242683|
|        Utah|        UT|         530818|           519773|         1050591|
|South Dakota|        SD|         122718|           122380|          245098|
|    Kentucky|        KY|         452483|           477394|          929877|
|  California|        CA|       12278281|         12544179|        24822460|
+------------+----------+---------------+-----------------+----------------+
only showing top 5 rows



In [171]:
df_state_populations.createOrReplaceTempView("state_populations")

#### Aggregating Race Counts

In [139]:
# Selecting city/state/state_cod/race/count to aggregate race count over city & state 
df_race_counts = spark.sql("""
    SELECT City as city,
           State as state,
           `State Code` as state_code,
           Race as race,
           Count as count
    FROM immigrations
""")
df_race_counts.show(5)

+----------------+-------------+----------+--------------------+-----+
|            city|        state|state_code|                race|count|
+----------------+-------------+----------+--------------------+-----+
|   Silver Spring|     Maryland|        MD|  Hispanic or Latino|25924|
|          Quincy|Massachusetts|        MA|               White|58723|
|          Hoover|      Alabama|        AL|               Asian| 4759|
|Rancho Cucamonga|   California|        CA|Black or African-...|24437|
|          Newark|   New Jersey|        NJ|               White|76402|
+----------------+-------------+----------+--------------------+-----+
only showing top 5 rows



In [186]:
# Aggregate count over state/race 
df_race_counts.createOrReplaceTempView('race_counts')
df_agg_counts = spark.sql("""
    SELECT state, state_code, race, SUM(count) as count
    FROM race_counts
    GROUP BY state, state_code, race
    ORDER BY race
""")
df_agg_counts.filter(df_agg_counts['state_code'] == "MA").toPandas()

Unnamed: 0,state,state_code,race,count
0,Massachusetts,MA,American Indian and Alaska Native,17060
1,Massachusetts,MA,Asian,218113
2,Massachusetts,MA,Black or African-American,378624
3,Massachusetts,MA,Hispanic or Latino,434271
4,Massachusetts,MA,White,1272935


In [181]:
from pyspark.sql.functions import collect_set, split, col

In [188]:
# Integrate race-count column values into one row based on states
# race counts are oredered by alphabetically 
df_state_race_counts = df_agg_counts.groupBy('state', 'state_code').agg(collect_set('count').alias('counts'))

In [189]:
df_state_race_counts.show(5)

+------------+----------+--------------------+
|       state|state_code|              counts|
+------------+----------+--------------------+
| Mississippi|        MS|[7264, 2587, 323,...|
|South Dakota|        SD|[6859, 13121, 137...|
|        Utah|        UT|[48801, 889798, 2...|
|    Kentucky|        KY|[50478, 32667, 20...|
|  California|        CA|[9856464, 4543730...|
+------------+----------+--------------------+
only showing top 5 rows



In [190]:
df_state_race_counts.createOrReplaceTempView("state_race_counts")
df_state_races = spark.sql("""
    SELECT state,
           state_code,
           counts[0] as indian_alaska,
           counts[1] as asian,
           counts[2] as black,
           counts[3] as hispanic_latino,
           counts[4] as white
    FROM state_race_counts
""")

In [191]:
df_state_races.limit(5).toPandas()

Unnamed: 0,state,state_code,indian_alaska,asian,black,hispanic_latino,white
0,Mississippi,MS,7264,2587,323,71645,167366
1,South Dakota,SD,6859,13121,13782,12359,213281
2,Utah,UT,48801,889798,21893,18746,201695
3,Kentucky,KY,50478,32667,202749,705790,7772
4,California,CA,9856464,4543730,2047009,14905129,401386


In [192]:
df_state_races.createOrReplaceTempView("state_races")

#### Merging Two Tables 
- Now we have state wise population table and race table
- Merging the two table becomes dim_demographics

In [193]:
df_dim_demographics = spark.sql("""
    SELECT sp.state,
           sp.state_code,
           male_population,
           female_population,
           total_population,
           indian_alaska as indian_alaskan_population,
           asian as asian_population,
           black as black_population,
           hispanic_latino as hispanic_latino_population,
           white as white_population
    FROM state_populations sp
    JOIN state_races sr
    ON sp.state_code = sr.state_code
""")

In [194]:
df_dim_demographics.limit(5).toPandas()

Unnamed: 0,state,state_code,male_population,female_population,total_population,indian_alaskan_population,asian_population,black_population,hispanic_latino_population,white_population
0,Mississippi,MS,112147,130536,242683,7264,2587,323,71645,167366
1,Utah,UT,530818,519773,1050591,48801,889798,21893,18746,201695
2,South Dakota,SD,122718,122380,245098,6859,13121,13782,12359,213281
3,Kentucky,KY,452483,477394,929877,50478,32667,202749,705790,7772
4,California,CA,12278281,12544179,24822460,9856464,4543730,2047009,14905129,401386


## Storing Tables
- 4 tables are now created. 
    - dim_foriegn_country
    - dim_us_city
    - dim_demographics
    - fact_immigrations
- Let's store these tables as last step of the pipeline

In [197]:
df_foreign_country.write.mode("overwrite").csv('./output/dim_foreign_country.csv')
df_us_city_state.write.mode("overwrite").csv('./output/dim_us_city.csv')
df_dim_demographics.write.mode("overwrite").csv('./output/dim_demographics.csv')
df_fact_immigrations.write.mode("overwrite").partitionBy("destination_city").parquet('./output/fact_immigrations.parquet')

## 4.2 Data Quality Checks
- Fact table is extracted and loaded without transformation from cleaned immigrations data set. 
- dim_foreign_country and dim_us_city are also extracted and loaded from cleaned data. 
- dim_demographics is the one which went through the most transformation. 
- dim_demographics deserves a good quality check

In [208]:
# Let's check if total population match up with race populations 
df_dim_demographics.createOrReplaceTempView("dim_demographics")
df_check = spark.sql("""
    SELECT state_code,
           total_population as total,
           indian_alaskan_population as indian,
           asian_population as asian,
           black_population as black,
           hispanic_latino_population as hispanic,
           white_population as white,
           total_population - (indian_alaskan_population + asian_population + black_population + hispanic_latino_population + white_population) as dif
    FROM dim_demographics
    WHERE total_population != (indian_alaskan_population + asian_population + black_population + hispanic_latino_population + white_population)
""")

df_check.count()

48

- In 48 states, numbers did not match up
- Let's compare source data set and dim_demographics

In [216]:
df_demographic.createOrReplaceTempView("demo_source")
race_sum_over_state = spark.sql("""
    SELECT `State Code` as state_code,
            SUM(Count) as race_total
    FROM demo_source
    GROUP BY `State Code`
""")

In [217]:
race_sum_over_state.createOrReplaceTempView("race_sum_from_source")

In [229]:
spark.sql("""
    SELECT dd.state_code,
           total_population as total_dim_demo,
           race_total,
           (indian_alaskan_population + asian_population + black_population + hispanic_latino_population + white_population) as dim_race_total
    FROM dim_demographics dd
    JOIN race_sum_from_source src
    ON dd.state_code = src.state_code
    WHERE (indian_alaskan_population + asian_population + black_population + hispanic_latino_population + white_population) != race_total
""").show()

+----------+--------------+----------+--------------+
|state_code|total_dim_demo|race_total|dim_race_total|
+----------+--------------+----------+--------------+
+----------+--------------+----------+--------------+



- It seems that numbers did not match up even in the data source, and our transformation was not wrong.
- It would have been much cleaner result if numbers from source were matched up, but we can still use population counts over race as reference
- Also, we have verified that there were no error in transforming demographics data

## 4.3 Data dictionary 
### fact_immigrations
- year : 4 digit year
- month : numeric month
- origin_country : immigrant's origination country code (3 digit number)
- destination_city : immigrant's destination US city (3 capital letters) 
- arrival_date : arrival date in SAS date numeric
- visa : visa type (business, pleasure, student)

### dim_foreign_country 
- country_code : foreign country code (3 digit number, match with fact origin_country)
- country_name : name of the country 

### dim_us_city 
- city_code : US city code (3 capital letters, match with fact destination_city)
- state_code : State code of the city (2 Capital letters) 

### dim_demographics 
- state : state name 
- state_code : State code of the city (2 Capital letters) 
- male_population : Male population count
- female_population : Female population count
- total_population : Total population count
- indian_alaskan_population : Indian and Alaska native population count
- asian_population : Asian population count
- black_population : Black and African-American population count 
- hispanic_latino_population : Hispanic and Latino population count
- white_population : White population count

# Step 5: Complete Project Write Up
Clearly state the rationale for the choice of tools and technologies for the project.

- For exploring small data set, Pandas was used since it has familiar API and good performance on in memory data. In other instances, Spark was used. Reasons of choosing Spark are:
    - Can read and write various file types
    - Schema on read is supported
    - Great performance on distributed data set 

Propose how often the data should be updated and why.

- It depends on analytical requirements. Since what we want to see is correlation between number of immigrants and state demographic, monthly or even yearly update would be sufficient.

Write a description of how you would approach the problem differently under the following scenarios:

- The data was increased by 100x.
    - I would horizontally scale up Spark clusters. 
- The data populates a dashboard that must be updated on a daily basis by 7am every day.
    - I would use task schedulers such as Airflow. Creating DAG out of above codes would be done in no time. 
- The database needed to be accessed by 100+ people.
    - In this case, I would make OLAP cubes and store those in separate NoSQL DB. 