From 6802b65cd203f3302263e6f11156d693bac58b47 Mon Sep 17 00:00:00 2001 From: qiaolongfei Date: Fri, 14 Apr 2017 00:20:55 +0800 Subject: [PATCH 01/15] init support remote updater --- paddle/api/PaddleAPI.h | 1 + python/paddle/v2/topology.py | 10 ++++++++++ python/paddle/v2/trainer.py | 30 +++++++++++++++++++++--------- 3 files changed, 32 insertions(+), 9 deletions(-) diff --git a/paddle/api/PaddleAPI.h b/paddle/api/PaddleAPI.h index c4f5dca26cc6a..f5ead40682c69 100644 --- a/paddle/api/PaddleAPI.h +++ b/paddle/api/PaddleAPI.h @@ -469,6 +469,7 @@ class Arguments { enum GradientMatchineCreateMode { CREATE_MODE_NORMAL = 0, + CREATE_MODE_SGD_SPARSE_CPU_TRAINING = 3, CREATE_MODE_TESTING = 4 }; diff --git a/python/paddle/v2/topology.py b/python/paddle/v2/topology.py index 737b6bf1e2eb6..86e7549e97201 100644 --- a/python/paddle/v2/topology.py +++ b/python/paddle/v2/topology.py @@ -73,6 +73,16 @@ def __check__(layers): assert isinstance(self.__model_config__, ModelConfig) + def use_sparse_updater(self): + """ + check if any parameter require to use sparse_update + :return: + """ + for parameter in self.__model_config__.parameters: + if parameter.sparse_update or parameter.sparse_remote_update: + return True + return False + def proto(self): return self.__model_config__ diff --git a/python/paddle/v2/trainer.py b/python/paddle/v2/trainer.py index f5797a86c2b71..2dac95b63d550 100644 --- a/python/paddle/v2/trainer.py +++ b/python/paddle/v2/trainer.py @@ -42,7 +42,7 @@ class SGD(object): :type extra_layers: paddle.v2.config_base.Layer """ - def __init__(self, cost, parameters, update_equation, extra_layers=None): + def __init__(self, cost, parameters, update_equation, extra_layers=None, is_local=True): if not isinstance(parameters, v2_parameters.Parameters): raise TypeError('parameters should be parameters') @@ -55,15 +55,21 @@ def __init__(self, cost, parameters, update_equation, extra_layers=None): self.__topology__ = topology self.__parameters__ = parameters 
self.__topology_in_proto__ = topology.proto() - - # In local mode, disable sparse_remote_update. - for param in self.__topology_in_proto__.parameters: - if param.sparse_remote_update: - param.sparse_remote_update = False - + self.__is_local__ = is_local + + self.__use_sparse_updater__ = self.__topology__.use_sparse_updater() + # # In local mode, disable sparse_remote_update. + if is_local: + self.__use_sparse_updater__ = False + for param in self.__topology_in_proto__.parameters: + if param.sparse_remote_update: + param.sparse_remote_update = False + + self.__gm_create_mode__ = api.CREATE_MODE_NORMAL if not \ + self.__use_sparse_updater__ else api.CREATE_MODE_SGD_SPARSE_CPU_TRAINING self.__data_types__ = topology.data_type() gm = api.GradientMachine.createFromConfigProto( - self.__topology_in_proto__, api.CREATE_MODE_NORMAL, + self.__topology_in_proto__, self.__gm_create_mode__, self.__optimizer__.enable_types()) assert isinstance(gm, api.GradientMachine) self.__gradient_machine__ = gm @@ -88,7 +94,10 @@ def train(self, reader, num_passes=1, event_handler=None, feeding=None): event_handler = default_event_handler __check_train_args__(**locals()) - updater = self.__optimizer__.create_local_updater() + if self.__is_local__: + updater = self.__optimizer__.create_local_updater() + else: + updater = self.__optimizer__.create_remote_updater(num_passes) updater.init(self.__gradient_machine__) self.__gradient_machine__.start() @@ -108,6 +117,9 @@ def train(self, reader, num_passes=1, event_handler=None, feeding=None): v2_event.BeginIteration( pass_id=pass_id, batch_id=batch_id)) pass_type = updater.startBatch(len(data_batch)) + if self.__use_sparse_updater__: + self.__gradient_machine__.prefetch(feeder(data_batch)) + updater.getParametersRemote() self.__gradient_machine__.forwardBackward( feeder(data_batch), out_args, pass_type) self.__gradient_machine__.eval(pass_evaluator) From bad503ff08e36f6af19b8e7203cf0ce3507bd80d Mon Sep 17 00:00:00 2001 From: qiaolongfei Date: Fri, 
14 Apr 2017 23:33:29 +0800 Subject: [PATCH 02/15] support RemoteSparseUpdater --- paddle/api/PaddleAPI.h | 9 +++++---- paddle/api/ParameterUpdater.cpp | 15 ++++++++++++--- python/paddle/v2/optimizer.py | 4 ++-- python/paddle/v2/trainer.py | 3 ++- 4 files changed, 21 insertions(+), 10 deletions(-) diff --git a/paddle/api/PaddleAPI.h b/paddle/api/PaddleAPI.h index f5ead40682c69..c8800519bd2fc 100644 --- a/paddle/api/PaddleAPI.h +++ b/paddle/api/PaddleAPI.h @@ -21,6 +21,7 @@ limitations under the License. */ #include #include "paddle/utils/Common.h" #include "paddle/utils/GlobalConstants.h" +#include "paddle/gserver/gradientmachines/GradientMachine.h" /// Import PaddlePaddle's enumeration into global namespace. using namespace paddle::enumeration_wrapper; // NOLINT @@ -468,9 +469,9 @@ class Arguments { }; enum GradientMatchineCreateMode { - CREATE_MODE_NORMAL = 0, - CREATE_MODE_SGD_SPARSE_CPU_TRAINING = 3, - CREATE_MODE_TESTING = 4 + CREATE_MODE_NORMAL = paddle::GradientMachine::kNormal, + CREATE_MODE_SGD_SPARSE_CPU_TRAINING = paddle::GradientMachine::kSgdSparseCpuTraining, + CREATE_MODE_TESTING = paddle::GradientMachine::kTesting }; struct ParameterConfigPrivate; @@ -818,7 +819,7 @@ class ParameterUpdater { public: static ParameterUpdater* createLocalUpdater(OptimizationConfig* config); static ParameterUpdater* createRemoteUpdater(OptimizationConfig* config, - int passCount); + int passCount, bool userSparseUpdater); ~ParameterUpdater(); /** diff --git a/paddle/api/ParameterUpdater.cpp b/paddle/api/ParameterUpdater.cpp index 75b0ae7cb6cc8..e96ccc928549d 100644 --- a/paddle/api/ParameterUpdater.cpp +++ b/paddle/api/ParameterUpdater.cpp @@ -29,10 +29,19 @@ ParameterUpdater *ParameterUpdater::createLocalUpdater( } ParameterUpdater *ParameterUpdater::createRemoteUpdater( - OptimizationConfig *config, int passCount) { + OptimizationConfig *config, int passCount, bool userSparseUpdater) { auto updater = new ParameterUpdater(); - updater->m->updater.reset(new 
paddle::RemoteParameterUpdater( - config->m->getConfig(), passCount, nullptr)); + auto remoteUpdater = new paddle::RemoteParameterUpdater( + config->m->getConfig(), passCount, nullptr); + if (userSparseUpdater) { + std::unique_ptr remoteUpdaterPtr; + remoteUpdaterPtr.reset(remoteUpdater); + auto sparseRemoteUpdater = new paddle::SparseRemoteParameterUpdaterComposite( + config->m->getConfig(), passCount, false, std::move(remoteUpdaterPtr)); + updater->m->updater.reset(sparseRemoteUpdater); + } else { + updater->m->updater.reset(remoteUpdater); + } return updater; } diff --git a/python/paddle/v2/optimizer.py b/python/paddle/v2/optimizer.py index 1a01d95c205c0..6fefd7b2f2413 100644 --- a/python/paddle/v2/optimizer.py +++ b/python/paddle/v2/optimizer.py @@ -41,9 +41,9 @@ def enable_types(self): def create_local_updater(self): return swig_api.ParameterUpdater.createLocalUpdater(self.__opt_conf__) - def create_remote_updater(self, pass_num): + def create_remote_updater(self, pass_num, use_sparse_updater): return swig_api.ParameterUpdater.createRemoteUpdater(self.__opt_conf__, - pass_num) + pass_num, use_sparse_updater) class Momentum(Optimizer): diff --git a/python/paddle/v2/trainer.py b/python/paddle/v2/trainer.py index 2dac95b63d550..dc23eb5b0d74a 100644 --- a/python/paddle/v2/trainer.py +++ b/python/paddle/v2/trainer.py @@ -97,7 +97,8 @@ def train(self, reader, num_passes=1, event_handler=None, feeding=None): if self.__is_local__: updater = self.__optimizer__.create_local_updater() else: - updater = self.__optimizer__.create_remote_updater(num_passes) + updater = self.__optimizer__.create_remote_updater(num_passes, + self.__use_sparse_updater__) updater.init(self.__gradient_machine__) self.__gradient_machine__.start() From 64bfd8147fc574466f7b5972de926ed0cec00f66 Mon Sep 17 00:00:00 2001 From: qiaolongfei Date: Sat, 15 Apr 2017 17:13:34 +0800 Subject: [PATCH 03/15] fix style probelm --- paddle/api/PaddleAPI.h | 8 +++++--- paddle/api/ParameterUpdater.cpp | 10 
+++++++--- paddle/function/BufferArgTest.cpp | 2 +- paddle/function/FunctionTest.cpp | 2 +- paddle/function/TensorShapeTest.cpp | 2 +- paddle/function/TensorTypeTest.cpp | 2 +- python/paddle/v2/optimizer.py | 4 ++-- python/paddle/v2/trainer.py | 11 ++++++++--- 8 files changed, 26 insertions(+), 15 deletions(-) diff --git a/paddle/api/PaddleAPI.h b/paddle/api/PaddleAPI.h index c8800519bd2fc..725328ce4d29b 100644 --- a/paddle/api/PaddleAPI.h +++ b/paddle/api/PaddleAPI.h @@ -19,9 +19,9 @@ limitations under the License. */ #include #include #include +#include "paddle/gserver/gradientmachines/GradientMachine.h" #include "paddle/utils/Common.h" #include "paddle/utils/GlobalConstants.h" -#include "paddle/gserver/gradientmachines/GradientMachine.h" /// Import PaddlePaddle's enumeration into global namespace. using namespace paddle::enumeration_wrapper; // NOLINT @@ -470,7 +470,8 @@ class Arguments { enum GradientMatchineCreateMode { CREATE_MODE_NORMAL = paddle::GradientMachine::kNormal, - CREATE_MODE_SGD_SPARSE_CPU_TRAINING = paddle::GradientMachine::kSgdSparseCpuTraining, + CREATE_MODE_SGD_SPARSE_CPU_TRAINING = + paddle::GradientMachine::kSgdSparseCpuTraining, CREATE_MODE_TESTING = paddle::GradientMachine::kTesting }; @@ -819,7 +820,8 @@ class ParameterUpdater { public: static ParameterUpdater* createLocalUpdater(OptimizationConfig* config); static ParameterUpdater* createRemoteUpdater(OptimizationConfig* config, - int passCount, bool userSparseUpdater); + int passCount, + bool userSparseUpdater); ~ParameterUpdater(); /** diff --git a/paddle/api/ParameterUpdater.cpp b/paddle/api/ParameterUpdater.cpp index e96ccc928549d..708379ded5b74 100644 --- a/paddle/api/ParameterUpdater.cpp +++ b/paddle/api/ParameterUpdater.cpp @@ -32,12 +32,16 @@ ParameterUpdater *ParameterUpdater::createRemoteUpdater( OptimizationConfig *config, int passCount, bool userSparseUpdater) { auto updater = new ParameterUpdater(); auto remoteUpdater = new paddle::RemoteParameterUpdater( - 
config->m->getConfig(), passCount, nullptr); + config->m->getConfig(), passCount, nullptr); if (userSparseUpdater) { std::unique_ptr remoteUpdaterPtr; remoteUpdaterPtr.reset(remoteUpdater); - auto sparseRemoteUpdater = new paddle::SparseRemoteParameterUpdaterComposite( - config->m->getConfig(), passCount, false, std::move(remoteUpdaterPtr)); + auto sparseRemoteUpdater = + new paddle::SparseRemoteParameterUpdaterComposite( + config->m->getConfig(), + passCount, + false, + std::move(remoteUpdaterPtr)); updater->m->updater.reset(sparseRemoteUpdater); } else { updater->m->updater.reset(remoteUpdater); diff --git a/paddle/function/BufferArgTest.cpp b/paddle/function/BufferArgTest.cpp index 1744f377808f1..f1a234ab1a106 100644 --- a/paddle/function/BufferArgTest.cpp +++ b/paddle/function/BufferArgTest.cpp @@ -12,8 +12,8 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ -#include "BufferArg.h" #include +#include "BufferArg.h" #include "paddle/math/MemoryHandle.h" namespace paddle { diff --git a/paddle/function/FunctionTest.cpp b/paddle/function/FunctionTest.cpp index fdf7e631e5ab8..f9ea7c7e4f6ed 100644 --- a/paddle/function/FunctionTest.cpp +++ b/paddle/function/FunctionTest.cpp @@ -12,8 +12,8 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ -#include "Function.h" #include +#include "Function.h" #include "paddle/math/SparseMatrix.h" namespace paddle { diff --git a/paddle/function/TensorShapeTest.cpp b/paddle/function/TensorShapeTest.cpp index 45a2e106e7fc3..e19afe0c4d594 100644 --- a/paddle/function/TensorShapeTest.cpp +++ b/paddle/function/TensorShapeTest.cpp @@ -12,8 +12,8 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions and limitations under the License. */ -#include "TensorShape.h" #include +#include "TensorShape.h" namespace paddle { diff --git a/paddle/function/TensorTypeTest.cpp b/paddle/function/TensorTypeTest.cpp index e50e46f3e9911..5b5c504ae2a33 100644 --- a/paddle/function/TensorTypeTest.cpp +++ b/paddle/function/TensorTypeTest.cpp @@ -12,8 +12,8 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ -#include "TensorType.h" #include +#include "TensorType.h" namespace paddle { diff --git a/python/paddle/v2/optimizer.py b/python/paddle/v2/optimizer.py index 6fefd7b2f2413..7bac1ea3b989a 100644 --- a/python/paddle/v2/optimizer.py +++ b/python/paddle/v2/optimizer.py @@ -42,8 +42,8 @@ def create_local_updater(self): return swig_api.ParameterUpdater.createLocalUpdater(self.__opt_conf__) def create_remote_updater(self, pass_num, use_sparse_updater): - return swig_api.ParameterUpdater.createRemoteUpdater(self.__opt_conf__, - pass_num, use_sparse_updater) + return swig_api.ParameterUpdater.createRemoteUpdater( + self.__opt_conf__, pass_num, use_sparse_updater) class Momentum(Optimizer): diff --git a/python/paddle/v2/trainer.py b/python/paddle/v2/trainer.py index dc23eb5b0d74a..80f243b4137d3 100644 --- a/python/paddle/v2/trainer.py +++ b/python/paddle/v2/trainer.py @@ -42,7 +42,12 @@ class SGD(object): :type extra_layers: paddle.v2.config_base.Layer """ - def __init__(self, cost, parameters, update_equation, extra_layers=None, is_local=True): + def __init__(self, + cost, + parameters, + update_equation, + extra_layers=None, + is_local=True): if not isinstance(parameters, v2_parameters.Parameters): raise TypeError('parameters should be parameters') @@ -97,8 +102,8 @@ def train(self, reader, num_passes=1, event_handler=None, feeding=None): if self.__is_local__: updater = 
self.__optimizer__.create_local_updater() else: - updater = self.__optimizer__.create_remote_updater(num_passes, - self.__use_sparse_updater__) + updater = self.__optimizer__.create_remote_updater( + num_passes, self.__use_sparse_updater__) updater.init(self.__gradient_machine__) self.__gradient_machine__.start() From 8210350819cbaa83e8ac0a541d14d0819d9a1e34 Mon Sep 17 00:00:00 2001 From: qiaolongfei Date: Sat, 15 Apr 2017 23:39:47 +0800 Subject: [PATCH 04/15] add getParametersRemote for ParameterUpdater in api --- paddle/api/PaddleAPI.h | 7 +++++++ paddle/api/ParameterUpdater.cpp | 4 ++++ python/paddle/v2/topology.py | 6 ++++-- python/paddle/v2/trainer.py | 21 ++++++++++----------- 4 files changed, 25 insertions(+), 13 deletions(-) diff --git a/paddle/api/PaddleAPI.h b/paddle/api/PaddleAPI.h index 725328ce4d29b..be6be556a7384 100644 --- a/paddle/api/PaddleAPI.h +++ b/paddle/api/PaddleAPI.h @@ -859,6 +859,13 @@ class ParameterUpdater { */ void update(Parameter* param); + /** + * @breif only get required sparse rows by default. + * @param fullSize: get full matrix parameter if *fullSize* set + * @param apply: get PARAMETER_APPLY on pserver if *apply* set + */ + void getParametersRemote(bool fullSize = false, bool apply = false); + /** * @brief restore the average parameter. * @note It is only used in AverageOptimizer. 
Restore will get the current diff --git a/paddle/api/ParameterUpdater.cpp b/paddle/api/ParameterUpdater.cpp index 708379ded5b74..ce2ac33d44970 100644 --- a/paddle/api/ParameterUpdater.cpp +++ b/paddle/api/ParameterUpdater.cpp @@ -72,6 +72,10 @@ void ParameterUpdater::update(Parameter *param) { m->updater->update(paddleParam); } +void ParameterUpdater::getParametersRemote(bool fullSize, bool apply) { + m->updater->getParametersRemote(fullSize, apply); +} + void ParameterUpdater::restore() { m->updater->restore(); } void ParameterUpdater::apply() { m->updater->apply(); } diff --git a/python/paddle/v2/topology.py b/python/paddle/v2/topology.py index 86e7549e97201..ff28c85c53dc8 100644 --- a/python/paddle/v2/topology.py +++ b/python/paddle/v2/topology.py @@ -78,10 +78,12 @@ def use_sparse_updater(self): check if any parameter require to use sparse_update :return: """ + use_sparse = False for parameter in self.__model_config__.parameters: if parameter.sparse_update or parameter.sparse_remote_update: - return True - return False + use_sparse = True + break + return use_sparse def proto(self): return self.__model_config__ diff --git a/python/paddle/v2/trainer.py b/python/paddle/v2/trainer.py index 80f243b4137d3..30fc2a0886ba7 100644 --- a/python/paddle/v2/trainer.py +++ b/python/paddle/v2/trainer.py @@ -65,7 +65,6 @@ def __init__(self, self.__use_sparse_updater__ = self.__topology__.use_sparse_updater() # # In local mode, disable sparse_remote_update. 
if is_local: - self.__use_sparse_updater__ = False for param in self.__topology_in_proto__.parameters: if param.sparse_remote_update: param.sparse_remote_update = False @@ -100,11 +99,11 @@ def train(self, reader, num_passes=1, event_handler=None, feeding=None): __check_train_args__(**locals()) if self.__is_local__: - updater = self.__optimizer__.create_local_updater() + parameter_updater = self.__optimizer__.create_local_updater() else: - updater = self.__optimizer__.create_remote_updater( + parameter_updater = self.__optimizer__.create_remote_updater( num_passes, self.__use_sparse_updater__) - updater.init(self.__gradient_machine__) + parameter_updater.init(self.__gradient_machine__) self.__gradient_machine__.start() batch_evaluator = self.__gradient_machine__.makeEvaluator() @@ -116,26 +115,26 @@ def train(self, reader, num_passes=1, event_handler=None, feeding=None): for pass_id in xrange(num_passes): event_handler(v2_event.BeginPass(pass_id)) pass_evaluator.start() - updater.startPass() + parameter_updater.startPass() for batch_id, data_batch in enumerate(reader()): batch_evaluator.start() event_handler( v2_event.BeginIteration( pass_id=pass_id, batch_id=batch_id)) - pass_type = updater.startBatch(len(data_batch)) - if self.__use_sparse_updater__: + pass_type = parameter_updater.startBatch(len(data_batch)) + if self.__use_sparse_updater__ and not self.__is_local__: self.__gradient_machine__.prefetch(feeder(data_batch)) - updater.getParametersRemote() + parameter_updater.getParametersRemote() self.__gradient_machine__.forwardBackward( feeder(data_batch), out_args, pass_type) self.__gradient_machine__.eval(pass_evaluator) self.__gradient_machine__.eval(batch_evaluator) for each_param in self.__gradient_machine__.getNonStaticParameters( ): - updater.update(each_param) + parameter_updater.update(each_param) cost_sum = out_args.sum() cost = cost_sum / len(data_batch) - updater.finishBatch(cost) + parameter_updater.finishBatch(cost) batch_evaluator.finish() 
event_handler( v2_event.EndIteration( @@ -144,7 +143,7 @@ def train(self, reader, num_passes=1, event_handler=None, feeding=None): cost=cost, evaluator=batch_evaluator)) - updater.finishPass() + parameter_updater.finishPass() pass_evaluator.finish() event_handler(v2_event.EndPass(pass_id, evaluator=pass_evaluator)) self.__gradient_machine__.finish() From f6c5b6fd4602c84882da86c32111391d70dfd8bd Mon Sep 17 00:00:00 2001 From: qiaolongfei Date: Sun, 16 Apr 2017 15:56:09 +0800 Subject: [PATCH 05/15] add prefetch for trainer.test --- python/paddle/v2/trainer.py | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/python/paddle/v2/trainer.py b/python/paddle/v2/trainer.py index 30fc2a0886ba7..c1f964a8106d0 100644 --- a/python/paddle/v2/trainer.py +++ b/python/paddle/v2/trainer.py @@ -79,6 +79,10 @@ def __init__(self, self.__gradient_machine__ = gm self.__gradient_machine__.randParameters() parameters.append_gradient_machine(gm) + self.__parameter_updater__ = None + + def use_remote_sparse_updater(self): + return self.__use_sparse_updater__ and not self.__is_local__ def train(self, reader, num_passes=1, event_handler=None, feeding=None): """ @@ -103,6 +107,7 @@ def train(self, reader, num_passes=1, event_handler=None, feeding=None): else: parameter_updater = self.__optimizer__.create_remote_updater( num_passes, self.__use_sparse_updater__) + self.__parameter_updater__ = parameter_updater parameter_updater.init(self.__gradient_machine__) self.__gradient_machine__.start() @@ -122,11 +127,12 @@ def train(self, reader, num_passes=1, event_handler=None, feeding=None): v2_event.BeginIteration( pass_id=pass_id, batch_id=batch_id)) pass_type = parameter_updater.startBatch(len(data_batch)) - if self.__use_sparse_updater__ and not self.__is_local__: - self.__gradient_machine__.prefetch(feeder(data_batch)) + in_args = feeder(data_batch) + if self.use_remote_sparse_updater(): + self.__gradient_machine__.prefetch(in_args) 
parameter_updater.getParametersRemote() - self.__gradient_machine__.forwardBackward( - feeder(data_batch), out_args, pass_type) + self.__gradient_machine__.forwardBackward(in_args, out_args, + pass_type) self.__gradient_machine__.eval(pass_evaluator) self.__gradient_machine__.eval(batch_evaluator) for each_param in self.__gradient_machine__.getNonStaticParameters( @@ -157,8 +163,11 @@ def test(self, reader, feeding=None): num_samples = 0.0 for data_batch in reader(): num_samples += len(data_batch) - self.__gradient_machine__.forward( - feeder(data_batch), out_args, api.PASS_TEST) + in_args = feeder(data_batch) + if self.use_remote_sparse_updater(): + self.__gradient_machine__.prefetch(in_args) + self.__parameter_updater__.getParametersRemote() + self.__gradient_machine__.forward(in_args, out_args, api.PASS_TEST) total_cost += out_args.sum() self.__gradient_machine__.eval(evaluator) From ea25eef375c33d01e2f28f875dbcb5650596466d Mon Sep 17 00:00:00 2001 From: qiaolongfei Date: Sun, 16 Apr 2017 16:16:17 +0800 Subject: [PATCH 06/15] word2vec demo support sparse remote update --- .../word2vec/{train_v2.py => api_train_v2.py} | 28 +++++++++++++++---- paddle/api/PaddleAPI.h | 2 +- paddle/api/ParameterUpdater.cpp | 4 +-- 3 files changed, 25 insertions(+), 9 deletions(-) rename demo/word2vec/{train_v2.py => api_train_v2.py} (80%) diff --git a/demo/word2vec/train_v2.py b/demo/word2vec/api_train_v2.py similarity index 80% rename from demo/word2vec/train_v2.py rename to demo/word2vec/api_train_v2.py index 7d952b446f9db..eb61a7250fb84 100644 --- a/demo/word2vec/train_v2.py +++ b/demo/word2vec/api_train_v2.py @@ -2,26 +2,38 @@ import paddle.v2 as paddle -dictsize = 1953 embsize = 32 hiddensize = 256 N = 5 def wordemb(inlayer): - wordemb = paddle.layer.table_projection( + wordemb = paddle.layer.embedding( input=inlayer, size=embsize, param_attr=paddle.attr.Param( name="_proj", initial_std=0.001, learning_rate=1, - l2_rate=0, )) + l2_rate=0, + sparse_update=True)) return wordemb 
def main(): - paddle.init(use_gpu=False, trainer_count=1) + # for local training + cluster_train = False + + if not cluster_train: + paddle.init(use_gpu=False, trainer_count=1) + else: + paddle.init( + use_gpu=False, + trainer_count=1, + port=7164, + ports_num=1, + ports_num_for_sparse=1, + num_gradient_servers=1) word_dict = paddle.dataset.imikolov.build_dict() dict_size = len(word_dict) firstword = paddle.layer.data( @@ -65,11 +77,15 @@ def event_handler(event): result.metrics) cost = paddle.layer.classification_cost(input=predictword, label=nextword) + parameters = paddle.parameters.create(cost) - adam_optimizer = paddle.optimizer.Adam( + adagrad = paddle.optimizer.AdaGrad( learning_rate=3e-3, regularization=paddle.optimizer.L2Regularization(8e-4)) - trainer = paddle.trainer.SGD(cost, parameters, adam_optimizer) + trainer = paddle.trainer.SGD(cost, + parameters, + adagrad, + is_local=not cluster_train) trainer.train( paddle.batch(paddle.dataset.imikolov.train(word_dict, N), 32), num_passes=30, diff --git a/paddle/api/PaddleAPI.h b/paddle/api/PaddleAPI.h index be6be556a7384..d51204012171c 100644 --- a/paddle/api/PaddleAPI.h +++ b/paddle/api/PaddleAPI.h @@ -821,7 +821,7 @@ class ParameterUpdater { static ParameterUpdater* createLocalUpdater(OptimizationConfig* config); static ParameterUpdater* createRemoteUpdater(OptimizationConfig* config, int passCount, - bool userSparseUpdater); + bool useSparseUpdater); ~ParameterUpdater(); /** diff --git a/paddle/api/ParameterUpdater.cpp b/paddle/api/ParameterUpdater.cpp index ce2ac33d44970..9dfd12ccbe777 100644 --- a/paddle/api/ParameterUpdater.cpp +++ b/paddle/api/ParameterUpdater.cpp @@ -29,11 +29,11 @@ ParameterUpdater *ParameterUpdater::createLocalUpdater( } ParameterUpdater *ParameterUpdater::createRemoteUpdater( - OptimizationConfig *config, int passCount, bool userSparseUpdater) { + OptimizationConfig *config, int passCount, bool useSparseUpdater) { auto updater = new ParameterUpdater(); auto remoteUpdater = new 
paddle::RemoteParameterUpdater( config->m->getConfig(), passCount, nullptr); - if (userSparseUpdater) { + if (useSparseUpdater) { std::unique_ptr remoteUpdaterPtr; remoteUpdaterPtr.reset(remoteUpdater); auto sparseRemoteUpdater = From 6295f2d6db2ed81bc7a1ade9992083a1ad42640a Mon Sep 17 00:00:00 2001 From: qiaolongfei Date: Sun, 16 Apr 2017 20:23:16 +0800 Subject: [PATCH 07/15] fix style problem --- paddle/function/BufferArgTest.cpp | 2 +- paddle/function/FunctionTest.cpp | 2 +- paddle/function/TensorShapeTest.cpp | 2 +- paddle/function/TensorTypeTest.cpp | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/paddle/function/BufferArgTest.cpp b/paddle/function/BufferArgTest.cpp index f1a234ab1a106..1744f377808f1 100644 --- a/paddle/function/BufferArgTest.cpp +++ b/paddle/function/BufferArgTest.cpp @@ -12,8 +12,8 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ -#include #include "BufferArg.h" +#include #include "paddle/math/MemoryHandle.h" namespace paddle { diff --git a/paddle/function/FunctionTest.cpp b/paddle/function/FunctionTest.cpp index f9ea7c7e4f6ed..fdf7e631e5ab8 100644 --- a/paddle/function/FunctionTest.cpp +++ b/paddle/function/FunctionTest.cpp @@ -12,8 +12,8 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ -#include #include "Function.h" +#include #include "paddle/math/SparseMatrix.h" namespace paddle { diff --git a/paddle/function/TensorShapeTest.cpp b/paddle/function/TensorShapeTest.cpp index e19afe0c4d594..45a2e106e7fc3 100644 --- a/paddle/function/TensorShapeTest.cpp +++ b/paddle/function/TensorShapeTest.cpp @@ -12,8 +12,8 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions and limitations under the License. */ -#include #include "TensorShape.h" +#include namespace paddle { diff --git a/paddle/function/TensorTypeTest.cpp b/paddle/function/TensorTypeTest.cpp index 5b5c504ae2a33..e50e46f3e9911 100644 --- a/paddle/function/TensorTypeTest.cpp +++ b/paddle/function/TensorTypeTest.cpp @@ -12,8 +12,8 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ -#include #include "TensorType.h" +#include namespace paddle { From cfff9467ac4df2731c3aadaf4060f93c49a885e8 Mon Sep 17 00:00:00 2001 From: qiaolongfei Date: Mon, 17 Apr 2017 10:08:45 +0800 Subject: [PATCH 08/15] optimizer parameter_updater --- python/paddle/v2/optimizer.py | 8 ++++++++ python/paddle/v2/trainer.py | 23 ++++++++++------------- 2 files changed, 18 insertions(+), 13 deletions(-) diff --git a/python/paddle/v2/optimizer.py b/python/paddle/v2/optimizer.py index 7bac1ea3b989a..887b2567a14fe 100644 --- a/python/paddle/v2/optimizer.py +++ b/python/paddle/v2/optimizer.py @@ -45,6 +45,14 @@ def create_remote_updater(self, pass_num, use_sparse_updater): return swig_api.ParameterUpdater.createRemoteUpdater( self.__opt_conf__, pass_num, use_sparse_updater) + def create_updater(self, is_local, num_passes, use_sparse_updater): + if is_local: + parameter_updater = self.create_local_updater() + else: + parameter_updater = self.create_remote_updater(num_passes, + use_sparse_updater) + return parameter_updater + class Momentum(Optimizer): def __init__(self, momentum=None, sparse=False, **kwargs): diff --git a/python/paddle/v2/trainer.py b/python/paddle/v2/trainer.py index c1f964a8106d0..9caaeca2efe15 100644 --- a/python/paddle/v2/trainer.py +++ b/python/paddle/v2/trainer.py @@ -102,13 +102,9 @@ def train(self, reader, num_passes=1, event_handler=None, feeding=None): event_handler = default_event_handler 
__check_train_args__(**locals()) - if self.__is_local__: - parameter_updater = self.__optimizer__.create_local_updater() - else: - parameter_updater = self.__optimizer__.create_remote_updater( - num_passes, self.__use_sparse_updater__) - self.__parameter_updater__ = parameter_updater - parameter_updater.init(self.__gradient_machine__) + self.__parameter_updater__ = self.__optimizer__.create_updater( + self.__is_local__, num_passes, self.__use_sparse_updater__) + self.__parameter_updater__.init(self.__gradient_machine__) self.__gradient_machine__.start() batch_evaluator = self.__gradient_machine__.makeEvaluator() @@ -120,27 +116,28 @@ def train(self, reader, num_passes=1, event_handler=None, feeding=None): for pass_id in xrange(num_passes): event_handler(v2_event.BeginPass(pass_id)) pass_evaluator.start() - parameter_updater.startPass() + self.__parameter_updater__.startPass() for batch_id, data_batch in enumerate(reader()): batch_evaluator.start() event_handler( v2_event.BeginIteration( pass_id=pass_id, batch_id=batch_id)) - pass_type = parameter_updater.startBatch(len(data_batch)) + pass_type = self.__parameter_updater__.startBatch( + len(data_batch)) in_args = feeder(data_batch) if self.use_remote_sparse_updater(): self.__gradient_machine__.prefetch(in_args) - parameter_updater.getParametersRemote() + self.__parameter_updater__.getParametersRemote() self.__gradient_machine__.forwardBackward(in_args, out_args, pass_type) self.__gradient_machine__.eval(pass_evaluator) self.__gradient_machine__.eval(batch_evaluator) for each_param in self.__gradient_machine__.getNonStaticParameters( ): - parameter_updater.update(each_param) + self.__parameter_updater__.update(each_param) cost_sum = out_args.sum() cost = cost_sum / len(data_batch) - parameter_updater.finishBatch(cost) + self.__parameter_updater__.finishBatch(cost) batch_evaluator.finish() event_handler( v2_event.EndIteration( @@ -149,7 +146,7 @@ def train(self, reader, num_passes=1, event_handler=None, feeding=None): 
cost=cost, evaluator=batch_evaluator)) - parameter_updater.finishPass() + self.__parameter_updater__.finishPass() pass_evaluator.finish() event_handler(v2_event.EndPass(pass_id, evaluator=pass_evaluator)) self.__gradient_machine__.finish() From cf86ca04b4682c0f1ecf24324ed3dcc7769cea63 Mon Sep 17 00:00:00 2001 From: qiaolongfei Date: Wed, 19 Apr 2017 10:23:26 +0800 Subject: [PATCH 09/15] refine code --- paddle/api/ParameterUpdater.cpp | 3 +-- python/paddle/v2/optimizer.py | 25 ++++++++++++++++++++----- python/paddle/v2/trainer.py | 24 ++++++++++++++++-------- 3 files changed, 37 insertions(+), 15 deletions(-) diff --git a/paddle/api/ParameterUpdater.cpp b/paddle/api/ParameterUpdater.cpp index 9dfd12ccbe777..79921ea6e787f 100644 --- a/paddle/api/ParameterUpdater.cpp +++ b/paddle/api/ParameterUpdater.cpp @@ -34,8 +34,7 @@ ParameterUpdater *ParameterUpdater::createRemoteUpdater( auto remoteUpdater = new paddle::RemoteParameterUpdater( config->m->getConfig(), passCount, nullptr); if (useSparseUpdater) { - std::unique_ptr remoteUpdaterPtr; - remoteUpdaterPtr.reset(remoteUpdater); + std::unique_ptr remoteUpdaterPtr(remoteUpdater); auto sparseRemoteUpdater = new paddle::SparseRemoteParameterUpdaterComposite( config->m->getConfig(), diff --git a/python/paddle/v2/optimizer.py b/python/paddle/v2/optimizer.py index 887b2567a14fe..17c56a2b9936e 100644 --- a/python/paddle/v2/optimizer.py +++ b/python/paddle/v2/optimizer.py @@ -38,19 +38,34 @@ def enable_types(self): assert isinstance(tmp, swig_api.ParameterOptimizer) return tmp.getParameterTypes() - def create_local_updater(self): + def __create_local_updater__(self): return swig_api.ParameterUpdater.createLocalUpdater(self.__opt_conf__) - def create_remote_updater(self, pass_num, use_sparse_updater): + def __create_remote_updater__(self, pass_num, use_sparse_updater): return swig_api.ParameterUpdater.createRemoteUpdater( self.__opt_conf__, pass_num, use_sparse_updater) def create_updater(self, is_local, num_passes, 
use_sparse_updater): + """ + create proper parameter_updater by configuration. + :param is_local: create local or remote parameter updater + :param num_passes: remote parameter updater will use this to config + parameter server. + :param use_sparse_updater: when use remote updater, if some parameter is + sparse, updater should do some extra thing: + + .. code-block:: python + + if use_sparse_remote_updater: + gradient_machine.prefetch(in_args) + parameter_updater.getParametersRemote() + :return: parameter_updater + """ if is_local: - parameter_updater = self.create_local_updater() + parameter_updater = self.__create_local_updater__() else: - parameter_updater = self.create_remote_updater(num_passes, - use_sparse_updater) + parameter_updater = self.__create_remote_updater__( + num_passes, use_sparse_updater) return parameter_updater diff --git a/python/paddle/v2/trainer.py b/python/paddle/v2/trainer.py index 9caaeca2efe15..552c6690a608f 100644 --- a/python/paddle/v2/trainer.py +++ b/python/paddle/v2/trainer.py @@ -78,12 +78,24 @@ def __init__(self, assert isinstance(gm, api.GradientMachine) self.__gradient_machine__ = gm self.__gradient_machine__.randParameters() - parameters.append_gradient_machine(gm) + self.__parameters__.append_gradient_machine(gm) self.__parameter_updater__ = None - def use_remote_sparse_updater(self): + def __use_remote_sparse_updater__(self): return self.__use_sparse_updater__ and not self.__is_local__ + def __prepare_parameter__(self, in_args): + """ + prepare parameter before forward backward. + 1. When use remote sparse updater, parameters should be got + from ps according to input arguments. + :param in_args: input arguments of this batch. + :return: + """ + if self.__use_remote_sparse_updater__(): + self.__gradient_machine__.prefetch(in_args) + self.__parameter_updater__.getParametersRemote() + def train(self, reader, num_passes=1, event_handler=None, feeding=None): """ Training method. Will train num_passes of input data. 
@@ -125,9 +137,7 @@ def train(self, reader, num_passes=1, event_handler=None, feeding=None): pass_type = self.__parameter_updater__.startBatch( len(data_batch)) in_args = feeder(data_batch) - if self.use_remote_sparse_updater(): - self.__gradient_machine__.prefetch(in_args) - self.__parameter_updater__.getParametersRemote() + self.__prepare_parameter__(in_args) self.__gradient_machine__.forwardBackward(in_args, out_args, pass_type) self.__gradient_machine__.eval(pass_evaluator) @@ -161,9 +171,7 @@ def test(self, reader, feeding=None): for data_batch in reader(): num_samples += len(data_batch) in_args = feeder(data_batch) - if self.use_remote_sparse_updater(): - self.__gradient_machine__.prefetch(in_args) - self.__parameter_updater__.getParametersRemote() + self.__prepare_parameter__(in_args) self.__gradient_machine__.forward(in_args, out_args, api.PASS_TEST) total_cost += out_args.sum() self.__gradient_machine__.eval(evaluator) From 956217887fee6403caec9f4bc047237c8f5b9fcc Mon Sep 17 00:00:00 2001 From: qiaolongfei Date: Wed, 19 Apr 2017 22:08:54 +0800 Subject: [PATCH 10/15] support save parameter in trainer --- demo/word2vec/api_train_v2.py | 1 + python/paddle/v2/trainer.py | 14 ++++++++++++++ 2 files changed, 15 insertions(+) diff --git a/demo/word2vec/api_train_v2.py b/demo/word2vec/api_train_v2.py index eb61a7250fb84..604adba192ee2 100644 --- a/demo/word2vec/api_train_v2.py +++ b/demo/word2vec/api_train_v2.py @@ -69,6 +69,7 @@ def main(): def event_handler(event): if isinstance(event, paddle.event.EndIteration): if event.batch_id % 100 == 0: + trainer.save_parameter("output", "batch-" + str(event.batch_id)) result = trainer.test( paddle.batch( paddle.dataset.imikolov.test(word_dict, N), 32)) diff --git a/python/paddle/v2/trainer.py b/python/paddle/v2/trainer.py index 552c6690a608f..028f25a046768 100644 --- a/python/paddle/v2/trainer.py +++ b/python/paddle/v2/trainer.py @@ -1,4 +1,6 @@ import collections +import gzip +import os import py_paddle.swig_paddle as 
api @@ -96,6 +98,18 @@ def __prepare_parameter__(self, in_args): self.__gradient_machine__.prefetch(in_args) self.__parameter_updater__.getParametersRemote() + def save_parameter(self, dir_name, file_name): + if not os.path.exists(dir_name): + os.makedirs(dir_name) + param_file_name = dir_name + "/" + file_name + '.tar.gz' + assert not os.path.exists(param_file_name) + self.__parameter_updater__.catchUpWith() + self.__parameter_updater__.apply() + self.__parameter_updater__.getParametersRemote(True, True) + with gzip.open(param_file_name, 'w') as f: + self.__parameters__.to_tar(f) + self.__parameter_updater__.restore() + def train(self, reader, num_passes=1, event_handler=None, feeding=None): """ Training method. Will train num_passes of input data. From 68c1efdd9c5b0e38893d5034e36a548c6e06a3e6 Mon Sep 17 00:00:00 2001 From: qiaolongfei Date: Sat, 22 Apr 2017 07:39:41 +0800 Subject: [PATCH 11/15] fix the bug of use sparse_remote_update with MultiGradientMachine --- paddle/gserver/gradientmachines/MultiGradientMachine.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paddle/gserver/gradientmachines/MultiGradientMachine.cpp b/paddle/gserver/gradientmachines/MultiGradientMachine.cpp index 6ae60102b3e43..3159026e6b923 100644 --- a/paddle/gserver/gradientmachines/MultiGradientMachine.cpp +++ b/paddle/gserver/gradientmachines/MultiGradientMachine.cpp @@ -518,7 +518,7 @@ void TrainerThread::computeThread() { backward(); break; case MultiGradientMachine::TASK_COPY_IN_ARGS: - copyInArgs(); + batchSize_ = copyInArgs(); inArgsCopied_ = true; multiMachine_->waitForCopyInArgs(); break; From 35f1dfde72f9be8d9fccdaaf5d328da9f8ffcc36 Mon Sep 17 00:00:00 2001 From: qiaolongfei Date: Mon, 24 Apr 2017 13:58:14 +0800 Subject: [PATCH 12/15] change trainer.save_parameter to trainer.save_parameter_to_tar --- demo/word2vec/api_train_v2.py | 4 ++-- python/paddle/v2/trainer.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git 
a/demo/word2vec/api_train_v2.py b/demo/word2vec/api_train_v2.py index 604adba192ee2..8fea6feeef0c4 100644 --- a/demo/word2vec/api_train_v2.py +++ b/demo/word2vec/api_train_v2.py @@ -29,7 +29,7 @@ def main(): else: paddle.init( use_gpu=False, - trainer_count=1, + trainer_count=2, port=7164, ports_num=1, ports_num_for_sparse=1, @@ -69,7 +69,7 @@ def main(): def event_handler(event): if isinstance(event, paddle.event.EndIteration): if event.batch_id % 100 == 0: - trainer.save_parameter("output", "batch-" + str(event.batch_id)) + trainer.save_parameter_to_tar("output", "batch-" + str(event.batch_id)) result = trainer.test( paddle.batch( paddle.dataset.imikolov.test(word_dict, N), 32)) diff --git a/python/paddle/v2/trainer.py b/python/paddle/v2/trainer.py index 028f25a046768..220d459525f0d 100644 --- a/python/paddle/v2/trainer.py +++ b/python/paddle/v2/trainer.py @@ -98,7 +98,7 @@ def __prepare_parameter__(self, in_args): self.__gradient_machine__.prefetch(in_args) self.__parameter_updater__.getParametersRemote() - def save_parameter(self, dir_name, file_name): + def save_parameter_to_tar(self, dir_name, file_name): if not os.path.exists(dir_name): os.makedirs(dir_name) param_file_name = dir_name + "/" + file_name + '.tar.gz' From 9e9d456220e1718d0fe2e24a509fa2a9eb324d7c Mon Sep 17 00:00:00 2001 From: qiaolongfei Date: Mon, 24 Apr 2017 15:27:52 +0800 Subject: [PATCH 13/15] fix pre-commit check --- demo/word2vec/api_train_v2.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/demo/word2vec/api_train_v2.py b/demo/word2vec/api_train_v2.py index 8fea6feeef0c4..98ade830cf51e 100644 --- a/demo/word2vec/api_train_v2.py +++ b/demo/word2vec/api_train_v2.py @@ -69,7 +69,8 @@ def main(): def event_handler(event): if isinstance(event, paddle.event.EndIteration): if event.batch_id % 100 == 0: - trainer.save_parameter_to_tar("output", "batch-" + str(event.batch_id)) + trainer.save_parameter_to_tar("output", + "batch-" + str(event.batch_id)) result = trainer.test( 
paddle.batch( paddle.dataset.imikolov.test(word_dict, N), 32)) From 6a2776e139b7ba886daaa7ca0026f2f192b30c73 Mon Sep 17 00:00:00 2001 From: qiaolongfei Date: Mon, 24 Apr 2017 16:29:32 +0800 Subject: [PATCH 14/15] save_parameter_to_tar to fd --- demo/word2vec/api_train_v2.py | 5 +++-- python/paddle/v2/trainer.py | 9 ++------- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/demo/word2vec/api_train_v2.py b/demo/word2vec/api_train_v2.py index 98ade830cf51e..a224951f4d4c8 100644 --- a/demo/word2vec/api_train_v2.py +++ b/demo/word2vec/api_train_v2.py @@ -1,3 +1,4 @@ +import gzip import math import paddle.v2 as paddle @@ -69,8 +70,8 @@ def main(): def event_handler(event): if isinstance(event, paddle.event.EndIteration): if event.batch_id % 100 == 0: - trainer.save_parameter_to_tar("output", - "batch-" + str(event.batch_id)) + with gzip.open("batch-" + str(event.batch_id), 'w') as f: + trainer.save_parameter_to_tar(f) result = trainer.test( paddle.batch( paddle.dataset.imikolov.test(word_dict, N), 32)) diff --git a/python/paddle/v2/trainer.py b/python/paddle/v2/trainer.py index 220d459525f0d..6a83ba8533cd6 100644 --- a/python/paddle/v2/trainer.py +++ b/python/paddle/v2/trainer.py @@ -98,16 +98,11 @@ def __prepare_parameter__(self, in_args): self.__gradient_machine__.prefetch(in_args) self.__parameter_updater__.getParametersRemote() - def save_parameter_to_tar(self, dir_name, file_name): - if not os.path.exists(dir_name): - os.makedirs(dir_name) - param_file_name = dir_name + "/" + file_name + '.tar.gz' - assert not os.path.exists(param_file_name) + def save_parameter_to_tar(self, f): self.__parameter_updater__.catchUpWith() self.__parameter_updater__.apply() self.__parameter_updater__.getParametersRemote(True, True) - with gzip.open(param_file_name, 'w') as f: - self.__parameters__.to_tar(f) + self.__parameters__.to_tar(f) self.__parameter_updater__.restore() def train(self, reader, num_passes=1, event_handler=None, feeding=None): From 
cb84cbab9dcc6acca30d65de569bc98770edef41 Mon Sep 17 00:00:00 2001 From: qiaolongfei Date: Mon, 24 Apr 2017 16:42:58 +0800 Subject: [PATCH 15/15] add .tar.gz suffix to parameter save filename --- demo/word2vec/api_train_v2.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/demo/word2vec/api_train_v2.py b/demo/word2vec/api_train_v2.py index a224951f4d4c8..c0940f0e56eaf 100644 --- a/demo/word2vec/api_train_v2.py +++ b/demo/word2vec/api_train_v2.py @@ -70,7 +70,8 @@ def main(): def event_handler(event): if isinstance(event, paddle.event.EndIteration): if event.batch_id % 100 == 0: - with gzip.open("batch-" + str(event.batch_id), 'w') as f: + with gzip.open("batch-" + str(event.batch_id) + ".tar.gz", + 'w') as f: trainer.save_parameter_to_tar(f) result = trainer.test( paddle.batch(