# Project Title
### Data Engineering Capstone Project

#### Project Summary
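This project builds a data pipeline for the I94 immigration data (April 2016 sample) and the US cities demographics dataset. Spark cleans both datasets and stages them as Parquet files in Amazon S3. Airflow then copies the staged data into Amazon Redshift, where it is stored as fact and dimension tables and validated with data quality checks.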

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

```python
# Do all imports and installs here
import pandas as pd
```

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use?

#### Describe and Gather Data 
Describe the data sets you're using. Where did they come from? What type of information do they include?


```python
from pyspark.sql import SparkSession

# Spark session with the saurfang spark-sas7bdat package, which reads SAS7BDAT files
spark = (
    SparkSession.builder
    .config("spark.jars.repositories", "https://repos.spark-packages.org/")
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11")
    .enableHiveSupport()
    .getOrCreate()
)

# Load the April 2016 I94 immigration data
df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
```


```python
# Write to Parquet and read it back
# df_spark.write.parquet("sas_data")
# df_spark = spark.read.parquet("sas_data")
```

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.
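
A quick way to surface missing values and duplicate rows is to profile a sample of the raw file. The sketch below uses only the standard library on a small, made-up extract in the semicolon-delimited layout of `us-cities-demographics.csv`; the column names and values are illustrative assumptions, and `profile` is a hypothetical helper. In Spark, the same figures could come from `df.filter(col(c).isNull()).count()` per column and `df.count() - df.dropDuplicates().count()`.

```python
import csv
import io
from collections import Counter

# Made-up sample in the semicolon-delimited layout of us-cities-demographics.csv
SAMPLE = """City;State;Median Age;Total Population
Silver Spring;Maryland;33.8;82463
Quincy;Massachusetts;41.0;
Silver Spring;Maryland;33.8;82463
Hoover;;38.5;84839
"""


def profile(text, sep=";"):
    """Return the row count, missing values per column and the number of duplicate rows."""
    reader = csv.DictReader(io.StringIO(text), delimiter=sep)
    rows = list(reader)
    # Empty strings (or absent trailing fields) count as missing values
    missing = {column: sum(1 for row in rows if not row[column]) for column in reader.fieldnames}
    # Every repeat of an identical row beyond its first occurrence is a duplicate
    counts = Counter(tuple(row.values()) for row in rows)
    duplicates = sum(n - 1 for n in counts.values())
    return len(rows), missing, duplicates


row_count, missing, duplicates = profile(SAMPLE)
print(row_count, missing, duplicates)
```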

#### Cleaning Steps
Document the steps necessary to clean the data.
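
The cleaning rules can be kept as small, testable functions. A minimal sketch, assuming the rules are "drop exact duplicate rows" and "drop rows missing a required column"; `clean_rows` and the required column names are hypothetical. In Spark, the equivalent would be `df.dropDuplicates().dropna(subset=required)`.

```python
def clean_rows(rows, required):
    """Drop exact duplicate rows and rows whose required columns are None or empty.

    rows: list of dicts, one per record; required: column names that must hold a value.
    The first occurrence of each duplicate is kept and the input order is preserved.
    """
    seen = set()
    cleaned = []
    for row in rows:
        key = tuple(sorted(row.items()))
        if key in seen:
            continue  # exact duplicate of a row already seen
        seen.add(key)
        if any(row.get(column) in (None, "") for column in required):
            continue  # a required value is missing
        cleaned.append(row)
    return cleaned


rows = [
    {"City": "Silver Spring", "State": "Maryland"},
    {"City": "Silver Spring", "State": "Maryland"},
    {"City": "Hoover", "State": ""},
]
print(clean_rows(rows, required=["City", "State"]))
```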

```python
# Perform cleaning tasks here.
# Cleaning is done with Spark because of its fast, distributed processing,
# which matters for large datasets such as the I94 immigration data.
# Input files:
#   - cities (us-cities-demographics.csv)
#   - immigrations (immigration_data/*.parquet)
import configparser

config = configparser.ConfigParser()
config.read('config.cfg')


def model_data(spark_session):
    """
        @description:
            This function cleans the immigration and cities datasets using Spark.
            The cleaned data is then written as Parquet to S3 (bucket specified in config.cfg).
        @params:
            spark_session (SparkSession): an active Spark session, e.g. the `spark` session created above.
    """
    # Configure the S3A connector with the credentials from config.cfg
    hadoop_conf = spark_session.sparkContext._jsc.hadoopConfiguration()
    hadoop_conf.set("fs.s3a.access.key", config['AWS']['KEY'])
    hadoop_conf.set("fs.s3a.secret.key", config['AWS']['SECRET'])
    hadoop_conf.set("fs.s3a.endpoint", "s3.amazonaws.com")
    hadoop_conf.set("parquet.enable.summary-metadata", "false")

    # Parquet files carry their own schema, so no header/inferSchema options are needed
    IMMIGRATION_DATA = 'datasets/immigration_data/*.parquet'
    immigration_df = spark_session.read.parquet(IMMIGRATION_DATA)

    # The cities file is semicolon-delimited with a header row
    CITIES_DATA = 'datasets/us-cities-demographics.csv'
    cities_df = spark_session.read.csv(CITIES_DATA, sep=";", inferSchema=True, header=True)

    # clean_immigration_data and clean_cities_data are project helpers not shown in this notebook
    cleaned_immigration_df = clean_immigration_data(immigration_df)
    cleaned_cities_df = clean_cities_data(cities_df)

    bucket = config['S3']['BUCKET']

    print('========================================= WRITING staging_immigrations_table TABLE TO S3 =========================================')
    cleaned_immigration_df.repartition(1).write.mode('overwrite').parquet(f"s3a://{bucket}/{config['S3']['IMMIGRATION_KEY']}")

    print('========================================= WRITING staging_cities_table TABLE TO S3 =========================================')
    cleaned_cities_df.repartition(1).write.mode('overwrite').parquet(f"s3a://{bucket}/{config['S3']['CITIES_KEY']}")

    return 'Done'


tables = ['staging_immigrations_table', 'staging_cities_table']
```
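
`model_data` reads its credentials and S3 locations from `config.cfg` through `config['AWS']` and `config['S3']`. The sketch below shows a layout using the section and key names that appear in the code, and how the `s3a://` output paths are built from it. All values are placeholders, and `s3a_path` is a hypothetical helper.

```python
import configparser

# Placeholder values; only the section and key names come from model_data
CONFIG_TEXT = """
[AWS]
KEY = <your-access-key-id>
SECRET = <your-secret-access-key>

[S3]
BUCKET = my-capstone-bucket
IMMIGRATION_KEY = staging/immigrations
CITIES_KEY = staging/cities
"""

config = configparser.ConfigParser()
config.read_string(CONFIG_TEXT)


def s3a_path(cfg, key_name):
    """Build the s3a:// URI that Spark writes a staging table to."""
    return f"s3a://{cfg['S3']['BUCKET']}/{cfg['S3'][key_name]}"


print(s3a_path(config, "IMMIGRATION_KEY"))
print(s3a_path(config, "CITIES_KEY"))
```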


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model
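
A common way to lay out fact and dimension tables in Redshift is a star schema, for example an immigration fact table keyed to a state-level dimension built from the demographics data. The sketch below is hypothetical: the table names, columns and toy rows are assumptions, not the project's final model, and it uses the standard-library `sqlite3` module only so the DDL and a typical join can be run locally.

```python
import sqlite3

# Hypothetical star schema: one fact table referencing one dimension table
DDL = """
CREATE TABLE dim_state (
    state_code TEXT PRIMARY KEY,
    state_name TEXT NOT NULL
);
CREATE TABLE fact_immigration (
    record_id     INTEGER PRIMARY KEY,
    arrival_year  INTEGER NOT NULL,
    arrival_month INTEGER NOT NULL,
    state_code    TEXT REFERENCES dim_state (state_code)
);
"""

conn = sqlite3.connect(":memory:")
conn.executescript(DDL)
conn.executemany("INSERT INTO dim_state VALUES (?, ?)",
                 [("MD", "Maryland"), ("AL", "Alabama")])
conn.executemany("INSERT INTO fact_immigration VALUES (?, ?, ?, ?)",
                 [(1, 2016, 4, "MD"), (2, 2016, 4, "MD"), (3, 2016, 4, "AL")])

# Typical analytical query: arrivals per state, joined to the dimension
rows = conn.execute("""
    SELECT d.state_name, COUNT(*) AS arrivals
    FROM fact_immigration f
    JOIN dim_state d ON f.state_code = d.state_code
    GROUP BY d.state_name
    ORDER BY arrivals DESC
""").fetchall()
print(rows)
```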

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model
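
The pipeline steps (clean and stage to S3 with Spark, COPY into Redshift, load the fact and dimension tables, then run quality checks) form a dependency graph, which is what an Airflow DAG encodes. As a rough sketch, the ordering can be checked with the standard-library `graphlib` module (Python 3.9+). The task names are hypothetical and this is not Airflow code; in a DAG the same graph would look something like `stage >> [copy_immigrations, copy_cities] >> load >> quality_checks`.

```python
from graphlib import TopologicalSorter

# Each task maps to the set of tasks that must finish before it (hypothetical names)
PIPELINE = {
    "clean_and_stage_to_s3": set(),
    "copy_immigrations_to_redshift": {"clean_and_stage_to_s3"},
    "copy_cities_to_redshift": {"clean_and_stage_to_s3"},
    "load_fact_and_dimension_tables": {"copy_immigrations_to_redshift", "copy_cities_to_redshift"},
    "run_quality_checks": {"load_fact_and_dimension_tables"},
}

# A valid execution order that respects every dependency
order = list(TopologicalSorter(PIPELINE).static_order())
print(order)
```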

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

Both datasets are first compiled and staged to an S3 bucket in Parquet format. This gives a data lake representation of both datasets and makes them easy to load into Redshift. Two main tools are used at this stage: Spark, for loading and cleaning the data, and Amazon S3, for storage. Amazon Redshift is then used as the warehouse that stores the datasets as separate fact and dimension tables. The S3-to-Redshift workflow is managed by Airflow, a DAG-based workflow management system, which ensures that each of these processes runs in the right order and at the scheduled time, making the ETL process as seamless as possible.

```python
# Custom Airflow operator that stages data from S3 into a Redshift table with COPY.
# Import paths follow Airflow 1.10 (assumed from the AwsHook usage).
from airflow.contrib.hooks.aws_hook import AwsHook
from airflow.hooks.postgres_hook import PostgresHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class StageTablesOperator(BaseOperator):

    @apply_defaults
    def __init__(self,
                 redshift_conn_id="redshift_conn_id",
                 aws_connection_id="aws_conn_id",
                 table="",
                 s3_bucket="",
                 s3_key="",
                 ignore_headers=1,
                 delimeter=",",
                 data_format="csv",
                 *args, **kwargs):
        super(StageTablesOperator, self).__init__(*args, **kwargs)

        self.redshift_conn_id = redshift_conn_id
        self.aws_connection_id = aws_connection_id
        self.table = table
        self.s3_bucket = s3_bucket
        self.s3_key = s3_key
        self.ignore_headers = ignore_headers
        self.delimeter = delimeter
        self.data_format = data_format

    def execute(self, context):
        self.log.info("Fetching credentials")
        aws_hook = AwsHook(self.aws_connection_id)
        aws_credentials = aws_hook.get_credentials()

        redshift_conn = PostgresHook(postgres_conn_id=self.redshift_conn_id)
        # s3_key may contain placeholders such as {ds}, filled from the task context
        rendered_key = self.s3_key.format(**context)
        s3_bucket_uri = f"s3://{self.s3_bucket}/{rendered_key}"

        formatted_sql = f"""
            COPY {self.table}
            FROM '{s3_bucket_uri}/'
            ACCESS_KEY_ID '{aws_credentials.access_key}'
            SECRET_ACCESS_KEY '{aws_credentials.secret_key}'
            FORMAT AS {self.data_format}
        """
        # DELIMITER and IGNOREHEADER apply to delimited text input, not to Parquet
        if self.data_format.lower() == "csv":
            formatted_sql += f" DELIMITER '{self.delimeter}' IGNOREHEADER {self.ignore_headers}"

        self.log.info(f"Copying {self.table} data from S3 to Redshift")
        redshift_conn.run(formatted_sql)
        return 'Done'
```
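
Keeping the SQL assembly in a pure function makes the generated COPY statement testable without a Redshift cluster. A minimal sketch: `build_copy_sql` is a hypothetical helper, and the keys and bucket are placeholders. DELIMITER and IGNOREHEADER describe delimited text input, so they are only appended for CSV, while Parquet files carry their own schema.

```python
def build_copy_sql(table, s3_uri, access_key, secret_key,
                   data_format="csv", delimiter=",", ignore_headers=1):
    """Assemble a Redshift COPY statement for data staged in S3 (hypothetical helper)."""
    sql = (
        f"COPY {table}\n"
        f"FROM '{s3_uri}'\n"
        f"ACCESS_KEY_ID '{access_key}'\n"
        f"SECRET_ACCESS_KEY '{secret_key}'\n"
        f"FORMAT AS {data_format.upper()}"
    )
    # Text-only options: only meaningful for delimited CSV input
    if data_format.lower() == "csv":
        sql += f"\nDELIMITER '{delimiter}'\nIGNOREHEADER {ignore_headers}"
    return sql


print(build_copy_sql("staging_cities_table", "s3://my-capstone-bucket/staging/cities/",
                     "<key>", "<secret>", data_format="parquet"))
```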

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
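
One way to unit test the check logic without a live Redshift connection is to pass in the function that fetches records, so a test can substitute a fake. A minimal sketch: `count_check` and `fake_records` are hypothetical names, and in the DAG the fetcher would be a hook's `get_records`. The expected count could also be the number of rows read from the source, which turns this into a source/count completeness check.

```python
import unittest


def count_check(get_records, table, expected_row_count):
    """Raise ValueError if `table` holds fewer rows than expected; return the count otherwise."""
    records = get_records(f"SELECT COUNT(*) FROM {table}")
    if not records or records[0][0] is None:
        raise ValueError(f"Count query on {table} returned no result")
    row_count = records[0][0]
    if row_count < expected_row_count:
        raise ValueError(f"Expected at least {expected_row_count} rows in {table}, found {row_count}")
    return row_count


def fake_records(row_count):
    """Return a stand-in for get_records that always reports `row_count` rows."""
    return lambda sql: [(row_count,)]


class CountCheckTest(unittest.TestCase):
    def test_passes_when_enough_rows(self):
        self.assertEqual(count_check(fake_records(10), "staging_cities_table", 5), 10)

    def test_fails_when_rows_missing(self):
        with self.assertRaises(ValueError):
            count_check(fake_records(0), "staging_cities_table", 1)


# Run the tests explicitly so this also works inside a notebook cell
suite = unittest.TestLoader().loadTestsFromTestCase(CountCheckTest)
result = unittest.TextTestRunner().run(suite)
```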
 
Run Quality Checks

```python
# Perform quality checks here:
#   - Null value checks
#   - Data count checks
#   - Data type checks
# These functions are assumed to run from an Airflow PythonOperator that passes the
# task context, so each task's `params` arrive in kwargs["params"].
import logging

from airflow.hooks.postgres_hook import PostgresHook


def data_count_check(*args, **kwargs):
    """
        description: This function runs a data count quality check on the specified table in Redshift.
            It raises an error if the table holds fewer rows than expected.
        params:
            table (STR): The Redshift table to test.
            expected_row_count (INT): The minimum number of rows expected in the specified table.
        raises:
            ValueError: if the table has fewer rows than expected.
    """
    table = kwargs["params"]["table"]
    expected_row_count = kwargs["params"]["expected_row_count"]
    redshift_hook = PostgresHook(postgres_conn_id="redshift_conn_id")
    records = redshift_hook.get_records(f"""
        SELECT COUNT(*)
        FROM {table}
    """)

    row_count = records[0][0]
    if row_count < expected_row_count:
        raise ValueError(f"Expected at least {expected_row_count} rows in {table}, found {row_count}")

    logging.info("Data count check passed for %s with %s records", table, row_count)


def null_value_check(*args, **kwargs):
    """
        description:
            This function runs a null value quality check on the specified table and column in Redshift.
            It checks whether the specified column contains any null values.
        params:
            table (STR): The Redshift table to test.
            column_name (STR): The name of the column to check.
        raises:
            ValueError: if at least one null value is found in the column.
    """
    table = kwargs["params"]["table"]
    column_name = kwargs["params"]["column_name"]
    redshift_hook = PostgresHook(postgres_conn_id="redshift_conn_id")
    records = redshift_hook.get_records(f"""
        SELECT COUNT(*)
        FROM {table}
        WHERE {column_name} IS NULL
    """)

    null_count = records[0][0]
    if null_count > 0:
        raise ValueError(f"Expected 0 null values in {table}.{column_name}, found {null_count}")

    logging.info("Null value check passed: no null values in %s.%s", table, column_name)


def column_type_check(*args, **kwargs):
    """
        description:
            This function runs a column type quality check on the specified table in Redshift.
            It ensures the column has the expected data type.
        params:
            table (STR): The Redshift table to test.
            column_name (STR): The name of the column to check.
            data_type (STR): Expected data type as reported by pg_table_def,
                e.g. "date", "integer", "character varying(256)".
        raises:
            TypeError: if the column's data type does not match.
            ValueError: if the column is not found in the table.
    """
    table = kwargs["params"]["table"]
    column_name = kwargs["params"]["column_name"]
    data_type = kwargs["params"]["data_type"]
    redshift_hook = PostgresHook(postgres_conn_id="redshift_conn_id")
    # pg_table_def only lists tables in schemas on the current search_path
    records = redshift_hook.get_records(f"""
        SELECT "column", type
        FROM pg_table_def
        WHERE tablename = '{table}'
    """)

    # Each record is a (column, type) tuple
    for column, actual_type in records:
        if column == column_name:
            if actual_type.lower() != data_type.lower():
                raise TypeError(f"Invalid data type for {table}.{column_name}. Expected {data_type}, found {actual_type}")
            logging.info("Column type check passed for %s.%s (%s)", table, column_name, actual_type)
            return

    raise ValueError(f"Column {column_name} not found in {table}")
```
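
The SQL behind these checks can be tried out locally before pointing it at Redshift. The sketch below uses the standard-library `sqlite3` module as a stand-in, with a made-up table and rows; `table_stats` is a hypothetical helper. Because SQLite has no `pg_table_def`, the type lookup reads `PRAGMA table_info` instead, so the reported type names differ from Redshift's.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE staging_cities_table (city TEXT, state TEXT, median_age REAL)")
conn.executemany(
    "INSERT INTO staging_cities_table VALUES (?, ?, ?)",
    [("Silver Spring", "Maryland", 33.8), ("Hoover", None, 38.5)],
)


def table_stats(conn, table, column_name):
    """Return (row count, nulls in column, declared column type) using the same SQL as the checks."""
    row_count = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
    null_count = conn.execute(
        f"SELECT COUNT(*) FROM {table} WHERE {column_name} IS NULL").fetchone()[0]
    # PRAGMA table_info rows are (cid, name, type, notnull, dflt_value, pk)
    declared = {row[1]: row[2] for row in conn.execute(f"PRAGMA table_info({table})")}
    return row_count, null_count, declared.get(column_name)


print(table_stats(conn, "staging_cities_table", "state"))
```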

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.
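
Keeping the data dictionary as data makes it easy to render into the notebook or write to a separate file. A minimal sketch: the entries below are illustrative examples for a few columns of `us-cities-demographics.csv`, not the complete dictionary, and `to_markdown` is a hypothetical helper.

```python
# (field, description, source): illustrative entries only
DATA_DICTIONARY = [
    ("City", "Name of the city", "us-cities-demographics.csv"),
    ("State", "Full name of the US state", "us-cities-demographics.csv"),
    ("Median Age", "Median age of the city's population", "us-cities-demographics.csv"),
]


def to_markdown(entries):
    """Render (field, description, source) tuples as a Markdown table."""
    lines = ["| Field | Description | Source |", "| --- | --- | --- |"]
    lines += [f"| {field} | {description} | {source} |" for field, description, source in entries]
    return "\n".join(lines)


print(to_markdown(DATA_DICTIONARY))
```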

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
  * The data was increased by 100x.
  * The data populates a dashboard that must be updated on a daily basis by 7am every day.
  * The database needed to be accessed by 100+ people.