Navigation Menu

Skip to content

Commit

Permalink
Add num_epochs to training (#1)
Browse files Browse the repository at this point in the history
* add num_epochs to training, remove default from eval

* use ratios in step conversion, add examples with epoch
  • Loading branch information
1vn committed Jan 28, 2019
1 parent 3e60a0a commit 8d03966
Show file tree
Hide file tree
Showing 7 changed files with 61 additions and 28 deletions.
5 changes: 3 additions & 2 deletions docs/applications/resources/models.md
Expand Up @@ -23,6 +23,7 @@ Train custom TensorFlow models at scale.
training:
batch_size: <int> # training batch size (default: 40)
num_steps: <int> # number of training steps (default: 1000)
num_epochs: <int> # number of epochs to train the model over the entire dataset (optional)
shuffle: <boolean> # whether to shuffle the training data (default: true)
tf_random_seed: <int> # random seed for TensorFlow initializers (default: <random>)
save_summary_steps: <int> # save summaries every this many steps (default: 100)
Expand All @@ -34,8 +35,8 @@ Train custom TensorFlow models at scale.

evaluation:
batch_size: <int> # evaluation batch size (default: 40)
num_steps: <int> # number of eval steps (optional)
num_epochs: <int> # number of eval epochs (default: 1)
num_steps: <int> # number of eval steps (default: 100)
num_epochs: <int> # number of epochs to evaluate the model over the entire dataset (optional)
shuffle: <boolean> # whether to shuffle the evaluation data (default: false)
start_delay_secs: <int> # start evaluating after waiting for this many seconds (default: 120)
throttle_secs: <int> # do not re-evaluate unless the last evaluation was started at least this many seconds ago (default: 600)
Expand Down
4 changes: 2 additions & 2 deletions examples/mnist/resources/models.yaml
Expand Up @@ -16,7 +16,7 @@
evaluation: 0.3
training:
batch_size: 64
num_steps: 5000
num_epochs: 5

- kind: model
name: conv
Expand All @@ -37,7 +37,7 @@
evaluation: 0.3
training:
batch_size: 64
num_steps: 5000
num_epochs: 5

- kind: model
name: dnn
Expand Down
9 changes: 5 additions & 4 deletions pkg/api/context/models.go
Expand Up @@ -37,10 +37,11 @@ type Model struct {

// TrainingDataset records the storage keys for a model's materialized
// dataset: the train/eval TFRecord splits plus a metadata JSON file
// (holding e.g. dataset_size) used to convert num_epochs into step counts.
type TrainingDataset struct {
	*ComputedResourceFields
	Name        string `json:"name"`         // resource name of this training dataset
	ModelName   string `json:"model_name"`   // name of the model this dataset belongs to
	TrainKey    string `json:"train_key"`    // object key of the training split (train.tfrecord)
	EvalKey     string `json:"eval_key"`     // object key of the evaluation split (eval.tfrecord)
	MetadataKey string `json:"metadata_key"` // object key of the dataset metadata JSON (metadata.json)
}

func (trainingDataset *TrainingDataset) GetName() string {
Expand Down
20 changes: 16 additions & 4 deletions pkg/api/userconfig/models.go
Expand Up @@ -157,7 +157,8 @@ var modelDataPartitionRatioValidation = &cr.StructValidation{

type ModelTraining struct {
BatchSize int64 `json:"batch_size" yaml:"batch_size"`
NumSteps int64 `json:"num_steps" yaml:"num_steps"`
NumSteps *int64 `json:"num_steps" yaml:"num_steps"`
NumEpochs *int64 `json:"num_epochs" yaml:"num_epochs"`
Shuffle bool `json:"shuffle" yaml:"shuffle"`
TfRandomSeed int64 `json:"tf_random_seed" yaml:"tf_random_seed"`
TfRandomizeSeed bool `json:"tf_randomize_seed" yaml:"tf_randomize_seed"`
Expand All @@ -180,9 +181,14 @@ var modelTrainingValidation = &cr.StructValidation{
},
&cr.StructFieldValidation{
StructField: "NumSteps",
Int64Validation: &cr.Int64Validation{
Int64PtrValidation: &cr.Int64PtrValidation{
GreaterThan: util.Int64Ptr(0),
},
},
&cr.StructFieldValidation{
StructField: "NumEpochs",
Int64PtrValidation: &cr.Int64PtrValidation{
GreaterThan: util.Int64Ptr(0),
Default: 1000,
},
},
&cr.StructFieldValidation{
Expand Down Expand Up @@ -350,8 +356,14 @@ func (model *Model) Validate() error {
return errors.Wrap(ErrorSpecifyOnlyOne(SaveCheckpointSecsKey, SaveCheckpointStepsKey), Identify(model), TrainingKey)
}

if model.Training.NumSteps == nil && model.Training.NumEpochs == nil {
model.Training.NumSteps = util.Int64Ptr(1000)
} else if model.Training.NumSteps != nil && model.Training.NumEpochs != nil {
return errors.Wrap(ErrorSpecifyOnlyOne(NumEpochsKey, NumStepsKey), Identify(model), TrainingKey)
}

if model.Evaluation.NumSteps == nil && model.Evaluation.NumEpochs == nil {
model.Evaluation.NumEpochs = util.Int64Ptr(1)
model.Evaluation.NumSteps = util.Int64Ptr(100)
} else if model.Evaluation.NumSteps != nil && model.Evaluation.NumEpochs != nil {
return errors.Wrap(ErrorSpecifyOnlyOne(NumEpochsKey, NumStepsKey), Identify(model), EvaluationKey)
}
Expand Down
9 changes: 5 additions & 4 deletions pkg/operator/context/models.go
Expand Up @@ -107,10 +107,11 @@ func getModels(
ResourceType: resource.TrainingDatasetType,
},
},
Name: trainingDatasetName,
ModelName: modelConfig.Name,
TrainKey: filepath.Join(datasetRoot, "train.tfrecord"),
EvalKey: filepath.Join(datasetRoot, "eval.tfrecord"),
Name: trainingDatasetName,
ModelName: modelConfig.Name,
TrainKey: filepath.Join(datasetRoot, "train.tfrecord"),
EvalKey: filepath.Join(datasetRoot, "eval.tfrecord"),
MetadataKey: filepath.Join(datasetRoot, "metadata.json"),
},
}
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/workloads/spark_job/spark_util.py
Expand Up @@ -69,6 +69,9 @@ def write_training_data(model_name, df, ctx):

df = df.select(*feature_names)

metadata = {"dataset_size": df.count()}
aws.upload_json_to_s3(metadata, training_dataset["metadata_key"], ctx.bucket)

train_ratio = model["data_partition_ratio"]["training"]
eval_ratio = model["data_partition_ratio"]["evaluation"]
[train_df, eval_df] = df.randomSplit([train_ratio, eval_ratio])
Expand Down
39 changes: 27 additions & 12 deletions pkg/workloads/tf_train/train_util.py
Expand Up @@ -17,6 +17,7 @@
import inspect
import importlib
import multiprocessing
import math
import tensorflow as tf

from lib import util, tf_lib, aws
Expand Down Expand Up @@ -67,16 +68,7 @@ def _input_fn():

dataset = dataset.batch(model[mode]["batch_size"])
dataset = dataset.prefetch(buffer_size)

if mode == "training":
dataset = dataset.repeat(None) # Repeat forever
elif mode == "evaluation":
dataset = dataset.repeat(model["evaluation"]["num_epochs"]) # Repeats forever if None
else:
raise CortexException(
"model " + model_name, "expected training or evaluation but found " + mode
)

dataset = dataset.repeat()
iterator = dataset.make_one_shot_iterator()
features, target = iterator.get_next()

Expand Down Expand Up @@ -143,11 +135,34 @@ def train(model_name, model_impl, ctx, model_dir):
serving_input_fn = generate_json_serving_input_fn(model_name, ctx)
exporter = tf.estimator.FinalExporter("estimator", serving_input_fn, as_text=False)

train_spec = tf.estimator.TrainSpec(train_input_fn, max_steps=model["training"]["num_steps"])
dataset_metadata = aws.read_json_from_s3(model["dataset"]["metadata_key"], ctx.bucket)
train_num_steps = model["training"]["num_steps"]
if model["training"]["num_epochs"]:
train_num_steps = (
math.ceil(
dataset_metadata["dataset_size"]
* model["data_partition_ratio"]["training"]
/ float(model["training"]["batch_size"])
)
* model["training"]["num_epochs"]
)

train_spec = tf.estimator.TrainSpec(train_input_fn, max_steps=train_num_steps)

eval_num_steps = model["evaluation"]["num_steps"]
if model["evaluation"]["num_epochs"]:
eval_num_steps = (
math.ceil(
dataset_metadata["dataset_size"]
* model["data_partition_ratio"]["evaluation"]
/ float(model["evaluation"]["batch_size"])
)
* model["evaluation"]["num_epochs"]
)

eval_spec = tf.estimator.EvalSpec(
eval_input_fn,
steps=model["evaluation"]["num_steps"],
steps=eval_num_steps,
exporters=[exporter],
name="estimator-eval",
start_delay_secs=model["evaluation"]["start_delay_secs"],
Expand Down

0 comments on commit 8d03966

Please sign in to comment.