## Airflow operators

In [None]:
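# Operators
# - Represent a single task in a workflow.
# - Usually run independently of each other.
# - Generally do not share information.
# - Many operator types exist to perform different kinds of tasks.
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator  # available in Airflow 2.3+

# Current style (Airflow 2.x): tasks created inside a `with DAG(...)` block attach to it automatically
with DAG(dag_id='example_dag', start_date=datetime(2024, 1, 1)) as dag_name:
    EmptyOperator(task_id='example')

# Older style, still supported: pass the DAG explicitly with `dag=`
# (a different task_id is used because task_ids must be unique within a DAG)
EmptyOperator(task_id='example_explicit', dag=dag_name)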
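The two styles differ only in how a task learns which DAG it belongs to. The toy model below sketches that idea under a simplifying assumption: an open `with` block pushes the DAG onto a stack, and an operator created without `dag=` attaches to the innermost open DAG. ToyDAG and ToyOperator are hypothetical names, not Airflow classes, and this is not Airflow's implementation.

```python
class ToyDAG:
    """Hypothetical stand-in for a DAG that supports the `with` statement."""

    _active = []  # stack of DAGs whose `with` blocks are currently open

    def __init__(self, dag_id):
        self.dag_id = dag_id
        self.task_ids = []

    def __enter__(self):
        ToyDAG._active.append(self)
        return self

    def __exit__(self, exc_type, exc, tb):
        ToyDAG._active.pop()
        return False  # let any exception propagate


class ToyOperator:
    """Hypothetical task that finds its DAG explicitly or from an open `with` block."""

    def __init__(self, task_id, dag=None):
        if dag is None and ToyDAG._active:
            dag = ToyDAG._active[-1]  # implicit: innermost open DAG
        self.task_id = task_id
        self.dag = dag
        if dag is not None:
            if task_id in dag.task_ids:
                raise ValueError(f"duplicate task_id {task_id!r} in DAG {dag.dag_id!r}")
            dag.task_ids.append(task_id)


# New style: the task picks up the DAG from the surrounding `with` block
with ToyDAG("example_dag") as dag_name:
    implicit_task = ToyOperator("example")

# Old style: the DAG is passed explicitly
explicit_task = ToyOperator("example_explicit", dag=dag_name)

# Outside any `with` block and without dag=, the task has no DAG
orphan_task = ToyOperator("orphan")
```

The duplicate check mirrors the rule that task IDs must be unique within one DAG.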

In [None]:
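# BashOperator
from airflow.operators.bash import BashOperator

# Assumes `dag` is an existing DAG object
BashOperator(task_id='bash_example',
             bash_command='echo "Example!"',
             # dag= is the older, explicit style; it can be omitted inside a `with DAG(...)` block
             dag=dag)

# Airflow loads a bash_command ending in .sh as a Jinja template file (searched relative
# to the DAG file's folder); a trailing space, as in 'runcleanup.sh ', runs it as a plain command
BashOperator(task_id='bash_script_example',
             bash_command='runcleanup.sh')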

In [None]:
from airflow.operators.bash import BashOperator 

example_task = BashOperator(task_id='bash_ex',
                            bash_command='echo 1')

# Keep only lines with exactly 10 fields (in awk, NF is the number of fields on a line)
bash_task = BashOperator(task_id='clean_addresses',
                         bash_command='cat addresses.txt | awk "NF==10" > cleaned.txt')
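The clean_addresses command keeps only the lines of addresses.txt that have exactly ten fields. A rough pure-Python equivalent of that filter, handy for checking the logic before wrapping it in a task, might look like this. It assumes awk's default whitespace field splitting, which `str.split()` closely approximates; the function names are hypothetical.

```python
def has_ten_fields(line: str) -> bool:
    # str.split() with no argument splits on runs of whitespace and ignores
    # leading/trailing blanks, close to awk's default field splitting
    return len(line.split()) == 10


def clean_addresses(lines):
    """Return only the lines with exactly 10 fields, like awk "NF==10"."""
    return [line for line in lines if has_ten_fields(line)]


sample = [
    "1 Main St Springfield IL 62701 USA home primary verified\n",
    "incomplete record\n",
]
cleaned = clean_addresses(sample)
```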

### Defining a BashOperator task
The BashOperator lets you run any shell command or script as a task in an Airflow workflow. This makes it a good first step toward implementing Airflow in your environment.

You've been running some scripts manually to clean data (using a script called cleanup.sh) before delivering it to your colleagues in the Data Analytics group. As more of these tasks are assigned to you, it's becoming difficult to keep up with running everything manually, let alone dealing with errors or retries. You'd like to implement a simple script as an Airflow operator.

The Airflow DAG analytics_dag is already defined for you and has the appropriate configurations in place.

In [None]:
# Import the BashOperator
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(dag_id="test_dag", default_args={"start_date": datetime(2024, 1, 1)}) as analytics_dag:
    # Define the BashOperator
    cleanup = BashOperator(
        task_id='cleanup_task',
        # Define the bash_command
        bash_command='cleanup.sh',
    )
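At run time, a BashOperator task hands its bash_command to bash, and a failing command (non-zero exit code) generally fails the task. The sketch below approximates that behavior with Python's subprocess module. run_bash_command is a hypothetical helper, not Airflow's implementation, and it falls back to sh if bash isn't installed.

```python
import shutil
import subprocess


def run_bash_command(bash_command: str) -> str:
    """Hypothetical helper: run a command string in a shell and return its stdout.

    Raises RuntimeError on a non-zero exit code, loosely mirroring how a
    failing command fails a BashOperator task.
    """
    shell = shutil.which("bash") or "sh"  # fall back to sh if bash is missing
    result = subprocess.run([shell, "-c", bash_command], capture_output=True, text=True)
    if result.returncode != 0:
        raise RuntimeError(f"command exited with code {result.returncode}: {result.stderr.strip()}")
    return result.stdout


output = run_bash_command('echo "Example!"')

# A command that exits with status 3 is reported as a failure
try:
    run_bash_command("exit 3")
    failure = None
except RuntimeError as exc:
    failure = exc
```

Surfacing failures this way is what lets Airflow's retry settings take over instead of you rerunning cleanup.sh by hand.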

### Multiple BashOperators
Airflow DAGs can contain many operators, each performing their defined tasks.

You've successfully implemented one of your scripts as an Airflow task and have decided to continue migrating your individual scripts to a full Airflow DAG. You now want to add more components to the workflow. In addition to the cleanup.sh script used in the previous exercise, you have two more scripts, consolidate_data.sh and push_data.sh. These further process your data and copy it to its final location.

The DAG analytics_dag is defined (meaning you do not need to add the with DAG(...) statement), and your cleanup task is still defined. The BashOperator is already imported.

In [None]:
# Define a second operator to run the `consolidate_data.sh` script
consolidate = BashOperator(
    task_id='consolidate_task',
    bash_command='consolidate_data.sh'
    )

# Define a final operator to execute the `push_data.sh` script
push_data = BashOperator(
    task_id='pushdata_task',
    bash_command='push_data.sh'
    )

## Airflow tasks

In [None]:
example_task = BashOperator(task_id='bash_example',       
                            bash_command='echo "Example!"')

In [None]:
# Upstream means before: an upstream task must finish before its downstream task starts
# Downstream means after

In [None]:
# Define the tasks 
task1 = BashOperator(task_id='first_task',     
                     bash_command='echo 1'         
                    )

task2 = BashOperator(task_id='second_task', 
                     bash_command='echo 2'    
                    ) 

# Set first_task to run before second_task
task1 >> task2   # or task2 << task1

In [None]:
# Multiple dependencies 
# Chained dependencies: 
task1 >> task2 >> task3 >> task4 

# Mixed dependencies:
task1 >> task2 << task3

# or:
task1 >> task2
task3 >> task2
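The bitshift syntax works because Airflow operators overload `>>` and `<<`, and each call returns the right-hand task so expressions can chain. The hypothetical ToyTask class below is a simplified model of that behavior (not Airflow's code) that records upstream and downstream task IDs, which makes it easy to confirm what a mixed expression such as `task1 >> task2 << task3` actually sets.

```python
class ToyTask:
    """Hypothetical, simplified model of Airflow's bitshift dependency syntax."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()    # task IDs that must finish before this task
        self.downstream = set()  # task IDs that run after this task

    def set_downstream(self, other):
        self.downstream.add(other.task_id)
        other.upstream.add(self.task_id)

    def __rshift__(self, other):
        # self >> other: other runs after self
        self.set_downstream(other)
        return other  # returning `other` lets a >> b >> c chain

    def __lshift__(self, other):
        # self << other: self runs after other
        other.set_downstream(self)
        return other


task1, task2, task3, task4 = (ToyTask(f"task{i}") for i in range(1, 5))

# Chained: task1 -> task2 -> task3 -> task4
task1 >> task2 >> task3 >> task4

# Mixed, on fresh tasks: both a and c must finish before b
a, b, c = ToyTask("a"), ToyTask("b"), ToyTask("c")
a >> b << c
```

Because `>>` and `<<` share the same precedence and evaluate left to right, `a >> b << c` is `(a >> b) << c`, which reduces to `b << c`.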

### Define order of BashOperators
Now that you've learned about the bitshift operators, it's time to modify your workflow to include a pull step and to define the task ordering. You currently have three components defined: cleanup, consolidate, and push_data.

The DAG analytics_dag is available as before and the BashOperator is already imported.

In [None]:
# Define a new pull_sales task
pull_sales = BashOperator(
    task_id='pullsales_task',
    bash_command='wget https://salestracking/latestinfo?json'
)

# Set pull_sales to run prior to cleanup
pull_sales >> cleanup 

# Configure consolidate to run after cleanup
cleanup >> consolidate 

# Set push_data to run last
consolidate >> push_data 
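With these three statements the workflow is a straight line. Airflow's scheduler derives run order from the dependency graph; as a rough analogy (using Python's graphlib module, Python 3.9+, rather than anything in Airflow), a topological sort of the same upstream relationships yields an order in which the tasks can run. It also rejects cycles, which Airflow does not allow in a DAG either.

```python
from graphlib import CycleError, TopologicalSorter  # Python 3.9+

# Each task ID maps to the set of task IDs upstream of it (they must finish first)
analytics_graph = {
    "pullsales_task": set(),
    "cleanup_task": {"pullsales_task"},
    "consolidate_task": {"cleanup_task"},
    "pushdata_task": {"consolidate_task"},
}

run_order = list(TopologicalSorter(analytics_graph).static_order())


def has_cycle(graph):
    """Return True if the dependency graph contains a cycle."""
    try:
        list(TopologicalSorter(graph).static_order())
    except CycleError:
        return True
    return False
```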

## Additional operators

In [None]:
# PythonOperator
from airflow.operators.python import PythonOperator 

def printme():
    print("This goes in the logs!")

python_task = PythonOperator(
    task_id='simple_print',
    python_callable=printme
)

In [None]:
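# op_kwargs example
import time

from airflow.operators.python import PythonOperator

def sleep(length_of_time):
    time.sleep(length_of_time)

# op_kwargs passes keyword arguments to python_callable when the task runs
sleep_task = PythonOperator(
    task_id='sleep',
    python_callable=sleep,
    op_kwargs={'length_of_time': 5}
)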
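Conceptually, PythonOperator calls python_callable with the entries of op_kwargs as keyword arguments (and op_args as positional ones), so each key must match a parameter name of the function. The hypothetical call_callable helper below models only that part; the real operator also renders templates and can pass Airflow context values, which this sketch ignores.

```python
import time


def call_callable(python_callable, op_args=None, op_kwargs=None):
    """Hypothetical helper: call python_callable(*op_args, **op_kwargs)."""
    return python_callable(*(op_args or []), **(op_kwargs or {}))


def sleep(length_of_time):
    time.sleep(length_of_time)
    return length_of_time  # return value added so the call is easy to check


slept = call_callable(sleep, op_kwargs={"length_of_time": 0.01})

# A key that doesn't match a parameter name fails with TypeError
try:
    call_callable(sleep, op_kwargs={"seconds": 0.01})
    mismatch_error = None
except TypeError as exc:
    mismatch_error = exc
```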

In [None]:
# Email Operator example 
from airflow.operators.email import EmailOperator 

email_task = EmailOperator(   
    task_id='email_sales_report', 
    to='sales_manager@example.com',  
    subject='Automated Sales Report',
    html_content='Attached is the latest sales report',   
    files=['latest_sales.xlsx']  # a list of file paths to attach
)
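EmailOperator's files argument takes a list of file paths to attach. To see what such a message contains without configuring SMTP, here is a sketch that builds (but does not send) a comparable message with Python's standard email package. build_report_email is a hypothetical helper, and this is not how Airflow constructs its messages internally.

```python
import tempfile
from email.message import EmailMessage
from pathlib import Path


def build_report_email(to, subject, html_content, files):
    """Hypothetical helper: build (not send) an HTML email with attachments."""
    msg = EmailMessage()
    msg["To"] = to
    msg["Subject"] = subject
    msg.set_content(html_content, subtype="html")
    # `files` must be a list of paths; a bare string would be iterated
    # character by character here
    for path in files:
        msg.add_attachment(
            Path(path).read_bytes(),
            maintype="application",
            subtype="octet-stream",
            filename=Path(path).name,
        )
    return msg


with tempfile.TemporaryDirectory() as tmp:
    report = Path(tmp) / "latest_sales.xlsx"
    report.write_bytes(b"placeholder, not a real spreadsheet")
    msg = build_report_email(
        to="sales_manager@example.com",
        subject="Automated Sales Report",
        html_content="Attached is the latest sales report",
        files=[str(report)],
    )

attachment_names = [part.get_filename() for part in msg.iter_attachments()]
```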

### Using the PythonOperator
You've implemented several Airflow tasks using the BashOperator but realize that a couple of specific tasks would be better implemented using Python. You'll implement a task to download and save a file to the system within Airflow.

The requests library is imported for you, and the DAG process_sales_dag is already defined.

In [None]:
# Define the method
import requests

def pull_file(URL, savepath):
    r = requests.get(URL)
    with open(savepath, 'wb') as f:
        f.write(r.content)
    # Use the print method for logging
    print(f"File pulled from {URL} and saved to {savepath}")

from airflow.operators.python import PythonOperator

# Create the task
pull_file_task = PythonOperator(
    task_id='pull_file',
    # Add the callable
    python_callable=pull_file,
    # Define the arguments
    op_kwargs={'URL':'http://dataserver/sales.json', 'savepath':'latestsales.json'}
)
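To try pull_file's download-and-save logic before scheduling it, one option is a standard-library variant that swaps requests for urllib.request (an assumption made so the sketch needs no third-party package). Because urlopen also accepts file:// URLs, a local temporary file can stand in for http://dataserver/sales.json.

```python
import tempfile
import urllib.request
from pathlib import Path


def pull_file(URL, savepath):
    # urllib.request replaces requests here; the saving logic is unchanged
    with urllib.request.urlopen(URL) as response, open(savepath, "wb") as f:
        f.write(response.read())
    print(f"File pulled from {URL} and saved to {savepath}")


with tempfile.TemporaryDirectory() as tmp:
    source = Path(tmp) / "sales.json"
    source.write_text('{"total": 42}')
    target = Path(tmp) / "latestsales.json"
    pull_file(source.as_uri(), str(target))
    saved = target.read_text()
```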

### More PythonOperators
To continue implementing your workflow, you need to add another step that parses the downloaded file and saves the result. The DAG process_sales_dag is defined and already includes the pull_file task. In this case, the Python function parse_file(inputfile, outputfile) is already defined for you.

Note that when implementing Airflow tasks, you often won't fully understand every individual step you're given. As long as you understand how to wrap those steps in Airflow's structure, you'll be able to implement the desired workflow.

In [None]:
# Add another Python task
parse_file_task = PythonOperator(
    task_id='parse_file',
    # Set the function to call
    python_callable=parse_file,
    # Add the arguments
    op_kwargs={'inputfile':'latestsales.json','outputfile':'parsedfile.json'},
)
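The body of parse_file isn't shown, so exercising the pull-then-parse handoff locally requires a stand-in. The version below is purely hypothetical (it reloads the JSON and writes a pretty-printed copy); what matters is that the op_kwargs keys match its parameter names and that its inputfile is the file pull_file saved.

```python
import json
import tempfile
from pathlib import Path


def parse_file(inputfile, outputfile):
    """Hypothetical stand-in for parse_file: load JSON, save a normalized copy."""
    with open(inputfile) as f:
        data = json.load(f)
    with open(outputfile, "w") as f:
        json.dump(data, f, indent=2, sort_keys=True)


with tempfile.TemporaryDirectory() as tmp:
    # Same keys as the task's op_kwargs; paths point into a temp folder
    op_kwargs = {
        "inputfile": str(Path(tmp) / "latestsales.json"),
        "outputfile": str(Path(tmp) / "parsedfile.json"),
    }
    Path(op_kwargs["inputfile"]).write_text('{"total": 10, "region": "west"}')
    parse_file(**op_kwargs)  # what the operator effectively does with op_kwargs
    parsed = json.loads(Path(op_kwargs["outputfile"]).read_text())
```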

### EmailOperator and dependencies
Now that you've successfully defined the PythonOperators for your workflow, your manager would like to receive a copy of the parsed JSON file via email when the workflow completes. The previous tasks are still defined and the DAG process_sales_dag is configured. Note that this task uses the older DAG definition method (passing dag= explicitly), which is already filled in for you.

In [None]:
# Import the Operator
from airflow.operators.email import EmailOperator

# Define the task
email_manager_task = EmailOperator(
    task_id='email_manager',
    to='manager@datacamp.com',
    subject='Latest sales JSON',
    html_content='Attached is the latest sales JSON file as requested.',
    files=['parsedfile.json'],
    dag=process_sales_dag
)

# Set the order of tasks
pull_file_task >> parse_file_task >> email_manager_task

## Airflow scheduling

In [None]:
# Airflow scheduler presets

# Preset      cron equivalent
# @hourly     0 * * * *
# @daily      0 0 * * *
# @weekly     0 0 * * 0
# @monthly    0 0 1 * *
# @yearly     0 0 1 1 *
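Each preset is shorthand for a five-field cron expression: minute, hour, day of month, month, and day of week (0 = Sunday). The sketch below expands a preset and checks whether a given datetime matches it. cron_matches is a hypothetical helper that supports only `*` and a single number per field, far less than full cron syntax.

```python
from datetime import datetime

# Presets and their cron equivalents
PRESETS = {
    "@hourly": "0 * * * *",
    "@daily": "0 0 * * *",
    "@weekly": "0 0 * * 0",
    "@monthly": "0 0 1 * *",
    "@yearly": "0 0 1 1 *",
}


def field_matches(field, value):
    # Only '*' or a single integer are supported in this sketch
    return field == "*" or int(field) == value


def cron_matches(expr, dt):
    """Return True if datetime `dt` matches a preset or a simple cron expression."""
    minute, hour, day, month, weekday = PRESETS.get(expr, expr).split()
    cron_weekday = (dt.weekday() + 1) % 7  # cron counts Sunday as 0
    return (
        field_matches(minute, dt.minute)
        and field_matches(hour, dt.hour)
        and field_matches(day, dt.day)
        and field_matches(month, dt.month)
        and field_matches(weekday, cron_weekday)
    )
```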

### Schedule a DAG via Python
You've learned quite a bit about creating DAGs, but now you would like to schedule a specific DAG on a specific day of the week at a certain time. You'd like the code to include this information in case a colleague needs to reinstall the DAG on a different server.

The Airflow DAG object and the appropriate datetime methods have been imported for you.

In [None]:
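from datetime import datetime, timedelta

from airflow import DAG

# Update the scheduling arguments as defined
default_args = {
    'owner': 'Engineering',
    'start_date': datetime(2023, 11, 1),
    'email': ['airflowresults@datacamp.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=20)
}

# '30 12 * * 3' runs at 12:30 every Wednesday (day of week 3, with Sunday = 0).
# Airflow 2.4+ prefers `schedule=`; `schedule_interval` is deprecated there and removed in Airflow 3.
dag = DAG('update_dataflows', default_args=default_args, schedule_interval='30 12 * * 3')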
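The expression '30 12 * * 3' means minute 30, hour 12, any day of month, any month, day of week 3 (Wednesday). The sketch below lists the matching times from the start_date onward; next_weekly_times is a hypothetical helper that handles only this fixed weekly pattern. Keep in mind that in Airflow 2's cron-based scheduling each run is triggered once the interval it covers has ended, so the first run (covering 2023-11-01 12:30 to 2023-11-08 12:30) would be expected around 2023-11-08 12:30.

```python
from datetime import datetime, timedelta


def cron_weekday(dt):
    """Convert Python's weekday (Monday=0) to cron's day of week (Sunday=0)."""
    return (dt.weekday() + 1) % 7


def next_weekly_times(start, minute, hour, cron_dow, count):
    """Return the first `count` datetimes at or after `start` matching
    'minute hour * * cron_dow' (hypothetical helper, weekly patterns only)."""
    candidate = start.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate < start:
        candidate += timedelta(days=1)  # today's slot has already passed
    while cron_weekday(candidate) != cron_dow:
        candidate += timedelta(days=1)
    return [candidate + timedelta(weeks=i) for i in range(count)]


# '30 12 * * 3' starting from the DAG's start_date
times = next_weekly_times(datetime(2023, 11, 1), minute=30, hour=12, cron_dow=3, count=3)
```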