Skip to content

Commit

Permalink
config: added ports
Browse files Browse the repository at this point in the history
  • Loading branch information
abudnik committed Apr 20, 2015
1 parent 5aa6590 commit e9f272c
Show file tree
Hide file tree
Showing 16 changed files with 126 additions and 55 deletions.
24 changes: 21 additions & 3 deletions doc/README
Expand Up @@ -161,6 +161,12 @@ connects to Worker to get job completion status.
- ipv6
Setting this parameter value to true leads to using of IPv6 protocol.

- port (optional, default = 5555)

- ping_port (optional, default = 5554)

- master_ping_port (optional, default = 5553)

Master configuration
--------------------------------------------------------------------------------

Expand Down Expand Up @@ -220,6 +226,16 @@ files as the Master's "ipv6" value.
- masterdb
IP or hostname of masterdb service

- node_port (optional, default = 5555)

- node_ping_port (optional, default = 5554)

- master_ping_port (optional, default = 5553)

- master_admin_port (optional, default = 5557)

- masterdb_port (optional, default = 5559)

Masterdb configuration
--------------------------------------------------------------------------------

Expand All @@ -237,12 +253,14 @@ Path to the jobs database directory. Used in configuration with LevelDB library.
Setting this parameter value to true leads to using of IPv6 protocol. Note, that
Masterdb must have the same value of "ipv6" settings as the Master's "ipv6".

- port (optional, default = 5559)

Network configuration
--------------------------------------------------------------------------------

Master listen ports: 5557 (tcp), 5553(udp)
Masterdb listen ports: 5559 (tcp)
Worker listen ports: 5555 (tcp), 5554(udp)
Master default listen ports: 5557 (tcp), 5553(udp)
Masterdb default listen ports: 5559 (tcp)
Worker default listen ports: 5555 (tcp), 5554(udp)

Logs
--------------------------------------------------------------------------------
Expand Down
9 changes: 8 additions & 1 deletion src/common/config.h
Expand Up @@ -32,11 +32,18 @@ class Config
{
public:
template<typename T>
T Get( const char *key )
T Get( const char *key ) const
{
return ptree_.get<T>( key );
}

template<typename T>
void Insert( const char *key, T val )
{
if (!ptree_.get_optional<T>(key))
ptree_.put<T>( key, val );
}

bool ParseConfig( const char *cfgPath, const char *cfgName );

static Config &Instance()
Expand Down
8 changes: 4 additions & 4 deletions src/master/admin.h
Expand Up @@ -28,7 +28,6 @@ the License.
#include "common/json_rpc.h"
#include "common/config.h"
#include "common/log.h"
#include "defines.h"

namespace master {

Expand Down Expand Up @@ -174,10 +173,11 @@ class AdminConnection

try
{
common::Config &cfg = common::Config::Instance();
bool ipv6 = cfg.Get<bool>( "ipv6" );
const common::Config &cfg = common::Config::Instance();
const bool ipv6 = cfg.Get<bool>( "ipv6" );
const unsigned short master_admin_port = cfg.Get<unsigned short>( "master_admin_port" );

tcp::endpoint endpoint( ipv6 ? tcp::v6() : tcp::v4(), MASTER_ADMIN_PORT );
tcp::endpoint endpoint( ipv6 ? tcp::v6() : tcp::v4(), master_admin_port );
acceptor_.open( endpoint.protocol() );
acceptor_.set_option( tcp::acceptor::reuse_address( true ) );
acceptor_.set_option( tcp::no_delay( true ) );
Expand Down
7 changes: 5 additions & 2 deletions src/master/command_sender.cpp
Expand Up @@ -26,9 +26,9 @@ the License.
#include "worker_manager.h"
#include "job_manager.h"
#include "common/log.h"
#include "common/config.h"
#include "common/protocol.h"
#include "common/service_locator.h"
#include "defines.h"

namespace master {

Expand Down Expand Up @@ -114,9 +114,12 @@ void CommandSenderBoost::OnSendCommand( bool success, int errCode, CommandPtr &c

void RpcBoost::SendCommand()
{
const common::Config &cfg = common::Config::Instance();
const unsigned short node_port = cfg.Get<unsigned short>( "node_port" );

tcp::endpoint nodeEndpoint(
boost::asio::ip::address::from_string( hostIP_ ),
NODE_PORT
node_port
);

socket_.async_connect( nodeEndpoint,
Expand Down
7 changes: 5 additions & 2 deletions src/master/job_sender.cpp
Expand Up @@ -25,9 +25,9 @@ the License.
#include "scheduler.h"
#include "job_manager.h"
#include "common/log.h"
#include "common/config.h"
#include "common/protocol.h"
#include "common/service_locator.h"
#include "defines.h"

namespace master {

Expand Down Expand Up @@ -131,9 +131,12 @@ void JobSenderBoost::OnJobSendCompletion( bool success, const WorkerJob &workerJ

void SenderBoost::Send()
{
const common::Config &cfg = common::Config::Instance();
const unsigned short node_port = cfg.Get<unsigned short>( "node_port" );

tcp::endpoint nodeEndpoint(
boost::asio::ip::address::from_string( hostIP_ ),
NODE_PORT
node_port
);

socket_.async_connect( nodeEndpoint,
Expand Down
22 changes: 17 additions & 5 deletions src/master/master.cpp
Expand Up @@ -102,7 +102,7 @@ void SigHandler( int s )
case SIGABRT:
case SIGFPE:
case SIGBUS:
case SIGSEGV:
case SIGSEGV:
case SIGILL:
case SIGSYS:
case SIGXCPU:
Expand Down Expand Up @@ -194,6 +194,7 @@ class MasterApplication
{
cfg.ParseConfig( "", cfgPath_.c_str() );
}
ApplyDefaults( cfg );

unsigned int numHeartbeatThread = 1;
unsigned int numPingReceiverThread = cfg.Get<unsigned int>( "num_ping_receiver_thread" );
Expand All @@ -203,6 +204,7 @@ class MasterApplication
unsigned int numPingThread = numHeartbeatThread + numPingReceiverThread;

std::string masterdb = cfg.Get<std::string>( "masterdb" );
const unsigned short masterdb_port = cfg.Get<unsigned short>( "masterdb_port" );

// initialize main components
common::ServiceLocator &serviceLocator = common::ServiceLocator::Instance();
Expand All @@ -223,7 +225,7 @@ class MasterApplication
dbConnection_.reset( new master::DbHistoryConnection( io_service_db_ ) );
jobHistory_.reset( new master::JobHistory( dbConnection_.get() ) );
serviceLocator.Register( static_cast< master::IJobEventReceiver* >( jobHistory_.get() ) );
if ( dbConnection_->Connect( masterdb, master::MASTERDB_PORT ) )
if ( dbConnection_->Connect( masterdb, masterdb_port ) )
{
jobHistory_->GetJobs();
}
Expand Down Expand Up @@ -367,7 +369,7 @@ class MasterApplication
int sig;
sigemptyset( &waitset );
sigaddset( &waitset, SIGTERM );
sigprocmask( SIG_BLOCK, &waitset, NULL );
sigprocmask( SIG_BLOCK, &waitset, NULL );
while( 1 )
{
int ret = sigwait( &waitset, &sig );
Expand All @@ -380,6 +382,16 @@ class MasterApplication
}
}

private:
void ApplyDefaults( common::Config &cfg ) const
{
cfg.Insert( "node_port", master::NODE_PORT );
cfg.Insert( "node_ping_port", master::NODE_UDP_PORT );
cfg.Insert( "master_ping_port", master::MASTER_UDP_PORT );
cfg.Insert( "master_admin_port", master::MASTER_ADMIN_PORT );
cfg.Insert( "masterdb_port", master::MASTERDB_PORT );
}

private:
std::string exeDir_;
std::string cfgPath_;
Expand Down Expand Up @@ -423,15 +435,15 @@ int main( int argc, char* argv[] )

// parse input command line options
namespace po = boost::program_options;

po::options_description descr;

descr.add_options()
("help", "Print help")
("d", "Run as a daemon")
("s", "Stop daemon")
("c", po::value<std::string>(), "Config file path");

po::variables_map vm;
po::store( po::parse_command_line( argc, argv, descr ), vm );
po::notify( vm );
Expand Down
8 changes: 4 additions & 4 deletions src/master/node_ping.h
Expand Up @@ -25,7 +25,6 @@ the License.

#include <boost/asio.hpp>
#include "common/config.h"
#include "defines.h"

namespace master {

Expand All @@ -48,11 +47,12 @@ class PingReceiverBoost : public PingReceiver
PingReceiverBoost( boost::asio::io_service &io_service )
: socket_( io_service )
{
common::Config &cfg = common::Config::Instance();
bool ipv6 = cfg.Get<bool>( "ipv6" );
const common::Config &cfg = common::Config::Instance();
const bool ipv6 = cfg.Get<bool>( "ipv6" );
const unsigned short master_ping_port = cfg.Get<unsigned short>( "master_ping_port" );

socket_.open( ipv6 ? udp::v6() : udp::v4() );
socket_.bind( udp::endpoint( ipv6 ? udp::v6() : udp::v4(), master::MASTER_UDP_PORT ) );
socket_.bind( udp::endpoint( ipv6 ? udp::v6() : udp::v4(), master_ping_port ) );

memset( buffer_.c_array(), 0, buffer_.size() );
}
Expand Down
8 changes: 4 additions & 4 deletions src/master/ping.h
Expand Up @@ -28,7 +28,6 @@ the License.
#include "common/helper.h"
#include "common/protocol.h"
#include "common/config.h"
#include "defines.h"
#include "worker.h"

namespace master {
Expand Down Expand Up @@ -86,13 +85,14 @@ class PingerBoost : public Pinger
socket_( io_service ),
resolver_( io_service )
{
common::Config &cfg = common::Config::Instance();
bool ipv6 = cfg.Get<bool>( "ipv6" );
const common::Config &cfg = common::Config::Instance();
const bool ipv6 = cfg.Get<bool>( "ipv6" );
const unsigned short node_ping_port = cfg.Get<unsigned short>( "node_ping_port" );

socket_.open( ipv6 ? udp::v6() : udp::v4() );

std::ostringstream ss;
ss << master::NODE_UDP_PORT;
ss << node_ping_port;
port_ = ss.str();
}

Expand Down
7 changes: 5 additions & 2 deletions src/master/result_getter.cpp
Expand Up @@ -27,9 +27,9 @@ the License.
#include "job_manager.h"
#include "scheduler.h"
#include "common/log.h"
#include "common/config.h"
#include "common/protocol.h"
#include "common/service_locator.h"
#include "defines.h"

namespace master {

Expand Down Expand Up @@ -113,9 +113,12 @@ void ResultGetterBoost::OnGetTaskResult( bool success, int errCode, int64_t exec

void GetterBoost::GetTaskResult()
{
const common::Config &cfg = common::Config::Instance();
const unsigned short node_port = cfg.Get<unsigned short>( "node_port" );

tcp::endpoint nodeEndpoint(
boost::asio::ip::address::from_string( hostIP_ ),
NODE_PORT
node_port
);

socket_.async_connect( nodeEndpoint,
Expand Down
18 changes: 13 additions & 5 deletions src/masterdb/masterdb.cpp
Expand Up @@ -57,7 +57,7 @@ void SigHandler( int s )
case SIGABRT:
case SIGFPE:
case SIGBUS:
case SIGSEGV:
case SIGSEGV:
case SIGILL:
case SIGSYS:
case SIGXCPU:
Expand Down Expand Up @@ -145,14 +145,16 @@ class MasterDbApplication
{
cfg.ParseConfig( "", cfgPath_.c_str() );
}
ApplyDefaults( cfg );

// initialize main components
dbClient_.Initialize( exeDir_ );

common::ServiceLocator &serviceLocator = common::ServiceLocator::Instance();
serviceLocator.Register( static_cast< masterdb::IDAO* >( &dbClient_ ) );

acceptor_.reset( new masterdb::ConnectionAcceptor( io_service_, masterdb::MASTERDB_PORT ) );
const unsigned short port = cfg.Get<unsigned short>( "port" );
acceptor_.reset( new masterdb::ConnectionAcceptor( io_service_, port ) );

workerThread_.reset(
new boost::thread( boost::bind( &ThreadFun, &io_service_ ) )
Expand Down Expand Up @@ -196,7 +198,7 @@ class MasterDbApplication
int sig;
sigemptyset( &waitset );
sigaddset( &waitset, SIGTERM );
sigprocmask( SIG_BLOCK, &waitset, NULL );
sigprocmask( SIG_BLOCK, &waitset, NULL );
while( 1 )
{
int ret = sigwait( &waitset, &sig );
Expand All @@ -209,6 +211,12 @@ class MasterDbApplication
}
}

private:
void ApplyDefaults( common::Config &cfg ) const
{
cfg.Insert( "port", masterdb::MASTERDB_PORT );
}

private:
std::string exeDir_;
std::string cfgPath_;
Expand Down Expand Up @@ -238,15 +246,15 @@ int main( int argc, char* argv[] )

// parse input command line options
namespace po = boost::program_options;

po::options_description descr;

descr.add_options()
("help", "Print help")
("d", "Run as a daemon")
("s", "Stop daemon")
("c", po::value<std::string>(), "Config file path");

po::variables_map vm;
po::store( po::parse_command_line( argc, argv, descr ), vm );
po::notify( vm );
Expand Down

0 comments on commit e9f272c

Please sign in to comment.