# Airflow Note

---
### **Airflow 中最基本的兩個概念是：DAG 和 task。**

* DAG 的全稱是 Directed Acyclic Graph 是所有你想執行的任務的集合，在這個集合中你定義了他們的依賴關係，一個 DAG 是指一個 DAG object，一個 DAG object 可以在 Python 腳本中配置完成。
* DAG 的最終目標是將所有工作依照上下游關係全部執行，而不是關注個別的工作實際上是怎麼被實作的

* DAG 的定義使用 Python 完成的，其實就是一個 Python 檔，存放在 DAG 目錄，Airflow 會動態的從這個目錄構建 DAG object，每個 DAG object 代表了一個 workflow，每個 workflow 都可以包含任意多個 task。 

* 第一次運行Airflow時，它會在 AIRFLOW_HOME目錄中創建一個名為airflow.cfg的文件（默認情況下為 ~/airflow）。此文件包含Airflow的配置，您可以對其進行編輯以更改任何設置。

---  
#### **安裝和使用Airflow是基於Python構建的，可以很容易用pip安裝使用，pip install apache-airflow，默認情況下airflow會在~/airflow目錄存放相關配置。**

#### Airflow 提供了一些列命令來完成airflow 的初始化工作來和它的正確使用。  
> pip install "apache-airflow[crypto, slack]"  
> export AIRFLOW_HOME="$(pwd)"  *(理想上把 AIRFLOW_HOME 加入到 ~/.bash_profile 裡)*    
   


在 airflow 目錄 初始化資料庫和 airflow 配置  
> airflow initdb  

啟動 Airflow 的網頁伺服器：  
> airflow webserver -p 8080 

---

##### **補充1:  Airflow是使用SqlAlchemy庫與metadata進行interact，因此您使用任何支持作為SqlAlchemy後端的數據庫後端。建議使用MySQL或Postgres。**  

* We rely on more strict ANSI SQL settings for MySQL in order to have sane defaults. Make sure to have specified explicit_defaults_for_timestamp=1 in your my.cnf under [mysqld]

* Once you’ve setup your database to host Airflow, you’ll need to alter the SqlAlchemy connection string located in your configuration file $AIRFLOW_HOME/airflow.cfg You should then also change the “executor” setting to use “LocalExecutor”, an executor that can parallelize task instances locally.


##### **補充2:**  The example files are not in /usr/local/airflow/dags. You can simply mute them by edit airflow.cfg (usually in ~/airflow). 
* **set load_examples = False**  in 'core' section

##### **補充3:**  airflow.exceptions.AirflowException: Could not create Fernet object: Incorrect padding
出現這個問題的原因在於fernet key 需要修改airflow.cfg (默認位於~/airflow/)裡的fernet_key 
> python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"  

這個命令會生成一個key 替換airflow.cfg裡的fernet_key 再次執行airflow initdb 或 airflow resetdb  

---

In [None]:
import time
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

default_args = {
    'owner': 'Meng Lee',
    'start_date': datetime(2100, 1, 1, 0, 0),
    'schedule_interval': '@daily',
    'retries': 2,
    'retry_delay': timedelta(minutes=1)
}


def fn_superman():
    print("取得使用者的閱讀紀錄")
    print("去漫畫網站看有沒有新的章節")
    print("跟紀錄比較，有沒有新連載？")

    # Murphy's Law
    accident_occur = time.time() % 2 > 1
    if accident_occur:
        print("\n天有不測風雲,人有旦夕禍福")
        print("工作遇到預期外狀況被中斷\n")
        return

    new_comic_available = time.time() % 2 > 1
    if new_comic_available:
        print("寄 Slack 通知")
        print("更新閱讀紀錄")
    else:
        print("什麼都不幹，工作順利結束")


with DAG('comic_app_v1', default_args=default_args) as dag:
    superman_task = PythonOperator(
        task_id='superman_task',
        python_callable=fn_superman
    )

---
#### 用 Python 測試 script 的正確性：
> python dags/comic_app_v1.py    

* 沒有特別設定的話， Airflow 會去 AIRFLOW_HOME 路徑底下的 dags 子資料夾找 DAG
* 這些排程設定為了方便管理，一般都另外定義在 default_args 變數並放在 script 的最上面。

#### 此 DAG 的排程（Scheduling）設定如 :

* 'start_date': datetime(2100, 1, 1, 0, 0) 代表從西元 2100 年開始第一次執行此 DAG
* 每次執行之間間隔多久。'schedule_interval': '@daily' 代表每天執行一次
* 'retries': 2 則允許 Airflow 在 DAG 失敗時重試 2 次
* DAG 失敗後等多久後開始重試（'retry_delay': timedelta(minutes=1)　代表等一分鐘）
* 更多更多 ...

#### DAG只知道有哪些工作以及這些工作之間的執行順序。而實際上這些工作要怎麼被完成，實作邏輯則是由各種 Operator 負責。

##### 你可以想像 Opeartors 就是幫我們完成特定種類工作的小幫手，像是一些常見的例子：

* PythonOperator 執行一個 Python 函式
* BashOperator 執行 Bash 指令
* S3KeySensor 監測 S3 上的檔案存不存在
* SlackAPIPostOperator 送訊息給 Slack...   


PythonOperator 可以說是 Airflow 裡最基本也最強大的 Opeartors 之一。學會使用方法以後，你可以將任何你定義的 Python 函式變成一個 Airflow 工作。
基本的使用方法非常簡單，你只要指定一個可呼叫的 Python 函式給 python_callable 參數以及設定一個工作名稱（task_id）即可

---
##### Tips:
1. 用 **python dags/comic_app_v1.py** 確保 DAG 本身沒有語法問題 
2. 使用 Airflow 的 test  **airflow test comic_app_v1 superman_task 2019-04-28**  
2019-04-28 = 被執行日期    

airflow test 指令實際上只能用來測試單一工作，而不能測試整個 DAG  

---

#### App 版本二：模組化
所以現在我們要做的改善（Refactoring）很簡單：
1. 將 App 邏輯從 comic_app_v1 DAG 中的函式 fn_superman 中拿出來
2. 為 App 的每個步驟分別定義一個 Python 函式
3. 在 DAG 裡頭利用 PythonOperator 建立多個 Airflow 工作並分別呼叫這些函式
4. 定義這些工作的執行順序

In [None]:
import time
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.slack_operator import SlackAPIPostOperator

default_args = {
    'owner': 'Meng Lee',
    'start_date': datetime(2100, 1, 1, 0, 0),
    'schedule_interval': '@daily',
    'retries': 2,
    'retry_delay': timedelta(minutes=1)
}


def process_metadata(mode, **context):
    if mode == 'read':
        print("取得使用者的閱讀紀錄")
    elif mode == 'write':
        print("更新閱讀紀錄")


def check_comic_info(**context):
    all_comic_info = context['task_instance'].xcom_pull(task_ids='get_read_history')
    print("去漫畫網站看有沒有新的章節")

    anything_new = time.time() % 2 > 1
    return anything_new, all_comic_info


def decide_what_to_do(**context):
    anything_new, all_comic_info = context['task_instance'].xcom_pull(task_ids='check_comic_info')

    print("跟紀錄比較，有沒有新連載？")
    if anything_new:
        return 'yes_generate_notification'
    else:
        return 'no_do_nothing'


def generate_message(**context):
    _, all_comic_info = context['task_instance'].xcom_pull(task_ids='check_comic_info')
    print("產生要寄給 Slack 的訊息內容並存成檔案")


with DAG('comic_app_v2', default_args=default_args) as dag:

    get_read_history = PythonOperator(
        task_id='get_read_history',
        python_callable=process_metadata,
        op_args=['read']
    )

    check_comic_info = PythonOperator(
        task_id='check_comic_info',
        python_callable=check_comic_info,
        provide_context=True
    )

    decide_what_to_do = BranchPythonOperator(
        task_id='new_comic_available',
        python_callable=decide_what_to_do,
        provide_context=True
    )

    update_read_history = PythonOperator(
        task_id='update_read_history',
        python_callable=process_metadata,
        op_args=['write'],
        provide_context=True
    )

    generate_notification = PythonOperator(
        task_id='yes_generate_notification',
        python_callable=generate_message,
        provide_context=True
    )

    send_notification = SlackAPIPostOperator(
        task_id='send_notification',
        token="YOUR_SLACK_TOKEN",
        channel='#comic-notification',
        text="[{{ ds }}] 海賊王有新番了!",
        icon_url='http://airbnb.io/img/projects/airflow3.png'
    )

    do_nothing = DummyOperator(task_id='no_do_nothing')

    # define workflow
    get_read_history >> check_comic_info >> decide_what_to_do

    decide_what_to_do >> generate_notification
    decide_what_to_do >> do_nothing

    generate_notification >> send_notification >> update_read_history

---  
#### **Airflow 排程器**
* 如同當初測試 comic_app_v1 DAG 裡頭的 superman_task 工作一樣，在我們放心讓 Airflow 幫我們排程... comic_app_v2 DAG以前，應該分別測試裡頭所有工作，確保它們的執行結果如我們預期：

> airflow test comic_app_v2 get_read_history 2019-04-28
取得使用者的閱讀紀錄

> airflow test comic_app_v2 check_comic_info 2019-04-28
跟紀錄比較，有沒有新連載？

> airflow test comic_app_v2 new_comic_available 2019-04-28
去漫畫網站看有沒有新的章節  
    
啟動排程器：

> airflow scheduler  

---  

#### **手動觸發 DAG**

* 雖說將一個 DAG 取消暫停（Unpause）可以讓它成為 Airflow 的排程對象，實際上 Airflow 的排程又分兩種方式：

  1. 手動觸發（Manual）
     常用在測試 DAG 或是有意外發生，需要手動重新執行 DAG 的時候
  2. 定期執行（Scheduled）
     也就是所謂的「正式上線」。
     依照 DAG 的 start_date 及 schedule_interval 設定決定何時執行

當然你也可以在不透過 Web UI 的情況下，直接利用 terminal 取消暫停一個 DAG 並觸發它：  

> 取消暫停一個 DAG:  
>  
> airflow unpause comic_app_v2    
>  
> 觸發它:   
>
> airflow trigger_dag comic_app_v2  
  
DAG 跟 DAG Run 的差異在於  
* 前者只是個定義好的工作流程，  
* 後者則是該 DAG 在某個時間點實際被排程器拿去執行（Run）過後的結果，會有一個執行日期（execute_date）。  

---
##### Airflow 提供了一些列的命令行用來查看DAG 和task  
~~~
# print the list of active DAGs
airflow list_dags

# prints the list of tasks the "tutorial" dag_id
airflow list_tasks tutorial

# prints the hierarchy of tasks in the tutorial DAG
airflow list_tasks tutorial --tree
~~~  

---

##### 我們分別定義好工作以後，在最下面用 >> 語法告訴 Airflow 這兩個工作的相依性：  
yes_generate_notification 工作要在 send_notification 之前執行
> generate_notification >> send_notification >> ...  

##### Task 的依賴Task 之間是能相互建立依賴的，形如：
~~~~
t2.set_upstream(t1)

# This means that t2 will depend on t1
# running successfully to run
# It is equivalent to
# t1.set_downstream(t2)

t3.set_upstream(t1)

# all of this is equivalent to
# dag.set_dependency('print_date', 'sleep')
# dag.set_dependency('print_date', 'templated')   

Airflow 會自動檢測環形依賴以防止task 無法工作的情況出現
~~~~


----
**補充:**   

在手動觸發 DAG 章節我們有看到，要讓 Airflow 排程器開始排程一個 DAG，首先要終止暫停（Unpause）該 DAG。  
而為何當時 Airflow 沒有在我們 comic_app_v2一終止暫停 就開始自動排程，而要等到我們手動觸發呢？  

  
> 'start_date': datetime(2100, 1, 1, 0, 0)   

代表我們希望 comic_app_v2 DAG 的第一個執行日期（execute_date）為西元 2100 年 1 月 1 號 0 點。  

所以假如西元 2100 年我們架的 Airflow 排程器還在運作的話，它會在：

**start_date 2100 年 1 月 1 號 0 點 0 分 + 1 * schedule_interval**   
=   2100 年 1 月 1 號 0 點 0 分 + 1 * @daily  
=   2100 年 1 月 1 號 0 點 0 分 + 1 * 24 小時  
=   2100 年 1 月 2 號 0 點 0 分  
  
  
  
所以假設有SQL語法:  
~~~
SELECT COUNT(user_id) AS num_new_users
FROM user_activities
WHERE dt = '{execute_date}'
~~~  


{execute_date} 實際上是 Jinja 語法，它允許我們將 Python 變數渲染（Render）到字串裡頭，動態地產生文本。  
這就像是我們有個變數 ds，然後利用 format 語法一樣：
~~~
SELECT COUNT(user_id) AS num_new_users
FROM user_activities
WHERE dt = '2100-01-01'
~~~  

為了避免: SQL 查詢工作會馬上在西元 2100 年 1 月 1 號的 0 點，想辦法去把西元 2100 年 1 月 1 號整天的使用者資料全部撈出來。  
         而因為此 SQL 查詢執行時， 1 月 1 號才剛開始，這個查詢不會取得任何資料。  
         
一般而言，Airflow 會在 start_date 加上一個 schedule_interval 之後開始第一次執行某個 DAG，  
而該 DAG Run 的 execute_date 為 start_date。

**一個 DAG Run 中的執行日期，只等於它「負責」的日期，不等於它實際被 Airflow 排程器執行的日期。**  
**一個被自動排程且執行日期為 dt 的 DAG Run，實際上是在 dt + schedule_period 後被 Airflow 執行。**   


*所以假設將 start_date 設為今天以前的日期，並啟動 Airflow 排程器的話，*   
*就會讓 Airflow 排程器馬上開始排程執行日期為 start_date 的 DAG Run，並且一直執行到最新的 DAG Run 為止。*

####  



**補充2:**  
修改 comic_app_v2 DAG 的程式碼 通常 Airflow 沒多久就會重新載入最新的程式碼。  
如果你懷疑程式碼沒有被更新，可以點擊 Airflow UI 首頁中 comic_app_v2 DAG 最右邊 Links 裡頭的「Refresh」按鈕。    

----  

##### 將 comic_app_v2 DAG 的 start_date 設定成 2018 年 8 月 17 號以後，在作者撰文的 8 月 20 號晚間 10 點為止， Airflow 會排程幾次 DAG Runs？它們分別的執行日期為何？


答案揭曉，Airflow 排程器總共排程三個 DAG Runs，他們的執行日期分別為：

* 2018-08-17
* 2018-08-18
* 2018-08-19
* 8 月 20 號的 DAG Run 則要等到 8 月 21 號 0 點才會被執行。

---
### Airflow 技巧
---
~~~
def process_metadata(mode, **context):
    if mode == 'read':
        ...
    elif mode == 'write':
        ...

with DAG('comic_app_v3', default_args=default_args) as dag:

    get_read_history = PythonOperator(
        task_id='get_read_history',
        python_callable=process_metadata,
        op_args=['read'],
        provide_context=True
    )       

    ...

    update_read_history = PythonOperator(
        task_id='update_read_history',
        python_callable=process_metadata,
        op_args=['write'],
        provide_context=True
    )
~~~  
---

##### 你會發現上面兩個 Airflow 工作的 python_callable 都呼叫 process_metadata，因為它們做類似的事情：

* get_read_history 負責讀取閱讀紀錄
* update_read_history 負責更新閱讀紀錄
而這兩個工作則利用不同的 op_args 來使用 process_metadata 函式的不同功能。  
這樣的好處是我們不需要為每個類似的 PythonOperator 都分別建立一個新的 Python 函式，  
而是利用參數 op_args 來改變同個 Python 函式的執行結果。

**當然，傳遞參數給 Python 函式這件事情本身就是很常見，這時候 op_args 就會派上用場。**

---
---
#### Xcom：工作之間的訊息交換
* Xcom（Cross Communication） 是 Airflow 工作之間交換訊息的方式。一個被 PythonOperator 呼叫的 Python 函式所回傳（return）的值，都可以被其他 Airflow 工作透過 Xcom 存取：  
---
~~~
def check_comic_info(**context):

    print("檢查有無新連載")
    ...

    return anything_new, all_comic_info


def decide_what_to_do(**context):
    anything_new, all_comic_info = context['task_instance'].xcom_pull(task_ids='check_comic_info')

    print("跟紀錄比較，有沒有新連載？")
    if anything_new:
        return 'yes_generate_notification'
    else:
        return 'no_do_nothing'

...


with DAG('comic_app_v3', ...

    ...

    check_comic_info = PythonOperator(
        task_id='check_comic_info',
        python_callable=check_comic_info,
        provide_context=True
    )
~~~
   
   

> anything_new, all_comic_info = context['task_instance'].xcom_pull(task_ids='check_comic_info')  

**下游工作可以透過這樣的方式取得上游工作的執行結果，來決定接下來要做的任務。**

*值得注意的是 XCom 的所有資料在 pickle 之後會被存到 Airflow 的 Metadata Database（通常是 MySQL）裡頭，因此不適合交換太大的數據（例：100 萬行的 Pandas DataFrame），而適合用在交換 Metadata。*

---
~~~
def check_comic_info(**context):
    ...
~~~ 

* 裡頭的 **context 的語法是為了取得 Airflow 在執行工作時產生的環境變數，其中就包含 XCom。
* 除了要在 Python 函式設置 **context 以外，我們還必須將 PythonOperator 的 provide_context 參數設置為 True，Airflow 才會把環境變數傳給該工作

---

---
#### 在工作流程內加入條件分支

要在 Airflow 裡頭實現這樣的邏輯，可以在上下游工作「之間」新增一個 BranchPythonOperater（如圖中的 new_comic_available 工作）：

1. 砍掉原上游工作跟下游工作之間的 >>
2. 將原上游工作 >> 該 BranchPythonOperator 工作
3. 將該 BranchPythonOperator 工作 >> 原下游工作

~~~
from airflow.operators.python_operator BranchPythonOperator

def decide_what_to_do(**context):
    anything_new, all_comic_info = context['task_instance'].xcom_pull(task_ids='check_comic_info')

    print("跟紀錄比較，有沒有新連載？")
    if anything_new:
        return 'yes_generate_notification'
    else:
        return 'no_do_nothing'

    ...

with DAG('comic_app_v3', default_args=default_args) as dag:

    ...

    decide_what_to_do = BranchPythonOperator(
        task_id='new_comic_available',
        python_callable=decide_what_to_do,
        provide_context=True
    )

    generate_notification = PythonOperator(...)
    do_nothing = DummyOperator(task_id='no_do_nothing')

    decide_what_to_do >> generate_notification
    decide_what_to_do >> do_nothing
~~~

而 BranchPythonOperator 一樣會呼叫一個 Python 函式（上例的 decide_what_to_do 函式），由該函式決定到底最後哪個下游工作會被執行。  
基本上該函式會依照實際情況決定哪個下游工作被執行，並將該下游工作的 task_id 回傳。

而因為在這個例子中，我們希望依照上游工作 check_comic_info 回傳的一個布林值 anything_new 來決定要執行哪個下游工作，  
因此可以使用 xcom_pull 取得該結果以後回傳要執行的下游工作 ID task_id。