-
Notifications
You must be signed in to change notification settings - Fork 59
/
06_templated_query.py
47 lines (36 loc) · 1.21 KB
/
06_templated_query.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import datetime as dt
from pathlib import Path
import pandas as pd
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
# Daily DAG whose runs are bounded by the start_date / end_date window below.
dag = DAG(
    dag_id="06_templated_query",
    start_date=dt.datetime(2019, 1, 1),
    end_date=dt.datetime(2019, 1, 5),
    schedule_interval="@daily",
)
# Download the events for the current schedule interval into /data/events.json.
#
# The URL is wrapped in double quotes for bash. Unquoted, the "&" between the
# query parameters would send curl to the background and make
# "end_date=..." a separate shell variable assignment. The API would then
# never see end_date, and the task could report success before (or without)
# the download completing.
#
# --fail makes curl exit non-zero on HTTP errors. Without it, the error
# response body would be saved as if it were event data.
#
# NOTE(review): execution_date / next_execution_date are deprecated in newer
# Airflow releases in favour of data_interval_start / data_interval_end. They
# are kept here because the targeted Airflow version is not visible.
fetch_events = BashOperator(
    task_id="fetch_events",
    bash_command=(
        "mkdir -p /data && "
        "curl --fail -o /data/events.json "
        "\"http://events_api:5000/events?"
        "start_date={{execution_date.strftime('%Y-%m-%d')}}&"
        "end_date={{next_execution_date.strftime('%Y-%m-%d')}}\""
    ),
    dag=dag,
)
def _calculate_stats(input_path, output_path):
"""Calculates event statistics."""
events = pd.read_json(input_path)
stats = events.groupby(["date", "user"]).size().reset_index()
Path(output_path).parent.mkdir(exist_ok=True)
stats.to_csv(output_path, index=False)
# Compute statistics from the file that fetch_events downloads.
calculate_stats = PythonOperator(
    task_id="calculate_stats",
    python_callable=_calculate_stats,
    op_kwargs=dict(
        input_path="/data/events.json",
        output_path="/data/stats.csv",
    ),
    dag=dag,
)

# The statistics step runs only after the download step has finished.
fetch_events.set_downstream(calculate_stats)