diff --git a/demo/quick_start/cluster/pserver.py b/demo/quick_start/cluster/pserver.py new file mode 100644 index 0000000000000..fe10a77133578 --- /dev/null +++ b/demo/quick_start/cluster/pserver.py @@ -0,0 +1,30 @@ +# Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from py_paddle import swig_paddle as api + +#import pudb;pudb.set_trace() + + +def main(): + api.initPaddle("--nics=lo0", "--port=7164", "--ports_num=1", + "--num_gradient_servers=1", "--comment=paddle_pserver") + pserver = api.ParameterServer.createParameterServer() + pserver.init() + pserver.start() + pserver.join() + + +if __name__ == '__main__': + main() diff --git a/paddle/api/CMakeLists.txt b/paddle/api/CMakeLists.txt index da6dad10cd807..c060865764c4e 100644 --- a/paddle/api/CMakeLists.txt +++ b/paddle/api/CMakeLists.txt @@ -10,7 +10,8 @@ set(API_SOURCES SequenceGenerator.cpp Trainer.cpp Util.cpp - Vector.cpp) + Vector.cpp + ParameterServer.cpp) set(API_HEADER PaddleAPI.h Internal.h) diff --git a/paddle/api/Paddle.swig b/paddle/api/Paddle.swig index 3365927f9b599..84c50c30add87 100644 --- a/paddle/api/Paddle.swig +++ b/paddle/api/Paddle.swig @@ -178,6 +178,8 @@ namespace std { %newobject ParameterOptimizer::create; %newobject ParameterOptimizer::needSpecialTraversal; %newobject ParameterUpdater::createLocalUpdater; +%newobject ParameterUpdater::createRemoteUpdater; +%newobject ParameterServer::createParameterServer; %feature("director") 
UpdateCallback; %feature("autodoc", 1); // To generate method stub, for code hint in ide @@ -196,5 +198,6 @@ namespace std { %ignore ParameterConfigPrivate; %ignore OptimizationConfigPrivate; %ignore ParameterTraverseCallbackPrivate; +%ignore ParameterServerPrivate; %include "utils/GlobalConstants.h" %include "api/PaddleAPI.h" diff --git a/paddle/api/PaddleAPI.h b/paddle/api/PaddleAPI.h index 09c891871a5ca..04bc850ff8500 100644 --- a/paddle/api/PaddleAPI.h +++ b/paddle/api/PaddleAPI.h @@ -803,6 +803,8 @@ class ParameterUpdater { public: static ParameterUpdater* createLocalUpdater(OptimizationConfig* config); + static ParameterUpdater* createRemoteUpdater(OptimizationConfig* config, + int passCount); ~ParameterUpdater(); /** @@ -866,6 +868,28 @@ class ParameterUpdater { ParameterUpdaterPrivate* m; }; +struct ParameterServerPrivate; +class ParameterServer { +private: + ParameterServer(); + +public: + static ParameterServer* createParameterServer(); + + ~ParameterServer(); + + /** + * @brief initialize Parameter Server. + * @param gm + */ + void init(); + void start(); + void join(); + +private: + ParameterServerPrivate* m; +}; + struct EvaluatorPrivate; class Evaluator { private: diff --git a/paddle/api/PaddleAPIPrivate.h b/paddle/api/PaddleAPIPrivate.h index f41352bfec7c3..74c96b8acef9c 100644 --- a/paddle/api/PaddleAPIPrivate.h +++ b/paddle/api/PaddleAPIPrivate.h @@ -17,6 +17,7 @@ limitations under the License. 
*/ #include "paddle/gserver/evaluators/Evaluator.h" #include "paddle/gserver/gradientmachines/GradientMachine.h" #include "paddle/parameter/ParameterUpdaterBase.h" +#include "paddle/pserver/PServerUtil.h" #include "paddle/trainer/TrainerConfigHelper.h" struct GradientMachinePrivate { @@ -72,6 +73,10 @@ struct ParameterUpdaterPrivate { std::unique_ptr<paddle::ParameterUpdater> updater; }; +struct ParameterServerPrivate { + std::unique_ptr<paddle::PServerUtil> pServerUtil; +}; + struct ParameterPrivate { std::shared_ptr<paddle::Parameter> sharedPtr; paddle::Parameter* rawPtr; // rawPtr only used in ParameterUpdater, diff --git a/paddle/api/ParameterServer.cpp b/paddle/api/ParameterServer.cpp new file mode 100644 index 0000000000000..140028e0fbfc8 --- /dev/null +++ b/paddle/api/ParameterServer.cpp @@ -0,0 +1,33 @@ +/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
*/ + +#include "PaddleAPI.h" + +#include "PaddleAPIPrivate.h" + +ParameterServer::ParameterServer() : m(new ParameterServerPrivate()) {} + +ParameterServer* ParameterServer::createParameterServer() { + auto pServer = new ParameterServer(); + pServer->m->pServerUtil.reset(new paddle::PServerUtil()); + return pServer; +} + +ParameterServer::~ParameterServer() { delete m; } + +void ParameterServer::init() { m->pServerUtil->init(); } + +void ParameterServer::start() { m->pServerUtil->start(); } + +void ParameterServer::join() { m->pServerUtil->join(); } diff --git a/paddle/api/ParameterUpdater.cpp b/paddle/api/ParameterUpdater.cpp index 7cd8ed7e39074..75b0ae7cb6cc8 100644 --- a/paddle/api/ParameterUpdater.cpp +++ b/paddle/api/ParameterUpdater.cpp @@ -15,15 +15,25 @@ limitations under the License. */ #include "PaddleAPI.h" #include "PaddleAPIPrivate.h" +#include "paddle/trainer/RemoteParameterUpdater.h" #include "paddle/trainer/ThreadParameterUpdater.h" ParameterUpdater::ParameterUpdater() : m(new ParameterUpdaterPrivate()) {} ParameterUpdater *ParameterUpdater::createLocalUpdater( OptimizationConfig *config) { - auto param = new ParameterUpdater(); - param->m->updater.reset(new paddle::SgdThreadUpdater(config->m->getConfig())); - return param; + auto updater = new ParameterUpdater(); + updater->m->updater.reset( + new paddle::SgdThreadUpdater(config->m->getConfig())); + return updater; +} + +ParameterUpdater *ParameterUpdater::createRemoteUpdater( + OptimizationConfig *config, int passCount) { + auto updater = new ParameterUpdater(); + updater->m->updater.reset(new paddle::RemoteParameterUpdater( + config->m->getConfig(), passCount, nullptr)); + return updater; } ParameterUpdater::~ParameterUpdater() { delete m; } diff --git a/paddle/pserver/CMakeLists.txt b/paddle/pserver/CMakeLists.txt index 1c1e1964b8d3f..9bc48294f06b8 100644 --- a/paddle/pserver/CMakeLists.txt +++ b/paddle/pserver/CMakeLists.txt @@ -24,13 +24,15 @@ set(PSERVER_SOURCES BaseClient.cpp 
ParameterClient2.cpp ParameterServer2.cpp - SparseParameterDistribution.cpp) + SparseParameterDistribution.cpp + PServerUtil.cpp) set(PSERVER_HEADERS BaseClient.h ParameterClient2.h ParameterServer2.h - SparseParameterDistribution.h) + SparseParameterDistribution.h + PServerUtil.h) add_library(paddle_pserver STATIC ${PSERVER_SOURCES}) diff --git a/paddle/pserver/PServerUtil.cpp b/paddle/pserver/PServerUtil.cpp new file mode 100644 index 0000000000000..d293e112ab471 --- /dev/null +++ b/paddle/pserver/PServerUtil.cpp @@ -0,0 +1,77 @@ +/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
*/ + +#include "PServerUtil.h" + +namespace paddle { + +void PServerUtil::init() { + // round robin to load balance RDMA server ENGINE + std::vector<std::string> devices; + int rdmaCpu = 0; + int onlineCpus = rdma::numCpus(); + int numPorts = FLAGS_ports_num + FLAGS_ports_num_for_sparse; + + if (FLAGS_nics.empty()) { + pservers_.resize(numPorts); + for (int i = 0; i < numPorts; ++i) { + if (FLAGS_rdma_tcp == "rdma") { + pservers_[i].reset( + new ParameterServer2(std::string(), FLAGS_port + i, rdmaCpu++)); + rdmaCpu = rdmaCpu % onlineCpus; + } else { + pservers_[i].reset(new ParameterServer2(std::string(), FLAGS_port + i)); + } + CHECK(pservers_[i]->init()) << "Fail to initialize parameter server" + << FLAGS_port + i; + } + } else { + str::split(FLAGS_nics, ',', &devices); + pservers_.resize(devices.size() * numPorts); + for (int i = 0; i < numPorts; ++i) { + for (size_t j = 0; j < devices.size(); ++j) { + if (FLAGS_rdma_tcp == "rdma") { + pservers_[i * devices.size() + j].reset(new ParameterServer2( + getIpAddr(devices[j]), FLAGS_port + i, rdmaCpu++)); + rdmaCpu = rdmaCpu % onlineCpus; + } else { + pservers_[i * devices.size() + j].reset( + new ParameterServer2(getIpAddr(devices[j]), FLAGS_port + i)); + } + CHECK(pservers_[i * devices.size() + j]->init()) + << "Fail to initialize parameter server" << devices[j] + << FLAGS_port + i; + } + } + } +} + +void PServerUtil::start() { + LOG(INFO) << "pserver sizes : " << pservers_.size(); + int i = 0; + for (const auto &pserver : pservers_) { + LOG(INFO) << "pserver started : " << i; + pserver->start(); + i++; + } +} + +void PServerUtil::join() { + LOG(INFO) << "pserver sizes : " << pservers_.size(); + for (const auto &pserver : pservers_) { + pserver->join(); + } +} + +} // namespace paddle diff --git a/paddle/pserver/PServerUtil.h b/paddle/pserver/PServerUtil.h new file mode 100644 index 0000000000000..f451e62e45e5b --- /dev/null +++ b/paddle/pserver/PServerUtil.h @@ -0,0 +1,33 @@ +/* Copyright (c) 2016 PaddlePaddle Authors. 
All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#pragma once + +#include "ParameterServer2.h" +#include "RDMANetwork.h" +#include "paddle/utils/StringUtil.h" + +namespace paddle { + +class PServerUtil { +public: + void init(); + void start(); + void join(); + +private: + std::vector<std::unique_ptr<ParameterServer2>> pservers_; +}; + +} // namespace paddle diff --git a/paddle/pserver/ParameterServer2Main.cpp b/paddle/pserver/ParameterServer2Main.cpp index ffc521f2c143d..d0428eee43c26 100644 --- a/paddle/pserver/ParameterServer2Main.cpp +++ b/paddle/pserver/ParameterServer2Main.cpp @@ -16,63 +16,16 @@ limitations under the License. 
*/ #include "paddle/utils/StringUtil.h" #include "paddle/utils/Util.h" -#include "ParameterServer2.h" -#include "RDMANetwork.h" -#include "paddle/utils/Flags.h" +#include "PServerUtil.h" using namespace paddle; // NOLINT int main(int argc, char** argv) { initMain(argc, argv); - - std::vector devices; - std::vector> pservers; - - // round robin to loadbalance RDMA server ENGINE - int rdmaCpu = 0; - int onlineCpus = rdma::numCpus(); - int numPorts = FLAGS_ports_num + FLAGS_ports_num_for_sparse; - if (FLAGS_nics.empty()) { - pservers.resize(numPorts); - for (int i = 0; i < numPorts; ++i) { - if (FLAGS_rdma_tcp == "rdma") { - pservers[i].reset( - new ParameterServer2(std::string(), FLAGS_port + i, rdmaCpu++)); - rdmaCpu = rdmaCpu % onlineCpus; - } else { - pservers[i].reset(new ParameterServer2(std::string(), FLAGS_port + i)); - } - CHECK(pservers[i]->init()) << "Fail to initialize parameter server" - << FLAGS_port + i; - LOG(INFO) << "pserver started : " << FLAGS_port + i; - pservers[i]->start(); - } - } else { - str::split(FLAGS_nics, ',', &devices); - pservers.resize(devices.size() * numPorts); - for (int i = 0; i < numPorts; ++i) { - for (size_t j = 0; j < devices.size(); ++j) { - if (FLAGS_rdma_tcp == "rdma") { - pservers[i * devices.size() + j].reset(new ParameterServer2( - getIpAddr(devices[j]), FLAGS_port + i, rdmaCpu++)); - rdmaCpu = rdmaCpu % onlineCpus; - } else { - pservers[i * devices.size() + j].reset( - new ParameterServer2(getIpAddr(devices[j]), FLAGS_port + i)); - } - CHECK(pservers[i * devices.size() + j]->init()) - << "Fail to initialize parameter server" << devices[j] - << FLAGS_port + i; - LOG(INFO) << "pserver started : " << devices[j] << ":" - << FLAGS_port + i; - pservers[i * devices.size() + j]->start(); - } - } - } - - for (auto& pserver : pservers) { - pserver->join(); - } + PServerUtil* pserverUtil = new PServerUtil(); + pserverUtil->init(); + pserverUtil->start(); + pserverUtil->join(); return 0; } diff --git 
a/paddle/trainer/RemoteParameterUpdater.h b/paddle/trainer/RemoteParameterUpdater.h index 7794b209009a3..5e82c94475162 100644 --- a/paddle/trainer/RemoteParameterUpdater.h +++ b/paddle/trainer/RemoteParameterUpdater.h @@ -56,7 +56,7 @@ class RemoteParameterUpdater : public ParameterUpdater { public: RemoteParameterUpdater( const OptimizationConfig& config, - int expectedPpassCount, + int expectedPassCount, std::unique_ptr&& localUpdater = nullptr); ~RemoteParameterUpdater() { if (controllerThread_) { @@ -146,7 +146,7 @@ class RemoteParameterUpdater : public ParameterUpdater { BatchStatus batchStatus_; /// controller thread for sync-sgd std::unique_ptr controllerThread_; - /// passed alread finished + /// passed already finished int64_t passCount_; /// expected passes to finished int64_t expectedPassCount_; diff --git a/paddle/trainer/TrainerMain.cpp b/paddle/trainer/TrainerMain.cpp index 947f9cadcc983..5fea8962b00e9 100644 --- a/paddle/trainer/TrainerMain.cpp +++ b/paddle/trainer/TrainerMain.cpp @@ -13,6 +13,7 @@ See the License for the specific language governing permissions and limitations under the License. 
*/ #include +#include #include "paddle/pserver/ParameterServer2.h" #include "paddle/utils/Excepts.h" #include "paddle/utils/PythonUtil.h" @@ -39,54 +40,10 @@ int main(int argc, char** argv) { initMain(argc, argv); initPython(argc, argv); - std::vector> pservers; - std::vector devices; - if (FLAGS_start_pserver) { - // round robin to loadbalance RDMA server ENGINE - int rdmaCpu = 0; - int onlineCpus = rdma::numCpus(); - int numPorts = FLAGS_ports_num + FLAGS_ports_num_for_sparse; - if (FLAGS_nics.empty()) { - pservers.resize(numPorts); - for (int i = 0; i < numPorts; ++i) { - if (FLAGS_rdma_tcp == "rdma") { - pservers[i].reset( - new ParameterServer2(std::string(), FLAGS_port + i, rdmaCpu++)); - rdmaCpu = rdmaCpu % onlineCpus; - } else { - pservers[i].reset( - new ParameterServer2(std::string(), FLAGS_port + i)); - } - - CHECK(pservers[i]->init()) << "Fail to initialize parameter server" - << FLAGS_port + i; - LOG(INFO) << "pserver started : " << FLAGS_port + i; - pservers[i]->start(); - } - } else { - str::split(FLAGS_nics, ',', &devices); - pservers.resize(devices.size() * numPorts); - for (int i = 0; i < numPorts; ++i) { - for (size_t j = 0; j < devices.size(); ++j) { - if (FLAGS_rdma_tcp == "rdma") { - pservers[i * devices.size() + j].reset(new ParameterServer2( - getIpAddr(devices[j]), FLAGS_port + i, rdmaCpu++)); - rdmaCpu = rdmaCpu % onlineCpus; - } else { - pservers[i * devices.size() + j].reset( - new ParameterServer2(getIpAddr(devices[j]), FLAGS_port + i)); - } - - CHECK(pservers[i * devices.size() + j]->init()) - << "Fail to initialize parameter server" << devices[j] - << FLAGS_port + i; - LOG(INFO) << "pserver started : " << devices[j] << ":" - << FLAGS_port + i; - pservers[i * devices.size() + j]->start(); - } - } - } + PServerUtil* pserverUtil = new PServerUtil(); + pserverUtil->init(); + pserverUtil->start(); } Trainer trainer; auto config = TrainerConfigHelper::createFromFlags();