# Maintaining and monitoring Airflow workflows
Learn more about Airflow components such as sensors and executors while monitoring and troubleshooting Airflow workflows.


## Determining the executor
While developing your DAGs in Airflow, you realize you're not certain the configuration of the system. Using the commands you've learned, determine which of the following statements is true.

```bash
repl:~$ airflow list_dags
[2020-08-20 16:16:31,601] {__init__.py:51} INFO - Using executor SequentialExecutor
[2020-08-20 16:16:31,924] {dagbag.py:90} INFO - Filling up the DagBag from /home/repl/workspace/dags
```

### This system can run one task at a time.

## Executor implications
You're learning quite a bit about running Airflow DAGs and are gaining some confidence at developing new workflows. That said, your manager has mentioned that on some days, the workflows are taking a lot longer to finish and asks you to investigate. She also mentions that the salesdata_ready.csv file is taking longer to generate these days and the time of day it is completed is variable.

This exercise requires information from the previous two lessons - remember the implications of the available arguments
and modify the workflow accordingly. Note that for this exercise, you're expected to modify one line of code, not add
any extra code.

```bash
repl:~/workspace$ airflow list_dags
[2020-08-20 16:38:45,726] {__init__.py:51} INFO - Using executor SequentialExecutor
[2020-08-20 16:38:46,068] {dagbag.py:90} INFO - Filling up the DagBag from /home/repl/workspace/dags
```

In [1]:
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.sensors.file_sensor import FileSensor
from datetime import datetime

report_dag = DAG(
    dag_id = 'execute_report',
    schedule_interval = "0 0 * * *"
)

precheck = FileSensor(
    task_id='check_for_datafile',
    filepath='salesdata_ready.csv',
    start_date=datetime(2020,2,20),
    mode='reschedule', # mode='poke' is wrong
    dag=report_dag
)

generate_report_task = BashOperator(
    task_id='generate_report',
    bash_command='generate_report.sh',
    start_date=datetime(2020,2,20),
    dag=report_dag
)

precheck >> generate_report_task


<Task(BashOperator): generate_report>

You've successfully modified the sensor properties to give Airflow a chance to run another task while waiting for the salesdata_ready.csv file. This required recognizing the connection between an executor and the number and type of tasks in a workflow. As you've probably realized, you could also modify the executor type to something with a parallelism greater than 1 to allow the tasks to complete.

## Defining an SLA
You've successfully implemented several Airflow workflows into production, but you don't currently have any method of determining if a workflow takes too long to run. After consulting with your manager and your team, you decide to implement an SLA at the DAG level on a test workflow.

In [2]:
# Import the timedelta object
from datetime import timedelta

# Create the dictionary entry
default_args = {
  'start_date': datetime(2020, 2, 20),
  'sla': timedelta(minutes=30)
}

# Add to the DAG
test_dag = DAG('test_workflow', default_args=default_args, schedule_interval='@None')

You've successfully defined an SLA on your workflow. Remember that this type of SLA applies for the entire workflow, not just an individual task.

## Defining a task SLA
After completing the SLA on the entire workflow, you realize you really only need the SLA timing on a specific task instead of the full workflow.

In [3]:
test_dag = DAG('test_workflow', start_date=datetime(2020,2,20), schedule_interval='@None')

# Create the task with the SLA
task1 = BashOperator(task_id='first_task',
                     sla=timedelta(hours=3),
                     bash_command='initialize_data.sh',
                     dag=test_dag)

## Generate and email a report
Airflow provides the ability to automate almost any style of workflow. You would like to receive a report from Airflow
when tasks complete without requiring constant monitoring of the UI or log files. You decide to use the email
functionality within Airflow to provide this message.

In [4]:
from airflow.operators.email_operator import EmailOperator

# Define the email task
email_report = EmailOperator(
        task_id='email_report',
        to='airflow@datacamp.com',
        subject='Airflow Monthly Report',
        html_content="""Attached is your monthly workflow report - please refer to it for more detail""",
        files=['monthly_report.pdf'],
        start_date=datetime(2019, 1, 1),
        dag=report_dag
)

# Set the email task to run after the report is generated
email_report << generate_report_task

<Task(BashOperator): generate_report>

Airflow will now email you with an attached report file after the generate_report task completes. You can use Airflow's functionality to send updates via many methods in addition to email. Make sure to look through the documentation for other ideas on monitoring your workflows.

## Adding status emails
You've worked through most of the Airflow configuration for setting up your workflows, but you realize you're not getting any notifications when DAG runs complete or fail. You'd like to setup email alerting for the success and failure cases, but you want to send it to two addresses.

In [5]:
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.sensors.file_sensor import FileSensor
from datetime import datetime

default_args={
    'email': ['airflowalerts@datacamp.com', 'airflowadmin@datacamp.com'],
    'email_on_failure': True,
    'email_on_success': True
}

report_dag = DAG(
    dag_id = 'execute_report',
    schedule_interval = "0 0 * * *",
    default_args=default_args
)

precheck = FileSensor(
    task_id='check_for_datafile',
    filepath='salesdata_ready.csv',
    start_date=datetime(2020,2,20),
    mode='reschedule',
    dag=report_dag)

generate_report_task = BashOperator(
    task_id='generate_report',
    bash_command='generate_report.sh',
    start_date=datetime(2020,2,20),
    dag=report_dag
)

precheck >> generate_report_task

<Task(BashOperator): generate_report>

You've successfully configured the workflow to send you email alerts when the DAG completes successfully or fails. Use these options in production to monitor the state of your workflows to help avoid surprises.