Skip to content

Commit

Permalink
[Core / Service]: Fixed crash that occured when communicating with eC…
Browse files Browse the repository at this point in the history
…AL 5.12 services (#1579)

The crash occured when a v0 client connected to a service server.
  • Loading branch information
FlorianReimold committed May 3, 2024
1 parent 390d6a0 commit e52a6a8
Show file tree
Hide file tree
Showing 3 changed files with 163 additions and 1 deletion.
12 changes: 12 additions & 0 deletions ecal/service/ecal_service/src/server_session_impl_v0.cpp
Expand Up @@ -74,6 +74,18 @@ namespace eCAL
// Data receiving and sending
///////////////////////////////////////////////
void ServerSessionV0::start()
{
// Call the handle_start with the io_service
// It is important to async call handle_start(), as it will call a
// user-defined callback. As we have no influence what that callback will
// be, we must call it from another thread to make sure to not double-lock
// mutexes from the server_impl, if the callback should itself call a
// server_impl api function.

io_context_->post([me = shared_from_this()]() { me->handle_start(); });
}

void ServerSessionV0::handle_start()
{
// Go to handshake state
state_ = State::CONNECTED;
Expand Down
5 changes: 5 additions & 0 deletions ecal/service/ecal_service/src/server_session_impl_v0.h
Expand Up @@ -75,6 +75,11 @@ namespace eCAL
///////////////////////////////////////////////
public:
void start() override;

private:
void handle_start();

public:
void stop() override;

eCAL::service::State get_state() const override;
Expand Down
147 changes: 146 additions & 1 deletion ecal/service/test/src/ecal_tcp_service_test.cpp
Expand Up @@ -1609,6 +1609,151 @@ TEST(Callback, SerializedServiceCallbacks) // NOLINT
}
#endif

#if 1
// Call different eCAL Service API functions from within the callbacks
TEST(ecal_service, Callback_ApiCallsFromCallbacks)
{
for (std::uint8_t protocol_version = min_protocol_version; protocol_version <= max_protocol_version; protocol_version++)
{
const auto io_context = std::make_shared<asio::io_context>();
const asio::io_context::work dummy_work(*io_context);

atomic_signalable<int> num_server_service_callback_called (0);
atomic_signalable<int> num_server_event_callback_called (0);
atomic_signalable<int> num_client_response_callback_called(0);
atomic_signalable<int> num_client_event_callback_called (0);

// declare server and client shared_ptrs as we need those in the callbacks.
std::shared_ptr<eCAL::service::Server> server;
std::shared_ptr<eCAL::service::ClientSession> client;

const eCAL::service::Server::ServiceCallbackT server_service_callback
= [&num_server_service_callback_called, &server](const std::shared_ptr<const std::string>& /*request*/, const std::shared_ptr<std::string>& /*response*/) -> void
{
if (server)
{
bool is_connected = server->is_connected();
int connection_count = server->get_connection_count();
uint16_t port = server->get_port();

EXPECT_EQ(is_connected, true);
EXPECT_EQ(connection_count, 1);
}

num_server_service_callback_called++;
};

const eCAL::service::Server::EventCallbackT server_event_callback
= [&num_server_event_callback_called, &server](eCAL::service::ServerEventType event, const std::string& /*message*/) -> void
{
if (server)
{
bool is_connected = server->is_connected();
int connection_count = server->get_connection_count();
uint16_t port = server->get_port();

if (event == eCAL::service::ServerEventType::Connected)
{
ASSERT_EQ(is_connected, true);
ASSERT_EQ(connection_count, 1);
}
else if (event == eCAL::service::ServerEventType::Disconnected)
{
ASSERT_EQ(is_connected, false);
ASSERT_EQ(connection_count, 0);
}
}

num_server_event_callback_called++;
};

const eCAL::service::ClientSession::ResponseCallbackT client_response_callback
= [&num_client_response_callback_called, &client]
(const eCAL::service::Error& error, const std::shared_ptr<std::string>& /*response*/) -> void
{
if(client)
{
// We just test if those functions can be called without crashing
auto address = client->get_address();
auto port = client->get_port();
auto protocol_version = client->get_accepted_protocol_version();
auto queue_size = client->get_queue_size();
auto state = client->get_state();
}

num_client_response_callback_called++;
};

const eCAL::service::ClientSession::EventCallbackT client_event_callback
= [&num_client_event_callback_called, &client]
(eCAL::service::ClientEventType event, const std::string& /*message*/) -> void
{
if (client)
{
// We just test if those functions can be called without crashing
auto address = client->get_address();
auto port = client->get_port();
auto protocol_version = client->get_accepted_protocol_version();
auto queue_size = client->get_queue_size();
auto state = client->get_state();
}

num_client_event_callback_called++;
};

server = eCAL::service::Server::create(io_context, protocol_version, 0, server_service_callback, true, server_event_callback);

EXPECT_EQ(num_server_service_callback_called.get(), 0);
EXPECT_EQ(num_server_event_callback_called.get(), 0);
EXPECT_EQ(num_client_response_callback_called.get(), 0);
EXPECT_EQ(num_client_event_callback_called.get(), 0);

client = eCAL::service::ClientSession::create(io_context, protocol_version, "127.0.0.1", server->get_port(), client_event_callback);

std::thread io_thread([&io_context]()
{
io_context->run();
});

// Wait for the connected events to be called
num_server_event_callback_called.wait_for([](int value) { return value >= 1; }, std::chrono::milliseconds(500));
num_client_event_callback_called.wait_for([](int value) { return value >= 1; }, std::chrono::milliseconds(500));

EXPECT_EQ(num_server_service_callback_called.get(), 0);
EXPECT_EQ(num_server_event_callback_called.get(), 1);
EXPECT_EQ(num_client_response_callback_called.get(), 0);
EXPECT_EQ(num_client_event_callback_called.get(), 1);

// Call service and wait for the response
client->async_call_service(std::make_shared<std::string>("1"), client_response_callback);

num_server_service_callback_called .wait_for([](int value) { return value >= 1; }, std::chrono::milliseconds(500));
num_client_response_callback_called.wait_for([](int value) { return value >= 1; }, std::chrono::milliseconds(500));

EXPECT_EQ(num_server_service_callback_called.get(), 1);
EXPECT_EQ(num_server_event_callback_called.get(), 1);
EXPECT_EQ(num_client_response_callback_called.get(), 1);
EXPECT_EQ(num_client_event_callback_called.get(), 1);

// Terminate the client
client = nullptr;

// Wait for the disconnected events to be called
num_server_event_callback_called.wait_for([](int value) { return value >= 2; }, std::chrono::milliseconds(500));

EXPECT_EQ(num_server_service_callback_called.get(), 1);
EXPECT_EQ(num_server_event_callback_called.get(), 2);

// Terminate the server
server = nullptr;

// join the io_thread
io_context->stop();
io_thread.join();
}
}
#endif

#if 1
TEST(ErrorCallback, ErrorCallbackNoServer) // NOLINT
{
Expand Down Expand Up @@ -2598,4 +2743,4 @@ TEST(BlockingCall, Stopped) // NOLINT // This test shows the proper way to stop
}
}
}
#endif
#endif

0 comments on commit e52a6a8

Please sign in to comment.