Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[15118] Adding DataReader::get_unread_count (bool mark_as_read) (backport #2825) #2843

Merged
merged 1 commit into from
Jul 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions include/fastdds/dds/subscriber/DataReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -809,6 +809,16 @@ class DataReader : public DomainEntity
*/
RTPS_DllAPI uint64_t get_unread_count() const;

/**
* Get the number of samples pending to be read.
*
* @param mark_as_read Whether the unread samples should be marked as read or not.
*
* @return the number of samples on the reader history that have never been read.
*/
RTPS_DllAPI uint64_t get_unread_count(
bool mark_as_read) const;

/**
* Get associated GUID.
*
Expand Down
3 changes: 3 additions & 0 deletions include/fastdds/rtps/reader/RTPSReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,9 @@ class RTPSReader

RTPS_DllAPI uint64_t get_unread_count() const;

RTPS_DllAPI uint64_t get_unread_count(
bool mark_as_read);

/**
* @return True if the reader expects Inline QOS.
*/
Expand Down
8 changes: 7 additions & 1 deletion src/cpp/fastdds/subscriber/DataReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,13 @@ ReturnCode_t DataReader::get_first_untaken_info(

uint64_t DataReader::get_unread_count() const
{
return impl_->get_unread_count();
return impl_->get_unread_count(false);
}

uint64_t DataReader::get_unread_count(
bool mark_as_read) const
{
return impl_->get_unread_count(mark_as_read);
}

const GUID_t& DataReader::guid()
Expand Down
5 changes: 3 additions & 2 deletions src/cpp/fastdds/subscriber/DataReaderImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -710,9 +710,10 @@ ReturnCode_t DataReaderImpl::get_first_untaken_info(
return ReturnCode_t::RETCODE_NO_DATA;
}

uint64_t DataReaderImpl::get_unread_count() const
uint64_t DataReaderImpl::get_unread_count(
bool mark_as_read)
{
return reader_ ? reader_->get_unread_count() : 0;
return reader_ ? history_.get_unread_count(mark_as_read) : 0;
}

const GUID_t& DataReaderImpl::guid() const
Expand Down
9 changes: 7 additions & 2 deletions src/cpp/fastdds/subscriber/DataReaderImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,14 @@ class DataReaderImpl
SampleInfo* info);

/**
* @return the number of samples pending to be read.
* Get the number of samples pending to be read.
*
* @param mark_as_read Whether the unread samples should be marked as read or not.
*
* @return the number of samples on the reader history that have never been read.
*/
uint64_t get_unread_count() const;
uint64_t get_unread_count(
bool mark_as_read);

/**
* Get associated GUID
Expand Down
6 changes: 6 additions & 0 deletions src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,12 @@ bool DataReaderHistory::get_next_deadline(
return true;
}

uint64_t DataReaderHistory::get_unread_count(
bool mark_as_read)
{
return mp_reader->get_unread_count(mark_as_read);
}

std::pair<bool, DataReaderHistory::instance_info> DataReaderHistory::lookup_instance(
const InstanceHandle_t& handle,
bool exact) const
Expand Down
10 changes: 10 additions & 0 deletions src/cpp/fastdds/subscriber/history/DataReaderHistory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,16 @@ class DataReaderHistory : public eprosima::fastrtps::rtps::ReaderHistory
InstanceHandle_t& handle,
std::chrono::steady_clock::time_point& next_deadline_us);

/**
* Get the number of samples pending to be read.
*
* @param mark_as_read Whether the unread samples should be marked as read or not.
*
* @return the number of samples on the reader history that have never been read.
*/
uint64_t get_unread_count(
bool mark_as_read);

/**
* @brief Get the list of changes corresponding to an instance handle.
* @param handle The handle to the instance.
Expand Down
22 changes: 22 additions & 0 deletions src/cpp/rtps/reader/RTPSReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,28 @@ uint64_t RTPSReader::get_unread_count() const
return total_unread_;
}

uint64_t RTPSReader::get_unread_count(
bool mark_as_read)
{
std::unique_lock<RecursiveTimedMutex> lock(mp_mutex);
uint64_t ret_val = total_unread_;

if (mark_as_read)
{
for (auto it = mp_history->changesBegin(); 0 < total_unread_ && it != mp_history->changesEnd(); ++it)
{
CacheChange_t* change = *it;
if (!change->isRead)
{
change->isRead = true;
--total_unread_;
}
}
assert(0 == total_unread_);
}
return ret_val;
}

bool RTPSReader::is_datasharing_compatible_with(
const WriterProxyData& wdata)
{
Expand Down
2 changes: 2 additions & 0 deletions test/mock/rtps/RTPSReader/fastdds/rtps/reader/RTPSReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ class RTPSReader : public Endpoint

MOCK_METHOD0(get_unread_count, uint64_t());

MOCK_METHOD1(get_unread_count, uint64_t(bool));

MOCK_METHOD1(set_content_filter, void (eprosima::fastdds::rtps::IReaderDataFilter* filter));

// *INDENT-ON*
Expand Down
74 changes: 74 additions & 0 deletions test/unittest/dds/subscriber/DataReaderTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1550,6 +1550,80 @@ TEST_F(DataReaderTests, read_unread)
}
}

/*
* This test checks the behaviour of the two overloads of get_unread_count.
*/
TEST_F(DataReaderTests, get_unread_count)
{
static const Duration_t time_to_wait(0, 100 * 1000 * 1000);
static constexpr int32_t num_samples = 10;
static constexpr uint64_t num_samples_check = static_cast<uint64_t>(num_samples);

const ReturnCode_t& ok_code = ReturnCode_t::RETCODE_OK;

DataWriterQos writer_qos = DATAWRITER_QOS_DEFAULT;
writer_qos.history().kind = KEEP_LAST_HISTORY_QOS;
writer_qos.history().depth = num_samples;
writer_qos.publish_mode().kind = SYNCHRONOUS_PUBLISH_MODE;
writer_qos.reliability().kind = RELIABLE_RELIABILITY_QOS;

DataReaderQos reader_qos = DATAREADER_QOS_DEFAULT;
reader_qos.reliability().kind = RELIABLE_RELIABILITY_QOS;
reader_qos.history().kind = KEEP_ALL_HISTORY_QOS;
reader_qos.resource_limits().max_instances = 1;
reader_qos.resource_limits().max_samples_per_instance = num_samples;
reader_qos.resource_limits().max_samples = 3 * num_samples;

create_instance_handles();
create_entities(nullptr, reader_qos, SUBSCRIBER_QOS_DEFAULT, writer_qos);

FooType data;
data.index(0);
data.message()[1] = '\0';

// Send a bunch of samples
for (char i = 0; i < num_samples; ++i)
{
data.message()[0] = i + '0';
EXPECT_EQ(ok_code, data_writer_->write(&data, handle_ok_));
}

// Reader should have 10 unread samples

// There are unread samples, so wait_for_unread should be ok
EXPECT_TRUE(data_reader_->wait_for_unread_message(time_to_wait));

// Calling get_unread_count() several times should always return the same value
for (char i = 0; i < num_samples; ++i)
{
EXPECT_EQ(num_samples_check, data_reader_->get_unread_count());
}

SampleInfo sample_info;
ASSERT_EQ(ReturnCode_t::RETCODE_OK, data_reader_->get_first_untaken_info(&sample_info));
ASSERT_EQ(SampleStateKind::NOT_READ_SAMPLE_STATE, sample_info.sample_state);

// Calling get_unread_count(false) several times should always return the same value
for (char i = 0; i < num_samples; ++i)
{
EXPECT_EQ(num_samples_check, data_reader_->get_unread_count(false));
}

ASSERT_EQ(ReturnCode_t::RETCODE_OK, data_reader_->get_first_untaken_info(&sample_info));
ASSERT_EQ(SampleStateKind::NOT_READ_SAMPLE_STATE, sample_info.sample_state);

// Calling get_unread_count(true) once will return the correct value
EXPECT_EQ(num_samples_check, data_reader_->get_unread_count(true));

ASSERT_EQ(ReturnCode_t::RETCODE_OK, data_reader_->get_first_untaken_info(&sample_info));
ASSERT_EQ(SampleStateKind::READ_SAMPLE_STATE, sample_info.sample_state);

// All variants should then return 0
EXPECT_EQ(0, data_reader_->get_unread_count(true));
EXPECT_EQ(0, data_reader_->get_unread_count(false));
EXPECT_EQ(0, data_reader_->get_unread_count());
}

template<typename DataType>
void lookup_instance_test(
DataType& data,
Expand Down