# Data Engineering Capstone Project

#### Project Summary
The purpose of the data engineering capstone project is to demonstrate what I've learned throughout the Data Engineering Nanodegree program.


The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
# import configparser
# import os
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import (avg, col, count, countDistinct, dayofmonth, dayofweek, isnan,
                                   month, monotonically_increasing_id, udf, weekofyear, when, year)
import psycopg2
import datetime as dt

In [2]:
# The Parquet output was meant to go to an S3 bucket, but with no AWS credit left the tables are written to a local tables/ folder instead

# config = configparser.ConfigParser()
# config.read('dl.cfg')

# os.environ['AWS_ACCESS_KEY_ID']=config['KEYS']['AWS_ACCESS_KEY_ID']
# os.environ['AWS_SECRET_ACCESS_KEY']=config['KEYS']['AWS_SECRET_ACCESS_KEY']

### Step 1: Scope the Project and Gather Data

**Scope**

There will be one fact table and five dimension tables built from the datasets below. The data model will support further analysis in SQL of the relationship between people immigrating to the US, the demographics of the state or city they immigrate to, and the average temperature of their country.
The main dataset is data on immigration to the United States, and supplementary datasets used are U.S. city demographics, and temperature data.

Once the data is processed, analytical queries can be run across the immigration data, the state and city demographics, and the average country temperatures.


**Data Used:**

- I94 Immigration Data
- Temperature Data
- Demographics Data


**Tools Used:**

- AWS
    - Redshift
- Python
    - PySpark
- SQL



**I94 Immigration Data**

This data comes from the National Travel and Tourism Office (NTTO). It describes immigrants arriving in the U.S., including where they come from, birth year, gender, visa type, etc.

**Temperature Data**

This data comes from Kaggle, and it shows the temperature in different cities around the world. It's recorded monthly, and the values are the average temperature of that month.

**US Cities Demographics Data**

This data comes from OpenSoft, and it shows the demographics of the cities in the U.S. including the population, median age, the state the city belongs to, etc.

In [3]:
# Read in the data here
# fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
# df = pd.read_sas(fname, 'sas7bdat', encoding="ISO-8859-1")

In [4]:
# df.head()

In [5]:
# df.columns

In [6]:
spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

df_immigration = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

In [7]:
df_immigration.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [8]:
df_immigration.count()

3096313

## US Cities Demographics Data

In [9]:
fname = './us-cities-demographics.csv'
#df_demo = pd.read_csv(fname, delimiter=';')
df_demo = spark.read.csv(fname, inferSchema=True, header=True, sep=';')

In [10]:
df_demo.limit(5).toPandas()

|   | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Silver Spring | Maryland | 33.8 | 40601 | 41862 | 82463 | 1562 | 30908 | 2.6 | MD | Hispanic or Latino | 25924 |
| 1 | Quincy | Massachusetts | 41.0 | 44129 | 49500 | 93629 | 4147 | 32935 | 2.39 | MA | White | 58723 |
| 2 | Hoover | Alabama | 38.5 | 38040 | 46799 | 84839 | 4819 | 8229 | 2.58 | AL | Asian | 4759 |
| 3 | Rancho Cucamonga | California | 34.5 | 88127 | 87105 | 175232 | 5821 | 33878 | 3.18 | CA | Black or African-American | 24437 |
| 4 | Newark | New Jersey | 34.6 | 138040 | 143873 | 281913 | 5829 | 86253 | 2.73 | NJ | White | 76402 |


In [11]:
df_demo.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: double (nullable = true)
 |-- Male Population: integer (nullable = true)
 |-- Female Population: integer (nullable = true)
 |-- Total Population: integer (nullable = true)
 |-- Number of Veterans: integer (nullable = true)
 |-- Foreign-born: integer (nullable = true)
 |-- Average Household Size: double (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: integer (nullable = true)



In [12]:
df_demo.count()

2891

## Temperature Data

In [13]:
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
# df_temp = pd.read_csv(fname)
df_temp = spark.read.csv(fname, header=True, inferSchema=True)

In [14]:
df_temp.show(n=5)

+-------------------+------------------+-----------------------------+-----+-------+--------+---------+
|                 dt|AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+-------------------+------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01 00:00:00|             6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-01-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-02-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-03-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
+-------------------+------------------+-----------------------------+-----+-------+--------+---------+
only showing top 5 rows



In [15]:
df_temp.count()

8599212

In [16]:
df_temp.printSchema()

root
 |-- dt: timestamp (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



### Step 2: Explore and Assess the Data

### Clean Immigration Dataset 
Identify data quality issues, like missing values, duplicate data, etc.

In [17]:
# drop duplicate entries
df_immigration = df_immigration.dropDuplicates(['cicid'])

In [18]:
df_immigration.count()

3096313

- no duplicates

In [19]:
# drop rows with a missing cicid
df_immigration = df_immigration.dropna(how='all', subset=['cicid'])

In [20]:
df_immigration.count()

3096313

- no missing id records

### Clean Demographic Dataset
Identify data quality issues, like missing values, duplicate data, etc.

In [21]:
df_demo = df_demo.dropna(subset=['Male Population','Female Population','Number of Veterans','Foreign-born','Average Household Size'])

In [22]:
df_demo.count()

2875

- 16 records dropped with missing values

In [23]:
df_demo = df_demo.drop_duplicates(subset=['City', 'State', 'State Code', 'Race'])

In [24]:
df_demo.count()

2875

- no duplicates

### Clean Temperature Dataset
Identify data quality issues, like missing values, duplicate data, etc.

In [25]:
# temperature data: drop rows with no average temperature
df_temp = df_temp.dropna(subset=['AverageTemperature'])

In [26]:
df_temp.count()

8235082

- 364,130 records dropped with missing average temperature

In [27]:
# drop duplicate rows
df_temp = df_temp.drop_duplicates(subset=['dt', 'City', 'Country'])

In [28]:
df_temp.count()

8190783

- 44,299 duplicate records dropped

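The two temperature cleaning rules (drop rows without `AverageTemperature`, then keep one row per `dt`/`City`/`Country`) can also be written in plain Python, which may be handy for the unit tests suggested in 4.2. This is a minimal sketch with a hypothetical helper name, not part of the pipeline; the sample rows reuse values from the `df_temp.show()` output, with the third row an artificial duplicate. One difference is deliberate: the sketch keeps the first occurrence of each key, whereas Spark's `dropDuplicates` does not guarantee which duplicate survives.

```python
def clean_temperature(rows):
    """Reference version of the temperature cleaning rules (hypothetical helper).

    rows: iterable of dicts with keys dt, AverageTemperature, City, Country.
    Drops rows whose AverageTemperature is None, then keeps the first row seen
    for each (dt, City, Country) key.
    """
    seen = set()
    cleaned = []
    for row in rows:
        if row.get("AverageTemperature") is None:
            continue  # same rule as dropna(subset=['AverageTemperature'])
        key = (row["dt"], row["City"], row["Country"])
        if key in seen:
            continue  # same key as drop_duplicates(subset=['dt', 'City', 'Country'])
        seen.add(key)
        cleaned.append(row)
    return cleaned


sample = [
    {"dt": "1743-11-01", "AverageTemperature": 6.068, "City": "Århus", "Country": "Denmark"},
    {"dt": "1743-12-01", "AverageTemperature": None, "City": "Århus", "Country": "Denmark"},
    {"dt": "1743-11-01", "AverageTemperature": 6.068, "City": "Århus", "Country": "Denmark"},
]
print(len(clean_temperature(sample)))  # 1
```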
### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model



<img src="../capstone image.PNG" />

- The immigration fact table is the heart of the data model. Its data comes from the immigration dataset, and it contains keys that link to the dimension tables. The data dictionary of the immigration dataset gives detailed information on the fields that make up the fact table.
- The visa table holds dimension data related to the visa.
- The person table holds dimension data related to the person.
- The travel table holds dimension data related to the travel.
- The demo table holds demographic information about the state the person is residing in.
    - This links to the immigration data by state code.
- The temp table holds the average temperature for each country.
    - This links to the immigration data by country of residence code.

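To make the star schema more concrete, the DDL below sketches the six tables in SQLite through Python's standard `sqlite3` module. It is an illustration, not the project's Redshift DDL: the column names are taken from the PySpark cells in Step 4, while the SQLite types and the `create_star_schema` helper are assumptions. `record_id` (the original `cicid`) is shared by the fact table and the travel, visa and personal dimensions.

```python
import sqlite3

# Sketch only: column names mirror the Step 4 PySpark tables; SQLite types are
# approximations chosen for illustration (Redshift would be the real target).
STAR_SCHEMA_DDL = """
CREATE TABLE fact_immigration (
    record_id REAL PRIMARY KEY,   -- cicid, also keys dim_travel / dim_visa / dim_personal
    year REAL, month REAL, port TEXT, arrdate TEXT,
    state_code TEXT,              -- joins dim_demographics.state_code
    country_res_code REAL,        -- joins dim_agg_temp.country_code
    depdate TEXT, dtadfile TEXT, occup TEXT, entdepa TEXT, entdepd TEXT,
    entdepu TEXT, matflag TEXT, dtaddto TEXT
);
CREATE TABLE dim_travel (
    record_id REAL PRIMARY KEY, airline TEXT, transport_mode REAL,
    admission_num REAL, fltno TEXT
);
CREATE TABLE dim_visa (
    record_id REAL PRIMARY KEY, visa_cat REAL, visapost TEXT, visatype TEXT
);
CREATE TABLE dim_personal (
    record_id REAL PRIMARY KEY, birth_country REAL, age REAL, "count" REAL,
    birth_year REAL, gender TEXT, insnum TEXT
);
CREATE TABLE dim_demographics (
    City TEXT, State TEXT, median_age REAL, male_population INTEGER,
    female_population INTEGER, total_population INTEGER,
    number_of_veterans INTEGER, foreign_born INTEGER,
    average_household_size REAL, state_code TEXT, Race TEXT
);
CREATE TABLE dim_agg_temp (
    Country TEXT, country_code TEXT PRIMARY KEY, Country_Avg_Temprature REAL
);
"""


def create_star_schema(conn):
    """Create the (hypothetical) star-schema tables on an open SQLite connection."""
    conn.executescript(STAR_SCHEMA_DDL)
    return conn


conn = create_star_schema(sqlite3.connect(":memory:"))
tables = sorted(
    row[0] for row in conn.execute("SELECT name FROM sqlite_master WHERE type = 'table'")
)
print(tables)
```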


#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

### The pipeline steps are as follows:

- Load the immigration dataset.
- Load the demographic dataset.
- Load the temperature dataset.
- Clean the immigration dataset (drop duplicates / missing data).
- Clean the demographic dataset (drop duplicates / missing data).
- Clean the temperature dataset (drop duplicates / missing data).
- Transform into fact and dimension tables.
- Convert dates and rename columns.
- Aggregate the temperature data by country.
- Join the I94 country codes onto the temperature data by country name, so it can be joined to the fact table.
- Write the tables as Parquet files to an S3 bucket (written to a local `tables/` folder in this run, as no AWS credit was left).


### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model

In [29]:
def transf_immigration(df):
    """Convert SAS arrival/departure dates (days since 1960-01-01) to ISO date strings."""
    # UDF returns an ISO 'YYYY-MM-DD' string (udf's default return type is StringType);
    # 'is not None' keeps day 0 (1960-01-01) from being treated as missing
    get_datetime = udf(lambda x: (dt.datetime(1960, 1, 1).date() + dt.timedelta(x)).isoformat() if x is not None else None)

    # convert arrival and departure dates
    df = df.withColumn("arrdate", get_datetime(df.arrdate))
    df = df.withColumn("depdate", get_datetime(df.depdate))
    return df

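The date conversion in `transf_immigration` relies on SAS storing dates as a day count from 1960-01-01. The plain-Python function below is a minimal sketch of the same rule that can be unit-tested without Spark; the name `sas_to_iso` is hypothetical, and treating NaN as missing is an assumption about how gaps may arrive if the file is read with pandas instead. In Spark it could be wrapped with `udf(sas_to_iso, StringType())` (from `pyspark.sql.types`). A built-in alternative that avoids Python UDF overhead would be something like `F.expr("date_add('1960-01-01', cast(arrdate as int))")`, although that yields a date column rather than a string.

```python
import datetime as dt
import math

SAS_EPOCH = dt.date(1960, 1, 1)  # SAS date values count days from this date


def sas_to_iso(days):
    """Convert a SAS date (days since 1960-01-01) to an ISO 'YYYY-MM-DD' string.

    Returns None for missing values (None or NaN). Uses an explicit None check
    so that day 0, i.e. 1960-01-01, is not mistaken for a missing value.
    """
    if days is None or (isinstance(days, float) and math.isnan(days)):
        return None
    return (SAS_EPOCH + dt.timedelta(days=int(days))).isoformat()


print(sas_to_iso(20545.0))  # 2016-04-01
```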
In [30]:
fact_immigration = df_immigration[['cicid', 'i94yr', 'i94mon', 'i94port', 'arrdate', 'i94addr', 'i94res', 'depdate','dtadfile','occup', 'entdepa', 'entdepd', 'entdepu','matflag','dtaddto' ]]
#rename
fact_immigration = fact_immigration.withColumnRenamed('cicid','record_id') \
            .withColumnRenamed('i94res', 'country_res_code') \
            .withColumnRenamed('i94addr', 'state_code') \
            .withColumnRenamed('i94port', 'port') \
            .withColumnRenamed('i94yr', 'year') \
            .withColumnRenamed('i94mon', 'month') 

#Convert Dates
fact_immigration = transf_immigration(fact_immigration)

fact_immigration.limit(5).toPandas()

|   | record_id | year | month | port | arrdate | state_code | country_res_code | depdate | dtadfile | occup | entdepa | entdepd | entdepu | matflag | dtaddto |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 299.0 | 2016.0 | 4.0 | NYC | 2016-04-01 | NY | 103.0 | 2016-04-06 | 20160401 |  | O | O |  | M | 6292016 |
| 1 | 305.0 | 2016.0 | 4.0 | NYC | 2016-04-01 | NY | 103.0 | 2016-04-11 | 20160401 |  | O | O |  | M | 6292016 |
| 2 | 496.0 | 2016.0 | 4.0 | CHI | 2016-04-01 | IL | 103.0 | 2016-04-04 | 20160401 |  | O | O |  | M | 6292016 |
| 3 | 558.0 | 2016.0 | 4.0 | SFR | 2016-04-01 | CA | 103.0 | 2016-04-03 | 20160401 |  | G | O |  | M | 6292016 |
| 4 | 596.0 | 2016.0 | 4.0 | NAS | 2016-04-01 | FL | 103.0 | 2016-04-03 | 20160401 |  | G | N |  | M | 6292016 |


In [31]:
dim_travel = df_immigration[['cicid', 'airline', 'i94mode', 'admnum', 'fltno',]]
dim_travel = dim_travel.withColumnRenamed('cicid','record_id') \
            .withColumnRenamed('i94mode', 'transport_mode') \
            .withColumnRenamed('admnum', 'admission_num')


dim_travel.limit(5).toPandas()

|   | record_id | airline | transport_mode | admission_num | fltno |
|---|---|---|---|---|---|
| 0 | 299.0 | OS | 1.0 | 55425870000.0 | 87 |
| 1 | 305.0 | OS | 1.0 | 55425820000.0 | 87 |
| 2 | 496.0 | OS | 1.0 | 55428620000.0 | 65 |
| 3 | 558.0 | LH | 1.0 | 55433310000.0 | 454 |
| 4 | 596.0 | UP | 1.0 | 55406110000.0 | 221 |


In [32]:
dim_visa = df_immigration[['cicid', 'i94visa', 'visapost', 'visatype']]
dim_visa = dim_visa.withColumnRenamed('cicid','record_id') \
                   .withColumnRenamed('i94visa', 'visa_cat') 

# drop records with no visa category
dim_visa = dim_visa.dropna(subset=['visa_cat'])
dim_visa.limit(5).toPandas()

|   | record_id | visa_cat | visapost | visatype |
|---|---|---|---|---|
| 0 | 299.0 | 2.0 |  | WT |
| 1 | 305.0 | 2.0 |  | WT |
| 2 | 496.0 | 1.0 |  | WB |
| 3 | 558.0 | 1.0 |  | WB |
| 4 | 596.0 | 2.0 |  | WT |


In [33]:
dim_personal = df_immigration[['cicid', 'i94cit',  'i94bir', 'count', 'biryear', 'gender', 'insnum']]
dim_personal = dim_personal.withColumnRenamed('cicid','record_id') \
            .withColumnRenamed('i94cit', 'birth_country') \
            .withColumnRenamed('i94bir', 'age') \
            .withColumnRenamed('biryear', 'birth_year') 

dim_personal.limit(5).toPandas()

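|   | record_id | birth_country | age | count | birth_year | gender | insnum |
|---|---|---|---|---|---|---|---|
| 0 | 299.0 | 103.0 | 54.0 | 1.0 | 1962.0 |  |  |
| 1 | 305.0 | 103.0 | 63.0 | 1.0 | 1953.0 |  |  |
| 2 | 496.0 | 103.0 | 64.0 | 1.0 | 1952.0 |  |  |
| 3 | 558.0 | 103.0 | 42.0 | 1.0 | 1974.0 | M |  |
| 4 | 596.0 | 103.0 | 24.0 | 1.0 | 1992.0 | M |  |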


In [34]:
dim_demographics = df_demo[['City', 'State', 'Median Age', 'Male Population', 'Female Population', 'Total Population', 'Number of Veterans', 'Foreign-born', 'Average Household Size', 'State Code', 'Race']]
# dim_demographics.columns = ['city', 'state', 'median_age', 'male_population', 'female_population', 'total_population', 'num_veterans', 'foreign_born', 'average_household_size', 'state_code', 'race']

dim_demographics = dim_demographics.withColumnRenamed('Median Age','median_age') \
            .withColumnRenamed('Male Population', 'male_population') \
            .withColumnRenamed('Female Population', 'female_population') \
            .withColumnRenamed('Total Population', 'total_population') \
            .withColumnRenamed('Number of Veterans', 'number_of_veterans') \
            .withColumnRenamed('Foreign-born', 'foreign_born') \
            .withColumnRenamed('Average Household Size', 'average_household_size') \
            .withColumnRenamed('State Code', 'state_code')

dim_demographics.limit(5).toPandas()

|   | City | State | median_age | male_population | female_population | total_population | number_of_veterans | foreign_born | average_household_size | state_code | Race |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Quincy | Massachusetts | 41.0 | 44129 | 49500 | 93629 | 4147 | 32935 | 2.39 | MA | White |
| 1 | Wilmington | North Carolina | 35.5 | 52346 | 63601 | 115947 | 5908 | 7401 | 2.24 | NC | Asian |
| 2 | Tampa | Florida | 35.3 | 175517 | 193511 | 369028 | 20636 | 58795 | 2.47 | FL | Hispanic or Latino |
| 3 | Gastonia | North Carolina | 36.9 | 35527 | 39023 | 74550 | 3537 | 5715 | 2.67 | NC | Asian |
| 4 | Tyler | Texas | 33.9 | 50422 | 53283 | 103705 | 4813 | 8225 | 2.59 | TX | American Indian and Alaska Native |


In [35]:
dim_temp = df_temp[['dt', 'AverageTemperature', 'AverageTemperatureUncertainty', 'City', 'Country']]
dim_temp.limit(5).toPandas()

|   | dt | AverageTemperature | AverageTemperatureUncertainty | City | Country |
|---|---|---|---|---|---|
| 0 | 1760-01-01 | -4.787 | 5.421 | Århus | Denmark |
| 1 | 1759-12-01 | 2.315 | 5.97 | Çorlu | Turkey |
| 2 | 1768-04-01 | 10.641 | 3.148 | Çorlu | Turkey |
| 3 | 1817-04-01 | 8.482 | 2.367 | Çorlu | Turkey |
| 4 | 1779-08-01 | 19.461 | 2.876 | Çorum | Turkey |


In [36]:
#create average for country
dim_temp.createOrReplaceTempView("dim_temp_avg")
dim_agg_temp = spark.sql("""
                        SELECT 
                        AVG(AverageTemperature) AS Country_Avg_Temprature,
                        Country
                        FROM dim_temp_avg
                        GROUP BY Country
                        """)

In [37]:
from pyspark.sql.functions import trim

dim_agg_temp = dim_agg_temp.withColumn("Country", trim(col("Country")))
dim_agg_temp.limit(5).toPandas()

|   | Country_Avg_Temprature | Country |
|---|---|---|
| 0 | 27.189829 | Chad |
| 1 | 3.347268 | Russia |
| 2 | 22.784014 | Paraguay |
| 3 | 25.768408 | Yemen |
| 4 | 25.984177 | Senegal |


In [38]:
#need to get country code added onto tables
with open("I94_SAS_Labels_Descriptions.SAS") as f:
    contents = f.readlines()

In [39]:
# lines 10-298 of the labels file hold the country entries, e.g. "  236 =  'AFGHANISTAN'"
countries = contents[9:298]
country = [x.strip().split('=') for x in countries]
country_code = [x[0].replace("'", "").strip() for x in country]
country_name = [x[1].replace("'", "").strip() for x in country]
df_country = pd.DataFrame({'country_code': country_code, 'country_name': country_name})
df_country["country_name"] = df_country["country_name"].str.title()

sparkcountryDF = spark.createDataFrame(df_country)

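The slice `contents[9:298]` depends on exact line positions in `I94_SAS_Labels_Descriptions.SAS`. Below is a sketch of a less position-dependent parser. It assumes, without having checked every line of the file, that the country block starts at a line containing `i94cntyl`, that each entry looks like `236 =  'AFGHANISTAN'`, and that the block ends at the first line ending in `;`. The name `parse_value_block` is hypothetical; with it, the slice could become `parse_value_block(contents, "i94cntyl")`, and the pandas DataFrame could be built from the returned dict's `.items()`.

```python
import re

# One label entry: a numeric code, '=', then a single-quoted name (optional trailing ';').
ENTRY = re.compile(r"^\s*(\d+)\s*=\s*'(.*)'\s*;?\s*$")


def parse_value_block(lines, block_name):
    """Return {code: Title Case name} for the SAS `value <block_name>` block.

    Assumes the block starts at the first line containing block_name and ends at
    the first following line that ends with ';'.
    """
    labels = {}
    inside = False
    for line in lines:
        if not inside:
            inside = block_name in line  # skip everything up to the block header
            continue
        match = ENTRY.match(line)
        if match:
            code, name = match.groups()
            labels[code] = " ".join(name.split()).title()
        if line.strip().endswith(";"):
            break  # end of this value block
    return labels


sample = [
    "/* I94CIT & I94RES */\n",
    "value i94cntyl\n",
    "   236 =  'AFGHANISTAN'\n",
    "   101 =  'ALBANIA'\n",
    "   384 =  'CHAD'\n",
    ";\n",
    "value i94model\n",
    "   1 = 'Air'\n",
]
print(parse_value_block(sample, "i94cntyl"))
```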
In [40]:
sparkcountryDF = sparkcountryDF.withColumn("country_name", trim(col("country_name")))
sparkcountryDF.limit(5).toPandas()

|   | country_code | country_name |
|---|---|---|
| 0 | 582 | Mexico Air Sea, And Not Reported (I-94, No Lan... |
| 1 | 236 | Afghanistan |
| 2 | 101 | Albania |
| 3 | 316 | Algeria |
| 4 | 102 | Andorra |


In [41]:
# attach I94 country codes to the country averages (inner join on country name)
dim_agg_temp.createOrReplaceTempView("aggtem")
sparkcountryDF.createOrReplaceTempView("sparkcountry")
dim_agg_temp = spark.sql("""
                        SELECT
                        aggtem.Country_Avg_Temprature,
                        aggtem.Country,
                        sparkcountry.country_code,
                        sparkcountry.country_name
                        FROM aggtem 
                        JOIN sparkcountry 
                        on aggtem.Country = sparkcountry.country_name
                        """)

In [42]:
dim_agg_temp.limit(5).toPandas()

|   | Country_Avg_Temprature | Country | country_code | country_name |
|---|---|---|---|---|
| 0 | 27.189829 | Chad | 384 | Chad |
| 1 | 22.784014 | Paraguay | 693 | Paraguay |
| 2 | 3.347268 | Russia | 158 | Russia |
| 3 | 25.768408 | Yemen | 216 | Yemen |
| 4 | 25.984177 | Senegal | 391 | Senegal |

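Because `dim_agg_temp` is built with an inner join on the country name, any temperature country whose name does not exactly match a title-cased I94 label is silently dropped, which limits the table to the 149 countries that matched. For example, the label for code 582 begins "Mexico Air Sea, And Not Reported", so it cannot match a plain "Mexico". A small check like the hypothetical `unmatched_countries` below lists the names that fall out of the join so they can be reviewed or mapped by hand; the whitespace and case normalisation is an assumption, and the sample values are illustrative. On the Spark side the same idea could be expressed as a `left_anti` join from the aggregated temperatures to the country lookup.

```python
def _normalise(name):
    """Collapse internal whitespace and ignore case when comparing names."""
    return " ".join(name.split()).casefold()


def unmatched_countries(temperature_countries, label_names):
    """Return temperature country names with no matching I94 label name (sorted)."""
    labels = {_normalise(n) for n in label_names}
    return sorted(
        {c.strip() for c in temperature_countries if _normalise(c) not in labels}
    )


temps = ["Chad", "Mexico", "Russia", " Yemen "]
labels = ["Chad", "Mexico Air Sea, And Not Reported", "Russia", "Yemen"]
print(unmatched_countries(temps, labels))  # ['Mexico']
```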

In [43]:
dim_agg_temp = dim_agg_temp[['Country', 'country_code', 'Country_Avg_Temprature']]

In [44]:
dim_agg_temp.limit(5).toPandas()

|   | Country | country_code | Country_Avg_Temprature |
|---|---|---|---|
| 0 | Chad | 384 | 27.189829 |
| 1 | Paraguay | 693 | 22.784014 |
| 2 | Russia | 158 | 3.347268 |
| 3 | Yemen | 216 | 25.768408 |
| 4 | Senegal | 391 | 25.984177 |


In [45]:
# The Parquet output was meant to go to an S3 bucket, but with no AWS credit left the tables are written to a local tables/ folder instead

# output_data = "s3a://1stawbucket/"
# # write each table to Parquet files
# fact_immigration.write.mode('overwrite').partitionBy("year").parquet(output_data+'fact_immigration')
# dim_travel.write.mode('overwrite').partitionBy("airline").parquet(output_data+'dim_travel') 
# dim_visa.write.mode('overwrite').partitionBy("visa_cat").parquet(output_data+'dim_visa') 
# dim_personal.write.mode('overwrite').partitionBy("birth_year").parquet(output_data+'dim_personal') 
# dim_demographics.write.mode('overwrite').parquet(output_data+'dim_demographics') 
# dim_agg_temp.write.mode('overwrite').parquet(output_data+'dim_agg_temp') 

In [46]:
# Parquet data function
def parquetfiles(df, partition, dfstr):
    """Write df as Parquet to tables/<dfstr>, partitioned by column `partition` unless it is None."""
    output_data = "tables/"

    if partition is None:
        df.write.mode('overwrite').parquet(output_data + dfstr)
    else:
        df.write.mode('overwrite').partitionBy(partition).parquet(output_data + dfstr)

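Since the S3 write was only dropped for lack of credit, the output location could be a setting rather than hard-coded in `parquetfiles`. The helper below is a hypothetical sketch: it returns `s3a://<bucket>/<table>` when a bucket name is supplied (matching the commented-out S3 cell) and the local `tables/<table>` path otherwise. Inside `parquetfiles`, `output_data + dfstr` could then be replaced with `resolve_output_path(dfstr, bucket)`, with `bucket` read from a config file such as the commented-out `dl.cfg`.

```python
def resolve_output_path(table_name, bucket=None, local_root="tables/"):
    """Return the Parquet output path for table_name (hypothetical helper).

    bucket: S3 bucket name, e.g. from a config file; None means write locally.
    """
    if bucket:
        return f"s3a://{bucket.strip('/')}/{table_name}"
    return f"{local_root.rstrip('/')}/{table_name}"


print(resolve_output_path("fact_immigration"))                 # tables/fact_immigration
print(resolve_output_path("fact_immigration", "1stawbucket"))  # s3a://1stawbucket/fact_immigration
```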
In [47]:
parquetfiles(fact_immigration,"year",'fact_immigration')
parquetfiles(dim_travel,"airline",'dim_travel')
parquetfiles(dim_visa,"visa_cat",'dim_visa')
parquetfiles(dim_personal,"birth_year",'dim_personal')
parquetfiles(dim_agg_temp,None,'dim_agg_temp')
parquetfiles(dim_demographics,None,'dim_demographics')

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [48]:
def quality_checks(df, table_name):
    """
    Count check on a fact or dimension table: fails if the table has no records.
    Returns True if the check passed.
    """
    total_count = df.count()

    if total_count == 0:
        print(f"Data quality check failed : {table_name} with zero records")
        return False
    print(f"Data quality check passed : {table_name} with {total_count:,} records.")
    return True

In [49]:
# Perform quality checks here
table_dfs = {
    'immigration_fact': fact_immigration,
    'dim_travel': dim_travel,
    'dim_personal': dim_personal,
    'dim_visa': dim_visa,   
    'dim_demographics': dim_demographics,
    'dim_agg_temp': dim_agg_temp
}
for table_name, table_df in table_dfs.items():
    # check number of records for all tables
    quality_checks(table_df, table_name)


Data quality check passed : immigration_fact with 3,096,313 records.
Data quality check passed : dim_travel with 3,096,313 records.
Data quality check passed : dim_personal with 3,096,313 records.
Data quality check passed : dim_visa with 3,096,313 records.
Data quality check passed : dim_demographics with 2,875 records.
Data quality check passed : dim_agg_temp with 149 records.

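The count checks confirm that no table is empty, but not the integrity constraints listed at the start of 4.2. One such check is referential: every `state_code` in the fact table should exist in `dim_demographics`, otherwise those rows drop out of the inner joins used in the result queries. A minimal plain-Python sketch follows (the helper name and sample values are illustrative); on the Spark DataFrames the same idea could be expressed as `fact_immigration.join(dim_demographics, 'state_code', 'left_anti')`.

```python
def orphan_keys(fact_keys, dimension_keys):
    """Return the distinct, non-null fact keys that have no matching dimension key."""
    known = set(dimension_keys)
    return sorted({key for key in fact_keys if key is not None and key not in known})


fact_state_codes = ["NY", "CA", "GU", None, "NY"]  # illustrative values only
dimension_state_codes = ["NY", "CA", "TX"]
print(orphan_keys(fact_state_codes, dimension_state_codes))  # ['GU']
```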

In [50]:
def dupcheck(df,tname,pk):
    """
    Key check: show the number of distinct values in column `pk`.
    For a unique key this equals the table's record count; it will be lower for
    columns that are not unique, e.g. City in dim_demographics (one row per city and race).
    """
    total_count = df.select(countDistinct(pk))
    print(tname)
    total_count.show()

In [51]:
# Perform quality checks here
dupcheck(fact_immigration,'immigration_fact',"record_id")
dupcheck(dim_travel,'dim_travel' ,"record_id")
dupcheck(dim_personal,'dim_personal',"record_id")
dupcheck(dim_visa,'dim_visa',"record_id")
dupcheck(dim_demographics,'dim_demographics',"city")
dupcheck(dim_agg_temp,'dim_agg_temp',"country_code")

immigration_fact
+-------------------------+
|count(DISTINCT record_id)|
+-------------------------+
|                  3096313|
+-------------------------+

dim_travel
+-------------------------+
|count(DISTINCT record_id)|
+-------------------------+
|                  3096313|
+-------------------------+

dim_personal
+-------------------------+
|count(DISTINCT record_id)|
+-------------------------+
|                  3096313|
+-------------------------+

dim_visa
+-------------------------+
|count(DISTINCT record_id)|
+-------------------------+
|                  3096313|
+-------------------------+

dim_demographics
+--------------------+
|count(DISTINCT city)|
+--------------------+
|                 559|
+--------------------+

dim_agg_temp
+----------------------------+
|count(DISTINCT country_code)|
+----------------------------+
|                         149|
+----------------------------+

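`dupcheck` prints the distinct count but leaves the comparison with the row count to the reader. For the `record_id` tables it matches the 3,096,313 records, but `dim_demographics` has 559 distinct cities against 2,875 rows, because the table holds one row per city and race; `City` alone is therefore not a key, whereas the combination used for de-duplication in Step 2 (`City`, `State`, `State Code`, `Race`) should be. The hypothetical helper below makes the check explicit by returning any key that occurs more than once; in Spark a similar result could come from `df.groupBy(*key_cols).count().filter(F.col('count') > 1)`.

```python
from collections import Counter


def duplicate_keys(keys):
    """Return {key: occurrences} for keys that appear more than once."""
    return {key: n for key, n in Counter(keys).items() if n > 1}


# Illustrative rows shaped like dim_demographics: (City, State, Race)
rows = [
    ("Quincy", "Massachusetts", "White"),
    ("Quincy", "Massachusetts", "Asian"),
    ("Tampa", "Florida", "Hispanic or Latino"),
]
print(duplicate_keys(city for city, _, _ in rows))  # {'Quincy': 2}
print(duplicate_keys(rows))                         # {}
```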


### ETL Result Queries
Once the data is processed, it can be used for queries such as:
- data on immigrants and demographic information about the state they immigrated to.
- data on immigrants, the average temperature of their country of residence, and demographic information about the state they immigrated to.
- the same as above, restricted to records where the visa type is WT.

In [52]:
dim_demographics.createOrReplaceTempView("dem")
dim_personal.createOrReplaceTempView("person")
fact_immigration.createOrReplaceTempView("fact")
Result1 = spark.sql("""
                SELECT d.median_age,
                d.average_household_size,
                d.male_population,
                d.female_population,
                dp.age,
                f.state_code,
                d.State
                FROM fact as f
                JOIN dem as d ON (f.state_code = d.state_code)
                JOIN person as dp ON (f.record_id = dp.record_id)               
                    """)
Result1.limit(5).toPandas()

|   | median_age | average_household_size | male_population | female_population | age | state_code | State |
|---|---|---|---|---|---|---|---|
| 0 | 36.0 | 2.68 | 4081698 | 4468707 | 54.0 | NY | New York |
| 1 | 38.5 | 2.85 | 31876 | 36745 | 54.0 | NY | New York |
| 2 | 33.1 | 2.27 | 124537 | 133529 | 54.0 | NY | New York |
| 3 | 38.0 | 2.8 | 96580 | 104538 | 54.0 | NY | New York |
| 4 | 33.1 | 2.27 | 124537 | 133529 | 54.0 | NY | New York |

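The first result shows what appears to be the same traveller (age 54, `NY`) on several rows with different city statistics. That follows from the join: `dim_demographics` holds one row per city and race, so each fact row is repeated for every matching row in its state. If state-level figures are wanted (an assumption about intent, not what the queries above do), one option is to collapse the demographics to one row per state before joining. The sketch below shows the effect on made-up toy tables in SQLite via Python's `sqlite3`; the same `WITH ... GROUP BY` SQL should also work in Spark SQL.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Toy tables shaped like the fact and dim_demographics tables (values are made up).
conn.executescript("""
CREATE TABLE fact (record_id INTEGER, state_code TEXT);
CREATE TABLE dem (City TEXT, state_code TEXT, Race TEXT, total_population INTEGER);
INSERT INTO fact VALUES (1, 'NY'), (2, 'NY');
INSERT INTO dem VALUES
    ('City A', 'NY', 'White', 1000),
    ('City A', 'NY', 'Asian', 1000),
    ('City B', 'NY', 'White', 500);
""")

# Direct join: every fact row matches every city/race row in its state.
naive_rows = conn.execute(
    "SELECT COUNT(*) FROM fact f JOIN dem d ON f.state_code = d.state_code"
).fetchone()[0]

# Collapse to one row per city (city-level columns repeat across races),
# then to one row per state, and join that instead.
state_rows = conn.execute("""
    WITH city_pop AS (
        SELECT DISTINCT City, state_code, total_population FROM dem
    ), state_pop AS (
        SELECT state_code, SUM(total_population) AS total_population
        FROM city_pop GROUP BY state_code
    )
    SELECT f.record_id, s.total_population
    FROM fact f JOIN state_pop s ON f.state_code = s.state_code
    ORDER BY f.record_id
""").fetchall()

print(naive_rows)  # 6
print(state_rows)  # [(1, 1500), (2, 1500)]
```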

In [53]:
dim_demographics.createOrReplaceTempView("dem")
fact_immigration.createOrReplaceTempView("fact")
dim_agg_temp.createOrReplaceTempView("agg")
Result1 = spark.sql("""
                SELECT d.median_age,
                d.average_household_size,
                d.foreign_born,
                dat.Country_Avg_Temprature,
                dat.Country,
                f.state_code,
                d.State
                FROM fact as f
                JOIN dem as d ON (f.state_code = d.state_code)
                JOIN agg as dat ON (f.country_res_code = dat.country_code)
                
                    """)
Result1.limit(5).toPandas()

|   | median_age | average_household_size | foreign_born | Country_Avg_Temprature | Country | state_code | State |
|---|---|---|---|---|---|---|---|
| 0 | 32.6 | 2.59 | 326825 | -3.365485 | Mongolia | TX | Texas |
| 1 | 38.2 | 2.72 | 2829 | -3.365485 | Mongolia | VA | Virginia |
| 2 | 32.7 | 2.5 | 181686 | -3.365485 | Mongolia | TX | Texas |
| 3 | 30.2 | 2.51 | 16702 | -3.365485 | Mongolia | VA | Virginia |
| 4 | 37.2 | 3.03 | 18556 | -3.365485 | Mongolia | TX | Texas |


In [None]:
dim_demographics.createOrReplaceTempView("dem")
fact_immigration.createOrReplaceTempView("fact")
dim_visa.createOrReplaceTempView("visa")
dim_agg_temp.createOrReplaceTempView("agg")
Result1 = spark.sql("""
                SELECT d.median_age,
                d.average_household_size,
                d.foreign_born,
                dat.Country_Avg_Temprature,
                dat.Country,
                f.state_code,
                v.visatype,
                d.State
                FROM fact as f
                JOIN visa as v ON (f.record_id = v.record_id)
                JOIN dem as d ON (f.state_code = d.state_code)
                JOIN agg as dat ON (f.country_res_code = dat.country_code)
                WHERE v.visatype = 'WT'
                
                    """)
Result1.limit(5).toPandas()

#### 4.3 Data dictionary 

<img src="../capstone image dd.PNG" />

### Step 5: Complete Project Write Up

**Rationale for the choice of tools and technologies for the project**

- A star schema was used because:
    - The fact and dimension tables form a star schema built around a particular analytic focus.
    - The denormalised model keeps joins simple, so end users can query their data easily and run aggregations quickly.
- Apache Spark was used because of:
    - its ability to handle multiple file formats, such as SAS and CSV.
    - its ability to handle large amounts of data.
    - its analytics engine for big data.
    - its APIs for operating on large datasets.
- How often the data should be updated, and why:
    - The I94 immigration data is updated monthly, so the data model should also be updated monthly.
    
    
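A monthly refresh needs to locate each month's I94 file. The only file used here is `i94_apr16_sub.sas7bdat`; assuming the other months follow the same `i94_<mon><yy>_sub.sas7bdat` pattern (an assumption, not checked here), a hypothetical helper could build the path for any month. The month abbreviations are spelled out rather than taken from the `calendar` module, whose names depend on the locale.

```python
MONTH_ABBR = ("jan", "feb", "mar", "apr", "may", "jun",
              "jul", "aug", "sep", "oct", "nov", "dec")


def i94_monthly_path(year, month, root="../../data/18-83510-I94-Data-2016"):
    """Build the (assumed) path of the I94 SAS file for a given year and month."""
    if not 1 <= month <= 12:
        raise ValueError(f"month must be 1-12, got {month}")
    return f"{root}/i94_{MONTH_ABBR[month - 1]}{year % 100:02d}_sub.sas7bdat"


print(i94_monthly_path(2016, 4))  # ../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat
```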
**How I would approach the problem differently under the following scenarios:**
- The data was increased by 100x.
    - Spark can handle the increase but we would consider increasing the number of nodes in our cluster.
- The data populates a dashboard that must be updated on a daily basis by 7am every day.
    - In this scenario, Apache Airflow will be used to schedule and run data pipelines.
- The database needed to be accessed by 100+ people.
    - In this scenario, we would move our analytics database into Amazon Redshift.