Skip to content

Commit

Permalink
[core] add/improved IsPublished/IsSubscribed logic (5.12.x)
Browse files Browse the repository at this point in the history
additional check for CSubscriber::GetPublisherCount() using the connection state of the matching subscriber to ensure that GetPublisherCount is increased only if the matching publisher is able to send data
  • Loading branch information
rex-schilasky committed Jun 20, 2024
1 parent 73fbc24 commit 799dcd1
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 6 deletions.
18 changes: 13 additions & 5 deletions ecal/core/src/pubsub/ecal_subgate.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
* Copyright (C) 2016 - 2024 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -281,8 +281,12 @@ namespace eCAL

iter->second->ApplyLocLayerParameter(process_id, topic_id, tlayer.type(), writer_par);
}
// inform for local publisher connection
iter->second->ApplyLocPublication(process_id, topic_id, topic_info);
// we only inform the subscriber when the publisher has already recognized at least one local subscriber
// this should avoid to set the "IsPublished" state before the publisher is able to send data
if (ecal_sample_.topic().connections_loc() > 0)
{
iter->second->ApplyLocPublication(process_id, topic_id, topic_info);
}
}
}

Expand Down Expand Up @@ -331,8 +335,12 @@ namespace eCAL
const std::string writer_par = tlayer.par_layer().SerializeAsString();
iter->second->ApplyExtLayerParameter(host_name, tlayer.type(), writer_par);
}
// inform for external publisher connection
iter->second->ApplyExtPublication(host_name, process_id, topic_id, topic_info);
// we only inform the subscriber when the publisher has already recognized at least one external subscriber
// this should avoid to set the "IsPublished" state before the publisher is able to send data
if (ecal_sample_.topic().connections_ext() > 0)
{
iter->second->ApplyExtPublication(host_name, process_id, topic_id, topic_info);
}
}
}

Expand Down
96 changes: 95 additions & 1 deletion testing/ecal/pubsub_test/src/pubsub_receive_test.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
* Copyright (C) 2016 - 2024 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -219,3 +219,97 @@ TEST(SUBSCRIBER, SporadicEmptyReceives)
// finalize eCAL API
EXPECT_EQ(0, eCAL::Finalize());
}

TEST(PubSub, TestSubscriberSeen)
{
// initialize eCAL API
EXPECT_EQ(0, eCAL::Initialize(0, nullptr, "subscriber_seen"));

// enable data loopback
eCAL::Util::EnableLoopback(true);

std::atomic<bool> subscriber_seen_at_publication_start(false);
std::atomic<bool> subscriber_seen_at_publication_end(false);

std::atomic<bool> do_start_publication(false);
std::atomic<bool> publication_finished(false);

// publishing thread
auto publisher_thread = [&]() {
eCAL::CPublisher pub("blob");
pub.ShmSetAcknowledgeTimeout(500);

int cnt(0);
const auto max_runs(1000);
while (eCAL::Ok())
{
if (do_start_publication && cnt < max_runs)
{
if (cnt == 0)
{
subscriber_seen_at_publication_start = pub.IsSubscribed();
}

pub.Send(std::to_string(cnt));
cnt++;

if (cnt == max_runs)
{
subscriber_seen_at_publication_end = pub.IsSubscribed();
publication_finished = true;
break;
}
}
}
};

// subscribing thread
auto subscriber_thread = [&]() {
eCAL::CSubscriber sub("blob");
bool received(false);
auto max_lines(10);
auto receive_lambda = [&received, &max_lines](const char* /*topic_name_*/, const struct eCAL::SReceiveCallbackData* data_)
{
if (max_lines != 0)
{
// the final log should look like this
// -----------------------------------
// Receiving 0
// Receiving 1
// Receiving 2
// Receiving 3
// Receiving 4
// Receiving 5
// Receiving 6
// Receiving 7
// Receiving 8
// Receiving 9
// -----------------------------------
std::cout << "Receiving " << std::string(static_cast<const char*>(data_->buf), data_->size) << std::endl;
max_lines--;
}
};
sub.AddReceiveCallback(receive_lambda);

while (eCAL::Ok() && !publication_finished)
{
//if (sub.IsPublished()) do_start_publication = true;
if (sub.GetPublisherCount() > 0) do_start_publication = true;
}
};

// create threads for publisher and subscriber
std::thread pub_thread(publisher_thread);
std::thread sub_thread(subscriber_thread);

// join threads to the main thread
pub_thread.join();
sub_thread.join();

// finalize eCAL API
eCAL::Finalize();

// check if the publisher has seen the subscriber
EXPECT_TRUE(subscriber_seen_at_publication_start);
EXPECT_TRUE(subscriber_seen_at_publication_end);
}

0 comments on commit 799dcd1

Please sign in to comment.