# Data Engineering Capstone Project

## Project Summary
The Data Engineering Capstone Project represents a comprehensive data engineering initiative, involving the collection, processing, and modeling of data from diverse sources to construct a valuable and insightful data model. The primary dataset for this project encompasses information on immigration to the United States, supplemented by datasets covering airport codes and U.S. city demographics.

The project follows these steps:

- Step 1: Scope the Project and Gather Data
- Step 2: Explore and Assess the Data
- Step 3: Define the Data Model
- Step 4: Run ETL to Model the Data
- Step 5: Complete Project Write Up

In [None]:
# Do all imports and installs here
import pandas as pd
import os
import configparser
from datetime import datetime, timedelta
from pathlib import Path

from pyspark.sql import SparkSession
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.functions import udf, col, array_contains, split, monotonically_increasing_id
from pyspark.sql.types import DateType, StringType
from pyspark.sql import SQLContext

config = configparser.ConfigParser()
config.read('aws.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS_CREDENTIALS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS_CREDENTIALS']['AWS_SECRET_ACCESS_KEY']

## Step 1: Scope the Project and Gather Data

### Scope
In this project, we aim to build a data pipeline for analyzing immigration data to the United States, alongside supplementary datasets containing information about airport codes, U.S. city demographics, and temperature data. Our goal is to extract, transform, and load the data into a structured format that can be used for various analytical purposes. We will use Apache Spark for data processing and transformation.

### Describe and Gather Data
Immigration Data: The immigration data originates from the U.S. National Tourism and Trade Office and covers international visitor arrivals in the USA. We will work with data from April 2016.

In [None]:
# Read immigration data using pandas
path_i94 = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df_i94 = pd.read_sas(path_i94, 'sas7bdat', encoding="ISO-8859-1")

In [None]:
# Display a sample of the data
df_i94.head(5)

In [None]:
# Read airport data
df_airport = pd.read_csv("./airport-codes_csv.csv")
df_airport.head(5)


In [None]:
# Read U.S. city demographics data
df_demographics = pd.read_csv("./us-cities-demographics.csv", delimiter=";")
df_demographics.head(5)


In [None]:
# Set up a Spark session
spark = (
    SparkSession.builder
    .config("spark.jars.repositories", "https://repos.spark-packages.org/")
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11")
    .enableHiveSupport()
    .getOrCreate()
)

# Read immigration data into a Spark DataFrame and write to Parquet
df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
df_spark.write.mode("overwrite").parquet("sas_data")
df_spark = spark.read.parquet("sas_data")


## Step 2: Explore and Assess the Data

### Explore the Data
In this step, we explore the datasets and identify data quality issues such as missing values and duplicate records.

In [None]:
# Explore the immigration data
df_i94.describe()


In [None]:
# Explore the airport data
df_airport.describe()


In [None]:
# Explore the demographics data
df_demographics.describe()


### Cleaning Steps
We will perform the following data cleaning tasks to address these data quality issues:

In [None]:
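# Get port locations from SAS text file
# Remove irregular ports and NaN values
# NOTE: irr_ports (the collection of irregular port codes) is not built in this notebook and must exist before this cell runs
df_i94_filtered = df_i94[~df_i94["i94port"].isin(irr_ports)]
# Reassign instead of using inplace=True to avoid modifying a slice of df_i94
df_i94_filtered = df_i94_filtered.drop(columns=["insnum", "entdepu", "occup", "visapost"])
df_i94_filtered = df_i94_filtered.dropna()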


In [None]:
# Remove rows with missing iata_code
df_airport.dropna(subset=['iata_code'], inplace=True)


In [None]:
# Convert SAS numeric dates (days since 1960-01-01) to calendar dates
# NOTE: sas_to_date is a helper that must be defined before this cell runs
immigration_fact['arrival_date'] = immigration_fact.arrival_date.apply(sas_to_date)
immigration_fact['departure_date'] = immigration_fact.departure_date.apply(sas_to_date)
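
The `sas_to_date` helper is called in this notebook but its definition is not shown. A minimal sketch follows, assuming the date columns hold SAS date values (day counts from 1960-01-01) and that missing values arrive as `None` or `NaN`. The name `SAS_EPOCH` is introduced here for illustration only.

```python
import math
from datetime import datetime, timedelta

# SAS date values count days from this epoch
SAS_EPOCH = datetime(1960, 1, 1)


def sas_to_date(days):
    """Convert a SAS day count to a datetime.date; missing values map to None."""
    if days is None or (isinstance(days, float) and math.isnan(days)):
        return None
    return (SAS_EPOCH + timedelta(days=int(days))).date()


print(sas_to_date(20545.0))  # 2016-04-01
```

Returning `None` for missing values keeps the function usable both with pandas `apply` and as a Spark UDF with `DateType()`, where `None` becomes a null date.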


## Step 3: Define the Data Model
### 3.1 Conceptual Data Model
We will use a star schema, which keeps the structure simple and makes analytical queries straightforward. The data model consists of the following tables:

Fact Table:
- Immigration Fact Table: Contains information related to immigration, including arrival and departure dates, mode of transportation, and visa type.

Dimension Tables:
- Demographics Dimension Table: Contains demographic information about U.S. cities.
- Airports Dimension Table: Contains information about airports.

The star schema allows for easy analysis and query optimization.
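
To illustrate the kind of query a star schema supports, the sketch below joins a fact table to one dimension table on `cic_id` and aggregates by visa type. It uses Python's built-in `sqlite3` instead of Spark so that it runs without a cluster. The table layout is a reduced, assumed version of the tables built in Step 4, and the rows are toy values invented purely for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE fact_immigration (cic_id REAL PRIMARY KEY, arrival_date TEXT, visatype TEXT);
CREATE TABLE dim_immigrants (cic_id REAL, gender TEXT, age REAL);
-- toy rows, invented for illustration only
INSERT INTO fact_immigration VALUES
    (1.0, '2016-04-01', 'B2'), (2.0, '2016-04-02', 'WT'), (3.0, '2016-04-02', 'B2');
INSERT INTO dim_immigrants VALUES
    (1.0, 'F', 34.0), (2.0, 'M', 51.0), (3.0, 'M', 27.0);
""")

# One join from the fact table to a dimension, then an aggregate per visa type
query = """
SELECT f.visatype, COUNT(*) AS arrivals, AVG(d.age) AS avg_age
FROM fact_immigration AS f
JOIN dim_immigrants AS d ON d.cic_id = f.cic_id
GROUP BY f.visatype
ORDER BY f.visatype
"""
rows = conn.execute(query).fetchall()
print(rows)  # [('B2', 2, 30.5), ('WT', 1, 51.0)]
```

The same join-then-aggregate pattern should carry over to Spark SQL once the Parquet outputs are registered as temporary views.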

### 3.2 Mapping Out Data Pipelines
To create the data pipelines, follow these steps:

- Enter your AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY into the keys.cfg file.
- Run the etl.py script to trigger a Spark job that processes and transforms the data into a combination of facts and dimension tables.
- Check the output in your S3 data lake to confirm a successful load.
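
The first step above depends on the layout of the credentials file. The import cell in this notebook reads `aws.cfg` and expects an `AWS_CREDENTIALS` section, while the step above names `keys.cfg`, so use whichever file name your `etl.py` actually reads. The sketch below shows the assumed layout with placeholder values and a small, hypothetical helper (`missing_credentials`) that reports keys that are absent or empty.

```python
import configparser

# Placeholder values only; never commit real credentials
SAMPLE_CFG = """
[AWS_CREDENTIALS]
AWS_ACCESS_KEY_ID = YOUR_ACCESS_KEY_ID
AWS_SECRET_ACCESS_KEY = YOUR_SECRET_ACCESS_KEY
"""

config = configparser.ConfigParser()
config.read_string(SAMPLE_CFG)


def missing_credentials(cfg, section="AWS_CREDENTIALS"):
    """Return the required keys that are absent or empty in the given section."""
    required = ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")
    if not cfg.has_section(section):
        return list(required)
    return [key for key in required if not cfg.get(section, key, fallback="").strip()]


print(missing_credentials(config))  # []
```

Running a check like this before starting the Spark job gives a clearer error than a failed S3 write later in the pipeline.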

## Step 4: Run Pipelines to Model the Data
### 4.1 Create the Data Model

In [None]:
def createsparksession():
    spark = SparkSession.builder\
        .config("spark.jars.repositories", "https://repos.spark-packages.org/")\
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0,saurfang:spark-sas7bdat:2.0.0-s_2.11")\
        .enableHiveSupport().getOrCreate()

    return spark

def read_labels(file, first_row, last_row):
    """Parse `code = 'label'` lines between first_row and last_row into a dict."""
    frame = {}
    with open(file) as f:
        file_content = f.readlines()

    for content in file_content[first_row:last_row]:
        content = content.split("=")
        # strip surrounding whitespace (including tabs) and quotes from both sides
        code = content[0].strip().strip("'")
        cont = content[1].strip().strip("'")
        frame[code] = cont
    return frame
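
As a small illustration of the parsing step, the sketch below applies the same split-and-strip idea to two sample lines. The lines are written in the `code = 'label'` layout that `read_labels` expects and are assumptions, not read from the real file. `parse_label_line` is a hypothetical helper. Unlike `read_labels`, it also trims padding inside the quotes.

```python
def parse_label_line(line):
    """Split one `code = 'label'` line into (code, label), dropping quotes and padding."""
    code, _, label = line.partition("=")
    return code.strip().strip("'"), label.strip().strip("'").strip()


# Sample lines in the assumed layout: one numeric code, one quoted tab-separated code
sample_lines = [
    "   582 =  'MEXICO'\n",
    "\t'ALC'\t=\t'ALCAN, AK             '\n",
]
labels = dict(parse_label_line(line) for line in sample_lines)
print(labels)  # {'582': 'MEXICO', 'ALC': 'ALCAN, AK'}
```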



In [None]:
def process_immigration_data(spark, input_data, output_data):
    """
    Loads immigration data and processes it into a fact table
    input_data: source of data
    output_data: destination storage for processed data
    """
    # path to the immigration data written to Parquet earlier
    immigration_data = os.path.join(input_data, "sas_data")

    # load data (all columns are kept and renamed below)
    df_immigration = spark.read.parquet(immigration_data)

    # rename columns to make them intelligible
    rename_cols = [
        "cic_id",
        "year",
        "month",
        "cit",
        "res",
        "port",
        "arrival_date",
        "mode",
        "address",
        "departure_date",
        "age",
        "visa",
        "count",
        "date_logged",
        "dept_visa_issuance",
        "occupation",
        "arrival_flag",
        "departure_flag",
        "update_flag",
        "match_flag",
        "birth_year",
        "max_stay_date",
        "gender",
        "INS_number",
        "airline",
        "admission_number",
        "flight_number",
        "visatype",
    ]

    # rename columns by position (loop names avoid shadowing the imported `col` function)
    for old_name, new_name in zip(df_immigration.columns, rename_cols):
        df_immigration = df_immigration.withColumnRenamed(old_name, new_name)

    immigration_fact = df_immigration[
        [
            "cic_id",
            "year",
            "month",
            "arrival_date",
            "departure_date",
            "mode",
            "visatype",
        ]
    ].dropDuplicates(['cic_id'])

    # dimensions from immigration data
    # deduplicate first, then add the surrogate id; adding the id first makes every row unique
    dim_flight_details = df_immigration.select('cic_id', 'flight_number', 'airline').dropDuplicates()\
        .select([monotonically_increasing_id().alias('id'), '*'])
    dim_immigrants = df_immigration.select('cic_id', 'cit', 'res', 'visa', 'age', 'occupation', 'gender', 'address', 'INS_number').dropDuplicates()\
        .select([monotonically_increasing_id().alias('id'), '*'])

    # convert SAS numeric dates to calendar dates (sas_to_date must be defined before this function runs)
    udf_func = udf(sas_to_date, DateType())
    immigration_fact = immigration_fact.withColumn("arrival_date", udf_func("arrival_date"))
    immigration_fact = immigration_fact.withColumn("departure_date", udf_func("departure_date"))
    # parse country and port/city labels from the SAS descriptions file
    file = os.path.join(input_data, "I94_SAS_Labels_Descriptions.SAS")
    countries = read_labels(file, first_row=10, last_row=298)
    cities = read_labels(file, first_row=303, last_row=962)
    countries_df = spark.createDataFrame(countries.items(), ['code', 'country']).dropDuplicates()
    cities_df = spark.createDataFrame(cities.items(), ['code', 'city']).dropDuplicates()
    cities_df = cities_df.withColumn('state', split(cities_df.city, ',').getItem(1))\
            .withColumn('city', split(cities_df.city, ',').getItem(0))
    cities_df = cities_df.select([monotonically_increasing_id().alias('id'), '*'])
    countries_df = countries_df.select([monotonically_increasing_id().alias('id'), '*'])

    # write parquet
    countries_df.write.parquet(output_data + "countries_df.parquet")
    cities_df.write.parquet(output_data + "cities_df.parquet")
    immigration_fact.write.mode('overwrite').parquet(output_data + "fact_immigration.parquet")
    dim_immigrants.write.parquet(output_data + "dim_immigrants.parquet")
    dim_flight_details.write.parquet(output_data + "dim_flight_details.parquet")


In [None]:
def process_demographics_data(spark, input_data, output_data):
    """
    loads and processes city data
    input_data: path to data source
    output_data: path to data store
    """

    # fetch data path
    city_data = os.path.join(input_data, "us-cities-demographics.csv")

    # load city data
    city_df = spark.read.load(city_data, sep=";", format="csv", header=True)

    # demography: city size and structure (gender and ethnicity composition)
    city_demography = city_df.select(
        [
            "City",
            "State",
            "State Code",
            "Male Population",
            "Female Population",
            "Foreign-born",
            "Number of Veterans",
            "Race",
        ]
    ).dropDuplicates()
    city_demography.write.parquet(output_data + "city_demography.parquet")

    # statistics on city such as total population, and median age
    city_stats = city_df.select(
        ["City", "State Code", "Median Age", "Average Household Size", "Count"]
    ).dropDuplicates()
    city_stats.write.mode('overwrite').parquet(output_data + "city_stats.parquet")


In [None]:
def process_airport_data(spark, input_data, output_data):
    """
    loads and processes airport data
    input_data: path to data source
    output_data: path to data store
    """

    # fetch data source
    airport_data = os.path.join(input_data, "airport-codes_csv.csv")
    
    # load data
    airport_df = spark.read.csv(airport_data, header=True)
    airport_df = airport_df.filter(
        (airport_df.iso_country == "US") & ~(airport_df.type == "closed")
    ).dropDuplicates()
    
    # data quality check (unique identity column); check_unique_id_column must be defined before this function runs
    check_unique_id_column(airport_df, 'ident')
    
    airport_dim = airport_df.select(["ident", "type", "name", "continent", "gps_code", "iata_code", "local_code", "iso_country"]).dropDuplicates()
    airport_stats = airport_df.select(["ident", "elevation_ft", "coordinates"]).dropDuplicates()

    airport_dim.write.parquet(output_data + "airports_dim.parquet")
    airport_stats.write.parquet(output_data + "airports_stats.parquet")

### 4.2 Data Quality Checks
The data quality checks confirm that the pipeline ran as expected. They can include:

- Integrity constraints on the relational database (e.g., unique key, data type, etc.)
- Unit tests for the scripts to ensure they are doing the right thing
- Source/Count checks to ensure completeness
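
The uniqueness constraint in the first bullet corresponds to `check_unique_id_column`, which `process_airport_data` calls but this notebook does not define. One possible sketch follows, assuming the argument is a Spark DataFrame (only `count`, `select`, and `distinct` are used). The error message and the return value are illustrative choices, not the original implementation.

```python
def check_unique_id_column(df, column):
    """Raise ValueError if `column` holds repeated values; return the row count otherwise."""
    total = df.count()
    distinct = df.select(column).distinct().count()
    if total != distinct:
        raise ValueError(
            f"Column '{column}' is not unique: {total} rows, {distinct} distinct values"
        )
    return total
```

Because the check triggers two Spark jobs, it may be worth caching the DataFrame first when the input is large.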

### Run Quality Checks
In [None]:
# Expected (column, dtype) pairs for each output table, as returned by DataFrame.dtypes
cities_dtype = [('id', 'bigint'), ('code', 'string'), ('city', 'string'), ('state', 'string')]
countries_dtype = [('id', 'bigint'), ('code', 'string'), ('country', 'string')]
city_demography_dtype = [('City', 'string'), ('State', 'string'), ('State Code', 'string'), ('Male Population', 'string'), ('Female Population', 'string'), ('Foreign-born', 'string'), ('Number of Veterans', 'string'), ('Race', 'string')]
city_stats_dtype = [('City', 'string'), ('State Code', 'string'), ('Median Age', 'string'), ('Average Household Size', 'string'), ('Count', 'string')]
airports_stats_dtype = [('ident', 'string'), ('elevation_ft', 'string'), ('coordinates', 'string')]
airports_dim_dtype = [('ident', 'string'), ('type', 'string'), ('name', 'string'), ('continent', 'string'), ('gps_code', 'string'), ('iata_code', 'string'), ('local_code', 'string'), ('iso_country', 'string')]
fact_immigration_dtype = [('cic_id', 'double'), ('year', 'double'), ('month', 'double'), ('arrival_date', 'date'), ('departure_date', 'date'), ('mode', 'double'), ('visatype', 'string')]
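
A plain equality test on two dtype lists only says that something differs. The sketch below shows one way to report which columns differ, assuming both inputs are lists of `(column, dtype)` pairs as listed above. `schema_mismatches` is a hypothetical helper and the `actual` value is an invented example.

```python
def schema_mismatches(actual, expected):
    """Return readable differences between two lists of (column, dtype) pairs."""
    problems = []
    actual_map, expected_map = dict(actual), dict(expected)
    for name, dtype in expected_map.items():
        if name not in actual_map:
            problems.append(f"missing column: {name}")
        elif actual_map[name] != dtype:
            problems.append(f"{name}: expected {dtype}, got {actual_map[name]}")
    for name in actual_map:
        if name not in expected_map:
            problems.append(f"unexpected column: {name}")
    return problems


countries_dtype = [('id', 'bigint'), ('code', 'string'), ('country', 'string')]
# Invented example: a table whose id column was written as a string
actual = [('id', 'string'), ('code', 'string'), ('country', 'string')]
print(schema_mismatches(actual, countries_dtype))  # ['id: expected bigint, got string']
```

Note that this comparison ignores column order, which the plain list equality does not.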

In [None]:
def check_schema(spark, output_path):
    """Raise ValueError if a known output table deviates from its expected schema."""
    # map each Parquet directory name (as written in Step 4.1) to its expected dtypes and error message
    expected = {
        'fact_immigration.parquet': (fact_immigration_dtype, "Fact Immigration Table schema not correct"),
        'airports_dim.parquet': (airports_dim_dtype, "Airports Dim Table schema not correct"),
        'airports_stats.parquet': (airports_stats_dtype, "Airports Stats Table schema not correct"),
        'city_stats.parquet': (city_stats_dtype, "City Stats Table schema not correct"),
        'city_demography.parquet': (city_demography_dtype, "City Demography Table schema not correct"),
        'countries_df.parquet': (countries_dtype, "Countries Table schema not correct"),
        'cities_df.parquet': (cities_dtype, "Cities Table schema not correct"),
    }
    # compare on file.name; comparing a Path object to a string is always False
    for file in Path(output_path).iterdir():
        if file.is_dir() and file.name in expected:
            dtypes, message = expected[file.name]
            if spark.read.parquet(str(file)).dtypes != dtypes:
                raise ValueError(message)
    print("Table Schemas are correct")



In [None]:
def check_empty_tables(spark, output_path):
    output_path = Path(output_path)
    for file in output_path.iterdir():
        if file.is_dir():
            # count() returns an int, so report the table by its directory name
            row_count = spark.read.parquet(str(file)).count()
            if row_count == 0:
                print("Empty Table: ", file.name)
            else:
                print("Table: ", file.name, "is not empty")

### 4.3 Data dictionary
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

## Step 5: Complete Project Write Up
- Clearly state the rationale for the choice of tools and technologies for the project.
- Propose how often the data should be updated and why.
- Write a description of how you would approach the problem differently under the following scenarios:
  - The data was increased by 100x.
  - The data populates a dashboard that must be updated on a daily basis by 7am every day.
  - The database needed to be accessed by 100+ people.