Note: This is a continuation of the Capstone Project. The first part of the project (Steps 1 and 2) was completed in the Capstone_Project_Steps_1_2.ipynb notebook.

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

The database follows a star schema with one fact table and four dimension tables. The fact table, fact_inmig, is defined as follows:
##### fact_inmig:

* CID: double PRIMARY KEY
* citizenship_country: string 
* residence_country: string 
* arrival_airport: string 
* arrival_date: date 
* departure_date: date 
* age: double 
* generic_visa_type: string 
* arrival_flag: string 
* departure_flag: string 
* gender: string 
* airline: string
* arrival_flight_number: string 
* visa_type: string 
* arrival_state: string

The dimension tables are:

##### dim_airports:

* ident: string PRIMARY KEY
* type: string 
* elevation_ft: string 
* municipality: string
* iata_code: string
* State: string 

##### dim_demographics:

* City_State: string PRIMARY KEY
* City: string
* Median_Age: string
* Male_Population: string
* Female_Population: string 
* Total_Population: string 
* Veterans: string 
* Foreign: string 
* Avg_House_Size: string
* State: string 

##### dim_temperature:

* dt: string 
* AverageTemperature: string
* AverageTemperatureUncertainty: string
* Latitude: string
* Longitude: string
* year: integer
* City: string 
* PRIMARY KEY (dt, Latitude, Longitude)

##### dim_time:

* date: date PRIMARY KEY
* day: integer 
* week: integer 
* weekday: integer
* year_day: integer 
* year: integer 
* month: integer 

#### 3.2 Mapping Out Data Pipelines

This process was already partially carried out in Step 2; it is summarised here.

#### Data Extraction:

Load all the datasets from the CSV and SAS data files.

#### Data Transformation and Loading:

##### fact_inmig:
* Drop unwanted columns
* Convert citizenship country from code to name
* Convert residence country from code to name
* Convert arrival date to proper date format
* Keep only US addresses
* Convert departure date to proper date format and keep rows where this is after the arrival date only
* Convert visa ID to visa type description
* Select only arrivals by air
* Select only desired columns
* Save as parquet files 

##### dim_temperature:
* Select data from 2000 onwards
* Select only USA cities
* Remove all null data points
* Select only desired columns
* Save as parquet files 

##### dim_time:
* Get the distinct arrival dates from the fact table to act as primary keys for the time dimension
* Extract the date parts (year, month, day, week, weekday and day of year)
* Select only desired columns
* Save as parquet files 

##### dim_airports:
* Select USA airports
* Get airport USA state
* Select only airports
* Remove Null IATA codes
* Prepare proper format and naming
* Remove null elevations
* Select only desired columns
* Save as parquet files 

##### dim_demographics:
* Remove rows with null values, keep only the required columns and rename them
* Concatenate city and state to generate a unique column to act as primary key
* Select only desired columns
* Drop duplicate rows
* Save as parquet files 

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

## Extract Data

In [12]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()

In [13]:
# Temperature Dimension Data
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
temp_df = spark.read.format("csv").option("header","true").load(fname)
temp_df.createOrReplaceTempView("dim_temperature")


In [24]:
# City Demographics Dimension Data
demo_df =spark.read.format("csv").option("header", "true").option("delimiter", ";").load('us-cities-demographics.csv')
demo_df = demo_df.na.drop(subset=demo_df.columns)
demo_df.createOrReplaceTempView("dim_demographics")

In [15]:
# Airport Dimension Data
airports_df =spark.read.format("csv").option("header", "true").load('airport-codes_csv.csv')
airports_df.createOrReplaceTempView("dim_airports")

In [16]:
# Immigration Fact Data
inmig_df=spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
inmig_df.createOrReplaceTempView("fact_inmig")


In [17]:
# Country codes required for the immigration data transformation
country_codes_df =spark.read.format("csv").option("header", "true").load('country_codes.csv')
country_codes_df.createOrReplaceTempView("country_codes")
country_codes_df.head()

Row(_c0='582', Country='MEXICO Air Sea, and Not Reported (I-94, no land arrivals)', Index='582')

## Transform Data

### Temperature Data

In [18]:
# Select only data from 2000 onwards
spark.sql("""
SELECT *, year(dt) as year
FROM dim_temperature
WHERE year(dt) >=2000
""").createOrReplaceTempView("dim_temperature")

In [21]:
# Select cities from USA
spark.sql("""
SELECT *
FROM dim_temperature
WHERE Country = 'United States'
""").createOrReplaceTempView("dim_temperature")

In [20]:
# Remove all Null datapoints
spark.sql("""
SELECT *
FROM dim_temperature
WHERE AverageTemperature IS NOT NULL
AND AverageTemperatureUncertainty IS NOT NULL
""").createOrReplaceTempView("dim_temperature")
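The three temperature steps are all row filters, so together they amount to a single predicate. The plain-Python sketch below states that predicate over dictionaries standing in for CSV rows (an assumption for illustration, with empty fields represented as None); the function names are hypothetical and this is not the notebook's Spark code.

```python
from typing import Dict, Iterable, List, Optional

Row = Dict[str, Optional[str]]  # one CSV row; empty fields are represented as None

def keep_temperature_row(row: Row) -> bool:
    """Same conditions as the three SQL filters: year >= 2000, US only, no NULL readings."""
    year = int(row["dt"][:4])  # dt is an ISO date string such as "2007-01-01"
    return (
        year >= 2000
        and row["Country"] == "United States"
        and row["AverageTemperature"] is not None
        and row["AverageTemperatureUncertainty"] is not None
    )

def filter_temperatures(rows: Iterable[Row]) -> List[Dict[str, object]]:
    """Keep matching rows and add the derived year column (like year(dt) AS year)."""
    return [dict(row, year=int(row["dt"][:4])) for row in rows if keep_temperature_row(row)]
```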

### City Demographics Data

In [25]:
# Keep only the required columns and rename them

spark.sql("""
SELECT lower(City) as City, 
        `Median Age` as Median_Age, 
        `Male Population` as Male_Population, 
        `Female Population` as Female_Population, 
        `Total Population` as Total_Population, 
        `Number of Veterans` as Veterans,
        `Foreign-born` as Foreign,
        `Average Household Size` as Avg_House_Size,
        `State Code` as State
FROM dim_demographics
""").createOrReplaceTempView("dim_demographics")

In [26]:
# Join city and state to generate unique column to act as primary key
spark.sql("""
SELECT *,concat(City,"_",State) as City_State
FROM dim_demographics
""").createOrReplaceTempView("dim_demographics")
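City names repeat across states (there is a Springfield in both Missouri and Illinois, for instance), so City alone cannot be the primary key. The plain-Python sketch below builds the City_State key and removes whole-row duplicates, as dropDuplicates() does later in the Load step; duplicates are expected because the demographics file presumably lists each city once per race group. The function name and example rows are illustrative only.

```python
from typing import Dict, List

def add_city_state_key(rows: List[Dict[str, str]]) -> List[Dict[str, str]]:
    """Lower-case City, add City_State = City + "_" + State, and drop exact duplicate rows."""
    seen = set()
    result = []
    for row in rows:
        new_row = dict(row, City=row["City"].lower())
        new_row["City_State"] = f"{new_row['City']}_{new_row['State']}"
        fingerprint = tuple(sorted(new_row.items()))  # whole-row identity, like dropDuplicates()
        if fingerprint not in seen:
            seen.add(fingerprint)
            result.append(new_row)
    return result

rows = [
    {"City": "Springfield", "State": "MO"},
    {"City": "Springfield", "State": "IL"},
    {"City": "Springfield", "State": "MO"},  # repeated row, e.g. one per race group
]
print([r["City_State"] for r in add_city_state_key(rows)])
```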

### Airport Data

In [28]:
# Select USA airports
spark.sql("""
SELECT * 
FROM dim_airports
WHERE iso_country='US'
""").createOrReplaceTempView("dim_airports")

In [29]:
# Get airport USA state
spark.sql("""
SELECT *,RIGHT(iso_region, 2) AS Airport_State
FROM dim_airports
""").createOrReplaceTempView("dim_airports")

In [30]:
# Select only airports
spark.sql("""
SELECT *
FROM dim_airports
WHERE type IN ('small_airport','medium_airport','large_airport')
""").createOrReplaceTempView("dim_airports")

In [31]:
# Remove Null IATA codes
spark.sql("""
SELECT *
FROM dim_airports
WHERE iata_code IS NOT NULL
""").createOrReplaceTempView("dim_airports")

In [32]:
# Prepare proper format and naming
spark.sql("""
SELECT ident, type, elevation_ft, Airport_State as State, lower(municipality) as municipality, iata_code
FROM dim_airports
""").createOrReplaceTempView("dim_airports")

In [33]:
# Remove null elevations
spark.sql("""
SELECT *
FROM dim_airports
WHERE elevation_ft IS NOT NULL
""").createOrReplaceTempView("dim_airports")
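Taken together, the airport steps are a chain of filters followed by a projection. The plain-Python sketch below applies them to dictionaries standing in for rows of airport-codes_csv.csv, using the column names from the queries above; the function name is hypothetical and the code is an illustration, not the pipeline itself. Like Spark's lower(), it leaves a NULL municipality as NULL (None).

```python
from typing import Dict, List, Optional

Row = Dict[str, Optional[str]]
AIRPORT_TYPES = {"small_airport", "medium_airport", "large_airport"}

def transform_airports(rows: List[Row]) -> List[Row]:
    """Apply the dim_airports filters and projection to plain dict rows."""
    result = []
    for row in rows:
        if row["iso_country"] != "US" or row["type"] not in AIRPORT_TYPES:
            continue  # drops non-US sites and non-airport types such as heliports
        if row["iata_code"] is None or row["elevation_ft"] is None:
            continue  # the two IS NOT NULL filters
        municipality = row["municipality"]
        result.append({
            "ident": row["ident"],
            "type": row["type"],
            "elevation_ft": row["elevation_ft"],
            "State": row["iso_region"][-2:],  # "US-AK" -> "AK", like RIGHT(iso_region, 2)
            "municipality": municipality.lower() if municipality is not None else None,
            "iata_code": row["iata_code"],
        })
    return result
```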

### Immigration Data

In [34]:
# Drop unwanted columns

inmig_df = inmig_df.drop("visapost","occup","entdepu","insnum","count","dtadfile","dtaddto","admnum","matflag","biryear")
inmig_df = inmig_df.na.drop(subset=inmig_df.columns)
inmig_df.createOrReplaceTempView("fact_inmig")

In [35]:
# Convert citizenship country from code to name
# fact_inmig.* keeps the lookup columns (_c0, Country, Index) out of the fact view
spark.sql("""
SELECT fact_inmig.*, lower(country_codes.Country) AS citizenship_country
FROM fact_inmig
INNER JOIN country_codes
ON fact_inmig.i94cit = country_codes.Index
""").createOrReplaceTempView("fact_inmig")

In [36]:
# Convert residence country from code to name

# fact_inmig.* keeps the lookup columns (_c0, Country, Index) out of the fact view
spark.sql("""
SELECT fact_inmig.*, lower(country_codes.Country) AS residence_country
FROM fact_inmig
INNER JOIN country_codes
ON fact_inmig.i94res = country_codes.Index
""").createOrReplaceTempView("fact_inmig")
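The two joins replace numeric I94 country codes with names. The codes arrive from the SAS file as doubles (e.g. 582.0), while the Index column of country_codes.csv is a string ('582', as in the row shown above), so the join relies on Spark SQL's implicit type coercion. The plain-Python sketch below makes that conversion explicit and keeps the INNER JOIN behaviour of dropping rows whose code is not in the lookup; the helper names are hypothetical.

```python
from typing import Dict, List, Optional

Row = Dict[str, object]

def code_key(code: Optional[float]) -> Optional[str]:
    """Turn an I94 code read as a double (e.g. 582.0) into the CSV key format ("582")."""
    return None if code is None else str(int(code))

def attach_country_names(rows: List[Row], codes: Dict[str, str]) -> List[Row]:
    """Add lower-cased citizenship and residence names; drop rows with unknown codes."""
    result = []
    for row in rows:
        cit = codes.get(code_key(row["i94cit"]))
        res = codes.get(code_key(row["i94res"]))
        if cit is None or res is None:  # an INNER JOIN discards unmatched rows
            continue
        result.append(dict(row, citizenship_country=cit.lower(), residence_country=res.lower()))
    return result
```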

In [37]:
# Convert arrival date to proper date format
spark.sql("SELECT *, date_add(to_date('1960-01-01'), arrdate) AS arrival_date\
          FROM fact_inmig"
         ).createOrReplaceTempView("fact_inmig")

In [38]:
# Keep only US addresses

spark.sql("SELECT *\
          FROM fact_inmig\
          WHERE i94addr <> '99'").createOrReplaceTempView("fact_inmig")

In [39]:
# Convert departure date to proper date format and keep rows where this is after the arrival date only
spark.sql("SELECT *, date_add(to_date('1960-01-01'), depdate) AS departure_date\
          FROM fact_inmig\
          WHERE date_add(to_date('1960-01-01'), depdate)>=arrival_date"
         ).createOrReplaceTempView("fact_inmig")
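SAS stores dates as the number of days since 1 January 1960, which is why both conversions add arrdate or depdate to 1960-01-01. A plain-Python sketch of the conversion and of the departure filter follows; the function names are hypothetical and the code is an illustration rather than part of the pipeline.

```python
from datetime import date, timedelta
from typing import Optional

SAS_EPOCH = date(1960, 1, 1)  # day 0 of the SAS date system

def sas_to_date(days: float) -> date:
    """Equivalent of date_add(to_date('1960-01-01'), days); int() drops any fractional part."""
    return SAS_EPOCH + timedelta(days=int(days))

def keep_stay(arrdate: float, depdate: Optional[float]) -> bool:
    """True when a departure date exists and is not before the arrival date."""
    return depdate is not None and sas_to_date(depdate) >= sas_to_date(arrdate)
```

With these helpers, arrdate 20573.0 maps to 2016-04-29 and depdate 20602.0 to 2016-05-28, the dates of the first fact_inmig row displayed in the quality checks.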

In [40]:
# Convert visa ID to visa type description
spark.sql("""SELECT *, CASE 
                        WHEN i94visa = 1.0 THEN 'Business' 
                        WHEN i94visa = 2.0 THEN 'Pleasure'
                        WHEN i94visa = 3.0 THEN 'Student'
                        ELSE '0' END AS generic_visa_type 
                FROM fact_inmig""").createOrReplaceTempView("fact_inmig")
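The CASE expression is effectively a small lookup table. In the plain-Python sketch below (function name hypothetical), a dict keyed by float matches the double values read from the SAS file, and NULL or unknown codes fall through to '0', as in the ELSE branch.

```python
from typing import Optional

VISA_CATEGORIES = {1.0: "Business", 2.0: "Pleasure", 3.0: "Student"}

def generic_visa_type(i94visa: Optional[float]) -> str:
    """Map an i94visa code to its category; NULL or unknown codes give '0' (the ELSE branch)."""
    return VISA_CATEGORIES.get(i94visa, "0")
```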

In [41]:
# Keep only arrivals by air (i94mode = 1)
spark.sql("""
SELECT *
FROM fact_inmig
WHERE i94mode = 1
""").createOrReplaceTempView("fact_inmig")

### Time Dimension Data

In [42]:
# Get distinct values from the fact table to act as primary keys for time dimension data
dim_time = spark.sql("""
SELECT DISTINCT arrival_date AS date
FROM fact_inmig
""")
dim_time.createOrReplaceTempView("dim_time")

In [43]:
# Get time data and proper format
spark.sql("""
SELECT date, YEAR(date) AS year, MONTH(date) AS month, DAY(date) AS day, WEEKOFYEAR(date) AS week, DAYOFWEEK(date) as weekday, DAYOFYEAR(date) year_day
FROM dim_time
ORDER BY date ASC
""").createOrReplaceTempView("dim_time")
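Spark's date functions follow specific conventions: DAYOFWEEK counts from 1 = Sunday to 7 = Saturday, and WEEKOFYEAR returns the ISO-8601 week number (weeks start on Monday). The plain-Python sketch below reproduces those conventions with the standard datetime module, which makes it easy to sanity-check individual dim_time rows; the function name is hypothetical.

```python
from datetime import date
from typing import Dict

def time_parts(d: date) -> Dict[str, int]:
    """Compute the dim_time columns for one date, following Spark SQL conventions."""
    return {
        "day": d.day,
        "week": d.isocalendar()[1],         # WEEKOFYEAR: ISO-8601 week number
        "weekday": d.isoweekday() % 7 + 1,  # DAYOFWEEK: 1 = Sunday ... 7 = Saturday
        "year_day": d.timetuple().tm_yday,  # DAYOFYEAR
        "year": d.year,
        "month": d.month,
    }
```

For example, time_parts(date(2016, 4, 10)) gives day 10, week 14, weekday 1 and year_day 101, matching the first dim_time row displayed in the quality checks.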

## Load Data

### Temperature Data

In [44]:
# Select only desired temperature columns
dim_temperature = spark.sql("""
SELECT dt, AverageTemperature,AverageTemperatureUncertainty, lower(City) as City, Latitude, Longitude, year
FROM dim_temperature
""")

In [None]:
# Save as parquet files
dim_temperature.write.partitionBy("year","City").parquet("dim_temperature")

### City Demographics Data

In [45]:
# Select all columns
dim_demographics = spark.sql("""
SELECT *
FROM dim_demographics
""")

In [46]:
dim_demographics = dim_demographics.dropDuplicates()

In [None]:
dim_demographics.write.partitionBy("State").parquet("dim_demographics")

### Airport Data

In [48]:
# Select all columns
dim_airports = spark.sql("""
SELECT *
FROM dim_airports
""")

In [None]:
dim_airports.write.partitionBy("State").parquet("dim_airports")

### Immigration Data

In [49]:
# Select only desired columns
fact_inmig = spark.sql("""
SELECT cicid as CID,
        citizenship_country,
        residence_country,
        i94port as arrival_airport,
        arrival_date,
        i94addr as arrival_state,
        departure_date,
        i94bir as age,
        generic_visa_type,
        entdepa as arrival_flag,
        entdepd as departure_flag,
        gender,
        airline,
        fltno as arrival_flight_number,
        visatype as visa_type
FROM fact_inmig
""")

In [None]:
fact_inmig.write.partitionBy("arrival_date").parquet("fact_inmig")

### Time Dimension Data

In [50]:
# Select all columns
dim_time = spark.sql("""
SELECT *
FROM dim_time
""")

In [None]:
dim_time.write.partitionBy("year","month").parquet("dim_time")

#### 4.2 Data Quality Checks
The following data quality checks are run to confirm that the pipeline ran as expected:
 * Visual inspection: each table is read back from its parquet files and its first rows are displayed
 * Unique key checks: COUNT(DISTINCT key) must equal COUNT(*) for the primary key of the fact table and of the airport, demographics and time dimensions
 * Completeness checks: no column of any table may contain NULL values (check_null)
 
Run Quality Checks

In [51]:
# First check that data looks like it should
ETL_inmigration = spark.read.parquet("fact_inmig")
ETL_inmigration.createOrReplaceTempView("check_inm")
ETL_inmigration.show(n=2)

+---------+-------------------+-----------------+---------------+-------------+--------------+----+-----------------+------------+--------------+------+-------+---------------------+---------+------------+
|      CID|citizenship_country|residence_country|arrival_airport|arrival_state|departure_date| age|generic_visa_type|arrival_flag|departure_flag|gender|airline|arrival_flight_number|visa_type|arrival_date|
+---------+-------------------+-----------------+---------------+-------------+--------------+----+-----------------+------------+--------------+------+-------+---------------------+---------+------------+
|5417720.0|            albania|          albania|            CHI|           MI|    2016-05-28|31.0|         Pleasure|           T|             O|     M|     OS|                   65|       B2|  2016-04-29|
|5417721.0|            albania|          albania|            CLG|           TX|    2016-05-01|20.0|         Pleasure|           H|             O|     F|     QK|                

In [52]:
# First check that data looks like it should
ETL_airports = spark.read.parquet("dim_airports")
ETL_airports.createOrReplaceTempView("check_airports")
ETL_airports.show(n=2)

+-----+-------------+------------+-------------+---------+-----+
|ident|         type|elevation_ft| municipality|iata_code|State|
+-----+-------------+------------+-------------+---------+-----+
|  0AK|small_airport|         305|pilot station|      PQS|   AK|
|  16A|small_airport|          12|  nunapitchuk|      NUP|   AK|
+-----+-------------+------------+-------------+---------+-----+
only showing top 2 rows



In [53]:
# First check that data looks like it should
ETL_demographics = spark.read.parquet("dim_demographics")
ETL_demographics.createOrReplaceTempView("check_demographics")

ETL_demographics.show(n=2)

+--------------------+----------+---------------+-----------------+----------------+--------+-------+--------------+--------------------+-----+
|                City|Median_Age|Male_Population|Female_Population|Total_Population|Veterans|Foreign|Avg_House_Size|          City_State|State|
+--------------------+----------+---------------+-----------------+----------------+--------+-------+--------------+--------------------+-----+
|louisville/jeffer...|      37.5|         298451|           316938|          615389|   39364|  37875|          2.45|louisville/jeffer...|   KY|
|athens-clarke cou...|      26.5|          57415|            65148|          122563|    3953|  12868|          2.44|athens-clarke cou...|   GA|
+--------------------+----------+---------------+-----------------+----------------+--------+-------+--------------+--------------------+-----+
only showing top 2 rows



In [54]:
# First check that data looks like it should
ETL_temperature = spark.read.parquet("dim_temperature")
ETL_temperature.createOrReplaceTempView("check_temperature")

ETL_temperature.show(n=2)

+----------+--------------------+-----------------------------+--------+---------+----+-----------+
|        dt|  AverageTemperature|AverageTemperatureUncertainty|Latitude|Longitude|year|       City|
+----------+--------------------+-----------------------------+--------+---------+----+-----------+
|2007-01-01|-0.01499999999999...|                        0.295|  37.78N|   93.56W|2007|springfield|
|2007-02-01|-0.14900000000000002|                        0.466|  37.78N|   93.56W|2007|springfield|
+----------+--------------------+-----------------------------+--------+---------+----+-----------+
only showing top 2 rows



In [55]:
# First check that data looks like it should
ETL_time = spark.read.parquet("dim_time")
ETL_time.createOrReplaceTempView("check_time")

ETL_time.show(n=2)

+----------+---+----+-------+--------+----+-----+
|      date|day|week|weekday|year_day|year|month|
+----------+---+----+-------+--------+----+-----+
|2016-04-10| 10|  14|      1|     101|2016|    4|
|2016-04-15| 15|  15|      6|     106|2016|    4|
+----------+---+----+-------+--------+----+-----+
only showing top 2 rows



In [56]:
# Perform quality checks here
# Check unique keys
spark.sql("""
SELECT COUNT(DISTINCT(CID)), COUNT(*)
FROM check_inm
""").show()

+-------------------+--------+
|count(DISTINCT CID)|count(1)|
+-------------------+--------+
|            2087669| 2087669|
+-------------------+--------+



The distinct count equals the row count, so CID is unique and can serve as the primary key of fact_inmig.

In [57]:
# Check unique keys
spark.sql("""
SELECT COUNT(DISTINCT(ident)), COUNT(*)
FROM check_airports
""").show()

+---------------------+--------+
|count(DISTINCT ident)|count(1)|
+---------------------+--------+
|                 1864|    1864|
+---------------------+--------+



The distinct count equals the row count, so ident is unique and can serve as the primary key of dim_airports.

In [58]:
# Check unique keys
spark.sql("""
SELECT COUNT(DISTINCT(City_State)), COUNT(*)
FROM check_demographics
""").show()

+--------------------------+--------+
|count(DISTINCT City_State)|count(1)|
+--------------------------+--------+
|                       588|     588|
+--------------------------+--------+



The distinct count equals the row count, so City_State is unique and can serve as the primary key of dim_demographics.

In [59]:
# Check unique keys
spark.sql("""
SELECT COUNT(DISTINCT(date)), COUNT(*)
FROM check_time
""").show()

+--------------------+--------+
|count(DISTINCT date)|count(1)|
+--------------------+--------+
|                  30|      30|
+--------------------+--------+



The distinct count equals the row count, so date is unique and can serve as the primary key of dim_time.
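The four unique-key cells above repeat the same query and rely on reading the output. A reusable helper in the style of check_null could fail loudly instead. The sketch below is an assumption, not part of the original notebook: it takes any callable that runs a SQL query and returns a single value, and it is demonstrated against an in-memory SQLite table rather than Spark so that it runs without a cluster. Because COUNT(DISTINCT ...) ignores NULLs, a NULL key is reported as a violation as well.

```python
import sqlite3
from typing import Any, Callable

def check_unique_key(run_scalar: Callable[[str], Any], table: str, column: str) -> None:
    """Raise ValueError unless every row of `table` has a distinct, non-NULL `column`."""
    violations = run_scalar(f"SELECT COUNT(*) - COUNT(DISTINCT {column}) FROM {table}")
    if violations > 0:
        raise ValueError(f"{table}.{column} is not a valid primary key: {violations} duplicate or NULL rows")
    print(f"Primary key check passed for {table}.{column}.")

# Stand-in for the Spark session: an in-memory SQLite table with two dim_time dates
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE check_time (date TEXT)")
conn.executemany("INSERT INTO check_time VALUES (?)", [("2016-04-10",), ("2016-04-15",)])

def run_sqlite(query: str) -> Any:
    """Return the first value of the first row of the query result."""
    return conn.execute(query).fetchone()[0]

check_unique_key(run_sqlite, "check_time", "date")
```

In the notebook, the same helper could be called as check_unique_key(lambda q: spark.sql(q).head()[0], "check_inm", "CID"), since the generated query is also valid Spark SQL.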

In [60]:
def check_null(table, column):
    """Raise a ValueError if the given column of the given table contains NULL values.

    Args:
        table: name of the temporary view to analyse
        column: name of the column to analyse
    """
    check_val = spark.sql(f"SELECT COUNT(*) AS nbr FROM {table} WHERE {column} IS NULL").head()[0]
    if check_val > 0:
        raise ValueError(f"Error in data quality. Found {check_val} NULL values in {column} column of {table}")
    print(f"Correct data quality for {table} in column {column}.")

In [61]:
print("Checking Fact Inmigration table")
for column in ETL_inmigration.columns:
    check_null("check_inm", column)
print(" ")
print("Checking Dimension Airport table")
for column in ETL_airports.columns:
    check_null("check_airports", column)
print(" ")
print("Checking Dimension Demographics table")
for column in ETL_demographics.columns:
    check_null("check_demographics", column)
print(" ")
print("Checking Dimension Temperature table")
for column in ETL_temperature.columns:
    check_null("check_temperature", column)
print(" ")
print("Checking Dimension Time table")
for column in ETL_time.columns:
    check_null("check_time", column)
print(" ")

Checking Fact Inmigration table
Correct data quality for check_inm in column CID.
Correct data quality for check_inm in column citizenship_country.
Correct data quality for check_inm in column residence_country.
Correct data quality for check_inm in column arrival_airport.
Correct data quality for check_inm in column arrival_state.
Correct data quality for check_inm in column departure_date.
Correct data quality for check_inm in column age.
Correct data quality for check_inm in column generic_visa_type.
Correct data quality for check_inm in column arrival_flag.
Correct data quality for check_inm in column departure_flag.
Correct data quality for check_inm in column gender.
Correct data quality for check_inm in column airline.
Correct data quality for check_inm in column arrival_flight_number.
Correct data quality for check_inm in column visa_type.
Correct data quality for check_inm in column arrival_date.
 
Checking Dimension Airport table
Correct data quality for check_airports in col

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.

    The decision to work locally with Spark, rather than on AWS or with orchestration tools such as Airflow, was based on the fact that, although the data is large (more than 3 million rows), it is small enough to fit in the memory of the local machine I'm using. Spark itself was chosen because it can read the SAS immigration files directly (through the spark-sas7bdat package), supports SQL transformations, and can later be moved to a cluster without rewriting the pipeline. In addition, this project does not require collaboration with other teammates or constant data updates, as it is only a demonstrator. Working locally is therefore sufficient.

* Propose how often the data should be updated and why.

    If this dataset were used by other people over time, it would need regular updates. The right update frequency depends on the end use of the data and the available computing power: hourly if the data were used by the police and Border Control to analyse airport occupancy, or daily to weekly for other purposes, such as a more general analysis of the inflow of passengers into the USA to assess demographic changes. Personally, I would suggest weekly updates, as they strike a good balance between computing cost and the amount of new data per update (the April 2016 file alone holds more than 3 million arrival records, i.e. several hundred thousand arrivals per week).

* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 
    In this case the local machine would run out of memory, so I would move the data to cloud object storage such as Amazon S3 and run the Spark ETL on a multi-node cluster (for example Amazon EMR) instead of a single machine. The transformed tables could then be loaded into a data warehouse such as Amazon Redshift, still using Spark and SQL as the processing languages, since both scale well to big data workloads.
 
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 
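     The ETL pipeline could be migrated to Apache Airflow, which supports scheduled, automated runs and data quality checks. The notebook steps would be rewritten as tasks in a DAG that follows the same logic, scheduled to run daily early enough to finish before 7am, with the quality checks as final tasks so that a failed check stops the run before bad data reaches the dashboard.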
 * The database needed to be accessed by 100+ people.
 
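     Once the ETL pipeline has run, the data can be stored in an Amazon Redshift cluster, which can be accessed by anyone with the proper credentials for the cluster (for example a database user and password, or AWS IAM permissions). Redshift is designed for many concurrent analytical queries, and its concurrency scaling feature can add capacity when the query load rises.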