# 1.创建DAG

In [None]:
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime

# Arguments handed to the DAG as its default_args.
default_args = dict(
    owner='dsmith',
    start_date=datetime(2020, 2, 12),
    retries=1,
)

# DAG object that the tasks in the following cell attach to.
codependency_dag = DAG(dag_id='codependency', default_args=default_args)

# 2.1 创建bash operator， 并用bitshift 规划downstream，upstream

In [None]:


# Three shell tasks on codependency_dag; each one echoes its own number.
task1 = BashOperator(
    task_id='first_task',
    bash_command='echo 1',
    dag=codependency_dag,
)
task2 = BashOperator(
    task_id='second_task',
    bash_command='echo 2',
    dag=codependency_dag,
)
task3 = BashOperator(
    task_id='third_task',
    bash_command='echo 3',
    dag=codependency_dag,
)

# Strict sequence: task1, then task2, then task3.
task1 >> task2 >> task3

# 2.2 创建 PythonOperator

In [None]:
def pull_file(URL, savepath, timeout=30):
    """Download ``URL`` and write the response body to ``savepath``.

    Args:
        URL: Address to fetch with an HTTP GET.
        savepath: Path of the file that receives the raw response bytes.
        timeout: Seconds to wait for the server before giving up.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
    """
    # Imported here because no visible cell imports requests before this
    # function is used.
    import requests

    r = requests.get(URL, timeout=timeout)
    # Fail the task instead of silently saving an error page as the data file.
    r.raise_for_status()
    with open(savepath, 'wb') as f:
        f.write(r.content)
    # Use the print method for logging
    print(f"File pulled from {URL} and saved to {savepath}")

from airflow.operators.python_operator import PythonOperator

# Task that calls pull_file with the sales URL and the local target path.
# NOTE(review): process_sales_dag is not created in any visible cell;
# presumably it is defined elsewhere — confirm.
pull_file_task = PythonOperator(
    task_id='pull_file',
    python_callable=pull_file,
    op_kwargs=dict(
        URL='http://dataserver/sales.json',
        savepath='latestsales.json',
    ),
    dag=process_sales_dag,
)

# 2.3 创建EmailOperator， 自动给用户发邮件监控数据工作情况

In [None]:
# Import the Operator
from airflow.operators.email_operator import EmailOperator 

# Email the parsed sales file to the manager.
email_manager_task = EmailOperator(
    task_id='email_manager',
    to='manager@datacamp.com',
    subject='Latest sales JSON',
    html_content='Attached is the latest sales JSON file as requested.',
    # Attachments must be a list of paths; a bare string would be iterated
    # one character at a time when the attachments are collected.
    files=['parsedfile.json'],
    dag=process_sales_dag
)

# Set the order of tasks: download, parse, then send the email.
# NOTE(review): parse_file_task is not defined in any visible cell;
# presumably it is created elsewhere — confirm.
pull_file_task >> parse_file_task >> email_manager_task

# 2.4 scheduling

### schedule_interval = '@daily' 等预设值，或 cron syntax

In [None]:
# timedelta is needed for retry_delay and is not imported by any earlier cell.
from datetime import timedelta

# Update the scheduling arguments as defined
default_args = {
  'owner': 'Engineering',
  'start_date': datetime(2019, 11, 1),              # date from which the DAG starts being scheduled
  'email': ['airflowresults@datacamp.com'],
  'email_on_failure': False,
  'email_on_retry': False,
  'retries': 3,
  'retry_delay': timedelta(minutes=20)
}

# schedule_interval sets how often the DAG runs; this cron expression
# means 12:30 every Wednesday.
dag = DAG('update_dataflows', default_args=default_args, schedule_interval='30 12 * * 3')

# 3.1 sensors 
## 感应一些条件是否达成，从而推进task进入到下一个task
#### 结构和operator类似， 有3个参数
#### 1.mode = 'poke'（默认：一直占用 worker slot，按 poke_interval 反复检查条件，达成后才进入下一步）或 'reschedule'（条件未达成时先释放 worker slot，稍后重新调度再检查；条件达成之前同样不会进入下一步）
#### 2.poke_interval 等多久监控一次
#### 3.timeout 超过多久就算失败

## 除了FileSensor, 还有 ExternalTaskSensor， SqlSensor, HttpSensor 等等

In [None]:
from airflow.contrib.sensors.file_sensor import FileSensor
    
# Sensor task that checks for salesdata_ready.csv, in 'reschedule' mode.
# NOTE(review): report_dag is only created in a later cell of this
# notebook — confirm the intended cell execution order.
precheck = FileSensor(
    task_id='check_for_datafile',
    filepath='salesdata_ready.csv',
    start_date=datetime(2020, 2, 20),
    mode='reschedule',
    dag=report_dag,
)

# 3.2 Executor
### 1. SequentialExecutor  一次只跑一个task，用于debug
### 2. CeleryExecutor
### 3. LocalExecutor

通过bash command: airflow list_dags 查看

# 3.3 常见的错误，以及debugging 和 troubleshooting 

# 3.4 SLA( service level agreement 检测SLA丢失情况) 和 reporting (通过email） 



In [None]:
# Import the timedelta object
from datetime import timedelta

# Default arguments: start date plus a 30-minute SLA.
default_args = {
  'start_date': datetime(2020, 2, 20),
  'sla': timedelta(minutes=30)
}

# Add to the DAG. An unscheduled (externally triggered) DAG is expressed
# with the Python value None; the string '@None' is not a schedule preset.
test_dag = DAG('test_workflow', default_args=default_args, schedule_interval=None)

##### Remember that this type of SLA applies for the entire workflow, not just an individual task.
####  you can add specific SLAs to individual tasks as needed.

In [None]:
# Import the timedelta object
from datetime import timedelta

# Unscheduled DAG: schedule_interval must be the Python value None, because
# the string '@None' is not a schedule preset.
test_dag = DAG('test_workflow', start_date=datetime(2020, 2, 20), schedule_interval=None)

# Create the task with its own 3-hour SLA.
task1 = BashOperator(task_id='first_task',
                     sla=timedelta(hours=3),
                     bash_command='initialize_data.sh',
                     dag=test_dag)

## reporting
#### You would like to receive a report from Airflow when tasks complete without requiring constant monitoring of the UI or log files. You decide to use the email functionality within Airflow to provide this message.

In [None]:
# Email task that sends monthly_report.pdf as an attachment.
email_report = EmailOperator(
    task_id='email_report',
    to='airflow@datacamp.com',
    subject='Airflow Monthly Report',
    html_content='Attached is your monthly workflow report - please refer to it for more detail',
    files=['monthly_report.pdf'],
    dag=report_dag,
)

# The email goes out only after the report has been generated.
# NOTE(review): generate_report is not defined in any visible cell (a later
# cell defines generate_report_task) — confirm which name is intended.
email_report.set_upstream(generate_report)

#### 也可以在dag中default args中设置，报告airflow run成功或失败
#### Use these options in production to monitor the state of your workflows to help avoid surprises.

In [None]:
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.sensors.file_sensor import FileSensor
from datetime import datetime

# Alert recipients and notification switches passed to the DAG as default_args.
default_args = dict(
    email=['airflowalerts@datacamp.com', 'airflowadmin@datacamp.com'],
    email_on_failure=True,
    # NOTE(review): Airflow's documented email switches are email_on_failure
    # and email_on_retry — confirm that email_on_success has any effect.
    email_on_success=True,
)

# Report DAG using the cron schedule "0 0 * * *" (midnight every day).
report_dag = DAG(
    dag_id="execute_report",
    schedule_interval="0 0 * * *",
    default_args=default_args,
)

# Sensor that checks for the sales data file, in 'reschedule' mode.
precheck = FileSensor(
    task_id='check_for_datafile',
    filepath='salesdata_ready.csv',
    start_date=datetime(2020, 2, 20),
    mode='reschedule',
    dag=report_dag,
)

# Shell task that runs the report generation script.
generate_report_task = BashOperator(
    task_id='generate_report',
    bash_command='generate_report.sh',
    start_date=datetime(2020, 2, 20),
    dag=report_dag,
)

# The report is generated only after the sensor succeeds.
precheck.set_downstream(generate_report_task)


# 4.1 templates
## 偷懒用的，减少bash_command 的重复性输入，做一个模板，嵌套要插入的变量，一劳永逸

#### use a templated command instead of hardcoding your workflow objects. This will come in very handy when creating production workflows. 

In [None]:
 templated_command="""
  echo "Reading {{ params.filename }}"
"""
t1 = BashOperator(task_id='template_task',
       bash_command=templated_command,
       params={'filename': 'file1.txt'}
       dag=example_dag)

In [None]:

# DAG-wide defaults: only the start date is set here.
default_args = dict(start_date=datetime(2020, 4, 15))

# Daily DAG that both cleaning tasks belong to.
cleandata_dag = DAG(
    'cleandata',
    default_args=default_args,
    schedule_interval='@daily',
)

# Shell command template taking two arguments: the ds_nodash template
# variable and a per-task file name read from params.
templated_command = "\n  bash cleandata.sh {{ ds_nodash }} {{params.filename}} \n"

# First cleaning task; params carries the value unique to this task.
clean_task = BashOperator(
    task_id='cleandata_task',
    bash_command=templated_command,
    params=dict(filename='salesdata.txt'),
    dag=cleandata_dag,
)

# Second cleaning task: same template, different file.
clean_task2 = BashOperator(
    task_id='cleandata_task2',
    bash_command=templated_command,
    params=dict(filename='supportdata.txt'),
    dag=cleandata_dag,
)

# clean_task runs before clean_task2.
clean_task >> clean_task2

# 4.2 Using lists with templates

##### This time, you realize that you need to run the command cleandata.sh with the date argument and the file argument as before, except now you have a list of 30 files. You do not want to create 30 tasks, so your job is to modify the code to support running the argument for 30 or more files.

#### When using a single task, all entries would succeed or fail as a single task. Separate operators allow for better monitoring and scheduling of these tasks.

In [None]:
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime

# Thirty input names: file0.txt through file29.txt.
filelist = ['file{}.txt'.format(index) for index in range(30)]

# DAG-wide defaults: only the start date is set here.
default_args = dict(start_date=datetime(2020, 4, 15))

# Daily DAG that the cleaning task belongs to.
cleandata_dag = DAG(
    'cleandata',
    default_args=default_args,
    schedule_interval='@daily',
)

# Template that handles multiple files in a single run: one cleandata.sh
# call per entry of params.filenames. Jinja statements (for / endfor) use
# the {% ... %} delimiters; <% ... %> would be left in the command as
# literal text.
templated_command = """
  {% for filename in params.filenames %}
  bash cleandata.sh {{ ds_nodash }} {{ filename }};
  {% endfor %}
"""

# Single task that renders the template with the whole file list.
clean_task = BashOperator(
    task_id='cleandata_task',
    bash_command=templated_command,
    params=dict(filenames=filelist),
    dag=cleandata_dag,
)
                          


# 4.3 branching 分流做分支判断，进入到不同的流程
## BranchPythonOperator
#### This is a simple but effective use of branching to perform an occasional set of tasks without requiring significant code changes. Make sure to remember the various capabilities with branching to make your workflows more robust.

In [None]:
def year_check(**kwargs):
    """Pick the branch to follow by comparing two run years.

    Reads ``ds_nodash`` and ``prev_ds_nodash`` from the keyword arguments and
    compares the integer value of their first four characters.

    Returns:
        ``'current_year_task'`` when both years match, otherwise
        ``'new_year_task'``.
    """
    this_year, earlier_year = (
        int(kwargs[key][:4]) for key in ('ds_nodash', 'prev_ds_nodash')
    )
    return 'current_year_task' if this_year == earlier_year else 'new_year_task'

# BranchPythonOperator is not imported by any earlier cell.
from airflow.operators.python_operator import BranchPythonOperator

# Define the BranchPythonOperator; year_check returns the task_id to follow.
# NOTE(review): branch_dag, current_year_task and new_year_task are not
# defined in any visible cell — presumably created elsewhere; confirm.
branch_task = BranchPythonOperator(task_id='branch_task', dag=branch_dag,
                                   python_callable=year_check, provide_context=True)

# Define the dependencies. Both candidate tasks must hang off branch_task
# (the operator), not branch_dag (the DAG): wiring them to the DAG creates
# no dependency on the branch decision.
branch_task >> [current_year_task, new_year_task]

# 4.4 ☆一个完整的airflow Pipeline建造代码☆

Operators, tasks, sensors, conditional logic, templating, SLAs, dependencies, and even alerting!

In [None]:
from datetime import date

def process_data(*, output_dir="/home/repl/workspace", **kwargs):
    """Write a marker file recording that processing ran.

    The file is named ``processed_data-<ds>.tmp`` and contains today's date.

    Args:
        output_dir: Directory that receives the marker file. Defaults to the
            location that was previously hard-coded.
        **kwargs: Must contain ``ds``, which is used verbatim in the file name.

    Raises:
        KeyError: If ``ds`` is missing from ``kwargs``.
    """
    target = output_dir + "/processed_data-" + kwargs['ds'] + ".tmp"
    # The context manager closes the handle even if the write fails.
    with open(target, "w") as file:
        file.write(f"Data processed on {date.today()}")
  

In [None]:
from airflow.models import DAG
from airflow.contrib.sensors.file_sensor import FileSensor
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.email_operator import EmailOperator
from dags.process import process_data
from datetime import datetime, timedelta

# Update the default arguments and apply them to the DAG.

# Defaults for the DAG: start date plus a 90-minute SLA.
default_args = dict(
    start_date=datetime(2019, 1, 1),
    sla=timedelta(minutes=90),
)

dag = DAG(dag_id='etl_update', default_args=default_args)

# Sensor for the trigger file, with poke_interval set to 45.
sensor = FileSensor(
    task_id='sense_file',
    filepath='/home/repl/workspace/startprocess.txt',
    poke_interval=45,
    dag=dag,
)

# Shell task that removes *.tmp files under /home/repl.
bash_task = BashOperator(
    task_id='cleanup_tempfiles',
    bash_command='rm -f /home/repl/*.tmp',
    dag=dag,
)

# Python task that calls process_data; provide_context=True is set so the
# callable receives the run context.
python_task = PythonOperator(
    task_id='run_processing',
    python_callable=process_data,
    provide_context=True,
    dag=dag,
)

# Templated subject line: department comes from params, the date from
# the ds_nodash template variable.
email_subject = "\n  Email report for {{ params.department }} on {{ ds_nodash }}\n"

email_report_task = EmailOperator(
    task_id='email_report_task',
    to='sales@mycompany.com',
    subject=email_subject,
    html_content='',
    params=dict(department='Data subscription services'),
    dag=dag,
)

# Placeholder task used as the "send nothing" branch.
no_email_task = DummyOperator(task_id='no_email_task', dag=dag)


def check_weekend(**kwargs):
    """Choose the branch to follow based on the run's day of the week.

    Args:
        **kwargs: Must contain ``execution_date``, either as a date/datetime
            object or as a ``"YYYY-MM-DD"`` string.

    Returns:
        ``'email_report_task'`` for Monday to Friday, ``'no_email_task'`` for
        Saturday and Sunday.

    Raises:
        KeyError: If ``execution_date`` is missing.
        ValueError: If a string value is not in ``YYYY-MM-DD`` format.
    """
    dt = kwargs['execution_date']
    # strptime only accepts strings; a date/datetime object is used as is.
    if isinstance(dt, str):
        dt = datetime.strptime(dt, "%Y-%m-%d")
    # weekday() is 0-4 for Monday - Friday and 5 or 6 for Sat / Sun.
    if dt.weekday() < 5:
        return 'email_report_task'
    return 'no_email_task'
    
    
# Branch operator: check_weekend returns the task_id to follow.
branch_task = BranchPythonOperator(
    task_id='check_if_weekend',
    python_callable=check_weekend,
    provide_context=True,
    dag=dag,
)

# Full pipeline order: sense the file, clean up, process, then branch to
# either the email task or the placeholder task.
sensor >> bash_task >> python_task >> branch_task >> [email_report_task, no_email_task]
