Skip to content

Commit

Permalink
iox-eclipse-iceoryx#408 remove queueCapacity from subscribe method
Browse files Browse the repository at this point in the history
Signed-off-by: Mathias Kraus <mathias.kraus@apex.ai>
  • Loading branch information
elBoberido committed Dec 17, 2020
1 parent f3b31b4 commit 0dd9787
Show file tree
Hide file tree
Showing 43 changed files with 235 additions and 173 deletions.
3 changes: 1 addition & 2 deletions iceoryx_binding_c/include/iceoryx_binding_c/subscriber.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ void iox_sub_deinit(iox_sub_t const self);

/// @brief subscribes to the service
/// @param[in] self handle to the subscriber
/// @param[in] queueCapacity size of the receiver queue
void iox_sub_subscribe(iox_sub_t const self, const uint64_t queueCapacity);
void iox_sub_subscribe(iox_sub_t const self);

/// @brief unsubscribes from a service
/// @param[in] self handle to the subscriber
Expand Down
4 changes: 2 additions & 2 deletions iceoryx_binding_c/source/c_subscriber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ void iox_sub_deinit(iox_sub_t const self)
self->~cpp2c_Subscriber();
}

void iox_sub_subscribe(iox_sub_t const self, const uint64_t queueCapacity)
void iox_sub_subscribe(iox_sub_t const self)
{
SubscriberPortUser(self->m_portData).subscribe(queueCapacity);
SubscriberPortUser(self->m_portData).subscribe();
}

void iox_sub_unsubscribe(iox_sub_t const self)
Expand Down
24 changes: 13 additions & 11 deletions iceoryx_binding_c/test/moduletests/test_subscriber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class iox_sub_test : public Test
void Subscribe(SubscriberPortData* ptr)
{
uint64_t queueCapacity = MAX_CHUNKS_HELD_PER_SUBSCRIBER_SIMULTANEOUSLY;
iox_sub_subscribe(m_sut, queueCapacity);
iox_sub_subscribe(m_sut);

SubscriberPortSingleProducer(ptr).tryGetCaProMessage();
iox::capro::CaproMessage caproMessage(iox::capro::CaproMessageType::ACK, TEST_SERVICE_DESCRIPTION);
Expand All @@ -86,8 +86,10 @@ class iox_sub_test : public Test

iox::cxx::GenericRAII m_uniqueRouDiId{[] { iox::popo::internal::setUniqueRouDiId(0); },
[] { iox::popo::internal::unsetUniqueRouDiId(); }};
iox::popo::SubscriberPortData m_portPtr{
TEST_SERVICE_DESCRIPTION, "myApp", iox::cxx::VariantQueueTypes::SoFi_SingleProducerSingleConsumer};
iox::popo::SubscriberPortData m_portPtr{TEST_SERVICE_DESCRIPTION,
"myApp",
iox::cxx::VariantQueueTypes::SoFi_SingleProducerSingleConsumer,
iox::popo::SubscriberOptions()};
ChunkQueuePusher<SubscriberPortData::ChunkQueueData_t> m_chunkPusher{&m_portPtr.m_chunkReceiverData};
std::unique_ptr<cpp2c_Subscriber> m_subscriber{new cpp2c_Subscriber};
iox_sub_t m_sut = m_subscriber.get();
Expand All @@ -105,8 +107,8 @@ TEST_F(iox_sub_test, initialStateNotSubscribed)

TEST_F(iox_sub_test, offerLeadsToSubscibeRequestedState)
{
uint64_t queueCapacity = 1u;
iox_sub_subscribe(m_sut, queueCapacity);
uint64_t queueCapacity = 1U;
iox_sub_subscribe(m_sut);

SubscriberPortSingleProducer(&m_portPtr).tryGetCaProMessage();

Expand All @@ -115,8 +117,8 @@ TEST_F(iox_sub_test, offerLeadsToSubscibeRequestedState)

TEST_F(iox_sub_test, NACKResponseLeadsToSubscribeWaitForOfferState)
{
uint64_t queueCapacity = 1u;
iox_sub_subscribe(m_sut, queueCapacity);
uint64_t queueCapacity = 1U;
iox_sub_subscribe(m_sut);

SubscriberPortSingleProducer(&m_portPtr).tryGetCaProMessage();
iox::capro::CaproMessage caproMessage(iox::capro::CaproMessageType::NACK, TEST_SERVICE_DESCRIPTION);
Expand All @@ -127,8 +129,8 @@ TEST_F(iox_sub_test, NACKResponseLeadsToSubscribeWaitForOfferState)

TEST_F(iox_sub_test, ACKResponseLeadsToSubscribedState)
{
uint64_t queueCapacity = 1u;
iox_sub_subscribe(m_sut, queueCapacity);
uint64_t queueCapacity = 1U;
iox_sub_subscribe(m_sut);

SubscriberPortSingleProducer(&m_portPtr).tryGetCaProMessage();
iox::capro::CaproMessage caproMessage(iox::capro::CaproMessageType::ACK, TEST_SERVICE_DESCRIPTION);
Expand All @@ -139,8 +141,8 @@ TEST_F(iox_sub_test, ACKResponseLeadsToSubscribedState)

TEST_F(iox_sub_test, UnsubscribeLeadsToUnscribeRequestedState)
{
uint64_t queueCapacity = 1u;
iox_sub_subscribe(m_sut, queueCapacity);
uint64_t queueCapacity = 1U;
iox_sub_subscribe(m_sut);

SubscriberPortSingleProducer(&m_portPtr).tryGetCaProMessage();
iox::capro::CaproMessage caproMessage(iox::capro::CaproMessageType::ACK, TEST_SERVICE_DESCRIPTION);
Expand Down
9 changes: 5 additions & 4 deletions iceoryx_binding_c/test/moduletests/test_trigger_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class iox_trigger_info_test : public Test
void Subscribe(SubscriberPortData* ptr)
{
uint64_t queueCapacity = MAX_CHUNKS_HELD_PER_SUBSCRIBER_SIMULTANEOUSLY;
iox_sub_subscribe(m_subscriberHandle, queueCapacity);
iox_sub_subscribe(m_subscriberHandle);

SubscriberPortSingleProducer(ptr).tryGetCaProMessage();
iox::capro::CaproMessage caproMessage(iox::capro::CaproMessageType::ACK, TEST_SERVICE_DESCRIPTION);
Expand All @@ -86,8 +86,10 @@ class iox_trigger_info_test : public Test
Allocator m_memoryAllocator{m_memory, MEMORY_SIZE};
MePooConfig m_mempoolconf;
MemoryManager m_memoryManager;
iox::popo::SubscriberPortData m_portPtr{
TEST_SERVICE_DESCRIPTION, "myApp", iox::cxx::VariantQueueTypes::SoFi_SingleProducerSingleConsumer};
iox::popo::SubscriberPortData m_portPtr{TEST_SERVICE_DESCRIPTION,
"myApp",
iox::cxx::VariantQueueTypes::SoFi_SingleProducerSingleConsumer,
iox::popo::SubscriberOptions()};
ChunkQueuePusher<SubscriberPortData::ChunkQueueData_t> m_chunkPusher{&m_portPtr.m_chunkReceiverData};
cpp2c_Subscriber m_subscriber;
iox_sub_t m_subscriberHandle = &m_subscriber;
Expand Down Expand Up @@ -230,4 +232,3 @@ TEST_F(iox_trigger_info_test, callbackCanBeCalledMultipleTimes)

EXPECT_EQ(m_lastTriggerCallbackArgument, &m_userTrigger);
}

5 changes: 3 additions & 2 deletions iceoryx_dds/include/iceoryx_dds/gateway/dds_to_iox.hpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2020 by Robert Bosch GmbH. All rights reserved.
// Copyright (c) 2020 by Robert Bosch GmbH, Apex.AI Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -43,7 +43,8 @@ class DDS2IceoryxGateway : public gateway_t

private:
void* m_reservedChunk = nullptr;
cxx::expected<channel_t, gw::GatewayError> setupChannel(const capro::ServiceDescription& service) noexcept;
cxx::expected<channel_t, gw::GatewayError> setupChannel(const capro::ServiceDescription& service,
const popo::PublisherOptions& publisherOptions) noexcept;
};

} // namespace dds
Expand Down
5 changes: 3 additions & 2 deletions iceoryx_dds/include/iceoryx_dds/gateway/iox_to_dds.hpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2020 by Robert Bosch GmbH. All rights reserved.
// Copyright (c) 2020 by Robert Bosch GmbH, Apex.AI Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -38,7 +38,8 @@ class Iceoryx2DDSGateway : public gateway_t
void forward(const channel_t& channel) noexcept;

private:
cxx::expected<channel_t, gw::GatewayError> setupChannel(const capro::ServiceDescription& service) noexcept;
cxx::expected<channel_t, gw::GatewayError> setupChannel(const capro::ServiceDescription& service,
const popo::SubscriberOptions&) noexcept;
};

} // namespace dds
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2020 by Robert Bosch GmbH. All rights reserved.
// Copyright (c) 2020 by Robert Bosch GmbH, Apex.AI Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -42,7 +42,7 @@ inline void DDS2IceoryxGateway<channel_t, gateway_t>::loadConfiguration(const co
LogDebug() << "[DDS2IceoryxGateway] Setting up channel for service: {"
<< serviceDescription.getServiceIDString() << ", " << serviceDescription.getInstanceIDString()
<< ", " << serviceDescription.getEventIDString() << "}";
setupChannel(serviceDescription);
setupChannel(serviceDescription, popo::PublisherOptions());
}
}
}
Expand Down Expand Up @@ -77,9 +77,10 @@ inline void DDS2IceoryxGateway<channel_t, gateway_t>::forward(const channel_t& c
// ======================================== Private ======================================== //
template <typename channel_t, typename gateway_t>
cxx::expected<channel_t, gw::GatewayError>
DDS2IceoryxGateway<channel_t, gateway_t>::setupChannel(const capro::ServiceDescription& service) noexcept
DDS2IceoryxGateway<channel_t, gateway_t>::setupChannel(const capro::ServiceDescription& service,
const popo::PublisherOptions& publisherOptions) noexcept
{
return this->addChannel(service).and_then([&service](auto channel) {
return this->addChannel(service, publisherOptions).and_then([&service](auto channel) {
auto publisher = channel.getIceoryxTerminal();
auto reader = channel.getExternalTerminal();
publisher->offer();
Expand Down
17 changes: 11 additions & 6 deletions iceoryx_dds/include/iceoryx_dds/internal/gateway/iox_to_dds.inl
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2020 by Robert Bosch GmbH. All rights reserved.
// Copyright (c) 2020 by Robert Bosch GmbH, Apex.AI Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -47,7 +47,9 @@ inline void Iceoryx2DDSGateway<channel_t, gateway_t>::loadConfiguration(const co
LogDebug() << "[DDS2IceoryxGateway] Setting up channel for service: {"
<< serviceDescription.getServiceIDString() << ", " << serviceDescription.getInstanceIDString()
<< ", " << serviceDescription.getEventIDString() << "}";
setupChannel(serviceDescription);
popo::SubscriberOptions options;
options.queueCapacity = SUBSCRIBER_CACHE_SIZE;
setupChannel(serviceDescription, options);
}
}
}
Expand Down Expand Up @@ -76,7 +78,9 @@ inline void Iceoryx2DDSGateway<channel_t, gateway_t>::discover(const capro::Capr
{
if (!this->findChannel(msg.m_serviceDescription).has_value())
{
setupChannel(msg.m_serviceDescription);
popo::SubscriberOptions options;
options.queueCapacity = SUBSCRIBER_CACHE_SIZE;
setupChannel(msg.m_serviceDescription, options);
}
break;
}
Expand Down Expand Up @@ -112,12 +116,13 @@ inline void Iceoryx2DDSGateway<channel_t, gateway_t>::forward(const channel_t& c

template <typename channel_t, typename gateway_t>
cxx::expected<channel_t, gw::GatewayError>
Iceoryx2DDSGateway<channel_t, gateway_t>::setupChannel(const capro::ServiceDescription& service) noexcept
Iceoryx2DDSGateway<channel_t, gateway_t>::setupChannel(const capro::ServiceDescription& service,
const popo::SubscriberOptions& subscriberOptions) noexcept
{
return this->addChannel(service).and_then([](auto channel) {
return this->addChannel(service, subscriberOptions).and_then([](auto channel) {
auto subscriber = channel.getIceoryxTerminal();
auto dataWriter = channel.getExternalTerminal();
subscriber->subscribe(SUBSCRIBER_CACHE_SIZE);
subscriber->subscribe();
dataWriter->connect();
});
}
Expand Down
14 changes: 5 additions & 9 deletions iceoryx_examples/ice_multi_publisher/ice_resubscriber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,11 @@ constexpr uint64_t UNSUBSCRIBED_TIME_SECONDS{3U};

void receive()
{
iox::popo::TypedSubscriber<CounterTopic> subscriber({"Group", "Instance", "Counter"});
iox::popo::SubscriberOptions options;
options.queueCapacity = MAX_NUMBER_OF_SAMPLES - 2U;
iox::popo::TypedSubscriber<CounterTopic> subscriber({"Group", "Instance", "Counter"}, options);

subscriber.subscribe();
uint64_t maxNumSamples = MAX_NUMBER_OF_SAMPLES - 2U;
while (!killswitch)
{
// unsubscribe and resubscribe
Expand All @@ -48,13 +49,8 @@ void receive()
// we will probably miss some data while unsubscribed
std::this_thread::sleep_for(std::chrono::seconds(UNSUBSCRIBED_TIME_SECONDS));

// we (re)subscribe with differing maximum number of samples
// and should see at most the latest last maxNumSamples
maxNumSamples =
maxNumSamples % MAX_NUMBER_OF_SAMPLES + 1U; // cycles between last 1 to MAX_NUMBER_OF_SAMPLES samples
subscriber.subscribe(maxNumSamples);

std::cout << "Subscribe with max number of samples " << maxNumSamples << std::endl;
// we (re)subscribe and should see at most the latest options.queueCapacity
subscriber.subscribe();

std::this_thread::sleep_for(std::chrono::seconds(1));

Expand Down
2 changes: 1 addition & 1 deletion iceoryx_examples/icedelivery_on_c/ice_c_subscriber.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ void receiving()

iox_sub_t subscriber =
iox_sub_init(&subscriberStorage, "Radar", "FrontLeft", "Counter", queueCapacity, historyRequest);
iox_sub_subscribe(subscriber, queueCapacity);
iox_sub_subscribe(subscriber);

while (!killswitch)
{
Expand Down
7 changes: 4 additions & 3 deletions iceoryx_examples/singleprocess/single_process.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,11 @@ void sender()

void receiver()
{
iox::popo::TypedSubscriber<TransmissionData_t> subscriber({"Single", "Process", "Demo"});
iox::popo::SubscriberOptions options;
options.queueCapacity = 10U;
iox::popo::TypedSubscriber<TransmissionData_t> subscriber({"Single", "Process", "Demo"}, options);

uint64_t cacheQueueSize = 10;
subscriber.subscribe(cacheQueueSize);
subscriber.subscribe();

while (keepRunning.load())
{
Expand Down
2 changes: 1 addition & 1 deletion iceoryx_examples/waitset_on_c/ice_c_waitset_gateway.c
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ int main()
iox_sub_t subscriber =
iox_sub_init(&(subscriberStorage[i]), "Radar", "FrontLeft", "Counter", queueCapacity, historyRequest);

iox_sub_subscribe(subscriber, queueCapacity);
iox_sub_subscribe(subscriber);
iox_sub_attach_to_waitset(subscriber, waitSet, SubscriberEvent_HAS_NEW_SAMPLES, 1, subscriberCallback);
}

Expand Down
2 changes: 1 addition & 1 deletion iceoryx_examples/waitset_on_c/ice_c_waitset_grouping.c
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ int main()
subscriber[i] =
iox_sub_init(&(subscriberStorage[i]), "Radar", "FrontLeft", "Counter", queueCapacity, historyRequest);

iox_sub_subscribe(subscriber[i], queueCapacity);
iox_sub_subscribe(subscriber[i]);
}

const uint64_t FIRST_GROUP_ID = 123;
Expand Down
4 changes: 2 additions & 2 deletions iceoryx_examples/waitset_on_c/ice_c_waitset_individual.c
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ int main()
subscriber[1] =
iox_sub_init(&(subscriberStorage[1]), "Radar", "FrontLeft", "Counter", queueCapacity, historyRequest);

iox_sub_subscribe(subscriber[0], queueCapacity);
iox_sub_subscribe(subscriber[1], queueCapacity);
iox_sub_subscribe(subscriber[0]);
iox_sub_subscribe(subscriber[1]);

iox_sub_attach_to_waitset(subscriber[0], waitSet, SubscriberEvent_HAS_NEW_SAMPLES, 0, NULL);
iox_sub_attach_to_waitset(subscriber[1], waitSet, SubscriberEvent_HAS_NEW_SAMPLES, 0, NULL);
Expand Down
9 changes: 7 additions & 2 deletions iceoryx_posh/include/iceoryx_posh/gateway/channel.hpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2020 by Robert Bosch GmbH. All rights reserved.
// Copyright (c) 2020 by Robert Bosch GmbH, Apex.AI Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -17,6 +17,8 @@

#include "iceoryx_posh/capro/service_description.hpp"
#include "iceoryx_posh/iceoryx_posh_types.hpp"
#include "iceoryx_posh/popo/publisher_options.hpp"
#include "iceoryx_posh/popo/subscriber_options.hpp"
#include "iceoryx_utils/cxx/expected.hpp"
#include "iceoryx_utils/cxx/optional.hpp"
#include "iceoryx_utils/internal/objectpool/objectpool.hpp"
Expand Down Expand Up @@ -67,9 +69,12 @@ class Channel
///
/// @brief create Creates a channel for the given service whose terminals reside in a static object pool.
/// @param service The service to create the channel for.
/// @param options The PublisherOptions or SubscriberOptions with historyCapacity and queueCapacity.
/// @return A copy of the created channel, if successful.
///
static cxx::expected<Channel, ChannelError> create(const capro::ServiceDescription& service) noexcept;
template <typename IceoryxPubSubOptions>
static cxx::expected<Channel, ChannelError> create(const capro::ServiceDescription& service,
const IceoryxPubSubOptions& options) noexcept;

capro::ServiceDescription getServiceDescription() const noexcept;
IceoryxTerminalPtr getIceoryxTerminal() const noexcept;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
#include "iceoryx_posh/gateway/gateway_config.hpp"
#include "iceoryx_posh/iceoryx_posh_config.hpp"
#include "iceoryx_posh/iceoryx_posh_types.hpp"
#include "iceoryx_posh/popo/publisher_options.hpp"
#include "iceoryx_posh/popo/subscriber_options.hpp"
#include "iceoryx_utils/cxx/expected.hpp"
#include "iceoryx_utils/cxx/function_ref.hpp"
#include "iceoryx_utils/cxx/optional.hpp"
Expand Down Expand Up @@ -95,6 +97,7 @@ class GatewayGeneric : public gateway_t
/// @brief addChannel Creates a channel for the given service and stores a copy of it in an internal collection for
/// later access.
/// @param service The service to create a channel for.
/// @param options The PublisherOptions or SubscriberOptions with historyCapacity and queueCapacity.
/// @return an expected containing a copy of the added channel, otherwise an error
///
/// @note Wildcard services are not allowed and will be ignored.
Expand All @@ -107,7 +110,11 @@ class GatewayGeneric : public gateway_t
/// The service description is perhaps too large for copying since they contain strings, however this should be
/// addressed with a service description repository feature.
///
cxx::expected<channel_t, GatewayError> addChannel(const capro::ServiceDescription& service) noexcept;
template <typename IceoryxPubSubOptions>
cxx::expected<channel_t, GatewayError> addChannel(const capro::ServiceDescription& service,
const IceoryxPubSubOptions& options) noexcept;
cxx::expected<channel_t, GatewayError> addChannel(const capro::ServiceDescription& service,
const popo::SubscriberOptions& subscriberOptions) noexcept;

///
/// @brief findChannel Searches for a channel for the given service in the internally stored collection and returns
Expand Down
Loading

0 comments on commit 0dd9787

Please sign in to comment.