# Project Title
### Data Engineering Capstone Project

#### Project Summary
This project builds a data warehouse that serves as a single-source-of-truth database by combining four data sets: immigration data, airport codes, demographics of US cities and global temperature data.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [3]:
import pandas as pd
import configparser
import os
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, count, sum, mean, round, upper, lower
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, monotonically_increasing_id
from pyspark.sql.types import *
from pyspark.sql.types import DoubleType, StringType, IntegerType, FloatType, DateType
from pyspark.sql import functions as F

In [4]:
config = configparser.ConfigParser()
config.read_file(open('dl.cfg'))

os.environ["AWS_ACCESS_KEY_ID"] = config['AWS']['AWS_ACCESS_KEY_ID']
os.environ["AWS_SECRET_ACCESS_KEY"] = config['AWS']['AWS_SECRET_ACCESS_KEY']

In [5]:
#spark = SparkSession.builder.\
#config("spark.jars.repositories", "https://repos.spark-packages.org/").\
#config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
#enableHiveSupport().getOrCreate()

#df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

In [6]:
#write to parquet
#df_spark.write.parquet("sas_data")
#df_spark=spark.read.parquet("sas_data")
#df_spark.show(3)

###### The SparkSession configuration below is needed to write to S3 buckets (the hadoop-aws package version depends on the Spark version on the server)

In [5]:
spark = SparkSession.builder\
                    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")\
                    .enableHiveSupport().getOrCreate()

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use?

In this project we gather data from four different data sources. The steps are:
1. Read and load the sources into staging DataFrames
2. Apply cleansing to all of those DataFrames
3. Perform ETL transformation rules using PySpark
4. Model them into fact and dimension tables using a star schema
5. Write those DFs into S3 buckets
6. Extract again and apply Data Quality checks

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

- i94 Immigration Sample Data: Sample data of immigration records from the US National Tourism and Trade Office. This is the main data source that will serve as the Fact table in the schema. This data comes from https://travel.trade.gov/research/reports/i94/historical/2016.html.
- World Temperature Data: This dataset contains temperature data for various cities from the 1700s to 2013. Although the data is only recorded until 2013, we can use this as an average/gauge of temperature in 2017. This data comes from https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data.
- US City Demographic Data: Data about the demographics of US cities. This dataset includes population information for US cities, such as race, household size and gender. This data comes from https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/.
- Airport Codes: This table contains the airport codes for the airports in corresponding cities. This data comes from https://datahub.io/core/airport-codes#data.


##### TEMPERATURE DATA

In [22]:
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
temperature_df_orig = pd.read_csv(fname)

In [23]:
temperature_df_orig.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


##### For performance reasons, I'll restrict the temperature readings to the USA

In [24]:
temperature_df = temperature_df_orig[temperature_df_orig['Country'] == 'United States']
temperature_df.count()

dt                               687289
AverageTemperature               661524
AverageTemperatureUncertainty    661524
City                             687289
Country                          687289
Latitude                         687289
Longitude                        687289
dtype: int64

##### AIRPORT CODES

In [25]:
airport_codes = 'airport-codes_csv.csv'
airport_df = pd.read_csv(airport_codes)

In [26]:
airport_df.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


##### IMMIGRATION DATA

In [27]:
immigration_data = 'immigration_data_sample.csv'
immigration_df = pd.read_csv(immigration_data)

In [28]:
immigration_df.head()

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,...,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,...,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,...,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,...,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,...,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


##### US CITIES DEMOGRAPHICS

In [29]:
us_cities_demographics = 'us-cities-demographics.csv'
demographics_df = spark.read.csv(us_cities_demographics, inferSchema=True, header=True, sep=';')

In [30]:
demographics_df.limit(5).toPandas()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ,White,76402


### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data:
- Drop rows in which every value is NULL
- Drop duplicate rows
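
The two cleaning rules can be illustrated without pandas or Spark. The sketch below is only an illustration of the semantics (a row is dropped only when all of its values are missing, and only exact repeats count as duplicates); the helper names and the sample rows are hypothetical and are not part of the notebook.

```python
from typing import Dict, List, Optional

Row = Dict[str, Optional[object]]


def drop_all_null_rows(rows: List[Row]) -> List[Row]:
    """Keep a row unless every one of its values is None."""
    return [row for row in rows if any(value is not None for value in row.values())]


def drop_duplicate_rows(rows: List[Row]) -> List[Row]:
    """Keep the first occurrence of each distinct row, preserving order."""
    seen = set()
    unique = []
    for row in rows:
        key = tuple(sorted(row.items()))  # hashable fingerprint of the row
        if key not in seen:
            seen.add(key)
            unique.append(row)
    return unique


# Hypothetical sample rows, shaped like the temperature data
sample_rows = [
    {"City": "Abilene", "AverageTemperature": 2.101},
    {"City": "Abilene", "AverageTemperature": None},   # partially null: kept
    {"City": None, "AverageTemperature": None},        # all null: dropped
    {"City": "Abilene", "AverageTemperature": 2.101},  # exact duplicate: dropped
]

cleaned = drop_duplicate_rows(drop_all_null_rows(sample_rows))
print(len(cleaned))
```

Rows with only some values missing survive this cleaning step, which matches the non-null counts shown later for `AverageTemperature`.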


In [31]:
# Perform the cleaning tasks here
# In every DataFrame, drop the rows where all values are missing, then drop duplicate rows
# Temperature DF
temperature_clean = temperature_df.dropna(how='all')
temperature_clean = temperature_clean.drop_duplicates()

# Airport DF
airport_clean = airport_df.dropna(how='all')
airport_clean = airport_clean.drop_duplicates()

# Immigration DF
immigration_clean = immigration_df.dropna(how='all')
immigration_clean = immigration_clean.drop_duplicates()

# Demographics DF
demographics_clean = demographics_df.dropna(how='all')
demographics_clean = demographics_clean.drop_duplicates()

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Following Kimball dimensional modelling, and given the nature of the four combined data sources, the main goal of combining them is to track immigration into the USA. Accordingly:

- 1. Identify the Facts:
    - A fact table records the occurrences of a single business process. Each fact row references the dimension tables through foreign keys, so many fact rows can point to one dimension row.
    - The fact table identified in this project is:
        - immigration_fact
        
- 2. Identify the Dimensions:
    - Dimension tables provide the descriptive context (who, what, where, when) around the fact event or business process; the quantitative measures belong in the fact table.
    - The dimensions identified in this project are:
        - migrant_dim
        - status_dim
        - visa_dim
        - state_dim
        - time_dim
        - airport_codes
        - temperature_dim
        - country_codes
        - country_temperature
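
The relationship between a fact table and a dimension can be sketched in plain Python: each distinct attribute combination gets one surrogate id, and every fact row stores that id. This is only an illustration under assumed names (`build_dimension` and the sample rows are hypothetical), not the Spark code used in this notebook.

```python
from typing import Dict, List, Tuple


def build_dimension(rows: List[dict], attributes: List[str]) -> Dict[Tuple, int]:
    """Assign one surrogate id to each distinct combination of attributes."""
    ids: Dict[Tuple, int] = {}
    for row in rows:
        key = tuple(row[name] for name in attributes)
        if key not in ids:
            ids[key] = len(ids)  # next unused id
    return ids


# Hypothetical arrivals, using column names from the immigration data
arrivals = [
    {"cicid": 1, "biryear": 1955, "gender": "F"},
    {"cicid": 2, "biryear": 1990, "gender": "M"},
    {"cicid": 3, "biryear": 1955, "gender": "F"},
]

migrant_ids = build_dimension(arrivals, ["biryear", "gender"])

# Each fact row keeps its own key plus a reference to the dimension row
fact_rows = [
    {"cicid": row["cicid"], "migrant_id": migrant_ids[(row["biryear"], row["gender"])]}
    for row in arrivals
]
print(fact_rows)
```

Arrivals 1 and 3 share one dimension row, which is the many-to-one relationship a star schema relies on.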
    
The tables above form a star schema. In this notebook they are written as Parquet files to S3; from there they can be loaded into a relational database management system for data analytics and BI work.

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model:
1. Read and load into staging Dataframes
2. Apply cleansing over all those DFs
3. Perform ETL transformation rules using PySpark
4. Model them into fact and dimension tables using a star schema
5. Write those DFs into S3 buckets
6. Extract again and apply Data Quality checks

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [32]:
temperature_clean.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 687289 entries, 47555 to 8439246
Data columns (total 7 columns):
dt                               687289 non-null object
AverageTemperature               661524 non-null float64
AverageTemperatureUncertainty    661524 non-null float64
City                             687289 non-null object
Country                          687289 non-null object
Latitude                         687289 non-null object
Longitude                        687289 non-null object
dtypes: float64(2), object(5)
memory usage: 41.9+ MB


In [33]:
# create schema for Temperature
temperature_schema = StructType([StructField("dt", StringType(), True)\
                          ,StructField("AverageTemperature", FloatType(), True)\
                          ,StructField("AverageTemperatureUncertainty", FloatType(), True)\
                          ,StructField("City", StringType(), True)\
                          ,StructField("Country", StringType(), True)\
                          ,StructField("Latitude", StringType(), True)\
                          ,StructField("Longitude", StringType(), True)])

temperature_spark = spark.createDataFrame(temperature_clean, schema=temperature_schema)

temperature_spark.toPandas().head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1820-01-01,2.101,3.217,Abilene,United States,32.95N,100.53W
1,1820-02-01,6.926,2.853,Abilene,United States,32.95N,100.53W
2,1820-03-01,10.767,2.395,Abilene,United States,32.95N,100.53W
3,1820-04-01,17.989,2.202,Abilene,United States,32.95N,100.53W
4,1820-05-01,21.809,2.036,Abilene,United States,32.95N,100.53W


In [34]:
airport_clean.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 55075 entries, 0 to 55074
Data columns (total 12 columns):
ident           55075 non-null object
type            55075 non-null object
name            55075 non-null object
elevation_ft    48069 non-null float64
continent       27356 non-null object
iso_country     54828 non-null object
iso_region      55075 non-null object
municipality    49399 non-null object
gps_code        41030 non-null object
iata_code       9189 non-null object
local_code      28686 non-null object
coordinates     55075 non-null object
dtypes: float64(1), object(11)
memory usage: 5.5+ MB


In [35]:
# create schema for Airport codes
airport_codes_schema = StructType([StructField("ident", StringType(), True)\
                        ,StructField("type", StringType(), True)\
                        ,StructField("name", StringType(), True)\
                        ,StructField("elevation_ft", FloatType(), True)\
                        ,StructField("continent", StringType(), True)\
                        ,StructField("iso_country", StringType(), True)\
                        ,StructField("iso_region", StringType(), True)\
                        ,StructField("municipality", StringType(), True)\
                        ,StructField("gps_code", StringType(), True)\
                        ,StructField("iata_code", StringType(), True)\
                        ,StructField("local_code", StringType(), True)\
                        ,StructField("coordinates", StringType(), True)])

airport_codes_spark = spark.createDataFrame(airport_clean, schema=airport_codes_schema)

airport_codes_spark.toPandas().head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [36]:
immigration_clean.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 1000 entries, 0 to 999
Data columns (total 29 columns):
Unnamed: 0    1000 non-null int64
cicid         1000 non-null float64
i94yr         1000 non-null float64
i94mon        1000 non-null float64
i94cit        1000 non-null float64
i94res        1000 non-null float64
i94port       1000 non-null object
arrdate       1000 non-null float64
i94mode       1000 non-null float64
i94addr       941 non-null object
depdate       951 non-null float64
i94bir        1000 non-null float64
i94visa       1000 non-null float64
count         1000 non-null float64
dtadfile      1000 non-null int64
visapost      382 non-null object
occup         4 non-null object
entdepa       1000 non-null object
entdepd       954 non-null object
entdepu       0 non-null float64
matflag       954 non-null object
biryear       1000 non-null float64
dtaddto       1000 non-null object
gender        859 non-null object
insnum        35 non-null float64
airline       967 non

In [37]:
# create schema for immigration DF
immigration_schema = StructType([StructField("0", IntegerType(), True)\
                          ,StructField("cicid", FloatType(), True)\
                          ,StructField("i94yr", FloatType(), True)\
                          ,StructField("i94mon", FloatType(), True)\
                          ,StructField("i94cit", FloatType(), True)\
                          ,StructField("i94res", FloatType(), True)\
                          ,StructField("i94port", StringType(), True)\
                          ,StructField("arrdate", FloatType(), True)\
                          ,StructField("i94mode", FloatType(), True)\
                          ,StructField("i94addr", StringType(), True)\
                          ,StructField("depdate", FloatType(), True)\
                          ,StructField("i94bir", FloatType(), True)\
                          ,StructField("i94visa", FloatType(), True)\
                          ,StructField("count", FloatType(), True)\
                          ,StructField("dtadfile", StringType(), True)\
                          ,StructField("visapost", StringType(), True)\
                          ,StructField("occup", StringType(), True)\
                          ,StructField("entdepa", StringType(), True)\
                          ,StructField("entdepd", StringType(), True)\
                          ,StructField("entdepu", FloatType(), True)\
                          ,StructField("matflag", StringType(), True)\
                          ,StructField("biryear", FloatType(), True)\
                          ,StructField("dtaddto", StringType(), True)\
                          ,StructField("gender", StringType(), True)\
                          ,StructField("insnum", FloatType(), True)\
                          ,StructField("airline", StringType(), True)\
                          ,StructField("admnum", FloatType(), True)\
                          ,StructField("fltno", StringType(), True)\
                          ,StructField("visatype", StringType(), True)])

immigration_spark = spark.createDataFrame(immigration_clean, schema=immigration_schema)

immigration_spark.toPandas().head()

Unnamed: 0,0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,...,,M,1955.0,7202016,F,,JL,56582680000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,...,,M,1990.0,10222016,M,,*GA,94361990000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,...,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,...,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,...,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


In [38]:
demographics_clean.toPandas().info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2891 entries, 0 to 2890
Data columns (total 12 columns):
City                      2891 non-null object
State                     2891 non-null object
Median Age                2891 non-null float64
Male Population           2888 non-null float64
Female Population         2888 non-null float64
Total Population          2891 non-null int32
Number of Veterans        2878 non-null float64
Foreign-born              2878 non-null float64
Average Household Size    2875 non-null float64
State Code                2891 non-null object
Race                      2891 non-null object
Count                     2891 non-null int32
dtypes: float64(6), int32(2), object(4)
memory usage: 248.5+ KB


In [39]:
# create schema for demographic
demographics_schema = StructType([StructField("City", StringType(), True)\
                        ,StructField("State", StringType(), True)\
                        ,StructField("Median Age", FloatType(), True)\
                        ,StructField("Male Population", FloatType(), True)\
                        ,StructField("Female Population", FloatType(), True)\
                        ,StructField("Total Population", IntegerType(), True)\
                        ,StructField("Number of Veterans", FloatType(), True)\
                        ,StructField("Foreign-born", FloatType(), True)\
                        ,StructField("Average Household Size", FloatType(), True)\
                        ,StructField("State Code", StringType(), True)\
                        ,StructField("Race", StringType(), True)\
                        ,StructField("Count", IntegerType(), True)])

demographics_spark = spark.createDataFrame(demographics_clean.toPandas(), schema=demographics_schema)

demographics_spark.toPandas().head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Maple Grove,Minnesota,38.599998,31780.0,36601.0,68381,2943.0,7645.0,2.64,MN,White,59683
1,Concord,California,39.599998,62310.0,66358.0,128668,6287.0,37428.0,2.72,CA,White,92575
2,Highlands Ranch,Colorado,39.599998,49186.0,53281.0,102467,4840.0,8827.0,2.72,CO,Asian,5650
3,Asheville,North Carolina,37.900002,42100.0,46407.0,88507,4973.0,6630.0,2.18,NC,American Indian and Alaska Native,496
4,Westland,Michigan,39.900002,37742.0,44253.0,81995,4756.0,6429.0,2.41,MI,Black or African-American,16422


##### Writing the tables as Parquet and uploading them to the S3 bucket

In [40]:
migrant_dim = immigration_spark.withColumn("migrant_id", monotonically_increasing_id()) \
                      .select(["migrant_id", "biryear", "gender"]) \
                      .withColumnRenamed("biryear", "birth_year")\
                      .dropDuplicates(["birth_year", "gender"])

migrant_dim.write.parquet("s3a://udacity-datalake-msaied2/Project6_Capstone/migrant.parquet", mode="overwrite")
print("Migrant dimension table writing has been completed!")

Migrant dimension table writing has been completed!


In [41]:
status_dim = immigration_spark.withColumn("status_flag_id", monotonically_increasing_id()) \
                .select(["status_flag_id", "entdepa", "entdepd", "matflag"]) \
                .withColumnRenamed("entdepa", "arrival_flag")\
                .withColumnRenamed("entdepd", "departure_flag")\
                .withColumnRenamed("matflag", "match_flag")\
                .dropDuplicates(["arrival_flag", "departure_flag", "match_flag"])

status_dim.write.parquet("s3a://udacity-datalake-msaied2/Project6_Capstone/status.parquet", mode="overwrite")
print("Status dimension table writing has been completed!")

Status dimension table writing has been completed!


In [42]:
visa_dim = immigration_spark.withColumn("visa_id", monotonically_increasing_id()) \
                .select(["visa_id","i94visa", "visatype", "visapost"]) \
                .dropDuplicates(["i94visa", "visatype", "visapost"])

visa_dim.write.parquet("s3a://udacity-datalake-msaied2/Project6_Capstone/visa.parquet", mode="overwrite")
print("Visa dimension table writing has been completed!")

Visa dimension table writing has been completed!


In [43]:
# The source has one row per city and race, so the city-level totals repeat on every race row.
# Keep one row per city before aggregating to state level, otherwise the sums are inflated.
state_dim = demographics_spark.select(["City", "State Code", "State", "Median Age", "Male Population", "Female Population", "Total Population", "Average Household Size",\
                          "Foreign-born", "Race", "Count"])\
                .withColumnRenamed("State Code", "state_code")\
                .withColumnRenamed("Median Age", "median_age")\
                .withColumnRenamed("Male Population", "male_population")\
                .withColumnRenamed("Female Population", "female_population")\
                .withColumnRenamed("Total Population", "total_population")\
                .withColumnRenamed("Average Household Size", "average_household_size")\
                .withColumnRenamed("Foreign-born", "foreign_born")\
                .dropDuplicates(["City", "state_code"])

state_dim = state_dim.groupBy(col("state_code"), col("State").alias("state")).agg(
                        round(mean('median_age'), 2).alias("median_age"),\
                        sum("total_population").alias("total_population"),\
                        sum("male_population").alias("male_population"), \
                        sum("female_population").alias("female_population"),\
                        sum("foreign_born").alias("foreign_born"), \
                        round(mean("average_household_size"),2).alias("average_household_size")
                        ).dropna()

state_dim.write.parquet("s3a://udacity-datalake-msaied2/Project6_Capstone/state.parquet", mode="overwrite")
print("State dimension table writing has been completed!")

State dimension table writing has been completed!
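
The demographics source repeats each city's totals on one row per race, so a state-level sum needs exactly one row per city. The plain-Python sketch below shows the difference with made-up numbers; the function names, the state code `XX` and the cities are hypothetical.

```python
from collections import defaultdict

# Made-up rows in the shape of the demographics source: one row per city and race
rows = [
    {"state_code": "XX", "city": "A", "total_population": 100, "race": "White", "count": 60},
    {"state_code": "XX", "city": "A", "total_population": 100, "race": "Asian", "count": 40},
    {"state_code": "XX", "city": "B", "total_population": 50, "race": "White", "count": 50},
]


def naive_state_population(rows):
    """Sum total_population over every row (counts city A twice)."""
    totals = defaultdict(int)
    for row in rows:
        totals[row["state_code"]] += row["total_population"]
    return dict(totals)


def deduplicated_state_population(rows):
    """Keep one total per (state, city) before summing per state."""
    per_city = {(r["state_code"], r["city"]): r["total_population"] for r in rows}
    totals = defaultdict(int)
    for (state_code, _city), population in per_city.items():
        totals[state_code] += population
    return dict(totals)


print(naive_state_population(rows))
print(deduplicated_state_population(rows))
```

The per-race `count` column, by contrast, can be summed directly because it is not repeated across rows.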


In [44]:
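from datetime import timedelta  # not imported in the first cell

def convert_datetime(x):
    # SAS dates count days since 1960-01-01; return None for missing or invalid values
    try:
        start = datetime(1960, 1, 1)
        return (start + timedelta(days=int(x))).date()
    except (TypeError, ValueError, OverflowError):
        return None

udf_datetime_conversion = udf(convert_datetime, DateType())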

time_dim = immigration_spark.select(["arrdate"])\
                .withColumn("arrival_date", udf_datetime_conversion("arrdate")) \
                .withColumn('day', F.dayofmonth('arrival_date')) \
                .withColumn('month', F.month('arrival_date')) \
                .withColumn('year', F.year('arrival_date')) \
                .withColumn('week', F.weekofyear('arrival_date')) \
                .withColumn('weekday', F.dayofweek('arrival_date'))\
                .select(["arrdate", "arrival_date", "day", "month", "year", "week", "weekday"])\
                .dropDuplicates(["arrdate"])

time_dim.write.parquet("s3a://udacity-datalake-msaied2/Project6_Capstone/time.parquet", mode="overwrite")
print("Time dimension table writing has been completed!")

Time dimension table writing has been completed!
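
The `arrdate` values are SAS dates, that is, day counts starting at 1 January 1960. The conversion can be checked outside Spark with a small standalone sketch; the function name `sas_days_to_date` is hypothetical, and the input 20566.0 is the `arrdate` of the first sample row shown earlier.

```python
from datetime import date, timedelta
from typing import Optional

SAS_EPOCH = date(1960, 1, 1)  # day 0 of the SAS date scale


def sas_days_to_date(days) -> Optional[date]:
    """Convert a SAS day count to a date; return None for missing or invalid input."""
    try:
        return SAS_EPOCH + timedelta(days=int(days))
    except (TypeError, ValueError, OverflowError):
        # None raises TypeError, NaN raises ValueError, infinity raises OverflowError
        return None


print(sas_days_to_date(20566.0))       # first sample row: 2016-04-22
print(sas_days_to_date(None))          # None
print(sas_days_to_date(float("nan")))  # None
```

Catching only the expected exceptions keeps genuine bugs, such as a missing import, visible instead of silently turning every date into a null.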


In [45]:
airport_codes = airport_codes_spark.select(["ident", "type", "iata_code", "name", "iso_country", "iso_region", "municipality", "gps_code", "coordinates", "elevation_ft"])\
                .dropDuplicates(["ident"])

airport_codes.write.parquet("s3a://udacity-datalake-msaied2/Project6_Capstone/airport_codes.parquet", mode="overwrite")
print("Airport_Codes dimension table writing has been completed!")

Airport_Codes dimension table writing has been completed!


In [46]:
temperature_dim = temperature_spark.groupBy(col("Country").alias("country")).agg(
                round(mean('AverageTemperature'), 2).alias("average_temperature"),\
                round(mean("AverageTemperatureUncertainty"),2).alias("average_temperature_uncertainty")
                ).dropna()\
                .withColumn("temperature_id", monotonically_increasing_id()) \
                .select(["temperature_id", "country", "average_temperature", "average_temperature_uncertainty"])

temperature_dim.write.parquet("s3a://udacity-datalake-msaied2/Project6_Capstone/temperature.parquet", mode="overwrite")
print("Temperature dimension table writing has been completed!")

Temperature dimension table writing has been completed!


In [47]:
with open("I94_SAS_Labels_Descriptions.SAS") as f:
    contents = f.readlines()

In [48]:
country_code = {}
for countries in contents[10:298]:
    pair = countries.split('=')
    code, country = pair[0].strip(), pair[1].strip().strip("'")
    country_code[code] = country
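
The split-and-strip parsing above can be tried on a few in-memory lines. This is a sketch under an assumption: the sample lines imitate the `code = 'LABEL'` layout implied by the parsing code, but their exact spacing is a guess, and `parse_code_line` is a hypothetical helper. It uses `str.partition` so that a line without `=` is skipped instead of raising an `IndexError`.

```python
from typing import Optional, Tuple


def parse_code_line(line: str) -> Optional[Tuple[str, str]]:
    """Split "  236 =  'AFGHANISTAN'" into ("236", "AFGHANISTAN"); None if there is no '='."""
    code, separator, label = line.partition("=")
    if not separator:
        return None
    return code.strip(), label.strip().strip("'")


# Assumed layout of the label lines; the real file may use different spacing
sample_lines = [
    "   236 =  'AFGHANISTAN'\n",
    "   101 =  'ALBANIA'\n",
    "a line with no separator is skipped\n",
]

parsed = (parse_code_line(line) for line in sample_lines)
sample_country_code = dict(pair for pair in parsed if pair is not None)
print(sample_country_code)
```

Note that the codes are kept as strings here, as in the notebook, while `i94res` in the immigration data is numeric.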

In [49]:
df_country_code = pd.DataFrame(list(country_code.items()), columns=['code', 'country'])
df_country_code.head(5)

Unnamed: 0,code,country
0,236,AFGHANISTAN
1,101,ALBANIA
2,316,ALGERIA
3,102,ANDORRA
4,324,ANGOLA


In [50]:
country_spark = spark.createDataFrame(df_country_code)
country_spark.head()

country_spark.write.parquet("s3a://udacity-datalake-msaied2/Project6_Capstone/country.parquet", mode="overwrite")
print("Country dimension table writing has been completed!")

Country dimension table writing has been completed!


In [54]:
# join country and temperature
country_temperature = country_spark.select(["*"])\
            .join(temperature_dim, (country_spark.country == upper(temperature_dim.country)), how='full')\
            .select([country_spark.code, country_spark.country, temperature_dim.temperature_id, temperature_dim.average_temperature, temperature_dim.average_temperature_uncertainty])

country_temperature.write.parquet("s3a://udacity-datalake-msaied2/Project6_Capstone/country_temperature.parquet", mode="overwrite")
print("Country_Temperature dimension table writing has been completed!")

Country_Temperature dimension table writing has been completed!


###### Fact table writing

In [56]:
migrant = spark.read.parquet("s3a://udacity-datalake-msaied2/Project6_Capstone/migrant.parquet")
status = spark.read.parquet("s3a://udacity-datalake-msaied2/Project6_Capstone/status.parquet")
visa = spark.read.parquet("s3a://udacity-datalake-msaied2/Project6_Capstone/visa.parquet")
state = spark.read.parquet("s3a://udacity-datalake-msaied2/Project6_Capstone/state.parquet")
time = spark.read.parquet("s3a://udacity-datalake-msaied2/Project6_Capstone/time.parquet")
airport = spark.read.parquet("s3a://udacity-datalake-msaied2/Project6_Capstone/airport_codes.parquet")
country = spark.read.parquet("s3a://udacity-datalake-msaied2/Project6_Capstone/country.parquet")
country_temperature = spark.read.parquet("s3a://udacity-datalake-msaied2/Project6_Capstone/country_temperature.parquet")


immigration_fact = immigration_spark.select(["*"])\
                .join(airport, (immigration_spark.i94port == airport.ident), how='full')\
                .join(country_temperature, (immigration_spark.i94res == country_temperature.code), how='full')\
                .join(migrant, (immigration_spark.biryear == migrant.birth_year) & (immigration_spark.gender == migrant.gender), how='full')\
                .join(status, (immigration_spark.entdepa == status.arrival_flag) & (immigration_spark.entdepd == status.departure_flag) &\
                      (immigration_spark.matflag == status.match_flag), how='full')\
                .join(visa, (immigration_spark.i94visa == visa.i94visa) & (immigration_spark.visatype == visa.visatype)\
                      & (immigration_spark.visapost == visa.visapost), how='full')\
                .join(state, (immigration_spark.i94addr == state.state_code), how='full')\
                .join(time, (immigration_spark.arrdate == time.arrdate), how='full')\
                .where(col('cicid').isNotNull())\
                .select(["cicid", "i94res", "depdate", "i94mode", "i94port", "i94cit", "i94addr", "airline", "fltno", "ident", "code",\
                         "temperature_id", "migrant_id", "status_flag_id", "visa_id", "state_code", time.arrdate.alias("arrdate")])

immigration_fact.write.parquet("s3a://udacity-datalake-msaied2/Project6_Capstone/immigration.parquet", mode="overwrite")
print("Immigration fact table writing has been completed!")

Immigration fact table writing has been completed!
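
The full outer joins followed by the `cicid` filter keep every immigration row and leave nulls where a dimension has no match, which is the behaviour of a left join from the fact side. A minimal plain-Python sketch of that behaviour follows; `left_join`, the sample rows and the unmatched code `ZZ` are hypothetical.

```python
from typing import Dict, List


def left_join(fact_rows: List[Dict], dim_rows: List[Dict],
              fact_key: str, dim_key: str, dim_value: str) -> List[Dict]:
    """Keep every fact row; add dim_value from the matching dimension row, or None."""
    lookup = {dim[dim_key]: dim[dim_value] for dim in dim_rows}
    return [{**fact, dim_value: lookup.get(fact[fact_key])} for fact in fact_rows]


# Hypothetical rows using column names from the notebook
facts = [
    {"cicid": 4084316, "i94addr": "HI"},
    {"cicid": 1195600, "i94addr": "ZZ"},  # no matching state row
]
states = [{"state_code": "HI", "state": "Hawaii"}]

joined = left_join(facts, states, "i94addr", "state_code", "state")
print(joined)
```

This is also why some dimension keys in the fact table, such as `state_code`, can be null for individual rows.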


In [6]:
immigration = spark.read.parquet("s3a://udacity-datalake-msaied2/Project6_Capstone/immigration.parquet")
immigration.toPandas().head()

Unnamed: 0,cicid,i94res,depdate,i94mode,i94port,i94cit,i94addr,airline,fltno,ident,code,temperature_id,migrant_id,status_flag_id,visa_id,state_code,arrdate
0,4084316.0,209.0,20573.0,1.0,HHW,209.0,HI,JL,00782,,209.0,,0,0,0,HI,20566.0
1,4422636.0,582.0,20568.0,1.0,MCA,582.0,TX,*GA,XBLNG,,,,1,1,1,TX,20567.0
2,1195600.0,112.0,20571.0,1.0,OGG,148.0,FL,LH,00464,,112.0,,2,0,0,,20551.0
3,5291768.0,297.0,20581.0,1.0,LOS,297.0,CA,QR,00739,,297.0,,3,0,3,CA,20572.0
4,985523.0,111.0,20553.0,3.0,CHM,111.0,NY,,LAND,,111.0,,4,4,0,NY,20550.0


#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [9]:
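# temperature is not loaded in the cells above, so read it back from S3 first
temperature = spark.read.parquet("s3a://udacity-datalake-msaied2/Project6_Capstone/temperature.parquet")

tables = {
    "airport": airport,
    "country": country,
    "temperature": temperature,
    "migrant": migrant,
    "state": state,
    "status": status,
    "time": time,
    "visa": visa,
    "immigration": immigration
}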

for table_name, table in tables.items():
    record_count = table.count()

    if (record_count == 0):
        print("Data quality check failed for {} with zero records!".format(table_name))
    else:
        print("Data quality check passed for {} with record_count: {} records.".format(table_name, record_count))

Data quality check passed for immigration with record_count: 1000 records.
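
Besides the row-count check, the unique-key constraint listed above can be checked as well. The sketch below works on plain Python values so that it can run anywhere; in the notebook the counts would come from `table.count()` and the keys from a key column collected from the DataFrame. Both helper names are hypothetical.

```python
from collections import Counter
from typing import Dict, Hashable, Iterable, List


def find_empty_tables(row_counts: Dict[str, int]) -> List[str]:
    """Return the names of the tables that contain no rows."""
    return [name for name, count in row_counts.items() if count == 0]


def find_duplicate_keys(keys: Iterable[Hashable]) -> List[Hashable]:
    """Return the key values that occur more than once."""
    occurrences = Counter(keys)
    return [key for key, count in occurrences.items() if count > 1]


# Hypothetical inputs
print(find_empty_tables({"immigration": 1000, "visa": 0}))
print(find_duplicate_keys([4084316, 4422636, 4422636]))
```

An empty result from both helpers means the completeness and uniqueness checks passed.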


#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

Please refer to the Data_Dictionary.txt file.

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

#### Tools and Technologies
1. AWS S3 for data storage
2. Pandas for sample data set exploratory data analysis
3. PySpark for large data set data processing to transform staging table to dimensional table

#### Data Update Frequency
1. All tables should be updated in append-only mode.
2. Demographic readings can be updated annually, while the immigration and temperature readings can be updated monthly.

#### Considerations/Assumptions:
##### The data was increased by 100x:
We can use tools built for large-scale data processing and storage, such as Amazon EMR and Amazon Redshift, or a NoSQL store such as Apache Cassandra.

##### The data populates a dashboard that must be updated on a daily basis by 7am every day.
We can use Apache Airflow to handle the daily scheduling. It can also run the data quality checks and send notifications, for example when a task fails.

##### The database needed to be accessed by 100+ people
Amazon Redshift is designed to serve many concurrent sessions. If that many users need access, I believe it is worth moving the whole solution to a cloud-based data warehouse.
