IGNITE-7350: Distributed MLP cleanup/refactoring
this closes #3368
artemmalykh authored and ybabak committed Jan 16, 2018
1 parent 696277d commit 6eccf23
Showing 44 changed files with 297 additions and 119 deletions.
@@ -35,7 +35,7 @@
import org.apache.ignite.ml.nn.architecture.MLPArchitecture;
import org.apache.ignite.ml.nn.initializers.RandomInitializer;
import org.apache.ignite.ml.nn.trainers.distributed.MLPGroupUpdateTrainer;
-import org.apache.ignite.ml.nn.updaters.RPropParameterUpdate;
+import org.apache.ignite.ml.optimization.updatecalculators.RPropParameterUpdate;
import org.apache.ignite.ml.structures.LabeledVector;
import org.apache.ignite.thread.IgniteThread;

@@ -25,13 +25,13 @@
import org.apache.ignite.ml.math.functions.IgniteSupplier;
import org.apache.ignite.ml.math.impls.matrix.DenseLocalOnHeapMatrix;
import org.apache.ignite.ml.nn.Activators;
-import org.apache.ignite.ml.nn.LocalBatchTrainerInput;
+import org.apache.ignite.ml.trainers.local.LocalBatchTrainerInput;
-import org.apache.ignite.ml.nn.LossFunctions;
+import org.apache.ignite.ml.optimization.LossFunctions;
import org.apache.ignite.ml.nn.MultilayerPerceptron;
import org.apache.ignite.ml.nn.architecture.MLPArchitecture;
import org.apache.ignite.ml.nn.initializers.RandomInitializer;
import org.apache.ignite.ml.nn.trainers.local.MLPLocalBatchTrainer;
-import org.apache.ignite.ml.nn.updaters.RPropUpdateCalculator;
+import org.apache.ignite.ml.optimization.updatecalculators.RPropUpdateCalculator;
import org.apache.ignite.ml.util.Utils;

/**

@@ -19,6 +19,7 @@
import java.util.ArrayList;
import java.util.List;
+import java.util.Random;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.ignite.Ignite;

@@ -56,6 +57,11 @@ public class MLPGroupUpdateTrainerCacheInput extends AbstractMLPGroupUpdateTrain
     */
    private final MultilayerPerceptron mlp;

+    /**
+     * Random number generator.
+     */
+    private final Random rand;
+
    /**
     * Construct instance of this class with given parameters.
     *

@@ -64,15 +70,32 @@ public class MLPGroupUpdateTrainerCacheInput extends AbstractMLPGroupUpdateTrain
     * @param networksCnt Count of networks to be trained in parallel by {@link MLPGroupUpdateTrainer}.
     * @param cache Cache with labeled vectors.
     * @param batchSize Size of batch to return on each training iteration.
+     * @param rand RNG.
     */
    public MLPGroupUpdateTrainerCacheInput(MLPArchitecture arch, MLPInitializer init,
        int networksCnt, IgniteCache<Integer, LabeledVector<Vector, Vector>> cache,
-        int batchSize) {
+        int batchSize, Random rand) {
        super(networksCnt);

        this.batchSize = batchSize;
        this.cache = cache;
        this.mlp = new MultilayerPerceptron(arch, init);
+        this.rand = rand;
+    }
+
+    /**
+     * Construct instance of this class with given parameters.
+     *
+     * @param arch Architecture of multilayer perceptron.
+     * @param init Initializer of multilayer perceptron.
+     * @param networksCnt Count of networks to be trained in parallel by {@link MLPGroupUpdateTrainer}.
+     * @param cache Cache with labeled vectors.
+     * @param batchSize Size of batch to return on each training iteration.
+     */
+    public MLPGroupUpdateTrainerCacheInput(MLPArchitecture arch, MLPInitializer init,
+        int networksCnt, IgniteCache<Integer, LabeledVector<Vector, Vector>> cache,
+        int batchSize) {
+        this(arch, init, networksCnt, cache, batchSize, new Random());
    }

    /**

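The practical difference between the two constructors above is reproducibility of batch sampling. A minimal usage sketch follows; the package of MLPGroupUpdateTrainerCacheInput, the network count of 4 and the batch size of 100 are illustrative assumptions, not taken from this commit.

import java.util.Random;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.ml.math.Vector;
import org.apache.ignite.ml.nn.architecture.MLPArchitecture;
import org.apache.ignite.ml.nn.initializers.MLPInitializer;
import org.apache.ignite.ml.nn.trainers.distributed.MLPGroupUpdateTrainerCacheInput; // assumed package
import org.apache.ignite.ml.structures.LabeledVector;

public class CacheInputSketch {
    /** Seeded RNG: the same batch keys are sampled on every run. */
    static MLPGroupUpdateTrainerCacheInput reproducibleInput(MLPArchitecture arch, MLPInitializer init,
        IgniteCache<Integer, LabeledVector<Vector, Vector>> cache) {
        return new MLPGroupUpdateTrainerCacheInput(arch, init, 4, cache, 100, new Random(42L));
    }

    /** Convenience overload: falls back to an unseeded new Random(). */
    static MLPGroupUpdateTrainerCacheInput defaultInput(MLPArchitecture arch, MLPInitializer init,
        IgniteCache<Integer, LabeledVector<Vector, Vector>> cache) {
        return new MLPGroupUpdateTrainerCacheInput(arch, init, 4, cache, 100);
    }
}
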
@@ -93,6 +116,7 @@ public MLPGroupUpdateTrainerCacheInput(MLPArchitecture arch, int networksCnt,
    @Override public IgniteSupplier<IgniteBiTuple<Matrix, Matrix>> batchSupplier() {
        String cName = cache.getName();
        int bs = batchSize;
+        Random r = rand; // IMPL NOTE this is intended to make below lambda more lightweight.

        return () -> {
            Ignite ignite = Ignition.localIgnite();

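The "IMPL NOTE" above reflects a general Java rule: a lambda that reads an instance field implicitly captures 'this', so the whole enclosing object (here, the input with its MLP and cache handle) would travel with the serialized closure; copying the field into a local first means only that value is captured. A small illustrative sketch of the difference, not code from this commit:

import java.io.Serializable;
import java.util.function.IntSupplier;

public class LambdaCaptureSketch implements Serializable {
    /** Field read by the lambdas below. */
    private final int batchSize = 100;

    /** Reads the field directly, so the lambda captures 'this' (the whole object). */
    IntSupplier heavy() {
        return () -> batchSize;
    }

    /** Copies the field into a local first, so the lambda captures only that int. */
    IntSupplier light() {
        int bs = batchSize;
        return () -> bs;
    }
}
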
@@ -105,7 +129,7 @@ public MLPGroupUpdateTrainerCacheInput(MLPArchitecture arch, int networksCnt,

            int locKeysCnt = keys.size();

-            int[] selected = Utils.selectKDistinct(locKeysCnt, Math.min(bs, locKeysCnt));
+            int[] selected = Utils.selectKDistinct(locKeysCnt, Math.min(bs, locKeysCnt), r);

            // Get dimensions of vectors in cache. We suppose that every feature vector has
            // same dimension d1 and every label has the same dimension d2.

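Utils.selectKDistinct now receives the RNG explicitly instead of creating one internally. For intuition, a sampler with that shape can be sketched as a partial Fisher-Yates shuffle; this is only an illustration of the contract, not the actual Utils implementation:

import java.util.Random;

public final class SelectKDistinctSketch {
    /** Picks k distinct values from [0, n), driven entirely by the supplied RNG. */
    static int[] selectKDistinct(int n, int k, Random rand) {
        int[] all = new int[n];
        for (int i = 0; i < n; i++)
            all[i] = i;

        // Shuffle only the first k positions; each pick is uniform over the remaining tail.
        for (int i = 0; i < k; i++) {
            int j = i + rand.nextInt(n - i);
            int tmp = all[i];
            all[i] = all[j];
            all[j] = tmp;
        }

        int[] res = new int[k];
        System.arraycopy(all, 0, res, 0, k);
        return res;
    }
}
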
@@ -35,7 +35,7 @@
import org.apache.ignite.ml.nn.architecture.TransformationLayerArchitecture;
import org.apache.ignite.ml.nn.initializers.MLPInitializer;
import org.apache.ignite.ml.nn.initializers.RandomInitializer;
-import org.apache.ignite.ml.nn.updaters.SmoothParametrized;
+import org.apache.ignite.ml.optimization.SmoothParametrized;

import static org.apache.ignite.ml.math.util.MatrixUtil.elementWiseTimes;

@@ -20,7 +20,7 @@
import java.util.UUID;
import java.util.stream.Stream;
import org.apache.ignite.ml.math.functions.IgniteSupplier;
-import org.apache.ignite.ml.nn.LocalBatchTrainerInput;
+import org.apache.ignite.ml.trainers.local.LocalBatchTrainerInput;
import org.apache.ignite.ml.nn.MultilayerPerceptron;
import org.apache.ignite.ml.trainers.group.GroupTrainerCacheKey;
import org.apache.ignite.ml.trainers.group.GroupTrainerInput;

@@ -33,11 +33,11 @@
import org.apache.ignite.ml.math.functions.IgniteFunction;
import org.apache.ignite.ml.math.functions.IgniteSupplier;
import org.apache.ignite.ml.math.util.MatrixUtil;
-import org.apache.ignite.ml.nn.LossFunctions;
+import org.apache.ignite.ml.optimization.LossFunctions;
import org.apache.ignite.ml.nn.MultilayerPerceptron;
-import org.apache.ignite.ml.nn.updaters.ParameterUpdateCalculator;
+import org.apache.ignite.ml.optimization.updatecalculators.ParameterUpdateCalculator;
-import org.apache.ignite.ml.nn.updaters.RPropParameterUpdate;
+import org.apache.ignite.ml.optimization.updatecalculators.RPropParameterUpdate;
-import org.apache.ignite.ml.nn.updaters.RPropUpdateCalculator;
+import org.apache.ignite.ml.optimization.updatecalculators.RPropUpdateCalculator;
import org.apache.ignite.ml.trainers.group.GroupTrainerCacheKey;
import org.apache.ignite.ml.trainers.group.MetaoptimizerGroupTrainer;
import org.apache.ignite.ml.trainers.group.ResultAndUpdates;

@@ -227,8 +227,7 @@ MLPGroupUpdateTrainingContext<U>>, MLPGroupUpdateTrainingLoopData<U>> trainingLo
        UUID uuid = ctx.trainingUUID();

        return () -> {
-            MLPGroupUpdateTrainingData<U> data = MLPGroupUpdateTrainerDataCache
-                .getOrCreate(Ignition.localIgnite()).get(uuid);
+            MLPGroupUpdateTrainingData<U> data = MLPGroupUpdateTrainerDataCache.getOrCreate(Ignition.localIgnite()).get(uuid);
            return new MLPGroupUpdateTrainingContext<>(data, prevUpdate);
        };
    }

@@ -55,7 +55,7 @@ public static IgniteCache<UUID, MLPGroupUpdateTrainingData> getOrCreate(Ignite i
        CacheConfiguration<UUID, MLPGroupUpdateTrainingData> cfg = new CacheConfiguration<>();

        // Write to primary.
-        cfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.PRIMARY_SYNC);
+        cfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);

        // Atomic transactions only.
        cfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);

@@ -25,21 +25,36 @@
import org.apache.ignite.ml.math.functions.IgniteFunction;
import org.apache.ignite.ml.math.functions.IgniteSupplier;
import org.apache.ignite.ml.nn.MultilayerPerceptron;
-import org.apache.ignite.ml.nn.updaters.ParameterUpdateCalculator;
+import org.apache.ignite.ml.optimization.updatecalculators.ParameterUpdateCalculator;

/** Multilayer perceptron group update training data. */
public class MLPGroupUpdateTrainingData<U> {
-    /** */
+    /** {@link ParameterUpdateCalculator}. */
    private final ParameterUpdateCalculator<MultilayerPerceptron, U> updateCalculator;
-    /** */
+    /**
+     * Count of steps which should be done by each of parallel trainings before sending it's update for combining with
+     * other parallel trainings updates.
+     */
    private final int stepsCnt;
-    /** */
+    /**
+     * Function used to reduce updates in one training (for example, sum all sequential gradient updates to get one
+     * gradient update).
+     */
    private final IgniteFunction<List<U>, U> updateReducer;
-    /** */
+    /**
+     * Supplier of batches in the form of (inputs, groundTruths).
+     */
    private final IgniteSupplier<IgniteBiTuple<Matrix, Matrix>> batchSupplier;
-    /** */
+    /**
+     * Loss function.
+     */
    private final IgniteFunction<Vector, IgniteDifferentiableVectorToDoubleFunction> loss;
-    /** */
+    /** Error tolerance. */
    private final double tolerance;

    /** Construct multilayer perceptron group update training data with all parameters provided. */

@@ -26,29 +26,39 @@
import org.apache.ignite.ml.math.functions.IgniteFunction;
import org.apache.ignite.ml.math.functions.IgniteSupplier;
import org.apache.ignite.ml.nn.MultilayerPerceptron;
-import org.apache.ignite.ml.nn.updaters.ParameterUpdateCalculator;
+import org.apache.ignite.ml.optimization.updatecalculators.ParameterUpdateCalculator;
import org.apache.ignite.ml.trainers.group.GroupTrainerCacheKey;

/** Multilayer perceptron group update training loop data. */
public class MLPGroupUpdateTrainingLoopData<P> implements Serializable {
-    /** */
+    /** {@link ParameterUpdateCalculator}. */
    private final ParameterUpdateCalculator<MultilayerPerceptron, P> updateCalculator;
-    /** */
+    /**
+     * Count of steps which should be done by each of parallel trainings before sending it's update for combining with
+     * other parallel trainings updates.
+     */
    private final int stepsCnt;
-    /** */
+    /** Function used to reduce updates of all steps of given parallel training. */
    private final IgniteFunction<List<P>, P> updateReducer;
-    /** */
+    /** Previous update. */
    private final P previousUpdate;
-    /** */
+    /** Supplier of batches. */
    private final IgniteSupplier<IgniteBiTuple<Matrix, Matrix>> batchSupplier;
-    /** */
+    /** Loss function. */
    private final IgniteFunction<Vector, IgniteDifferentiableVectorToDoubleFunction> loss;
-    /** */
+    /** Error tolerance. */
    private final double tolerance;

-    /** */
+    /** Key. */
    private final GroupTrainerCacheKey<Void> key;
-    /** */
+    /** MLP. */
    private final MultilayerPerceptron mlp;

    /** Create multilayer perceptron group update training loop data. */

@@ -26,7 +26,7 @@
/** Meta-optimizer for multilayer perceptron. */
public class MLPMetaoptimizer<P> implements Metaoptimizer<MLPGroupUpdateTrainerLocalContext,
    MLPGroupUpdateTrainingLoopData<P>, P, P, P, ArrayList<P>> {
-    /** */
+    /** Function used for reducing updates produced by parallel trainings. */
    private final IgniteFunction<List<P>, P> allUpdatesReducer;

    /** Construct metaoptimizer. */

@@ -21,11 +21,12 @@
import org.apache.ignite.ml.math.functions.IgniteDifferentiableVectorToDoubleFunction;
import org.apache.ignite.ml.math.functions.IgniteFunction;
import org.apache.ignite.ml.math.functions.IgniteSupplier;
-import org.apache.ignite.ml.nn.LossFunctions;
+import org.apache.ignite.ml.optimization.LossFunctions;
import org.apache.ignite.ml.nn.MultilayerPerceptron;
-import org.apache.ignite.ml.nn.updaters.ParameterUpdateCalculator;
+import org.apache.ignite.ml.optimization.updatecalculators.ParameterUpdateCalculator;
-import org.apache.ignite.ml.nn.updaters.RPropParameterUpdate;
+import org.apache.ignite.ml.optimization.updatecalculators.RPropParameterUpdate;
-import org.apache.ignite.ml.nn.updaters.RPropUpdateCalculator;
+import org.apache.ignite.ml.optimization.updatecalculators.RPropUpdateCalculator;
+import org.apache.ignite.ml.trainers.local.LocalBatchTrainer;

/**
 * Local batch trainer for MLP.

@@ -34,8 +34,11 @@ public class BarzilaiBorweinUpdater implements Updater {
    /**
     * {@inheritDoc}
     */
-    @Override public Vector compute(Vector oldWeights, Vector oldGradient, Vector weights, Vector gradient, int iteration) {
-        double learningRate = computeLearningRate(oldWeights != null ? oldWeights.copy() : null, oldGradient != null ? oldGradient.copy() : null, weights.copy(), gradient.copy());
+    @Override public Vector compute(Vector oldWeights, Vector oldGradient, Vector weights, Vector gradient,
+        int iteration) {
+        double learningRate = computeLearningRate(oldWeights != null ? oldWeights.copy() : null,
+            oldGradient != null ? oldGradient.copy() : null, weights.copy(), gradient.copy());
+
        return weights.copy().minus(gradient.copy().times(learningRate));
    }

@@ -45,6 +48,7 @@ private double computeLearningRate(Vector oldWeights, Vector oldGradient, Vector
            return INITIAL_LEARNING_RATE;
        else {
            Vector gradientDiff = gradient.minus(oldGradient);
+
            return weights.minus(oldWeights).dot(gradientDiff) / Math.pow(gradientDiff.kNorm(2.0), 2.0);
        }
    }

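For context, the step size computed in computeLearningRate above is the classical Barzilai-Borwein step: with s_k = w_k - w_{k-1} and y_k = g_k - g_{k-1}, the code returns

    alpha_k = (s_k · y_k) / ||y_k||^2

and compute(...) then applies w_{k+1} = w_k - alpha_k * g_k, falling back to INITIAL_LEARNING_RATE on the first iteration when no previous point exists. (This reading is inferred from the code above, not stated in the commit.)
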
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.ml.optimization;

import org.apache.ignite.ml.math.Vector;
import org.apache.ignite.ml.util.Utils;

/**
* Base interface for parametrized models.
*
* @param <M> Model class.
*/
interface BaseParametrized<M extends BaseParametrized<M>> {
/**
* Get parameters vector.
*
* @return Parameters vector.
*/
Vector parameters();

/**
* Set parameters.
*
* @param vector Parameters vector.
*/
M setParameters(Vector vector);

/**
* Return new model with given parameters vector.
*
* @param vector Parameters vector.
*/
default M withParameters(Vector vector) {
return Utils.copy(this).setParameters(vector);
}

/**
* Get count of parameters of this model.
*
* @return Count of parameters of this model.
*/
int parametersCount();
}
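
A minimal, hypothetical implementor (not part of this commit) showing how the parameters()/setParameters() contract composes with the default withParameters(), which copies the model via Utils.copy and so presumably expects a Serializable model:

package org.apache.ignite.ml.optimization;

import java.io.Serializable;
import org.apache.ignite.ml.math.Vector;
import org.apache.ignite.ml.math.impls.vector.DenseLocalOnHeapVector;

/** Toy two-parameter model used only to illustrate the interface above. */
class LinearModelSketch implements BaseParametrized<LinearModelSketch>, Serializable {
    /** Weight and bias packed into a single parameters vector. */
    private Vector params = new DenseLocalOnHeapVector(new double[] {0.0, 0.0});

    /** {@inheritDoc} */
    @Override public Vector parameters() {
        return params;
    }

    /** {@inheritDoc} */
    @Override public LinearModelSketch setParameters(Vector vector) {
        this.params = vector;
        return this;
    }

    /** {@inheritDoc} */
    @Override public int parametersCount() {
        return params.size();
    }
}
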
@@ -141,6 +141,7 @@ private Vector calculateDistributedGradient(SparseDistributedMatrix data, Vector
                cnt++;
            }
        }
+
        return resGradient.divide(cnt);
    },
    weights);

@@ -187,7 +188,7 @@ private Matrix extractInputs(Matrix data) {
    /** Makes carrying of the gradient function and fixes data matrix. */
    private IgniteFunction<Vector, Vector> getLossGradientFunction(Matrix data) {
        if (data instanceof SparseDistributedMatrix) {
-            SparseDistributedMatrix distributedMatrix = (SparseDistributedMatrix) data;
+            SparseDistributedMatrix distributedMatrix = (SparseDistributedMatrix)data;

            if (distributedMatrix.getStorage().storageMode() == StorageConstants.ROW_STORAGE_MODE)
                return weights -> calculateDistributedGradient(distributedMatrix, weights);

@@ -27,5 +27,5 @@
@FunctionalInterface
public interface GradientFunction extends Serializable {
    /** */
-    Vector compute(Matrix inputs, Vector groundTruth, Vector point);
+    Vector compute(Matrix inputs, Vector groundTruth, Vector pnt);
}

@@ -15,7 +15,7 @@
 * limitations under the License.
 */

-package org.apache.ignite.ml.nn;
+package org.apache.ignite.ml.optimization;

import org.apache.ignite.ml.math.Vector;
import org.apache.ignite.ml.math.functions.IgniteDifferentiableVectorToDoubleFunction;

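For downstream code, the effect of this package move is just a changed import. A sketch of what that looks like; the MSE member and its exact generic type are assumptions based on the loss field shown earlier in this diff, not something this hunk shows:

import org.apache.ignite.ml.math.Vector;
import org.apache.ignite.ml.math.functions.IgniteDifferentiableVectorToDoubleFunction;
import org.apache.ignite.ml.math.functions.IgniteFunction;
// Was: import org.apache.ignite.ml.nn.LossFunctions;
import org.apache.ignite.ml.optimization.LossFunctions;

public class LossImportSketch {
    /** Mean squared error loss, referenced from its new package. */
    static final IgniteFunction<Vector, IgniteDifferentiableVectorToDoubleFunction> LOSS = LossFunctions.MSE;
}
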
