Skip to content

Commit

Permalink
Added a base connection service which defines the connection service …
Browse files Browse the repository at this point in the history
…interface and provides some default functionality for routing messages.
  • Loading branch information
apathyboy committed Aug 3, 2011
1 parent bcbb0d4 commit 6931453
Show file tree
Hide file tree
Showing 3 changed files with 280 additions and 0 deletions.
136 changes: 136 additions & 0 deletions src/swganh/connection/base_connection_service.cc
@@ -0,0 +1,136 @@

#include "swganh/connection/base_connection_service.h"

#include <boost/lexical_cast.hpp>
#include <glog/logging.h>

#include "anh/crc.h"
#include "anh/event_dispatcher/basic_event.h"
#include "anh/event_dispatcher/event_dispatcher_interface.h"
#include "anh/network/soe/packet.h"
#include "anh/network/soe/server.h"
#include "anh/service/service_manager.h"

#include "swganh/app/swganh_kernel.h"

#include "swganh/connection/connection_client.h"

using namespace anh::network;

using namespace swganh::base;
using namespace swganh::character;
using namespace swganh::connection;
using namespace swganh::login;

using namespace std;

using boost::asio::ip::udp;

BaseConnectionService::BaseConnectionService(std::shared_ptr<anh::app::KernelInterface> kernel)
: BaseService(kernel)
, soe_server_(nullptr)
{
soe_server_.reset(new soe::Server(
kernel->GetIoService(),
bind(&BaseConnectionService::HandleMessage, this, placeholders::_1)));
soe_server_->event_dispatcher(kernel->GetEventDispatcher());
}

void BaseConnectionService::DescribeConfigOptions(boost::program_options::options_description& description) {
description.add_options()
("service.connection.udp_port", boost::program_options::value<uint16_t>(&listen_port_),
"The port the connection service will listen for incoming client connections on")
("service.connection.address", boost::program_options::value<string>(&listen_address_),
"The public address the connection service will listen for incoming client connections on")
;

OnDescribeConfigOptions(description);
}

void BaseConnectionService::onStart() {
soe_server_->Start(listen_port_);

character_service_ = std::static_pointer_cast<BaseCharacterService>(kernel()->GetServiceManager()->GetService("CharacterService"));
login_service_ = std::static_pointer_cast<swganh::login::LoginServiceInterface>(kernel()->GetServiceManager()->GetService("LoginService"));
}

void BaseConnectionService::onStop() {
soe_server_->Shutdown();
clients_.clear();
}

const string& BaseConnectionService::listen_address() {
return listen_address_;
}

uint16_t BaseConnectionService::listen_port() {
return listen_port_;
}

BaseConnectionService::ClientMap& BaseConnectionService::clients() {
return clients_;
}

std::unique_ptr<anh::network::soe::Server>& BaseConnectionService::server() {
return soe_server_;
}

shared_ptr<BaseCharacterService> BaseConnectionService::character_service() {
return character_service_;
}

shared_ptr<LoginServiceInterface> BaseConnectionService::login_service() {
return login_service_;
}

void BaseConnectionService::HandleMessage(shared_ptr<soe::Packet> packet)
{
auto message = packet->message();

uint32_t message_type = message->peekAt<uint32_t>(message->read_position() + sizeof(uint16_t));

MessageHandlerMap::accessor a;

// No handler specified, trigger an event.
if (!handlers_.find(a, message_type)) {
DLOG(WARNING) << "Received message with no handler, triggering event: "
<< std::hex << message_type << "\n\n" << *message;

kernel()->GetEventDispatcher()->trigger(
anh::event_dispatcher::make_shared_event(
message_type,
*packet));

return;
}

auto client = GetClientFromEndpoint(packet->session()->remote_endpoint());

if (!client) {
if (a->second.second) {
DLOG(WARNING) << "Received a message from an invalid source: "
<< std::hex << message_type << "\n\n" << *message;

throw std::runtime_error("A valid client is required to invoke this message handler");
} else {
client = make_shared<ConnectionClient>();
client->session = packet->session();
}
}

a->second.first(client, packet);
}

shared_ptr<ConnectionClient> BaseConnectionService::GetClientFromEndpoint(
const udp::endpoint& remote_endpoint)
{
shared_ptr<ConnectionClient> client = nullptr;

ClientMap::accessor a;

if (clients_.find(a, remote_endpoint)) {
client = a->second;
}

return client;
}
119 changes: 119 additions & 0 deletions src/swganh/connection/base_connection_service.h
@@ -0,0 +1,119 @@

#ifndef SWGANH_CONNECTION_BASE_CONNECTION_SERVICE_H_
#define SWGANH_CONNECTION_BASE_CONNECTION_SERVICE_H_

#include <cstdint>
#include <memory>
#include <string>

#include <tbb/concurrent_hash_map.h>

#include "anh/hash_string.h"

#include "anh/network/soe/packet.h"
#include "anh/network/soe/packet_utilities.h"
#include "anh/network/soe/session.h"

#include "swganh/base/base_service.h"

#include "swganh/character/base_character_service.h"
#include "swganh/login/login_service_interface.h"

namespace anh {
namespace network {
namespace soe {
class Server;
}}} // namespace anh::network::soe

namespace swganh {
namespace connection {

struct ConnectionClient;

class BaseConnectionService : public swganh::base::BaseService
{
public:
typedef std::function<void (
std::shared_ptr<ConnectionClient> client,
std::shared_ptr<anh::network::soe::Packet> packet)
> MessageHandler;

typedef tbb::concurrent_hash_map<
boost::asio::ip::udp::endpoint,
std::shared_ptr<ConnectionClient>,
anh::network::soe::EndpointHashCompare
> ClientMap;

typedef tbb::concurrent_hash_map<
anh::HashString,
std::pair<MessageHandler, bool> // bool IsClientRequired
> MessageHandlerMap;

public:
explicit BaseConnectionService(std::shared_ptr<anh::app::KernelInterface> kernel);

void DescribeConfigOptions(boost::program_options::options_description& description);

void onStart();
void onStop();

template<typename MessageType>
void RegisterMessageHandler(
anh::HashString message_type_id,
std::function<void (std::shared_ptr<ConnectionClient>, MessageType)> handler,
bool client_required = true)
{
MessageHandlerMap::accessor a;
if (handlers_.find(a, message_type_id)) {
throw std::runtime_error("A handler has already been defined for the message type: " + message_type_id.ident_string());
}

auto wrapped_handler = [this, handler] (
std::shared_ptr<ConnectionClient> client,
std::shared_ptr<anh::network::soe::Packet> packet)
{
MessageType message;
message.deserialize(*packet->message());

handler(client, message);
};

handlers_.insert(std::make_pair(message_type_id, std::make_pair(wrapped_handler, client_required)));
}

void HandleMessage(std::shared_ptr<anh::network::soe::Packet> packet);

std::shared_ptr<ConnectionClient> GetClientFromEndpoint(
const boost::asio::ip::udp::endpoint& remote_endpoint);

protected:
virtual void OnDescribeConfigOptions(boost::program_options::options_description& description) = 0;

const std::string& listen_address();

uint16_t listen_port();

ClientMap& clients();

std::unique_ptr<anh::network::soe::Server>& server();

std::shared_ptr<swganh::character::BaseCharacterService> character_service();

std::shared_ptr<swganh::login::LoginServiceInterface> login_service();

private:
ClientMap clients_;
MessageHandlerMap handlers_;

std::unique_ptr<anh::network::soe::Server> soe_server_;

std::shared_ptr<swganh::character::BaseCharacterService> character_service_;
std::shared_ptr<swganh::login::LoginServiceInterface> login_service_;

std::string listen_address_;
uint16_t listen_port_;
};

}} // namespace swganh::connection

#endif // SWGANH_CONNECTION_BASE_CONNECTION_SERVICE_H_
25 changes: 25 additions & 0 deletions src/swganh/connection/connection_client.h
@@ -0,0 +1,25 @@

#ifndef SWGANH_CONNECTION_CONNECTION_CLIENT_H_
#define SWGANH_CONNECTION_CONNECTION_CLIENT_H_

#include <cstdint>
#include <memory>

namespace anh {
namespace network {
namespace soe {
class Session;
}}} // namespace anh::network::soe

namespace swganh {
namespace connection {

struct ConnectionClient {
uint32_t account_id;
uint64_t player_id;
std::shared_ptr<anh::network::soe::Session> session;
};

}} // namespace swganh::connection

#endif // SWGANH_CONNECTION_CONNECTION_CLIENT_H_

0 comments on commit 6931453

Please sign in to comment.