# Project Title
### Data Engineering Capstone Project

#### Project Summary
In this project we investigate the US I94 immigration dataset and combine it with demographic and temperature data to support a more comprehensive analysis of the causes of immigration. We extract and transform the data from different sources and load it into a data model built as a star schema with a central fact table. The resulting model is meant to enable analysis of immigration to the United States with respect to demographics and temperature. The model should furthermore be extensible with additional data.

The project follows these steps:
1. Loading of data
2. Exploration and cleaning of data
3. Definition of data model
4. Loading of data into model
5. Quality checks


In [1]:
# Imports
import pandas as pd
from pyspark.sql import SparkSession
import os
from pyspark.sql.types import DateType
from pyspark.sql import functions as F

### Step 1: Scope the Project and Gather Data

#### Scope 
In this project we investigate the US I94 immigration dataset, enriched with demographic and temperature data, to enable a more comprehensive analysis of the causes of immigration.

#### Describe and Gather Data 
We use data from four different datasets:

- I94 Immigration Data: This data comes from the [US National Travel and Tourism Office](https://www.trade.gov/national-travel-and-tourism-office); it includes information about immigrants and their port of entry into the US.
- World Temperature Data: This dataset comes from [Kaggle](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data); it contains monthly average land surface temperatures for cities around the world.
- U.S. City Demographic Data: This data comes from [OpenDataSoft](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/); it contains demographic information about all US cities and census-designated places with a population of 65,000 or more.
- Airport Code Table: This is a simple table of IATA airport codes and corresponding cities. It comes from [datahub.io](https://datahub.io/core/airport-codes#data).

## Read all datasets with PySpark for faster processing, take small pandas samples, and register SQL temp views

In [2]:
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = "/opt/conda/bin:/opt/spark-2.4.3-bin-hadoop2.7/bin:/opt/conda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/lib/jvm/java-8-openjdk-amd64/bin"
os.environ["SPARK_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"
os.environ["HADOOP_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"

# Create Spark Session
spark = SparkSession.builder.getOrCreate()

# Read in datasets

# I94
# load() uses Spark's default source format (Parquet unless configured otherwise)
immigrations_spark = spark.read.load('./sas_data')
immigrations_df = immigrations_spark.limit(500).toPandas()

immigrations_spark.createOrReplaceTempView('immigrations')

# Airports
airports_spark = spark.read.option("header",True).csv("airport-codes_csv.csv")
airports_df = airports_spark.limit(500).toPandas()

airports_spark.createOrReplaceTempView('airports')

# us-cities-demographics
us_demographics_spark = spark.read.option("header",True).option("delimiter", ';').csv("us-cities-demographics.csv")
us_demographics_df = us_demographics_spark.limit(500).toPandas()

us_demographics_spark.createOrReplaceTempView('us_demographics')

# city temperatures
city_temperatures_spark = spark.read.option("header",True).csv('../../data2/GlobalLandTemperaturesByCity.csv')
city_temperatures_df = city_temperatures_spark.limit(500).toPandas()

city_temperatures_spark.createOrReplaceTempView('city_temperatures')

In [3]:
df_list = [immigrations_df, airports_df, us_demographics_df, city_temperatures_df]

for dataset in df_list:
    print(dataset.head(5))
    print(dataset.columns)

       cicid   i94yr  i94mon  i94cit  i94res i94port  arrdate  i94mode  \
0  5748517.0  2016.0     4.0   245.0   438.0     LOS  20574.0      1.0   
1  5748518.0  2016.0     4.0   245.0   438.0     LOS  20574.0      1.0   
2  5748519.0  2016.0     4.0   245.0   438.0     LOS  20574.0      1.0   
3  5748520.0  2016.0     4.0   245.0   438.0     LOS  20574.0      1.0   
4  5748521.0  2016.0     4.0   245.0   438.0     LOS  20574.0      1.0   

  i94addr  depdate   ...     entdepu  matflag  biryear   dtaddto gender  \
0      CA  20582.0   ...        None        M   1976.0  10292016      F   
1      NV  20591.0   ...        None        M   1984.0  10292016      F   
2      WA  20582.0   ...        None        M   1987.0  10292016      M   
3      WA  20588.0   ...        None        M   1987.0  10292016      F   
4      WA  20588.0   ...        None        M   1988.0  10292016      M   

  insnum airline        admnum  fltno visatype  
0   None      QF  9.495387e+10  00011       B1  
1   No

### Step 2: Explore and Assess the Data

#### City Temperatures

Exploring:
- Table contains temperature measurements for cities at different time stamps
- Many temperature values are null and need to be removed

Cleaning:
- For city_temperatures we select all rows with data from the United States that are not null, convert the string date to a date, drop duplicates, and drop rows with any nulls
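The cleaning rules listed above can be sketched in plain Python on a list of row dictionaries. This is a minimal illustration under stated assumptions, not the code of `clean_city_temperatures` (whose implementation lives in `clean_data` and is not shown here); the function name `clean_temperature_rows` and the default country `"United States"` are hypothetical. In PySpark the same steps would roughly correspond to `filter`, `dropna`, `F.to_date`, and `dropDuplicates`.

```python
from datetime import date


def clean_temperature_rows(rows, country="United States"):
    """Hypothetical sketch: clean raw temperature rows given as dicts of strings."""
    seen = set()
    cleaned = []
    for row in rows:
        # Keep only the country of interest.
        if row.get("Country") != country:
            continue
        # Drop rows with any missing value (empty string or None).
        if any(value in (None, "") for value in row.values()):
            continue
        record = dict(row)
        # Parse the ISO date string and the numeric temperature columns.
        record["dt"] = date.fromisoformat(row["dt"])
        record["AverageTemperature"] = float(row["AverageTemperature"])
        record["AverageTemperatureUncertainty"] = float(row["AverageTemperatureUncertainty"])
        # Drop exact duplicates; keys are unique, so sorting never compares values.
        key = tuple(sorted(record.items()))
        if key in seen:
            continue
        seen.add(key)
        cleaned.append(record)
    return cleaned
```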

In [4]:
city_temperatures_df.head(5)

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


In [5]:
from clean_data import clean_city_temperatures, clean_us_demographics, clean_airports, clean_immigrations

In [6]:
city_temperatures_spark_cleaned = clean_city_temperatures(city_temperatures_spark, spark)
city_temperatures_spark_cleaned.show(5)

+-----+-------+--------+---------+------------------+-----------------------------+----------+
| City|Country|Latitude|Longitude|AverageTemperature|AverageTemperatureUncertainty|        dt|
+-----+-------+--------+---------+------------------+-----------------------------+----------+
|Århus|Denmark|  57.05N|   10.33E|            16.824|                         3.62|1771-06-01|
|Århus|Denmark|  57.05N|   10.33E|            -6.918|                        7.193|1776-01-01|
|Århus|Denmark|  57.05N|   10.33E|            12.606|                        2.666|1790-05-01|
|Århus|Denmark|  57.05N|   10.33E|            16.656|                        2.203|1792-08-01|
|Århus|Denmark|  57.05N|   10.33E|             1.309|                        3.533|1803-03-01|
+-----+-------+--------+---------+------------------+-----------------------------+----------+
only showing top 5 rows



#### US demographics

Exploring:
- A city appears in multiple rows because counts are given separately for each race; these values need to be pivoted

Cleaning:
- For us_demographics we drop duplicates and rows with nulls, then pivot the Race column and fill missing race counts with 0
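The pivot described above turns one row per (city, state, race) into one row per (city, state) with a column per race. The following is a minimal plain-Python sketch, assuming rows are dicts with `City`, `State`, `Race`, and `Count` keys; the function name `pivot_race_counts` and the column list `RACE_COLUMNS` (taken from the cleaned output below) are illustrative, not the implementation of `clean_us_demographics`. In PySpark a comparable result could be obtained with `groupBy(...).pivot("Race").agg(F.first("Count"))` followed by `fillna(0)`.

```python
# Race columns as they appear in the cleaned us_demographics output.
RACE_COLUMNS = [
    "American_Indian_and_Alaska_Native",
    "Asian",
    "Black_or_African_American",
    "Hispanic_or_Latino",
    "White",
]


def pivot_race_counts(rows):
    """Hypothetical sketch: collapse one row per race into one row per (City, State)."""
    pivoted = {}
    for row in rows:
        key = (row["City"], row["State"])
        if key not in pivoted:
            # Copy the city-level attributes once and start every race count at 0.
            base = {k: v for k, v in row.items() if k not in ("Race", "Count")}
            base.update({race: 0 for race in RACE_COLUMNS})
            pivoted[key] = base
        # e.g. "Black or African-American" -> "Black_or_African_American"
        column = row["Race"].replace(" ", "_").replace("-", "_")
        pivoted[key][column] = int(row["Count"])
    return list(pivoted.values())
```

With the two Abilene rows shown in the output below (Asian 2929, Hispanic or Latino 33222), the sketch yields a single Abilene row in which the three races missing from the input are filled with 0.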

In [7]:
us_demographics_spark.orderBy(['City','State']).show(5, truncate=False)

+-------+-----+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+---------------------------------+-----+
|City   |State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|Race                             |Count|
+-------+-----+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+---------------------------------+-----+
|Abilene|Texas|31.3      |65212          |60664            |125876          |9367              |8129        |2.64                  |TX        |Asian                            |2929 |
|Abilene|Texas|31.3      |65212          |60664            |125876          |9367              |8129        |2.64                  |TX        |Hispanic or Latino               |33222|
|Abilene|Texas|31.3      |65212          |60664            |125876          |936

In [8]:
us_demographics_spark_cleaned = clean_us_demographics(us_demographics_spark, spark)
us_demographics_spark_cleaned.show(5)

+----------+-------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+---------------------------------+-----+-------------------------+------------------+------+
|     State|   City|Median_Age|Male_Population|Female_Population|Total_Population|Number_of_Veterans|Foregin_born|Average_Household_Size|State_Code|American_Indian_and_Alaska_Native|Asian|Black_or_African_American|Hispanic_or_Latino| White|
+----------+-------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+---------------------------------+-----+-------------------------+------------------+------+
|     Texas|Abilene|      31.3|          65212|            60664|          125876|              9367|        8129|                  2.64|        TX|                             1813| 2929|                    14449|             33222| 95487|
|      Ohio|  Akron|      38.1|     

#### Airports

Exploring:
- The table contains many kinds of airports (including heliports and small airfields); only those with an IATA code are of interest

Cleaning:
- For airports we keep only airports with an IATA code; again, duplicates and rows with nulls are dropped
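The airport filter can be sketched as a projection followed by dropping nulls and exact duplicates; dropping rows with an empty `iata_code` is what restricts the table to airports with an IATA code. This is a minimal plain-Python illustration, assuming rows are dicts keyed by the raw CSV column names; the function name `select_iata_airports` and the projected columns are assumptions (how `clean_airports` derives `City` and `State` is not shown). In PySpark the filter step could be written as `filter(F.col("iata_code").isNotNull())`.

```python
def select_iata_airports(rows, columns=("ident", "name", "municipality", "iata_code")):
    """Hypothetical sketch: keep airports with an IATA code, drop nulls and exact duplicates."""
    seen = set()
    selected = []
    for row in rows:
        record = tuple(row.get(col) for col in columns)
        # A missing value in any selected column drops the row,
        # which also removes airports without an IATA code.
        if any(value in (None, "") for value in record):
            continue
        if record in seen:
            continue
        seen.add(record)
        selected.append(dict(zip(columns, record)))
    return selected
```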

In [9]:
airports_df.head(5)

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [10]:
airports_spark_cleaned = clean_airports(airports_spark, spark)
airports_spark_cleaned.show(5)

+------+-------------------+-----------+-----+---------+
| ident|               name|       City|State|iata_code|
+------+-------------------+-----------+-----+---------+
|  AYWF|Wawoi Falls Airport|Wavoi Falls|  WPD|      WAJ|
|  BIRF|        Rif Airport|        Rif|    3|      OLI|
|BZ-CYC|Caye Chapel Airport|Caye Chapel|   BZ|      CYC|
|   CGA|Craig Seaplane Base|      Craig|   AK|      CGA|
|  CNT3| Ogoki Post Airport| Ogoki Post|   ON|      YOG|
+------+-------------------+-----------+-----+---------+
only showing top 5 rows



#### Immigrations

Exploring:
- Column names need to be renamed to understandable, intuitive names
- Arrival and departure dates are stored as SAS dates (days since 1960-01-01) and need to be converted to actual dates
- Only meaningful columns should be kept
- City needs to be added as an additional foreign key

Cleaning:
- We convert arrival and departure dates to dates, rename and select meaningful columns, and drop duplicates and rows with null values
- We merge with the airports data on iata_code to get the city as a column
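The date conversion described above is an offset from the SAS epoch 1960-01-01. The sketch below shows the arithmetic in plain Python; the function names are hypothetical. It also handles `dtaddto`, assuming (from sample values such as `10292016`) that it is an MMDDYYYY string. For the first sample row printed in Step 1, `arrdate = 20574.0` and `depdate = 20582.0` map to 30 April and 8 May 2016, which is consistent with `i94mon = 4`. One way to express the same conversion in PySpark is `F.expr("date_add(to_date('1960-01-01'), cast(arrdate as int))")`.

```python
from datetime import date, datetime, timedelta

SAS_EPOCH = date(1960, 1, 1)  # day 0 of the SAS date system


def sas_to_date(sas_days):
    """Convert a SAS date (days since 1960-01-01, possibly a float) to a date."""
    if sas_days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))


def mmddyyyy_to_date(text):
    """Convert an MMDDYYYY string such as '10292016' (assumed dtaddto format) to a date."""
    if not text:
        return None
    return datetime.strptime(text, "%m%d%Y").date()


print(sas_to_date(20574.0), sas_to_date(20582.0))  # 2016-04-30 2016-05-08
```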

In [11]:
immigrations_df.head(5)

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,5748517.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,CA,20582.0,...,,M,1976.0,10292016,F,,QF,94953870000.0,11,B1
1,5748518.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,NV,20591.0,...,,M,1984.0,10292016,F,,VA,94955620000.0,7,B1
2,5748519.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20582.0,...,,M,1987.0,10292016,M,,DL,94956410000.0,40,B1
3,5748520.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20588.0,...,,M,1987.0,10292016,F,,DL,94956450000.0,40,B1
4,5748521.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20588.0,...,,M,1988.0,10292016,M,,DL,94956390000.0,40,B1


In [12]:
immigrations_spark_cleaned = clean_immigrations(immigrations_spark, spark)
immigrations_spark_cleaned.show(5)

+-------+----+-----+---------+---------+------+-----+-------+---------+------+----------+-------+--------+--------+-------------+------------+--------------+
|city_id|year|month|iata_code|city_code|res_id|State|matflag|birthyear|gender|   dtaddto|airline|visatype|visapost|flight_number|arrival_date|departure_date|
+-------+----+-----+---------+---------+------+-----+-------+---------+------+----------+-------+--------+--------+-------------+------------+--------------+
|6063613|2016|    4|      AGA|      268|   268|   GU|      M|     1988|     M|2016-12-06|     CI|     GMT|     TAI|        00026|  2016-04-29|    2016-05-03|
| 473260|2016|    4|      SFR|      135|   135|   CA|      M|     1962|     M|2016-02-10|     SQ|      B2|     HNK|        00002|  2016-04-03|    2016-04-08|
| 474080|2016|    4|      TAM|      135|   135|   FL|      M|     1958|     M|2016-02-10|     BA|      B2|     LND|        02167|  2016-04-03|    2016-05-09|
| 474838|2016|    4|      HAM|      135|   509|   NY

In [13]:
# Join immigrations and airports on iata_code

airports_sec = airports_spark_cleaned.select(['iata_code', 'City'])
airports_sec = airports_sec.withColumnRenamed('iata_code', 'iata_code_air')
immigrations_spark_cleaned = immigrations_spark_cleaned.join(airports_sec, immigrations_spark_cleaned.iata_code == airports_sec.iata_code_air, 'left').select('*')
immigrations_spark_cleaned = immigrations_spark_cleaned.drop('iata_code_air')

airports_spark_cleaned = airports_spark_cleaned.drop('City')
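The join above is a left join: every immigration row is kept, rows without a matching airport get a null `City`, and a code that occurs in several airport rows produces one output row per match. The plain-Python sketch below mirrors those semantics; the function name `left_join_city` is hypothetical. Two caveats follow from it. First, deduplicating the airport side on the code before joining (in PySpark, for example, `airports_sec.dropDuplicates(['iata_code_air'])`) would prevent fact rows from being multiplied. Second, the I94 port codes do not always coincide with IATA codes (San Francisco appears as `SFR` in the cleaned output, while its IATA code is `SFO`), so a share of unmatched rows is to be expected.

```python
def left_join_city(immigrations, airports):
    """Hypothetical sketch: left-join immigration rows to airports on iata_code, adding City."""
    # Map each code to every city it appears with; duplicates are kept on purpose,
    # because a join emits one output row per matching airport row.
    cities_by_code = {}
    for airport in airports:
        cities_by_code.setdefault(airport["iata_code"], []).append(airport["City"])
    joined = []
    for row in immigrations:
        # Unmatched codes keep the row with City = None, as in a SQL LEFT JOIN.
        for city in cities_by_code.get(row["iata_code"], [None]):
            joined.append({**row, "City": city})
    return joined
```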

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
We use four tables in a star schema: three dimension tables (city_temperatures, airports, us_demographics) and the immigrations fact table, as shown in the table below.

|Table |Columns  | Table kind| Foreign keys| Description|
|---|---|---|---|---|
|city_temperatures|City, Country, Latitude, Longitude, AverageTemperature, AverageTemperatureUncertainty, dt| Dimension table| City| temperature information for cities in different countries|
|airports|ident, name, State, iata_code| Dimension table| iata_code| airport information|
|us_demographics|State, City, Median_Age, Male_Population, Female_Population, Total_Population, Number_of_Veterans, Foregin_born, Average_Household_Size, State_Code, American_Indian_and_Alaska_Native, Asian, Black_or_African_American, Hispanic_or_Latino, White| Dimension table| City| demographic information|
|immigrations|city_id, year, month, iata_code, city_code, res_id, State, matflag, birthyear, gender, dtaddto, airline, visatype, visapost, flight_number, arrival_date, departure_date, City| Fact table| iata_code, City| immigration information|
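To illustrate how the fact table is queried against a dimension table, here is a minimal sketch of a trimmed version of the schema. It is a hypothetical illustration, not the DDL in `create_tables`: only a few columns per table are kept, and Python's built-in `sqlite3` stands in for PostgreSQL so the example runs without a server. The query joins on both city and state code, on the assumption that `City` alone is ambiguous across states.

```python
import sqlite3

# Trimmed, hypothetical DDL: a few columns per table; SQLite stands in for PostgreSQL.
SCHEMA = """
CREATE TABLE airports (ident TEXT, name TEXT, state TEXT, iata_code TEXT);
CREATE TABLE us_demographics (state TEXT, city TEXT, state_code TEXT, total_population INTEGER);
CREATE TABLE city_temperatures (city TEXT, country TEXT, averagetemperature REAL, dt TEXT);
CREATE TABLE immigrations (city_id INTEGER, iata_code TEXT, state TEXT, visatype TEXT,
                           arrival_date TEXT, city TEXT);
"""

# Fact-to-dimension query: arrivals per city next to the city's population.
ARRIVALS_BY_CITY = """
SELECT i.city, d.total_population, COUNT(*) AS arrivals
FROM immigrations AS i
JOIN us_demographics AS d
  ON d.city = i.city AND d.state_code = i.state
GROUP BY i.city, d.total_population
ORDER BY arrivals DESC
"""


def build_star_schema():
    """Create the trimmed star schema in an in-memory SQLite database."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(SCHEMA)
    return conn
```

Immigration rows whose city has no demographics entry simply drop out of this inner join; a `LEFT JOIN` would keep them with a null population.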

#### 3.2 Mapping Out Data Pipelines
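The pipeline consists of the following steps:
1. Read the four raw datasets with Spark (Step 1)
2. Clean each dataset with the functions from `clean_data` (Step 2)
3. Join the cleaned immigration data with the airports on `iata_code` to add `City` as a foreign key
4. Drop and recreate the PostgreSQL tables with the queries from `create_tables`
5. Write each cleaned Spark table to CSV and bulk-load the files into PostgreSQL with `COPY`
6. Run the data quality checks from `check_quality`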

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
The following cells create the PostgreSQL tables and load the cleaned Spark tables into them.

In [14]:
import psycopg2
from tqdm import tqdm
import sys
import shutil
from create_tables import create_tables, delete_tables

conn = psycopg2.connect("host=127.0.0.1 dbname=studentdb user=student password=student")
conn.set_session(autocommit=True)
conn.set_client_encoding('UTF8')
cur = conn.cursor()

# Drop tables (errors are reported, not raised, so the notebook keeps running)
for delete_table in delete_tables:
    try:
        cur.execute(delete_table)
    except psycopg2.Error as e:
        print(f"Table with query {delete_table} could not be dropped: {e}")
        
# Create tables
for create_table in create_tables:
    try:
        cur.execute(create_table)
    except psycopg2.Error as e:
        print(f"Table with query {create_table} could not be created: {e}")

In [15]:
# Write each cleaned Spark table to CSV part files, then bulk-load them into PostgreSQL
spark_tables = [city_temperatures_spark_cleaned, airports_spark_cleaned, us_demographics_spark_cleaned, immigrations_spark_cleaned]
table_names = ['city_temperatures','airports','us_demographics', 'immigrations']

for table_name, spark_table in zip(table_names, spark_tables):
    print(f'Writing {table_name} to csv')
    shutil.rmtree(f'csv/{table_name}', ignore_errors=True)
    # Write a header line so that skipping the first line below drops the header, not a data row
    spark_table.write.option("sep", ";").option("header", True).csv(f"csv/{table_name}")
    
    # Spark part files end in .csv; the .crc checksum files end in .crc and are skipped
    csv_list = [f for f in os.listdir(f"csv/{table_name}") if f.endswith('.csv')]
    columns = spark_table.schema.names
    pbar = tqdm(csv_list)
    for f in pbar:
        pbar.set_description(f'{table_name} processed')
        with open(os.path.join('csv', table_name, f), 'r', encoding="utf-8") as file_in:
            next(file_in, None)  # skip the header row; no error on an empty part file
            try:
                cur.copy_from(file_in, table_name, columns=columns, sep=";")
            except Exception as e:
                print(e)
                
shutil.rmtree('csv', ignore_errors=True)

Writing city_temperatures to csv


city_temperatures processed: 100%|██████████| 200/200 [01:39<00:00,  1.08s/it]


Writing airports to csv


airports processed: 100%|██████████| 200/200 [00:00<00:00, 586.03it/s]


Writing us_demographics to csv


us_demographics processed: 100%|██████████| 200/200 [00:00<00:00, 597.95it/s]


Writing immigrations to csv


immigrations processed: 100%|██████████| 200/200 [00:02<00:00, 83.25it/s]


#### 4.2 Data Quality Checks
We run three automated checks against the PostgreSQL tables after loading:
 * Row counts: every table must contain at least one row, which catches failed or empty loads
 * Column names: the columns of each table must match the expected names in `column_dict`
 * Data types: the column types of each table must match the PostgreSQL type names in `dtypes_dict`

Run Quality Checks

In [16]:
from check_quality import check_row_number, check_dtypes, check_column_names

In [17]:
# Check number of rows / tables not to be empty
check_row_number(cur, table_names)

# Check column names of tables
column_dict = {
    'city_temperatures':['city', 'country', 'latitude', 'longitude', 'averagetemperature', 'averagetemperatureuncertainty', 'dt'],
    'airports':['ident', 'name', 'state', 'iata_code'],
    'us_demographics':['state', 'city', 'median_age', 'male_population', 'female_population', 'total_population', 'number_of_veterans', 'foregin_born', 'average_household_size', 'state_code', 'american_indian_and_alaska_native', 'asian', 'black_or_african_american', 'hispanic_or_latino', 'white'],
    'immigrations':['city_id', 'year', 'month', 'iata_code', 'city_code', 'res_id', 'state', 'matflag', 'birthyear', 'gender', 'dtaddto', 'visatype', 'visapost', 'flight_number', 'airline', 'arrival_date', 'departure_date', 'city']
}

check_column_names(cur, table_names, column_dict)

dtypes_dict = {
    'city_temperatures':['character varying', 'character varying', 'character varying', 'character varying', 'double precision', 'double precision', 'date'],
    'airports':['character varying', 'character varying', 'character varying', 'character varying'],
    'us_demographics':['character varying', 'character varying', 'double precision', 'integer', 'integer', 'integer', 'integer', 'integer', 'double precision', 'character varying', 'integer', 'integer', 'integer', 'integer', 'character varying'],
    'immigrations':['integer', 'integer', 'integer', 'character varying', 'integer', 'integer', 'character varying', 'character varying', 'integer', 'character varying', 'date', 'character varying', 'character varying', 'character varying', 'character varying', 'date', 'date', 'character varying'],
}

check_dtypes(cur, table_names, dtypes_dict)

Quality check for city_temperatures successfull
Quality check for airports successfull
Quality check for us_demographics successfull
Quality check for immigrations successfull
Successfal column names test for city_temperatures
Successfal column names test for airports
Successfal column names test for us_demographics
Successfal column names test for immigrations
Successfal data types test for city_temperatures
Successfal data types test for airports
Successfal data types test for us_demographics
Successfal data types test for immigrations
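The first two checks can be sketched with nothing but the DB-API cursor interface (`execute`, `fetchone`, `description`), so the same code works with a psycopg2 cursor or, as in the test, a `sqlite3` cursor. This is a hypothetical illustration, not the implementation of `check_quality`; the function names are assumptions, and column order is deliberately ignored. Table names are formatted into the SQL, so they must come from a trusted list such as `table_names`. In PostgreSQL, the type check could query the `data_type` column of `information_schema.columns`.

```python
def assert_tables_not_empty(cur, table_names):
    """Raise ValueError if any table has no rows (table names must be trusted)."""
    for table in table_names:
        cur.execute(f"SELECT COUNT(*) FROM {table}")
        (count,) = cur.fetchone()
        if count == 0:
            raise ValueError(f"{table} is empty")


def assert_columns_match(cur, table_names, expected_columns):
    """Raise ValueError if a table's column names differ from the expected set (order ignored)."""
    for table in table_names:
        # LIMIT 0 returns no rows but still populates cursor.description.
        cur.execute(f"SELECT * FROM {table} LIMIT 0")
        actual = {column[0].lower() for column in cur.description}
        expected = {name.lower() for name in expected_columns[table]}
        if actual != expected:
            raise ValueError(f"{table}: expected {sorted(expected)}, got {sorted(actual)}")
```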


#### 4.3 Data dictionary 
The data dictionary below lists every field of the data model with its type, a short description, and the dataset it comes from. Foreign keys are marked with --FOREIGN KEY.

## city_temperatures
    |-- City: string (nullable = false) - city name, from city_temperatures --FOREIGN KEY
    |-- Country: string (nullable = false) - country name, from city_temperatures
    |-- Latitude: string (nullable = false) - latitude of the city, from city_temperatures
    |-- Longitude: string (nullable = false) - longitude of the city, from city_temperatures
    |-- AverageTemperature: float (nullable = false) - average temperature in the city (degrees Celsius), from city_temperatures
    |-- AverageTemperatureUncertainty: float (nullable = false) - uncertainty of the average temperature in the city, from city_temperatures
    |-- dt: date (nullable = false) - date the temperature was recorded, from city_temperatures
    
## airports
    |-- ident: string (nullable = false) - identifier for airport from airports
    |-- name: string (nullable = false) - name of airport from airports
    |-- State: string (nullable = false) - state or region the airport is located in, from airports
    |-- iata_code: string (nullable = false) - IATA code of the airport, from airports --FOREIGN KEY
    
## us_demographics
    |-- State: string (nullable = false) - name of the US state, from us_demographics --FOREIGN KEY
    |-- City: string (nullable = false) - city name, from us_demographics
    |-- Median_Age: float (nullable = false) - median age in the city, from us_demographics
    |-- Male_Population: integer (nullable = false) - male population in the city, from us_demographics
    |-- Female_Population: integer (nullable = false) - female population in the city, from us_demographics
    |-- Total_Population: integer (nullable = false) - total population in the city, from us_demographics
    |-- Number_of_Veterans: integer (nullable = false) - number of veterans in the city, from us_demographics
    |-- Foregin_born: integer (nullable = false) - number of foreign-born residents in the city, from us_demographics
    |-- Average_Household_Size: float (nullable = false) - average household size in the city, from us_demographics
    |-- State_Code: string (nullable = false) - two-letter state code of the city, from us_demographics
    |-- American_Indian_and_Alaska_Native: integer (nullable = false) - count of American Indian and Alaska Native people in the city, from us_demographics
    |-- Asian: integer (nullable = false) - count of Asian people in the city, from us_demographics
    |-- Black_or_African_American: integer (nullable = false) - count of Black or African-American people in the city, from us_demographics
    |-- Hispanic_or_Latino: integer (nullable = false) - count of Hispanic or Latino people in the city, from us_demographics
    |-- White: integer (nullable = false) - count of White people in the city, from us_demographics
    
## immigrations
    |-- city_id: integer (nullable = false) - unique record identifier (cicid in the raw I94 data), from immigrations
    |-- year: integer (nullable = false) - year the data was recorded, from immigrations
    |-- month: integer (nullable = false) - month the data was recorded, from immigrations
    |-- iata_code: string (nullable = false) - code of the port of entry (i94port), from immigrations --FOREIGN KEY
    |-- city_code: integer (nullable = false) - code of the traveler's country of citizenship (i94cit), from immigrations
    |-- res_id: integer (nullable = false) - code of the traveler's country of residence (i94res), from immigrations
    |-- State: string (nullable = false) - US state of the traveler's address (i94addr), from immigrations
    |-- matflag: string (nullable = false) - match flag indicating that arrival and departure records match, from immigrations
    |-- birthyear: integer (nullable = false) - birth year of the traveler, from immigrations
    |-- gender: string (nullable = false) - gender of the traveler, from immigrations
    |-- dtaddto: date (nullable = false) - date until which the traveler is admitted to stay, from immigrations
    |-- airline: string (nullable = false) - airline of the arrival flight, from immigrations
    |-- visatype: string (nullable = false) - type of visa, from immigrations
    |-- visapost: string (nullable = false) - office that issued the visa, from immigrations
    |-- flight_number: string (nullable = false) - flight number, from immigrations
    |-- arrival_date: date (nullable = false) - arrival date (converted from the SAS date arrdate), from immigrations
    |-- departure_date: date (nullable = false) - departure date (converted from the SAS date depdate), from immigrations
    |-- City: string (nullable = true) - city of the port of entry, joined from airports on iata_code (null when no airport matches) --FOREIGN KEY

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
  - Spark is used for data processing and PostgreSQL for the data model
  - Spark was chosen for its intuitive data processing API and its ability to handle and process large datasets
  - PostgreSQL was chosen because it is a widely used database that allows fast queries in the intuitive SQL language
  - We use a star schema because it allows fast queries while keeping the schema intuitive and easy to understand,
      compared to a snowflake schema, for example
* Propose how often the data should be updated and why.
   -  Since the data is recorded monthly, monthly updates are recommended
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
     - Spark can handle the increase, provided it runs on a multi-node cluster rather than in a single local session
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
     - Implement an Apache Airflow pipeline, scheduled daily to finish before 7am, that fetches the new data, runs the cleaning and quality checks, and refreshes the data behind the dashboard
 * The database needed to be accessed by 100+ people.
     - Moving the data from a local PostgreSQL database to a cloud-based option such as Amazon Redshift would be recommended