"""This is an example project using Hamilton with Apache Airflow
For the purpose of this example, we will read and write data from the Airflow
installation location (${AIRFLOW_HOME}/plugins/data).
For production environment, use intermediary storage.
ref: https://docs.astronomer.io/learn/airflow-passing-data-between-tasks
"""

import os
from datetime import datetime

from airflow.decorators import dag, task
from airflow.operators.python import get_current_context

AIRFLOW_HOME = os.getenv("AIRFLOW_HOME")
"""Within the DAG config `DEFAULT_DAG_PARAMS`, the dictionaries `h_prepare_data` and
`h_train_and_evaluate_model` are configs that we will be passed to the Hamilton driver
of Airflow task `prepare_data` and `train_and_evaluate_model` respectively. The settings
`feature_set` and `label` are kept outside driver configs since they need to be
consistent across Airflow tasks.
The top-level config keys (`raw_data_location`, `feature_set`, etc.) are individual
input box in the Airflow UI, making them easy to edit manually. Therefore, you should
avoid nested objects. Your configuration should be limited to values that you will need
to change on the fly.
"""

DEFAULT_DAG_PARAMS = dict(
    raw_data_location=f"{AIRFLOW_HOME}/plugins/data/raw/Absenteeism_at_work.csv",
    feature_set=[
        "age_zero_mean_unit_variance",
        "has_children",
        "has_pet",
        "is_summer",
        "service_time",
    ],
    label="absenteeism_time_in_hours",
    validation_user_ids=[
        "1",
        "2",
        "4",
        "15",
        "17",
        "24",
        "36",
    ],
    h_prepare_data=dict(
        development_flag=True,
    ),
    h_train_and_evaluate_model=dict(
        development_flag=True,
        task="binary_classification",
        pred_path=f"{AIRFLOW_HOME}/plugins/data/results/val_predictions.csv",
        model_config={},
        scorer_name="accuracy",
        bootstrap_iter=1000,
    ),
)
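
# Side note: because these params are exposed in the Airflow UI, individual runs can
# override the top-level keys at trigger time, either from the "Trigger DAG w/ config"
# view or via the CLI. An illustrative invocation (the path value is hypothetical):
#   airflow dags trigger hamilton-absenteeism-prediction --conf '{"raw_data_location": "/tmp/other_absenteeism.csv"}'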

@dag(
    dag_id="hamilton-absenteeism-prediction",
    description="Predict absenteeism using Hamilton and Airflow",
    start_date=datetime(2023, 6, 18),
    params=DEFAULT_DAG_PARAMS,
)
def absenteeism_prediction_dag():
    """Predict absenteeism using Hamilton and Airflow

    The workflow is composed of 2 tasks, each with its own Hamilton driver.
    Notice that the task `prepare_data` relies on the Python module `prepare_data.py`,
    while the task `train_and_evaluate_model` relies on two Python modules,
    `train_model.py` and `evaluate_model.py`. Each Python module describes a related
    set of functions, but in the context of this Airflow DAG it made sense to run the
    model training and evaluation in the same node!
    """

    @task
    def prepare_data():
        """Load external data, preprocess the dataset, and store the cleaned data"""
        # Python imports
        import pandas as pd

        # function module imports from the airflow/plugins directory
        from absenteeism import prepare_data
        from hamilton import driver

        # load DAG params
        context = get_current_context()
        PARAMS = context["params"]

        # load external data; could be data from S3, Snowflake, BigQuery, etc.
        raw_data_location = PARAMS["raw_data_location"]
        raw_df = pd.read_csv(raw_data_location, sep=";")

        # instantiate the Hamilton driver
        hamilton_config = PARAMS["h_prepare_data"]
        dr = driver.Driver(hamilton_config, prepare_data)

        # execute the Hamilton driver
        label = PARAMS["label"]
        # prepare_data.ALL_FEATURES is a constant defined in the module
        features_df = dr.execute(
            final_vars=prepare_data.ALL_FEATURES + [label],
            inputs={"raw_df": raw_df},
        )

        # save the preprocessed data so it can be reused by downstream tasks
        features_path = f"{AIRFLOW_HOME}/plugins/data/features/full_feature_set.parquet"
        features_df.to_parquet(features_path)
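        # As the module docstring notes, a production deployment would write to
        # intermediary storage instead of the local plugins directory. A hypothetical
        # sketch (the bucket name is a placeholder; pandas needs s3fs installed to
        # write s3:// URIs):
        #   features_path = "s3://<your-bucket>/features/full_feature_set.parquet"
        #   features_df.to_parquet(features_path)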
        # `return` pushes the string path to Airflow XCom
        return features_path

    @task
    def train_and_evaluate_model(features_path: str):
        """Train and evaluate a machine learning model"""
        from absenteeism import evaluate_model, train_model
        from hamilton import base, driver

        context = get_current_context()
        PARAMS = context["params"]

        hamilton_config = PARAMS["h_train_and_evaluate_model"]
        dr = driver.Driver(
            hamilton_config,
            train_model,
            evaluate_model,  # pass two function modules to the Hamilton driver
            adapter=base.DefaultAdapter(),
        )
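        # Note: with base.DefaultAdapter, `dr.execute()` below should return results as
        # a plain dict keyed by the requested `final_vars`, rather than assembling a
        # DataFrame; hence `results` prints as a dict.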
        results = dr.execute(
            final_vars=["save_validation_preds", "model_results"],
            inputs={
                "features_path": features_path,  # value retrieved from Airflow XCom
                "label": PARAMS["label"],
                "feature_set": PARAMS["feature_set"],
                "validation_user_ids": PARAMS["validation_user_ids"],
            },
        )
        print(results)

    train_and_evaluate_model(prepare_data())


absenteeism_prediction = absenteeism_prediction_dag()
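
# A minimal local-run sketch, assuming Airflow >= 2.5, where `DAG.test()` executes the
# DAG in-process without a scheduler:
#   if __name__ == "__main__":
#       absenteeism_prediction.test()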