# Introduction
This notebook connects to Amazon Redshift (provisioned as infrastructure as code, IaC) using Spark and the JDBC driver, and then puts everything together by building an ETL pipeline orchestrated with Apache Airflow.

In [12]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import configparser
import logging
from collections import defaultdict
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.dummy import DummyOperator

In [3]:
# get config object from config file
CONFIG_PATH = "../config.cfg"
config = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the list of
# files it actually parsed; fail fast here instead of hitting a KeyError in later cells.
config_files_read = config.read(CONFIG_PATH)
if not config_files_read:
    raise FileNotFoundError(f"Could not read config file: {CONFIG_PATH}")
config_files_read

['../config.cfg']

In [4]:
# retrieve AWS access key and secret key from the config file (not hard-coded)
aws_access_key_id = config['AWS']['aws_access_key_id']
aws_secret_access_key = config['AWS']['aws_secret_access_key']

# Redshift IAM role and region, plus the target S3 bucket
role_arn = config['Redshift']['role_arn']
region_name = config['Redshift']['region']
bucket_name = config['S3']['bucket_name']

# Redshift connection parameters (configparser returns every value as a string)
user_name = config['Redshift']['user_name']
password = config['Redshift']['password']
host = config['Redshift']['host']
port = config['Redshift']['port']
schema = config['Redshift']['schema']
database = config['Redshift']['database']

## Put it all together - Build an ETL pipeline

In [15]:
# start an Airflow session
!airflow standalone

[37mstandalone[0m | Starting Airflow Standalone
[37mstandalone[0m | Checking database is initialized
INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
WARNI [airflow.models.crypto] empty cryptography key - values will not be stored encrypted.
ERROR [root] Failed to execute 'udac_example_dag' DAG
WARNI [unusual_prefix_daf6bd5d8d86e554f15a2b38f848b0c451715405_example_kubernetes_executor_config] Could not import DAGs in example_kubernetes_executor_config.py: No module named 'kubernetes'
WARNI [unusual_prefix_daf6bd5d8d86e554f15a2b38f848b0c451715405_example_kubernetes_executor_config] Install kubernetes dependencies with: pip install apache-airflow['cncf.kubernetes']
[37mstandalone[0m | Database ready

[33mIn previous versions, all subclasses of BaseOperator must use apply_default decorator for the `default_args` feature to work properly.[0m

[33mIn current version, it is optional. The decorator is appli

[32m webserver[0m | 127.0.0.1 - - [26/Nov/2021:17:38:03 -0600] "GET /home HTTP/1.1" 200 27586 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.55 Safari/537.36"
[32m webserver[0m | 127.0.0.1 - - [26/Nov/2021:17:38:03 -0600] "GET /static/pin_32.png HTTP/1.1" 304 0 "http://localhost:8080/home" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.55 Safari/537.36"
[36m triggerer[0m | [[34m2021-11-26 17:38:48,079[0m] {[34mtriggerer_job.py:[0m251} INFO[0m - 0 triggers currently running[0m
[36m triggerer[0m | [[34m2021-11-26 17:39:48,181[0m] {[34mtriggerer_job.py:[0m251} INFO[0m - 0 triggers currently running[0m
[36m triggerer[0m | [[34m2021-11-26 17:40:48,342[0m] {[34mtriggerer_job.py:[0m251} INFO[0m - 0 triggers currently running[0m
[36m triggerer[0m | [[34m2021-11-26 17:41:48,471[0m] {[34mtriggerer_job.py:[0m251} INFO[0m - 0 triggers currently 

In [15]:
# add default arguments
def log_task_failure(context):
    """Log an error naming the task and DAG whose task instance failed.

    Registered (uncalled) as ``on_failure_callback`` in ``default_args``; Airflow
    invokes it with the failed task's execution context dict.
    """
    task_instance = context.get("task_instance")
    logging.error(
        "Failed to execute task '%s' of DAG '%s'",
        getattr(task_instance, "task_id", "<unknown>"),
        getattr(task_instance, "dag_id", "nyse-stock"),
    )


default_args = {
    'owner': 'udacity',
    'start_date': datetime(2019, 1, 12),
    'depends_on_past': False,
    'email_on_retry': False,
    # NOTE(review): retry_delay only takes effect when 'retries' > 0, and no
    # 'retries' key is set here — confirm whether retries were intended.
    'retry_delay': timedelta(minutes=5),
    # pass the function object itself; calling logging.error(...) here would log once
    # at parse time and register None as the callback
    'on_failure_callback': log_task_failure,
}

# create the main DAG
# catchup is a DAG-level argument: inside default_args it is ignored, and the DAG would
# fall back to the scheduler's catchup_by_default setting (True unless reconfigured),
# backfilling every hourly run since start_date.
dag = DAG('nyse-stock',
          default_args=default_args,
          description='an end-to-end ETL Pipeline from fetching API to S3 to Redshift',
          schedule_interval='0 * * * *',
          catchup=False,
          )

# create a dummy operator marking the start of the pipeline
start_operator = DummyOperator(task_id='Begin_execution',  dag=dag)

# NOTE(review): fetch_stock_data, process_indicators_table, process_metadata_table,
# upload_indicators_to_S3, upload_metadata_to_S3, migrate_S3_to_redshift and
# data_quality_check are not defined in any cell visible here; they must be defined
# or imported before this cell runs, otherwise it raises NameError.

# fetch stock data from RapidAPI
fetch_stock_data_task = PythonOperator(
    task_id = "fetch_stock_data_task",
    dag = dag,
    python_callable = fetch_stock_data
)

# process indicators table
process_indicators_table_task = PythonOperator(
    task_id = "process_indicators_table_task",
    dag = dag,
    python_callable = process_indicators_table
)

# process metadata table
process_metadata_table_task = PythonOperator(
    task_id = "process_metadata_table_task",
    dag = dag,
    python_callable = process_metadata_table
)

# upload indicators table to S3
upload_indicators_to_S3_task = PythonOperator(
    task_id = "upload_indicators_S3_task",
    dag = dag,
    # was `pload_indicators_to_S3` (typo -> NameError); matches upload_metadata_to_S3 below
    python_callable = upload_indicators_to_S3
)

# upload metadata table to S3
upload_metadata_to_S3_task = PythonOperator(
    task_id = "upload_metadata_S3_task",
    dag = dag,
    python_callable = upload_metadata_to_S3
)

# migrate S3 to redshift
migrate_S3_to_redshift_task = PythonOperator(
    task_id = "migrate_S3_to_redshift_task",
    dag = dag,
    python_callable = migrate_S3_to_redshift
)

# include data quality
data_quality_check_task = PythonOperator(
    task_id = "data_quality_check_task",
    dag = dag,
    python_callable = data_quality_check
)

# create a dummy operator marking the end of the pipeline
end_operator = DummyOperator(task_id='Stop_execution',  dag=dag)

# Add the dependencies: after the fetch, the pipeline fans out into an indicators
# branch and a metadata branch, which fan back in at the Redshift migration step.
start_operator >> fetch_stock_data_task >> [process_indicators_table_task, process_metadata_table_task]
process_indicators_table_task >> upload_indicators_to_S3_task >> migrate_S3_to_redshift_task
process_metadata_table_task >> upload_metadata_to_S3_task >> migrate_S3_to_redshift_task
migrate_S3_to_redshift_task >> data_quality_check_task >> end_operator