make training deterministic #6

Open
Wants to merge 4 commits into base: padding-strategy
5 changes: 3 additions & 2 deletions pyzoo/test/zoo/pipeline/api/keras/test_simple_integration.py
@@ -63,7 +63,8 @@ def test_training_with_tensorboard_checkpoint_gradientclipping(self):
y_train = np.random.randint(4, size=(200, ))
X_test = np.random.random([40, 32, 32])
y_test = np.random.randint(4, size=(40, ))
model.compile(optimizer="adam",
from zoo.pipeline.api.keras.optimizers import Adam, EpochStep
model.compile(optimizer=Adam(lr=0.003, schedule=EpochStep(1, 0.75)),
loss="sparse_categorical_crossentropy",
metrics=['accuracy'])
tmp_log_dir = create_tmp_path()
@@ -72,7 +73,7 @@ def test_training_with_tensorboard_checkpoint_gradientclipping(self):
model.set_tensorboard(tmp_log_dir, "training_test")
model.set_checkpoint(tmp_checkpoint_path)
model.set_constant_gradient_clipping(0.01, 0.03)
model.fit(X_train, y_train, batch_size=112, nb_epoch=2, validation_data=(X_test, y_test))
model.fit(X_train, y_train, batch_size=32, nb_epoch=20, validation_data=(X_test, y_test))
model.clear_gradient_clipping()
model.fit(X_train, y_train, batch_size=112, nb_epoch=2, validation_data=(X_test, y_test))
model.set_gradient_clipping_by_l2_norm(0.2)
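The compile change above swaps the plain "adam" string for Adam with an EpochStep(1, 0.75) learning-rate schedule. A minimal sketch of the decay this implies, assuming the usual BigDL step-decay rule lr * gamma ** floor(epoch / step_size); the helper name and exact epoch indexing are illustrative assumptions, not taken from this PR:

# Hypothetical illustration of EpochStep(1, 0.75) applied to lr=0.003,
# assuming step decay of the form lr * gamma ** (epoch // step_size).
def epoch_step_lr(base_lr, step_size, gamma, epoch):
    return base_lr * gamma ** (epoch // step_size)

for epoch in range(4):
    print(epoch, epoch_step_lr(0.003, 1, 0.75, epoch))
# roughly: 0.003, 0.00225, 0.0016875, 0.001265625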
27 changes: 27 additions & 0 deletions pyzoo/test/zoo/tfpark/test_tfpark_model.py
@@ -316,6 +316,33 @@ def test_tf_optimizer_with_sparse_gradient_using_keras(self):
optimizer = TFOptimizer.from_keras(model, dataset)
optimizer.optimize()

def test_tf_optimizer_variable_length(self):
from random import randrange
ids = [np.random.randint(0, 10, size=[randrange(20)+1]) for i in range(0, 20)]
labels = [np.array([1]) for i in range(0, 20)]
id_rdd = self.sc.parallelize(ids)
label_rdd = self.sc.parallelize(labels)
training_rdd = id_rdd.zip(label_rdd)
dataset = TFDataset.from_rdd(training_rdd,
features=(tf.int32, [None]),
labels=(tf.int32, [1]),
names=["features", "labels"],
)
# model = tf.keras.models.Sequential()
# model.add(tf.keras.layers.Dense(2, input_shape=(20, ), activation="softmax"))
# model.compile(optimizer="sgd", loss="sparse_categorical_crossentropy")
words_input = tf.keras.layers.Input(shape=(20, ), name='words_input')
embedding_layer = tf.keras.layers.Embedding(input_dim=10,
output_dim=5, name='word_embedding')
word_embeddings = embedding_layer(words_input)
embedding = tf.keras.layers.Flatten()(word_embeddings)
model = tf.keras.models.Model(inputs=[words_input], outputs=[embedding])
model.compile(optimizer="sgd", loss="mse")
optimizer = TFOptimizer.from_keras(model, dataset)
optimizer.optimize()
print("111")


def test_tensorflow_optimizer(self):
data = tf.keras.layers.Input(shape=[10])

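test_tf_optimizer_variable_length feeds id sequences of random length (1 to 20) into TFDataset with features=(tf.int32, [None]), so it exercises the feature-padding path added in PythonZooKeras.scala further down, which pads feature tensors with -1. A rough numpy sketch of that per-batch padding (illustrative only, not the actual SampleToMiniBatch code):

# Pad a batch of variable-length id sequences to the longest sequence in the
# batch, using -1 as the padding value (mirroring featurePaddingParam below).
import numpy as np

batch = [np.array([3, 1, 4]), np.array([1, 5]), np.array([9, 2, 6, 5])]
max_len = max(len(seq) for seq in batch)
padded = np.full((len(batch), max_len), -1, dtype=np.int32)
for row, seq in enumerate(batch):
    padded[row, :len(seq)] = seq
print(padded)
# [[ 3  1  4 -1]
#  [ 1  5 -1 -1]
#  [ 9  2  6  5]]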
57 changes: 57 additions & 0 deletions pyzoo/zoo/pipeline/api/keras/optimizers.py
@@ -105,3 +105,60 @@ def __init__(self,
epsilon,
weight_decay)
self.bigdl_type = bigdl_type


from pyspark.rdd import RDD
from py4j.java_gateway import JavaObject
from bigdl.util.common import JavaValue, callBigDlFunc, to_list
from bigdl.optim.optimizer import DistriOptimizer as BDistriOptimizer, OptimMethod, SGD


class DistriOptimizer(BDistriOptimizer):
def __init__(self,
model,
training_rdd,
criterion,
end_trigger=None,
batch_size=32,
optim_method=None,
bigdl_type="float"):
"""
Create an optimizer.


:param model: the neural net model
:param training_rdd: the training dataset (an RDD of Sample)
:param criterion: the loss function
:param optim_method: the algorithm to use for optimization,
e.g. SGD, Adagrad, etc. If optim_method is None, the default algorithm is SGD.
:param end_trigger: when to end the optimization
:param batch_size: training batch size
"""
if not optim_method:
optim_methods = {model.name(): SGD()}
elif isinstance(optim_method, OptimMethod):
optim_methods = {model.name(): optim_method}
elif isinstance(optim_method, JavaObject):
optim_methods = {model.name(): OptimMethod(optim_method, bigdl_type)}
else:
optim_methods = optim_method
if isinstance(training_rdd, RDD):
self.bigdl_type = bigdl_type
self.value = callBigDlFunc(self.bigdl_type, "createDistriOptimizerFromRDD",
model.value, training_rdd, criterion,
optim_methods, end_trigger, batch_size)

def set_validation(self, batch_size, val_rdd, trigger, val_method=None):
"""
Configure validation settings.


:param batch_size: validation batch size
:param val_rdd: validation dataset
:param trigger: validation interval
:param val_method: the ValidationMethod to use, e.g. "Top1Accuracy", "Top5Accuracy", "Loss"
"""
callBigDlFunc(self.bigdl_type, "setValidationWithPaddingStrategy", self.value, batch_size,
trigger, val_rdd, to_list(val_method))


class EpochStep(JavaValue):
def __init__(self, step_size, gamma, bigdl_type="float"):
JavaValue.__init__(self, None, bigdl_type, step_size, float(gamma))
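Taken together, the additions above give a Python-side DistriOptimizer that batches with the padding strategy and supports an EpochStep schedule. A hedged usage sketch based only on the signatures in this file; `model`, `train_rdd`, and `val_rdd` are assumed inputs, and the criterion and metric are chosen for illustration, none of which are part of this PR:

# Usage sketch (assumptions: `model` is a built Zoo/BigDL model, `train_rdd`
# and `val_rdd` are RDDs of Sample; criterion/metric chosen for illustration).
from bigdl.nn.criterion import ClassNLLCriterion
from bigdl.optim.optimizer import MaxEpoch, EveryEpoch, Top1Accuracy
from zoo.pipeline.api.keras.optimizers import Adam, EpochStep, DistriOptimizer

optimizer = DistriOptimizer(model=model,
                            training_rdd=train_rdd,
                            criterion=ClassNLLCriterion(),
                            end_trigger=MaxEpoch(20),
                            batch_size=32,
                            optim_method=Adam(lr=0.003, schedule=EpochStep(1, 0.75)))
optimizer.set_validation(batch_size=32, val_rdd=val_rdd,
                         trigger=EveryEpoch(), val_method=[Top1Accuracy()])
optimizer.optimize()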
7 changes: 4 additions & 3 deletions pyzoo/zoo/pipeline/api/net/tf_optimizer.py
@@ -24,8 +24,9 @@
from bigdl.nn.criterion import Criterion
from bigdl.nn.layer import Layer
from bigdl.util.common import to_list, JavaValue
from bigdl.optim.optimizer import EveryEpoch, MaxEpoch, Optimizer
from bigdl.optim.optimizer import EveryEpoch, MaxEpoch
from zoo.pipeline.api.keras.engine.topology import to_bigdl_metric
from zoo.pipeline.api.keras.optimizers import DistriOptimizer
from zoo.pipeline.api.net.utils import _find_placeholders, _check_the_same
from zoo.util import nest

@@ -194,7 +195,7 @@ def to_floats(vs):
raise ValueError("Validation data is not specified. Please set " +
"val rdd in TFDataset, or set val_split larger than zero")

self.optimizer = Optimizer.create(self.training_helper_layer,
self.optimizer = DistriOptimizer(self.training_helper_layer,
training_rdd,
IdentityCriterion(),
batch_size=batch_size,
@@ -205,7 +206,7 @@ def to_floats(vs):
val_method)
else:
training_rdd = sample_rdd
self.optimizer = Optimizer.create(self.training_helper_layer,
self.optimizer = DistriOptimizer(self.training_helper_layer,
training_rdd,
IdentityCriterion(),
batch_size=batch_size,
PythonZooKeras.scala (com.intel.analytics.zoo.pipeline.api.keras.python)
@@ -16,13 +16,14 @@

package com.intel.analytics.zoo.pipeline.api.keras.python

import java.util.concurrent.atomic.AtomicInteger
import java.util.{List => JList, Map => JMap}

import com.intel.analytics.bigdl.{Criterion, Module}
import com.intel.analytics.bigdl.dataset.{DataSet, LocalDataSet, MiniBatch}
import com.intel.analytics.bigdl.{Criterion, DataSet, Module}
import com.intel.analytics.bigdl.dataset.{Identity => DIdentity, Sample => JSample, _}

import scala.collection.JavaConverters._
import com.intel.analytics.bigdl.optim.{_}
import com.intel.analytics.bigdl.optim._
import com.intel.analytics.bigdl.python.api.{EvaluatedResult, JTensor, Sample}
import com.intel.analytics.bigdl.tensor.Tensor
import com.intel.analytics.bigdl.tensor.TensorNumericMath.TensorNumeric
@@ -31,12 +32,12 @@ import com.intel.analytics.bigdl.nn.Container
import com.intel.analytics.bigdl.nn.abstractnn.{AbstractModule, Activity}
import com.intel.analytics.bigdl.nn.keras.{KerasLayer, KerasModel}
import com.intel.analytics.bigdl.nn.{BatchNormalization => BNBatchNormalization}
import com.intel.analytics.bigdl.utils.{Shape, Table}
import com.intel.analytics.bigdl.utils.{RandomGenerator, Shape, Table}
import com.intel.analytics.zoo.feature.image.ImageSet
import com.intel.analytics.zoo.pipeline.api.autograd.{Constant, _}
import com.intel.analytics.zoo.pipeline.api.keras.layers.{KerasLayerWrapper, _}
import com.intel.analytics.zoo.pipeline.api.keras.layers.utils.KerasUtils
import com.intel.analytics.zoo.pipeline.api.keras.models.{KerasNet, Model, Sequential}
import com.intel.analytics.zoo.pipeline.api.keras.models.{InternalDistriOptimizer, KerasNet, Model, Sequential}
import com.intel.analytics.zoo.pipeline.api.keras.objectives._
import com.intel.analytics.zoo.pipeline.api.keras.optimizers.{Adam, AdamWeightDecay}
import org.apache.spark.api.java.JavaRDD
@@ -47,6 +48,7 @@ import com.intel.analytics.zoo.models.seq2seq.{Bridge, RNNDecoder, RNNEncoder}
import com.intel.analytics.zoo.pipeline.api.Net
import com.intel.analytics.zoo.pipeline.api.keras.{metrics => zmetrics}
import com.intel.analytics.zoo.pipeline.api.net.GraphNet
import org.apache.spark.rdd.RDD

import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
@@ -1355,4 +1357,173 @@ class PythonZooKeras[T: ClassTag](implicit ev: TensorNumeric[T]) extends PythonZ
new AdamWeightDecay[T](learningRate, warmupPortion, total, schedule, beta1, beta2,
epsilon, weightDecay)
}

def batchingWithPaddingStrategy(dataset: DataSet[JSample[T]], batchSize: Int)
: DataSet[MiniBatch[T]] = {
println("Using Feature Padding Strategy")
val paddingTensor = Tensor[T](1).fill(ev.fromType(-1.0))
val featurePaddingParam = PaddingParam[T](Some(Array.fill[Tensor[T]](13)(paddingTensor)))
dataset.transform(SampleToMiniBatch(
batchSize = batchSize, featurePaddingParam = Some(featurePaddingParam)))
}

def createDistriOptimizerFromRDD(model: AbstractModule[Activity, Activity, T],
trainingRdd: JavaRDD[Sample],
criterion: Criterion[T],
optimMethod: JMap[String, OptimMethod[T]],
endTrigger: Trigger,
batchSize: Int): Optimizer[T, MiniBatch[T]] = {
val sampleRDD = toJSample(trainingRdd)

val Array(nodeNumber, coreNumber) = this.getNodeAndCoreNumber()
val dataset = new CachedDistriDataSet[JSample[T]](
sampleRDD.coalesce(nodeNumber, true)
.mapPartitions(iter => {
Iterator.single(iter.toArray)
}).setName("cached dataset")
.cache()
)
val optimizer = new InternalDistriOptimizer(
_model = model,
_dataset = batchingWithPaddingStrategy(dataset, batchSize)
.asInstanceOf[DistributedDataSet[MiniBatch[T]]],
_criterion = criterion
).asInstanceOf[Optimizer[T, MiniBatch[T]]]
enrichOptimizer(optimizer, endTrigger, optimMethod.asScala.toMap)
}

private def enrichOptimizer[T](
optimizer: Optimizer[T, MiniBatch[T]],
endTrigger: Trigger,
optimMethod: Map[String, OptimMethod[T]]): Optimizer[T, MiniBatch[T]] = {
optimizer.setEndWhen(endTrigger)

optimizer.setOptimMethods(optimMethod)

// TODO: remove this
optimizer.disableCheckSingleton()

optimizer
}

def setValidationWithPaddingStrategy(optimizer: Optimizer[T, MiniBatch[T]],
batchSize: Int,
trigger: Trigger,
valRdd: JavaRDD[Sample],
vMethods: JList[ValidationMethod[T]]): Unit = {
val sampleRDD = toJSample(valRdd)
optimizer.setValidation(trigger, batchingWithPaddingStrategy(DataSet.rdd(sampleRDD), batchSize),
vMethods.asScala.toArray)
}

def createEpochStep(stepSize: Int, gamma: Double): SGD.EpochStep = {
SGD.EpochStep(stepSize, gamma)
}
}

class CachedDistriDataSet[T: ClassTag]
(buffer: RDD[Array[T]], isInOrder: Boolean = false, groupSize: Int = 1)
extends DistributedDataSet[T] {

protected lazy val count: Long = buffer.mapPartitions(iter => {
require(iter.hasNext)
val array = iter.next()
require(!iter.hasNext)
Iterator.single(array.length)
}).reduce(_ + _)

protected var indexes: RDD[Array[Int]] = buffer.mapPartitions(iter => {
Iterator.single((0 until iter.next().length).toArray)
}).setName("original index").cache()

override def data(train: Boolean): RDD[T] = {
val _train = train
val _groupSize = if (isInOrder) Utils.getBatchSize(groupSize) else 1
buffer.zipPartitions(indexes)((dataIter, indexIter) => {
val indexes = indexIter.next()
val indexOffset = math.max(1, indexes.length - (_groupSize - 1))
val localData = dataIter.next()
val offset = if (_train) {
RandomGenerator2.RNG.uniform(0, indexOffset).toInt
} else {
0
}
new Iterator[T] {
private val _offset = new AtomicInteger(offset)

override def hasNext: Boolean = {
if (_train) true else _offset.get() < localData.length
}

override def next(): T = {
val i = _offset.getAndIncrement()
if (_train) {
localData(indexes(i % localData.length))
} else {
if (i < localData.length) {
localData(indexes(i))
} else {
null.asInstanceOf[T]
}
}
}
}
})
}

override def size(): Long = count

override def shuffle(): Unit = {
if (!isInOrder) {
indexes.unpersist()
indexes = buffer.mapPartitions(iter => {
Iterator.single(RandomGenerator2.shuffle((0 until iter.next().length).toArray))
}).setName("shuffled index").cache()
}
}

override def originRDD(): RDD[_] = buffer

override def cache(): Unit = {
buffer.count()
indexes.count()
isCached = true
}

override def unpersist(): Unit = {
buffer.unpersist()
indexes.unpersist()
isCached = false
}
}


object RandomGenerator2 {

var randomSeed = 1
val generators = new ThreadLocal[RandomGenerator]()

// scalastyle:off methodName
def RNG: RandomGenerator = {
if (generators.get() == null) {
val rg = RandomGenerator.RNG.clone()
rg.setSeed(randomSeed)
generators.set(rg)
}
generators.get()
}
// scalastyle:on methodName

def shuffle[T](data: Array[T]): Array[T] = {
var i = 0
val length = data.length
while (i < length) {
val exchange = RNG.uniform(0, length - i).toInt + i
val tmp = data(exchange)
data(exchange) = data(i)
data(i) = tmp
i += 1
}
data
}
}
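RandomGenerator2 keeps a thread-local clone of BigDL's RandomGenerator seeded with a fixed randomSeed, so the per-partition index shuffle in CachedDistriDataSet is reproducible from run to run, which is presumably what the PR title refers to. A rough Python analog of the idea (illustrative only; the helper name is made up and this is not the Scala implementation above):

# Fixed-seed shuffling: the same seed yields the same permutation on every run.
import random

def shuffled_indexes(n, seed=1):
    rng = random.Random(seed)   # analogous to seeding the thread-local generator
    idx = list(range(n))
    rng.shuffle(idx)
    return idx

assert shuffled_indexes(10) == shuffled_indexes(10)
print(shuffled_indexes(10))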