Skip to content

Commit

Permalink
Migrated all non async_receive asynchronous calls to their own io_ser…
Browse files Browse the repository at this point in the history
…vice. This will help prevent dropped packets.
  • Loading branch information
rcythr committed Mar 4, 2013
1 parent cd17132 commit f393813
Show file tree
Hide file tree
Showing 26 changed files with 126 additions and 103 deletions.
4 changes: 3 additions & 1 deletion src/swganh/app/kernel_interface.h
Expand Up @@ -62,7 +62,9 @@ class KernelInterface {

virtual swganh::database::DatabaseManager* GetDatabaseManager() = 0;

virtual boost::asio::io_service& GetIoService() = 0;
virtual boost::asio::io_service& GetIoThreadPool() = 0;

virtual boost::asio::io_service& GetCpuThreadPool() = 0;

// also add entity manager, blah blah.
};
Expand Down
74 changes: 51 additions & 23 deletions src/swganh/app/swganh_app.cc
Expand Up @@ -58,8 +58,8 @@ options_description AppConfig::BuildConfigDescription() {
desc.add_options()
("help,h", "Display help message and config options")

("server_mode", boost::program_options::value<std::string>(&server_mode)->default_value("all"),
"Specifies the service configuration mode to run the server in.")
("server_mode", boost::program_options::value<std::string>(&server_mode)->default_value("all"),
"Specifies the service configuration mode to run the server in.")

("plugin,p", boost::program_options::value<std::vector<std::string>>(&plugins),
"Only used when single_server_mode is disabled, loads a module of the specified name")
Expand All @@ -81,6 +81,12 @@ options_description AppConfig::BuildConfigDescription() {
("db_threads", value<uint32_t>(&db_threads)->default_value(2),
"Total number of threads to allocate for database management")

("io_threads", value<uint32_t>(&io_threads)->default_value(2),
"Total number of threads to allocate for pulling threads off the wire")

("cpu_threads", value<uint32_t>(&cpu_threads)->default_value(boost::thread::hardware_concurrency()),
"Total number of threads to allocate for processing.")

("db.galaxy_manager.host", boost::program_options::value<std::string>(&galaxy_manager_db.host),
"Host address for the galaxy_manager datastore")
("db.galaxy_manager.schema", boost::program_options::value<std::string>(&galaxy_manager_db.schema),
Expand Down Expand Up @@ -140,10 +146,12 @@ options_description AppConfig::BuildConfigDescription() {
}

SwganhApp::SwganhApp(int argc, char* argv[])
: io_service_()
, io_work_(new boost::asio::io_service::work(io_service_))
: io_pool_()
, cpu_pool_()
, io_work_(new boost::asio::io_service::work(io_pool_))
, cpu_work_(new boost::asio::io_service::work(cpu_pool_))
{
kernel_ = make_shared<SwganhKernel>(io_service_);
kernel_ = make_shared<SwganhKernel>(io_pool_, cpu_pool_);
running_ = false;
initialized_ = false;

Expand All @@ -158,9 +166,11 @@ SwganhApp::~SwganhApp()
kernel_->Shutdown();

io_work_.reset();
cpu_work_.reset();

// join the threadpool threads until each one has exited.
for_each(io_threads_.begin(), io_threads_.end(), std::mem_fn(&boost::thread::join));
for_each(cpu_threads_.begin(), cpu_threads_.end(), std::mem_fn(&boost::thread::join));

kernel_.reset();
}
Expand Down Expand Up @@ -244,28 +254,46 @@ void SwganhApp::Start() {

running_ = true;

// Start up a threadpool for running io_service based tasks/active objects
// The increment starts at 2 because the main thread of execution already counts
// as thread in use as does the console thread.
for (uint32_t i = 1; i < boost::thread::hardware_concurrency(); ++i) {
boost::thread t([this] () {
try
{
io_service_.run();
}
catch(...)
//Create a number of threads to pull packets off the wire.
for (uint32_t i = 0; i < kernel_->GetAppConfig().io_threads; ++i) {
boost::thread t([this] () {
//Continue looping despite errors.
//If we successfully leave the run method we return.
while(true)
{
LOG(severity_level::error) << "A near fatal exception has occurred.";
try
{
io_pool_.run();
return;
}
catch(...)
{
LOG(severity_level::error) << "A near fatal exception has occurred.";
}
}

});

#ifdef _WIN32
SetPriorityClass(t.native_handle(), REALTIME_PRIORITY_CLASS);
#endif

});
io_threads_.push_back(move(t));
}

for (uint32_t i = 0; i < kernel_->GetAppConfig().cpu_threads; ++i) {
boost::thread t([this] () {
//Continue looping despite errors.
//If we successfully leave the run method we return.
while(true)
{
try
{
cpu_pool_.run();
return;
}
catch(...)
{
LOG(severity_level::error) << "A near fatal exception has occurred.";
}
}
});
cpu_threads_.push_back(move(t));
}

kernel_->GetServiceManager()->Start();

Expand Down
6 changes: 3 additions & 3 deletions src/swganh/app/swganh_app.h
Expand Up @@ -78,9 +78,9 @@ class SwganhApp : public swganh::app::AppInterface, private boost::noncopyable {

void SetupLogging_();

boost::asio::io_service io_service_;
std::unique_ptr<boost::asio::io_service::work> io_work_;
std::vector<boost::thread> io_threads_;
boost::asio::io_service io_pool_, cpu_pool_;
std::unique_ptr<boost::asio::io_service::work> io_work_, cpu_work_;
std::vector<boost::thread> io_threads_, cpu_threads_;
std::shared_ptr<SwganhKernel> kernel_;
std::atomic<bool> running_;
bool initialized_;
Expand Down
17 changes: 12 additions & 5 deletions src/swganh/app/swganh_kernel.cc
Expand Up @@ -29,8 +29,9 @@ using swganh::service::ServiceManager;
using std::make_shared;
using std::shared_ptr;

SwganhKernel::SwganhKernel(boost::asio::io_service& io_service)
: io_service_(io_service)
SwganhKernel::SwganhKernel(boost::asio::io_service& io_pool, boost::asio::io_service& cpu_pool)
: io_pool_(io_pool)
, cpu_pool_(cpu_pool)
{
version_.major = VERSION_MAJOR;
version_.minor = VERSION_MINOR;
Expand Down Expand Up @@ -76,7 +77,7 @@ DatabaseManager* SwganhKernel::GetDatabaseManager() {

swganh::EventDispatcher* SwganhKernel::GetEventDispatcher() {
if (!event_dispatcher_) {
event_dispatcher_.reset(new swganh::EventDispatcher(GetIoService()));
event_dispatcher_.reset(new swganh::EventDispatcher(GetCpuThreadPool()));
}

return event_dispatcher_.get();
Expand Down Expand Up @@ -112,8 +113,14 @@ ServiceDirectoryInterface* SwganhKernel::GetServiceDirectory() {
return service_directory_.get();
}

boost::asio::io_service& SwganhKernel::GetIoService() {
return io_service_;
boost::asio::io_service& SwganhKernel::GetIoThreadPool()
{
return io_pool_;
}

boost::asio::io_service& SwganhKernel::GetCpuThreadPool()
{
return cpu_pool_;
}

swganh::tre::ResourceManager* SwganhKernel::GetResourceManager()
Expand Down
13 changes: 9 additions & 4 deletions src/swganh/app/swganh_kernel.h
Expand Up @@ -33,7 +33,10 @@ struct AppConfig {
std::string galaxy_name;
std::string tre_config;
uint32_t resource_cache_size;
uint32_t db_threads;

uint32_t io_threads;
uint32_t cpu_threads;
uint32_t db_threads;

/*!
* @Brief Contains information about the database config"
Expand Down Expand Up @@ -69,7 +72,7 @@ struct AppConfig {

class SwganhKernel : public swganh::app::KernelInterface {
public:
explicit SwganhKernel(boost::asio::io_service& io_service);
explicit SwganhKernel(boost::asio::io_service& io_pool, boost::asio::io_service& cpu_pool);
virtual ~SwganhKernel();

void Shutdown();
Expand All @@ -88,7 +91,9 @@ class SwganhKernel : public swganh::app::KernelInterface {

swganh::service::ServiceDirectoryInterface* GetServiceDirectory();

boost::asio::io_service& GetIoService();
boost::asio::io_service& GetIoThreadPool();

boost::asio::io_service& GetCpuThreadPool();

swganh::tre::ResourceManager* GetResourceManager();

Expand All @@ -104,7 +109,7 @@ class SwganhKernel : public swganh::app::KernelInterface {
std::unique_ptr<swganh::service::ServiceDirectoryInterface> service_directory_;
std::unique_ptr<swganh::tre::ResourceManager> resource_manager_;

boost::asio::io_service& io_service_;
boost::asio::io_service &io_pool_, &cpu_pool_;
};

}} // namespace swganh::app
4 changes: 2 additions & 2 deletions src/swganh/network/soe/session.cc
Expand Up @@ -17,11 +17,11 @@ using namespace swganh::network;
using namespace swganh::network::soe;
using namespace std;

Session::Session(ServerInterface* server, boost::asio::io_service& io_service, boost::asio::ip::udp::endpoint remote_endpoint)
Session::Session(ServerInterface* server, boost::asio::io_service& cpu_pool, boost::asio::ip::udp::endpoint remote_endpoint)
: std::enable_shared_from_this<Session>()
, remote_endpoint_(remote_endpoint)
, server_(server)
, strand_(io_service)
, strand_(cpu_pool)
, connected_(false)
, crc_seed_(0xDEADBABE)
, last_acknowledged_sequence_(0)
Expand Down
2 changes: 1 addition & 1 deletion src/swganh/network/soe/session.h
Expand Up @@ -47,7 +47,7 @@ class Session : public std::enable_shared_from_this<Session> {
/**
* Adds itself to the Session Manager.
*/
Session(ServerInterface* server, boost::asio::io_service& io_service, boost::asio::ip::udp::endpoint remote_endpoint);
Session(ServerInterface* server, boost::asio::io_service& cpu_pool, boost::asio::ip::udp::endpoint remote_endpoint);
~Session();

/**
Expand Down
2 changes: 1 addition & 1 deletion src/swganh_core/combat/buff_manager.cc
Expand Up @@ -21,7 +21,7 @@ using namespace std;

BuffManager::BuffManager(swganh::app::SwganhKernel* kernel)
: kernel_(kernel)
, timer_(kernel_->GetIoService())
, timer_(kernel_->GetCpuThreadPool())
{}

void BuffManager::Start()
Expand Down
2 changes: 1 addition & 1 deletion src/swganh_core/combat/combat_service.cc
Expand Up @@ -60,7 +60,7 @@ using swganh::app::SwganhKernel;

CombatService::CombatService(SwganhKernel* kernel)
: generator_(1, 100)
, active_(kernel->GetIoService())
, active_(kernel->GetCpuThreadPool())
, kernel_(kernel)
, buff_manager_(kernel)
{
Expand Down
4 changes: 2 additions & 2 deletions src/swganh_core/command/command_queue.cc
Expand Up @@ -30,10 +30,10 @@ using swganh::object::Tangible;
CommandQueue::CommandQueue(
swganh::app::SwganhKernel* kernel)
: kernel_(kernel)
, timer_(kernel->GetIoService())
, timer_(kernel->GetCpuThreadPool())
, processing_(false)
, default_command_(nullptr)
, active_(kernel->GetIoService())
, active_(kernel->GetCpuThreadPool())
{
command_service_ = kernel->GetServiceManager()->GetService<CommandService>("CommandService");
}
Expand Down
4 changes: 2 additions & 2 deletions src/swganh_core/connection/connection_client.cc
Expand Up @@ -14,8 +14,8 @@ using namespace swganh::connection;
using namespace swganh::object;
using namespace swganh::observer;

ConnectionClient::ConnectionClient(ServerInterface* server, boost::asio::io_service& io_service, boost::asio::ip::udp::endpoint remote_endpoint)
: ConnectionClientInterface(server, io_service, remote_endpoint)
ConnectionClient::ConnectionClient(ServerInterface* server, boost::asio::io_service& cpu_pool, boost::asio::ip::udp::endpoint remote_endpoint)
: ConnectionClientInterface(server, cpu_pool, remote_endpoint)
{}

ConnectionClient::State ConnectionClient::GetState() const
Expand Down
2 changes: 1 addition & 1 deletion src/swganh_core/connection/connection_client.h
Expand Up @@ -19,7 +19,7 @@ class ConnectionClient : public swganh::connection::ConnectionClientInterface
/**
* Creates a new instance
*/
ConnectionClient(swganh::network::soe::ServerInterface* server, boost::asio::io_service& io_service, boost::asio::ip::udp::endpoint remote_endpoint);
ConnectionClient(swganh::network::soe::ServerInterface* server, boost::asio::io_service& cpu_pool, boost::asio::ip::udp::endpoint remote_endpoint);

/**
* @return the current state of this remote client
Expand Down
4 changes: 2 additions & 2 deletions src/swganh_core/connection/connection_client_interface.h
Expand Up @@ -26,9 +26,9 @@ class ConnectionClientInterface : public swganh::network::soe::Session
DISCONNECTING
};

ConnectionClientInterface(swganh::network::soe::ServerInterface* server, boost::asio::io_service& io_service,
ConnectionClientInterface(swganh::network::soe::ServerInterface* server, boost::asio::io_service& cpu_pool,
boost::asio::ip::udp::endpoint remote_endpoint)
: Session(server, io_service, remote_endpoint)
: Session(server, cpu_pool, remote_endpoint)
{
}

Expand Down
6 changes: 3 additions & 3 deletions src/swganh_core/connection/connection_service.cc
Expand Up @@ -53,7 +53,7 @@ ConnectionService::ConnectionService(
: ConnectionServiceInterface(kernel)
, kernel_(kernel)
, ping_server_(nullptr)
, active_(kernel->GetIoService())
, active_(kernel->GetIoThreadPool())
, listen_address_(listen_address)
, listen_port_(listen_port)
, ping_port_(ping_port)
Expand Down Expand Up @@ -85,7 +85,7 @@ ServiceDescription ConnectionService::GetServiceDescription() {
}

void ConnectionService::Startup() {
ping_server_ = make_shared<PingServer>(kernel_->GetIoService(), ping_port_);
ping_server_ = make_shared<PingServer>(kernel_->GetIoThreadPool(), ping_port_);

character_service_ = kernel_->GetServiceManager()->GetService<CharacterServiceInterface>("CharacterService");
login_service_ = kernel_->GetServiceManager()->GetService<LoginServiceInterface>("LoginService");
Expand Down Expand Up @@ -128,7 +128,7 @@ shared_ptr<Session> ConnectionService::CreateSession(const udp::endpoint& endpoi
boost::lock_guard<boost::mutex> lg(session_map_mutex_);
if (session_map_.find(endpoint) == session_map_.end())
{
session = make_shared<ConnectionClient>(this, kernel_->GetIoService(), endpoint);
session = make_shared<ConnectionClient>(this, kernel_->GetCpuThreadPool(), endpoint);
session_map_.insert(make_pair(endpoint, session));
LOG(info) << "Created Connection Service Session for " << endpoint.address().to_string();
}
Expand Down
2 changes: 1 addition & 1 deletion src/swganh_core/connection/connection_service_interface.h
Expand Up @@ -50,7 +50,7 @@ class ConnectionServiceInterface : public swganh::service::ServiceInterface, pub
public:

ConnectionServiceInterface(swganh::app::SwganhKernel* kernel)
: swganh::network::BaseSwgServer(kernel->GetIoService())
: swganh::network::BaseSwgServer(kernel->GetIoThreadPool())
{
}

Expand Down
2 changes: 1 addition & 1 deletion src/swganh_core/galaxy/galaxy_service.cc
Expand Up @@ -55,7 +55,7 @@ uint64_t GalaxyService::GetGalaxyTimeInMilliseconds()
}
void GalaxyService::Startup()
{
galaxy_timer_ = std::make_shared<boost::asio::deadline_timer>(kernel_->GetIoService(), boost::posix_time::seconds(10));
galaxy_timer_ = std::make_shared<boost::asio::deadline_timer>(kernel_->GetCpuThreadPool(), boost::posix_time::seconds(10));

galaxy_timer_->async_wait(boost::bind(&GalaxyService::GalaxyStatusTimerHandler_, this, boost::asio::placeholders::error, 10));
}
Expand Down
4 changes: 2 additions & 2 deletions src/swganh_core/login/login_client.cc
Expand Up @@ -10,8 +10,8 @@ using namespace swganh::login;
using namespace swganh::login;

LoginClient::LoginClient(
ServerInterface* server, boost::asio::io_service& io_service, boost::asio::ip::udp::endpoint remote_endpoint)
: LoginClientInterface(server, io_service, remote_endpoint)
ServerInterface* server, boost::asio::io_service& cpu_pool, boost::asio::ip::udp::endpoint remote_endpoint)
: LoginClientInterface(server, cpu_pool, remote_endpoint)
{}

string LoginClient::GetUsername() const
Expand Down
2 changes: 1 addition & 1 deletion src/swganh_core/login/login_client.h
Expand Up @@ -9,7 +9,7 @@ namespace login {

class LoginClient : public swganh::login::LoginClientInterface {
public:
LoginClient(swganh::network::soe::ServerInterface* server, boost::asio::io_service& io_service, boost::asio::ip::udp::endpoint remote_endpoint);
LoginClient(swganh::network::soe::ServerInterface* server, boost::asio::io_service& cpu_pool, boost::asio::ip::udp::endpoint remote_endpoint);

std::string GetUsername() const;
void SetUsername(std::string username);
Expand Down
4 changes: 2 additions & 2 deletions src/swganh_core/login/login_client_interface.h
Expand Up @@ -14,8 +14,8 @@ class Account;

class LoginClientInterface : public swganh::network::soe::Session {
public:
LoginClientInterface(swganh::network::soe::ServerInterface* server, boost::asio::io_service& io_service, boost::asio::ip::udp::endpoint remote_endpoint)
:Session(server, io_service, remote_endpoint)
LoginClientInterface(swganh::network::soe::ServerInterface* server, boost::asio::io_service& cpu_pool, boost::asio::ip::udp::endpoint remote_endpoint)
:Session(server, cpu_pool, remote_endpoint)
{
}

Expand Down

0 comments on commit f393813

Please sign in to comment.