Skip to content

airflow 설치부터 운영까지 이모저모

Notifications You must be signed in to change notification settings

dean-kg/AirFlow

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

17 Commits
 
 

Repository files navigation

AirFlow

airflow 설치부터 운영까지 이모저모

셋업 postgre

sudo apt-get install postgresql postgresql-contrib -y

초기 셋업 db

mysql, sqlite의 경우 multiprocess불가능 -> postgresql 이동필요

https://jungwoon.github.io/airflow/2019/02/26/Airflow.html
sudo -u postgres psql

도커용

https://dorumugs.tistory.com/entry/AirFlow-Manual-on-Docker-stage-install

도커 권한
https://velog.io/@jeong3320/dockerdocker-sudo%EA%B6%8C%ED%95%9C%EC%97%86%EC%9D%B4-%EC%8B%A4%ED%96%89%ED%95%98%EA%B8%B0

서버 이사가면서 재설정

  • docker-compose 2버전 이상설치
  • airflow 2.3.2업데이트

초기 유저설정

airflow users create
--username admin
--firstname FIRST_NAME
--lastname LAST_NAME
--role Admin
--password admin
--email pbj00812@gmail.com

/home/admin/airflow/python_runtime/bin/gunicorn 에러 대처

$ export PATH=$PATH:~/.local/bin

실행

airflow webserver -p port_num &! airflow schduler &!

lightsail vm 실패

  • 3.5달러 (RAM 500MB) // 5달러 (RAM 1GB) -> 모두 webserver, scheduler 동시실행시 서버터짐 ㅋ (최소 4gb의 메모리가 필요)

dags 관련

roles 관련

  • user에게 실행 권한 주기 ("can create on DAG Runs")

webhook 관련

  • slack등의 outcome의 기능은 존재
  • 자체 income 기능은 없다

api

pythonoperator Dags conf 인자 받기

api 상에서
c = Client(None, None)
c.trigger_dag(dag_id='dag_id', conf={'target':target}) 으로 api 호출
dag 상에서

def print_arguments(**kwargs):   
    table_name = kwargs['dag_run'].conf.get('table')   -> 포인트   
      
        
         
task = PythonOperator(
    task_id="sample_task",
    python_callable=print_arguments,
    provide_context=True,                ## 반드시 해당 옵션을 지정해야 함
    dag=dag
)

variable 익명화

password,secret,passwd,authorization,api_key,apikey,access_token 의 단어들이 key값으로 들어가면 value값이 익명으로 나타난다.

postgresql operator

  • hook을 이용 하여 sql return value를 핸들링 할 수 있다.
from airflow.providers.postgres.hooks.postgres import PostgresHook


def reorderCheck(**xcompusher):
    hook = PostgresHook(postgres_conn_id='db_conn')
    
    #x_com으로 특정 value 추출
    userpk = xcompusher['t'].xcom_pull(key='v')['v1']

    if userpk =='NotExist':
        reorder = 'response Error'
    else:
    
        #쿼리 날리기
        resultSql = hook.get_records("select exists(select 1 from completedorderlst where userpk='%s');"%userpk)
        reorder =str(resultSql[0][0])
    
    #x_com 으로 데이터 업로드
    xcompusher['t'].xcom_push(key='vv', value={'vv':reorder})


db 에서 pandas 이동

push

def fun1(**params):
    mergeDF = pd.DataFrame()
    params['ti'].xcom_push(key='subdata', value={'data': mergeDF.to_dict()})
    return

def fun2(**params):
    import pandas as pd
    df = pd.DataFrame(params['ti'].xcom_pull(key='subdata')['data'])
    retrun


postgreHook

check exist

from airflow.providers.postgres.hooks.postgres import PostgresHook

hook = PostgresHook(postgres_conn_id='dbconn')
resultSql = hook.get_records(
    "select exists(select 1 from TABLENAME where COLUMNSNALE='%s');" % VALUE)

insert

request = "insert into TABLENAME (p1,p2,p3) values ('%s',%d,'%s');" % (
    p1value, p2value, p3value)
pg_hook = PostgresHook(postgres_conn_id='dbconn')
connection = pg_hook.get_conn()
cursor = connection.cursor()
cursor.execute(request)
connection.commit()
cursor.close()
connection.close()

get parameter conf from webserver

def function(**parm):

    ## on webserber {"name":"target_paramter"} -> 큰 따옴표로 찍어야함
    parm['dag_run'].conf.get('name')
    return 

에러발생케이스

  • docker로 운영시 Python pakages -> webserver 도커에 설치
  • Failed to fetch log file from worker. Unsupported URL protocol '' 에러의 경우 logs 폴더에 777권한

About

airflow 설치부터 운영까지 이모저모

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published