-
Notifications
You must be signed in to change notification settings - Fork 4.2k
/
per_entity_training.py
152 lines (129 loc) · 5.14 KB
/
per_entity_training.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""A pipeline to demonstrate per-entity training.
This pipeline reads data from a CSV file, that contains information
about 15 different attributes like salary >=50k, education level,
native country, age, occupation and others. The pipeline does some filtering
by selecting certain education level, discarding missing values and empty rows.
The pipeline then groups the rows based on education level and
trains Decision Trees for each group and finally saves them.
"""
import argparse
import logging
import pickle
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import OneHotEncoder
from sklearn.tree import DecisionTreeClassifier
import apache_beam as beam
from apache_beam.io import fileio
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
class CreateKey(beam.DoFn):
def process(self, element, *args, **kwargs):
# 3rd column of the dataset is Education
idx = 3
key = element.pop(idx)
yield (key, element)
def custom_filter(element):
"""Discard data point if contains ?,
doesn't have all features, or
doesn't have Bachelors, Masters or a Doctorate Degree"""
return len(element) == 15 and '?' not in element \
and ' Bachelors' in element or ' Masters' in element \
or ' Doctorate' in element
class PrepareDataforTraining(beam.DoFn):
"""Preprocess data in a format suitable for training."""
def process(self, element, *args, **kwargs):
key, values = element
#Convert to dataframe
df = pd.DataFrame(values)
last_ix = len(df.columns) - 1
X, y = df.drop(last_ix, axis=1), df[last_ix]
# select categorical and numerical features
cat_ix = X.select_dtypes(include=['object', 'bool']).columns
num_ix = X.select_dtypes(include=['int64', 'float64']).columns
# label encode the target variable to have the classes 0 and 1
y = LabelEncoder().fit_transform(y)
yield (X, y, cat_ix, num_ix, key)
class TrainModel(beam.DoFn):
"""Takes preprocessed data as input,
transforms categorical columns using OneHotEncoder,
normalizes numerical columns and then
fits a decision tree classifier.
"""
def process(self, element, *args, **kwargs):
X, y, cat_ix, num_ix, key = element
steps = [('c', OneHotEncoder(handle_unknown='ignore'), cat_ix),
('n', MinMaxScaler(), num_ix)]
# one hot encode categorical, normalize numerical
ct = ColumnTransformer(steps)
# wrap the model in a pipeline
pipeline = Pipeline(steps=[('t', ct), ('m', DecisionTreeClassifier())])
pipeline.fit(X, y)
yield (key, pipeline)
class ModelSink(fileio.FileSink):
def open(self, fh):
self._fh = fh
def write(self, record):
_, trained_model = record
pickled_model = pickle.dumps(trained_model)
self._fh.write(pickled_model)
def flush(self):
self._fh.flush()
def parse_known_args(argv):
"""Parses args for the workflow."""
parser = argparse.ArgumentParser()
parser.add_argument(
'--input',
dest='input',
help='Path to the text file containing sentences.')
parser.add_argument(
'--output-dir',
dest='output',
required=True,
help='Path of directory for saving trained models.')
return parser.parse_known_args(argv)
def run(
argv=None,
save_main_session=True,
):
"""
Args:
argv: Command line arguments defined for this example.
save_main_session: Used for internal testing.
"""
known_args, pipeline_args = parse_known_args(argv)
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
with beam.Pipeline(options=pipeline_options) as pipeline:
_ = (
pipeline | "Read Data" >> beam.io.ReadFromText(known_args.input)
| "Split data to make List" >> beam.Map(lambda x: x.split(','))
| "Filter rows" >> beam.Filter(custom_filter)
| "Create Key" >> beam.ParDo(CreateKey())
| "Group by education" >> beam.GroupByKey()
| "Prepare Data" >> beam.ParDo(PrepareDataforTraining())
| "Train Model" >> beam.ParDo(TrainModel())
|
"Save" >> fileio.WriteToFiles(path=known_args.output, sink=ModelSink()))
if __name__ == "__main__":
logging.getLogger().setLevel(logging.INFO)
run()