-
Notifications
You must be signed in to change notification settings - Fork 3
/
airflow.txt
129 lines (101 loc) · 4.21 KB
/
airflow.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
airflow.cfg
============================================================
# Installation
pip install "apache-airflow[crypto]"
Postgres Backend
------------------------------------------------------------
executor = SequentialExecutor
sql_alchemy_conn = postgresql://USER:PASSWORD@localhost:5432/mydatabase
Celery executor
------------------------------------------------------------
broker_url = amqp://myuser:mypassword@localhost:5672/myvhost
celery_result_backend = db+postgresql://airflow:password@localhost:5432/database
============================================================
apt-get -qq update \
&& apt-get -y install rabbitmq-server gcc python-dev postgresql-9.6 mlocate \
&& updatedb
apt-get -y install vim \
&& cat <<EOF > ~/.vimrc
set tabstop=4
set shiftwidth=4
set autoindent
:syntax enable
set pastetoggle=<F12>
:color desert
if has("autocmd")
au BufReadPost * if line("'\"") > 0 && line("'\"") <= line("$") | exe "normal! g\`\"" | endif
endif
EOF
rabbitmqctl add_user airflow-worker secret \
&& rabbitmqctl set_permissions -p / airflow-worker ".*" ".*" ".*"
# Bootstrap pip, then install Celery and Airflow with its extras.
# The extras spec is quoted so the shell does not treat [...] as a glob pattern.
wget -qO- https://bootstrap.pypa.io/get-pip.py | python - \
&& pip install celery "apache-airflow[celery,postgres,async,crypto]"
# Postgres airflow DB and user
cat <<EOF > /tmp/airflow.sql
CREATE USER airflow WITH ENCRYPTED PASSWORD 'secret';
CREATE DATABASE airflow OWNER airflow;
EOF
su - postgres -c "psql -f /tmp/airflow.sql"
Install GCS Fuse
============================================================
export GCSFUSE_REPO=gcsfuse-`lsb_release -c -s` \
&& echo "deb http://packages.cloud.google.com/apt $GCSFUSE_REPO main" | sudo tee /etc/apt/sources.list.d/gcsfuse.list \
&& curl -s https://packages.cloud.google.com/apt/doc/apt-key.gpg | sudo apt-key add - \
&& apt-get -qq update \
&& apt-get -qq -y install gcsfuse \
&& gcsfuse --implicit-dirs --only-dir airflow-dags -o nonempty s-dump-aje3aizu4eex /root/airflow/dags
Utilities
============================================================
def send_slack_message(ctx):
    """
    Post a notification to a Slack channel, mentioning the task's owner(s).

    Make sure that the owner of the task is set to 'my_slack_username' so that you will get the notification.
    For multiple users, delimit with comma: 'user1, user2, user3'
    Assign this callback to the DAG 'on_failure_callback' to activate the notification.

    The webhook URL is read with Variable.get('SLACK_ENDPOINT').
    Example of slack endpoint: https://hooks.slack.com/services/wee9uYoo/P999920NA/LahRuc2aethohghienae

    NOTE: `requests` and `Variable` are not imported in this snippet; the
    surrounding module must bring them into scope.

    :param ctx: Airflow context dictionary; its 'task' and 'task_instance' entries are used. Refer to: https://airflow.incubator.apache.org/code.html?highlight=on_failure_callback#macros
    :return: None
    """
    # Drop blank entries (empty owner, trailing comma) so no empty '<@>' mention is sent.
    owners = [name.strip() for name in ctx['task'].owner.split(',')]
    mentions = ' '.join('<@{}>'.format(name) for name in owners if name)
    text = '{owner}: {task_instance}'.format(owner=mentions,
                                             task_instance=ctx['task_instance'])
    # Bound the request so an unreachable endpoint cannot hang the callback forever.
    requests.post(url=Variable.get('SLACK_ENDPOINT'),
                  json={'text': text},
                  timeout=10)
Example DAGs
============================================================
mkdir -p ~/airflow/dags
cat <<EOF > ~/airflow/dags/mydag.py
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2017, 11, 25)
}
dag = DAG('mydag', default_args=default_args, schedule_interval='0 0 * * *', catchup=False)
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
EOF
DAG Tips
============================================================
# Prevent backfill: pass catchup=False so the scheduler does not create runs for past intervals
dag = DAG('print_date', default_args=default_args, schedule_interval='* * * * *', catchup=False)
Query task instances that are RUNNING or QUEUED
============================================================
import time
from airflow.models import TaskInstance
from airflow.utils.db import provide_session
from airflow.utils.state import State
@provide_session
def print_running_and_queued_tasks(session=None):
    """Print every TaskInstance whose state is RUNNING or QUEUED.

    :param session: session used to run the query; defaults to None and is
        presumably supplied by ``provide_session`` when omitted (confirm
        against the Airflow version in use).
    :return: None
    """
    active_states = [State.RUNNING, State.QUEUED]
    in_active_state = TaskInstance.state.in_(active_states)
    for task_instance in session.query(TaskInstance).filter(in_active_state):
        print(task_instance)
if __name__ == '__main__':
    # Poll forever: list the running/queued task instances, print a divider
    # line, then wait three seconds before the next pass.
    while True:
        print_running_and_queued_tasks()
        divider = '-' * 60
        print(divider)
        time.sleep(3)