Skip to content

Commit

Permalink
Merge pull request #125 from MatthiasKillat/iox-#80-lock-free-mpmc-qu…
Browse files Browse the repository at this point in the history
…eue-release-version1

Iox #80 lock free mpmc queue release version1
  • Loading branch information
dkroenke authored Jun 15, 2020
2 parents 561852f + caa79fd commit 9325abb
Show file tree
Hide file tree
Showing 15 changed files with 3,024 additions and 0 deletions.
142 changes: 142 additions & 0 deletions iceoryx_utils/doc/lockfree_queue/lockfree_queue.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
Remark: This is a preliminary description and will be reworked when the capacity feature is implemented.

# Lock Free queue

We explain some details of the lock free queue in order to provide an intuition for the way the lock free queue works. This could serve as a basis for a formal proof later if desired.

## Index queue analysis

In the following all numbers are natural numbers and hence bounded unsigned integers in the implementation.

### Capacity and Cycle Length
The index queue has some capacity n and stores indices in the range [0, n[.
The number n is also called cycle length.

Assume n = 4 in this explanation, so we can store the indices 0, 1, 2, 3.


### Index representation
Each index i corresponds to an equivalence class [i] modulo n, i.e.
[i] = {j | j = c*n + i, c >= 0}

This means j = 3, 7, 11, ... all represent the same index 3 (pointing to the last element in the array)

Given j, we have i = j % n and c = j / n (where % is modulo and / integer division).

Another way to represent such a value j uniquely is as a pair (c,i) and
we call this a cyclic index with cycle c and index i.

This is done for two reasons: detection of empty queues and to eliminate the ABA problem for all practical scenarios.
It is imperative that such indices can be used in compare and swap operations (CAS), i.e.
will not exceed 64 bits on standard architectures supporting 64 bit CAS.
Therefore j can be assumed to be an unsigned 64 bit integer.


### Queue representation
The queue has a head H, tail T and an array of n values. All of them are cyclic indices as in 2.
Initially the queue is empty and head and tail point to index 0 but have both cycle 1.
Array elements are 0 and represented as (0,0).

Initially empty queue
```
[ (0,0), (0,0), (0,0), (0,0) ] H=(1,0) T=(1,0)
```
Initially full queue with all indices
```
[ (0,0), (0,1), (0,2), (0,3) ] H=(0,0) T=(1,0)
```


### Head and Tail monotonicity
We always insert (push) at Tail and remove (pop) at Head.
Head and Tail both increase strictly monotonic (making ABA problems unlikely).
I.e. a push causes tail to increase by one (causing the cycle to increase after n pushes) and similarly each pop causes Head to increase by one.


### Push operation
push(y) causes the cycle at the position Tail points to to be replaced with Tails cycle while the element there is replaced with x
via a CAS operation.
Afterwards Tail is increased by 1 (potentially not immediately, but before the next push takes effect).
```
[ (c,?), (c,x), (c-1,?), (c-1,?) ] H=(c,1) T=(c,2)
```
push(y)
```
[ (c,?), (c,x), (c,y), (c-1,?) ] H=(c,1) T=(c,3)
```

We only push if the cycle at the element Tail points to is exactly one behind Tails cycle.

Constraint: we can never push more than n elements (and our use case does not require this).


### Pop operation
Pop reads the value at the index of head and returns it if the cycle matches Heads cycle and a CAS to increase head by 1 succeeds.

```
[ (c,?), (c,y), (c,x), (c-1,?) ] H=(c,1) T=(c,3)
```
pop returns y
```
[ (c,?), (c,x), (c,y), (c-1,?) ] H=(c,2) T=(c,3)
```

If the cycle is behind Heads cycle the queue is empty (at the time of this check) and nothing is returned.


### Head Tail relationship
Head is always at most Tail, i.e. H <= T. (Technically we only have H - 1 <= T in general, cf. push implementation)

The general situation is one of the following, with c indicating the cycle of the values and Head and Tail.

(i) Head and Tail are at cycle c and ***** marks the region in the array with values logically in the queue.
```
[ c |*****c*****| c-1 ]
H = (c,i) T= (c,j)
```

(ii) Head is one cycle behind Tail.
```
[******c*****| c-1 |******c-1*****]
T = (c,i) H= (c-1,j)
```

(iii) Empty queue
```
[ c | c-1 ]
T = H = (c,i)
```
### ABA prevention

Note that the monotonicity of Head and Tail combined with suitable CAS make ABA problems not a practical concern.
Moreover, even if we require many bits for the index and thus have few bits for the cycle, we still require a complete uint64
wraparound for an ABA problem to occur (in addition to re-inserting the same value).

This is simply because if the maximum cycle is small then the cycle length n must be large and
we increase the cycle every n pushes (or pops), but the numbers representing the index increase by one every operation.


### Lock free analysis
Claim:
The queue is operation-wise lock free: pushes cannot block pops arbitrarily and vice versa.
Furthermore, between concurrent pushes and pops one of each type always succeeds in a finite amount of time.

Proof sketch:
- only pushes read/write Tail but pops do not
- pushes modify array values which pop reads and compares (CAS)
but we can have at most n pushes before this stops (push constraint in 5.) and then pop will succeed without interference from push
- only pops read/write/CAS Head
- claim follows now from loop/read/write/CAS structure in implementation

Therefore pushes and pops do not interfere to block each other.

However, pushes can block other pushes and pops other pops.
In particular operation-wise lock freedom implies the queue is lock free.

Note that there is no fairness guarantee. In principle the same push thread or pop thread might always succeed but in practice this is unlikely.
The queue is therefore not wait free.

This could technically be mitigated somewhat with a more complex logic keeping track of operation failures
but seems not to be worth it in practice. (incomplete analysis, further studies required)


130 changes: 130 additions & 0 deletions iceoryx_utils/include/iceoryx_utils/concurrent/lockfree_queue.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
// Copyright (c) 2020 by Robert Bosch GmbH. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef IOX_UTILS_CONCURRENT_LOCKFREE_QUEUE_HPP
#define IOX_UTILS_CONCURRENT_LOCKFREE_QUEUE_HPP

#include "iceoryx_utils/cxx/optional.hpp"
#include "iceoryx_utils/internal/concurrent/lockfree_queue/buffer.hpp"
#include "iceoryx_utils/internal/concurrent/lockfree_queue/index_queue.hpp"

#include <atomic>


namespace iox
{
namespace concurrent
{
// remark: configuration of capacity at runtime is an upcoming feature

/// @brief implements a lock free queue (i.e. container with FIFO order) of elements of type T
/// with Capacity
template <typename ElementType, uint64_t Capacity>
class LockFreeQueue
{
public:
/// @brief creates and initalizes an empty LockFreeQueue
LockFreeQueue() noexcept;

~LockFreeQueue() = default;

// remark: a thread-safe and lockfree implementation of copy seems impossible
// but unsafe copying (i.e. where synchronization is up to the user) would be possible
// can be implemented when it is needed
LockFreeQueue(const LockFreeQueue&) = delete;
LockFreeQueue(LockFreeQueue&&) = delete;
LockFreeQueue& operator=(const LockFreeQueue&) = delete;
LockFreeQueue& operator=(LockFreeQueue&&) = delete;

/// @brief returns the capacity of the queue
/// threadsafe, lockfree
constexpr uint64_t capacity() const noexcept;

/// @brief tries to insert value in FIFO order, moves the value internally
/// @param value to be inserted
/// @return true if insertion was successful (i.e. queue was not full during push), false otherwise
/// threadsafe, lockfree
bool tryPush(ElementType&& value) noexcept;

/// @brief tries to insert value in FIFO order, copies the value internally
/// @param value to be inserted
/// @return true if insertion was successful (i.e. queue was not full during push), false otherwise
/// threadsafe, lockfree
bool tryPush(const ElementType& value) noexcept;

/// @brief inserts value in FIFO order, always succeeds by removing the oldest value
/// when the queue is detected to be full (overflow)
/// @param value to be inserted is copied into the queue
/// @return removed value if an overflow occured, empty optional otherwise
/// threadsafe, lockfree
iox::cxx::optional<ElementType> push(const ElementType& value) noexcept;

/// @brief inserts value in FIFO order, always succeeds by removing the oldest value
/// when the queue is detected to be full (overflow)
/// @param value to be inserted is moved into the queue if possible
/// @return removed value if an overflow occured, empty optional otherwise
/// threadsafe, lockfree
iox::cxx::optional<ElementType> push(ElementType&& value) noexcept;

/// @brief tries to remove value in FIFO order
/// @return value if removal was successful, empty optional otherwise
/// threadsafe, lockfree
iox::cxx::optional<ElementType> pop() noexcept;

/// @brief check whether the queue is empty
/// @return true iff the queue is empty
/// note that if the queue is used concurrently it might
/// not be empty anymore after the call
/// (but it was at some point during the call)
bool empty() const noexcept;

/// @brief get the number of stored elements in the queue
/// @return number of stored elements in the queue
/// note that this will not be perfectly in sync with the actual number of contained elements
/// during concurrent operation but will always be at most capacity
uint64_t size() const noexcept;

private:
using Queue = IndexQueue<Capacity>;
using UniqueIndex = typename Queue::UniqueIndex;
using BufferIndex = typename Queue::value_t;

// remark: actually m_freeIndices do not have to be in a queue, it could be another
// multi-push multi-pop capable lockfree container (e.g. a stack or a list)
Queue m_freeIndices;

// required to be a queue for LockFreeQueue to exhibit FIFO behaviour
Queue m_usedIndices;

Buffer<ElementType, Capacity, BufferIndex> m_buffer;

std::atomic<uint64_t> m_size{0u};

// template is needed to distinguish between lvalue and rvalue T references
// (universal reference type deduction)
template <typename T>
void writeBufferAt(const UniqueIndex&, T&&);

// needed to avoid code duplication (via universal reference type deduction)
template <typename T>
iox::cxx::optional<ElementType> pushImpl(T&& value) noexcept;

cxx::optional<ElementType> readBufferAt(const UniqueIndex&);
};
} // namespace concurrent
} // namespace iox

#include "iceoryx_utils/internal/concurrent/lockfree_queue/lockfree_queue.inl"

#endif // IOX_UTILS_CONCURRENT_LOCKFREE_QUEUE_HPP
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright (c) 2020 by Robert Bosch GmbH. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef IOX_UTILS_CONCURRENT_LOCKFREE_QUEUE_BUFFER_HPP
#define IOX_UTILS_CONCURRENT_LOCKFREE_QUEUE_BUFFER_HPP

#include <stdint.h>

namespace iox
{
namespace concurrent
{
// remark: we can add more functionality (policies for cache line size, redzoning)

template <typename ElementType, uint64_t Capacity, typename index_t = uint64_t>
class Buffer
{
public:
Buffer() = default;
~Buffer() = default;

Buffer(const Buffer&) = delete;
Buffer(Buffer&&) = delete;
Buffer& operator=(const Buffer&) = delete;
Buffer& operator=(Buffer&&) = delete;

ElementType& operator[](const index_t index) noexcept;

const ElementType& operator[](const index_t index) const noexcept;

ElementType* ptr(const index_t index) noexcept;

const ElementType* ptr(const index_t index) const noexcept;

uint64_t capacity() const noexcept;

private:
using byte_t = uint8_t;

alignas(alignof(ElementType)) byte_t m_buffer[Capacity * sizeof(ElementType)];

ElementType* toPtr(index_t index) const noexcept;
};

} // namespace concurrent
} // namespace iox

#include "buffer.inl"

#endif // IOX_UTILS_CONCURRENT_LOCKFREE_QUEUE_BUFFER_HPP
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright (c) 2020 by Robert Bosch GmbH. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

namespace iox
{
namespace concurrent
{
template <typename ElementType, uint64_t Capacity, typename index_t>
ElementType& Buffer<ElementType, Capacity, index_t>::operator[](const index_t index) noexcept
{
return *toPtr(index);
}

template <typename ElementType, uint64_t Capacity, typename index_t>
const ElementType& Buffer<ElementType, Capacity, index_t>::operator[](const index_t index) const noexcept
{
return *toPtr(index);
}

template <typename ElementType, uint64_t Capacity, typename index_t>
ElementType* Buffer<ElementType, Capacity, index_t>::ptr(const index_t index) noexcept
{
return toPtr(index);
}

template <typename ElementType, uint64_t Capacity, typename index_t>
const ElementType* Buffer<ElementType, Capacity, index_t>::ptr(const index_t index) const noexcept
{
return toPtr(index);
}

template <typename ElementType, uint64_t Capacity, typename index_t>
uint64_t Buffer<ElementType, Capacity, index_t>::capacity() const noexcept
{
return Capacity;
}

template <typename ElementType, uint64_t Capacity, typename index_t>
ElementType* Buffer<ElementType, Capacity, index_t>::toPtr(index_t index) const noexcept
{
auto ptr = &(m_buffer[index * sizeof(ElementType)]);
return reinterpret_cast<ElementType*>(const_cast<byte_t*>(ptr));
}

} // namespace concurrent
} // namespace iox
Loading

0 comments on commit 9325abb

Please sign in to comment.