In [None]:
from airflow.models import DAG
from datetime import datetime

In [None]:
# Default arguments handed to the DAG below via default_args.
default_arguments = dict(
    owner='jdoe',
    email='jdoe@datacamp.com',
    # Represents the earliest date that a DAG could run
    start_date=datetime(2020, 1, 20),
)

# Create the DAG object, giving it an id and the defaults defined above.
etl_dag = DAG(dag_id='etl_workflow', default_args=default_arguments)

BashOperator

In [None]:
from airflow.operators.bash_operator import BashOperator

# NOTE(review): `ml_dag` and `dag` are not defined in any visible cell; they are
# assumed to be DAG objects created elsewhere -- confirm before running this
# cell on a fresh kernel.

# Run a single inline shell command.
# BashOperator takes the command through `bash_command` (not `bash_operator`).
example_task = BashOperator(
    task_id='bash_example',
    bash_command='echo "Example"',
    dag=ml_dag  # The DAG it belongs to
)

# Keep only the lines of addresses.txt that have exactly 10 fields and write
# them to cleaned.txt.
bash_task = BashOperator(
    task_id='clean_addresses',
    bash_command='cat addresses.txt | awk "NF==10" > cleaned.txt',
    dag=dag
)

# Run a shell script.
# NOTE(review): Airflow treats a bash_command that ends in ".sh" as the path of
# a Jinja template file to load; if the script should be executed directly, a
# trailing space ('runcleanup.sh ') is the documented workaround -- confirm
# which behaviour is intended.
BashOperator(
    task_id='bash_script_example',
    bash_command='runcleanup.sh',
    dag=ml_dag
)

Task Dependency

In [None]:
# Two minimal shell tasks attached to the same DAG.
# NOTE(review): `example_dag` is not defined in any visible cell -- confirm it
# exists before running.
task1 = BashOperator(task_id='first_task', bash_command='echo 1', dag=example_dag)
task2 = BashOperator(task_id='second_task', bash_command='echo 2', dag=example_dag)

# Dependency: first_task runs before second_task.
# The same relationship can also be written as `task2 << task1`.
task1 >> task2

PythonOperator

In [None]:
from airflow.operators.python_operator import PythonOperator

def printme():
    """Print a fixed message to standard output."""
    message = 'this goes in the logs!'
    print(message)

# Wrap printme in a task; the function object itself is passed, not its result.
python_task = PythonOperator(
    dag=example_dag,
    python_callable=printme,
    task_id='simple_print',
)

import time  # time.sleep is used below and no visible cell imports it


def sleep(length_of_time):
    """Pause execution for the given duration.

    Args:
        length_of_time: Number of seconds to sleep; passed unchanged to
            time.sleep.

    Returns:
        None.
    """
    time.sleep(length_of_time)

# Task that calls sleep(); op_kwargs supplies the keyword arguments for the
# callable, so length_of_time is 5 here.
sleep_task = PythonOperator(
    task_id='sleep',
    python_callable=sleep,
    op_kwargs={'length_of_time': 5},  # trailing comma was missing (SyntaxError)
    dag=example_dag
)

EmailOperator

In [None]:
from airflow.operators.email_operator import EmailOperator

# Send the sales report by email.
email_task = EmailOperator(
    task_id='email_sales_report',
    to='sales_manager@example.com',
    subject='Automated Sales Report',
    html_content='Attached is the latest sales report',
    # `files` is a list of attachment paths; a bare string would be iterated
    # character by character when the attachments are read.
    files=['latest_sales.xlsx'],
    dag=example_dag
)


Sensors

FileSensor

In [None]:
#from airflow.sensors.base_sensor_operator import 

from airflow.contrib.sensors.file_sensor import FileSensor

# Sensor task that checks for salesdata.csv.
file_sensor_task = FileSensor(
    task_id='file_sense',  # the keyword is task_id (was misspelled `taskid`)
    filepath='salesdata.csv',
    poke_interval=300,  # 300 seconds = 5 minutes
    dag=sales_report_dag
)

# Place the sensor between the cleanup step and the report step.
# NOTE(review): `init_sales_cleanup`, `generate_report` and `sales_report_dag`
# are not defined in any visible cell -- confirm they exist before running.
init_sales_cleanup >> file_sensor_task >> generate_report

SLA

In [None]:
# timedelta is needed for the `sla` argument; the only other import of it is in
# a later cell, which is too late for a top-to-bottom run.
from datetime import timedelta

# Task with a 30-second SLA set directly on the operator.
# NOTE(review): `dag` is not defined in any visible cell -- confirm it exists.
BashOperator(
    task_id='sla_task',
    bash_command='runcode.sh',
    sla=timedelta(seconds=30),
    dag=dag
)

In [None]:
# Import the timedelta object
from datetime import timedelta

# Create the dictionary entry: a start date plus a 30-minute SLA passed to the
# DAG as default arguments.
default_args = {
  'start_date': datetime(2020, 2, 20),
  'sla': timedelta(minutes=30)
}

# Add to the DAG.
# An unscheduled DAG is requested with the Python object None; the string
# '@None' is neither a schedule preset nor a cron expression.
test_dag = DAG('test_workflow', default_args=default_args, schedule_interval=None)

In [None]:
# E-mail alert settings intended to be passed to a DAG as default_args.
# Key and address spellings match the later "Example of DAG" cell.
# NOTE(review): confirm that the installed Airflow version honours
# 'email_on_success'; verify against the BaseOperator documentation.
default_args = {
    'email': ['airflowalerts@datacamp.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'email_on_success': True
}

Example of a DAG

In [None]:
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.sensors.file_sensor import FileSensor
from datetime import datetime

# Alert settings handed to the DAG as its default_args.
default_args = dict(
    email=['airflowalerts@datacamp.com', 'airflowadmin@datacamp.com'],
    email_on_failure=True,
    email_on_success=True,
)

# Cron expression "0 0 * * *": minute 0 of hour 0, every day.
report_dag = DAG(
    dag_id='execute_report',
    default_args=default_args,
    schedule_interval="0 0 * * *",
)

# Sensor on salesdata_ready.csv, run in 'reschedule' mode.
precheck = FileSensor(
    dag=report_dag,
    task_id='check_for_datafile',
    filepath='salesdata_ready.csv',
    mode='reschedule',
    start_date=datetime(2020, 2, 20),
)

# Shell task that produces the report.
generate_report_task = BashOperator(
    dag=report_dag,
    task_id='generate_report',
    bash_command='generate_report.sh',
    start_date=datetime(2020, 2, 20),
)

# The report is generated only after the sensor task.
precheck >> generate_report_task


Templates

In [None]:
# Jinja template; {{ params.filename }} is filled from each task's `params`.
templated_command = (
    '\n'
    '    echo "Reading {{ params.filename }}"\n'
)

# Same command template, different file per task.
# NOTE(review): `example_dag` is not defined in any visible cell -- confirm.
t1 = BashOperator(
    dag=example_dag,
    task_id='template_task',
    params={'filename': 'file1.txt'},
    bash_command=templated_command,
)

t2 = BashOperator(
    dag=example_dag,
    task_id='template_task2',
    params={'filename': 'file2.txt'},
    bash_command=templated_command,
)

# template_task runs before template_task2.
t1 >> t2

Jinja templating language (for loops) for more advanced templates

In [None]:
# Jinja template with a for loop: one echo line per entry in params.filenames.
templated_command = """
{% for filename in params.filenames %}
    echo "Reading {{ filename }}"
{% endfor %}
"""

# A single task handles every file in the list.
# NOTE(review): this rebinds `t1` and reuses task_id 'template_task' from the
# previous cell on the same `example_dag`; confirm that is intended if both
# cells are run in one session.
t1 = BashOperator(
    task_id='template_task',
    bash_command=templated_command,
    params={'filenames': ['file1.txt', 'file2.txt']},  # was 'fil2.txt' (typo)
    dag=example_dag
)

Big Example

In [None]:
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime

# File names to clean: file0.txt through file29.txt.
filelist = [f'file{x}.txt' for x in range(30)]

default_args = {
  'start_date': datetime(2020, 4, 15),
}

cleandata_dag = DAG('cleandata',
                    default_args=default_args,
                    schedule_interval='@daily')

# Modify the template to handle multiple files in a
# single run.
# Jinja statements are delimited by {% ... %}; the previous <% ... %> markers
# are not Jinja syntax, so the loop was never expanded.
templated_command = """
  {% for filename in params.filenames %}
  bash cleandata.sh {{ ds_nodash }} {{ filename }};
  {% endfor %}
"""

# Modify clean_task to use the templated command
clean_task = BashOperator(task_id='cleandata_task',
                          bash_command=templated_command,
                          params={'filenames': filelist},
                          dag=cleandata_dag)


In [None]:
from airflow.models import DAG
from airflow.operators.email_operator import EmailOperator
from datetime import datetime

# HTML body of the e-mail; {{ ds }} and {{ params.username }} are template
# placeholders.
html_email_str = (
    '\n'
    'Date: {{ ds }}\n'
    'Username: {{ params.username }}\n'
)

# Weekly DAG with its start date supplied through default_args.
email_dag = DAG(
    'template_email_test',
    default_args={'start_date': datetime(2020, 4, 15)},
    schedule_interval='@weekly',
)

# The subject is itself a template expression that calls uuid4().
email_task = EmailOperator(
    dag=email_dag,
    task_id='email_task',
    to='testuser@datacamp.com',
    subject="{{ macros.uuid.uuid4() }}",
    html_content=html_email_str,
    params={'username': 'testemailuser'},
)


Branching

In [None]:
def branch_test(**kwargs):
    """Pick the branch to follow from the parity of the run date.

    Reads kwargs['ds_nodash'], converts it to an integer, and returns
    'even_day_task' when that integer is even, otherwise 'odd_day_task'.
    """
    date_number = int(kwargs['ds_nodash'])
    is_even = date_number % 2 == 0
    return 'even_day_task' if is_even else 'odd_day_task'

# BranchPythonOperator was used without being imported in any visible cell.
from airflow.operators.python_operator import BranchPythonOperator

# Branching task: branch_test returns the task_id of the branch to follow.
# provide_context=True passes the run context to branch_test as keyword
# arguments, which is where it reads 'ds_nodash' from.
# NOTE(review): `dag`, `start_task`, `even_day_task*` and `odd_day_task*` are
# not defined in any visible cell -- confirm they exist before running.
branch_task = BranchPythonOperator(
    task_id='branch_task',
    provide_context=True,
    python_callable=branch_test,
    dag=dag
)

# Both branches hang off branch_task.
start_task >> branch_task >> even_day_task >> even_day_task2
branch_task >> odd_day_task >> odd_day_task2
