Skip to content

Commit

Permalink
iox-eclipse-iceoryx#80 refactored lockfree queue methods
Browse files Browse the repository at this point in the history
Signed-off-by: Killat Matthias (CC-AD/ESW1) <matthias.killat2@de.bosch.com>
  • Loading branch information
Killat Matthias (CC-AD/ESW1) authored and Killat Matthias (CC-AD/ESW1) committed May 28, 2020
1 parent 4978601 commit 76ced19
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,16 @@ constexpr uint64_t LockFreeQueue<ElementType, Capacity>::capacity() noexcept
template <typename ElementType, uint64_t Capacity>
bool LockFreeQueue<ElementType, Capacity>::try_push(const ElementType value) noexcept
{
// note that const precludes us from changing value but std::move works (it removes the constness)

UniqueIndex index = m_freeIndices.pop();

if (!index.is_valid())
if (!index.isValid())
{
return false; // detected full queue
}

auto ptr = m_buffer.ptr(index);
new (ptr) ElementType(std::move(value));

// ensures that whenever an index is pushed into m_usedIndices, the corresponding value in m_buffer[index]
// was written before
releaseBufferChanges();
writeBufferAt(index, value);

m_usedIndices.push(index);

Expand All @@ -56,56 +53,49 @@ bool LockFreeQueue<ElementType, Capacity>::try_push(const ElementType value) noe
template <typename ElementType, uint64_t Capacity>
iox::cxx::optional<ElementType> LockFreeQueue<ElementType, Capacity>::push(const ElementType value) noexcept
{
cxx::optional<ElementType> result;
// note that const precludes us from changing value but std::move works (it removes the constness)

cxx::optional<ElementType> evictedValue;

UniqueIndex index = m_freeIndices.pop();
while (!index.is_valid())

while (!index.isValid())
{
// only pop the index if the queue is still full
index = m_usedIndices.popIfFull();
if (index.is_valid())
if (index.isValid())
{
// todo: private method and sync here?
auto ptr = m_buffer.ptr(index);
result = std::move(*ptr);
ptr->~ElementType();
evictedValue = readBufferAt(index);
break;
}
// if the queue was not full we try again
// if m_usedIndices was not full we try again (m_freeIndices should contain an index in this case)
// note that it is theoretically possible to be unsuccessful indefinitely
// (and thus we would have an infinite loop)
// but this requires a timing of concurrent pushes and pops which is exceptionally unlikely in practice

index = m_freeIndices.pop();
}

// if we removed from a full queue via popIfFull it might not be full anymore when a concurrent pop occurs

auto ptr = m_buffer.ptr(index);
new (ptr) ElementType(std::move(value));

// ensures that whenever an index is pushed into m_usedIndices, the corresponding value in m_buffer[index]
// was written before
releaseBufferChanges();
writeBufferAt(index, value);

m_usedIndices.push(index);

return result;
return evictedValue;
}

template <typename ElementType, uint64_t Capacity>
iox::cxx::optional<ElementType> LockFreeQueue<ElementType, Capacity>::pop() noexcept
{
UniqueIndex index = m_usedIndices.pop();

if (!index.is_valid())
if (!index.isValid())
{
return cxx::nullopt_t(); // detected empty queue
}

// combined with releaseChanges, this ensures that whenever an index is popped from m_usedIndices,
// the corresponding value in m_buffer[index] was written to before
acquireBufferChanges();

auto ptr = m_buffer.ptr(index);
cxx::optional<ElementType> result(std::move(*ptr));
ptr->~ElementType();
auto result = readBufferAt(index);

m_freeIndices.push(index);

Expand All @@ -125,14 +115,24 @@ uint64_t LockFreeQueue<ElementType, Capacity>::size()
}

template <typename ElementType, uint64_t Capacity>
void LockFreeQueue<ElementType, Capacity>::acquireBufferChanges()
cxx::optional<ElementType> LockFreeQueue<ElementType, Capacity>::readBufferAt(const UniqueIndex& index)
{
// also used for buffer synchronization
m_size.fetch_sub(1u, std::memory_order_acquire);

auto& element = m_buffer[index];
cxx::optional<ElementType> result(std::move(element));
element.~ElementType();
return result;
}

template <typename ElementType, uint64_t Capacity>
void LockFreeQueue<ElementType, Capacity>::releaseBufferChanges()
void LockFreeQueue<ElementType, Capacity>::writeBufferAt(const UniqueIndex& index, const ElementType& value)
{
auto elementPtr = m_buffer.ptr(index);
new (elementPtr) ElementType(std::move(value));

// also used for buffer synchronization
m_size.fetch_add(1u, std::memory_order_release);
}
} // namespace iox
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class unique
return m_value;
}

bool is_valid() const
bool isValid() const
{
return m_valid;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ class LockFreeQueue
// this has the advantage of limiting unneccessary synchronization (e.g. due to CAS failure)
// and keeps the responsibility inside the LockFreeQueue itself (which contains the buffer)

void acquireBufferChanges();
void releaseBufferChanges();
void writeBufferAt(const UniqueIndex&, const ElementType&);
cxx::optional<ElementType> readBufferAt(const UniqueIndex&);
};
} // namespace iox

Expand Down
18 changes: 9 additions & 9 deletions iceoryx_utils/test/moduletests/test_index_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ TYPED_TEST(IndexQueueTest, IndicesAreIncreasingWhenConstructedFull)

index_t expected{0};
auto index = q.pop();
while (index.is_valid())
while (index.isValid())
{
EXPECT_EQ(index, expected++);
index = q.pop();
Expand All @@ -135,7 +135,7 @@ TYPED_TEST(IndexQueueTest, queueIsEmptyWhenPopFails)
EXPECT_FALSE(q.empty());

auto index = q.pop();
while (index.is_valid())
while (index.isValid())
{
index = q.pop();
}
Expand All @@ -151,7 +151,7 @@ TYPED_TEST(IndexQueueTest, pushAndPopSingleElement)
q.push(index);
auto popped = q.pop();

EXPECT_TRUE(popped.is_valid());
EXPECT_TRUE(popped.isValid());
EXPECT_EQ(popped, index);
}

Expand All @@ -174,22 +174,22 @@ TYPED_TEST(IndexQueueTest, poppedElementsAreInFifoOrder)
for (uint64_t i = 0; i < capacity; ++i)
{
auto popped = q.pop();
ASSERT_TRUE(popped.is_valid());
ASSERT_TRUE(popped.isValid());
EXPECT_EQ(popped, expected++);
}
}

TYPED_TEST(IndexQueueTest, popReturnsNothingWhenQueueIsEmpty)
{
auto& q = this->queue;
EXPECT_FALSE(q.pop().is_valid());
EXPECT_FALSE(q.pop().isValid());
}


TYPED_TEST(IndexQueueTest, popIfFullReturnsNothingWhenQueueIsEmpty)
{
auto& q = this->queue;
EXPECT_FALSE(q.popIfFull().is_valid());
EXPECT_FALSE(q.popIfFull().isValid());
}


Expand All @@ -199,7 +199,7 @@ TYPED_TEST(IndexQueueTest, popIfFullReturnsOldestElementWhenQueueIsFull)
Queue& q = this->fullQueue;

auto index = q.popIfFull();
EXPECT_TRUE(index.is_valid());
EXPECT_TRUE(index.isValid());
EXPECT_EQ(index, 0);
}

Expand All @@ -209,8 +209,8 @@ TYPED_TEST(IndexQueueTest, popIfFullReturnsNothingWhenQueueIsNotFull)
Queue& q = this->fullQueue;

auto index = q.pop();
EXPECT_TRUE(index.is_valid());
EXPECT_FALSE(q.popIfFull().is_valid());
EXPECT_TRUE(index.isValid());
EXPECT_FALSE(q.popIfFull().isValid());
}

}

0 comments on commit 76ced19

Please sign in to comment.