diff --git a/demo/word2vec/train_v2.py b/demo/word2vec/api_train_v2.py
similarity index 76%
rename from demo/word2vec/train_v2.py
rename to demo/word2vec/api_train_v2.py
index 7d952b446f9db..c0940f0e56eaf 100644
--- a/demo/word2vec/train_v2.py
+++ b/demo/word2vec/api_train_v2.py
@@ -1,27 +1,40 @@
+import gzip
 import math
 
 import paddle.v2 as paddle
 
-dictsize = 1953
 embsize = 32
 hiddensize = 256
 N = 5
 
 
 def wordemb(inlayer):
-    wordemb = paddle.layer.table_projection(
+    wordemb = paddle.layer.embedding(
         input=inlayer,
         size=embsize,
         param_attr=paddle.attr.Param(
             name="_proj",
             initial_std=0.001,
             learning_rate=1,
-            l2_rate=0, ))
+            l2_rate=0,
+            sparse_update=True))
     return wordemb
 
 
 def main():
-    paddle.init(use_gpu=False, trainer_count=1)
+    # for local training
+    cluster_train = False
+
+    if not cluster_train:
+        paddle.init(use_gpu=False, trainer_count=1)
+    else:
+        paddle.init(
+            use_gpu=False,
+            trainer_count=2,
+            port=7164,
+            ports_num=1,
+            ports_num_for_sparse=1,
+            num_gradient_servers=1)
     word_dict = paddle.dataset.imikolov.build_dict()
     dict_size = len(word_dict)
     firstword = paddle.layer.data(
@@ -57,6 +70,9 @@ def main():
     def event_handler(event):
         if isinstance(event, paddle.event.EndIteration):
             if event.batch_id % 100 == 0:
+                with gzip.open("batch-" + str(event.batch_id) + ".tar.gz",
+                               'w') as f:
+                    trainer.save_parameter_to_tar(f)
                 result = trainer.test(
                     paddle.batch(
                         paddle.dataset.imikolov.test(word_dict, N), 32))
@@ -65,11 +81,15 @@ def event_handler(event):
                     result.metrics)
 
     cost = paddle.layer.classification_cost(input=predictword, label=nextword)
+
     parameters = paddle.parameters.create(cost)
-    adam_optimizer = paddle.optimizer.Adam(
+    adagrad = paddle.optimizer.AdaGrad(
         learning_rate=3e-3,
         regularization=paddle.optimizer.L2Regularization(8e-4))
-    trainer = paddle.trainer.SGD(cost, parameters, adam_optimizer)
+    trainer = paddle.trainer.SGD(cost,
+                                 parameters,
+                                 adagrad,
+                                 is_local=not cluster_train)
     trainer.train(
         paddle.batch(paddle.dataset.imikolov.train(word_dict, N), 32),
         num_passes=30,
diff --git a/paddle/api/PaddleAPI.h b/paddle/api/PaddleAPI.h
index c4f5dca26cc6a..d51204012171c 100644
--- a/paddle/api/PaddleAPI.h
+++ b/paddle/api/PaddleAPI.h
@@ -19,6 +19,7 @@ limitations under the License. */
 #include <stddef.h>
 #include <stdint.h>
 #include <string>
+#include "paddle/gserver/gradientmachines/GradientMachine.h"
 #include "paddle/utils/Common.h"
 #include "paddle/utils/GlobalConstants.h"
 
@@ -468,8 +469,10 @@ class Arguments {
 };
 
 enum GradientMatchineCreateMode {
-  CREATE_MODE_NORMAL = 0,
-  CREATE_MODE_TESTING = 4
+  CREATE_MODE_NORMAL = paddle::GradientMachine::kNormal,
+  CREATE_MODE_SGD_SPARSE_CPU_TRAINING =
+      paddle::GradientMachine::kSgdSparseCpuTraining,
+  CREATE_MODE_TESTING = paddle::GradientMachine::kTesting
 };
 
 struct ParameterConfigPrivate;
@@ -817,7 +820,8 @@ class ParameterUpdater {
 public:
   static ParameterUpdater* createLocalUpdater(OptimizationConfig* config);
   static ParameterUpdater* createRemoteUpdater(OptimizationConfig* config,
-                                               int passCount);
+                                               int passCount,
+                                               bool useSparseUpdater);
   ~ParameterUpdater();
 
   /**
@@ -855,6 +859,13 @@ class ParameterUpdater {
    */
   void update(Parameter* param);
 
+  /**
+   * @brief get only the required sparse rows by default.
+   * @param fullSize: get the full parameter matrix if *fullSize* is set
+   * @param apply: get PARAMETER_APPLY on the pserver if *apply* is set
+   */
+  void getParametersRemote(bool fullSize = false, bool apply = false);
+
   /**
   * @brief restore the average parameter.
   * @note It is only used in AverageOptimizer. Restore will get the current
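The rest of this patch wires the new C++ surface above through SWIG into the v2 Python API. As a quick orientation, here is a minimal sketch (not part of the patch) of how the new entry points are reached from Python; it assumes `topology_proto`, `opt_conf`, and `enable_types` come from an already-configured v2 `Topology` and `Optimizer`:

```python
# Sketch only, not part of the patch: drive the new C++ entry points through
# py_paddle. Assumes topology_proto, opt_conf and enable_types were obtained
# from an already-configured v2 Topology and Optimizer.
import py_paddle.swig_paddle as api


def make_sparse_remote_updater(topology_proto, opt_conf, enable_types,
                               num_passes):
    # New create mode exposed by this patch for sparse CPU training.
    gm = api.GradientMachine.createFromConfigProto(
        topology_proto, api.CREATE_MODE_SGD_SPARSE_CPU_TRAINING, enable_types)
    # The third argument is the new useSparseUpdater flag.
    updater = api.ParameterUpdater.createRemoteUpdater(opt_conf, num_passes,
                                                       True)
    updater.init(gm)
    return gm, updater
```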
diff --git a/paddle/api/ParameterUpdater.cpp b/paddle/api/ParameterUpdater.cpp
index 75b0ae7cb6cc8..79921ea6e787f 100644
--- a/paddle/api/ParameterUpdater.cpp
+++ b/paddle/api/ParameterUpdater.cpp
@@ -29,10 +29,22 @@ ParameterUpdater *ParameterUpdater::createLocalUpdater(
 }
 
 ParameterUpdater *ParameterUpdater::createRemoteUpdater(
-    OptimizationConfig *config, int passCount) {
+    OptimizationConfig *config, int passCount, bool useSparseUpdater) {
   auto updater = new ParameterUpdater();
-  updater->m->updater.reset(new paddle::RemoteParameterUpdater(
-      config->m->getConfig(), passCount, nullptr));
+  auto remoteUpdater = new paddle::RemoteParameterUpdater(
+      config->m->getConfig(), passCount, nullptr);
+  if (useSparseUpdater) {
+    std::unique_ptr<paddle::ParameterUpdater> remoteUpdaterPtr(remoteUpdater);
+    auto sparseRemoteUpdater =
+        new paddle::SparseRemoteParameterUpdaterComposite(
+            config->m->getConfig(),
+            passCount,
+            false,
+            std::move(remoteUpdaterPtr));
+    updater->m->updater.reset(sparseRemoteUpdater);
+  } else {
+    updater->m->updater.reset(remoteUpdater);
+  }
   return updater;
 }
 
@@ -59,6 +71,10 @@ void ParameterUpdater::update(Parameter *param) {
   m->updater->update(paddleParam);
 }
 
+void ParameterUpdater::getParametersRemote(bool fullSize, bool apply) {
+  m->updater->getParametersRemote(fullSize, apply);
+}
+
 void ParameterUpdater::restore() { m->updater->restore(); }
 
 void ParameterUpdater::apply() { m->updater->apply(); }
diff --git a/paddle/gserver/gradientmachines/MultiGradientMachine.cpp b/paddle/gserver/gradientmachines/MultiGradientMachine.cpp
index 6ae60102b3e43..3159026e6b923 100644
--- a/paddle/gserver/gradientmachines/MultiGradientMachine.cpp
+++ b/paddle/gserver/gradientmachines/MultiGradientMachine.cpp
@@ -518,7 +518,7 @@ void TrainerThread::computeThread() {
         backward();
         break;
       case MultiGradientMachine::TASK_COPY_IN_ARGS:
-        copyInArgs();
+        batchSize_ = copyInArgs();
         inArgsCopied_ = true;
         multiMachine_->waitForCopyInArgs();
         break;
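The `getParametersRemote()` exposed above fetches only the required sparse rows by default; `fullSize` pulls the whole matrices and `apply` pulls the PARAMETER_APPLY copies from the pserver. An illustrative Python sketch of the two call patterns (mirroring `python/paddle/v2/trainer.py` further down; `gradient_machine`, `updater`, and `params` are assumed to be already-initialized objects):

```python
# Illustrative only: the two call patterns for the new getParametersRemote(),
# mirroring the comments above and python/paddle/v2/trainer.py further down.
# gradient_machine, updater and params are assumed to be initialized objects.


def fetch_sparse_rows_for_batch(gradient_machine, updater, in_args):
    # Per-batch path: prefetch marks the rows this batch touches, then only
    # those sparse rows are pulled from the parameter server.
    gradient_machine.prefetch(in_args)
    updater.getParametersRemote()


def snapshot_full_parameters(updater, params, f):
    # Snapshot path (compare save_parameter_to_tar below): pull the full
    # matrices plus the PARAMETER_APPLY copies before writing to disk.
    updater.catchUpWith()
    updater.apply()
    updater.getParametersRemote(True, True)
    params.to_tar(f)
    updater.restore()
```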
diff --git a/python/paddle/v2/optimizer.py b/python/paddle/v2/optimizer.py
index 1a01d95c205c0..17c56a2b9936e 100644
--- a/python/paddle/v2/optimizer.py
+++ b/python/paddle/v2/optimizer.py
@@ -38,12 +38,35 @@ def enable_types(self):
         assert isinstance(tmp, swig_api.ParameterOptimizer)
         return tmp.getParameterTypes()
 
-    def create_local_updater(self):
+    def __create_local_updater__(self):
         return swig_api.ParameterUpdater.createLocalUpdater(self.__opt_conf__)
 
-    def create_remote_updater(self, pass_num):
-        return swig_api.ParameterUpdater.createRemoteUpdater(self.__opt_conf__,
-                                                             pass_num)
+    def __create_remote_updater__(self, pass_num, use_sparse_updater):
+        return swig_api.ParameterUpdater.createRemoteUpdater(
+            self.__opt_conf__, pass_num, use_sparse_updater)
+
+    def create_updater(self, is_local, num_passes, use_sparse_updater):
+        """
+        Create the proper parameter updater according to the configuration.
+        :param is_local: create a local or a remote parameter updater
+        :param num_passes: the remote parameter updater uses this to configure
+            the parameter server.
+        :param use_sparse_updater: when a remote updater is used and some
+            parameter is sparse, the updater has to do extra work per batch:
+
+        ..  code-block:: python
+
+            if use_sparse_remote_updater:
+                gradient_machine.prefetch(in_args)
+                parameter_updater.getParametersRemote()
+        :return: parameter_updater
+        """
+        if is_local:
+            parameter_updater = self.__create_local_updater__()
+        else:
+            parameter_updater = self.__create_remote_updater__(
+                num_passes, use_sparse_updater)
+        return parameter_updater
 
 
 class Momentum(Optimizer):
diff --git a/python/paddle/v2/topology.py b/python/paddle/v2/topology.py
index 737b6bf1e2eb6..ff28c85c53dc8 100644
--- a/python/paddle/v2/topology.py
+++ b/python/paddle/v2/topology.py
@@ -73,6 +73,18 @@ def __check__(layers):
 
         assert isinstance(self.__model_config__, ModelConfig)
 
+    def use_sparse_updater(self):
+        """
+        Check whether any parameter requires a sparse update.
+        :return: True if some parameter uses sparse updates.
+        """
+        use_sparse = False
+        for parameter in self.__model_config__.parameters:
+            if parameter.sparse_update or parameter.sparse_remote_update:
+                use_sparse = True
+                break
+        return use_sparse
+
     def proto(self):
         return self.__model_config__
 
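Together, `Topology.use_sparse_updater()` and `Optimizer.create_updater()` let callers pick the right updater without touching the SWIG layer directly. A small sketch under the assumption that `cost` is a configured v2 cost layer and `optimizer` a v2 `Optimizer` instance:

```python
# Sketch of how the two new helpers cooperate; cost and optimizer are
# placeholders for a configured v2 cost layer and a v2 Optimizer instance.
from paddle.v2.topology import Topology


def build_updater(cost, optimizer, num_passes, is_local):
    topology = Topology(cost)
    # True if any parameter sets sparse_update or sparse_remote_update.
    use_sparse = topology.use_sparse_updater()
    return optimizer.create_updater(is_local, num_passes, use_sparse)
```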
diff --git a/python/paddle/v2/trainer.py b/python/paddle/v2/trainer.py
index f5797a86c2b71..6a83ba8533cd6 100644
--- a/python/paddle/v2/trainer.py
+++ b/python/paddle/v2/trainer.py
@@ -1,4 +1,6 @@
 import collections
+import gzip
+import os
 
 import py_paddle.swig_paddle as api
 
@@ -42,7 +44,12 @@ class SGD(object):
     :type extra_layers: paddle.v2.config_base.Layer
     """
 
-    def __init__(self, cost, parameters, update_equation, extra_layers=None):
+    def __init__(self,
+                 cost,
+                 parameters,
+                 update_equation,
+                 extra_layers=None,
+                 is_local=True):
 
         if not isinstance(parameters, v2_parameters.Parameters):
             raise TypeError('parameters should be parameters')
@@ -55,20 +62,48 @@ def __init__(self, cost, parameters, update_equation, extra_layers=None):
         self.__topology__ = topology
         self.__parameters__ = parameters
         self.__topology_in_proto__ = topology.proto()
+        self.__is_local__ = is_local
 
-        # In local mode, disable sparse_remote_update.
-        for param in self.__topology_in_proto__.parameters:
-            if param.sparse_remote_update:
-                param.sparse_remote_update = False
+        self.__use_sparse_updater__ = self.__topology__.use_sparse_updater()
+        # In local mode, disable sparse_remote_update.
+        if is_local:
+            for param in self.__topology_in_proto__.parameters:
+                if param.sparse_remote_update:
+                    param.sparse_remote_update = False
 
+        self.__gm_create_mode__ = api.CREATE_MODE_NORMAL if not \
+            self.__use_sparse_updater__ else api.CREATE_MODE_SGD_SPARSE_CPU_TRAINING
         self.__data_types__ = topology.data_type()
         gm = api.GradientMachine.createFromConfigProto(
-            self.__topology_in_proto__, api.CREATE_MODE_NORMAL,
+            self.__topology_in_proto__, self.__gm_create_mode__,
             self.__optimizer__.enable_types())
         assert isinstance(gm, api.GradientMachine)
         self.__gradient_machine__ = gm
         self.__gradient_machine__.randParameters()
-        parameters.append_gradient_machine(gm)
+        self.__parameters__.append_gradient_machine(gm)
+        self.__parameter_updater__ = None
+
+    def __use_remote_sparse_updater__(self):
+        return self.__use_sparse_updater__ and not self.__is_local__
+
+    def __prepare_parameter__(self, in_args):
+        """
+        Prepare parameters before running forward/backward.
+        When a remote sparse updater is used, the needed parameter rows are
+        fetched from the parameter server according to the input arguments.
+        :param in_args: input arguments of this batch.
+        :return:
+        """
+        if self.__use_remote_sparse_updater__():
+            self.__gradient_machine__.prefetch(in_args)
+            self.__parameter_updater__.getParametersRemote()
+
+    def save_parameter_to_tar(self, f):
+        self.__parameter_updater__.catchUpWith()
+        self.__parameter_updater__.apply()
+        self.__parameter_updater__.getParametersRemote(True, True)
+        self.__parameters__.to_tar(f)
+        self.__parameter_updater__.restore()
 
     def train(self, reader, num_passes=1, event_handler=None, feeding=None):
         """
@@ -88,8 +123,9 @@ def train(self, reader, num_passes=1, event_handler=None, feeding=None):
             event_handler = default_event_handler
         __check_train_args__(**locals())
 
-        updater = self.__optimizer__.create_local_updater()
-        updater.init(self.__gradient_machine__)
+        self.__parameter_updater__ = self.__optimizer__.create_updater(
+            self.__is_local__, num_passes, self.__use_sparse_updater__)
+        self.__parameter_updater__.init(self.__gradient_machine__)
 
         self.__gradient_machine__.start()
         batch_evaluator = self.__gradient_machine__.makeEvaluator()
@@ -101,23 +137,26 @@ def train(self, reader, num_passes=1, event_handler=None, feeding=None):
         for pass_id in xrange(num_passes):
             event_handler(v2_event.BeginPass(pass_id))
             pass_evaluator.start()
-            updater.startPass()
+            self.__parameter_updater__.startPass()
             for batch_id, data_batch in enumerate(reader()):
                 batch_evaluator.start()
                 event_handler(
                     v2_event.BeginIteration(
                         pass_id=pass_id, batch_id=batch_id))
-                pass_type = updater.startBatch(len(data_batch))
-                self.__gradient_machine__.forwardBackward(
-                    feeder(data_batch), out_args, pass_type)
+                pass_type = self.__parameter_updater__.startBatch(
+                    len(data_batch))
+                in_args = feeder(data_batch)
+                self.__prepare_parameter__(in_args)
+                self.__gradient_machine__.forwardBackward(in_args, out_args,
+                                                          pass_type)
                 self.__gradient_machine__.eval(pass_evaluator)
                 self.__gradient_machine__.eval(batch_evaluator)
                 for each_param in self.__gradient_machine__.getNonStaticParameters(
                 ):
-                    updater.update(each_param)
+                    self.__parameter_updater__.update(each_param)
                 cost_sum = out_args.sum()
                 cost = cost_sum / len(data_batch)
-                updater.finishBatch(cost)
+                self.__parameter_updater__.finishBatch(cost)
                 batch_evaluator.finish()
                 event_handler(
                     v2_event.EndIteration(
@@ -126,7 +165,7 @@ def train(self, reader, num_passes=1, event_handler=None, feeding=None):
                         cost=cost,
                         evaluator=batch_evaluator))
 
-            updater.finishPass()
+            self.__parameter_updater__.finishPass()
             pass_evaluator.finish()
             event_handler(v2_event.EndPass(pass_id, evaluator=pass_evaluator))
         self.__gradient_machine__.finish()
@@ -140,8 +179,9 @@ def test(self, reader, feeding=None):
         num_samples = 0.0
         for data_batch in reader():
             num_samples += len(data_batch)
-            self.__gradient_machine__.forward(
-                feeder(data_batch), out_args, api.PASS_TEST)
+            in_args = feeder(data_batch)
+            self.__prepare_parameter__(in_args)
+            self.__gradient_machine__.forward(in_args, out_args, api.PASS_TEST)
             total_cost += out_args.sum()
             self.__gradient_machine__.eval(evaluator)
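For completeness, a sketch of reading back one of the `batch-<id>.tar.gz` snapshots that `save_parameter_to_tar` writes via the word2vec demo's event handler. It assumes `paddle.parameters.Parameters.from_tar`, the existing counterpart of the `to_tar()` call used above; the file name is illustrative:

```python
# Sketch: reload a checkpoint written by trainer.save_parameter_to_tar().
# The file name follows the pattern used in demo/word2vec/api_train_v2.py;
# Parameters.from_tar is the existing counterpart of Parameters.to_tar.
import gzip

import paddle.v2 as paddle

with gzip.open("batch-100.tar.gz", "r") as f:
    parameters = paddle.parameters.Parameters.from_tar(f)

# The loaded parameters can be passed to a new paddle.trainer.SGD(...) or used
# for inference in place of randomly initialized values.
```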