From a5ef3329f224d14eb6fda434d21d1c91bfdd38c1 Mon Sep 17 00:00:00 2001
From: hkvision
Date: Mon, 8 Jul 2019 20:14:12 +0800
Subject: [PATCH 1/4] scala: add padded-batching DistriOptimizer entry point

---
 .../api/keras/python/PythonZooKeras.scala | 45 +++++++++++++++++--
 1 file changed, 42 insertions(+), 3 deletions(-)

diff --git a/zoo/src/main/scala/com/intel/analytics/zoo/pipeline/api/keras/python/PythonZooKeras.scala b/zoo/src/main/scala/com/intel/analytics/zoo/pipeline/api/keras/python/PythonZooKeras.scala
index 956836e0293..49f4ee1ba54 100644
--- a/zoo/src/main/scala/com/intel/analytics/zoo/pipeline/api/keras/python/PythonZooKeras.scala
+++ b/zoo/src/main/scala/com/intel/analytics/zoo/pipeline/api/keras/python/PythonZooKeras.scala
@@ -18,11 +18,11 @@ package com.intel.analytics.zoo.pipeline.api.keras.python
 
 import java.util.{List => JList, Map => JMap}
 
-import com.intel.analytics.bigdl.{Criterion, Module}
-import com.intel.analytics.bigdl.dataset.{DataSet, LocalDataSet, MiniBatch}
+import com.intel.analytics.bigdl.{Criterion, DataSet, Module}
+import com.intel.analytics.bigdl.dataset.{Sample => JSample, Identity => DIdentity, _}
 
 import scala.collection.JavaConverters._
-import com.intel.analytics.bigdl.optim.{_}
+import com.intel.analytics.bigdl.optim._
 import com.intel.analytics.bigdl.python.api.{EvaluatedResult, JTensor, Sample}
 import com.intel.analytics.bigdl.tensor.Tensor
 import com.intel.analytics.bigdl.tensor.TensorNumericMath.TensorNumeric
@@ -1355,4 +1355,43 @@ class PythonZooKeras[T: ClassTag](implicit ev: TensorNumeric[T]) extends PythonZ
     new AdamWeightDecay[T](learningRate, warmupPortion, total, schedule,
       beta1, beta2, epsilon, weightDecay)
   }
+
+  def batchingWithPaddingStrategy(dataset: DataSet[JSample[T]], batchSize: Int)
+  : DataSet[MiniBatch[T]] = {
+    println("Using Feature Padding Strategy")
+    val featurePaddingParam = PaddingParam[T](Some(Array(Tensor[T](1).fill(ev.fromType(-1.0)))))
+    dataset.transform(SampleToMiniBatch(
+      batchSize = batchSize, featurePaddingParam = Some(featurePaddingParam)))
+  }
+
+  override def createDistriOptimizerFromRDD(model: AbstractModule[Activity, Activity, T],
+    trainingRdd: JavaRDD[Sample],
+    criterion: Criterion[T],
+    optimMethod: JMap[String, OptimMethod[T]],
+    endTrigger: Trigger,
+    batchSize: Int): Optimizer[T, MiniBatch[T]] = {
+    val sampleRDD = toJSample(trainingRdd)
+
+    val optimizer = new DistriOptimizer(
+      _model = model,
+      _dataset = batchingWithPaddingStrategy(DataSet.rdd(sampleRDD), batchSize)
+        .asInstanceOf[DistributedDataSet[MiniBatch[T]]],
+      _criterion = criterion
+    ).asInstanceOf[Optimizer[T, MiniBatch[T]]]
+    enrichOptimizer(optimizer, endTrigger, optimMethod.asScala.toMap)
+  }
+
+  private def enrichOptimizer[T](
+    optimizer: Optimizer[T, MiniBatch[T]],
+    endTrigger: Trigger,
+    optimMethod: Map[String, OptimMethod[T]]): Optimizer[T, MiniBatch[T]] = {
+    optimizer.setEndWhen(endTrigger)
+
+    optimizer.setOptimMethods(optimMethod)
+
+    // TODO: remove this
+    optimizer.disableCheckSingleton()
+
+    optimizer
+  }
 }
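For context before the Python side lands in patch 2: `batchingWithPaddingStrategy` configures BigDL's `SampleToMiniBatch` with a `PaddingParam` whose fill value is -1, so samples with different feature lengths can share a mini-batch. The NumPy sketch below only illustrates that padding semantics; `pad_batch` is a hypothetical helper, not part of the Zoo or BigDL API:

    # Illustration of the JVM-side feature padding (hypothetical helper).
    import numpy as np

    def pad_batch(features, pad_value=-1.0):
        # Right-pad every sample to the longest sample in the batch,
        # mirroring SampleToMiniBatch with a -1 PaddingParam.
        max_len = max(len(f) for f in features)
        return np.stack([
            np.concatenate([f, np.full(max_len - len(f), pad_value, dtype=f.dtype)])
            for f in features
        ])

    batch = [np.array([1.0, 2.0]), np.array([3.0, 4.0, 5.0, 6.0])]
    print(pad_batch(batch))
    # [[ 1.  2. -1. -1.]
    #  [ 3.  4.  5.  6.]]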
From dd0bf660c47c16bcc66150304f4ff3c4b5a81827 Mon Sep 17 00:00:00 2001
From: hkvision
Date: Thu, 11 Jul 2019 11:05:59 +0800
Subject: [PATCH 2/4] python: add DistriOptimizer wrapper and use it in TFOptimizer

---
 pyzoo/test/zoo/tfpark/test_tfpark_model.py | 22 ++++++++++++
 pyzoo/zoo/pipeline/api/keras/optimizers.py | 52 ++++++++++++++++++++++++
 pyzoo/zoo/pipeline/api/net/tf_optimizer.py |  7 ++--
 .../api/keras/python/PythonZooKeras.scala  | 21 +++++++---
 4 files changed, 94 insertions(+), 8 deletions(-)

diff --git a/pyzoo/test/zoo/tfpark/test_tfpark_model.py b/pyzoo/test/zoo/tfpark/test_tfpark_model.py
index abd0a198ad2..13b50a4a3c0 100644
--- a/pyzoo/test/zoo/tfpark/test_tfpark_model.py
+++ b/pyzoo/test/zoo/tfpark/test_tfpark_model.py
@@ -316,6 +316,28 @@ def test_tf_optimizer_with_sparse_gradient_using_keras(self):
         optimizer = TFOptimizer.from_keras(model, dataset)
         optimizer.optimize()
 
+    def test_tf_optimizer_variable_length(self):
+        from random import randrange
+        ids = [np.random.randint(0, 10, size=[randrange(20) + 1]) for i in range(0, 20)]
+        labels = [np.array([1]) for i in range(0, 20)]
+        id_rdd = self.sc.parallelize(ids)
+        label_rdd = self.sc.parallelize(labels)
+        training_rdd = id_rdd.zip(label_rdd)
+        dataset = TFDataset.from_rdd(training_rdd,
+                                     features=(tf.int32, [None]),
+                                     labels=(tf.int32, [1]),
+                                     names=["features", "labels"],
+                                     )
+        words_input = tf.keras.layers.Input(shape=(20, ), name='words_input')
+        embedding_layer = tf.keras.layers.Embedding(input_dim=10,
+                                                    output_dim=5, name='word_embedding')
+        word_embeddings = embedding_layer(words_input)
+        embedding = tf.keras.layers.Flatten()(word_embeddings)
+        model = tf.keras.models.Model(inputs=[words_input], outputs=[embedding])
+        model.compile(optimizer="sgd", loss="mse")
+        optimizer = TFOptimizer.from_keras(model, dataset)
+        optimizer.optimize()
+
     def test_tensorflow_optimizer(self):
         data = tf.keras.layers.Input(shape=[10])
 
diff --git a/pyzoo/zoo/pipeline/api/keras/optimizers.py b/pyzoo/zoo/pipeline/api/keras/optimizers.py
index af910c505cd..a3463c68d16 100644
--- a/pyzoo/zoo/pipeline/api/keras/optimizers.py
+++ b/pyzoo/zoo/pipeline/api/keras/optimizers.py
@@ -105,3 +105,55 @@ def __init__(self,
                          epsilon,
                          weight_decay)
         self.bigdl_type = bigdl_type
+
+
+from bigdl.optim.optimizer import DistriOptimizer as BDistriOptimizer, SGD
+
+
+class DistriOptimizer(BDistriOptimizer):
+    def __init__(self,
+                 model,
+                 training_rdd,
+                 criterion,
+                 end_trigger=None,
+                 batch_size=32,
+                 optim_method=None,
+                 bigdl_type="float"):
+        """
+        Create an optimizer that batches its training data with feature padding.
+
+
+        :param model: the neural net model
+        :param training_rdd: the training dataset
+        :param criterion: the loss function
+        :param optim_method: the algorithm to use for optimization,
+           e.g. SGD, Adagrad, etc. If optim_method is None, the default algorithm is SGD.
+        :param end_trigger: when to end the optimization
+        :param batch_size: training batch size
+        """
+        if not optim_method:
+            optim_methods = {model.name(): SGD()}
+        elif isinstance(optim_method, OptimMethod):
+            optim_methods = {model.name(): optim_method}
+        elif isinstance(optim_method, JavaObject):
+            optim_methods = {model.name(): OptimMethod(optim_method, bigdl_type)}
+        else:
+            optim_methods = optim_method
+        if isinstance(training_rdd, RDD):
+            self.bigdl_type = bigdl_type
+            self.value = callBigDlFunc(self.bigdl_type, "createDistriOptimizerFromRDD",
+                                       model.value, training_rdd, criterion,
+                                       optim_methods, end_trigger, batch_size)
+
+    def set_validation(self, batch_size, val_rdd, trigger, val_method=None):
+        """
+        Configure validation settings.
+
+
+        :param batch_size: validation batch size
+        :param val_rdd: validation dataset
+        :param trigger: validation interval
+        :param val_method: the ValidationMethod to use, e.g. "Top1Accuracy", "Top5Accuracy", "Loss"
+        """
+        callBigDlFunc(self.bigdl_type, "setValidationWithPaddingStrategy", self.value, batch_size,
+                      trigger, val_rdd, to_list(val_method))
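The wrapper above is driven like its BigDL counterpart. A minimal usage sketch, assuming `model`, `criterion` and the two Sample RDDs (`train_samples`, `val_samples`) already exist; those names are illustrative:

    from bigdl.optim.optimizer import MaxEpoch, EveryEpoch, Top1Accuracy
    from zoo.pipeline.api.keras.optimizers import DistriOptimizer

    optimizer = DistriOptimizer(model=model,
                                training_rdd=train_samples,
                                criterion=criterion,
                                end_trigger=MaxEpoch(10),
                                batch_size=32)
    # Validation data goes through the same padding strategy on the JVM side.
    optimizer.set_validation(batch_size=32,
                             val_rdd=val_samples,
                             trigger=EveryEpoch(),
                             val_method=[Top1Accuracy()])
    optimizer.optimize()

`tf_optimizer.py` below switches `TFOptimizer` over to this class, so Keras-style TensorFlow models trained through TFPark pick up the padding strategy automatically.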
"Top1Accuracy", "Top5Accuracy", "Loss" + """ + callBigDlFunc(self.bigdl_type, "setValidationWithPaddingStrategy", self.value, batch_size, + trigger, val_rdd, to_list(val_method)) diff --git a/pyzoo/zoo/pipeline/api/net/tf_optimizer.py b/pyzoo/zoo/pipeline/api/net/tf_optimizer.py index 42cf7aba309..9c424ead3aa 100644 --- a/pyzoo/zoo/pipeline/api/net/tf_optimizer.py +++ b/pyzoo/zoo/pipeline/api/net/tf_optimizer.py @@ -24,8 +24,9 @@ from bigdl.nn.criterion import Criterion from bigdl.nn.layer import Layer from bigdl.util.common import to_list, JavaValue -from bigdl.optim.optimizer import EveryEpoch, MaxEpoch, Optimizer +from bigdl.optim.optimizer import EveryEpoch, MaxEpoch from zoo.pipeline.api.keras.engine.topology import to_bigdl_metric +from zoo.pipeline.api.keras.optimizers import DistriOptimizer from zoo.pipeline.api.net.utils import _find_placeholders, _check_the_same from zoo.util import nest @@ -194,7 +195,7 @@ def to_floats(vs): raise ValueError("Validation data is not specified. Please set " + "val rdd in TFDataset, or set val_split larger than zero") - self.optimizer = Optimizer.create(self.training_helper_layer, + self.optimizer = DistriOptimizer(self.training_helper_layer, training_rdd, IdentityCriterion(), batch_size=batch_size, @@ -205,7 +206,7 @@ def to_floats(vs): val_method) else: training_rdd = sample_rdd - self.optimizer = Optimizer.create(self.training_helper_layer, + self.optimizer = DistriOptimizer(self.training_helper_layer, training_rdd, IdentityCriterion(), batch_size=batch_size, diff --git a/zoo/src/main/scala/com/intel/analytics/zoo/pipeline/api/keras/python/PythonZooKeras.scala b/zoo/src/main/scala/com/intel/analytics/zoo/pipeline/api/keras/python/PythonZooKeras.scala index 49f4ee1ba54..4fb9341e43c 100644 --- a/zoo/src/main/scala/com/intel/analytics/zoo/pipeline/api/keras/python/PythonZooKeras.scala +++ b/zoo/src/main/scala/com/intel/analytics/zoo/pipeline/api/keras/python/PythonZooKeras.scala @@ -19,7 +19,7 @@ package com.intel.analytics.zoo.pipeline.api.keras.python import java.util.{List => JList, Map => JMap} import com.intel.analytics.bigdl.{Criterion, DataSet, Module} -import com.intel.analytics.bigdl.dataset.{Sample => JSample, Identity => DIdentity, _} +import com.intel.analytics.bigdl.dataset.{Identity => DIdentity, Sample => JSample, _} import scala.collection.JavaConverters._ import com.intel.analytics.bigdl.optim._ @@ -36,7 +36,7 @@ import com.intel.analytics.zoo.feature.image.ImageSet import com.intel.analytics.zoo.pipeline.api.autograd.{Constant, _} import com.intel.analytics.zoo.pipeline.api.keras.layers.{KerasLayerWrapper, _} import com.intel.analytics.zoo.pipeline.api.keras.layers.utils.KerasUtils -import com.intel.analytics.zoo.pipeline.api.keras.models.{KerasNet, Model, Sequential} +import com.intel.analytics.zoo.pipeline.api.keras.models.{InternalDistriOptimizer, KerasNet, Model, Sequential} import com.intel.analytics.zoo.pipeline.api.keras.objectives._ import com.intel.analytics.zoo.pipeline.api.keras.optimizers.{Adam, AdamWeightDecay} import org.apache.spark.api.java.JavaRDD @@ -1359,12 +1359,13 @@ class PythonZooKeras[T: ClassTag](implicit ev: TensorNumeric[T]) extends PythonZ def batchingWithPaddingStrategy(dataset: DataSet[JSample[T]], batchSize: Int) : DataSet[MiniBatch[T]] = { println("Using Feature Padding Strategy") - val featurePaddingParam = PaddingParam[T](Some(Array(Tensor[T](1).fill(ev.fromType(-1.0))))) + val paddingTensor = Tensor[T](1).fill(ev.fromType(-1.0)) + val featurePaddingParam = 
From 77efcfabe776934a0eb573e49be5e1fbccef2798 Mon Sep 17 00:00:00 2001
From: hkvision
Date: Wed, 17 Jul 2019 11:14:11 +0800
Subject: [PATCH 3/4] expose EpochStep learning rate schedule

---
 pyzoo/test/zoo/pipeline/api/keras/test_simple_integration.py | 5 +++--
 pyzoo/zoo/pipeline/api/keras/optimizers.py                   | 5 +++++
 .../zoo/pipeline/api/keras/python/PythonZooKeras.scala       | 4 ++++
 3 files changed, 12 insertions(+), 2 deletions(-)

diff --git a/pyzoo/test/zoo/pipeline/api/keras/test_simple_integration.py b/pyzoo/test/zoo/pipeline/api/keras/test_simple_integration.py
index fff7f94c5ea..e8c9438ca84 100644
--- a/pyzoo/test/zoo/pipeline/api/keras/test_simple_integration.py
+++ b/pyzoo/test/zoo/pipeline/api/keras/test_simple_integration.py
@@ -63,7 +63,8 @@ def test_training_with_tensorboard_checkpoint_gradientclipping(self):
         y_train = np.random.randint(4, size=(200, ))
         X_test = np.random.random([40, 32, 32])
         y_test = np.random.randint(4, size=(40, ))
-        model.compile(optimizer="adam",
+        from zoo.pipeline.api.keras.optimizers import Adam, EpochStep
+        model.compile(optimizer=Adam(lr=0.003, schedule=EpochStep(1, 0.75)),
                       loss="sparse_categorical_crossentropy",
                       metrics=['accuracy'])
         tmp_log_dir = create_tmp_path()
@@ -72,7 +73,7 @@ def test_training_with_tensorboard_checkpoint_gradientclipping(self):
         model.set_tensorboard(tmp_log_dir, "training_test")
         model.set_checkpoint(tmp_checkpoint_path)
         model.set_constant_gradient_clipping(0.01, 0.03)
-        model.fit(X_train, y_train, batch_size=112, nb_epoch=2, validation_data=(X_test, y_test))
+        model.fit(X_train, y_train, batch_size=32, nb_epoch=20, validation_data=(X_test, y_test))
         model.clear_gradient_clipping()
         model.fit(X_train, y_train, batch_size=112, nb_epoch=2, validation_data=(X_test, y_test))
         model.set_gradient_clipping_by_l2_norm(0.2)
diff --git a/pyzoo/zoo/pipeline/api/keras/optimizers.py b/pyzoo/zoo/pipeline/api/keras/optimizers.py
index a3463c68d16..1801dd53057 100644
--- a/pyzoo/zoo/pipeline/api/keras/optimizers.py
+++ b/pyzoo/zoo/pipeline/api/keras/optimizers.py
@@ -157,3 +157,8 @@ def set_validation(self, batch_size, val_rdd, trigger, val_method=None):
         """
         callBigDlFunc(self.bigdl_type, "setValidationWithPaddingStrategy", self.value, batch_size,
                       trigger, val_rdd, to_list(val_method))
+
+
+class EpochStep(JavaValue):
+    def __init__(self, step_size, gamma, bigdl_type="float"):
+        JavaValue.__init__(self, None, bigdl_type, step_size, float(gamma))
diff --git a/zoo/src/main/scala/com/intel/analytics/zoo/pipeline/api/keras/python/PythonZooKeras.scala b/zoo/src/main/scala/com/intel/analytics/zoo/pipeline/api/keras/python/PythonZooKeras.scala
index 4fb9341e43c..506f0e5e8d2 100644
--- a/zoo/src/main/scala/com/intel/analytics/zoo/pipeline/api/keras/python/PythonZooKeras.scala
+++ b/zoo/src/main/scala/com/intel/analytics/zoo/pipeline/api/keras/python/PythonZooKeras.scala
@@ -1405,4 +1405,8 @@ class PythonZooKeras[T: ClassTag](implicit ev: TensorNumeric[T]) extends PythonZ
     optimizer.setValidation(trigger, batchingWithPaddingStrategy(DataSet.rdd(sampleRDD), batchSize),
       vMethods.asScala.toArray)
   }
+
+  def createEpochStep(stepSize: Int, gamma: Double): SGD.EpochStep = {
+    SGD.EpochStep(stepSize, gamma)
+  }
 }
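`EpochStep(step_size, gamma)` wraps BigDL's `SGD.EpochStep`, which scales the learning rate by `gamma` once every `step_size` epochs. Typical use mirrors the updated integration test; `model` here stands for any Zoo Keras model:

    from zoo.pipeline.api.keras.optimizers import Adam, EpochStep

    # Decay the learning rate to 75% of its value after every epoch,
    # i.e. roughly lr = 0.003 * 0.75 ** epoch.
    model.compile(optimizer=Adam(lr=0.003, schedule=EpochStep(1, 0.75)),
                  loss="sparse_categorical_crossentropy",
                  metrics=['accuracy'])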
From 76f7282114ce2c1f5b4dff12f3349d92d66e7038 Mon Sep 17 00:00:00 2001
From: yangw
Date: Sun, 28 Jul 2019 21:41:04 +0800
Subject: [PATCH 4/4] make training deterministic

---
 .../api/keras/python/PythonZooKeras.scala | 121 +++++++++++++++++-
 1 file changed, 119 insertions(+), 2 deletions(-)

diff --git a/zoo/src/main/scala/com/intel/analytics/zoo/pipeline/api/keras/python/PythonZooKeras.scala b/zoo/src/main/scala/com/intel/analytics/zoo/pipeline/api/keras/python/PythonZooKeras.scala
index 506f0e5e8d2..14ff1cbc7ef 100644
--- a/zoo/src/main/scala/com/intel/analytics/zoo/pipeline/api/keras/python/PythonZooKeras.scala
+++ b/zoo/src/main/scala/com/intel/analytics/zoo/pipeline/api/keras/python/PythonZooKeras.scala
@@ -16,6 +16,7 @@
 
 package com.intel.analytics.zoo.pipeline.api.keras.python
 
+import java.util.concurrent.atomic.AtomicInteger
 import java.util.{List => JList, Map => JMap}
 
 import com.intel.analytics.bigdl.{Criterion, DataSet, Module}
@@ -31,7 +32,7 @@ import com.intel.analytics.bigdl.nn.Container
 import com.intel.analytics.bigdl.nn.abstractnn.{AbstractModule, Activity}
 import com.intel.analytics.bigdl.nn.keras.{KerasLayer, KerasModel}
 import com.intel.analytics.bigdl.nn.{BatchNormalization => BNBatchNormalization}
-import com.intel.analytics.bigdl.utils.{Shape, Table}
+import com.intel.analytics.bigdl.utils.{RandomGenerator, Shape, Table}
 import com.intel.analytics.zoo.feature.image.ImageSet
 import com.intel.analytics.zoo.pipeline.api.autograd.{Constant, _}
 import com.intel.analytics.zoo.pipeline.api.keras.layers.{KerasLayerWrapper, _}
@@ -47,6 +48,7 @@ import com.intel.analytics.zoo.models.seq2seq.{Bridge, RNNDecoder, RNNEncoder}
 import com.intel.analytics.zoo.pipeline.api.Net
 import com.intel.analytics.zoo.pipeline.api.keras.{metrics => zmetrics}
 import com.intel.analytics.zoo.pipeline.api.net.GraphNet
+import org.apache.spark.rdd.RDD
 
 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
@@ -1373,9 +1375,17 @@ class PythonZooKeras[T: ClassTag](implicit ev: TensorNumeric[T]) extends PythonZ
     batchSize: Int): Optimizer[T, MiniBatch[T]] = {
     val sampleRDD = toJSample(trainingRdd)
 
+    val Array(nodeNumber, coreNumber) = this.getNodeAndCoreNumber()
+    val dataset = new CachedDistriDataSet[JSample[T]](
+      sampleRDD.coalesce(nodeNumber, true)
+        .mapPartitions(iter => {
+          Iterator.single(iter.toArray)
+        }).setName("cached dataset")
+        .cache()
+    )
     val optimizer = new InternalDistriOptimizer(
       _model = model,
-      _dataset = batchingWithPaddingStrategy(DataSet.rdd(sampleRDD), batchSize)
+      _dataset = batchingWithPaddingStrategy(dataset, batchSize)
         .asInstanceOf[DistributedDataSet[MiniBatch[T]]],
       _criterion = criterion
     ).asInstanceOf[Optimizer[T, MiniBatch[T]]]
@@ -1410,3 +1420,110 @@ class PythonZooKeras[T: ClassTag](implicit ev: TensorNumeric[T]) extends PythonZ
     SGD.EpochStep(stepSize, gamma)
   }
 }
+
+class CachedDistriDataSet[T: ClassTag]
+(buffer: RDD[Array[T]], isInOrder: Boolean = false, groupSize: Int = 1)
+  extends DistributedDataSet[T] {
+
+  protected lazy val count: Long = buffer.mapPartitions(iter => {
+    require(iter.hasNext)
+    val array = iter.next()
+    require(!iter.hasNext)
+    Iterator.single(array.length)
+  }).reduce(_ + _)
+
+  protected var indexes: RDD[Array[Int]] = buffer.mapPartitions(iter => {
+    Iterator.single((0 until iter.next().length).toArray)
+  }).setName("original index").cache()
+
+  override def data(train: Boolean): RDD[T] = {
+    val _train = train
+    val _groupSize = if (isInOrder) Utils.getBatchSize(groupSize) else 1
+    buffer.zipPartitions(indexes)((dataIter, indexIter) => {
+      val indexes = indexIter.next()
+      val indexOffset = math.max(1, indexes.length - (_groupSize - 1))
+      val localData = dataIter.next()
+      val offset = if (_train) {
+        RandomGenerator2.RNG.uniform(0, indexOffset).toInt
+      } else {
+        0
+      }
+      new Iterator[T] {
+        private val _offset = new AtomicInteger(offset)
+
+        override def hasNext: Boolean = {
+          if (_train) true else _offset.get() < localData.length
+        }
+
+        override def next(): T = {
+          val i = _offset.getAndIncrement()
+          if (_train) {
+            localData(indexes(i % localData.length))
+          } else {
+            if (i < localData.length) {
+              localData(indexes(i))
+            } else {
+              null.asInstanceOf[T]
+            }
+          }
+        }
+      }
+    })
+  }
+
+  override def size(): Long = count
+
+  override def shuffle(): Unit = {
+    if (!isInOrder) {
+      indexes.unpersist()
+      indexes = buffer.mapPartitions(iter => {
+        Iterator.single(RandomGenerator2.shuffle((0 until iter.next().length).toArray))
+      }).setName("shuffled index").cache()
+    }
+  }
+
+  override def originRDD(): RDD[_] = buffer
+
+  override def cache(): Unit = {
+    buffer.count()
+    indexes.count()
+    isCached = true
+  }
+
+  override def unpersist(): Unit = {
+    buffer.unpersist()
+    indexes.unpersist()
+    isCached = false
+  }
+}
+
+
+object RandomGenerator2 {
+
+  var randomSeed = 1
+  val generators = new ThreadLocal[RandomGenerator]()
+
+  // scalastyle:off methodName
+  def RNG: RandomGenerator = {
+    if (generators.get() == null) {
+      val rg = RandomGenerator.RNG.clone()
+      rg.setSeed(randomSeed)
+      generators.set(rg)
+    }
+    generators.get()
+  }
+  // scalastyle:on methodName
+
+  def shuffle[T](data: Array[T]): Array[T] = {
+    var i = 0
+    val length = data.length
+    while (i < length) {
+      val exchange = RNG.uniform(0, length - i).toInt + i
+      val tmp = data(exchange)
+      data(exchange) = data(i)
+      data(i) = tmp
+      i += 1
+    }
+    data
+  }
+}
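Two things make patch 4 deterministic: the training RDD is cached per partition in a fixed order (`CachedDistriDataSet`), and every shuffle draws from `RandomGenerator2`, a thread-local RNG seeded with a fixed value, so each run produces the same permutations. `RandomGenerator2.shuffle` is a seeded in-place Fisher-Yates; a Python rendering of the same algorithm, for reference only:

    import random

    RANDOM_SEED = 1  # mirrors RandomGenerator2.randomSeed
    _rng = random.Random(RANDOM_SEED)  # one seeded generator per thread on the JVM side

    def shuffle(data):
        # In-place Fisher-Yates: position i swaps with a uniform pick from [i, n),
        # just like RNG.uniform(0, length - i).toInt + i in the Scala code.
        n = len(data)
        for i in range(n):
            j = _rng.randrange(i, n)
            data[i], data[j] = data[j], data[i]
        return data

    print(shuffle(list(range(8))))  # identical output on every fresh run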