In [2]:
# Do all imports and installs here
import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql.functions import udf
from datetime import datetime
import os
from pyspark.sql import SparkSession

# Immigration data model

#### Project Summary
Data model based on the immigration, airports and cities demographics data,
where immigration is the fact table and airports and cities_dem are the dimension tables.

### Step 1: Scope of the Project

The idea is to create a fact table from the immigration dataset that can be joined with the airports and cities_dem data on
a common foreign key/primary key. This provides more information on the airport, such as the full airport name and the city,
and on the city demographics, such as the total population and the number of foreign-born residents.

The immigration dataset comes from the National Travel and Tourism Office:
https://www.trade.gov/national-travel-and-tourism-office
    
The cities demographics data has been extracted from here: 
https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/
    
The airports dataset can be found here:
https://datahub.io/core/airport-codes#data

Example query on the airports table:
What are the 10 most popular traveller destinations by airport name and city? This is answered by joining the immigration
fact table to the airports table on the key immigration_fact_table.airports_id = airports.id.
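
You can try the example query without Spark. The sketch below uses Python's built-in sqlite3 module with a few invented rows. Only the table and column names follow the data model; the data is hypothetical. It runs the same kind of join and aggregation that would be passed to spark.sql against the fact table and the airports table.

```python
import sqlite3

# In-memory stand-ins for the airports dimension and the immigration fact table.
# The rows are invented for illustration; only the column names follow the data model.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE airports (id TEXT PRIMARY KEY, airport_name TEXT, municipality TEXT);
    CREATE TABLE immigration_fact (cicid INTEGER, airports_id TEXT);
    INSERT INTO airports VALUES
        ('KJFK', 'Airport A', 'New York'),
        ('KMIA', 'Airport B', 'Miami'),
        ('KLAX', 'Airport C', 'Los Angeles');
    INSERT INTO immigration_fact VALUES
        (1, 'KJFK'), (2, 'KJFK'), (3, 'KJFK'), (4, 'KMIA'), (5, 'KMIA'), (6, 'KLAX');
""")

# Join the fact table to the airports dimension on
# immigration_fact.airports_id = airports.id, count arrivals per airport, keep the top 10.
TOP_DESTINATIONS_SQL = """
    SELECT airports.airport_name, airports.municipality, COUNT(*) AS arrivals
    FROM immigration_fact
    JOIN airports ON immigration_fact.airports_id = airports.id
    GROUP BY airports.airport_name, airports.municipality
    ORDER BY arrivals DESC
    LIMIT 10
"""
top_destinations = conn.execute(TOP_DESTINATIONS_SQL).fetchall()
for name, city, arrivals in top_destinations:
    print(f"{name} ({city}): {arrivals}")
```

GROUP BY already returns one row per airport, so this query does not need a SELECT DISTINCT.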

In [3]:
from IPython.display import Image
Image(filename='sample_query_airports.JPG')

<IPython.core.display.Image object>

For the scope of this project, the Spark session runs locally. For better performance, an EMR cluster could be created on AWS and
this notebook run on it. The generated parquet files are stored in the "spark-warehouse" directory of this project and could
be written to an S3 bucket on AWS if more people need to access the output.

In [4]:
# Create Spark session

def create_spark_session():
    """
    Description: This function creates a Spark session.
    """
    os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
    os.environ["PATH"] = "/opt/conda/bin:/opt/spark-2.4.3-bin-hadoop2.7/bin:/opt/conda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/lib/jvm/java-8-openjdk-amd64/bin"
    os.environ["SPARK_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"
    os.environ["HADOOP_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"

    spark = SparkSession.builder.getOrCreate()
    return spark

In [5]:
spark = create_spark_session()

### Step 2: Define functions to process the cities demographics, airport and immigration data

In [6]:
# Define function to process cities demographics data
def process_cities_dem_data(input_path):
    """
    Function reads the cities demographics dataset into a Spark dataframe, 
    explores the data, generates the cities_dem dimension table and writes it
    to parquet files
    
    Arguments:
        input_path: path where the input data is stored
        
    Returns:
        dataframe df_cities_dem, dimension table cities_dem_table
    """
    # read in the cities demographics dataset
    df_cities_dem = spark.read.csv(input_path, sep=";", inferSchema=True, header=True)
    print("Completed, ran successfully")
    df_cities_dem.printSchema()
    print(df_cities_dem.count())
    
    # exploring and cleaning
    if df_cities_dem.count() == df_cities_dem.distinct().count():
        print('No duplicates found')
    else:
        print('Dataset contains duplicates')
    df_cities_dem = df_cities_dem.dropna(how = "any", subset = ["State Code"])

    # create cities_dem table
    df_cities_dem.createOrReplaceTempView("cities_dem")
    cities_dem_table = spark.sql('''
                    SELECT DISTINCT
                    City as city,
                    State as state,
                    `State Code` as state_code,
                    `Median Age` as median_age,
                    `Male Population` as male_population,
                    `Female Population` as female_population,
                    `Total Population` as total_population,
                    `Foreign-born` as foreign_born,
                    Race as race,
                    Count as count
                    from cities_dem
                    ''')
    cities_dem_table = cities_dem_table.withColumn('id', F.monotonically_increasing_id())
    cities_dem_table.show(3)
    
    # write cities_dem table to parquet files
    cities_dem_data = os.path.join("spark-warehouse", "cities_dem")
    cities_dem_table.write.parquet(cities_dem_data, mode="overwrite")
    return df_cities_dem, cities_dem_table
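
The id column of cities_dem_table comes from F.monotonically_increasing_id(). The Spark documentation says the generated IDs are unique and increasing but not consecutive. The partition ID goes in the upper 31 bits and the record number within the partition goes in the lower 33 bits.

The plain-Python sketch below reproduces that layout for two partitions of three rows each. The helper name is hypothetical. Spark also documents the function as non-deterministic, because its result depends on partition IDs. Before reusing the IDs as join keys, it is a reasonable precaution to cache the table or re-read the written parquet files.

```python
def monotonic_id(partition_id: int, record_number: int) -> int:
    """Mimic the documented ID layout of Spark's monotonically_increasing_id():
    partition ID in the upper 31 bits, record number within the partition in the
    lower 33 bits. Hypothetical helper for illustration; Spark computes this internally."""
    return (partition_id << 33) + record_number

# Two partitions with three records each (the example given in the Spark documentation).
ids = [monotonic_id(p, r) for p in range(2) for r in range(3)]
print(ids)  # [0, 1, 2, 8589934592, 8589934593, 8589934594]

# The IDs are unique and increasing, but not consecutive across partitions.
assert len(set(ids)) == len(ids)
assert ids == sorted(ids)
```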

In [7]:
# Define function to process airport data
def process_airport_data(input_path):
    """
    Function reads the airport codes dataset into a Spark dataframe, 
    explores the data, generates the airports dimension table and writes it
    to parquet files
    
    Arguments:
        input_path: path where the input data is stored
        
    Returns:
        dataframe df_airports, dimension table airports_table
    """
    # read in the airport codes dataset
    df_airports = spark.read.csv(input_path, sep=",", inferSchema=True, header=True)
    print("Completed, ran successfully")
    df_airports.printSchema()
    print(df_airports.count())
    
    # exploring and cleaning
    if df_airports.count() == df_airports.distinct().count():
        print('No duplicates found')
    else:
        print('Dataset contains duplicates')
    df_airports = df_airports.where("iso_country = 'US' and iata_code is not null and ident is not null")

    # create airports table
    df_airports.createOrReplaceTempView("airports")


    airports_table = spark.sql('''
                    SELECT DISTINCT
                    ident as id,
                    name as airport_name, 
                    iso_region,
                    municipality,
                    iata_code
                    from airports
                    ''')
    airports_table.show(10)
    
    # write airports table to parquet files
    airports_data = os.path.join("spark-warehouse", "airports")
    airports_table.write.parquet(airports_data, mode="overwrite")
    return df_airports, airports_table
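
The processing functions check for duplicates by comparing count() with distinct().count(). That check only says whether duplicates exist. The plain-Python sketch below applies the same idea and also lists the duplicated rows, which helps when the check fails. The helper is hypothetical and rows are modelled as tuples. In Spark you could get a similar result by grouping on all columns and keeping the groups with a count above 1.

```python
from collections import Counter

def duplicate_report(rows):
    """Return (has_no_duplicates, duplicated_rows) for a list of hashable rows.

    Mirrors the notebook's check count() == distinct().count() and additionally
    lists the rows that occur more than once, with their number of occurrences."""
    counts = Counter(rows)
    no_duplicates = len(rows) == len(counts)  # total count vs. distinct count
    duplicated = {row: n for row, n in counts.items() if n > 1}
    return no_duplicates, duplicated

# Invented sample rows in the shape (ident, iata_code, municipality).
rows = [
    ("KATW", "ATW", "Appleton"),
    ("KBDG", "BDG", "Blanding"),
    ("KATW", "ATW", "Appleton"),
]
ok, dups = duplicate_report(rows)
print(ok, dups)  # False {('KATW', 'ATW', 'Appleton'): 2}
```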

In [8]:
# Define function to process immigration data
def process_immigration_data(input_path, df_airports, cities_dem_table):
    """
    Function reads the immigration dataset into a Spark dataframe,
    explores and cleans the data, 
    generates the immigration fact table based
    on the immigration dataframe, 
    the airport dataframe and the cities_dem dimension table,
    and writes it to parquet files
    
    Arguments:
        input_path: path where the input data is stored
        df_airports: airport dataframe
        cities_dem_table: dimension table for US cities demographics
        
    Returns:
        None
    """
    # read in the immigration dataset
    df_immigration = spark.read.load(input_path)
    print("Completed, ran successfully")
    df_immigration.printSchema()
    
    # exploring and cleaning
    print(df_immigration.count())
    if df_immigration.count() == df_immigration.distinct().count():
        print('No duplicates found')
    else:
        print('Dataset contains duplicates')
    df_immigration = df_immigration.dropna(how = "any", subset = ["cicid", "i94port", "i94addr"])
    
    # create immigration fact table
    df_immigration.createOrReplaceTempView("immigration")
    df_airports.createOrReplaceTempView("airports")
    cities_dem_table.createOrReplaceTempView("cities_dem")
    immigration_fact_table = spark.sql('''
                    SELECT DISTINCT
                    int(cicid) as cicid,
                    airports.ident as airports_id,
                    cities_dem.id as cities_dem_id,
                    arrdate as arrival_date,
                    depdate as departure_date,
                    i94port,
                    i94addr,
                    int(biryear) as biryear
                    FROM immigration
                    join airports
                    ON immigration.i94port = airports.iata_code
                    join cities_dem
                    ON immigration.i94addr = cities_dem.state_code
                    ''')
    #immigration_fact_table.show(5)
    
    # write immigration fact table to parquet files
    immigration_data = os.path.join("spark-warehouse", "immigration")
    immigration_fact_table.write.parquet(immigration_data, mode="overwrite")
    
    # sample query joining immigration fact table and airports table
    # What are the top 10 locations people travel to, by airport name and location?
    immigration_fact_table.createOrReplaceTempView("immigration_fact")
    immigration_fact_table.show(5)
    # spark.sql('''
    #                 SELECT DISTINCT
    #                 airports.name,
    #                 airports.municipality,
    #                 count(*) as count
    #                 from immigration_fact
    #                 join airports
    #                 on immigration_fact.airports_id = airports.ident
    #                 group by airports.name, airports.municipality
    #                 order by count desc
    #                 LIMIT 10
    #           ''').show()
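
The fact table keeps arrival_date and departure_date as the raw doubles from the immigration data. The I94 date columns are commonly described as SAS date values, meaning days since 1960-01-01. This notebook does not state that, so treat it as an assumption.

Under that assumption, the values can be converted to calendar dates as sketched below. The helper and its name are hypothetical and only illustrate the arithmetic. In Spark, something like F.expr("date_add('1960-01-01', int(arrdate))") would do the same inside the dataframe.

```python
from datetime import date, timedelta
from typing import Optional

SAS_EPOCH = date(1960, 1, 1)  # assumed origin of the SAS date values

def sas_to_date(sas_days: Optional[float]) -> Optional[date]:
    """Convert a SAS date (days since 1960-01-01, stored as a double) to a datetime.date.
    Missing values (e.g. no departure date recorded) stay None."""
    if sas_days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))

print(sas_to_date(20545.0))  # 2016-04-01
print(sas_to_date(None))     # None
```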

### Step 3: Define the Data Model

In [9]:
from IPython.display import Image
Image(filename='dimmodel.JPG')

<IPython.core.display.Image object>

#### 3.1 Conceptual Data Model
The data model is a star schema with the fact table immigration and the two dimension tables
airports and cities_dem. The fact table is linked to the airports table through the foreign key
"airports_id" in the fact table, which references the primary key "id" of the airports table.
Likewise, the foreign key "cities_dem_id" in the fact table references the primary key "id"
of the cities_dem table.
The model lets users enrich the traveller events gathered in the fact table with further
information about the airport (from the airports table) and about the city demographics
(from the cities_dem table).
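
The key relationships of the star schema can be written out as DDL. The sketch below uses SQLite (in memory, via Python's sqlite3 module) only to illustrate them. It makes these simplifications:

- the column lists are trimmed to a few columns per table;
- the rows are invented;
- Spark and parquet do not enforce primary or foreign keys themselves.

With foreign-key checking switched on, SQLite rejects a fact row whose airports_id has no matching airport.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # SQLite only enforces FOREIGN KEY clauses when enabled

conn.executescript("""
    -- dimension tables (trimmed to a few columns)
    CREATE TABLE airports (
        id           TEXT PRIMARY KEY,      -- ident in the airport codes data
        airport_name TEXT,
        municipality TEXT,
        iata_code    TEXT
    );
    CREATE TABLE cities_dem (
        id               INTEGER PRIMARY KEY,  -- monotonically increasing id in the notebook
        city             TEXT,
        state_code       TEXT,
        total_population INTEGER
    );
    -- fact table referencing both dimensions
    CREATE TABLE immigration_fact (
        cicid         INTEGER,
        airports_id   TEXT    REFERENCES airports(id),
        cities_dem_id INTEGER REFERENCES cities_dem(id),
        arrival_date  REAL
    );
""")

# Invented values for illustration.
conn.execute("INSERT INTO airports VALUES ('KATW', 'Sample Airport', 'Appleton', 'ATW')")
conn.execute("INSERT INTO cities_dem VALUES (0, 'Appleton', 'WI', 100000)")
conn.execute("INSERT INTO immigration_fact VALUES (1, 'KATW', 0, 20545.0)")  # valid keys

try:
    conn.execute("INSERT INTO immigration_fact VALUES (2, 'XXXX', 0, 20545.0)")  # unknown airport
    fk_violation = False
except sqlite3.IntegrityError:
    fk_violation = True

# Star join: each fact row enriched with both dimensions.
rows = conn.execute("""
    SELECT f.cicid, a.airport_name, c.city, c.total_population
    FROM immigration_fact f
    JOIN airports   a ON f.airports_id   = a.id
    JOIN cities_dem c ON f.cities_dem_id = c.id
""").fetchall()
print(fk_violation, rows)  # True [(1, 'Sample Airport', 'Appleton', 100000)]
```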

#### 3.2 Mapping Out Data Pipelines
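1. read each dataset into a Spark dataframe, check it for duplicates and drop rows with
missing values in the key columns (for the airports, also keep only US airports)
2. create the cities_dem_table, airports_table and immigration_fact_table by creating views of
the dataframes and selecting the relevant columns; the immigration fact table is built last,
because it joins the immigration data with the airports data and the cities_dem table
3. write the newly created tables as parquet files to the output path "spark-warehouse"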

### Step 4: Run Pipelines to Model the Data, as defined in the above functions

#### 4.1. Creating the data model

In [10]:
# run pipeline for cities demographics data
df_cities_dem, cities_dem_table = process_cities_dem_data(input_path= 'us-cities-demographics.csv')

Completed, ran successfully
root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: double (nullable = true)
 |-- Male Population: integer (nullable = true)
 |-- Female Population: integer (nullable = true)
 |-- Total Population: integer (nullable = true)
 |-- Number of Veterans: integer (nullable = true)
 |-- Foreign-born: integer (nullable = true)
 |-- Average Household Size: double (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: integer (nullable = true)

2891
No duplicates found
+-----------+------------+----------+----------+---------------+-----------------+----------------+------------+--------------------+-----+---+
|       city|       state|state_code|median_age|male_population|female_population|total_population|foreign_born|                race|count| id|
+-----------+------------+----------+----------+---------------+-----------------+----------------+------------+-------------

In [11]:
# run pipeline for airport data
df_airports, airports_table = process_airport_data(input_path = 'airport-codes_csv.csv')

Completed, ran successfully
root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: integer (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)

55075
No duplicates found
+----+--------------------+----------+------------+---------+
|  id|        airport_name|iso_region|municipality|iata_code|
+----+--------------------+----------+------------+---------+
|AK97| Boswell Bay Airport|     US-AK| Boswell Bay|      BSW|
|KATW|Appleton Internat...|     US-WI|    Appleton|      ATW|
|KBDG|Blanding Municipa...|     US-UT|    Blanding|      BDG|
|KEUF|        Weedon Field|     US-AL|     Eufaula|      EUF|
|KFLO|Florence R

In [12]:
# run pipeline for immigration data
process_immigration_data(
                            input_path='./sas_data', 
                            df_airports=df_airports,
                            cities_dem_table=cities_dem_table
                         )

Completed, ran successfully
root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |--

#### 4.2 Data Quality Checks

 * Check if the tables have at least one row
 * Check if the primary keys are unique


In [13]:
# 1. check if the tables have at least one row

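def get_df_name(df):
    """
    Description: This function looks up the global variable name of a table.
    Arguments: table
    Returns: name of the table, or 'table' if no global variable refers to it
    """
    # ignore IPython output-cache names such as _ or _14, which can point to the same object
    names = [x for x in globals() if globals()[x] is df and not x.startswith('_')]
    return names[0] if names else 'table'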

def at_least_one_row(table):
    """
    Description: This function checks if the table has at least one row.
    Arguments: table
    Returns: None
    """
    if table.count() < 1:
        raise ValueError(f"Data quality check failed. {get_df_name(table)} returned no results")
    else:
        print(f'{get_df_name(table)} looks correct')

# 2. check if primary keys are unique

def unique_primary_keys(table):
    """
    Description: This function checks if the primary keys of a given table are unique.
    Arguments: table
    Returns: None
    """
    if table.select('id').distinct().count() != table.count():
        raise ValueError(f"Primary key check failed. {get_df_name(table)}'s primary key values are not unique")
    else:
        print(f'Correct: primary keys in {get_df_name(table)} are unique')
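
Both checks can also be written as one SQL query per table that compares COUNT(*) with COUNT(DISTINCT id). This variant takes the table name as an argument instead of looking it up in globals().

The sketch runs against SQLite so that it is self-contained; the same query text could presumably be sent through spark.sql() against the registered views. The helper name run_quality_checks and the sample tables are hypothetical. COUNT(DISTINCT id) skips NULLs, so NULL keys also fail the uniqueness check.

```python
import sqlite3

def run_quality_checks(conn, table_name, key="id"):
    """Raise ValueError if table_name is empty or its key column is not unique.

    table_name and key are interpolated into the SQL, so they must be trusted identifiers."""
    total, distinct_keys = conn.execute(
        f"SELECT COUNT(*), COUNT(DISTINCT {key}) FROM {table_name}"
    ).fetchone()
    if total < 1:
        raise ValueError(f"Data quality check failed. {table_name} returned no results")
    if distinct_keys != total:  # duplicated keys, or NULL keys
        raise ValueError(f"Primary key check failed. {table_name}'s primary key values are not unique")
    print(f"{table_name} passed: {total} rows, unique {key}")

# Invented sample data: one clean table, one with a duplicated key.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE airports_table (id TEXT, airport_name TEXT);
    INSERT INTO airports_table VALUES ('AK97', 'Boswell Bay Airport'), ('KEUF', 'Weedon Field');
    CREATE TABLE cities_dem_table (id INTEGER, city TEXT);
    INSERT INTO cities_dem_table VALUES (0, 'City A'), (0, 'City B');
""")

run_quality_checks(conn, "airports_table")
try:
    run_quality_checks(conn, "cities_dem_table")
except ValueError as err:
    print(err)
```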


In [14]:
# run the at-least-one-row check on the airports and cities_dem tables
at_least_one_row(airports_table)
at_least_one_row(cities_dem_table)

# run the primary key check on the airports and cities_dem tables
unique_primary_keys(airports_table)
unique_primary_keys(cities_dem_table)

airports_table looks correct
cities_dem_table looks correct
Correct: primary keys in airports_table are unique
Correct: primary keys in cities_dem_table are unique


#### 4.3 Data dictionary 

In [15]:
from IPython.display import Image
Image(filename='datadict.JPG')

<IPython.core.display.Image object>

### Step 5: Project Write Up
The data lake architecture with Spark was chosen for this project because it allowed reading the datasets into
preliminary dataframes and modelling the data tables after viewing and assessing those dataframes. Furthermore, this notebook can be run on an
EMR cluster in AWS, and the architecture can be adapted so that the modelled data is stored in an S3 bucket on AWS.

Columns referring to dates are only contained in the immigration dataset, and the whole dataset relates to the year 2016.
Therefore, the pipeline should be updated and rerun when immigration data for further years becomes available.
The dimension tables airports and cities_dem don't contain any time data.

Let's assume the scenario was changed and the requirements were the following:

 * The data was increased by 100x.
    
    Partitioning can make queries that filter on the partition columns up to 83.33% faster than on unpartitioned data.
    Therefore, the immigration dataset could be partitioned by the year (i94yr) and month (i94mon) columns
    if data for several years and months is available.
    
    However, if the parquet files are partitioned but a query filters on other columns, it can take longer than
    on unpartitioned files, because every partition still has to be scanned and the data is typically spread over many smaller files.
    So partitioning the data by year and month only makes sense if the queries filter on these columns.
    
    I would recommend storing the data as parquet files in Amazon Simple Storage Service (Amazon S3).
    The parquet files then persist in Amazon S3 after the EMR cluster of the data lake is shut down.
    

 * The data populates a dashboard that must be updated on a daily basis by 7am every day.

   Set up a Redshift cluster either via the AWS Management Console or via Infrastructure as Code.
   In Airflow, create connections to AWS and to the Redshift cluster under "Admin" > "Connections".
   Drive the data pipeline by a schedule, for example (Airflow 1.x import paths):
   import datetime
   from airflow import DAG
   from airflow.operators.python_operator import PythonOperator
   dag = DAG(
       'immigration_pipeline',  # hypothetical DAG id
       start_date=datetime.datetime(2021, 1, 1, 0, 0, 0, 0),
       schedule_interval='@daily'
   )
   Use a Service Level Agreement (SLA), which can be set on the task, for example:
   immigration_task = PythonOperator(
       task_id='immigration',
       dag=dag,
       python_callable=function_name,
       provide_context=True,
       sla=datetime.timedelta(hours=7)
   )
   
   The business depends on the task running on time; in other words, the timeliness of the task is critical for the business.
   If the task has not succeeded within the SLA, Airflow records an SLA miss and can send an alert.
   The operating cost corresponds to that of the Redshift cluster, which depends on the node type and the number of nodes.
   
   Apache Airflow does not require a separate Redshift instance; it populates the Redshift cluster that it is connected to.
    

 * The database needed to be accessed by 100+ people.

   It makes sense to replicate the data into a data warehouse with Amazon Redshift.

   I would choose log replication, a form of change data capture (CDC). It is the fastest replication method:
   it queries the database's internal change log every few seconds and copies the changes into the data warehouse.
   Besides being faster and more reliable, log replication also has a much lower impact on database performance
   during querying and helps to avoid loading duplicate events.

   Within Amazon Redshift, a variety of tools can scale compute and storage separately.
   Amazon Redshift Managed Storage (the RA3 node family) can help accommodate a massive influx of data,
   because compute can be sized to the workload without having to size the cluster for storage.
   Concurrency scaling dynamically adds entire additional clusters of compute capacity as needed.
   Amazon Redshift Spectrum uses Amazon Simple Storage Service (Amazon S3) to
   support an on-demand compute layer up to 10 times the power of the main cluster.

   The ability to access database objects, such as tables and views, is controlled by user accounts in the Amazon Redshift database. 
   Users can only access resources in the database that their user accounts have been granted permission to access. 
   My suggestion is to create Amazon Redshift user accounts and manage permissions by using the 
   CREATE USER, CREATE GROUP, GRANT and REVOKE SQL statements. 

   The dynamic addition of clusters with concurrency scaling helps to optimize costs.
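
For the 100x scenario, the partitioning idea can be sketched in plain Python. In PySpark it would correspond to writing with DataFrameWriter.partitionBy("i94yr", "i94mon"). That stores the rows in Hive-style directories such as i94yr=2016/i94mon=4, so a query filtering on those columns only reads the matching directories (partition pruning).

The immigration fact table as defined in process_immigration_data does not select i94yr and i94mon, so these columns would have to be added first. The helpers below are hypothetical and work on in-memory rows. The keys only mimic the directory layout.

```python
from collections import defaultdict

def partition_rows(rows, partition_cols):
    """Group rows under Hive-style partition keys such as 'i94yr=2016/i94mon=4'.
    As with partitionBy, the partition columns move into the key and are dropped from the rows."""
    partitions = defaultdict(list)
    for row in rows:
        key = "/".join(f"{col}={row[col]}" for col in partition_cols)
        partitions[key].append({k: v for k, v in row.items() if k not in partition_cols})
    return dict(partitions)

def prune(partitions, **filters):
    """Keep only the partitions whose key matches every equality filter (partition pruning)."""
    wanted = {f"{col}={value}" for col, value in filters.items()}
    return {key: part for key, part in partitions.items() if wanted <= set(key.split("/"))}

# Invented sample rows; i94yr and i94mon assumed to be cast to integers already.
rows = [
    {"cicid": 1, "i94yr": 2016, "i94mon": 4, "i94port": "NYC"},
    {"cicid": 2, "i94yr": 2016, "i94mon": 4, "i94port": "MIA"},
    {"cicid": 3, "i94yr": 2016, "i94mon": 5, "i94port": "LOS"},
]
partitions = partition_rows(rows, ["i94yr", "i94mon"])
print(sorted(partitions))                 # ['i94yr=2016/i94mon=4', 'i94yr=2016/i94mon=5']
print(list(prune(partitions, i94mon=4)))  # ['i94yr=2016/i94mon=4']

# A filter on a non-partition column (i94port) cannot be pruned: every partition is scanned.
scanned = [r for part in partitions.values() for r in part if r["i94port"] == "MIA"]
```

The last line shows the trade-off described above. A filter on a column outside the partition keys still touches every partition.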

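The sketch below models log-based replication (change data capture) and shows how it avoids loading duplicate events. Every change in the source database's log carries an increasing log sequence number (LSN). The replication job remembers the highest LSN it has applied, so re-reading an overlapping part of the log skips events that were already applied.

This is a simplified, hypothetical model: the event format and function name are assumptions, not how a specific replication tool implements it.

```python
from typing import Dict, List, Tuple

# (lsn, operation, primary key, row values); hypothetical event format
Event = Tuple[int, str, int, dict]

def apply_changes(target: Dict[int, dict], events: List[Event], last_lsn: int) -> int:
    """Apply insert/update/delete events with an LSN above last_lsn to target.
    Returns the new high-water mark, so re-delivered events are ignored on the next run."""
    for lsn, op, key, values in sorted(events, key=lambda e: e[0]):
        if lsn <= last_lsn:
            continue  # already applied: skipping it avoids loading duplicate events
        if op in ("insert", "update"):
            target[key] = values
        elif op == "delete":
            target.pop(key, None)
        last_lsn = lsn
    return last_lsn

warehouse: Dict[int, dict] = {}
batch_1 = [(1, "insert", 10, {"i94port": "NYC"}), (2, "insert", 11, {"i94port": "MIA"})]
batch_2 = [(2, "insert", 11, {"i94port": "MIA"}),  # re-delivered event
           (3, "update", 10, {"i94port": "LOS"}),
           (4, "delete", 11, {})]

lsn = apply_changes(warehouse, batch_1, last_lsn=0)
lsn = apply_changes(warehouse, batch_2, last_lsn=lsn)
print(lsn, warehouse)  # 4 {10: {'i94port': 'LOS'}}
```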

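With 100+ users, the CREATE USER, CREATE GROUP and GRANT statements can be generated rather than written by hand. The sketch below only builds the SQL text for a read-only analyst group. Several details are assumptions:

- the group name, user names and Redshift table names;
- the choice of PASSWORD DISABLE, which means users authenticate with IAM credentials instead of a database password.

The statements would still have to be executed against the cluster, for example through a PostgreSQL-compatible client.

```python
from typing import List

TABLES = ["immigration", "airports", "cities_dem"]  # assumed Redshift table names

def grant_read_access_sql(group: str, users: List[str], schema: str = "public") -> List[str]:
    """Build Redshift statements that create a read-only group and its users."""
    statements = [f"CREATE GROUP {group};"]
    # PASSWORD DISABLE: users sign in with IAM credentials instead of a database password
    statements += [f"CREATE USER {user} PASSWORD DISABLE IN GROUP {group};" for user in users]
    statements.append(f"GRANT USAGE ON SCHEMA {schema} TO GROUP {group};")
    tables = ", ".join(f"{schema}.{t}" for t in TABLES)
    statements.append(f"GRANT SELECT ON TABLE {tables} TO GROUP {group};")
    return statements

def revoke_read_access_sql(group: str, schema: str = "public") -> str:
    """Build the statement that withdraws SELECT on the model tables from a group."""
    tables = ", ".join(f"{schema}.{t}" for t in TABLES)
    return f"REVOKE SELECT ON TABLE {tables} FROM GROUP {group};"

users = [f"analyst_{i:03d}" for i in range(1, 101)]  # 100 hypothetical user names
statements = grant_read_access_sql("analysts", users)
print(len(statements))  # 103
print(statements[1])    # CREATE USER analyst_001 PASSWORD DISABLE IN GROUP analysts;
print(statements[-1])   # GRANT SELECT ON TABLE public.immigration, public.airports, public.cities_dem TO GROUP analysts;
```

Granting privileges to a group rather than to individual users keeps administration simple. Onboarding a person is a single CREATE USER, and one REVOKE withdraws access for the whole group.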