diff --git a/pyzoo/test/zoo/pipeline/api/keras/test_simple_integration.py b/pyzoo/test/zoo/pipeline/api/keras/test_simple_integration.py
index fff7f94c5ea..e8c9438ca84 100644
--- a/pyzoo/test/zoo/pipeline/api/keras/test_simple_integration.py
+++ b/pyzoo/test/zoo/pipeline/api/keras/test_simple_integration.py
@@ -63,7 +63,8 @@ def test_training_with_tensorboard_checkpoint_gradientclipping(self):
         y_train = np.random.randint(4, size=(200, ))
         X_test = np.random.random([40, 32, 32])
         y_test = np.random.randint(4, size=(40, ))
-        model.compile(optimizer="adam",
+        from zoo.pipeline.api.keras.optimizers import Adam, EpochStep
+        model.compile(optimizer=Adam(lr=0.003, schedule=EpochStep(1, 0.75)),
                       loss="sparse_categorical_crossentropy",
                       metrics=['accuracy'])
         tmp_log_dir = create_tmp_path()
@@ -72,7 +73,7 @@ def test_training_with_tensorboard_checkpoint_gradientclipping(self):
         model.set_tensorboard(tmp_log_dir, "training_test")
         model.set_checkpoint(tmp_checkpoint_path)
         model.set_constant_gradient_clipping(0.01, 0.03)
-        model.fit(X_train, y_train, batch_size=112, nb_epoch=2, validation_data=(X_test, y_test))
+        model.fit(X_train, y_train, batch_size=32, nb_epoch=20, validation_data=(X_test, y_test))
         model.clear_gradient_clipping()
         model.fit(X_train, y_train, batch_size=112, nb_epoch=2, validation_data=(X_test, y_test))
         model.set_gradient_clipping_by_l2_norm(0.2)
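For reference on the optimizer change above: EpochStep(step_size, gamma) is a thin Python wrapper over BigDL's SGD.EpochStep schedule (exposed through createEpochStep in PythonZooKeras.scala further down), which decays the learning rate by a factor of gamma every step_size epochs. A minimal, purely illustrative sketch of the schedule the test now requests with Adam(lr=0.003, schedule=EpochStep(1, 0.75)), assuming the usual step-decay formula:

# Illustrative only: plain Python, not the BigDL scheduler itself.
# Step decay of the form used by EpochStep(step_size, gamma): the base
# learning rate is multiplied by gamma once per step_size epochs.
base_lr, step_size, gamma = 0.003, 1, 0.75
for epoch in range(5):
    lr = base_lr * gamma ** (epoch // step_size)
    print("epoch %d: lr = %g" % (epoch, lr))
# lr falls from 0.003 to roughly 0.00095 by the fifth epoch.
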
diff --git a/pyzoo/test/zoo/tfpark/test_tfpark_model.py b/pyzoo/test/zoo/tfpark/test_tfpark_model.py
index abd0a198ad2..13b50a4a3c0 100644
--- a/pyzoo/test/zoo/tfpark/test_tfpark_model.py
+++ b/pyzoo/test/zoo/tfpark/test_tfpark_model.py
@@ -316,6 +316,33 @@ def test_tf_optimizer_with_sparse_gradient_using_keras(self):
         optimizer = TFOptimizer.from_keras(model, dataset)
         optimizer.optimize()
 
+    def test_tf_optimizer_variable_length(self):
+        from random import randrange
+        ids = [np.random.randint(0, 10, size=[randrange(20)+1]) for i in range(0, 20)]
+        labels = [np.array([1]) for i in range(0, 20)]
+        id_rdd = self.sc.parallelize(ids)
+        label_rdd = self.sc.parallelize(labels)
+        training_rdd = id_rdd.zip(label_rdd)
+        dataset = TFDataset.from_rdd(training_rdd,
+                                     features=(tf.int32, [None]),
+                                     labels=(tf.int32, [1]),
+                                     names=["features", "labels"],
+                                     )
+        # model = tf.keras.models.Sequential()
+        # model.add(tf.keras.layers.Dense(2, input_shape=(20, ), activation="softmax"))
+        # model.compile(optimizer="sgd", loss="sparse_categorical_crossentropy")
+        words_input = tf.keras.layers.Input(shape=(20, ), name='words_input')
+        embedding_layer = tf.keras.layers.Embedding(input_dim=10,
+                                                    output_dim=5, name='word_embedding')
+        word_embeddings = embedding_layer(words_input)
+        embedding = tf.keras.layers.Flatten()(word_embeddings)
+        model = tf.keras.models.Model(inputs=[words_input], outputs=[embedding])
+        model.compile(optimizer="sgd", loss="mse")
+        optimizer = TFOptimizer.from_keras(model, dataset)
+        optimizer.optimize()
+        print("111")
+
+
     def test_tensorflow_optimizer(self):
         data = tf.keras.layers.Input(shape=[10])
 
diff --git a/pyzoo/zoo/pipeline/api/keras/optimizers.py b/pyzoo/zoo/pipeline/api/keras/optimizers.py
index af910c505cd..1801dd53057 100644
--- a/pyzoo/zoo/pipeline/api/keras/optimizers.py
+++ b/pyzoo/zoo/pipeline/api/keras/optimizers.py
@@ -105,3 +105,60 @@ def __init__(self,
                                               epsilon, weight_decay)
         self.bigdl_type = bigdl_type
+
+
+from bigdl.optim.optimizer import DistriOptimizer as BDistriOptimizer, SGD
+
+
+class DistriOptimizer(BDistriOptimizer):
+    def __init__(self,
+                 model,
+                 training_rdd,
+                 criterion,
+                 end_trigger=None,
+                 batch_size=32,
+                 optim_method=None,
+                 bigdl_type="float"):
+        """
+        Create an optimizer.
+
+
+        :param model: the neural net model
+        :param training_rdd: the training dataset
+        :param criterion: the loss function
+        :param optim_method: the algorithm to use for optimization,
+            e.g. SGD, Adagrad, etc. If optim_method is None, the default algorithm is SGD.
+        :param end_trigger: when to end the optimization
+        :param batch_size: training batch size
+        """
+        if not optim_method:
+            optim_methods = {model.name(): SGD()}
+        elif isinstance(optim_method, OptimMethod):
+            optim_methods = {model.name(): optim_method}
+        elif isinstance(optim_method, JavaObject):
+            optim_methods = {model.name(): OptimMethod(optim_method, bigdl_type)}
+        else:
+            optim_methods = optim_method
+        if isinstance(training_rdd, RDD):
+            self.bigdl_type = bigdl_type
+            self.value = callBigDlFunc(self.bigdl_type, "createDistriOptimizerFromRDD",
+                                       model.value, training_rdd, criterion,
+                                       optim_methods, end_trigger, batch_size)
+
+    def set_validation(self, batch_size, val_rdd, trigger, val_method=None):
+        """
+        Configure validation settings.
+
+
+        :param batch_size: validation batch size
+        :param val_rdd: validation dataset
+        :param trigger: validation interval
+        :param val_method: the ValidationMethod to use, e.g. "Top1Accuracy", "Top5Accuracy", "Loss"
+        """
+        callBigDlFunc(self.bigdl_type, "setValidationWithPaddingStrategy", self.value, batch_size,
+                      trigger, val_rdd, to_list(val_method))
+
+
+class EpochStep(JavaValue):
+    def __init__(self, step_size, gamma, bigdl_type="float"):
+        JavaValue.__init__(self, None, bigdl_type, step_size, float(gamma))
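The docstrings above describe the new DistriOptimizer wrapper, which is normally constructed for you by TFOptimizer (see tf_optimizer.py below). A rough direct-use sketch follows; it is not part of the patch, and bigdl_model, train_sample_rdd, val_sample_rdd and criterion are placeholders for a BigDL Layer, RDDs of Sample, and a BigDL loss that a real job would provide.

# Sketch only: direct use of the DistriOptimizer wrapper defined above.
# bigdl_model, train_sample_rdd, val_sample_rdd and criterion are placeholders.
from bigdl.optim.optimizer import EveryEpoch, MaxEpoch, Top1Accuracy
from zoo.pipeline.api.keras.optimizers import DistriOptimizer

optimizer = DistriOptimizer(model=bigdl_model,              # a BigDL Layer
                            training_rdd=train_sample_rdd,  # RDD of Sample
                            criterion=criterion,
                            end_trigger=MaxEpoch(5),
                            batch_size=32)                  # optim_method=None -> SGD()
optimizer.set_validation(batch_size=32,
                         val_rdd=val_sample_rdd,
                         trigger=EveryEpoch(),
                         val_method=[Top1Accuracy()])
optimizer.optimize()
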
                                 "val rdd in TFDataset, or set val_split larger than zero")
-            self.optimizer = Optimizer.create(self.training_helper_layer,
+            self.optimizer = DistriOptimizer(self.training_helper_layer,
                                               training_rdd,
                                               IdentityCriterion(),
                                               batch_size=batch_size,
@@ -205,7 +206,7 @@ def to_floats(vs):
                                         val_method)
         else:
             training_rdd = sample_rdd
-            self.optimizer = Optimizer.create(self.training_helper_layer,
+            self.optimizer = DistriOptimizer(self.training_helper_layer,
                                               training_rdd,
                                               IdentityCriterion(),
                                               batch_size=batch_size,
diff --git a/zoo/src/main/scala/com/intel/analytics/zoo/pipeline/api/keras/python/PythonZooKeras.scala b/zoo/src/main/scala/com/intel/analytics/zoo/pipeline/api/keras/python/PythonZooKeras.scala
index 956836e0293..14ff1cbc7ef 100644
--- a/zoo/src/main/scala/com/intel/analytics/zoo/pipeline/api/keras/python/PythonZooKeras.scala
+++ b/zoo/src/main/scala/com/intel/analytics/zoo/pipeline/api/keras/python/PythonZooKeras.scala
@@ -16,13 +16,14 @@
 
 package com.intel.analytics.zoo.pipeline.api.keras.python
 
+import java.util.concurrent.atomic.AtomicInteger
 import java.util.{List => JList, Map => JMap}
 
-import com.intel.analytics.bigdl.{Criterion, Module}
-import com.intel.analytics.bigdl.dataset.{DataSet, LocalDataSet, MiniBatch}
+import com.intel.analytics.bigdl.{Criterion, DataSet, Module}
+import com.intel.analytics.bigdl.dataset.{Identity => DIdentity, Sample => JSample, _}
 
 import scala.collection.JavaConverters._
-import com.intel.analytics.bigdl.optim.{_}
+import com.intel.analytics.bigdl.optim._
 import com.intel.analytics.bigdl.python.api.{EvaluatedResult, JTensor, Sample}
 import com.intel.analytics.bigdl.tensor.Tensor
 import com.intel.analytics.bigdl.tensor.TensorNumericMath.TensorNumeric
@@ -31,12 +32,12 @@ import com.intel.analytics.bigdl.nn.Container
 import com.intel.analytics.bigdl.nn.abstractnn.{AbstractModule, Activity}
 import com.intel.analytics.bigdl.nn.keras.{KerasLayer, KerasModel}
 import com.intel.analytics.bigdl.nn.{BatchNormalization => BNBatchNormalization}
-import com.intel.analytics.bigdl.utils.{Shape, Table}
+import com.intel.analytics.bigdl.utils.{RandomGenerator, Shape, Table}
 import com.intel.analytics.zoo.feature.image.ImageSet
 import com.intel.analytics.zoo.pipeline.api.autograd.{Constant, _}
 import com.intel.analytics.zoo.pipeline.api.keras.layers.{KerasLayerWrapper, _}
 import com.intel.analytics.zoo.pipeline.api.keras.layers.utils.KerasUtils
-import com.intel.analytics.zoo.pipeline.api.keras.models.{KerasNet, Model, Sequential}
+import com.intel.analytics.zoo.pipeline.api.keras.models.{InternalDistriOptimizer, KerasNet, Model, Sequential}
 import com.intel.analytics.zoo.pipeline.api.keras.objectives._
 import com.intel.analytics.zoo.pipeline.api.keras.optimizers.{Adam, AdamWeightDecay}
 import org.apache.spark.api.java.JavaRDD
@@ -47,6 +48,7 @@ import com.intel.analytics.zoo.models.seq2seq.{Bridge, RNNDecoder, RNNEncoder}
 import com.intel.analytics.zoo.pipeline.api.Net
 import com.intel.analytics.zoo.pipeline.api.keras.{metrics => zmetrics}
 import com.intel.analytics.zoo.pipeline.api.net.GraphNet
+import org.apache.spark.rdd.RDD
 
 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
@@ -1355,4 +1357,173 @@ class PythonZooKeras[T: ClassTag](implicit ev: TensorNumeric[T]) extends PythonZ
     new AdamWeightDecay[T](learningRate, warmupPortion, total, schedule,
       beta1, beta2, epsilon, weightDecay)
   }
+
+  def batchingWithPaddingStrategy(dataset: DataSet[JSample[T]], batchSize: Int)
+  : DataSet[MiniBatch[T]] = {
+    println("Using Feature Padding Strategy")
+    val paddingTensor = Tensor[T](1).fill(ev.fromType(-1.0))
+    val featurePaddingParam = PaddingParam[T](Some(Array.fill[Tensor[T]](13)(paddingTensor)))
+    dataset.transform(SampleToMiniBatch(
+      batchSize = batchSize, featurePaddingParam = Some(featurePaddingParam)))
+  }
+
+  def createDistriOptimizerFromRDD(model: AbstractModule[Activity, Activity, T],
+                                   trainingRdd: JavaRDD[Sample],
+                                   criterion: Criterion[T],
+                                   optimMethod: JMap[String, OptimMethod[T]],
+                                   endTrigger: Trigger,
+                                   batchSize: Int): Optimizer[T, MiniBatch[T]] = {
+    val sampleRDD = toJSample(trainingRdd)
+
+    val Array(nodeNumber, coreNumber) = this.getNodeAndCoreNumber()
+    val dataset = new CachedDistriDataSet[JSample[T]](
+      sampleRDD.coalesce(nodeNumber, true)
+        .mapPartitions(iter => {
+          Iterator.single(iter.toArray)
+        }).setName("cached dataset")
+        .cache()
+    )
+    val optimizer = new InternalDistriOptimizer(
+      _model = model,
+      _dataset = batchingWithPaddingStrategy(dataset, batchSize)
+        .asInstanceOf[DistributedDataSet[MiniBatch[T]]],
+      _criterion = criterion
+    ).asInstanceOf[Optimizer[T, MiniBatch[T]]]
+    enrichOptimizer(optimizer, endTrigger, optimMethod.asScala.toMap)
+  }
+
+  private def enrichOptimizer[T](
+      optimizer: Optimizer[T, MiniBatch[T]],
+      endTrigger: Trigger,
+      optimMethod: Map[String, OptimMethod[T]]): Optimizer[T, MiniBatch[T]] = {
+    optimizer.setEndWhen(endTrigger)
+
+    optimizer.setOptimMethods(optimMethod)
+
+    // TODO: remove this
+    optimizer.disableCheckSingleton()
+
+    optimizer
+  }
+
+  def setValidationWithPaddingStrategy(optimizer: Optimizer[T, MiniBatch[T]],
+                                       batchSize: Int,
+                                       trigger: Trigger,
+                                       valRdd: JavaRDD[Sample],
+                                       vMethods: JList[ValidationMethod[T]]): Unit = {
+    val sampleRDD = toJSample(valRdd)
+    optimizer.setValidation(trigger, batchingWithPaddingStrategy(DataSet.rdd(sampleRDD), batchSize),
+      vMethods.asScala.toArray)
+  }
+
+  def createEpochStep(stepSize: Int, gamma: Double): SGD.EpochStep = {
+    SGD.EpochStep(stepSize, gamma)
+  }
+}
+
+class CachedDistriDataSet[T: ClassTag]
+(buffer: RDD[Array[T]], isInOrder: Boolean = false, groupSize: Int = 1)
+  extends DistributedDataSet[T] {
+
+  protected lazy val count: Long = buffer.mapPartitions(iter => {
+    require(iter.hasNext)
+    val array = iter.next()
+    require(!iter.hasNext)
+    Iterator.single(array.length)
+  }).reduce(_ + _)
+
+  protected var indexes: RDD[Array[Int]] = buffer.mapPartitions(iter => {
+    Iterator.single((0 until iter.next().length).toArray)
+  }).setName("original index").cache()
+
+  override def data(train: Boolean): RDD[T] = {
+    val _train = train
+    val _groupSize = if (isInOrder) Utils.getBatchSize(groupSize) else 1
+    buffer.zipPartitions(indexes)((dataIter, indexIter) => {
+      val indexes = indexIter.next()
+      val indexOffset = math.max(1, indexes.length - (_groupSize - 1))
+      val localData = dataIter.next()
+      val offset = if (_train) {
+        RandomGenerator2.RNG.uniform(0, indexOffset).toInt
+      } else {
+        0
+      }
+      new Iterator[T] {
+        private val _offset = new AtomicInteger(offset)
+
+        override def hasNext: Boolean = {
+          if (_train) true else _offset.get() < localData.length
+        }
+
+        override def next(): T = {
+          val i = _offset.getAndIncrement()
+          if (_train) {
+            localData(indexes(i % localData.length))
+          } else {
+            if (i < localData.length) {
+              localData(indexes(i))
+            } else {
+              null.asInstanceOf[T]
+            }
+          }
+        }
+      }
+    })
+  }
+
+  override def size(): Long = count
+
+  override def shuffle(): Unit = {
+    if (!isInOrder) {
+      indexes.unpersist()
+      indexes = buffer.mapPartitions(iter => {
+        Iterator.single(RandomGenerator2.shuffle((0 until iter.next().length).toArray))
+      }).setName("shuffled index").cache()
+    }
+  }
+
+  override def originRDD(): RDD[_] = buffer
+
+  override def cache(): Unit = {
+    buffer.count()
+    indexes.count()
+    isCached = true
+  }
+
+  override def unpersist(): Unit = {
+    buffer.unpersist()
+    indexes.unpersist()
+    isCached = false
+  }
+}
+
+
+object RandomGenerator2 {
+
+  var randomSeed = 1
+  val generators = new ThreadLocal[RandomGenerator]()
+
+  // scalastyle:off methodName
+  def RNG: RandomGenerator = {
+    if (generators.get() == null) {
+      val rg = RandomGenerator.RNG.clone()
+      rg.setSeed(randomSeed)
+      generators.set(rg)
+    }
+    generators.get()
+  }
+  // scalastyle:on methodName
+
+  def shuffle[T](data: Array[T]): Array[T] = {
+    var i = 0
+    val length = data.length
+    while (i < length) {
+      val exchange = RNG.uniform(0, length - i).toInt + i
+      val tmp = data(exchange)
+      data(exchange) = data(i)
+      data(i) = tmp
+      i += 1
+    }
+    data
+  }
 }
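
A behavioural note on batchingWithPaddingStrategy above: it configures SampleToMiniBatch with a PaddingParam whose padding value is -1, so variable-length feature tensors in a mini-batch are padded up to the longest sample in that batch (the Array.fill(13) simply supplies the same padding tensor for up to 13 feature inputs). A small NumPy sketch of that effect on the variable-length id sequences from the new tfpark test; this is illustrative only, not the BigDL implementation:

# Illustrative only: what padding a batch of variable-length sequences with -1
# looks like; BigDL's SampleToMiniBatch does the equivalent on the JVM side.
import numpy as np

batch = [np.array([3, 1, 4]), np.array([1, 5]), np.array([9, 2, 6, 5])]
max_len = max(len(x) for x in batch)
padded = np.stack([np.pad(x, (0, max_len - len(x)), mode="constant", constant_values=-1)
                   for x in batch])
print(padded)
# [[ 3  1  4 -1]
#  [ 1  5 -1 -1]
#  [ 9  2  6  5]]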