-
Notifications
You must be signed in to change notification settings - Fork 0
/
pipeline.py
73 lines (60 loc) · 2.1 KB
/
pipeline.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import kfp
from kfp import dsl
# Build ContainerOps from Docker images, passing the arguments needed for data/model access and declaring each step's file_outputs.
# TODO: Fix the FutureWarning raised when constructing ContainerOp instances directly.
def preprocess_op():
    """Create the data-preprocessing step of the demo pipeline.

    The step runs the preprocessing image without command-line arguments and
    exposes four ``.npy`` files written inside the container as the named
    outputs ``x_train``, ``x_test``, ``y_train`` and ``y_test``.

    Returns:
        dsl.ContainerOp: the configured preprocessing step.
    """
    # Output name -> path inside the container whose file backs that output.
    outputs = {
        'x_train': '/app/x_train.npy',
        'x_test': '/app/x_test.npy',
        'y_train': '/app/y_train.npy',
        'y_test': '/app/y_test.npy',
    }
    return dsl.ContainerOp(
        name='Preprocess Data',
        image='nannakaroliina/kubeflow_pipeline_demo_preprocessing:latest',
        arguments=[],
        file_outputs=outputs,
    )
def train_op(x_train, y_train):
    """Create the model-training step of the demo pipeline.

    Args:
        x_train: Value passed to the container after ``--x_train``.
        y_train: Value passed to the container after ``--y_train``.

    Returns:
        dsl.ContainerOp: the training step. Its ``model`` output is the file
        the container writes to ``/app/model.plk``.
    """
    cli_args = ['--x_train', x_train, '--y_train', y_train]
    # NOTE(review): '.plk' looks like a typo for '.pkl'. The path must match
    # what the training image actually writes, so confirm before changing it.
    model_outputs = {'model': '/app/model.plk'}
    return dsl.ContainerOp(
        name='Train Model',
        image='nannakaroliina/kubeflow_pipeline_demo_train:latest',
        arguments=cli_args,
        file_outputs=model_outputs,
    )
def predict_op(x_test, y_test, model):
    """Create the model-evaluation step of the demo pipeline.

    Args:
        x_test: Value passed to the container after ``--x_test``.
        y_test: Value passed to the container after ``--y_test``.
        model: Value passed to the container after ``--model``.

    Returns:
        dsl.ContainerOp: the evaluation step. Its ``f1_score`` output is the
        file the container writes to ``/app/results.txt`` (presumably holding
        the F1 score — confirm against the predict image).
    """
    flags = ('--x_test', '--y_test', '--model')
    values = (x_test, y_test, model)
    # Interleave each flag with its value: [flag1, value1, flag2, value2, ...].
    cli_args = [token for pair in zip(flags, values) for token in pair]
    return dsl.ContainerOp(
        name='Test Model',
        image='nannakaroliina/kubeflow_pipeline_demo_predict:latest',
        arguments=cli_args,
        file_outputs={'f1_score': '/app/results.txt'},
    )
@dsl.pipeline(
    name='Kubeflow pipeline demo',
    description='Kubeflow pipeline demo with simple model training'
)
def kubeflow_demo_pipeline():
    """Wire preprocessing, training and evaluation into a single pipeline.

    Each upstream file output is wrapped in ``dsl.InputArgumentPath``, so the
    downstream container receives a local file path rather than the file's
    contents. The explicit ``.after()`` calls pin the step order:
    preprocess -> train -> predict.
    """
    preprocess = preprocess_op()

    train = train_op(
        dsl.InputArgumentPath(preprocess.outputs['x_train']),
        dsl.InputArgumentPath(preprocess.outputs['y_train']),
    ).after(preprocess)

    # Evaluation is the final step and nothing refers to its handle, so it is
    # not bound to a name (the original kept an unused local here).
    predict_op(
        dsl.InputArgumentPath(preprocess.outputs['x_test']),
        dsl.InputArgumentPath(preprocess.outputs['y_test']),
        dsl.InputArgumentPath(train.outputs['model']),
    ).after(train)
if __name__ == '__main__':
    # Compile the pipeline into pipeline.yaml for upload through the
    # Kubeflow Pipelines UI.
    # TODO implement local run option without manual pipeline creation
    pipeline_compiler = kfp.compiler.Compiler()
    pipeline_compiler.compile(kubeflow_demo_pipeline, 'pipeline.yaml')