[Modelzoo] Enable dump saved_model and incremental_checkpoint. (#124)
shanshanpt committed Mar 23, 2022
1 parent 2fe2ce9 commit e3f51a3
Showing 1 changed file with 177 additions and 109 deletions.
286 changes: 177 additions & 109 deletions modelzoo/features/EmbeddingVariable/WDL/train.py
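In short, this commit adds two opt-in features to the WDL training script: exporting the serving graph as a SavedModel when --saved_model_path is set, and writing incremental checkpoints every --incr_save_steps steps through DeepRec's incremental saver. Since the two run in mutually exclusive branches of main(), a typical workflow uses two invocations (step counts and paths below are illustrative, not from the commit):

python train.py --save_steps 5000 --incr_save_steps 1000   # train; dump full + incremental checkpoints
python train.py --saved_model_path ./savedmodel            # build the serving graph; dump a SavedModel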
@@ -7,8 +7,8 @@
import collections
from tensorflow.python.client import timeline
import json

from tensorflow.python.ops import partitioned_variables
from tensorflow.python.training import incremental_saver as tf_incr_saver

# Set to INFO for tracking training; default is WARN, ERROR shows the fewest messages
tf.logging.set_verbosity(tf.logging.INFO)
@@ -188,35 +188,57 @@ def __init__(self,
inputs=None,
bf16=False,
input_layer_partitioner=None,
dense_layer_partitioner=None):
if not inputs:
raise ValueError('Dataset is not defined.')
self.wide_column = wide_column
self.deep_column = deep_column
if not wide_column or not deep_column:
raise ValueError('Wide column or Deep column is not defined.')
self.dnn_hidden_units = dnn_hidden_units
self.linear_learning_rate = linear_learning_rate
self.deep_learning_rate = deep_learning_rate
self.input_layer_partitioner = input_layer_partitioner
self.dense_layer_partitioner = dense_layer_partitioner

self.feature = inputs[0]
self.label = inputs[1]
self.bf16 = bf16

self._is_training = True

self.predict = self.prediction()
with tf.name_scope('head'):
self.train_op, self.loss = self.optimizer()
self.acc, self.acc_op = tf.metrics.accuracy(
labels=self.label, predictions=self.predict)
self.auc, self.auc_op = tf.metrics.auc(labels=self.label,
predictions=self.predict,
num_thresholds=1000)
tf.summary.scalar('eval_acc', self.acc)
tf.summary.scalar('eval_auc', self.auc)
dense_layer_partitioner=None,
saved_model=False):
if saved_model:
if not inputs:
raise ValueError('Dataset is not defined.')
self.wide_column = wide_column
self.deep_column = deep_column
if not wide_column or not deep_column:
raise ValueError('Wide column or Deep column is not defined.')
self.dnn_hidden_units = dnn_hidden_units
self.linear_learning_rate = linear_learning_rate
self.deep_learning_rate = deep_learning_rate
self.input_layer_partitioner = input_layer_partitioner
self.dense_layer_partitioner = dense_layer_partitioner
self.global_step = tf.train.get_or_create_global_step()

self.feature = inputs
self.bf16 = bf16

self._is_training = False

self.predict = self.prediction()
else:
if not inputs:
raise ValueError('Dataset is not defined.')
self.wide_column = wide_column
self.deep_column = deep_column
if not wide_column or not deep_column:
raise ValueError('Wide column or Deep column is not defined.')
self.dnn_hidden_units = dnn_hidden_units
self.linear_learning_rate = linear_learning_rate
self.deep_learning_rate = deep_learning_rate
self.input_layer_partitioner = input_layer_partitioner
self.dense_layer_partitioner = dense_layer_partitioner

self.feature = inputs[0]
self.label = inputs[1]
self.bf16 = bf16

self._is_training = True

self.predict = self.prediction()
with tf.name_scope('head'):
self.train_op, self.loss = self.optimizer()
self.acc, self.acc_op = tf.metrics.accuracy(
labels=self.label, predictions=self.predict)
self.auc, self.auc_op = tf.metrics.auc(labels=self.label,
predictions=self.predict,
num_thresholds=1000)
tf.summary.scalar('eval_acc', self.acc)
tf.summary.scalar('eval_auc', self.auc)
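The constructor now builds one of two graphs. With saved_model=True it takes a dict of input placeholders instead of an iterator, skips labels, metrics, and the training head, and only wires up self.predict; with saved_model=False it behaves as before. A minimal sketch of the two call patterns (learning rates and the placeholder dict name are illustrative):

# training mode: inputs is the (features, labels) pair from the dataset iterator
model = WDL(wide_column=wide_column, deep_column=deep_column,
            linear_learning_rate=0.2, deep_learning_rate=0.01,
            inputs=next_element, saved_model=False)

# export mode: inputs is a dict of tf.placeholder keyed by feature name
model = WDL(wide_column=wide_column, deep_column=deep_column,
            linear_learning_rate=0.2, deep_learning_rate=0.01,
            inputs=feature_placeholders, saved_model=True)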

def dnn(self, dnn_input, dnn_hidden_units=None, layer_name=''):
for layer_id, num_hidden_units in enumerate(dnn_hidden_units):
@@ -371,6 +393,10 @@ def get_arg_parser():
help='save a checkpoint every this many steps (0 disables)',
type=int,
default=0)
parser.add_argument('--incr_save_steps',
help='save an incremental checkpoint every this many steps (0 disables)',
type=int,
default=0)
parser.add_argument('--keep_checkpoint_max',
help='Maximum number of recent checkpoints to keep',
type=int,
@@ -405,6 +431,10 @@ def get_arg_parser():
help='slice size of dense layer partitioner. units KB',
type=int,
default=0)
parser.add_argument('--saved_model_path',
                    help='export a SavedModel to this path (empty disables export)',
                    type=str,
                    default="")

return parser
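Both new flags default to 'off': leaving incr_save_steps at 0 means the incremental-save branch in the training loop never fires, and an empty saved_model_path keeps is_saved_model False so the script trains exactly as before. A quick sanity check of the defaults (a sketch; assumes no required arguments elsewhere in the parser):

args = get_arg_parser().parse_args([])
assert args.incr_save_steps == 0
assert args.saved_model_path == ''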


@@ -475,15 +505,32 @@ def main(tf_config=None, server=None):
min_slice_size=args.dense_layer_partitioner <<
10) if args.dense_layer_partitioner else None

is_saved_model = False
real_input = next_element
if args.saved_model_path:
    is_saved_model = True
    inputs = {}
    # dense features I1-I13 (I10 is an int64 column in this dataset)
    for x in range(1, 10):
        inputs['I' + str(x)] = tf.placeholder(tf.float32, [None], name='I' + str(x))
    inputs['I10'] = tf.placeholder(tf.int64, [None], name='I10')
    for x in range(11, 14):
        inputs['I' + str(x)] = tf.placeholder(tf.float32, [None], name='I' + str(x))
    # categorical features C1-C26
    for x in range(1, 27):
        inputs['C' + str(x)] = tf.placeholder(tf.string, [None], name='C' + str(x))
    real_input = inputs
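These placeholders replace the dataset iterator so that the exported graph has feedable, batch-size-agnostic inputs: each [None] placeholder accepts a 1-D batch, and the placeholder names become the input keys of the SavedModel signature. A toy feed against this dict (dummy values; a real call must feed every I*/C* placeholder):

feed = {inputs['I1']: [0.5, 1.2],                    # dense feature, one value per example
        inputs['C1']: [b'05db9164', b'68fd1e64']}    # hashed categorical feature
# scores = sess.run(model.predict, feed_dict=feed)   # with all placeholders fed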

# create model
model = WDL(wide_column=wide_column,
deep_column=deep_column,
linear_learning_rate=args.linear_learning_rate,
deep_learning_rate=args.deep_learning_rate,
bf16=args.bf16,
inputs=next_element,
inputs=real_input,
input_layer_partitioner=input_layer_partitioner,
dense_layer_partitioner=dense_layer_partitioner)
dense_layer_partitioner=dense_layer_partitioner,
saved_model=is_saved_model)

sess_config = tf.ConfigProto()
if args.inter:
@@ -518,85 +565,106 @@ def main(tf_config=None, server=None):
while not sess.should_stop():
_, train_loss = sess.run([model.train_op, model.loss])
else:
with tf.Session(config=sess_config) as sess:
sess.run(tf.global_variables_initializer())
sess.run(tf.local_variables_initializer())
merged = tf.summary.merge_all()
writer = tf.summary.FileWriter(checkpoint_dir, sess.graph)
saver = tf.train.Saver(tf.global_variables(),
max_to_keep=args.keep_checkpoint_max)
options = tf.RunOptions(trace_level=tf.RunOptions.FULL_TRACE)
run_metadata = tf.RunMetadata()

# train model
sess.run(train_init_op)
model._is_training = True

start = time.perf_counter()
for _in in range(0, train_steps):
if args.save_steps > 0 and (_in % args.save_steps == 0
or _in == train_steps - 1):
_, train_loss, events = sess.run(
[model.train_op, model.loss, merged])
writer.add_summary(events, _in)
checkpoint_path = saver.save(
sess,
save_path=os.path.join(checkpoint_dir,
'WIDE_AND_DEEP-checkpoint'),
global_step=_in)
print("Save checkpoint to %s" % checkpoint_path)
elif (args.timeline > 0 and _in % args.timeline == 0):
_, train_loss = sess.run([model.train_op, model.loss],
options=options,
run_metadata=run_metadata)
fetched_timeline = timeline.Timeline(
run_metadata.step_stats)
chrome_trace = fetched_timeline.generate_chrome_trace_format(
)
print("Save timeline to %s" % checkpoint_dir)
with open(
os.path.join(checkpoint_dir,
'timeline-%d.json' % _in), 'w') as f:
f.write(chrome_trace)
else:
_, train_loss = sess.run([model.train_op, model.loss])

# print training loss and time cost
if (_in % 100 == 0 or _in == train_steps - 1):
end = time.perf_counter()
cost_time = end - start
global_step_sec = (100 if _in % 100 == 0 else
                   (train_steps - 1) % 100) / cost_time
print("global_step/sec: %0.4f" % global_step_sec)
print("loss = {}, steps = {}, cost time = {:0.2f}s".format(
train_loss, _in, cost_time))
start = time.perf_counter()

# eval model
if not args.no_eval:
writer = tf.summary.FileWriter(
os.path.join(checkpoint_dir, 'eval'))

sess.run(test_init_op)
if is_saved_model:
    with tf.Session(config=sess_config) as sess:
        sess.run(tf.global_variables_initializer())
        sess.run(tf.local_variables_initializer())
        tf.saved_model.simple_save(
            sess,
            args.saved_model_path,
            inputs=inputs,
            outputs={"predict": model.predict})
else:
with tf.Session(config=sess_config) as sess:
sess.run(tf.global_variables_initializer())
sess.run(tf.local_variables_initializer())
merged = tf.summary.merge_all()
writer = tf.summary.FileWriter(checkpoint_dir, sess.graph)
saver = tf.train.Saver(tf.global_variables(),
                       max_to_keep=args.keep_checkpoint_max,
                       incremental_save_restore=True)
# DeepRec extension: wrap the saver so that deltas can be written between full checkpoints
incr_saver = tf_incr_saver._get_incremental_saver(True, saver)
options = tf.RunOptions(trace_level=tf.RunOptions.FULL_TRACE)
run_metadata = tf.RunMetadata()

# train model
sess.run(train_init_op)
model._is_training = True

start = time.perf_counter()
for _in in range(0, train_steps):
if args.save_steps > 0 and (_in % args.save_steps == 0
or _in == train_steps - 1):
_, train_loss, events = sess.run(
[model.train_op, model.loss, merged])
writer.add_summary(events, _in)
checkpoint_path = saver.save(
sess,
save_path=os.path.join(checkpoint_dir,
'WIDE_AND_DEEP-checkpoint'),
global_step=_in)
print("Save checkpoint to %s" % checkpoint_path)
elif args.incr_save_steps > 0 and _in % args.incr_save_steps == 0:
_, train_loss = sess.run(
[model.train_op, model.loss])
incr_checkpoint_path = incr_saver.incremental_save(
sess,
os.path.join(checkpoint_dir, '.incremental_checkpoint/incr-WIDE_AND_DEEP-checkpoint'),
global_step=_in)
print("Save incremental checkpoint to %s" % incr_checkpoint_path)
elif (args.timeline > 0 and _in % args.timeline == 0):
_, train_loss = sess.run([model.train_op, model.loss],
options=options,
run_metadata=run_metadata)
fetched_timeline = timeline.Timeline(
run_metadata.step_stats)
chrome_trace = fetched_timeline.generate_chrome_trace_format(
)
print("Save timeline to %s" % checkpoint_dir)
with open(
os.path.join(checkpoint_dir,
'timeline-%d.json' % _in), 'w') as f:
f.write(chrome_trace)
else:
_, train_loss = sess.run([model.train_op, model.loss])

# print training loss and time cost
if (_in % 100 == 0 or _in == train_steps - 1):
end = time.perf_counter()
cost_time = end - start
global_step_sec = (100 if _in % 100 == 0 else
                   (train_steps - 1) % 100) / cost_time
print("global_step/sec: %0.4f" % global_step_sec)
print("loss = {}, steps = {}, cost time = {:0.2f}s".format(
train_loss, _in, cost_time))
start = time.perf_counter()

# eval model
if not args.no_eval:
writer = tf.summary.FileWriter(
os.path.join(checkpoint_dir, 'eval'))

sess.run(test_init_op)
sess.run(tf.local_variables_initializer())
model._is_training = False

for _in in range(1, test_steps + 1):
if (_in != test_steps):
sess.run(
[model.acc, model.acc_op, model.auc, model.auc_op])
if (_in % 1000 == 0):
print("Evaluation complate:[{}/{}]".format(
_in, test_steps))
else:
    _, eval_acc, _, eval_auc, events = sess.run([
        model.acc, model.acc_op, model.auc, model.auc_op,
        merged
    ])
    writer.add_summary(events, _in)
    print("Evaluation complete:[{}/{}]".format(
        _in, test_steps))
print("ACC = {}\nAUC = {}".format(eval_acc, eval_auc))


if __name__ == "__main__":
@@ -660,4 +728,4 @@ def main(tf_config=None, server=None):
server=server)
else:
print("Task type or index error.")
sys.exit()
