# Airflow, Mlflow를 활용한 ML Cycle

## MNIST 손글씨 분석을 위한 데이터 수집-모델 훈련-배포 과정 구축

### 관련 패키지 설치

In [1]:
#sqlalchemy의 버전이 1.4 이상인 경우 에러가 발생합니다.
!pip uninstall sqlalchemy -y
!pip install 'sqlalchemy < 1.4.0' apache-airflow attrdict mlflow

Found existing installation: SQLAlchemy 1.3.24
Uninstalling SQLAlchemy-1.3.24:
  Successfully uninstalled SQLAlchemy-1.3.24
Collecting sqlalchemy<1.4.0
  Using cached SQLAlchemy-1.3.24-cp37-cp37m-manylinux2010_x86_64.whl (1.3 MB)
Installing collected packages: sqlalchemy
Successfully installed sqlalchemy-1.3.24


### Airflow 셋팅

In [2]:
!airflow db init

DB: sqlite:////opt/ml/airflow/airflow.db
[[34m2021-06-10 06:47:06,614[0m] {[34mdb.py:[0m695} INFO[0m - Creating tables[0m
INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
WARNI [airflow.models.crypto] empty cryptography key - values will not be stored encrypted.
Initialization done


In [3]:
# 기본적으로 제공되는 예제 DAG들을 가리기 위해
# 터미널에 다음과 같이 이동하여 해당 파일에 load example 옵션을 비활성화합니다.
# (반드시 해줘야 하는 것은 아닙니다.)

# cd ~/airflow
# vim airflow.cfg

## === airflow.cfg === ##
# load_examples = False

In [4]:
!mkdir ~/airflow/dags

mkdir: cannot create directory ‘/opt/ml/airflow/dags’: File exists


In [5]:
## 멀티 프로세싱 관련된 에러를 피하기 위해
# https://stackoverflow.com/questions/50168647/multiprocessing-causes-python-to-crash-and-gives-an-error-may-have-been-in-progr
!export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES

In [6]:
#GUI를 위한 유저생성

!airflow users create \
    --username admin \
    --firstname Peter \
    --lastname Parker \
    --password 1234 \
    --role Admin \
    --email spiderman@superhero.org


admin already exist in the db


In [None]:
#터미널을 하나 켜서 다음 명령어 입력
# export TZ=Asia/Seoul
# export AIRFLOW_HOME=~/airflow
# airflow scheduler

In [7]:
# 이미 6006번 포트를 잡고 있을 경우를 대비해 6006번 포트를 사용하는 프로세스를 종료해줍니다.
!apt-get install lsof
!kill -9 `lsof -t -i:6006`




The following NEW packages will be installed:
  lsof
0 upgraded, 1 newly installed, 0 to remove and 37 not upgraded.
Need to get 248 kB of archives.
After this operation, 451 kB of additional disk space will be used.
Get:1 http://archive.ubuntu.com/ubuntu bionic/main amd64 lsof amd64 4.89+dfsg-0.1 [248 kB]
Fetched 248 kB in 2s (144 kB/s)
debconf: delaying package configuration, since apt-utils is not installed
Selecting previously unselected package lsof.
(Reading database ... 17034 files and directories currently installed.)
Preparing to unpack .../lsof_4.89+dfsg-0.1_amd64.deb ...
Unpacking lsof (4.89+dfsg-0.1) ...
Setting up lsof (4.89+dfsg-0.1) ...
kill: usage: kill [-s sigspec | -n signum | -sigspec] pid | jobspec ... or kill -l [sigspec]


In [None]:
#터미널을 하나 더 켜서 다음 명령어 입력 (Airflow GUI 실행)
# export TZ=Asia/Seoul
# export AIRFLOW_HOME=~/airflow
# airflow webserver -p 6006

# 이제 다음 주소로 접근 가능합니다 (텐서보드 접속 포트를 확인해주세요!)
# http://<SERVER_IP>:6009

## Airflow DAG 테스트

### 간단한 Bash Operator를 기반으로 한 Dag

In [8]:
%%writefile ~/airflow/dags/simple_dag.py
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.bash import BashOperator


default_args = {
    'owner': 'Boost Kim',
    'start_date': days_ago(1),
}


dag = DAG(
    'simple_pipeline',
    # default_args=default_args,
    description='A simple pipeline',
    schedule_interval=None,
)


task_1 = BashOperator(
    task_id='task_1',
    bash_command="echo 1",
    dag=dag
)
task_2 = BashOperator(
    task_id='task_2',
    bash_command="echo 2",
    dag=dag
)
task_3 = BashOperator(
    task_id='task_3',
    bash_command="echo 3",
    dag=dag
)


task_1 >> task_2 >> task_3

Overwriting /opt/ml/airflow/dags/simple_dag.py


In [9]:
!airflow db reset -y

DB: sqlite:////opt/ml/airflow/airflow.db
[[34m2021-06-10 06:48:50,854[0m] {[34mdb.py:[0m711} INFO[0m - Dropping tables that exist[0m
[[34m2021-06-10 06:48:51,000[0m] {[34mmigration.py:[0m154} INFO[0m - Context impl [01mSQLiteImpl[22m.[0m
[[34m2021-06-10 06:48:51,000[0m] {[34mmigration.py:[0m161} INFO[0m - Will assume [01mnon-transactional[22m DDL.[0m
[[34m2021-06-10 06:48:51,133[0m] {[34mdb.py:[0m695} INFO[0m - Creating tables[0m
INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
INFO  [alembic.runtime.migration] Running upgrade  -> e3a246e0dc1, current schema
INFO  [alembic.runtime.migration] Running upgrade e3a246e0dc1 -> 1507a7289a2f, create is_encrypted
INFO  [alembic.runtime.migration] Running upgrade 1507a7289a2f -> 13eb55f81627, maintain history for compatibility with earlier migrations
INFO  [alembic.runtime.migration] Running upgrade 13eb55f81627 -> 338e90f54d61, More loggi

### 간단한 Python Operator 기반으로 한 Dag

In [None]:
%%writefile ~/airflow/dags/python_dag.py
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonOperator
import time

default_args = {
    'owner': 'Boost Kim',
    'start_date': days_ago(1),
}


dag = DAG(
    'simple_python_pipeline',
    default_args=default_args,
    description='A simple python pipeline',
    schedule_interval=None,
)

#Python 함수에 변수를 받을 때는 *args = [], *kwargs = {} 형태로 받을 수 있다.
def sleep(**kwargs):
    delta = kwargs['delta']
    time.sleep(delta)
    print("Slept for {} seconds".format(delta))
    

task_1 = PythonOperator(
    task_id='task_1',
    python_callable=sleep,
    op_kwargs={'delta': 10}, #kwargs 형태로 전달
    dag=dag
)
task_2 = PythonOperator(
    task_id='task_2',
    python_callable=sleep,
    op_kwargs={'delta': 30}, #kwargs 형태로 전달
    dag=dag
)
task_3 = PythonOperator(
    task_id='task_3',
    python_callable=sleep,
    op_kwargs={'delta': 5}, #kwargs 형태로 전달
    dag=dag
)

task_1 >> task_2 >> task_3

In [None]:
!airflow db reset -y

### Conditional Tasks

In [None]:
%%writefile ~/airflow/dags/python_combined_dag.py
# 이 실습에서는 database backend를 sqlite를 사용하기 때문에 병렬 처리가 되지 않지만, 
# mysql과 같은 데이터베이스를 사용하면 병렬 처리가 가능합니다.
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonOperator
import time

default_args = {
    'owner': 'Boost Kim',
    'start_date': days_ago(1),
}


dag = DAG(
    'combined_python_pipeline',
    default_args=default_args,
    description='A combined python pipeline',
    schedule_interval=None,
    concurrency = 2
)

def sleep(**kwargs):
    delta = kwargs['delta']
    time.sleep(delta)
    print("Slept for {} seconds".format(delta))
    

task_1 = PythonOperator(
    task_id='task_1',
    python_callable=sleep,
    op_kwargs={'delta': 10},
    dag=dag
)
task_2_1 = PythonOperator(
    task_id='task_2_1',
    python_callable=sleep,
    op_kwargs={'delta': 30},
    dag=dag
)
task_2_2 = PythonOperator(
    task_id='task_2_2',
    python_callable=sleep,
    op_kwargs={'delta': 10},
    dag=dag
)
task_3 = PythonOperator(
    task_id='task_3',
    python_callable=sleep,
    op_kwargs={'delta': 5},
    dag=dag
)

# task_2_1과 task_2_2가 만족이 되어야 task_3dl tlfgod
# 현재는 SequentialExecutor 세팅이지만, mysql, postresql 등을 연동하여 LocalExecutor 사용시 병렬 처리 가능!
task_1 >> [task_2_1, task_2_2] >> task_3

In [None]:
!airflow db reset -y

### 스케쥴을 사용한 Dag

In [None]:
%%writefile ~/airflow/dags/scheduled_dag.py
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.bash import BashOperator


default_args = {
    'owner': 'Boost Kim',
    'start_date': days_ago(1),
}

# interval 세팅은 다음 링크 참고
# https://airflow.apache.org/docs/apache-airflow/1.10.1/scheduler.html#dag-runs
# 아래 예제는 매 5분마다
dag = DAG(
    'simple_scheduled_pipeline',
    default_args=default_args,
    description='A simple scheduled pipeline',
    schedule_interval='*/5 * * * *',
)


task_1 = BashOperator(
    task_id='task_1',
    bash_command="echo 1",
    dag=dag
)
task_2 = BashOperator(
    task_id='task_3',
    bash_command="echo 2",
    dag=dag
)
task_3 = BashOperator(
    task_id='task_2',
    bash_command="echo 3",
    dag=dag
)


task_1 >> task_2 >> task_3

In [None]:
!airflow db reset -y

### 한번에 한 run만 실행이 되고, 지난 시간 부분을 채우지 않는 Dag

In [None]:
%%writefile ~/airflow/dags/strict_scheduled_dag.py
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.bash import BashOperator


default_args = {
    'owner': 'Boost Kim',
    'start_date': days_ago(1),
}

# 1분 마다 하되, scheduler에 등록된 시점을 기준으로만, 그리고 한번에 최대 1개의 run만
dag = DAG(
    'strict_scheduled_pipeline',
    default_args=default_args,
    description='A strict scheduled pipeline',
    schedule_interval='*/1 * * * *',
    is_paused_upon_creation=False, #등록되면 바로 활성화
    catchup = False, # 시작시점(start_date) 부터 채워 넣지 않기
    max_active_runs=1 #한번에 한 run만
)


task_1 = BashOperator(
    task_id='task_1',
    bash_command="echo 1",
    dag=dag
)
task_2 = BashOperator(
    task_id='task_3',
    bash_command="echo 2",
    dag=dag
)
task_3 = BashOperator(
    task_id='task_2',
    bash_command="echo 3",
    dag=dag
)


task_1 >> task_2 >> task_3

In [None]:
!airflow db reset -y

### 오늘의 실제 실습 Dag
- 강의용 코드기 때문에 한 파일에 관련된 함수들을 모두 다 넣었습니다.
- 아래 코드는 실행용이 아닌 airflow dags 폴더에 저장이 되도록해놓은 코드입니다.
- 데이터 수집 - 훈련 - 배포

In [None]:
%%writefile ~/airflow/dags/airflow_example.py

from datetime import timedelta
from airflow import DAG

from airflow.utils.dates import days_ago
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator


from sklearn.ensemble import RandomForestClassifier
import numpy as np

import pickle
import mlflow
from attrdict import AttrDict

import random
import requests

#####START ML CODE#####
def collect_data():
    #40%의 확률로 에러가 발생하도록 설정해놨습니다.
    if random.randint(0,10) < 4:
        raise Exception("Fake ERROR: Failed to Download!")
    url = 'https://s3.amazonaws.com/img-datasets/mnist.npz'
    r = requests.get(url, allow_redirects=True)
    open('mnist.npz', 'wb').write(r.content)
    
# context는 operator간의 값들을 공유하기 위함
def train(**context):
    config = AttrDict(context['dag_run'].conf)

    mlflowInit(config)
    f = np.load('mnist.npz')

    sample = 5000
    X_train, y_train = f['x_train'][:sample], f['y_train'][:sample]
    X_test, y_test = f['x_test'], f['y_test']

    X_train, X_test = X_train.reshape(X_train.shape[0],-1), X_test.reshape(X_test.shape[0],-1)
    

    clf = RandomForestClassifier()
    clf.fit(X_train, y_train)
    prediction = clf.predict(X_test)
    result = (prediction == y_test).mean()
    print(result)
    mlflow.log_metric('acc',result)
    
    mlflow.sklearn.log_model(clf, 'save_model')
    model_path = mlflow.get_artifact_uri().replace('file://', '')
    
    #중요: task간의 값들을 전달하는 방법
    #xcom == cross communication
    context['ti'].xcom_push(key='model_path', value=model_path)
    
    mlflow.end_run()

def mlflowInit(config):
    try:
        mlflow.create_experiment(name=config.experiment)
    except:
        print('Exist experiment')

    mlflow.set_experiment(config.experiment)

    mlflow.start_run()

    mlflow.set_tag('version', config.version)
    mlflow.log_params(config)
#####END ML CODE#####




#####START DAG CODE#####
default_args = {
    'owner': 'Boost Kim',
    'depends_on_past': True,
    'start_date': days_ago(1),
    'retries': 4,
    'retry_delay': timedelta(seconds=20)
}


dag = DAG(
    'ml_pipeline',
    default_args=default_args,
    description='A simple Machine Learning pipeline',
    schedule_interval=None,
)


download_images = PythonOperator(
    task_id='collect_data',
    python_callable=collect_data,
    retries=3,
    dag=dag,
)
train = PythonOperator(
    task_id='train',
    depends_on_past=True,
    python_callable=train,
    dag=dag,
)

#airflow ti 변수 활용 (jinja tempalte 방식)
#https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html#macros-reference
serve = BashOperator(
    task_id='serve',
    depends_on_past=False,
    bash_command="mlflow models serve -m {{ ti.xcom_pull(key='model_path') }}/save_model --no-conda -h 0.0.0.0 -p 8889 &",
    dag=dag,
)

download_images >> train >> serve

#####END DAG CODE#####

In [None]:
!airflow db reset -y

### DAG 실행

In [None]:
#airflow GUI 상에서 해당 DAG를 trigger를 하면서 입력해주는 configuration
#{"version": 0.1,"experiment": "mlflow-airflow"}


## 서빙되는 모델 확인

In [None]:
from IPython.display import Image
Image(filename='data/mnist_5.jpg') 

In [None]:
!pip install Pillow

In [None]:

from PIL import Image
import requests, json
import numpy as np

img = Image.open('data/mnist_5.jpg')
image_data = np.array(img, dtype='uint8').reshape(-1).tolist()

url = 'http://localhost:8889/invocations'

data = {
    "columns": [i for i in range(0, len(image_data))],
    "data": [image_data]
}
headers = {
    'content-type':'application/json'
}
res = requests.post(url, headers=headers, data=json.dumps(data))

print('Predicted From Server:',json.loads(res.text))


In [None]:
#백그라운드로 돌고있는 웹서버를 종료하기 위한 코드
!apt-get install lsof
!kill -9 `lsof -t -i:8889`