# Udacity provided capstone project
#### Project Summary
In the Udacity provided project, the main data includes data on immigration to the United States, and supplementary datasets include data on airport codes, U.S. city demographics, and temperature data. Using this data, we create data lake tables using Pyspark. Created data lake tables can be used to analyze different queries such as immigration trends at destination cities, gender or age distribution of the immigrants, or visa distribution of the immigrants. We use 3 different sources, the immigration dataset, city temperature and demographics data. 

The project follows the following steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [3]:
# Do all imports and installs here
import pandas as pd

import os
import glob
import re
from datetime import datetime, timedelta
from pprint import pprint
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, col, udf, year, month, avg, round, dayofweek, weekofyear, isnull
from pyspark.sql.types import StringType, IntegerType

### Step 1: Scope the Project and Gather Data

#### Scope 
Our goal is to pull data from 3 different sources and create dimension and fact tables so that different queries can be run on dataset such as:
- Which city was most visited in a specific month?
- What are the top countries of the origin ?
- What is the visa type distribution of the immigrants?

#### Describe and Gather Data 
1. **I94 Immigration Data:** This data comes from the US National Tourism and Trade Office. the original data source is: https://travel.trade.gov/research/reports/i94/historical/2016.html. The dataset within sas_data contains data from 2016. A data dictionary and also a sample csv data exist.

2. **World Temperature Data:** This data comes from Kaggle and contains average weather temperatures by city. The source link is: https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data 

3. **U.S. City Demographic Data:** This data comes from OpenSoft and contains information about the demographics of US cities such as average age, male and female population. The source link: https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/

In [4]:
# Create spark session
from pyspark.sql import SparkSession

spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

#df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')


In [5]:
# Read immigration data

# There's a file for each month of the year. For this project due to performance issues, 
# we will use data from April 2016 (i94_apr16_sub.sas7bdat).

i94_files = glob.glob("../../data/18-83510-I94-Data-2016/*.sas7bdat")
i94_fname = "../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat"
i94_df = spark.read.format("com.github.saurfang.sas.spark").load(i94_fname)

In [6]:
# Read temperature data
temperature_fname = "../../data2/GlobalLandTemperaturesByCity.csv"
temperature_df = spark.read.format("csv").option("delimiter", ",").option("header", "true").load(temperature_fname)

In [7]:
# Read demographics data
demo_fname = "us-cities-demographics.csv"
demo_df = spark.read.format("csv").option("delimiter", ";").option("header", "true").load(demo_fname)

### Step 2: Explore and Assess the Data
#### Exploration and cleaning of the immigration data 
We will check the data by counting rows and also have a look at the first records to get a better idea on existing columns. We need to drop all entries with invalid codes as described in labels description file (I94_SAS_Labels_Description). For cleaning the data, we will remove any missing values, extract valid states, and convert SAS format dates to PySpark format and drop duplicates. 

In [8]:
# Check the number of rows in the dataset
i94_df.count() # number of rows = 3096313

3096313

In [9]:
i94_df.head() # get a quick overview of the data columns

Row(cicid=6.0, i94yr=2016.0, i94mon=4.0, i94cit=692.0, i94res=692.0, i94port='XXX', arrdate=20573.0, i94mode=None, i94addr=None, depdate=None, i94bir=37.0, i94visa=2.0, count=1.0, dtadfile=None, visapost=None, occup=None, entdepa='T', entdepd=None, entdepu='U', matflag=None, biryear=1979.0, dtaddto='10282016', gender=None, insnum=None, airline=None, admnum=1897628485.0, fltno=None, visatype='B2')

In [10]:
# Now we need to identify valid ports for data quality purposes
# We want to drop all entries where the destination city code i94port is not a valid value (e.g., XXX, 99, etc). 

i94_sas_label_descriptions_fname = "I94_SAS_Labels_Descriptions.SAS"
with open(i94_sas_label_descriptions_fname) as f:
    lines = f.readlines()

re_compiled = re.compile(r"\'(.*)\'.*\'(.*)\'")
valid_ports = {}
for line in lines[302:961]:
    results = re_compiled.search(line)
    valid_ports[results.group(1)] = results.group(2)
print(len(valid_ports))

659


In [11]:
# Now we want to check valid states by creating of a list 
valid_states = demo_df.toPandas()["State Code"].unique()
print(len(valid_states))
print(valid_states)

49
['MD' 'MA' 'AL' 'CA' 'NJ' 'IL' 'AZ' 'MO' 'NC' 'PA' 'KS' 'FL' 'TX' 'VA' 'NV'
 'CO' 'MI' 'CT' 'MN' 'UT' 'AR' 'TN' 'OK' 'WA' 'NY' 'GA' 'NE' 'KY' 'SC' 'LA'
 'NM' 'IA' 'RI' 'PR' 'DC' 'WI' 'OR' 'NH' 'ND' 'DE' 'OH' 'ID' 'IN' 'AK' 'MS'
 'HI' 'SD' 'ME' 'MT']


In [12]:
# Create udf to convert SAS date to PySpark date 
@udf(StringType())
def convert_datetime(x):
    if x:
        return (datetime(1960, 1, 1).date() + timedelta(x)).isoformat()
    return None

In [13]:
# Create udf to validate state
@udf(StringType())
def validate_state(x):  
    if x in valid_states:
        return x
    return 'other'

In [14]:
# Clean immigration data

# Remove any missing values
cleaned_i94_df = i94_df.dropna(how="any", subset=["i94port", "i94addr", "gender"])

# Extract valid states 
cleaned_i94_df = cleaned_i94_df.withColumn("i94addr", validate_state(cleaned_i94_df.i94addr))

# Convert arrival_date (SAS format) to PySpark format
cleaned_i94_df = cleaned_i94_df.withColumn("arrdate", convert_datetime(cleaned_i94_df.arrdate))

# Keep only us related immigration data
cleaned_i94_df = cleaned_i94_df.filter(cleaned_i94_df.i94addr != 'other')

staging_i94_df = cleaned_i94_df.select(col("cicid").alias("id"), 
                                       col("arrdate").alias("date"),
                                       col("i94port").alias("city_code"),
                                       col("i94addr").alias("state_code"),
                                       col("i94bir").alias("age"),
                                       col("gender").alias("gender"),
                                       col("i94visa").alias("visa_type"),
                                       "count").drop_duplicates()

staging_i94_df.limit(5).toPandas() # staging immigration data 

Unnamed: 0,id,date,city_code,state_code,age,gender,visa_type,count
0,168.0,2016-04-01,WAS,DC,34.0,M,2.0,1.0
1,383.0,2016-04-01,MIA,FL,40.0,M,2.0,1.0
2,608.0,2016-04-01,TOR,TX,45.0,M,1.0,1.0
3,930.0,2016-04-01,NEW,NY,49.0,F,2.0,1.0
4,1229.0,2016-04-01,NYC,CT,32.0,M,1.0,1.0


In [15]:
staging_i94_df.printSchema()

root
 |-- id: double (nullable = true)
 |-- date: string (nullable = true)
 |-- city_code: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- age: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- visa_type: double (nullable = true)
 |-- count: double (nullable = true)



#### Exploration and cleaning of the temperature data 

We will map city full name to city port abbreviation and we will only use temperature values from United States. We also need to remove invalid ports.

In [16]:
temperature_df.count() # 8599212

8599212

In [17]:
temperature_df.limit(3).toPandas()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E


In [18]:
# Create udf to map city full name to city port abbreviation

@udf(StringType())
def city_to_port(city):
    for key in valid_ports:
        if city.lower() in valid_ports[key].lower():
            return key

In [19]:
# Clean temperature data

# Only use temperatures from United States
# Map full name to city port abbreviation
# Remove invalid ports
cleaned_temp_df = temperature_df.filter(temperature_df["Country"] == "United States") \
    .withColumn("year", year(temperature_df['dt'])) \
    .withColumn("month", month(temperature_df["dt"])) \
    .withColumn("i94port", city_to_port(temperature_df["City"])) \
    .withColumn("AverageTemperature", col("AverageTemperature").cast("float")) \
    .dropna(how='any', subset=["i94port"])

# Only use temperatures from 2013 (the latest year in the dataset)
cleaned_temp_df = cleaned_temp_df.filter(cleaned_temp_df["year"] == 2013)

staging_temp_df = cleaned_temp_df.select(col("year"), col("month"), col("i94port").alias("city_code"),
                                         round(col("AverageTemperature"), 1).alias("avg_temperature"),
                                         col("Latitude").alias("lat"), col("Longitude").alias("long")).drop_duplicates()

print(staging_temp_df.count()) # 1044
staging_temp_df.limit(5).toPandas()

1044


Unnamed: 0,year,month,city_code,avg_temperature,lat,long
0,2013,4,COL,16.9,32.95N,85.21W
1,2013,1,DAB,0.5,39.38N,83.24W
2,2013,1,ONT,6.8,34.56N,116.76W
3,2013,2,POM,5.8,45.81N,123.46W
4,2013,5,PRO,14.3,42.59N,72.00W


In [20]:
staging_temp_df.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- city_code: string (nullable = true)
 |-- avg_temperature: float (nullable = true)
 |-- lat: string (nullable = true)
 |-- long: string (nullable = true)



#### Exploring and cleansing of demographics data

After getting an idea of the data by checking a sample data, we will convert some numeric values to percentages for future analyses. Additionaly, we will map full city name to city code. We will also create corresponding race columns by pivoting the race column.

In [21]:
demo_df.count() # number of demographics data is 2891

2891

In [22]:
# print a view of the data
demo_df.limit(5).toPandas()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ,White,76402


In [23]:
# Clean demographics data

# Calculate percentages of numeric demographic variables (e.g. Male Population, Female Population,...) and create new ones
cleaned_demo_df = demo_df.withColumn("median_age", demo_df['Median Age']) \
    .withColumn("pct_male_pop", (demo_df['Male Population'] / demo_df['Total Population']) * 100) \
    .withColumn("pct_female_pop", (demo_df['Female Population'] / demo_df['Total Population']) * 100) \
    .withColumn("pct_veterans", (demo_df['Number of Veterans'] / demo_df['Total Population']) * 100) \
    .withColumn("pct_foreign_born", (demo_df['Foreign-born'] / demo_df['Total Population']) * 100) \
    .withColumn("pct_race", (demo_df['Count'] / demo_df['Total Population']) * 100) \
    .withColumn("city_code", city_to_port(demo_df["City"])) \
    .dropna(how='any', subset=["city_code"])

cleaned_demo_df = cleaned_demo_df.select(col("City").alias("city_name"), col("State Code").alias("state_code"), 
                         "median_age", "pct_male_pop", "pct_female_pop","pct_veterans", 
                         "pct_foreign_born", col("Total Population").alias("total_pop"), 
                         col("Race").alias("race"), "pct_race").drop_duplicates()

cleaned_demo_df.count() # 883

883

In [24]:
# Pivot the race column
pivot_demo_df = cleaned_demo_df.groupBy("city_name", "state_code", "median_age", "pct_male_pop",
                                        "pct_female_pop","pct_veterans", "pct_foreign_born", "total_pop").pivot("Race").avg("pct_race")

pivot_demo_df = pivot_demo_df.withColumn("city_code", city_to_port(pivot_demo_df["city_name"])) \
    .dropna(how='any', subset=["city_code"])

staging_demo_df = pivot_demo_df.select("city_code", "state_code", "city_name", "median_age",
                                    round(col("pct_male_pop"), 1).alias("pct_male_pop"),
                                    round(col("pct_female_pop"), 1).alias("pct_female_pop"),
                                    round(col("pct_veterans"), 1).alias("pct_veterans"),
                                    round(col("pct_veterans"), 1).alias("pct_foreign_born"),
                                    round(col("American Indian and Alaska Native"), 1).alias("pct_native_american"),
                                    round(col("Asian"), 1).alias("pct_asian"),
                                    round(col("Black or African-American"), 1).alias("pct_black"),
                                    round(col("Hispanic or Latino"), 1).alias("pct_hispanic_or_latino"),
                                    round(col("White"), 1).alias("pct_white"), "total_pop")
print(staging_demo_df.count()) # 180
staging_demo_df.limit(10).toPandas()

180


Unnamed: 0,city_code,state_code,city_name,median_age,pct_male_pop,pct_female_pop,pct_veterans,pct_foreign_born,pct_native_american,pct_asian,pct_black,pct_hispanic_or_latino,pct_white,total_pop
0,TUC,AZ,Tucson,33.6,49.8,50.2,7.2,7.2,4.6,4.6,6.4,43.5,76.1,531674
1,MCA,TX,Allen,37.2,52.3,47.7,3.6,3.6,0.2,16.1,13.4,10.8,71.2,98138
2,CRP,TX,Corpus Christi,35.0,49.5,50.5,7.7,7.7,0.9,2.8,4.6,61.9,90.3,324082
3,FMY,FL,Fort Myers,37.3,49.8,50.2,5.8,5.8,,4.8,23.4,24.1,67.8,74015
4,ORL,FL,Orlando,33.1,48.3,51.7,4.7,4.7,0.9,4.1,25.1,33.0,66.1,270917
5,LOS,CA,Los Angeles,35.0,49.3,50.7,2.2,2.2,1.6,12.9,10.2,48.8,54.8,3971896
6,PRO,RI,Providence,29.9,49.7,50.3,2.8,2.8,2.3,7.5,17.1,43.5,54.6,179204
7,CID,IA,Cedar Rapids,36.2,48.4,51.6,6.0,6.0,1.0,4.1,9.1,4.1,89.6,130405
8,SPI,IL,Springfield,38.8,47.2,52.8,6.4,6.4,1.4,3.3,21.5,2.3,77.2,117809
9,POM,OR,Portland,36.7,49.6,50.4,4.7,4.7,2.4,10.2,7.3,9.7,82.9,632187


In [25]:
staging_demo_df.printSchema()

root
 |-- city_code: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- city_name: string (nullable = true)
 |-- median_age: string (nullable = true)
 |-- pct_male_pop: double (nullable = true)
 |-- pct_female_pop: double (nullable = true)
 |-- pct_veterans: double (nullable = true)
 |-- pct_foreign_born: double (nullable = true)
 |-- pct_native_american: double (nullable = true)
 |-- pct_asian: double (nullable = true)
 |-- pct_black: double (nullable = true)
 |-- pct_hispanic_or_latino: double (nullable = true)
 |-- pct_white: double (nullable = true)
 |-- total_pop: string (nullable = true)



### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

#### Staging tables
`staging immigration table`
- id
- date
- city_code
- state_code
- age
- gender
- visa_type
- count
    
`staging temperature table`
 - year
 - month
 - city_code
 - city_name
 - avg_temperature
 - lat
 - long
    
`staging demographics table` 
 - city_code
 - state_code
 - city_name
 - median_age
 - pct_male_pop
 - pct_female_pop
 - pct_veterans
 - pct_foreign_born
 - pct_native_american
 - pct_asian
 - pct_black
 - pct_hispanic_or_latino
 - pct_white
 - total_pop
 
#### Dimension tables

`immigrant table`
- id
- gender
- age
- visa_type

`city table`
- city_code
- state_code
- city_name
- median_age
- pct_male_pop
- pct_female_pop
- pct_veterans
- pct_foreign_born
- pct_native_american
- pct_asian
- pct_black
- pct_hispanic_or_latino
- pct_white
- total_pop
- lat
- long


`monthly city temperature table`
- city_code
- year
- month
- avg_temperature

`time table`
- date
- dayofweek
- weekofyear
- month

#### Fact table

`immigration table`
- id
- state_code
- city_code
- date
- count



#### 3.2 Mapping Out Data Pipelines
The following ETL steps are necessary to pipeline the data:

1. Load immigration, demographics and temperature datasets.
2. Clean each dataset for invalid/incorrect records, missing values or duplicates etc.
3. Load staging tables for each dataset.
4. Create dimension tables for immigrant, city, city_temp and time. 
5. Create fact table for immigration.
6. Perform data quality checks by checking if each table has records in it.
7. Save dataframes in Parquet format.

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [26]:
# Write code here

# Create dimension table for immigrant

immigrant_df = staging_i94_df.select("id", "gender", "age", "visa_type").drop_duplicates()
immigrant_df.count() # 2435922

2435922

In [27]:
immigrant_df.limit(5).toPandas() 

Unnamed: 0,id,gender,age,visa_type
0,270799.0,F,49.0,2.0
1,275758.0,M,50.0,2.0
2,445416.0,M,69.0,1.0
3,488015.0,M,6.0,2.0
4,503609.0,M,21.0,2.0


In [28]:
# Create dimension table for city

city_df = staging_demo_df.join(staging_temp_df, "city_code") \
    .select("city_code", "state_code", "city_name", "median_age", "pct_male_pop", "pct_female_pop", "pct_veterans",
           "pct_foreign_born", "pct_native_american", "pct_asian", "pct_black",
           "pct_hispanic_or_latino", "pct_white", "total_pop", "lat", "long").drop_duplicates()

In [29]:
city_df.count()

142

In [30]:
city_df.limit(5).toPandas()

Unnamed: 0,city_code,state_code,city_name,median_age,pct_male_pop,pct_female_pop,pct_veterans,pct_foreign_born,pct_native_american,pct_asian,pct_black,pct_hispanic_or_latino,pct_white,total_pop,lat,long
0,BRO,TX,Brownsville,30.6,47.7,52.3,2.3,2.3,0.6,0.9,0.7,92.5,95.0,183888,26.52N,96.72W
1,HSV,WI,Madison,30.7,49.2,50.8,3.9,3.9,0.9,9.6,8.2,7.9,82.1,248956,34.56N,85.62W
2,ATL,GA,Atlanta,33.8,48.3,51.7,4.0,4.0,1.0,5.2,52.9,4.0,42.3,463875,34.56N,83.68W
3,NEW,NJ,Newark,34.6,49.0,51.0,2.1,2.1,0.8,2.6,51.4,35.6,27.1,281913,40.99N,74.56W
4,NWH,CT,New Haven,29.9,48.9,51.1,2.0,2.0,1.7,6.1,33.3,33.4,43.2,130310,40.99N,72.43W


In [31]:
# Create dimension table for monthly city temperature

monthly_city_temp_df = staging_temp_df.select("city_code", "year", "month", "avg_temperature").drop_duplicates()
monthly_city_temp_df.count()
monthly_city_temp_df.limit(5).toPandas()

Unnamed: 0,city_code,year,month,avg_temperature
0,SAA,2013,6,18.6
1,PHI,2013,5,16.6
2,BOS,2013,5,14.3
3,BUR,2013,3,14.5
4,RNO,2013,2,4.7


In [32]:
# Create dimension table for time

time_df = staging_i94_df.withColumn("dayofweek", dayofweek("date"))\
                .withColumn("weekofyear", weekofyear("date"))\
                .withColumn("month", month("date"))
                        
time_df = time_df.select("date", "dayofweek", "weekofyear", "month").drop_duplicates()

In [33]:
time_df.count()

30

In [34]:
time_df.limit(5).toPandas()

Unnamed: 0,date,dayofweek,weekofyear,month
0,2016-04-23,7,16,4
1,2016-04-22,6,16,4
2,2016-04-08,6,14,4
3,2016-04-09,7,14,4
4,2016-04-26,3,17,4


In [35]:
# Create fact table for immigration

immigration_df = staging_i94_df.select("id", "state_code", "city_code", "date", "count").drop_duplicates()

In [36]:
immigration_df.count() #2435922

2435922

In [37]:
immigration_df.limit(5).toPandas()

Unnamed: 0,id,state_code,city_code,date,count
0,25716.0,CA,LOS,2016-04-01,1.0
1,56083.0,HI,HHW,2016-04-01,1.0
2,261977.0,FL,NYC,2016-04-02,1.0
3,290139.0,NY,NYC,2016-04-02,1.0
4,487570.0,HI,HHW,2016-04-03,1.0


In [38]:
# Write dimension tables to parquet
immigrant_df.write.mode("overwrite").partitionBy("gender", "age").parquet("immigrants")
city_df.write.mode("overwrite").partitionBy("state_code").parquet("cities")
monthly_city_temp_df.write.mode("overwrite").parquet("monthly_city_temperatues")
time_df.write.mode("overwrite").parquet("time")

# Dimension tables written

In [39]:
# Write immigration fact table to parquet
# immigration_df.write.mode("overwrite").partitionBy("state_code", "city_code").parquet("immigration") # this takes forever

# Since the first code takes forever, I uses the code given by a mentor in this link: https://knowledge.udacity.com/questions/566345
# dataFrame.coalesce(1).write.format("parquet").mode("overwrite").save("temp.parquet") # this is the example given by the mentor

# due to performance issues, the following code is used for testing purposes
# dataFrame.coalesce(1).write.format("parquet").mode("overwrite").save("temp.parquet")

immigration_df.coalesce(1).write.format("parquet").mode("overwrite").save("immigration") # this works fine for testing purposes

#### 4.2 Data Quality Checks
Here, we perform data quality checks by checking if the tables exist and if they contain records as expected. This will ensure the completeness of the data.

Run Quality Checks

In [40]:
# Perform quality checks here
# Check if the tables exist

def table_exists(df):
    if df is not None:
        return True
    else:
        return False
        
if table_exists(immigrant_df) & table_exists(city_df) & table_exists(monthly_city_temp_df) & table_exists(time_df) & table_exists(immigration_df):
    print("Data quality check successful!")
    print("Dimension tables and fact table exist!")
    print()
else:
    print("Data quality check failed!")
    print("Missing table!")

Data quality check successful!
Dimension tables and fact table exist!



In [41]:
# Check if the tables are empty or not. Tables must contain records.
def table_not_empty(df):
    return df.count() != 0 

if table_not_empty(immigrant_df) & table_not_empty(city_df) & table_not_empty(monthly_city_temp_df) & table_not_empty(time_df) & table_not_empty(immigration_df):
    print("Data quality check successful!")
    print("Dimension tables and fact table contain records!")
    print()
else:
    print("Data quality check failed!")
    print("Null values...")

Data quality check successful!
Dimension tables and fact table contain records!



#### 4.3 Data dictionary 
In the following, you can see the data table with its context and fields:

**Fact table `immigration`focuses on events/facts on immigration process and contains the following variables:** <br>
|-- id: id from sas file<br>
|-- state_code: of the arrival city<br>
|-- city_code: city port of arrival city<br>
|-- date: date of arrival in U.S.<br>
|-- count: count of immigrant's entries into the US<br>

**Dimension table `immigrant` focuses on people who immigrated and contains the following variables:**<br>
|-- id: immigrant's id<br>
|-- gender: immigrant's gender<br>
|-- age: immigrant's age<br>
|-- visa_type: immigrant's visa type<br>


**Dimension table `city` focuses on demographic character of the cities and contains the following variables:**<br>
|-- city_code: code of the city<br>
|-- state_code: code of the state<br>
|-- city_name: city's name<br>
|-- median_age: median age of the population living on that city<br>
|-- pct_male_pop: percentage of male population<br>
|-- pct_female_pop: percentage of female population<br>
|-- pct_veterans: percentage of veterans<br>
|-- pct_foreign_born: percentage of foreigners<br>
|-- pct_native_american: percentage of native americans<br>
|-- pct_asian: percentage of asian population<br>
|-- pct_black: percentage of black people<br>
|-- pct_hispanic_or_latino: percentage of hispanic or latino people<br>
|-- pct_white: percentage of white people<br>
|-- total_pop: total population<br>
|-- lat: latitude of the city<br>
|-- long: longitude of the city<br>


**Dimension table `monthly city temperature` focuses on temperature values on a particular city and contains the following variables:**<br>
|-- city_code: city port code<br>
|-- year: year <br>
|-- month: month of the year <br>
|-- avg_temperature: average temperature of a city in a given month<br>

**Dimension table `time` contains the following time related variables:**<br>
|-- date: date<br>
|-- dayofweek: day of the week<br>
|-- weekofyear: week of year<br>
|-- month: month<br>

#### Step 5: Complete Project Write Up

* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.
  
We have chosen Spark as our data processing engine because Spark executes much faster by caching data in memory across multiple parallel operations. Therefore it is a good choice to process big data sets by splitting the work up into chunks and assigning those chunks accross computational resources. In order to read the data into a data frame, we have used Pandas library. The final tables are written in columnar format using Parquet.
 
The data update cycle depends on the availability of new data. Frequent update for such a big dataset might be a bit cumbersome, since collecting those infos is not that easy. Since immigration and temperature raw datasets built up monthly, we would recommend monthly update.

**If the data was increased by 100x:** First of all, we need to increase the performance. There are several ways to increase the Spark performance such as workload type, partitioning scheme, or memory consumption. We can start to increase number of executers and as a result, more worker nodes in cluster can process the data. 

**If the data populates a dashboard that must be updated on a daily basis by 7am every day:** We can use Apache Airflow to schedule a job at 7 am daily which initiates a Spark job/task. 

**If the database needed to be accessed by 100+ people:** In this case, our system should scale because of increased amount of users. Scalability means an ability to handle more users, clients, data, transactions, or requests without affecting the user experience by adding more resources. For that, we should host our system in a data warehouse on cloud infrastructure (e.g. Amazon Redshift) which provides larger capacity to serve mass consumption. 

#### Testing the results
Here we show that the chosen data model is appropriate for the identified purpose.

In [42]:
# Create tables e.g. local temporary views with the given data frames in order to run SQL queries
immigration_df.createOrReplaceTempView("fact_immigration")
city_df.createOrReplaceTempView("dim_city")
immigrant_df.createOrReplaceTempView("dim_immigrant")
monthly_city_temp_df.createOrReplaceTempView("dim_monthly_city_temp")
time_df.createOrReplaceTempView("dim_time")

In [50]:
# Example: Show me the cities with the most foreigners in it (greater than 10%)
spark.sql("""
    SELECT city_code, state_code, city_name, pct_foreign_born
    FROM dim_city
    WHERE pct_foreign_born > 10
    """).show()

+---------+----------+----------------+----------------+
|city_code|state_code|       city_name|pct_foreign_born|
+---------+----------+----------------+----------------+
|      COS|        CO|Colorado Springs|            10.8|
|      JAC|        NC|    Jacksonville|            12.3|
|      JAC|        NC|    Jacksonville|            12.3|
|      FAY|        NC|    Fayetteville|            13.9|
|      NOR|        VA|         Norfolk|            12.0|
|      COL|        GA|        Columbus|            10.8|
|      COL|        GA|        Columbus|            10.8|
+---------+----------+----------------+----------------+

