# Immigrants, demographics and temperature analysis 
### Data Engineering Capstone Project
#### Project Summary

This project is desinged to manipulate data from US immigration to discover popular cities for immigration. Second, demographic information to analyze the gender of the immigrants. Along with the visa types and median age. Lastly, we extracted data for the average temperature per city. 

#### 3 Dataset breakdown:

1) i94 dataset of 2016 
2) Kaggle - average temperature per city
3) OpenSoft - city demographic data 

* 3 different sources of data extraction are required

- Star Schema will be used. 

#### Data sources

The main dataset will include data on immigration to the United States, and supplementary datasets will include data on airport codes, U.S. city demographics, and temperature data. 

1. **I94 Immigration Data**: The dataset contains data from 2016 from the U.S. National Tourism and Trade Office. [link](https://travel.trade.gov/research/reports/i94/historical/2016.html)

2. **World Temperature Data**: Comes from Kaggle and contains average weather temperatures by city. [link](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data)

3. **U.S. City Demographic Data**: comes from OpenSoft and contains information about the demographics of all US cities such as average age, male and female population. [link](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/)


The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
import os
import glob
import re
from datetime import datetime, timedelta
from pprint import pprint
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, col, udf, year, month, avg, round, dayofweek, weekofyear, isnull
from pyspark.sql.types import StringType, IntegerType

In [2]:
# Create Spark session
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()

### Step 1: Scope the Project and Gather Data

#### Scope 

This project is desinged to manipulate data from US immigration to discover popular cities for immigration. Second, demographic information to analyze the gender of the immigrants. Along with the visa types and basic details of age. Lastly, we extracted data for the average temperature per city. 

#### Describe and Gather Data 

1. **I94 Immigration Data**: The dataset contains data from 2016 from the U.S. National Tourism and Trade Office. [link](https://travel.trade.gov/research/reports/i94/historical/2016.html)

2. **World Temperature Data**: Comes from Kaggle and contains average weather temperatures by city. [link](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data)

3. **U.S. City Demographic Data**: comes from OpenSoft and contains information about the demographics of all US cities such as average age, male and female population. [link](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/)

In [3]:
# Read in the data here
# immigration data

i94_files = glob.glob("../../data/18-83510-I94-Data-2016/*.sas7bdat")
i94_fname = "../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat"
i94_df = spark.read.format("com.github.saurfang.sas.spark").load(i94_fname)

In [4]:
i94_df.head()

Row(cicid=6.0, i94yr=2016.0, i94mon=4.0, i94cit=692.0, i94res=692.0, i94port='XXX', arrdate=20573.0, i94mode=None, i94addr=None, depdate=None, i94bir=37.0, i94visa=2.0, count=1.0, dtadfile=None, visapost=None, occup=None, entdepa='T', entdepd=None, entdepu='U', matflag=None, biryear=1979.0, dtaddto='10282016', gender=None, insnum=None, airline=None, admnum=1897628485.0, fltno=None, visatype='B2')

In [5]:
# Read temperature data
temperature_fname = "../../data2/GlobalLandTemperaturesByCity.csv"
temperature_df = spark.read.format("csv").option("delimiter", ",").option("header", "true").load(temperature_fname)

In [7]:
#Examine data 
temperature_df.head()

Row(dt='1743-11-01', AverageTemperature='6.068', AverageTemperatureUncertainty='1.7369999999999999', City='Århus', Country='Denmark', Latitude='57.05N', Longitude='10.33E')

In [8]:
# Read in demographics data
demo_fname = "us-cities-demographics.csv"
demo_df = spark.read.format("csv").option("delimiter", ";").option("header", "true").load(demo_fname)

In [9]:
#Examine data 
demo_df.head()

Row(City='Silver Spring', State='Maryland', Median Age='33.8', Male Population='40601', Female Population='41862', Total Population='82463', Number of Veterans='1562', Foreign-born='30908', Average Household Size='2.6', State Code='MD', Race='Hispanic or Latino', Count='25924')

In [10]:
	
from pyspark.sql import SparkSession
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()
df_spark =spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')


### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps - Clean the following from each dataset

 - Immigration data -remove missing values / extract states / convert to PySpark format

 - Temperature data - Only US temp. / Full name to city port / remove invalid ports
 
 - Clean demographics data - Calculate percentages of numeric columns / create new one

# Performing exploring / cleaning tasks here

## Immigrations Data Set 

In [11]:
# Check count
i94_df.count()

3096313

In [14]:
#Examine columns and data
i94_df.limit(10).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


In [12]:
# Create list of valid ports
i94_sas_label_descriptions_fname = "I94_SAS_Labels_Descriptions.SAS"
with open(i94_sas_label_descriptions_fname) as f:
    lines = f.readlines()

re_compiled = re.compile(r"\'(.*)\'.*\'(.*)\'")
valid_ports = {}
for line in lines[302:961]:
    results = re_compiled.search(line)
    valid_ports[results.group(1)] = results.group(2)
print(len(valid_ports))
#print(valid_ports)

659


In [13]:
# Create list of valid states
valid_states = demo_df.toPandas()["State Code"].unique()
print(len(valid_states))
print(valid_states)

49
['MD' 'MA' 'AL' 'CA' 'NJ' 'IL' 'AZ' 'MO' 'NC' 'PA' 'KS' 'FL' 'TX' 'VA' 'NV'
 'CO' 'MI' 'CT' 'MN' 'UT' 'AR' 'TN' 'OK' 'WA' 'NY' 'GA' 'NE' 'KY' 'SC' 'LA'
 'NM' 'IA' 'RI' 'PR' 'DC' 'WI' 'OR' 'NH' 'ND' 'DE' 'OH' 'ID' 'IN' 'AK' 'MS'
 'HI' 'SD' 'ME' 'MT']


In [14]:
# Create udf to convert SAS date to PySpark date 
@udf(StringType())
def convert_datetime(x):
    if x:
        return (datetime(1960, 1, 1).date() + timedelta(x)).isoformat()
    return None

In [15]:
# Create udf to validate state
@udf(StringType())
def validate_state(x):  
    if x in valid_states:
        return x
    return 'other'

In [17]:
# Clean immigration data - remove missing values / extract states / convert to PySpark format 

# Remove any missing values
cleaned_i94_df = i94_df.dropna(how="any", subset=["i94port", "i94addr", "gender"])

# Extract valid states 
cleaned_i94_df = cleaned_i94_df.withColumn("i94addr", validate_state(cleaned_i94_df.i94addr))

# Convert arrival_date (SAS format) to PySpark format
cleaned_i94_df = cleaned_i94_df.withColumn("arrdate", convert_datetime(cleaned_i94_df.arrdate))

# only keep us related immigration data
cleaned_i94_df = cleaned_i94_df.filter(cleaned_i94_df.i94addr != 'other')

staging_i94_df = cleaned_i94_df.select(col("cicid").alias("id"), 
                                       col("arrdate").alias("date"),
                                       col("i94port").alias("city_code"),
                                       col("i94addr").alias("state_code"),
                                       col("i94bir").alias("age"),
                                       col("gender").alias("gender"),
                                       col("i94visa").alias("visa_type"),
                                       "count").drop_duplicates()

#Examine data
staging_i94_df.limit(10).toPandas()

Unnamed: 0,id,date,city_code,state_code,age,gender,visa_type,count
0,168.0,2016-04-01,WAS,DC,34.0,M,2.0,1.0
1,383.0,2016-04-01,MIA,FL,40.0,M,2.0,1.0
2,608.0,2016-04-01,TOR,TX,45.0,M,1.0,1.0
3,930.0,2016-04-01,NEW,NY,49.0,F,2.0,1.0
4,1229.0,2016-04-01,NYC,CT,32.0,M,1.0,1.0
5,1238.0,2016-04-01,NYC,FL,10.0,M,2.0,1.0
6,1333.0,2016-04-01,NYC,NY,58.0,F,2.0,1.0
7,1338.0,2016-04-01,NYC,NY,52.0,F,2.0,1.0
8,1422.0,2016-04-01,NYC,NY,45.0,M,2.0,1.0
9,1581.0,2016-04-01,LOS,CA,33.0,M,2.0,1.0


In [18]:
#Show Schema of i94 data
staging_i94_df.printSchema()

root
 |-- id: double (nullable = true)
 |-- date: string (nullable = true)
 |-- city_code: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- age: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- visa_type: double (nullable = true)
 |-- count: double (nullable = true)



# Performing exploring / cleaning tasks here

## Temperature Data Set 

In [21]:
#Explore count
temperature_df.count()

8599212

In [24]:
temperature_df.limit(10).toPandas()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


In [25]:
# Create udf to map city full name to city port abbreviation

@udf(StringType())
def city_to_port(city):
    for key in valid_ports:
        if city.lower() in valid_ports[key].lower():
            return key

In [26]:
# Clean temperature data - Only US temp. / Full name to city port / remove invalid ports

cleaned_temp_df = temperature_df.filter(temperature_df["Country"] == "United States") \
    .withColumn("year", year(temperature_df['dt'])) \
    .withColumn("month", month(temperature_df["dt"])) \
    .withColumn("i94port", city_to_port(temperature_df["City"])) \
    .withColumn("AverageTemperature", col("AverageTemperature").cast("float")) \
    .dropna(how='any', subset=["i94port"])

# Only use temperatures from 2013 (the latest year in the dataset)
cleaned_temp_df = cleaned_temp_df.filter(cleaned_temp_df["year"] == 2013)

staging_temp_df = cleaned_temp_df.select(col("year"), col("month"), col("i94port").alias("city_code"),
                                         round(col("AverageTemperature"), 1).alias("avg_temperature"),
                                         col("Latitude").alias("lat"), col("Longitude").alias("long")).drop_duplicates()

print(staging_temp_df.count())
staging_temp_df.limit(10).toPandas()

1044


Unnamed: 0,year,month,city_code,avg_temperature,lat,long
0,2013,4,COL,16.9,32.95N,85.21W
1,2013,1,DAB,0.5,39.38N,83.24W
2,2013,1,ONT,6.8,34.56N,116.76W
3,2013,2,POM,5.8,45.81N,123.46W
4,2013,5,PRO,14.3,42.59N,72.00W


In [29]:
#Explore schema of temp data 
staging_temp_df.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- city_code: string (nullable = true)
 |-- avg_temperature: float (nullable = true)
 |-- lat: string (nullable = true)
 |-- long: string (nullable = true)



# Performing exploring / cleaning tasks here

## Demographic Dataset 

In [30]:
#Explore count 
demo_df.count()

2891

In [31]:
demo_df.limit(10).toPandas()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ,White,76402


In [32]:
# Clean demographics data - Calculate percentages of numeric columns / create new one
cleaned_demo_df = demo_df.withColumn("median_age", demo_df['Median Age']) \
    .withColumn("pct_male_pop", (demo_df['Male Population'] / demo_df['Total Population']) * 100) \
    .withColumn("pct_female_pop", (demo_df['Female Population'] / demo_df['Total Population']) * 100) \
    .withColumn("pct_veterans", (demo_df['Number of Veterans'] / demo_df['Total Population']) * 100) \
    .withColumn("pct_foreign_born", (demo_df['Foreign-born'] / demo_df['Total Population']) * 100) \
    .withColumn("pct_race", (demo_df['Count'] / demo_df['Total Population']) * 100) \
    .withColumn("city_code", city_to_port(demo_df["City"])) \
    .dropna(how='any', subset=["city_code"])

cleaned_demo_df = cleaned_demo_df.select(col("City").alias("city_name"), col("State Code").alias("state_code"), 
                         "median_age", "pct_male_pop", "pct_female_pop","pct_veterans", 
                         "pct_foreign_born", col("Total Population").alias("total_pop"), 
                         col("Race").alias("race"), "pct_race").drop_duplicates()

cleaned_demo_df.count()

883

In [33]:
# Pivot the race column
pivot_demo_df = cleaned_demo_df.groupBy("city_name", "state_code", "median_age", "pct_male_pop",
                                        "pct_female_pop","pct_veterans", "pct_foreign_born", "total_pop").pivot("Race").avg("pct_race")

pivot_demo_df = pivot_demo_df.withColumn("city_code", city_to_port(pivot_demo_df["city_name"])) \
    .dropna(how='any', subset=["city_code"])

staging_demo_df = pivot_demo_df.select("city_code", "state_code", "city_name", "median_age",
                                    round(col("pct_male_pop"), 1).alias("pct_male_pop"),
                                    round(col("pct_female_pop"), 1).alias("pct_female_pop"),
                                    round(col("pct_veterans"), 1).alias("pct_veterans"),
                                    round(col("pct_veterans"), 1).alias("pct_foreign_born"),
                                    round(col("American Indian and Alaska Native"), 1).alias("pct_native_american"),
                                    round(col("Asian"), 1).alias("pct_asian"),
                                    round(col("Black or African-American"), 1).alias("pct_black"),
                                    round(col("Hispanic or Latino"), 1).alias("pct_hispanic_or_latino"),
                                    round(col("White"), 1).alias("pct_white"), "total_pop")
print(staging_demo_df.count())
staging_demo_df.limit(10).toPandas()

180


Unnamed: 0,city_code,state_code,city_name,median_age,pct_male_pop,pct_female_pop,pct_veterans,pct_foreign_born,pct_native_american,pct_asian,pct_black,pct_hispanic_or_latino,pct_white,total_pop
0,TUC,AZ,Tucson,33.6,49.8,50.2,7.2,7.2,4.6,4.6,6.4,43.5,76.1,531674
1,MCA,TX,Allen,37.2,52.3,47.7,3.6,3.6,0.2,16.1,13.4,10.8,71.2,98138
2,CRP,TX,Corpus Christi,35.0,49.5,50.5,7.7,7.7,0.9,2.8,4.6,61.9,90.3,324082
3,FMY,FL,Fort Myers,37.3,49.8,50.2,5.8,5.8,,4.8,23.4,24.1,67.8,74015
4,ORL,FL,Orlando,33.1,48.3,51.7,4.7,4.7,0.9,4.1,25.1,33.0,66.1,270917
5,LOS,CA,Los Angeles,35.0,49.3,50.7,2.2,2.2,1.6,12.9,10.2,48.8,54.8,3971896
6,PRO,RI,Providence,29.9,49.7,50.3,2.8,2.8,2.3,7.5,17.1,43.5,54.6,179204
7,CID,IA,Cedar Rapids,36.2,48.4,51.6,6.0,6.0,1.0,4.1,9.1,4.1,89.6,130405
8,SPI,IL,Springfield,38.8,47.2,52.8,6.4,6.4,1.4,3.3,21.5,2.3,77.2,117809
9,POM,OR,Portland,36.7,49.6,50.4,4.7,4.7,2.4,10.2,7.3,9.7,82.9,632187


In [36]:
#explore schema of demo data
staging_demo_df.printSchema()

root
 |-- city_code: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- city_name: string (nullable = true)
 |-- median_age: string (nullable = true)
 |-- pct_male_pop: double (nullable = true)
 |-- pct_female_pop: double (nullable = true)
 |-- pct_veterans: double (nullable = true)
 |-- pct_foreign_born: double (nullable = true)
 |-- pct_native_american: double (nullable = true)
 |-- pct_asian: double (nullable = true)
 |-- pct_black: double (nullable = true)
 |-- pct_hispanic_or_latino: double (nullable = true)
 |-- pct_white: double (nullable = true)
 |-- total_pop: string (nullable = true)



### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

- Reasoning to why I used a Star Schema - 1) All of the data connects through the fact table. 2) The multiple dimension tables are treated as one large table of information. 3) It makes queries simpler, quicker and easier to perform. 

- Closure look at the Schema(Staging / Dimensions / Fact tables)

#### Staging Tables 

staging_i94_df
   - id
   - date
   - city_code
   - state_code
   - age
   - gender
   - visa_type
   - count

staging_temp_df
   - year
   - month
   - city_code
   - city_name
   - avg_temperature
   - lat
   - long

staging_demo_df
   - city_code
   - state_code
   - city_name
   - median_age
   - pct_male_pop
   - pct_female_pop
   - pct_veterans
   - pct_foreign_born
   - pct_native_american
   - pct_asian
   - pct_black
   - pct_hispanic_or_latino
   - pct_white
   - total_pop
    
#### Dimension Tables 

immigrant_df
   - id
   - gender
   - age
   - visa_type

city_df
   - city_code
   - state_code
   - city_name
   - median_age
   - pct_male_pop
   - pct_female_pop
   - pct_veterans
   - pct_foreign_born
   - pct_native_american
   - pct_asian
   - pct_black
   - pct_hispanic_or_latino
   - pct_white
   - total_pop
   - lat
   - long

monthly_city_temp_df
   - city_code
   - year
   - month
   - avg_temperature

time_df
   - date
   - dayofweek
   - weekofyear
   - month
    
#### Fact Tables 

immigration_df
   - id
   - state_code
   - city_code
   - date
   - count

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

1. Clean the data on nulls, data types, duplicates, etc
2. Load staging tables for staging_i94_df, staging_temp_df and staging_demo_df
3. Create dimension tables for immigrant_df, city_df, monthly_city_temp_df and time_df
4. Create fact table immigration_df with information on immigration count, mapping id in immigrant_df, city_code in city_df and monthly_city_temp_df and date in time_df   ensuring referential integrity
5. Save processed dimension and fact tables in parquet for downstream query



### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [19]:
#Dimension table for immigrant Data

immigrant_df = staging_i94_df.select("id", "gender", "age", "visa_type").drop_duplicates()

In [20]:
#check count
immigrant_df.count()

2435922

In [21]:
immigrant_df.limit(10).toPandas()

KeyboardInterrupt: 

In [45]:
#Dimension table for city 

city_df = staging_demo_df.join(staging_temp_df, "city_code") \
    .select("city_code", "state_code", "city_name", "median_age", "pct_male_pop", "pct_female_pop", "pct_veterans",
           "pct_foreign_born", "pct_native_american", "pct_asian", "pct_black",
           "pct_hispanic_or_latino", "pct_white", "total_pop", "lat", "long").drop_duplicates()

In [46]:
#Check count
city_df.count()

142

In [48]:
city_df.limit(10).toPandas()

Unnamed: 0,city_code,state_code,city_name,median_age,pct_male_pop,pct_female_pop,pct_veterans,pct_foreign_born,pct_native_american,pct_asian,pct_black,pct_hispanic_or_latino,pct_white,total_pop,lat,long
0,BRO,TX,Brownsville,30.6,47.7,52.3,2.3,2.3,0.6,0.9,0.7,92.5,95.0,183888,26.52N,96.72W
1,HSV,WI,Madison,30.7,49.2,50.8,3.9,3.9,0.9,9.6,8.2,7.9,82.1,248956,34.56N,85.62W
2,ATL,GA,Atlanta,33.8,48.3,51.7,4.0,4.0,1.0,5.2,52.9,4.0,42.3,463875,34.56N,83.68W
3,NEW,NJ,Newark,34.6,49.0,51.0,2.1,2.1,0.8,2.6,51.4,35.6,27.1,281913,40.99N,74.56W
4,NWH,CT,New Haven,29.9,48.9,51.1,2.0,2.0,1.7,6.1,33.3,33.4,43.2,130310,40.99N,72.43W


In [51]:
#Dimension table for monthly city temperature 

monthly_city_temp_df = staging_temp_df.select("city_code","year","month","avg_temperature",).drop_duplicates()

In [53]:
#Check count
monthly_city_temp_df.count()

1043

In [54]:
monthly_city_temp_df.limit(10).toPandas()

Unnamed: 0,city_code,year,month,avg_temperature
0,SAA,2013,6,18.6
1,PHI,2013,5,16.6
2,BOS,2013,5,14.3
3,BUR,2013,3,14.5
4,RNO,2013,2,4.7


In [57]:
#Dimension table for time 


time_df = staging_i94_df.withColumn("dayofweek", dayofweek("date"))\
                .withColumn("weekofyear", weekofyear("date"))\
                .withColumn("month", month("date"))
                        
time_df = time_df.select("date", "dayofweek", "weekofyear", "month").drop_duplicates()

In [58]:
#Check count
time_df.count()

30

In [59]:
time_df.limit(10).toPandas()

Unnamed: 0,date,dayofweek,weekofyear,month
0,2016-04-23,7,16,4
1,2016-04-22,6,16,4
2,2016-04-08,6,14,4
3,2016-04-09,7,14,4
4,2016-04-26,3,17,4


In [60]:
# create fact table for immigration 

immigration_df = staging_i94_df.select("id","state_code","city_code", "date","count").drop_duplicates()

In [61]:
#Check count
immigration_df.count()

2435922

In [62]:
immigration_df.limit(10).toPandas()

Unnamed: 0,id,state_code,city_code,date,count
0,25716.0,CA,LOS,2016-04-01,1.0
1,56083.0,HI,HHW,2016-04-01,1.0
2,261977.0,FL,NYC,2016-04-02,1.0
3,290139.0,NY,NYC,2016-04-02,1.0
4,487570.0,HI,HHW,2016-04-03,1.0


In [65]:
#Dimension table to parquet
immigrant_df.write.mode("overwrite").partitionBy("gender", "age").parquet("immigrants")
city_df.write.mode("overwrite").partitionBy("state_code").parquet("cities")
monthly_city_temp_df.write.mode("overwrite").parquet("monthly_city_temperatues")
time_df.write.mode("overwrite").parquet("time")

# Write fact table to parquet
immigration_df.write.mode("overwrite").partitionBy("state_code", "city_code").parquet("immigration")

KeyboardInterrupt: 

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [None]:
def table_exists(df):
    if df is not None:
        return True
    else:
        return False
        
if table_exists(immigrant_df) & table_exists(city_df) & table_exists(monthly_city_temp_df) & table_exists(time_df) & table_exists(immigration_df):
    print("successful quality check")
    print("dimension and fact tables exist")
    print()
else:
    print("unsuccessful quality check")
    print("check table data again")

In [None]:
# Perform quality checks here
def table_not_empty(df):
    return df.count() != 0 

if table_not_empty(immigrant_df) & table_not_empty(city_df) & table_not_empty(monthly_city_temp_df) & table_not_empty(time_df) & table_not_empty(immigration_df):
    print("successful quality check")
    print("dimension and fact tables have records")
    print()
else:
    print("unsuccessful quality check")
    print("check null records again")

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

### Overview of data - Dimension and Fact tables:


immigrant_df
   - id: id of immigrant
   - gender: gender of immigrant
   - age: age of immigrant
   - visa_type: immigrant's visa type

city_df
   - city_code: city port code
   - state_code: state code of the city
   - city_name: name of the city
   - median_age: median age of the city
   - pct_male_pop: city's male population in percentage
   - pct_female_pop: city's female population in percentage
   - pct_veterans: city's veteran population in percentage
   - pct_foreign_born: city's foreign born population in percentage
   - pct_native_american: city's native american population in percentage
   - pct_asian: city's asian population in percentage
   - pct_black: city's black population in percentage
   - pct_hispanic_or_latino: city's hispanic or latino population in percentage
   - pct_white: city's white population in percentage
   - total_pop: city's total population
   - lat: latitude of the city
   - long: longitude of the city


monthly_city_temp_df
   - city_code: city port code
   - year: year
   - month: month 
   - avg_temperature: average temperature in city for given month

time_df
   - date: date
   - dayofweek: day of the week
   - weekofyear: week of year
   - month: month
    
### Fact Table 

immigration_df
   - id: id
   - state_code: state code of arrival city
   - city_code: city port code of arrival city
   - date: date of arrival
   - count: count of immigrant's entries into the US





#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.  
 * The database needed to be accessed by 100+ people.
 
 
 
#### Final project conclusion and overview of chosen methods for data manipulation 


Q1:  Clearly state the rationale for the choice of tools and technologies for the project.

A: Spark quickly performs processing tasks on large databases. It can also perform these tasks on different data formats - CSV, SAS and Parquet (required in this project 2 different types). Spark SQL was used to manipulate the files into dataframes and create tables with join operations. 

-------------------

Q2:  Propose how often the data should be updated and why.

A:   This particular type of data set some data could be pulled only when need and the some could be pulled on a monthly basis. For example the monthly city temperature can be pulled on a monthly basis as new data will be available. Where as the immigration dataset would be done later as it requires more time to collect that data(yearly bases). 


-------------------

#### Write a description of how you would approach the problem differently under the following scenarios:

Q3: The data was increased by 100x.

A:  With an increase in data size storage space and accessibility are important. Using Amazon Redshift or EC2 hosting Spark will give enough storage to hold the data sets. 


-------------------

Q4: The data populates a dashboard that must be updated on a daily basis by 7am every day.

A:  Airflow can be automated to schedule regular data tasks(e.g at 7am daily). Setting up the DAG can be done to accomplish a variety of tasks and notification on particular data set. 


-------------------

Q5: The database needed to be accessed by 100+ people.

A: Data warehouse in the cloud could be a possible solution as it gives the data access to more users with larger server capacity and good read performance. 