## Introduction

### DAG example

In [None]:
import datetime
from airflow import DAG

# Weekly EMEA sales report owned by the sales team.
# Cron "0 7 * * 1": minute 0, hour 7, day-of-week 1 -> 07:00 every Monday.
reporting_dag = DAG(
    dag_id="publish_EMEA_sales_report",
    start_date=datetime.datetime(2019, 11, 24),
    schedule_interval="0 7 * * 1",
    default_args=dict(owner="sales"),
)

In [None]:
import datetime
from airflow.models import DAG

# Arguments applied to every task created in the DAG.
default_arguments = dict(
    owner='jdoe',
    email='jdoe@datacamp.com',
    start_date=datetime.datetime(2020, 1, 20),
)

# "* * * * *" schedules a run every minute.
etl_dag = DAG(
    dag_id='etl_pipeline',
    default_args=default_arguments,
    schedule_interval="* * * * *",
)

In [None]:
from airflow.models import DAG
from airflow.operators.bash import BashOperator

# Minimal DAG whose start_date is supplied as a string through default_args.
dag = DAG(
    dag_id='example_dag',
    default_args=dict(start_date="2021-07-31"),
)

# Print a random number using bash's $RANDOM variable.
task01 = BashOperator(
    dag=dag,
    task_id='generate_random_number',
    bash_command='echo $RANDOM',
)

In [None]:
%%bash

# Quick reference for the Airflow CLI. Tokens in <angle brackets> are
# placeholders to substitute before running; left as-is, bash parses them as
# redirections and the cell fails with a syntax error.
# NOTE(review): these are Airflow 1.x command names; other cells in this
# notebook import Airflow 2.x modules, where the commands are grouped
# (e.g. `airflow tasks run`, `airflow tasks test`, `airflow dags trigger`) —
# confirm against the installed version.

# descriptions: print the top-level CLI help
airflow -h

# Run specific task of a DAG for the given date
airflow run <dag_id> <task_id> <start_date>

# test specific task
# NOTE(review): `-1` sits where a date argument would be expected — confirm intent.
airflow test <dag_id> <task_id> -1

# Run full DAG (-e presumably sets the execution date of the triggered run)
airflow trigger_dag -e <date> <dag_id>

# start webserver on port <port>
airflow webserver -p <port>

## Airflow DAGs

### Airflow operators

In [None]:
# BashOperator
from airflow.operators.bash_operator import BashOperator

# Inline shell pipeline: keep only lines of addresses.txt with exactly 10
# fields and write them to cleaned.txt.
example_task = BashOperator(
    dag=dag,
    task_id='bash_example',
    bash_command='cat addresses.txt | awk "NF==10" > cleaned.txt',
)

# NOTE(review): Airflow treats a bash_command ending in ".sh" as a Jinja
# template *file* to load from the template search path; append a trailing
# space ("cleanup.sh ") if the script should be executed as a command instead.
bash_task = BashOperator(
    dag=dag,
    task_id='bash_script_example',
    bash_command='cleanup.sh',
)

In [None]:
# PythonOperator
from airflow.operators.python_operator import PythonOperator

def pull_file(URL, savepath):
    """Download ``URL`` and save the raw response body to ``savepath``.

    Args:
        URL: Address of the file to download.
        savepath: Local path the response bytes are written to (overwritten).

    Raises:
        requests.HTTPError: if the server answers with a 4xx/5xx status, so
            the task fails instead of saving an error page as the data file.
    """
    # Imported locally because this cell does not import requests itself.
    import requests

    r = requests.get(URL)
    r.raise_for_status()
    with open(savepath, 'wb') as f:
        f.write(r.content)
    # Use the print method for logging (stdout ends up in the task log).
    print(f"File pulled from {URL} and saved to {savepath}")
    
# Create the task.
# NOTE(review): process_sales_dag is not defined in this cell; it is created
# in a later cell (dag_id='process_sales'), which must run first.
pull_file_task = PythonOperator(
    task_id='pull_file',
    python_callable=pull_file,
    # Keyword arguments forwarded as pull_file(URL=..., savepath=...).
    op_kwargs=dict(URL='http://dataserver/sales.json', savepath='latestsales.json'),
    dag=process_sales_dag,
)

In [None]:
# PythonOperator
from airflow.operators.python_operator import PythonOperator
# NOTE(review): my_library looks like a placeholder module — confirm.
from my_library import my_magic_function

# Calls my_magic_function(snowflake="*", amount=42) when the task runs.
python_task = PythonOperator(
    dag=dag,
    task_id='perform_magic',
    python_callable=my_magic_function,
    op_kwargs={
        "snowflake": "*",
        "amount": 42
    }
)

In [None]:
# EmailOperator
from airflow.operators.email_operator import EmailOperator

# Email the sales report to the manager with the spreadsheet attached.
email_task = EmailOperator(
    task_id='email_sales_report',
    to='sales_manager@example.com',
    subject='Automated Sales Report',
    html_content='Attached is the latest sales report',
    # `files` takes a list of paths to attach; a bare string would be
    # iterated character by character.
    files=['latest_sales.xlsx'],
    dag=dag
)

In [None]:
# SSHOperator
from airflow.contrib.operators.ssh_operator import SSHOperator

# Spark standalone master URL (placeholder host name).
spark_master = "spark://spark_standalone_cluster_ip:7077"

# spark-submit command line that is executed on the remote host over SSH.
command = " ".join([
    "spark-submit",
    "--master", spark_master,
    "--py-files", "package1.zip",
    "/path/to/app.py",
])

task = SSHOperator(
    task_id='ssh_spark_submit',
    ssh_conn_id='spark_master_ssh',
    command=command,
    dag=dag,
)

In [None]:
# SparkSubmitOperator
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator

# Submit /path/to/app.py, shipping package1.zip as extra Python files,
# through the Spark connection "spark_default".
spark_task = SparkSubmitOperator(
    task_id='spark_submit_id',
    dag=dag,
    application="/path/to/app.py",
    py_files="package1.zip",
    conn_id='spark_default'
)

In [None]:
import datetime
import os
from airflow import DAG
from airflow.operators.bash import BashOperator

# Create a DAG object
dag = DAG(
    dag_id="optimize_diaper_purchases",
    # Run the DAG daily
    schedule_interval='@daily',
    # Earliest date from which runs are scheduled
    start_date=datetime.datetime(2019, 6, 25),
    default_args={
        # Don't email on failure
        'email_on_failure': False
    }
)

# Tap configuration file under $AIRFLOW_HOME/scripts/configs/.
# os.environ[...] raises KeyError at parse time if AIRFLOW_HOME is unset.
config = os.path.join(os.environ["AIRFLOW_HOME"],
                      "scripts", "configs", "data_lake.conf")

ingest = BashOperator(
    # Assign a descriptive id
    task_id="ingest_data",
    # Pipe the marketing-API tap into the CSV target configured by `config`.
    bash_command="tap-marketing-api | target-csv --config %s" % config,
    dag=dag
)

In [None]:
from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.operators.bash import BashOperator

# Every task in this DAG gets a 2-hour SLA.
default_args = dict(sla=timedelta(hours=2))

# Jinja template: emits one `echo` line per entry of params.filenames.
echo_file = """
    {% for filename in params.filenames %}
        echo "Reading {{filename}}"
    {% endfor %}
"""

with DAG(
    'Tutorial03',
    start_date=datetime(2021, 8, 1),
    schedule_interval='0 13 * * *',
    default_args=default_args,
) as dag:
    # Single templated task; `params` feeds the loop in echo_file.
    task01 = BashOperator(
        task_id='template_task',
        params=dict(filenames=['file01.txt', 'file02.txt']),
        bash_command=echo_file,
    )

In [None]:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.models import Variable
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

# Defaults shared by every task of the "cleaning" DAG.
default_args = dict(
    owner="squad-a",
    depends_on_past=False,
    start_date=datetime(2019, 7, 5),
    email=["foo@bar.com"],
    email_on_failure=False,
    email_on_retry=False,
    retries=1,
    retry_delay=timedelta(minutes=5),
)

# Cron "0 5 */2 * *": 05:00 on every second day of the month.
# NOTE(review): Variable.get runs each time this file is parsed and presumably
# fails if the "environment" Variable does not exist; the `env` macro is not
# referenced by any task below — confirm it is still needed.
dag = DAG(
    "cleaning",
    default_args=default_args,
    user_defined_macros=dict(env=Variable.get("environment")),
    schedule_interval="0 5 */2 * *",
)


def say(what):
    """Print *what*; stdout is captured in the task log."""
    print(what)


with dag:
    say_hello = BashOperator(task_id="say-hello", bash_command="echo Hello,")
    say_world = BashOperator(task_id="say-world", bash_command="echo World")
    shout = PythonOperator(
        task_id="shout",
        python_callable=say,
        op_kwargs=dict(what='!'),
    )

    # say-hello -> say-world -> shout
    say_world.set_upstream(say_hello)
    shout.set_upstream(say_world)

In [None]:
from datetime import datetime, timedelta
from time import sleep
from airflow.models import DAG
from airflow.operators.python import PythonOperator

default_args = dict(sla=timedelta(hours=2))


def print_func():
    """Write a fixed message to stdout, which ends up in the task log."""
    print("This goes in the logs!")


def sleep_func(length_of_time):
    """Block for ``length_of_time`` seconds."""
    sleep(length_of_time)


with DAG(
    'Tutorial02',
    start_date=datetime(2021, 8, 1),
    schedule_interval='0 0 * * *',
    default_args=default_args,
) as dag:
    # print first, then sleep for 5 seconds
    task01 = PythonOperator(task_id='print', python_callable=print_func)
    task02 = PythonOperator(
        task_id='sleep',
        python_callable=sleep_func,
        op_kwargs=dict(length_of_time=5),
    )
    task01.set_downstream(task02)

In [None]:
# Import the operator
import os
from datetime import datetime
from airflow import DAG
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator

# Paths of the Spark job and its zipped Python dependencies, both under
# $AIRFLOW_HOME (os.environ[...] raises KeyError if that variable is unset).
entry_point = os.path.join(
    os.environ["AIRFLOW_HOME"], "scripts", "clean_ratings.py")
dependency_path = os.path.join(
    os.environ["AIRFLOW_HOME"], "dependencies", "pydiaper.zip")

with DAG(
    dag_id='data_pipeline',
    start_date=datetime(2019, 6, 25),
    schedule_interval='@daily'
) as dag:
    # Define task clean, running a cleaning job.
    clean_data = SparkSubmitOperator(
        application=entry_point,
        py_files=dependency_path,
        task_id='clean_data',
        conn_id='spark_default'
    )

In [None]:
# Settings shared by both Spark jobs below.
spark_args = dict(py_files=dependency_path, conn_id="spark_default")

# NOTE(review): this cell relies on names from earlier cells (`dag`, `config`,
# `dependency_path`, `BashOperator`, `SparkSubmitOperator`) and on
# `clean_path` / `transform_path`, which are not defined in this notebook.
# If `dag` is the 'data_pipeline' DAG from the previous cell, it already has a
# 'clean_data' task, so adding another task with that id will likely fail.
with dag:
    # Define ingest, clean and transform job.
    ingest = BashOperator(
        task_id='Ingest_data',
        bash_command='tap-marketing-api | target-csv --config %s' % config,
    )
    clean = SparkSubmitOperator(task_id='clean_data', application=clean_path, **spark_args)
    insight = SparkSubmitOperator(task_id='show_report', application=transform_path, **spark_args)

    # ingest -> clean -> insight
    ingest >> clean >> insight

In [None]:
from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryCreateEmptyDatasetOperator,
)
from airflow.utils.dates import days_ago

default_args = dict(sla=timedelta(hours=2))

# NOTE(review): days_ago(1) gives a start_date that moves with the date the
# file is parsed; a fixed datetime is usually preferable for scheduled DAGs.
with DAG(
    'BigQuery_test',
    start_date=days_ago(1),
    schedule_interval='0 0 * * *',
    default_args=default_args,
    tags=['Sushi_King'],
) as dag:
    # Create the empty BigQuery dataset `test_dataset` in asia-southeast1
    # through the `google_cloud_default` connection.
    task_create_dataset = BigQueryCreateEmptyDatasetOperator(
        task_id='create-dataset',
        dataset_id='test_dataset',
        location='asia-southeast1',
        gcp_conn_id='google_cloud_default',
    )

In [None]:
import datetime
import pytz
from airflow import DAG
from airflow.operators.email import EmailOperator
from airflow.operators.bash import BashOperator

tz = pytz.timezone("Asia/Kuala_Lumpur")

# Midnight at the start of "yesterday" according to the Kuala Lumpur calendar.
# NOTE(review): combine() with a tz-less time returns a *naive* datetime, so
# the KL timezone only selects the calendar date; the value is then presumably
# interpreted in Airflow's default timezone. It also changes from day to day —
# confirm this is intended.
yesterday = datetime.datetime.combine(
    (datetime.datetime.now(tz=tz) - datetime.timedelta(days=1)).date(),
    datetime.time.min,
)

with DAG(dag_id="tutorial", start_date=yesterday, schedule_interval="@daily") as dag:
    # Print the task's working directory.
    t1 = BashOperator(task_id="echo", bash_command="echo $PWD")
    # Send a test email through the `sendgrid_default` connection.
    t2 = EmailOperator(
        task_id='send_email',
        conn_id='sendgrid_default',
        to='darklemon2000@gmail.com',
        subject="EmailOperator test for SendGrid",
        html_content="This is a test message sent through SendGrid.",
    )
    t2.set_upstream(t1)

In [None]:
import os
import datetime
from textwrap import dedent
from airflow import DAG
from airflow.operators.bash import BashOperator

# Midnight at the start of yesterday (naive local time), recomputed on every
# parse of this file.
yesterday = datetime.datetime.combine(
    (datetime.datetime.today() - datetime.timedelta(days=1)).date(),
    datetime.time.min,
)

with DAG(
    dag_id="Mydin_scrapy",
    start_date=yesterday,
    schedule_interval="@daily",
    default_args=dict(
        email=["darklemonlee@yahoo.co.uk"],
        email_on_failure=True,
    ),
    tags=["Test"],
) as dag:

    # Run the Scrapy "grocery" spider from the project directory, writing its
    # output to <params.filename>_<YYYYMMDD>.jl (rendered by Jinja).
    grocery = BashOperator(
        task_id="grocery",
        params={
            "filename": "./data/grocery"
        },
        bash_command=dedent("""
            cd ${AIRFLOW_HOME}/dags/Scrapy/Mydin
            scrapy crawl grocery -O {{ params.filename }}_{{ ds_nodash }}.jl
        """),
    )

### Airflow tasks

In [None]:
# Two echo tasks attached to the current `dag`.
task1 = BashOperator(task_id='first_task', bash_command='echo 1', dag=dag)
task2 = BashOperator(task_id='second_task', bash_command='echo 2', dag=dag)

# Both statements below declare the same edge (first_task -> second_task);
# the second one only demonstrates the equivalent left-shift spelling.
task1 >> task2  # equivalent to task1.set_downstream(task2)
task2 << task1  # equivalent to task2.set_upstream(task1)

In [None]:
from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.operators.bash import BashOperator

default_args = dict(sla=timedelta(hours=2))

with DAG(
    'Tutorial01',
    start_date=datetime(2021, 8, 1),
    schedule_interval='0 12 * * *',
    default_args=default_args,
) as dag:
    task01 = BashOperator(task_id='1stecho', bash_command='echo $PWD')
    # NOTE(review): a bash_command ending in ".sh" is loaded by Airflow as a
    # Jinja template file; append a space to run it as a command instead.
    task02 = BashOperator(task_id='run-script', bash_command='Tutorial01.sh')
    task03 = BashOperator(task_id='2ndecho', bash_command='ls && echo')

    # task01 and task03 must both finish before task02 runs.
    [task01, task03] >> task02

In [None]:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = dict(sla=timedelta(hours=2))

with DAG(
    'parallel_dag',
    start_date=datetime(2021, 8, 1),
    schedule_interval='@daily',
    default_args=default_args,
) as dag:
    # Four identical 3-second sleeps (task_1 .. task_4). task_1 fans out to
    # task_2 and task_3, which can run in parallel; both feed task_4.
    task_1, task_2, task_3, task_4 = (
        BashOperator(task_id=f'task_{i}', bash_command='sleep 3')
        for i in range(1, 5)
    )

    task_1 >> [task_2, task_3] >> task_4

In [None]:
from datetime import datetime, timedelta
from time import sleep
from airflow.models import DAG
from airflow.operators.python import BranchPythonOperator
from airflow.operators.bash import BashOperator

default_args = {
    'sla': timedelta(hours=2)
}


def branch_test(**kwargs):
    """Pick the branch for this run from the parity of its date.

    ``kwargs['ds_nodash']`` is the run date as ``YYYYMMDD``; since
    ``YYYYMM * 100`` is even, its integer value has the same parity as the day.

    Returns:
        'even_day_task' on even days, otherwise 'odd_day_task'.
    """
    if int(kwargs['ds_nodash']) % 2 == 0:
        return 'even_day_task'
    return 'odd_day_task'


with DAG(
    'Tutorial04',
    default_args=default_args,
    start_date=datetime(2021, 7, 30),
    schedule_interval='@daily'
) as dag:

    # Define the tasks
    start_task = BashOperator(
        task_id='start',
        bash_command='echo START'
    )
    # The task context (ds_nodash, ...) is passed to branch_test automatically
    # by the Airflow 2 operator; the old `provide_context=True` flag is
    # deprecated there and only triggers a warning.
    branch_task = BranchPythonOperator(
        task_id='branch',
        python_callable=branch_test
    )
    even_day_task = BashOperator(
        task_id='even_day_task',
        bash_command='echo EVEN day'
    )
    odd_day_task = BashOperator(
        task_id='odd_day_task',
        bash_command='echo ODD day'
    )

    # Only the task whose id branch_test returns runs; the other is skipped.
    start_task >> branch_task >> even_day_task
    branch_task >> odd_day_task

In [None]:
from airflow.models import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.providers.http.operators.http import SimpleHttpOperator
import sys

dag = DAG(
    dag_id='update_state',
    default_args={"start_date": "2019-10-01"}
)

# Print a random number using bash's $RANDOM variable.
task1 = BashOperator(
    task_id='generate_random_number',
    bash_command='echo $RANDOM',
    dag=dag
)


def python_version():
    """Return the version string of the interpreter running the task."""
    return sys.version


task2 = PythonOperator(
    task_id='get_python_version',
    python_callable=python_version,
    dag=dag
)

# SimpleHttpOperator requests <base URL of its HTTP connection> + `endpoint`,
# so a full URL in `endpoint` would be appended to the default connection's
# host. The host therefore lives on a dedicated connection instead.
# Requires an HTTP connection "ipify_api" with host https://api.ipify.org.
task3 = SimpleHttpOperator(
    task_id='query_server_for_external_ip',
    http_conn_id='ipify_api',
    endpoint='/',
    method='GET',
    dag=dag
)

# The external-IP query runs first; both other tasks depend on it.
task3 >> [task2, task1]

### Airflow scheduling

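A DAG run is triggered at the *end* of its schedule interval, so the first run happens at `start_date` + `schedule_interval`.  
Example: with `'start_date': datetime(2020, 2, 25)` and `'schedule_interval': '@daily'`, the first run is triggered on 2020-02-26 at 00:00.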

- `start_date`: The date / time to initially schedule the DAG run
- `end_date`: Optional attribute for when to stop running new DAG instances
- `retries`: Optional attribute for how many times a failed task is retried (see the `default_args` examples)
- `schedule_interval`: How often to schedule via `cron` style syntax or via built-in presets.

**Cron syntax**  
`* * * * *`: minute(0-59), hour(0-23), dayofmonth(1-31), month(1-12), dayofweek(0-6, 0 = Sunday)  
`0,15,30,45 * * * *`: Run every 15 minutes  
`*/15 9-17 * * 1-3,5`: Run every 15 minutes during hours 9–17 (09:00 to 17:45) on Mon, Tue, Wed & Fri

**Scheduler presets**  
- `@once`
- `@hourly`
- `@daily`
- `@weekly`
- `@monthly`
- `@yearly`

In [None]:
from datetime import datetime, timedelta
from airflow.models import DAG

# Update the scheduling arguments as defined.
# `datetime` and `timedelta` are imported as classes (not the `datetime`
# module) so the calls below work.
default_args = {
  'owner': 'Engineering',
  'start_date': datetime(2019, 11, 1),
  'email': ['airflowresults@datacamp.com'],
  'email_on_failure': False,
  'email_on_retry': False,
  'retries': 3,
  'retry_delay': timedelta(minutes=20)
}
# Cron "30 12 * * 3": 12:30 every Wednesday.
dag = DAG('update_dataflows',
    default_args=default_args,
    schedule_interval='30 12 * * 3'
)

In [None]:
import requests
import json
from datetime import datetime
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.email_operator import EmailOperator

default_args = dict(owner='sales_eng', start_date=datetime(2020, 2, 15))

# Monthly DAG holding the sales download / parse / email tasks of this cell.
process_sales_dag = DAG(
    dag_id='process_sales',
    schedule_interval='@monthly',
    default_args=default_args,
)

def pull_file(URL, savepath):
    """Download ``URL`` and write the raw response body to ``savepath``.

    Args:
        URL: Address of the file to download.
        savepath: Local path the response bytes are written to (overwritten).

    Raises:
        requests.HTTPError: on a 4xx/5xx response, so the task fails instead
            of saving an error page as the sales file.
    """
    r = requests.get(URL)
    r.raise_for_status()
    # r.content is bytes, so the file must be opened in binary mode.
    with open(savepath, 'wb') as f:
        f.write(r.content)
    print(f"File pulled from {URL} and saved to {savepath}")

# Download step: runs pull_file(URL=..., savepath=...) in the process_sales DAG.
pull_file_task = PythonOperator(
    task_id='pull_file',
    python_callable=pull_file,
    op_kwargs=dict(URL='http://dataserver/sales.json', savepath='latestsales.json'),
    dag=process_sales_dag,
)

def parse_file(inputfile, outputfile):
    """Load the JSON document in ``inputfile`` and write it to ``outputfile``.

    The input is fully parsed before the output file is opened, so invalid
    JSON leaves ``outputfile`` untouched.
    """
    with open(inputfile) as src:
        payload = json.load(src)
    with open(outputfile, 'w') as dst:
        json.dump(payload, dst)
        
parse_file_task = PythonOperator(
    task_id='parse_file',
    # Set the function to call
    python_callable=parse_file,
    # Add the arguments
    op_kwargs={'inputfile':'latestsales.json', 'outputfile':'parsedfile.json'},
    # Add the DAG
    dag=process_sales_dag
)
# Email the parsed file to the manager. `files` takes a list of paths; a bare
# string would be iterated character by character.
email_manager_task = EmailOperator(
    task_id='email_manager',
    to='manager@datacamp.com',
    subject='Latest sales JSON',
    html_content='Attached is the latest sales JSON file as requested.',
    files=['parsedfile.json'],
    dag=process_sales_dag
)

# download -> parse -> email
pull_file_task >> parse_file_task >> email_manager_task

## Airflow workflows

### Airflow sensors

Sensors (subclasses of `BaseSensorOperator` in `airflow.sensors.base_sensor_operator`) wait for a certain condition to be true

- `mode='poke'` (default): keep the worker slot and check repeatedly  
- `mode='reschedule'`: release the worker slot between checks and try again later
- `poke_interval`: wait time between checks
- `timeout`: wait time before failing task

In [None]:
from airflow.contrib.sensors.file_sensor import FileSensor

# Check for salesdata.csv every 300 s (5 minutes).
# NOTE(review): sales_report_dag, init_sales_cleanup and generate_report are
# not defined in this notebook; they are assumed to exist elsewhere.
file_sensor_task = FileSensor(
    task_id='file_sense',
    dag=sales_report_dag,
    filepath='salesdata.csv',
    poke_interval=300,
)

# init_sales_cleanup -> file_sense -> generate_report
file_sensor_task.set_upstream(init_sales_cleanup)
file_sensor_task.set_downstream(generate_report)

### Airflow executors

- `SequentialExecutor` runs one task at a time (the default)  
- `LocalExecutor` runs multiple tasks in parallel on a single machine  
- `CeleryExecutor` distributes tasks across multiple worker machines through a Celery queue  

`!cat airflow/airflow.cfg | grep "executor = "`

In [None]:
%%bash

# Debugging: list the DAGs Airflow has loaded.
# NOTE(review): Airflow 1.x command name; Airflow 2.x uses `airflow dags list`
# — confirm against the installed version.
airflow list_dags

# fix scheduler: (re)start the scheduler process.
# NOTE(review): the scheduler is presumably long-running, so this cell will not
# finish until it is interrupted.
airflow scheduler

### SLAs & reporting in Airflow

A Service Level Agreement (SLA) is the amount of time a DAG/task is expected to take to run; a run that exceeds it is reported as an SLA miss.

In [None]:
# Import the timedelta object
from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator

# Defaults for every task: start date plus a 20-minute SLA.
default_args = {
    'start_date': datetime(2020, 2, 20),
    'sla': timedelta(minutes=20)
}
# schedule_interval=None: the DAG is not scheduled and only runs when triggered.
dag = DAG('sla_dag',
    default_args=default_args,
    schedule_interval=None
)
# Create the task with its own SLA; an explicit `sla` argument takes
# precedence over the 20-minute value from default_args.
# NOTE(review): a bash_command ending in ".sh" is loaded as a Jinja template
# file; add a trailing space to execute it as a command instead.
task1 = BashOperator(
    task_id='sla_task',
    bash_command='runcode.sh',
    sla=timedelta(seconds=30),
    dag=dag
)

In [None]:
from datetime import datetime
from airflow.models import DAG
from airflow.operators.email_operator import EmailOperator

default_args = {
    'start_date': datetime(2020, 2, 15)
}
report_dag = DAG(
    dag_id='execute_report',
    default_args=default_args,
    schedule_interval='@monthly'
)
# Define the email task: send the monthly PDF report as an attachment.
email_report = EmailOperator(
    task_id='email_report',
    to='airflow@gmail.com',
    subject='Airflow Monthly Report',
    html_content="""
    Attached is your monthly workflow report - please refer to it for more detail
    """,
    files=['monthly_report.pdf'],
    dag=report_dag
)
# Set the email task to run after the report is generated.
# NOTE(review): `generate_report` is not defined in this notebook; it is
# assumed to be a report-generating task created elsewhere.
email_report << generate_report

In [None]:
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.sensors.file_sensor import FileSensor
from datetime import datetime

# Email both addresses when a task fails; no emails on retry.
default_args={
    'email': ['airflowalerts@datacamp.com', 'airflowadmin@datacamp.com'],
    'email_on_failure': True,
    # Operators have no `email_on_success` argument, so such an entry here
    # would be silently ignored; use `on_success_callback` if success
    # notifications are required.
    'email_on_retry': False,
}
report_dag = DAG(
    dag_id='execute_report',
    schedule_interval="0 0 * * *",
    default_args=default_args
)
# Wait for the data file; mode='reschedule' releases the worker between checks.
precheck = FileSensor(
    task_id='check_for_datafile',
    filepath='salesdata_ready.csv',
    start_date=datetime(2020,2,20),
    mode='reschedule',
    dag=report_dag
)
# NOTE(review): a bash_command ending in ".sh" is loaded as a Jinja template
# file; add a trailing space to execute it as a command instead.
generate_report_task = BashOperator(
    task_id='generate_report',
    bash_command='generate_report.sh',
    start_date=datetime(2020,2,20),
    dag=report_dag
)

precheck >> generate_report_task

## Production pipelines

### Templates

- substitute information during DAG run  
- `Jinja` templating language

### Runtime variables

Run `help(<Airflow operator>)` and look for `template_fields` to see which arguments support templating.

1. Execution date YYYY-MM-DD: {{ ds }}
2. Execution date, no dashes YYYYMMDD: {{ ds_nodash }}
3. Previous execution date YYYY-MM-DD: {{ prev_ds }}
4. Previous execution date, no dashes YYYYMMDD: {{ prev_ds_nodash }}
5. DAG object: {{ dag }}
6. Airflow config object: {{ conf }}

### Macros

- `{{ macros.datetime }}`: the `datetime.datetime` class
- `{{ macros.timedelta }}`: the `datetime.timedelta` class
- `{{ macros.uuid }}`: the Python `uuid` module (e.g. `{{ macros.uuid.uuid4() }}`)
- `{{ macros.ds_add('2020-04-15', 5) }}`: 2020-04-20

### Branching

- conditional logic
- `airflow.operators.python_operator.BranchPythonOperator`
- `python_callable` returns next task id to follow

In [None]:
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime

default_args = dict(start_date=datetime(2020, 4, 15))
cleandata_dag = DAG(
    'cleandata',
    schedule_interval='@daily',
    default_args=default_args,
)

# Templated command: 'bash cleandata.sh <YYYYMMDD> <params.filename>'
templated_command = """
bash cleandata.sh {{ ds_nodash }} {{ params.filename }}
"""

# Same templated command for two input files; the second runs after the first.
clean_task, clean_task2 = (
    BashOperator(
        task_id=task_name,
        bash_command=templated_command,
        params={'filename': input_file},
        dag=cleandata_dag,
    )
    for task_name, input_file in (
        ('cleandata_task', 'salesdata.txt'),
        ('cleandata_task2', 'supportdata.txt'),
    )
)
clean_task >> clean_task2

In [None]:
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime

filelist = [f'file{x}.txt' for x in range(30)]

default_args = {
  'start_date': datetime(2020, 4, 15),
}

cleandata_dag = DAG('cleandata',
    default_args=default_args,
    schedule_interval='@daily'
)

# Handle multiple files in a single run: the Jinja {% for %} loop renders one
# `bash cleandata.sh <YYYYMMDD> <file>;` line per entry of params.filenames.
# (Jinja statements use {% ... %}; other delimiters are emitted literally.)
templated_command = """
  {% for filename in params.filenames %}
  bash cleandata.sh {{ ds_nodash }} {{ filename }};
  {% endfor %}
"""

# Modify clean_task to use the templated command
clean_task = BashOperator(
    task_id='cleandata_task',
    bash_command=templated_command,
    params={'filenames': filelist},
    dag=cleandata_dag
)

In [None]:
from airflow.models import DAG
from airflow.operators.email_operator import EmailOperator
from datetime import datetime

# HTML body rendered by Jinja: the run date and the `username` param.
html_email_str = """
Date: {{ ds }}
Username: {{ params.username }}
"""

email_dag = DAG(
    'template_email_test',
    schedule_interval='@weekly',
    default_args={'start_date': datetime(2020, 4, 15)},
)

# The subject is a fresh UUID4 rendered through the `macros.uuid` macro.
email_task = EmailOperator(
    task_id='email_task',
    dag=email_dag,
    to='testuser@datacamp.com',
    subject="{{ macros.uuid.uuid4() }}",
    html_content=html_email_str,
    params={'username': 'testemailuser'},
)

In [None]:
from airflow.models import DAG
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime

def branch_test(**kwargs):
    """Return the task_id to follow: 'even_day_task' on even days, else 'odd_day_task'.

    ``kwargs['ds_nodash']`` is the run date as YYYYMMDD; its integer value has
    the same parity as the day of the month.
    """
    is_even = int(kwargs['ds_nodash']) % 2 == 0
    return 'even_day_task' if is_even else 'odd_day_task'

dag = DAG(
    'BranchingTest',
    default_args={'start_date': datetime(2020, 4, 15)},
    schedule_interval='@daily',
)

start_task = DummyOperator(task_id='start_task', dag=dag)

# Follows whichever chain branch_test names ('even_day_task' / 'odd_day_task').
branch_task = BranchPythonOperator(
    task_id='branch_task',
    python_callable=branch_test,
    provide_context=True,
    dag=dag,
)

even_day_task = DummyOperator(task_id='even_day_task', dag=dag)
even_day_task2 = DummyOperator(task_id='even_day_task2', dag=dag)
odd_day_task = DummyOperator(task_id='odd_day_task', dag=dag)
odd_day_task2 = DummyOperator(task_id='odd_day_task2', dag=dag)

# start -> branch -> (even -> even2 | odd -> odd2)
start_task >> branch_task
branch_task >> even_day_task >> even_day_task2
branch_task >> odd_day_task >> odd_day_task2

In [None]:
from airflow.operators.python_operator import BranchPythonOperator

# Create a function to determine if years are different
def year_check(**kwargs):
    """Branch on whether this run and the previous run fall in the same year.

    Reads the 4-digit year prefix of ``ds_nodash`` and ``prev_ds_nodash``
    (both ``YYYYMMDD`` strings).

    Returns:
        'current_year_task' if both years match, otherwise 'new_year_task'.
    """
    this_year, last_year = (
        int(kwargs[key][:4]) for key in ('ds_nodash', 'prev_ds_nodash')
    )
    return 'current_year_task' if this_year == last_year else 'new_year_task'

# Define the BranchPythonOperator
branch_task = BranchPythonOperator(
    task_id='branch_task',
    dag=branch_dag,
    python_callable=year_check,
    provide_context=True
)
# Define the dependencies: both outcomes are downstream of the branch task,
# and at run time only the task whose id year_check returns is followed.
# NOTE(review): branch_dag, current_year_task and new_year_task are assumed to
# be defined in an earlier cell that is not part of this notebook.
branch_task >> current_year_task
branch_task >> new_year_task

In [None]:
%%writefile dags/process.py
from datetime import date

def process_data(**kwargs):
    """Write a marker file for the run date ``kwargs['ds']``.

    Creates /home/repl/workspace/processed_data-<ds>.tmp containing the date
    on which the processing actually ran.
    """
    path = "/home/repl/workspace/processed_data-" + kwargs['ds'] + ".tmp"
    # `with` guarantees the file is closed even if the write raises.
    with open(path, "w") as file:
        file.write(f"Data processed on {date.today()}")

In [None]:
%%writefile dags/pipeline.py
# Import the needed operators
from airflow.models import DAG
from airflow.contrib.sensors.file_sensor import FileSensor
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.email_operator import EmailOperator
from dags.process import process_data
from datetime import date, datetime, timedelta

# Update the default arguments and apply them to the DAG:
# every task starts from 2019-01-01 and has a 90-minute SLA.
default_args = {
  'start_date': datetime(2019, 1, 1),
  'sla': timedelta(minutes=90)
}

dag = DAG(
    dag_id='etl_update',
    default_args=default_args
)

# Wait for the start-process trigger file: check every 5 s, give up after 15 s.
sensor = FileSensor(
    task_id='sense_file',
    dag=dag,
    filepath='/home/repl/workspace/startprocess.txt',
    poke_interval=5,
    timeout=15,
)

# Delete leftover temp files.
# NOTE(review): this removes /home/repl/*.tmp, whereas process_data writes its
# .tmp files under /home/repl/workspace/ — confirm which directory is intended.
bash_task = BashOperator(
    task_id='cleanup_tempfiles',
    dag=dag,
    bash_command='rm -f /home/repl/*.tmp',
)

python_task = PythonOperator(
    task_id='run_processing',
    dag=dag,
    python_callable=process_data,
    provide_context=True,
)

# Subject rendered by Jinja: department param plus run date as YYYYMMDD.
email_subject = """
  Email report for {{ params.department }} on {{ ds_nodash }}
"""

email_report_task = EmailOperator(
    task_id='email_report_task',
    dag=dag,
    to='sales@mycompany.com',
    subject=email_subject,
    html_content='',
    params={'department': 'Data subscription services'},
)

# No-op target for the weekend branch.
no_email_task = DummyOperator(task_id='no_email_task', dag=dag)

def check_weekend(**kwargs):
    """Choose the branch to follow based on the run's day of the week.

    In the Airflow task context ``execution_date`` is a datetime object, which
    is used directly (``datetime.strptime`` would reject it). A ``YYYY-MM-DD``
    string is still accepted and parsed.

    Returns:
        'email_report_task' for Monday-Friday, 'no_email_task' for Saturday
        and Sunday.
    """
    run_date = kwargs['execution_date']
    if isinstance(run_date, str):
        run_date = datetime.strptime(run_date, "%Y-%m-%d")
    # weekday(): Monday == 0 ... Friday == 4; Saturday == 5, Sunday == 6.
    if run_date.weekday() < 5:
        return 'email_report_task'
    return 'no_email_task'
    
# Follows whichever task id check_weekend returns.
branch_task = BranchPythonOperator(
    task_id='check_if_weekend',
    dag=dag,
    python_callable=check_weekend,
    provide_context=True,
)

# sense file -> clean temp files -> process -> branch -> (email | no email)
sensor >> bash_task >> python_task >> branch_task
branch_task >> [email_report_task, no_email_task]

## Deployment test

In [None]:
from airflow.models import DagBag

def test_dagbag_import():
    """Verify that Airflow will be able to import all DAGs in the repository.

    Builds a DagBag with default arguments and fails, listing the collected
    import errors, if any DAG file could not be imported.
    """
    dagbag = DagBag()
    number_of_failures = len(dagbag.import_errors)
    assert number_of_failures == 0, (
        "There should be no DAG failures. Got: %s" % dagbag.import_errors
    )