# Airflow

## Topics
* General
    * What is a workflow?
    * What is Airflow?
    * What are DAGs?
    * How can Airflow be accessed?
* Important Concepts
* Airflow DAGs
* Airflow Scheduling
* Airflow Web UI
* Airflow DAGs Operators (and Sensors)
    * BashOperator()
    * PythonOperator()
    * EmailOperator()
    * Sensors
* Airflow Executors
* Airflow Troubleshooting and Debugging
* Airflow SLAs & Reporting


## General
* What is a workflow? 
    * In the data engineering context, `workflow` is essentially a set of steps to accomplish a data engineering task: downloading a file, copying data, filtering information, writing to a database, etc. Something similar to the ETL pipeline process.
* What is Airflow?
    * It's an orchestration tool, or a platform, to program `workflows`.
    * Airflow is responsible for:
        * Creation (of a workflow)
        * Scheduling (a workflow)
        * Monitoring (a workflow)
* What are DAGs?
    * Airflow implements workflows as DAGs (Directed Acyclic Graphs)
    * DAGs are a set of tasks with dependencies between them
* How can Airflow be accessed?
    * Via Code, via command-line, via a built-in web interface, via REST API
* How to run a specific task?
    * `airflow tasks test <dag_id> <task_id> <execution_date>`
* How to run an entire DAG?
    * `airflow dags trigger -e <execution_date> <dag_id>`
    * this runs a full DAG as if it were running on the specified date

In [None]:
# DAG Code Example
    # Within Python code, you refer to this DAG through its variable name (etl_dag)
    # Within the airflow shell command, you refer to it by its dag_id ('etl_pipeline')
etl_dag = DAG(dag_id='etl_pipeline', default_args={"start_date": "2024-01-08"})


# Running a workflow in Airflow
    # Shell command: airflow tasks test <dag_id> <task_id> [execution date]
    # Example: An Airflow DAG has a dag_id of etl_pipeline. The task_id is download_file and the start_date is 2024-01-08
    airflow tasks test etl_pipeline download_file 2024-01-08

## Important Concepts
* The Airflow System is the highest level of concepts.
* The Airflow System contains "components"
    * Example of "components":
        * Airflow Scheduler: triggers scheduled workflows and submits tasks to the executor to run
        * Airflow webserver: user interface to inspect, trigger and debug DAGs and tasks.
* DAGs = Workflows = Collection/Set of Tasks (= Python DAG files because they are python scripts)
    * "DAG run": Instance of a workflow (or a DAG) at a given point in time.
    * Check: https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/dags.html
    * It is a Python script
        * Check the `dags_folder` setting in `airflow.cfg` to see the location of the Python DAG files
* Operators, Sensors, and TaskFlow are like "functions" (in the Airflow context) that output tasks.
    * Note: We cannot call them Airflow "components" because "components" are broader: the Airflow "Scheduler" and "webserver" are components of the Airflow system.
* Tasks are instances of Operators/Sensors/or TaskFlows and are usually assigned to a python variable.
    * You can perform multiple different tasks by using the same Operator.
    * Different operators serve different purposes
        * BashOperator() to run bash/shell commands - expects a `bash_command`
        * EmailOperator() to send emails - expects `to`, `subject`, and `html_content`
        * PythonOperator() to execute python functions/scripts or callable methods - expects a `python_callable`
        * BranchPythonOperator() to choose which downstream task(s) to run based on conditional logic (for example, the output of another task) - expects a `python_callable` that returns the task_id(s) to follow (Airflow 1.x also required `provide_context=True` and a callable accepting `**kwargs`; Airflow 2 passes the context automatically)
        * FileSensor() to check if a file exists - expects a `filepath` and might need `mode` or `poke_interval` attributes
    * Example of tasks: running a command, sending an email, running a python script, etc.
* Dependencies
    * Define the edges between components in the DAG.
        * Example:
            * A component can be a task that was instantiated by a BashOperator(). Suppose you have multiple tasks.
            * We need to define the order of execution for the tasks.
            * To make task_1 run before task_2, use the `bitshift` operators (`>>` or `<<`) between the tasks, e.g. `task_1 >> task_2`. They create the dependencies.
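
To make the bitshift idea concrete, here is a minimal sketch of how `>>` and `<<` can record dependencies in plain Python. `ToyTask` is a hypothetical stand-in written for these notes, not Airflow's real operator class; it only assumes that Python lets a class define `__rshift__` and `__lshift__`.

```python
class ToyTask:
    """Hypothetical stand-in for an Airflow task (not the real BaseOperator)."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []  # task_ids that must run after this task

    def __rshift__(self, other):
        # self >> other: self is upstream, other is downstream
        self.downstream.append(other.task_id)
        return other  # returning `other` lets chains like a >> b >> c work

    def __lshift__(self, other):
        # self << other: other is upstream, self is downstream
        other.downstream.append(self.task_id)
        return other


task_1 = ToyTask("task_1")
task_2 = ToyTask("task_2")
task_3 = ToyTask("task_3")

# task_1 runs first, then task_2, then task_3
task_1 >> task_2 >> task_3

print(task_1.downstream)  # ['task_2']
print(task_2.downstream)  # ['task_3']
```

Writing `task_2 << task_1` would record the same edge as `task_1 >> task_2`; the two operators differ only in reading direction.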

In [None]:
# Airflow commands
    # In the airflow shell command: 
    airflow
# Airflow help for the dags command:
    airflow dags -h

## Airflow DAGs
* Terminology
    * Directed: represents the inherent flow of the dependencies between components. 
    * Acyclic: each component is executed once per run (does not loop or repeat).
    * Graph: represents the components and the relationships/dependencies between them.
    * Tasks: something in the workflow that needs to be done. Ex: Task is an instance of an Operator.
* Properties
    * Are written in Python, but can use components in other languages (can include bash scripts, spark jobs, etc).
    * Components: are the tasks to be executed, i.e., instances of operators, sensors, etc.
    * Dependencies: define the execution order and can be defined implicitly or explicitly
* Airflow Shell Command line vs. Python command line
    * The airflow command line is used to:
        * start Airflow processes (ex: webserver, scheduler)
        * manually run DAGs or tasks
        * review logging info
    * Python is used to:
        * create or edit a DAG and the data processing code itself
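
The "acyclic" property can be illustrated without Airflow. The sketch below uses the standard library's `graphlib` (Python 3.9+) to derive an execution order from a dependency mapping and to show that adding a loop leaves no valid order. The task names are illustrative, and this is not how Airflow itself detects cycles.

```python
from graphlib import CycleError, TopologicalSorter

# Map each task to the set of tasks that must finish before it (its upstream tasks).
dependencies = {
    "second_task": {"first_task"},
    "third_task": {"second_task"},
}
execution_order = list(TopologicalSorter(dependencies).static_order())
print(execution_order)  # ['first_task', 'second_task', 'third_task']

# Making first_task depend on third_task closes a loop, so no valid order exists.
cyclic = dict(dependencies, first_task={"third_task"})
try:
    list(TopologicalSorter(cyclic).static_order())
    has_cycle = False
except CycleError:
    has_cycle = True
print(has_cycle)  # True
```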

In [None]:
# DAG Example: Copy a file to a server prior to importing it to a database service

# Step 0: it's good to know what DAGs are already defined: 
    # airflow shell: airflow dags list
# Step 1: import the DAG object from airflow
# Step 2: import other needed modules and create a default_arguments dict in order to organize the workflow
    # owner: represent the name of the owner of the DAG
    # email: owner's email for alerting purposes
    # start_date: represents the earliest time a DAG could be run
# Step 3: create a DAG object for your workflow
    # define the DAG object using a python Context Manager (with DAG(...) as DAG_ALIAS)
    # before Airflow 2, we would create an instance of a DAG without a context manager: etl_dag = DAG(...)
# Step 4: troubleshoot any problem in the creation of a DAG by checking whether it was created:
    # airflow shell: 
    airflow dags list

# Import the DAG object
from airflow import DAG

# Import other modules
from datetime import datetime

# Define the default_args dictionary
default_args = {
  'owner': 'dsmith',
  'start_date': datetime(2023, 1, 14),
  'retries': 2
}

# Instantiate the DAG object
with DAG('example_etl', default_args=default_args) as etl_dag:
  pass

## Airflow Scheduling
* DAG run
    * Instance of a workflow (or a DAG) at a given point in time
    * can be run manually or via the schedule_interval parameter (passed when the DAG is defined)
        * it represents how often to schedule the DAG runs (occurs between start_date and end_date)
        * can be defined by a CRON style syntax or via built-in presets
    * may have multiple states: running, failed, success, queued, skipped, etc
    * you can check in the Airflow Web UI: Browse > DAG Runs
* Airflow scheduler presets:
    * schedule_interval presets:
        * @hourly (equivalent to CRON: `0 * * * *`)
        * @daily (equivalent to CRON: `0 0 * * *`)
        * @weekly (equivalent to CRON: `0 0 * * 0`)
    * special presets:
        * None: used for manually triggered workflows (it will never be scheduled)
        * @once: scheduled only once
* Nuance:
    * Airflow runs a scheduled DAG at "start_date + schedule interval", i.e., once one full schedule interval has passed beyond the start_date
    * Example:
        * 'start_date': datetime(2020, 2, 25), 'schedule_interval': '@daily'
        * This means the earliest time the DAG will run is February 26th, 2020
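
The "start_date + schedule interval" rule is plain date arithmetic. The sketch below is a toy model that assumes the start_date already falls on a schedule boundary (midnight for `@daily`); `PRESET_INTERVALS` and `earliest_run` are hypothetical names for these notes, not part of Airflow.

```python
from datetime import datetime, timedelta

# Hypothetical mapping from preset name to interval length (toy model only).
PRESET_INTERVALS = {
    "@hourly": timedelta(hours=1),
    "@daily": timedelta(days=1),
    "@weekly": timedelta(weeks=1),
}


def earliest_run(start_date, preset):
    """Return start_date plus one full schedule interval."""
    return start_date + PRESET_INTERVALS[preset]


first_run = earliest_run(datetime(2020, 2, 25), "@daily")
print(first_run)  # 2020-02-26 00:00:00
```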

In [None]:
# Import what this cell needs
from datetime import datetime, timedelta
from airflow import DAG

# Update the scheduling arguments as defined
default_args = {
  'owner': 'Engineering',
  'start_date': datetime(2023, 11, 1),
  'email': ['airflowresults@datacamp.com'],
  'email_on_failure': False,
  'email_on_retry': False,
  'retries': 3,
  'retry_delay': timedelta(minutes=20)
}

# Configure a schedule of every Wednesday at 12:30pm with CRON syntax
dag = DAG('update_dataflows', default_args=default_args, schedule_interval='30 12 * * 3')

## Airflow Web UI
* You can use either the Web UI or the command line tool
* You can visualize the current DAGs as well as the code (in read-only mode)
* For logging: Browse > Audit Logs > choose your Event

In [None]:
# Starting the Airflow 'webserver'
# You've successfully created some DAGs within Airflow using the command-line tools, but notice that it can be a bit tricky
# to handle scheduling / troubleshooting / etc. After reading the documentation further, you realize that you'd like to access
# the Airflow web interface. 
# For security reasons, you'd like to start the webserver on port 9090

# Checking for help
airflow webserver -h

# Starting the webserver on port 9090
airflow webserver -p 9090

# How to examine any available DAGs
    # Checking for operators: DAGs > select a DAG > Graph > check which operators are being used 

## Airflow DAGs Operators (and Sensors)
* Tasks are instances of Operators.
    * Example of tasks: running a command, sending an email, running a python script, etc.
* Operators do not share information with each other by default (it is possible if you want, for example through XComs)
* Example of Operators:
    * EmptyOperator()
        * Can be used to represent a task for troubleshooting
        * Can also be used to represent a task that has not been implemented yet
    * BashOperator()
        * Executes a given bash command or script
        * Can specify Environment Variables

In [None]:
# Import the BashOperator
from airflow.operators.bash import BashOperator

# Example of a simple BashOperator()
BashOperator(task_id='bash_example',
            bash_command='echo "Example!"',
            # Next line is only needed without a `with DAG(...)` block (the older style)
            dag=dag)

In [None]:
# Example of a BashOperator()
    # you've been running some scripts manually to clean data (using a script called cleanup.sh)
    # you've realized it's becoming difficult to keep up with running everything manually, 
    # much less dealing with errors or retries. You'd like to implement a simple script as an Airflow operator.

# Import the BashOperator
from airflow.operators.bash import BashOperator

with DAG(dag_id="test_dag", default_args={"start_date": "2024-01-01"}) as analytics_dag:
  # Define the BashOperator 
  cleanup = BashOperator(
      task_id='cleanup_task',
      # Define the bash_command
      bash_command='cleanup.sh',
  )

  # You have two more scripts, consolidate_data.sh and push_data.sh
  # These further process your data and copy it to its final location

  # Define a second operator to run the `consolidate_data.sh` script
  # (kept inside the `with` block so the task is attached to the DAG)
  consolidate = BashOperator(
      task_id='consolidate_task',
      bash_command='consolidate_data.sh'
  )

  # Define a third and final operator to execute the `push_data.sh` script
  push_data = BashOperator(
      task_id='pushdata_task',
      bash_command='push_data.sh'
  )



### BashOperator(): Airflow Tasks & Dependencies
* Tasks
    * Are instances of Operators, usually assigned to a python variable.
    * Within Airflow tools, tasks are referred to by their task_id.
* Task dependencies
    * Define an order of execution. If they are not defined, tasks can be executed without any guarantee of order
    * Are referred to as:
        * upstream tasks (must be completed `before` a downstream task): set with `>>` (upstream operator)
        * or downstream tasks (must be completed `after` an upstream task): set with `<<` (downstream operator)
    * Note: `>>` and `<<` are called "bitshift operators"
    * Using a bitshift operator makes the order visible in "DAGs > Graph" as a line connecting the tasks (or the instances of the Operators)
    
    <img src="sources/datacamp/img/bitshift.png" alt="bitshift" width="400px">
    <img src="sources/datacamp/img/dependencies.png" alt="dependencies" width="400px">

* Troubleshooting

    


In [None]:
# Example of Tasks and Dependencies

# Define the tasks
task1 = BashOperator(task_id='first_task',
                     bash_command='echo 1')
task2 = BashOperator(task_id='second_task',
                     bash_command='echo 2')
# Set first_task to run before second_task 
task1 >> task2   # or task2 << task1


In [None]:
# Example of a more complex DAG (or workflow)

# Define a new pull_sales task
pull_sales = BashOperator(
    task_id='pullsales_task',
    bash_command='wget https://salestracking/latestinfo?json'
)

# Set pull_sales to run prior to cleanup
pull_sales >> cleanup

# Configure consolidate to run after cleanup
cleanup >> consolidate

# Set push_data to run last
consolidate >> push_data

In [None]:
# Troubleshooting
    # Run the airflow dags command to see all subcommands available. Look for a subcommand to read errors and run it.
    airflow dags

    # Check the error message of the associated DAGs with errors
    airflow dags list-import-errors

    # Use cat workspace/dags/codependent.py to view the Python code
        # ls -> airflow  config  start.sh  startup  workspace 
    cat workspace/dags/codependent.py

    # Check the code below for the error at the end
    from airflow import DAG
    from airflow.operators.bash import BashOperator
    from datetime import datetime

    default_args = {
    'owner': 'dsmith',
    'start_date': datetime(2023, 2, 12),
    'retries': 1
    }

    with DAG('codependency', default_args=default_args) as codependency_dag:

        task1 = BashOperator(task_id='first_task',
                             bash_command='echo 1',
                             dag=codependency_dag)

        task2 = BashOperator(task_id='second_task',
                             bash_command='echo 2',
                             dag=codependency_dag)

        task3 = BashOperator(task_id='third_task',
                             bash_command='echo 3',
                             dag=codependency_dag)

        # task1 must run before task2 which must run before task3
        task1 >> task2
        task2 >> task3
        task3 >> task1 # THIS IS THE ERROR, SO IT MUST BE REMOVED. A DAG CANNOT HAVE A LOOP.

### PythonOperator() and EmailOperator()
* You've implemented several Airflow tasks using the BashOperator but realize that a couple of specific tasks would be better implemented using Python. You'll implement a task to download and save a file to the system within Airflow.
* PythonOperator(): Executes Python functions or a callable method.
* EmailOperator(): Sends an email from within an Airflow task


In [None]:
# Example of PythonOperator() that writes messages to the task logs
# Notes: 
    # The DAG process_sales_dag is already defined.
    # this Python function is already defined: parse_file(inputfile, outputfile)
import requests
from airflow.operators.python import PythonOperator
from airflow.operators.email import EmailOperator

# Define the method
def pull_file(URL, savepath):
    r = requests.get(URL)
    with open(savepath, 'wb') as f:
        f.write(r.content)   
    # Use the print method for logging
    print(f"File pulled from {URL} and saved to {savepath}")


# Create the task
pull_file_task = PythonOperator(
    task_id='pull_file',
    # Add the callable
    python_callable=pull_file,
    # Define the arguments
    op_kwargs={'URL':'http://dataserver/sales.json', 'savepath':'latestsales.json'}
)

# Add another Python task
parse_file_task = PythonOperator(
    task_id='parse_file',
    # Set the function to call
    python_callable=parse_file,
    # Add the arguments
    op_kwargs={'inputfile':'latestsales.json', 'outputfile':'parsedfile.json'},
)

# Import the Operator
from airflow.operators.email import EmailOperator

# Define the task
email_manager_task = EmailOperator(
    task_id='email_manager',
    to='manager@datacamp.com',
    subject='Latest sales JSON',
    html_content='Attached is the latest sales JSON file as requested.',
    files=['parsedfile.json'],  # files expects a list of file names
    dag=process_sales_dag
)

# Set the order of tasks
pull_file_task >> parse_file_task >> email_manager_task


### Airflow Sensors
* Sensor = An Airflow operator that waits for a condition to be `True`.
    * Examples of conditions:
        * Creation of a file
        * Upload of a database record
        * Response from a web request
    * Since they also instantiate tasks (just like Operators), you can use `bitshifts` (`>>` or `<<`) to create dependencies between sensors, or between sensors and other tasks.
    * Examples of sensors:
        * FileSensor: check for the existence of a file
        * ExternalTaskSensor: wait for a task in another DAG to be done
        * HttpSensor: request a web URL and check for content
        * SqlSensor: run a SQL query and check for content
* Sensor vs. Operator:
    * Use an Operator by default; use a Sensor when:
        * you are uncertain when a condition will become true (a sensor can keep checking until it is)
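
The core of a sensor is a polling loop: check a condition, sleep, check again, and give up after a timeout. The sketch below models that loop with the standard library only. `wait_for_file` is a hypothetical helper for these notes, and its `poke_interval` and `timeout` arguments only mirror the idea behind the FileSensor settings of the same name; this is not the FileSensor implementation.

```python
import os
import tempfile
import time


def wait_for_file(filepath, poke_interval=0.01, timeout=1.0):
    """Check for filepath every poke_interval seconds; give up after timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if os.path.exists(filepath):
            return True
        time.sleep(poke_interval)
    return False


with tempfile.TemporaryDirectory() as tmp:
    target = os.path.join(tmp, "salesdata_ready.csv")
    # The file does not exist yet, so the check times out
    missing_result = wait_for_file(target, timeout=0.05)
    # Simulate the upstream system writing the file
    open(target, "w").close()
    present_result = wait_for_file(target, timeout=0.05)

print(missing_result, present_result)  # False True
```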

## Airflow Executors
* They determine the level of parallelism available on the system.
* They run tasks
* Examples:
    * SequentialExecutor
        * the default executor in airflow
        * runs one task at a time (thus, having multiple workflows will take longer than expected)
        * simple for debugging as it goes one by one
        * however, not recommended for production (due to the limitations of task resources)
    * LocalExecutor
        * runs entirely on a single system
        * treats tasks as processes, so it can run concurrent tasks as permitted by your machine's CPU/memory/etc
        * This concurrency is the `parallelism` of the system; the user sets it to a limited or unlimited number of simultaneous tasks
    * KubernetesExecutor
        * Kubernetes is a container orchestration system that allows tasks to be run on a cluster of machines.
        * with KubernetesExecutor, multiple Airflow systems can be configured as `Workers` for a given set of workflows/tasks.
        * More complex to set up, as it requires a way to share DAGs between systems (git server, Network File System, etc)
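
The difference between running one task at a time and running several concurrently can be sketched with a thread pool. This is only an analogy: `peak_concurrency` is a hypothetical helper, and it uses threads for brevity even though the notes above describe LocalExecutor as treating tasks as processes. It measures the largest number of toy tasks running at the same moment for a given parallelism limit.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def peak_concurrency(parallelism, n_tasks=4):
    """Run n_tasks toy tasks with at most `parallelism` workers; return the peak number running at once."""
    lock = threading.Lock()
    running = 0
    peak = 0

    def task():
        nonlocal running, peak
        with lock:
            running += 1
            peak = max(peak, running)
        time.sleep(0.02)  # stand-in for real work
        with lock:
            running -= 1

    with ThreadPoolExecutor(max_workers=parallelism) as pool:
        futures = [pool.submit(task) for _ in range(n_tasks)]
        for future in futures:
            future.result()
    return peak


sequential_peak = peak_concurrency(parallelism=1)  # like a one-at-a-time executor
local_peak = peak_concurrency(parallelism=3)       # up to 3 tasks at once
print(sequential_peak, local_peak)
```

With a limit of 1 the peak is always 1; with a limit of 3 the peak can reach 3 but never exceeds it.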


In [None]:
# An Example of Executors: Determine the level of parallelism available on this system.

# Checking what is the executor being used
    # Note that we assume that an Airflow instance is already set up on your machine and that the `airflow.cfg`, where most
# airflow configuration and settings are defined, already exists.
    # Go to the command line and check the `airflow.cfg` file and look for "executor ="
    cat airflow/airflow.cfg | grep "executor ="
    # Ex of the output: "executor = SequentialExecutor"

    # you can also do `airflow info` and look for "executor"
    airflow info

In [None]:
# Example of debugging the FileSensor
# From https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/sensors.html:
    # Something that is checking every second should be in poke mode, while something that is checking every minute should be in reschedule mode.

# Suppose your manager has mentioned that on some days, the workflows are taking a lot longer to finish and asks you to investigate.
# She also mentions that the salesdata_ready.csv file is taking longer to generate these days and the time of day it is completed
# is variable.

# Determine the level of parallelism available on this system.
    # We do 'airflow info' and we see it is using a SequentialExecutor
 
# Then, we go to the DAG file and check the code
    # The order is: precheck >> generate_report_task
    # The code checks whether the file is available every day at midnight (CRON: 0 0 * * *)
    # Then, it generates a report.
    # However, the manager said the file can arrive at any point in the day, so the sensor may wait a long time. We need a mode that frees the worker slot between checks.
    # Notice that the mode in the FileSensor was 'poke' and not 'reschedule'. In 'poke' mode the sensor holds its worker slot for the whole wait, so we switch to 'reschedule'.
    # From here (https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/sensors.html) see that they say:
        # "reschedule: The Sensor takes up a worker slot only when it is checking, and sleeps for a set duration between checks"
# This will optimize resource usage.
# This matters most here because the SequentialExecutor runs only one task at a time, so a sensor in 'poke' mode blocks every other task

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.filesystem import FileSensor
from datetime import datetime

report_dag = DAG(
    dag_id = 'execute_report',
    schedule_interval = "0 0 * * *"
)

precheck = FileSensor(
    task_id='check_for_datafile',
    filepath='salesdata_ready.csv',
    start_date=datetime(2024,1,20),
    mode='reschedule',  # HERE SHOULD BE 'reschedule' and not 'poke' like it was before!
    dag=report_dag
)

generate_report_task = BashOperator(
    task_id='generate_report',
    bash_command='generate_report.sh',
    start_date=datetime(2024,1,20),
    dag=report_dag
)

precheck >> generate_report_task


## Airflow Troubleshooting and Debugging
* Common Issues:
    * DAG won't run on schedule
        * Potential Issue 1: the Airflow Scheduler is not running
            * Solution: Check if it is running. If not, go to the command line: `airflow scheduler`
        * Potential Issue 2: 'schedule_interval' has not passed since either the 'start_date' or the last `DAG run`
            * Solution: Modify either 'schedule_interval' or 'start_date'
        * Potential Issue 3: the Executor does not have enough free slots to run tasks
            * Solution 1: Change the Executor type to another one capable of more tasks (LocalExecutor, KubernetesExecutor)
            * Solution 2: Add more system resources (RAM, CPUs)
            * Solution 3: Change the scheduling of your DAGs
    * DAG won't load into the system 
        * Symptom: DAG won't appear in the DAG View of the Airflow Web UI or in the `airflow dags list` output
        * Potential Issue 1: the Python file is not in the expected DAGs folder
            * Solution: Check the `dags_folder` setting in `airflow.cfg` to see the location of the Python DAG files
                * The folder should be an `absolute path`
        * Potential Issue 2: Syntax errors in your Python code in the Python DAG files
            * Solution 1: `airflow dags list-import-errors`
            * Solution 2: Run the Python DAG script: `python dagfile.py`
                * If there are no errors, nothing will appear.
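
A syntax check can also be done from Python without executing the DAG file. The sketch below uses the built-in `compile()`; `find_syntax_error` is a hypothetical helper for these notes. It only catches syntax errors: a problem such as a missing import (a `NameError`) appears only when the file is actually imported or run.

```python
def find_syntax_error(source, filename="dagfile.py"):
    """Return None if source compiles, else a short 'line N: message' string."""
    try:
        compile(source, filename, "exec")
    except SyntaxError as err:
        return f"line {err.lineno}: {err.msg}"
    return None


good_source = "x = 1\nprint(x)\n"
bad_source = "x = (1\n"  # unclosed parenthesis

print(find_syntax_error(good_source))  # None
print(find_syntax_error(bad_source))   # a 'line N: ...' message (wording varies by Python version)
```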

In [None]:
# Debugging - General Steps

# Example 1 - Issue: Where is the dag code?
# Check Airflow Information via `airflow info`
airflow info
    # Apache Airflow
    # version                | 2.7.1
    # executor               | SequentialExecutor
    # task_logging_handler   | airflow.utils.log.file_task_handler.FileTaskHandler
    # sql_alchemy_conn       | sqlite:////home/repl/airflow/airflow.db
    # dags_folder            | /home/repl/workspace/dags
    # plugins_folder         | /home/repl/workspace
    # base_log_folder        | /home/repl/airflow/logs
    # remote_base_log_folder |

# 1) What is the Executor: SequentialExecutor
# 2) Where are the python DAG scripts -> dags_folder: /home/repl/workspace/dags

# Check the information via the configuration file:
# 1) Check the airflow.cfg and find where the python DAG scripts are 
cat airflow/airflow.cfg | grep "dags" # -> dags_folder = /home/repl/workspace/dags

# 2) Check the python DAG script via shell
cd workspace/dags/
nano test_dag.py

# Example 2 - Issue: Where is the execute_report DAG (it does not appear in the system)?

# Checking the DAGs
airflow dags list
    # dag_id     | filepath      | owner   | paused
    # ===========+===============+=========+=======
    # sample_dag | sample_dag.py | airflow | None

    # Yes, it does not appear in the list.

# Checking why it has not been loaded
    # Potential Solution 1: Check if the python file is in the expected DAGs folder
        # Expected dags folder: airflow info -> dags_folder = /home/repl/workspace/dags
        ls workspace # -> dags  execute_report_dag.py
        # the DAG file should be inside the dags folder; this is the problem
        # now, move the file into the dags folder
        mv workspace/execute_report_dag.py workspace/dags/
        # now, try: airflow dags list
        # there is an error:
            # Error: Failed to load all files. For details, run `airflow dags list-import-errors`
            # dag_id     | filepath      | owner   | paused
            # ===========+===============+=========+=======
            # sample_dag | sample_dag.py | airflow | True 
        airflow dags list-import-errors
            # File "<frozen importlib._bootstrap>", line 228,
            #                                         | in _call_with_frames_removed                     
            #                                         |   File                                           
            #                                         | "/home/repl/workspace/dags/execute_report_dag.py"
            #                                         | , line 18, in <module>                           
            #                                         |     generate_report_task = BashOperator(         
            #                                         | NameError: name 'BashOperator' is not defined
        # now, go to the python DAG script
        cd workspace/dags
        nano execute_report_dag.py
        # it seems that we forgot to add: from airflow.operators.bash import BashOperator
        # now, check if the dag is on the list
        airflow dags list
            # dag_id         | filepath              | owner   | paused                                          
            # ===============+=======================+=========+=======                                          
            # execute_report | execute_report_dag.py | airflow | True  
            # sample_dag     | sample_dag.py         | airflow | True 
        # Done! The mistake was corrected.


## Airflow SLAs & Reporting

* SLA = Service Level Agreement 
    * In the Airflow context, SLA is the amount of time a task or DAG should require to run
    * Used to monitor and ensure that tasks complete within a certain time frame
    * SLA Miss = a situation where a task or DAG does not meet the expected timing for the SLA
        * Handling SLA Misses: If the SLA is missed, an email alert is sent out via the system configuration and a note is made in the log
        * You can access them in the Airflow Web UI: Browse > SLA Misses
    * Two ways to define SLAs
        * 1) In the task itself: `sla` argument that takes a `timedelta` object with the amount of time to pass
        * 2) On the `default_args` dictionary: define an sla key with the value as a `timedelta` object
    * `timedelta`: representing the allowed duration for the task
        * it's found in the `datetime` library along with the datetime object
        * use: from datetime import timedelta
        * Examples:
            * timedelta(seconds=30)
            * timedelta(weeks=2)
            * timedelta(days=4, hours=10, minutes=20, seconds=30)
* Reporting
    * Email alerting is built into Airflow, with options for success/failure/error
        * these are set in `default_args`
        * it can also be done by using the EmailOperator
    * Note: For this, you need to set up the Global Email Configuration, which is out of scope here.
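
An SLA check is a comparison between a finish time and a deadline built from a `timedelta`. The sketch below is a toy model with hypothetical names (`is_sla_miss`, `run_start`, `task_end`); it is not Airflow's SLA logic, only the arithmetic behind the idea.

```python
from datetime import datetime, timedelta


def is_sla_miss(run_start, task_end, sla):
    """True if the task finished later than run_start + sla."""
    return task_end > run_start + sla


sla = timedelta(minutes=30)
run_start = datetime(2024, 1, 20, 0, 0)

on_time = is_sla_miss(run_start, datetime(2024, 1, 20, 0, 25), sla)
late = is_sla_miss(run_start, datetime(2024, 1, 20, 0, 45), sla)
print(on_time, late)  # False True

# timedelta arguments can be combined; total_seconds() shows the full duration
long_sla = timedelta(days=4, hours=10, minutes=20, seconds=30)
print(long_sla.total_seconds())  # 382830.0
```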

In [None]:
# You've successfully implemented several Airflow workflows into production, but you don't currently have any method of determining
# if a workflow takes too long to run. After consulting with your manager and your team, you decide to implement an SLA
# at the DAG level on a test workflow.

# Note: an `sla` set in default_args is passed to every task in the DAG, which is how it acts as a DAG-level SLA here

# Example 1 - Creating a default_args with SLA
# Import the datetime and timedelta objects
from datetime import datetime, timedelta

# Create the dictionary entry
default_args = {
  'start_date': datetime(2024, 1, 20),
  'sla': timedelta(minutes=30)
}

# Add to the DAG
test_dag = DAG('test_workflow', default_args=default_args, schedule_interval=None)

# Example 2 - Creating a task with the SLA
# Import the timedelta object
from datetime import timedelta


test_dag = DAG('test_workflow', start_date=datetime(2024,1,20), schedule_interval=None)

# Create the task with the SLA
task1 = BashOperator(task_id='first_task',
                     sla=timedelta(hours=3),
                     bash_command='initialize_data.sh',
                     dag=test_dag)

In [None]:
# Define the email task
# Airflow will email you with an attached report file "monthly_report.pdf" after the generate_report task completes.

email_report = EmailOperator(
        task_id='email_report',
        to='airflow@datacamp.com',
        subject='Airflow Monthly Report',
        html_content="""Attached is your monthly workflow report - please refer to it for more detail""",
        files=["monthly_report.pdf"],
        dag=report_dag
)

# Set the email task to run after the report is generated
generate_report_task >> email_report

# You've worked through most of the Airflow configuration for setting up your workflows,
# but you realize you're not getting any notifications when DAG runs complete or fail. 
# You'd like to setup email alerting for the success and failure cases, but you want to send it to two addresses.

# Example of python DAG script with default_args including the email definition
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.filesystem import FileSensor
from datetime import datetime

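default_args={
    'email': ['airflowalerts@datacamp.com', 'airflowadmin@datacamp.com'],
    'email_on_failure': True,
    # Note: email_on_success comes from the course material; Airflow 2.x documents only
    # email_on_failure and email_on_retry, so this key may be ignored
    'email_on_success': True
}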
report_dag = DAG(
    dag_id = 'execute_report',
    schedule_interval = "0 0 * * *",
    default_args=default_args
)

precheck = FileSensor(
    task_id='check_for_datafile',
    filepath='salesdata_ready.csv',
    start_date=datetime(2023,2,20),
    mode='reschedule',
    dag=report_dag)

generate_report_task = BashOperator(
    task_id='generate_report',
    bash_command='generate_report.sh',
    start_date=datetime(2023,2,20),
    dag=report_dag
)

precheck >> generate_report_task

## Airflow Templates

* Every time a DAG with 'templated' information is executed, information is interpreted and included in the `DAG run`
    * Recall: `DAG run`: Instance of a workflow (or a DAG) at a given point in time.
* Thus, using Airflow Template allows for substituting information in a `DAG run`
* It uses `JINJA` templating language
* It allows for Airflow built-in runtime variables, for example:
    * {{ ds }} for Execution Date in the YYYY-MM-DD format (as string and not as python datetime object)
    * {{ ds_nodash }} for Execution Date with No Dashes in the YYYYMMDD format (as string and not as python datetime object)
    * {{ dag }} for accessing a full DAG object
    * {{ conf }} for accessing the current Airflow configuration within code
    * {{ macros }} variable (a reference to the Airflow macros package which provides various useful objects/methods for Airflow templates)
        * {{ macros.datetime }}: The datetime.datetime object
        * {{ macros.timedelta }}: The timedelta object
        * etc
* Note: Why would you want to create individual tasks (ie, BashOperators) with specific parameters vs a list of files?
    * When using a single task, all entries would succeed or fail as a single task. 
    * Separate operators allow for better monitoring and scheduling of these tasks.
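
To see what `{{ ds }}` and `{{ ds_nodash }}` turn into, the sketch below mimics the substitution step with the standard library. `render` is a hypothetical regex-based helper, not Jinja and not Airflow's rendering code, and the `cleandata.sh` command string is only an illustration. It shows the two date formats and how a templated command becomes a concrete one for a given run date.

```python
import re
from datetime import datetime


def render(template, context):
    """Replace each {{ name }} with context[name]; a toy substitute for Jinja."""
    return re.sub(
        r"\{\{\s*(\w+)\s*\}\}",
        lambda match: str(context[match.group(1)]),
        template,
    )


logical_date = datetime(2024, 1, 20)
context = {
    "ds": logical_date.strftime("%Y-%m-%d"),       # YYYY-MM-DD string
    "ds_nodash": logical_date.strftime("%Y%m%d"),  # YYYYMMDD string
}

command = render("cleandata.sh {{ ds_nodash }}", context)
print(command)  # cleandata.sh 20240120
print(render('echo "Run date: {{ ds }}"', context))  # echo "Run date: 2024-01-20"
```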

In [None]:
# Example of using Airflow Template: 
# We want to do something simple: to echo the word "Reading file_XYZ" to a log or output

# 1) First option: create multiple tasks using the BashOperator
t1 = BashOperator(
    task_id='first_task',
    bash_command='echo "Reading file1.txt"',
    dag=dag)

t2 = BashOperator(
    task_id='second_task',
    bash_command='echo "Reading file2.txt"',
    dag=dag)

# Note: if we needed to process 100 files, we would need 100 BashOperators.
# Let's switch to a templated BashOperator command, still with one task per file.
# Each task in a DAG needs a unique task_id.
templated_command="""
    echo "Reading {{ params.filename }}"
"""

t1 = BashOperator(task_id='template_task_1',
                  bash_command=templated_command,
                  params={'filename': 'file1.txt'},
                  dag=example_dag)
t2 = BashOperator(task_id='template_task_2',
                  bash_command=templated_command,
                  params={'filename': 'file2.txt'},
                  dag=example_dag)

# 2) Second option: create one task and use a loop
# Let's use a Jinja for loop to iterate over a list and output what we need

templated_command="""
{% for filename in params.filenames %}  
    echo "Reading {{ filename }}"
{% endfor %}"""

t1 = BashOperator(task_id='template_task',
                  bash_command=templated_command,
                  params={'filenames': ['file1.txt', 'file2.txt']},
                  dag=example_dag)

# Output:
    # Reading file1.txt
    # Reading file2.txt

In [None]:
# Example: you decide to make some modifications to the design of your 'cleandata' workflow

# You've successfully created a BashOperator that cleans a given data file by executing a script called cleandata.sh. 
# This works, but unfortunately the script can only process the current day's data.
# Some of your data sources are occasionally behind by a couple of days and need to be run manually.

# You successfully modify the cleandata.sh script to take one argument - the date in YYYYMMDD format. 
# Your testing works at the command-line, but you now need to implement this into your Airflow DAG. 
# Use {{ ds_nodash }} in your template to pass the execution date in YYYYMMDD format.
# Note that we don't need to define a params argument in the BashOperator here: Airflow
# passes built-in variables such as ds_nodash into templates automatically.

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

default_args = {
  'start_date': datetime(2023, 4, 15),
}

cleandata_dag = DAG('cleandata',
                    default_args=default_args,
                    schedule_interval='@daily')

# Create a templated command to execute
# 'bash cleandata.sh datestring'
templated_command="""  
    bash cleandata.sh {{ ds_nodash }}
"""

# Modify clean_task to use the templated command
clean_task = BashOperator(task_id='cleandata_task',
                          bash_command=templated_command,
                          dag=cleandata_dag)

In [None]:
# You wish to build upon your previous DAG and modify the code to support two arguments - the date in YYYYMMDD format, 
# and a file name passed to the cleandata.sh script.
# Modify the templated command to handle a second argument called filename.
templated_command = """
  bash cleandata.sh {{ ds_nodash }} {{ params.filename }}
"""

# Modify clean_task to pass the new argument
clean_task = BashOperator(task_id='cleandata_task',
                          bash_command=templated_command,
                          params={'filename': 'salesdata.txt'},
                          dag=cleandata_dag)

# Create a new BashOperator clean_task2
clean_task2 = BashOperator(task_id='cleandata_task2',
                          bash_command=templated_command,
                          params={'filename': 'supportdata.txt'},
                          dag=cleandata_dag)
                           
# Set the operator dependencies
clean_task >> clean_task2

In [None]:
# Let's implement a Jinja template to iterate over the files in a list and execute a bash command for each file.
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

filelist = [f'file{x}.txt' for x in range(30)]

default_args = {
  'start_date': datetime(2020, 4, 15),
}

cleandata_dag = DAG('cleandata',
                    default_args=default_args,
                    schedule_interval='@daily')

# Modify the template to handle multiple files in a 
# single run.
templated_command = """
  {% for filename in params.filenames %}
  bash cleandata.sh {{ ds_nodash }} {{ filename }};
  {% endfor %}
"""

# Modify clean_task to use the templated command
clean_task = BashOperator(task_id='cleandata_task',
                          bash_command=templated_command,
                          params={'filenames': filelist},
                          dag=cleandata_dag)


In [None]:
# Example of an Airflow template for EmailOperator()
from airflow import DAG
from airflow.operators.email import EmailOperator
from datetime import datetime

# Create the string representing the HTML email content (the date from {{ ds }} and the username from params)
html_email_str = """
    Date: {{ ds }}
    Username: {{ params.username }}
"""

email_dag = DAG('template_email_test',
                default_args={'start_date': datetime(2023, 4, 15)},
                schedule_interval='@weekly')
                
email_task = EmailOperator(task_id='email_task',
                           to='testuser@datacamp.com',
                           subject="{{ macros.uuid.uuid4() }}",
                           html_content=html_email_str,
                           params={'username': 'testemailuser'},
                           dag=email_dag)

## Airflow Branching
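* Provides conditional logic: the DAG follows different downstream tasks depending on a value computed at run time.
* Uses the BranchPythonOperator(), whose `python_callable` returns the task_id (or list of task_ids) to follow; the other tasks directly downstream of the branch are skipped.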
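
As a rough mental model only (this is not Airflow's implementation), branching can be pictured as a function that marks each task directly downstream of the branch as either run or skipped, depending on what the callable returned. The helper `resolve_branch` and the task names `task_a`, `task_b`, and `task_c` are hypothetical and exist only for this sketch.

```python
def resolve_branch(returned, downstream_task_ids):
    """Simplified model: map each directly downstream task_id to 'run' or 'skipped'."""
    # The callable may return one task_id (a string) or several (a list).
    chosen = {returned} if isinstance(returned, str) else set(returned)
    return {
        task_id: "run" if task_id in chosen else "skipped"
        for task_id in downstream_task_ids
    }


# The callable returned a single task_id.
print(resolve_branch("even_day_task", ["even_day_task", "odd_day_task"]))

# The callable returned a list of task_ids.
print(resolve_branch(["task_a", "task_b"], ["task_a", "task_b", "task_c"]))
```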
 

In [None]:
from airflow.operators.python import BranchPythonOperator

# Recall:
    # **kwargs: reference to a keyword dictionary passed into the function

# Define a function that takes a date in YYYYMMDD format and checks whether it falls on an even or odd day
def branch_test(**kwargs):
    if int(kwargs['ds_nodash']) % 2 == 0:
        return 'even_day_task'
    else:
        return 'odd_day_task'

branch_task = BranchPythonOperator(task_id='branch_task', dag=dag,
                                   provide_context=True,            # Needed in Airflow 1.x to pass runtime variables and macros; deprecated and unnecessary in Airflow 2
                                   python_callable=branch_test)

start_task >> branch_task >> even_day_task >> even_day_task2
branch_task >> odd_day_task >> odd_day_task2

# The picture below shows the case for an even day.
# Step 1: the start_task executes normally
# Step 2: the branch_task checks the ds_nodash value, determines that it is an even day, and returns 'even_day_task'
# Step 3: the even_day_task is executed, followed by even_day_task2; the odd-day tasks are skipped

<img src="/workspace/sources/datacamp/img/airflow_branching.png" alt="airflow_branching" width="400px">
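
Because a branch callable is an ordinary Python function that returns a task_id, its decision logic can be checked without a scheduler. The sketch below calls an odd/even check like `branch_test` with hand-built keyword arguments standing in for the context Airflow would pass. The two dates are arbitrary examples and nothing is imported from Airflow.

```python
def branch_test(**kwargs):
    """Return the task_id to follow, based on whether YYYYMMDD is even or odd."""
    if int(kwargs["ds_nodash"]) % 2 == 0:
        return "even_day_task"
    return "odd_day_task"


# Simulate the ds_nodash value for two consecutive daily runs.
for ds_nodash in ("20230415", "20230416"):
    print(ds_nodash, "->", branch_test(ds_nodash=ds_nodash))
# 20230415 -> odd_day_task
# 20230416 -> even_day_task
```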

In [None]:
# Example of Airflow Branching
# You'd like to run a different code path if the current execution date represents a new year (ie, 2020 vs 2019)

# Create a function to determine if years are different
def year_check(**kwargs):
    current_year = int(kwargs['ds_nodash'][0:4])  # takes only the YYYY from the current year
    previous_year = int(kwargs['prev_ds_nodash'][0:4])
    if current_year == previous_year:
        return 'current_year_task'
    else:
        return 'new_year_task'

# Define the BranchPythonOperator
branch_task = BranchPythonOperator(task_id='branch_task', dag=branch_dag,
                                   python_callable=year_check, provide_context=True)
# Define the dependencies
branch_task >> current_year_task
branch_task >> new_year_task

## Creating a Production Pipeline

In [None]:
# Example

# There is sales data that will be uploaded to the system. Once the data is uploaded, 
# a new file should be created to kick off the full processing, but something isn't working correctly.

# 1) Note that there are 3 tasks, given by the task_id in the respective Operators: sense_file, cleanup_tempfiles, run_processing.
    # Note that the dag_id is: dag_id='etl_update'
# 2) Let's run the sense_file task from the Airflow shell and look for errors
    # airflow tasks test <dag_id> <task_id> <execution_date>
        # Use -1 instead of a date (a shorthand for testing the task with the most recent execution date for which the task has been scheduled;
        # essentially, it tells Airflow to use the latest available run for that task in the DAG).
    # Shell command: airflow tasks test etl_update sense_file -1
    # output: "WARNING - cannot record queued_duration for task sense_file because previous state change time has not been saved"

from airflow import DAG
from airflow.sensors.filesystem import FileSensor
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from dags.process import process_data
from datetime import timedelta, datetime

# Update the default arguments and apply them to the DAG
default_args = {
  'start_date': datetime(2023,1,1),
  'sla': timedelta(minutes=90)
}

dag = DAG(dag_id='etl_update', default_args=default_args)

sensor = FileSensor(task_id='sense_file', 
                    filepath='/home/repl/workspace/startprocess.txt',
                    poke_interval=45, # check for the file only every 45 seconds
                    dag=dag)

bash_task = BashOperator(task_id='cleanup_tempfiles', 
                         bash_command='rm -f /home/repl/*.tmp',
                         dag=dag)

python_task = PythonOperator(task_id='run_processing', 
                             python_callable=process_data,
                             provide_context=True,
                             dag=dag)

sensor >> bash_task >> python_task


In [None]:
from airflow import DAG
from airflow.sensors.filesystem import FileSensor
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.operators.python import BranchPythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.operators.email import EmailOperator
from dags.process import process_data
from datetime import datetime, timedelta

# Update the default arguments and apply them to the DAG.

default_args = {
  'start_date': datetime(2023,1,1),
  'sla': timedelta(minutes=90)
}
    
dag = DAG(dag_id='etl_update', default_args=default_args)

sensor = FileSensor(task_id='sense_file', 
                    filepath='/home/repl/workspace/startprocess.txt',
                    poke_interval=45,
                    dag=dag)

bash_task = BashOperator(task_id='cleanup_tempfiles', 
                         bash_command='rm -f /home/repl/*.tmp',
                         dag=dag)

python_task = PythonOperator(task_id='run_processing', 
                             python_callable=process_data,
                             provide_context=True,
                             dag=dag)

email_subject="""
  Email report for {{ params.department }} on {{ ds_nodash }}
"""

email_report_task = EmailOperator(task_id='email_report_task',
                                  to='sales@mycompany.com',
                                  subject=email_subject,
                                  html_content='',
                                  params={'department': 'Data subscription services'},
                                  dag=dag)

no_email_task = EmptyOperator(task_id='no_email_task', dag=dag)

def check_weekend(**kwargs):
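    # 'ds' is the run date as a 'YYYY-MM-DD' string, which strptime can parse
    # (kwargs['execution_date'] is a datetime-like object, not a string)
    dt = datetime.strptime(kwargs['ds'], "%Y-%m-%d")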
    # If dt.weekday() is 0-4, it's Monday - Friday. If 5 or 6, it's Sat / Sun.
    if (dt.weekday() < 5):
        return 'email_report_task'
    else:
        return 'no_email_task'
    
branch_task = BranchPythonOperator(task_id='check_if_weekend',
                                   python_callable=check_weekend,
                                   provide_context=True,
                                   dag=dag)

    
sensor >> bash_task >> python_task

python_task >> branch_task >> [email_report_task, no_email_task]
