Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Sequence messages can now be followed up by a callback.

  • Loading branch information...
commit 051626f240010eca5e6aa71eb208418d664594f3 1 parent 4e5af1e
@dead1ock dead1ock authored
View
52 src/swganh/network/soe/session.cc
@@ -93,11 +93,14 @@ void Session::Update() {
// Build up a list of data messages to process
uint32_t message_count = outgoing_data_messages_.unsafe_size();
list<ByteBuffer> process_list;
- ByteBuffer tmp;
+ SequencedCallbacks callbacks;
+ OutgoingMessage tmp;
for (uint32_t i = 0; i < message_count; ++i) {
if (outgoing_data_messages_.try_pop(tmp)) {
- process_list.push_back(tmp);
+ process_list.push_back(tmp.first);
+ if(tmp.second.is_initialized())
+ callbacks.push_back(tmp.second.get());
}
}
@@ -113,17 +116,17 @@ void Session::Update() {
data_channel_payload,
max_data_channel_size);
- for_each(fragmented_message.begin(), fragmented_message.end(), [this] (ByteBuffer& fragment) {
- SendSequencedMessage_(&BuildFragmentedDataChannelHeader, move(fragment));
+ for_each(fragmented_message.begin(), fragmented_message.end(), [this, &callbacks] (ByteBuffer& fragment) {
+ SendSequencedMessage_(&BuildFragmentedDataChannelHeader, move(fragment), callbacks);
});
} else {
- SendSequencedMessage_(&BuildDataChannelHeader, move(data_channel_payload));
+ SendSequencedMessage_(&BuildDataChannelHeader, move(data_channel_payload), callbacks);
}
}
-void Session::SendTo(ByteBuffer message)
+void Session::SendTo(ByteBuffer message, boost::optional<SequencedCallback> callback)
{
- outgoing_data_messages_.push(move(message));
+ outgoing_data_messages_.push(OutgoingMessage(move(message), callback));
}
void Session::Close(void)
@@ -218,7 +221,7 @@ void Session::HandleProtocolMessageInternal(swganh::ByteBuffer message)
}
-void Session::SendSequencedMessage_(HeaderBuilder header_builder, ByteBuffer message) {
+void Session::SendSequencedMessage_(HeaderBuilder header_builder, ByteBuffer message, SequencedCallbacks callbacks) {
// Get the next sequence number
uint16_t message_sequence = server_sequence_++;
@@ -230,6 +233,9 @@ void Session::SendSequencedMessage_(HeaderBuilder header_builder, ByteBuffer mes
// Send it over the wire
SendSoePacket_(data_channel_message);
+
+ QueueSequencedCallback(message_sequence, callbacks);
+
// Store it for resending later if necessary
sent_messages_.push_back(make_pair(message_sequence, move(data_channel_message)));
}
@@ -362,6 +368,7 @@ void Session::handleAckA_(AckA packet)
sent_messages_.erase(sent_messages_.begin(), it);
last_acknowledged_sequence_ = packet.sequence;
+ DequeueSequencedCallback(packet.sequence);
}
void Session::handleOutOfOrderA_(OutOfOrderA packet)
@@ -427,3 +434,32 @@ void Session::AcknowledgeSequence_(const uint16_t& sequence)
next_client_sequence_ = sequence + 1;
current_client_sequence_ = sequence;
}
+
+void Session::QueueSequencedCallback(uint16_t sequence, SequencedCallbacks callbacks)
+{
+ auto iter = acknowledgement_callbacks_.find(sequence);
+ if(iter == acknowledgement_callbacks_.end())
+ {
+ acknowledgement_callbacks_.insert(std::pair<uint16_t, SequencedCallbacks>(sequence, callbacks));
+ }
+ else
+ LOG(error) << "Sequence callback is slotted but has not been dequeued.";
+}
+
+void Session::DequeueSequencedCallback(uint16_t sequence)
+{
+ for(auto& item : acknowledgement_callbacks_)
+ {
+ if(item.first > sequence)
+ break;
+
+ for(auto& func : item.second)
+ {
+ func(sequence);
+ }
+ }
+
+ auto iter = acknowledgement_callbacks_.find(sequence);
+ if(iter != acknowledgement_callbacks_.end())
+ acknowledgement_callbacks_.erase(acknowledgement_callbacks_.begin(), iter);
+}
View
18 src/swganh/network/soe/session.h
@@ -19,6 +19,7 @@ namespace Concurrency {
#endif
#include <boost/asio.hpp>
+#include <boost/optional.hpp>
#include "swganh/network/soe/protocol_packets.h"
#include "swganh/network/soe/server_interface.h"
@@ -50,6 +51,10 @@ class Session : public std::enable_shared_from_this<Session> {
Session(ServerInterface* server, boost::asio::io_service& io_service, boost::asio::ip::udp::endpoint remote_endpoint);
~Session();
+ typedef std::function<void(uint16_t /* sequence */)> SequencedCallback;
+ typedef std::list<SequencedCallback> SequencedCallbacks;
+ typedef std::pair<ByteBuffer, boost::optional<SequencedCallback>> OutgoingMessage;
+
/**
* @return The current send sequence for the server.
*/
@@ -96,7 +101,7 @@ class Session : public std::enable_shared_from_this<Session> {
*
* @param message The payload to send in the data channel message(s).
*/
- void SendTo(swganh::ByteBuffer message);
+ void SendTo(ByteBuffer message, boost::optional<SequencedCallback> callback = boost::optional<SequencedCallback>());
/**
* Sends a data channel message to the remote client.
@@ -112,7 +117,7 @@ class Session : public std::enable_shared_from_this<Session> {
ByteBuffer message_buffer;
message.Serialize(message_buffer);
- outgoing_data_messages_.push(std::move(message_buffer));
+ outgoing_data_messages_.push(OutgoingMessage(std::move(message_buffer), boost::optional<SequencedCallback>()));
}
void HandleMessage(swganh::ByteBuffer message);
@@ -142,7 +147,7 @@ class Session : public std::enable_shared_from_this<Session> {
typedef swganh::ByteBuffer(*HeaderBuilder)(uint16_t);
- void SendSequencedMessage_(HeaderBuilder header_builder, ByteBuffer message);
+ void SendSequencedMessage_(HeaderBuilder header_builder, ByteBuffer message, SequencedCallbacks callbacks);
virtual void OnClose() {}
@@ -162,6 +167,8 @@ class Session : public std::enable_shared_from_this<Session> {
bool SequenceIsValid_(const uint16_t& sequence);
void AcknowledgeSequence_(const uint16_t& sequence);
+ void QueueSequencedCallback(uint16_t sequence, SequencedCallbacks);
+ void DequeueSequencedCallback(uint16_t sequence);
boost::asio::ip::udp::endpoint remote_endpoint_; // ip_address
ServerInterface* server_; // owner
@@ -188,9 +195,10 @@ class Session : public std::enable_shared_from_this<Session> {
// Net Stats
NetStatsServer server_net_stats_;
- Concurrency::concurrent_queue<swganh::ByteBuffer> outgoing_data_messages_;
+ Concurrency::concurrent_queue<OutgoingMessage> outgoing_data_messages_;
std::list<swganh::ByteBuffer> incoming_fragmented_messages_;
+
uint16_t incoming_fragmented_total_len_;
uint16_t incoming_fragmented_curr_len_;
@@ -201,6 +209,8 @@ class Session : public std::enable_shared_from_this<Session> {
filters::DecryptionFilter decryption_filter_;
filters::EncryptionFilter encryption_filter_;
filters::SecurityFilter security_filter_;
+
+ std::map<uint16_t, SequencedCallbacks> acknowledgement_callbacks_;
};
}}} // namespace swganh::network::soe
View
9 src/swganh/observer/observer_interface.h
@@ -3,6 +3,7 @@
#pragma once
#include "swganh/byte_buffer.h"
+#include "swganh/network/soe/session.h"
namespace swganh
{
@@ -28,6 +29,14 @@ namespace observer {
* @param message Message containing the updated state of the observable object.
*/
virtual void Notify(swganh::messages::BaseSwgMessage* message) = 0;
+
+ /**
+ * Notifies observer that the observable object has changed state.
+ *
+ * @param message Message containing the updated state of the observable object.
+ * @param callback SequencedCallback fired once acknowledgement is received.
+ */
+ virtual void Notify(swganh::messages::BaseSwgMessage* message, swganh::network::soe::Session::SequencedCallback&&) = 0;
};
}} // namespace swganh::observer
View
14 src/swganh_core/object/object.cc
@@ -145,10 +145,10 @@ void Object::TransferObject(std::shared_ptr<Object> requester, std::shared_ptr<O
boost::upgrade_to_unique_lock<boost::shared_mutex> unique(uplock);
//Perform the transfer
- for(auto& slot : slot_descriptor_)
- {
- slot.second->remove_object(object);
- }
+ //for(auto& slot : slot_descriptor_)
+ //{
+ // slot.second->remove_object(object);
+ //}
arrangement_id = newContainer->__InternalInsert(object, new_position, arrangement_id);
}
@@ -406,7 +406,7 @@ void Object::__InternalAddAwareObject(std::shared_ptr<swganh::object::Object> ob
if(GetPermissions()->canView(shared_from_this(), object))
{
- reverse_still_valid = false;
+ //reverse_still_valid = false;
}
for(auto& slot : slot_descriptor_)
@@ -430,14 +430,14 @@ void Object::__InternalRemoveAwareObject(std::shared_ptr<swganh::object::Object>
{
if(GetPermissions()->canView(shared_from_this(), object))
{
- reverse_still_valid = false;
+ //reverse_still_valid = false;
}
for(auto& slot : slot_descriptor_)
{
slot.second->view_objects([&] (const std::shared_ptr<Object>& v) {
v->__InternalRemoveAwareObject(object, reverse_still_valid);
- if(reverse_still_valid)
+ if(!reverse_still_valid)
{
object->__InternalRemoveAwareObject(v, reverse_still_valid);
}
View
12 src/swganh_core/object/object_controller.cc
@@ -67,6 +67,14 @@ void ObjectController::Notify(BaseSwgMessage* message)
{
swganh::ByteBuffer buffer;
message->SetObserverId(GetId());
- message->Serialize(buffer);
- client_->SendTo(buffer);
+ message->Serialize(buffer);
+ client_->SendTo(buffer, boost::optional<swganh::network::soe::Session::SequencedCallback>());
+}
+
+void ObjectController::Notify(BaseSwgMessage* message, swganh::network::soe::Session::SequencedCallback&& callback)
+{
+ swganh::ByteBuffer buffer;
+ message->SetObserverId(GetId());
+ message->Serialize(buffer);
+ client_->SendTo(buffer, move(callback));
}
View
10 src/swganh_core/object/object_controller.h
@@ -80,7 +80,15 @@ namespace object {
*
* @param message The message to be delivered to the remote client.
*/
- void Notify(swganh::messages::BaseSwgMessage* message);
+ void Notify(swganh::messages::BaseSwgMessage* message);
+
+ /**
+ * Notifies the controller when the object has been updated.
+ *
+ * @param message The message to be delivered to the remote client.
+ * @param callback SequencedCallback fired once acknowledgement is received.
+ */
+ void Notify(swganh::messages::BaseSwgMessage* message, swganh::network::soe::Session::SequencedCallback&& callback);
private:
View
8 src/swganh_core/object/object_controller_binding.h
@@ -24,8 +24,8 @@ struct ObserverInterfaceWrapper : ObserverInterface, wrapper<ObserverInterface>
void exportObjectController()
{
- typedef void (ObserverInterface::*NotifyFunc)(const swganh::ByteBuffer& message);
- class_<ObserverInterface, std::shared_ptr<ObserverInterface>, boost::noncopyable>("ObserverInterface", "Object that describes the observer of an object", no_init)
- .def("notify", NotifyFunc(&ObserverInterface::Notify), "Notifies the controller whent he object has been updated")
- ;
+ //typedef void (ObserverInterface::*NotifyFunc)(const swganh::ByteBuffer& message);
+ //class_<ObserverInterface, std::shared_ptr<ObserverInterface>, boost::noncopyable>("ObserverInterface", "Object that describes the observer of an object", no_init)
+ // .def("notify", NotifyFunc(&ObserverInterface::Notify), "Notifies the controller whent he object has been updated")
+ //;
}
View
29 src/swganh_core/simulation/node.cc
@@ -331,6 +331,35 @@ void Node::SvgDumpObjects(std::ofstream& file)
file << "<text x=\"" << obj->GetPosition().x << "\" y=\"" << obj->GetPosition().z * -1.0f << "\" fill=\"black\" style=\"text-anchor: middle;\" font-size=\"8px\">" << std::string(name.begin(), name.end()) << "<" << '/' << "text>\n";
file << "<polygon points=\"" << bounding_volume_points.str() << "\" style=\"fill-opacity:0;fill:none;stroke:red;stroke-width:0.4px\"" << '/' << "> \n";
file << "<polygon points=\"" << current_collision_points.str() << "\" style=\"fill-opacity:0;fill:none;stroke:blue;stroke-width:0.4px\"" << '/' << "> \n";
+
+ obj->ViewObjects(obj, 0, true, [=, &file](std::shared_ptr<swganh::object::Object> object) {
+ if(object->GetCustomName().size() > 0)
+ {
+ std::cout << "Printing internal object of ";
+ std::wcout << obj->GetCustomName() << " : ";
+ std::wcout << object->GetCustomName() << std::endl;
+ }
+ std::stringstream bounding_volume_points;
+
+ auto bounding_volume = object->GetAABB();
+ auto collision_box = object->GetWorldCollisionBox();
+
+ current_collision_points = std::stringstream();
+ boost::geometry::for_each_point(collision_box, GetCollisionBoxPoints<Point>);
+
+ boost::geometry::box_view<swganh::object::AABB> bounding_volume_view(bounding_volume);
+ for(boost::range_iterator<boost::geometry::box_view<swganh::object::AABB>>::type it = boost::begin(bounding_volume_view); it != boost::end(bounding_volume_view); ++it)
+ {
+ bounding_volume_points << " " << (*it).x() << "," << (*it).y() * -1.0f;
+ }
+
+ auto name = object->GetCustomName();
+ auto abs_position = glm::vec3();
+ object->GetAbsolutes(abs_position, object->GetOrientation());
+ file << "<text x=\"" << abs_position.x << "\" y=\"" << abs_position.z * -1.0f << "\" fill=\"black\" style=\"text-anchor: middle;\" font-size=\"8px\">" << std::string(name.begin(), name.end()) << " * <" << '/' << "text>\n";
+ file << "<polygon points=\"" << bounding_volume_points.str() << "\" style=\"fill-opacity:0;fill:none;stroke:red;stroke-width:0.4px\"" << '/' << "> \n";
+ file << "<polygon points=\"" << current_collision_points.str() << "\" style=\"fill-opacity:0;fill:none;stroke:blue;stroke-width:0.4px\"" << '/' << "> \n";
+ });
}
if(state_ == BRANCH)
View
19 src/swganh_core/simulation/simulation_service.cc
@@ -235,11 +235,6 @@ class SimulationServiceImpl {
throw std::runtime_error("Requested transfer to an invalid scene: " + scene);
}
- /**auto old_scene = scene_manager_->GetScene(obj->GetSceneId());
- if(old_scene) {
- old_scene->RemoveObject(obj);
- }*/
-
// Clear Controller
auto controller = obj->GetController();
obj->ClearController();
@@ -260,9 +255,6 @@ class SimulationServiceImpl {
obj->UpdateWorldCollisionBox();
obj->UpdateAABB();
- // Reset Controller
- obj->SetController(controller);
-
// CmdStartScene
if(obj->GetController() != nullptr)
{
@@ -275,11 +267,14 @@ class SimulationServiceImpl {
start_scene.shared_race_template = obj->GetTemplate();
start_scene.galaxy_time = 0;
- obj->GetController()->Notify(&start_scene);
- }
+ obj->GetController()->Notify(&start_scene, [=](uint16_t sequence) {
+ // Reset Controller
+ obj->SetController(controller);
- // Add object to scene and send baselines
- scene_obj->AddObject(obj);
+ // Add object to scene and send baselines
+ scene_obj->AddObject(obj);
+ });
+ }
}
shared_ptr<Object> TransferObjectToScene(uint64_t object_id, const string& scene)
View
1  src/swganh_core/spawn/fsm_controller.h
@@ -25,6 +25,7 @@ class FsmController : public swganh::observer::ObserverInterface
virtual uint64_t GetId() const;
virtual void Notify(swganh::messages::BaseSwgMessage* message);
+ virtual void Notify(swganh::messages::BaseSwgMessage* message, swganh::network::soe::Session::SequencedCallback&& callback) { };
virtual void Cleanup(boost::posix_time::ptime current_time_);
Please sign in to comment.
Something went wrong with that request. Please try again.