# Creation of a Full Dag
### Part 1. Full code
### Part 2. Detailed Explanation of the full code.

#### -----------------------------------------------------------------------------------

#### Part 1. Full code

In [None]:
import datetime
import logging

from airflow import DAG
from airflow.contrib.hooks.aws_hook import AwsHook
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.python_operator import PythonOperator

import sql_statements

## function definition for the task for loading_trip_data_to_redshift
def load_trip_data_to_redshift(*args, **kwargs):
    """Load one month of trip data into Redshift.

    Formats ``sql_statements.COPY_MONTHLY_TRIPS_SQL`` with the access/secret key
    of the "aws_credentials" connection and the year/month of the run's
    ``execution_date``, then executes it on the "redshift" connection.

    Args:
        *args: Unused.
        **kwargs: Airflow task context; must contain ``execution_date``
            (the operator needs ``provide_context=True``).
    """
    credentials = AwsHook("aws_credentials").get_credentials()
    redshift = PostgresHook("redshift")
    run_date = kwargs["execution_date"]
    # Positional slots take the AWS keys; named slots pick the S3 partition.
    redshift.run(
        sql_statements.COPY_MONTHLY_TRIPS_SQL.format(
            credentials.access_key,
            credentials.secret_key,
            year=run_date.year,
            month=run_date.month,
        )
    )

## function definition for the task for loading_station_data_to_redshift
def load_station_data_to_redshift(*args, **kwargs):
    """Load station data into Redshift.

    Fills ``sql_statements.COPY_STATIONS_SQL`` with the access/secret key of the
    "aws_credentials" connection and executes it on the "redshift" connection.
    The task context is never read, so the operator does not need
    ``provide_context=True``.
    """
    credentials = AwsHook("aws_credentials").get_credentials()
    redshift = PostgresHook("redshift")
    copy_sql = sql_statements.COPY_STATIONS_SQL.format(
        credentials.access_key, credentials.secret_key
    )
    redshift.run(copy_sql)


## function definition for the code quality task for checking_greater_than_zero  
def check_greater_than_zero(*args, **kwargs):
    """Fail the task unless the configured table contains at least one row.

    Runs ``SELECT COUNT(*)`` on the table named by ``kwargs["params"]["table"]``
    using the "redshift" connection.

    Args:
        *args: Unused.
        **kwargs: Airflow task context; must contain ``params`` with a
            ``table`` key (set via the operator's ``params`` argument).

    Raises:
        ValueError: If the query returns no row/column, or the count is below 1.
    """
    table = kwargs["params"]["table"]
    redshift_hook = PostgresHook("redshift")
    records = redshift_hook.get_records(f"SELECT COUNT(*) FROM {table}")
    # Guard against an empty result set: no rows, or a first row with no columns.
    if not records or not records[0]:
        raise ValueError(f"Data quality check failed. {table} returned no results")
    num_records = records[0][0]
    if num_records < 1:
        raise ValueError(f"Data quality check failed. {table} contained 0 rows")
    # Lazy %-style args: the message is only built if INFO is enabled.
    logging.info("Data quality on table %s check passed with %s records", table, num_records)


## Dag definition
# Monthly DAG covering 2018; max_active_runs=1 keeps at most one run active at a time.
dag = DAG(
    dag_id='lesson2.exercise4',
    schedule_interval='@monthly',
    start_date=datetime.datetime(2018, 1, 1),
    end_date=datetime.datetime(2018, 12, 1),
    max_active_runs=1,
)


## Task to create the trips table in redshift
create_trips_table = PostgresOperator(
    task_id="create_trips_table",
    sql=sql_statements.CREATE_TRIPS_TABLE_SQL,
    postgres_conn_id="redshift",
    dag=dag,
)


## Task to load_trips_data_from_s3_to_redshift
# provide_context=True is required: the callable reads kwargs["execution_date"].
copy_trips_task = PythonOperator(
    task_id='load_trips_from_s3_to_redshift',
    python_callable=load_trip_data_to_redshift,
    provide_context=True,
    dag=dag,
)

## Task to check data quality
# The callable reads the table name from kwargs["params"]["table"].
check_trips = PythonOperator(
    task_id='check_trips_data',
    python_callable=check_greater_than_zero,
    params=dict(table='trips'),
    provide_context=True,
    dag=dag,
)

## Task to create_stations_table
create_stations_table = PostgresOperator(
    task_id="create_stations_table",
    sql=sql_statements.CREATE_STATIONS_TABLE_SQL,
    postgres_conn_id="redshift",
    dag=dag,
)


## Task to load_stations_data_from_s3_to_redshift
# No provide_context here: load_station_data_to_redshift never reads the context.
copy_stations_task = PythonOperator(
    task_id='load_stations_from_s3_to_redshift',
    python_callable=load_station_data_to_redshift,
    dag=dag,
)


## Task to check_stations_data 
# The callable reads the table name from kwargs["params"]["table"].
check_stations = PythonOperator(
    task_id='check_stations_data',
    python_callable=check_greater_than_zero,
    params=dict(table='stations'),
    provide_context=True,
    dag=dag,
)


## dependencies
# Each table: create -> load from S3 -> data-quality check.
create_trips_table >> copy_trips_task >> check_trips
create_stations_table >> copy_stations_task >> check_stations

### Part 2. Detailed Explanations

#### Libraries 

In [None]:
import datetime  # Used for datetime calculations.


import logging   # It is used to include information (generated by user) into the airflow logs. 
# Helpful in debugging the code


from airflow import DAG  # This Class helps us to generate dags.


# Hooks:
# Hooks are basically used to connect to other applications through their APIs. 
# For instance, we can connect Airflow to AWS, Azure, GCP, Redshift, Google Cloud Storage, S3, etc.
# We can also use the associated methods/functions related to the hooks.
# There are two types of hooks: 1. hooks built into Apache Airflow; 2. hooks created and contributed by users.

from airflow.contrib.hooks.aws_hook import AwsHook #  This is meant to interact with AWS. 
# This class is a thin wrapper around the boto3 python library.
# The airflow contributors have created several custom hooks.
# These hooks have been shared and opensourced for all the airflow users. Similarly there are several 
# - custom operators created by airflow contributors. 
# We can check available hooks and operators as mentioned below:
# hooks     :   https://airflow.apache.org/docs/stable/_api/airflow/contrib/hooks/index.html
# operators :   https://airflow.apache.org/docs/stable/_api/airflow/contrib/operators/index.html

    

from airflow.hooks.postgres_hook import PostgresHook # Postgrehook is a hook created by Apache Airflow.
# It can be used to connect to a PostgreSQL database as well as to AWS Redshift.


# Operators: 
# While DAGs describe how to run a workflow, Operators determine what actually gets done.
# An operator is a class; once it is instantiated with parameters, it becomes a task.
# An operator describes a single task in a workflow. Operators are usually (but not always) atomic, 
# - meaning they can stand on their own and don’t need to share resources with any other operators. 
# The DAG will make sure that operators run in the correct order; other than those dependencies,
# - operators generally run independently. In fact, they may run on two completely different machines.

from airflow.operators.postgres_operator import PostgresOperator # This operator is built by Apache Airflow itself.
# It is used to create tasks that work not only with PostgreSQL but also with AWS Redshift.

from airflow.operators.python_operator import PythonOperator # Python operator is used to create python related tasks.

# Note: Since Apache Airflow is written in Python, why are we using operators other than PythonOperator?
# We could do everything with PythonOperator alone, but there are several operator classes
# - that have been prebuilt in Python to help us create tasks efficiently
# - with less code. They reduce repetitive work in Airflow by packaging common tasks into reusable classes
# - and associated methods, which makes our code more efficient. That is also the reason
# - why the Airflow community keeps contributing custom operators and hooks.

In [None]:
import sql_statements  # sql statements have been written in this python file and has been imported.

## DAG Definition

In [None]:
dag = DAG(
    dag_id='Dag_Name_abc',
    schedule_interval='@monthly',
    start_date=datetime.datetime(2018, 1, 1),
    end_date=datetime.datetime(2018, 12, 1),
    max_active_runs=1,
)

# start_date: datetime.datetime(year, month, day, hour, minutes, seconds, microseconds); start date of the DAG.

# end_date  : datetime.datetime(year, month, day, hour, minutes, seconds, microseconds); end date of the DAG.

# schedule_interval: interval at which the DAG runs between the start and end dates.

# max_active_runs: the maximum number of runs of this DAG that may be active at the same time.

# - Setting it to 1 makes the runs (e.g. during backfilling) execute one after another instead of in parallel.

# Note: Backfilling means that when our start date is earlier than the current date,
# - Airflow tries to create and run all the DAG runs that are pending for the intervals in between.

### Task related Function Definitions

In [None]:
## function definition for the task for loading_trip_data_to_redshift
def load_trip_data_to_redshift(*args, **kwargs):
    """Load one month of trip data into Redshift.

    Formats ``sql_statements.COPY_MONTHLY_TRIPS_SQL`` with the access/secret key
    of the "aws_credentials" connection and the year/month of the run's
    ``execution_date``, then executes it on the "redshift" connection.

    Args:
        *args: Unused.
        **kwargs: Airflow task context; must contain ``execution_date``
            (the operator needs ``provide_context=True``).
    """
    credentials = AwsHook("aws_credentials").get_credentials()
    redshift = PostgresHook("redshift")
    run_date = kwargs["execution_date"]
    # Positional slots take the AWS keys; named slots pick the S3 partition.
    redshift.run(
        sql_statements.COPY_MONTHLY_TRIPS_SQL.format(
            credentials.access_key,
            credentials.secret_key,
            year=run_date.year,
            month=run_date.month,
        )
    )
    

# *args and **kwargs: the task context variables (execution_date, params, ds, ...) are passed to the
# - callable as keyword arguments only when the operator definition sets 'provide_context=True'.

# AwsHook("aws_credentials"): is a hook imported above. We have passed "aws_credentials" to AwsHook.
# "aws_credentials" is the name of the credentials data (aws secret key, aws access key etc) 
# - fed into the connections using connection tab.

# aws_hook.get_credentials(): get_credentials() is a method available on the aws_hook instance.
# It is meant to return the underlying `botocore.Credentials` object.
# This contains the following authentication attributes: access_key, secret_key and token,
# - which can be accessed with the dot operator (credentials.access_key, credentials.secret_key).

# PostgresHook("redshift"): the PostgreSQL hook is used here to connect to AWS Redshift.
# - Here "redshift" is the name of the connection created in the Connections tab.

# kwargs["execution_date"]: "execution_date" is one of the default context variables
# - available to the callable when the operator sets 'provide_context=True'.
# - It is a datetime (Pendulum) object for the run's logical date; here we use its year and month attributes.
# A very good article to understand this is 
# -  https://blog.godatadriven.com/zen-of-python-and-apache-airflow

# sql_stmt: description is given in below cell

#  redshift_hook.run(sql_stmt): We can run our sql query using the run() method 
# - on the object redshift_hook created above.

In [None]:
# sql_stmt:
# The query below is imported from 'sql_statements.py'.
# sql_stmt: the sql_stmt above is this SQL query after we have formatted
# - our parameters into it.
# COPY_SQL: this is the format of the Redshift COPY command used
# - to copy data from S3 into Redshift.
# - The doubled braces '{{}}' survive the first .format() call below as '{}', so the access key and
# - secret key can be filled in later, at run time, together with {year} and {month}.


# Template for a Redshift COPY from S3. The first .format() fills the table and
# S3 path; the doubled braces survive as '{}' slots for the AWS keys.
COPY_SQL = "\n".join([
    "",
    "COPY {}",
    "FROM '{}'",
    "ACCESS_KEY_ID '{{}}'",
    "SECRET_ACCESS_KEY '{{}}'",
    "IGNOREHEADER 1",
    "DELIMITER ','",
    "",
])

# {year}/{month} are inserted verbatim (format does not re-parse argument
# values), so they remain placeholders to be filled per DAG run.
COPY_MONTHLY_TRIPS_SQL = COPY_SQL.format(
    "trips",
    "s3://udacity-dend/data-pipelines/divvy/partitioned/{year}/{month}/divvy_trips.csv",
)

In [None]:
# Keyword arguments available when using 'provide_context=True' , with some random sample values:

# {
# 'END_DATE': '2019-01-01',
# 'conf': <module 'airflow.configuration' from '/opt/conda/lib/python3.6/site-packages/airflow/configuration.py'>,
# 'dag': <DAG: context_demo>,
# 'dag_run': None,
# 'ds': '2019-01-01',
# 'ds_nodash': '20190101',
# 'end_date': '2019-01-01',
# 'execution_date': <Pendulum [2019-01-01T00:00:00+00:00]>,
# 'inlets': [],
# 'latest_date': '2019-01-01',
# 'macros': <module 'airflow.macros' from '/opt/conda/lib/python3.6/site-packages/airflow/macros/__init__.py'>,
# 'next_ds': '2019-01-02',
# 'next_ds_nodash': '20190102',
# 'next_execution_date': datetime.datetime(2019, 1, 2, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>),
# 'outlets': [],
# 'params': {},
# 'prev_ds': '2018-12-31',
# 'prev_ds_nodash': '20181231',
# 'prev_execution_date': datetime.datetime(2018, 12, 31, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>),
# 'run_id': None,
# 'tables': None,
# 'task': <Task(PythonOperator): print_exec_date>,
# 'task_instance': <TaskInstance: context_demo.print_exec_date 2019-01-01T00:00:00+00:00 [None]>,
# 'task_instance_key_str': 'context_demo__print_exec_date__20190101',
# 'templates_dict': None,
# 'test_mode': True,
# 'ti': <TaskInstance: context_demo.print_exec_date 2019-01-01T00:00:00+00:00 [None]>,
# 'tomorrow_ds': '2019-01-02',
# 'tomorrow_ds_nodash': '20190102',
# 'ts': '2019-01-01T00:00:00+00:00',
# 'ts_nodash': '20190101T000000',
# 'ts_nodash_with_tz': '20190101T000000+0000',
# 'var': {'json': None, 'value': None},
# 'yesterday_ds': '2018-12-31',
# 'yesterday_ds_nodash': '20181231'
# }



#### Note: We can pass additional parameter values, in dictionary format, directly to the keyword 'params' in the operator definition. We will see an example of this in the code below.

In [None]:
## This function is very similar to the above function. So no explanation needed in this case.

## function definition for the task for loading_station_data_to_redshift
def load_station_data_to_redshift(*args, **kwargs):
    """Load station data into Redshift.

    Fills ``sql_statements.COPY_STATIONS_SQL`` with the access/secret key of the
    "aws_credentials" connection and executes it on the "redshift" connection.
    The task context is never read, so the operator does not need
    ``provide_context=True``.
    """
    credentials = AwsHook("aws_credentials").get_credentials()
    redshift = PostgresHook("redshift")
    copy_sql = sql_statements.COPY_STATIONS_SQL.format(
        credentials.access_key, credentials.secret_key
    )
    redshift.run(copy_sql)

### Code Quality Checks Function 

In [None]:
## function definition for the code quality task for checking_greater_than_zero  
def check_greater_than_zero(*args, **kwargs):
    """Fail the task unless the configured table contains at least one row.

    Runs ``SELECT COUNT(*)`` on the table named by ``kwargs["params"]["table"]``
    using the "redshift" connection.

    Args:
        *args: Unused.
        **kwargs: Airflow task context; must contain ``params`` with a
            ``table`` key (set via the operator's ``params`` argument).

    Raises:
        ValueError: If the query returns no row/column, or the count is below 1.
    """
    table = kwargs["params"]["table"]
    redshift_hook = PostgresHook("redshift")
    records = redshift_hook.get_records(f"SELECT COUNT(*) FROM {table}")
    # Guard against an empty result set: no rows, or a first row with no columns.
    if not records or not records[0]:
        raise ValueError(f"Data quality check failed. {table} returned no results")
    num_records = records[0][0]
    if num_records < 1:
        raise ValueError(f"Data quality check failed. {table} contained 0 rows")
    # Lazy %-style args: the message is only built if INFO is enabled.
    logging.info("Data quality on table %s check passed with %s records", table, num_records)
    
    
# kwargs["params"]["table"]: As mentioned above, we pass a dictionary containing the table
# - name through the operator's 'params' keyword, and it appears among the context variables. To get the table
# - name, we read a key of that dictionary, which is itself stored in the larger kwargs dictionary.


# PostgresHook("redshift"): We have already defined above. 

# redshift_hook.get_records(f"SELECT COUNT(*) FROM {table}"): get_records() runs the query and returns its result rows;
# - for this query that is a single row whose only column (records[0][0]) is the number of records in the table.

### PostgreSql Operator and Tasks

In [None]:
## Task to create the trips table in redshift
create_trips_table = PostgresOperator(
    task_id="create_trips_table",
    sql=sql_statements.CREATE_TRIPS_TABLE_SQL,
    postgres_conn_id="redshift",
    dag=dag,
)

# Instantiate the PostgresOperator with following parameters:

# task_id : task name

# dag: name of the dag with which this task is connected to.

# postgres_conn_id: name of the connection created through the Connections tab of the Airflow web UI.
# It contains the credentials required to connect Airflow to Redshift.

# sql: the SQL query to be run.

In [None]:
## Task to create_stations_table
create_stations_table = PostgresOperator(
    task_id="create_stations_table",
    sql=sql_statements.CREATE_STATIONS_TABLE_SQL,
    postgres_conn_id="redshift",
    dag=dag,
)

### Python Operator and Tasks

In [None]:
## Task to load_trips_data_from_s3_to_redshift
# provide_context=True is required: the callable reads kwargs["execution_date"].
copy_trips_task = PythonOperator(
    task_id='load_trips_from_s3_to_redshift',
    python_callable=load_trip_data_to_redshift,
    provide_context=True,
    dag=dag,
)

# Instantiate the PythonOperator with following parameters:

# task_id : task name

# dag : name of the dag with which this task is connected to.

# python_callable: name of the function which is the part of this task and needed to be called for the task.

# provide_context : If this is true then we can get all the context variables accessible in the form of **kwargs.

In [None]:
## Task to check data quality
# The callable reads the table name from kwargs["params"]["table"].
check_trips = PythonOperator(
    task_id='check_trips_data',
    python_callable=check_greater_than_zero,
    params=dict(table='trips'),
    provide_context=True,
    dag=dag,
)

# Instantiate the PythonOperator with following parameters:

# task_id : task name

# dag : name of the dag with which this task is connected to.

# python_callable: name of the function which is the part of this task and needed to be called for the task.

# provide_context : If this is true then we can get all the context variables accessible in the form of **kwargs.

# params: a dictionary of extra parameters for the task. With provide_context=True it reaches
# - the callable as kwargs["params"], alongside the other context variables.
# - Here we are passing the dictionary {'table': 'trips'}.

In [None]:
# Explanation is same as the above Python Operators

## Task to load_stations_data_from_s3_to_redshift
# No provide_context here: load_station_data_to_redshift never reads the context.
copy_stations_task = PythonOperator(
    task_id='load_stations_from_s3_to_redshift',
    python_callable=load_station_data_to_redshift,
    dag=dag,
)


## Task to check_stations_data 
# The callable reads the table name from kwargs["params"]["table"].
check_stations = PythonOperator(
    task_id='check_stations_data',
    python_callable=check_greater_than_zero,
    params=dict(table='stations'),
    provide_context=True,
    dag=dag,
)