Skip to content

Commit

Permalink
Implement custom copier for binary IPC (#16)
Browse files Browse the repository at this point in the history
  • Loading branch information
Squadrick committed May 8, 2020
1 parent 09fe8fe commit 22dc762
Show file tree
Hide file tree
Showing 7 changed files with 140 additions and 43 deletions.
55 changes: 55 additions & 0 deletions include/shadesmar/memory/copier.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/* MIT License
Copyright (c) 2020 Dheeraj R Reddy
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
==============================================================================*/

#ifndef INCLUDE_SHADESMAR_MEMORY_COPIER_H_
#define INCLUDE_SHADESMAR_MEMORY_COPIER_H_

#include <cstring>

namespace shm::memory {
class Copier {
public:
virtual void *alloc(size_t) = 0;
virtual void dealloc(void *) = 0;
virtual void shm_to_user(void *, void *, size_t) = 0;
virtual void user_to_shm(void *, void *, size_t) = 0;
};

class DefaultCopier : public Copier {
public:
void *alloc(size_t size) override { return malloc(size); }

void dealloc(void *ptr) override { free(ptr); }

void shm_to_user(void *dst, void *src, size_t size) override {
std::memcpy(dst, src, size);
}

void user_to_shm(void *dst, void *src, size_t size) override {
std::memcpy(dst, src, size);
}
};

} // namespace shm::memory

#endif // INCLUDE_SHADESMAR_MEMORY_COPIER_H_
3 changes: 3 additions & 0 deletions include/shadesmar/memory/dragons.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ SOFTWARE.
#include <thread>
#include <vector>

#include "shadesmar/memory/copier.h"

namespace shm::memory::dragons {

static inline void *_rep_movsb(void *d, const void *s, size_t n) {
// Slower than using regular `memcpy`
asm volatile("rep movsb"
Expand Down
20 changes: 12 additions & 8 deletions include/shadesmar/memory/memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,21 +83,25 @@ uint8_t *create_memory_segment(const std::string &name, size_t size,
return static_cast<uint8_t *>(ptr);
}

struct Ptr {
void *ptr;
size_t size;
bool free;

Ptr() : ptr(nullptr), size(0), free(false) {}

void no_delete() { free = false; }
};

struct Element {
size_t size;
std::atomic<bool> empty{};
managed_shared_memory::handle_t addr_hdl;

Element() {
size = 0;
empty.store(true);
addr_hdl = 0;
}
Element() : size(0), addr_hdl(0) { empty.store(true); }

Element(const Element &elem) {
size = elem.size;
Element(const Element &elem) : size(elem.size), addr_hdl(elem.addr_hdl) {
empty.store(elem.empty.load());
addr_hdl = elem.addr_hdl;
}
};

Expand Down
8 changes: 5 additions & 3 deletions include/shadesmar/pubsub/publisher.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ SOFTWARE.

#include <msgpack.hpp>

#include "shadesmar/memory/copier.h"
#include "shadesmar/message.h"
#include "shadesmar/pubsub/topic.h"

Expand All @@ -41,7 +42,7 @@ namespace shm::pubsub {
template <uint32_t queue_size>
class PublisherBin {
public:
explicit PublisherBin(std::string topic_name);
explicit PublisherBin(std::string topic_name, memory::Copier *copier);
bool publish(void *data, size_t size);

private:
Expand All @@ -50,8 +51,9 @@ class PublisherBin {
};

template <uint32_t queue_size>
PublisherBin<queue_size>::PublisherBin(std::string topic_name)
: topic_name_(topic_name), topic_(Topic<queue_size>(topic_name)) {}
PublisherBin<queue_size>::PublisherBin(std::string topic_name,
memory::Copier *copier)
: topic_name_(topic_name), topic_(Topic<queue_size>(topic_name, copier)) {}

template <uint32_t queue_size>
bool PublisherBin<queue_size>::publish(void *data, size_t size) {
Expand Down
31 changes: 19 additions & 12 deletions include/shadesmar/pubsub/subscriber.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ SOFTWARE.
#include <thread>
#include <utility>

#include "shadesmar/memory/copier.h"
#include "shadesmar/message.h"
#include "shadesmar/pubsub/topic.h"

Expand All @@ -48,7 +49,9 @@ class SubscriberBase {
virtual void _subscribe() = 0;

protected:
SubscriberBase(std::string topic_name, bool extra_copy);
SubscriberBase(std::string topic_name, memory::Copier *copier,
bool extra_copy);

std::string topic_name_;
std::unique_ptr<Topic<queue_size>> topic;
std::atomic<uint32_t> counter{0};
Expand All @@ -57,14 +60,13 @@ class SubscriberBase {
template <uint32_t queue_size>
class SubscriberBin : public SubscriberBase<queue_size> {
public:
SubscriberBin(
const std::string &topic_name,
std::function<void(std::unique_ptr<uint8_t[]> &, size_t)> callback)
: SubscriberBase<queue_size>(topic_name, false),
SubscriberBin(const std::string &topic_name, memory::Copier *copier,
std::function<void(memory::Ptr *)> callback)
: SubscriberBase<queue_size>(topic_name, copier, false),
callback_(std::move(callback)) {}

private:
std::function<void(std::unique_ptr<uint8_t[]> &, size_t)> callback_;
std::function<void(memory::Ptr *)> callback_;
void _subscribe();
};

Expand All @@ -77,7 +79,7 @@ class Subscriber : public SubscriberBase<queue_size> {
Subscriber(const std::string &topic_name,
std::function<void(const std::shared_ptr<msgT> &)> callback,
bool extra_copy = false)
: SubscriberBase<queue_size>(topic_name, extra_copy),
: SubscriberBase<queue_size>(topic_name, nullptr, extra_copy),
callback_(std::move(callback)) {}

private:
Expand All @@ -87,10 +89,11 @@ class Subscriber : public SubscriberBase<queue_size> {

template <uint32_t queue_size>
SubscriberBase<queue_size>::SubscriberBase(std::string topic_name,
memory::Copier *copier,
bool extra_copy)
: topic_name_(std::move(topic_name)) {
#if __cplusplus >= 201703L
topic = std::make_unique<Topic<queue_size>>(topic_name_, extra_copy);
topic = std::make_unique<Topic<queue_size>>(topic_name_, copier, extra_copy);
#else
topic = std::unique_ptr<Topic<queue_size>>(new Topic<queue_size>(
std::forward<std::string>(topic_name_, extra_copy)));
Expand Down Expand Up @@ -138,12 +141,16 @@ void SubscriberBase<queue_size>::spin() {

template <uint32_t queue_size>
void SubscriberBin<queue_size>::_subscribe() {
std::unique_ptr<uint8_t[]> msg;
size_t size = 0;
memory::Ptr ptr;
ptr.free = true;

if (!this->topic->read(&ptr, this->counter)) return;

if (!this->topic->read(msg, &size, this->counter)) return;
callback_(&ptr);

callback_(msg, size);
if (ptr.free) {
this->topic->copier()->dealloc(ptr.ptr);
}
}

template <typename msgT, uint32_t queue_size>
Expand Down
50 changes: 36 additions & 14 deletions include/shadesmar/pubsub/topic.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ SOFTWARE.

#include "shadesmar/concurrency/scope.h"
#include "shadesmar/macros.h"
#include "shadesmar/memory/copier.h"
#include "shadesmar/memory/memory.h"

namespace shm::pubsub {
Expand All @@ -61,8 +62,13 @@ class Topic : public memory::Memory<_TopicElem<LockT>, queue_size> {
"queue_size must be power of two");

public:
Topic(const std::string &topic, memory::Copier *copier, bool copy = false)
: memory::Memory<TopicElem, queue_size>(topic),
copy_(copy),
copier_(copier) {}

explicit Topic(const std::string &topic, bool copy = false)
: memory::Memory<TopicElem, queue_size>(topic), copy_(copy) {}
: Topic(topic, nullptr, copy) {}

bool write(void *data, size_t size) {
/*
Expand Down Expand Up @@ -102,14 +108,13 @@ class Topic : public memory::Memory<_TopicElem<LockT>, queue_size> {
return _read_without_copy(oh, elem);
}

bool read(std::unique_ptr<uint8_t[]> &msg, size_t *size, // NOLINT
uint32_t pos) {
bool read(memory::Ptr *ptr, uint32_t pos) {
/*
* Read into a raw array. We pass `size` as a reference to store
* the size of the message.
*/
TopicElem *elem = &(this->shared_queue_->elements[pos & (queue_size - 1)]);
return _read_bin(msg, size, elem);
return _read_bin(ptr, elem);
}

inline __attribute__((always_inline)) uint32_t fetch_add_counter() {
Expand All @@ -124,6 +129,10 @@ class Topic : public memory::Memory<_TopicElem<LockT>, queue_size> {
this->shared_queue_->counter.fetch_add(1);
}

inline __attribute__((always_inline)) memory::Copier *copier() const {
return copier_;
}

private:
bool _write_rcu(void *data, size_t size, TopicElem *elem) {
/*
Expand All @@ -139,7 +148,12 @@ class Topic : public memory::Memory<_TopicElem<LockT>, queue_size> {
if (new_addr == nullptr) {
return false;
}
std::memcpy(new_addr, data, size);

if (copier_ == nullptr) {
std::memcpy(new_addr, data, size);
} else {
copier_->user_to_shm(new_addr, data, size);
}

void *old_addr;
bool prev_empty;
Expand Down Expand Up @@ -195,18 +209,19 @@ class Topic : public memory::Memory<_TopicElem<LockT>, queue_size> {
* So we bite the cost of the extra function call before checking
* for emptiness.
*/
auto src = std::unique_ptr<uint8_t[]>(new uint8_t[elem->size]);
size_t size;

if (_read_bin(src, &size, elem)) {
*oh = msgpack::unpack(reinterpret_cast<const char *>(src.get()), size);
memory::Ptr ptr;

if (_read_bin(&ptr, elem)) {
*oh = msgpack::unpack(reinterpret_cast<const char *>(ptr.ptr), ptr.size);
free(ptr.ptr);
return true;
}

return false;
}

bool _read_bin(std::unique_ptr<uint8_t[]> &msg, size_t *size, // NOLINT
TopicElem *elem) {
bool _read_bin(memory::Ptr *ptr, TopicElem *elem) {
/*
* Code path:
* 1. Acquire sharable lock
Expand All @@ -218,14 +233,21 @@ class Topic : public memory::Memory<_TopicElem<LockT>, queue_size> {

if (elem->empty) return false;

*size = elem->size;
msg = std::unique_ptr<uint8_t[]>(new uint8_t[*size]);
auto *dst = this->raw_buf_->get_address_from_handle(elem->addr_hdl);
std::memcpy(msg.get(), dst, *size);

ptr->size = elem->size;
if (copier_ != nullptr) {
ptr->ptr = copier_->alloc(ptr->size);
copier_->shm_to_user(ptr->ptr, dst, ptr->size);
} else {
ptr->ptr = malloc(ptr->size);
std::memcpy(ptr->ptr, dst, ptr->size);
}

return true;
}

memory::Copier *copier_;
bool copy_{};
};
} // namespace shm::pubsub
Expand Down
16 changes: 10 additions & 6 deletions test/pubsub_bin_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
==============================================================================*/

#include <shadesmar/pubsub/publisher.h>
#include <shadesmar/pubsub/subscriber.h>
#include <chrono>
#include <iostream>
#include <numeric>

#include "shadesmar/memory/copier.h"
#include "shadesmar/pubsub/publisher.h"
#include "shadesmar/pubsub/subscriber.h"

const char topic[] = "raw_benchmark_topic";
const int QUEUE_SIZE = 16;
const int SECONDS = 10;
Expand Down Expand Up @@ -60,8 +62,8 @@ double get_stddev(const std::vector<T> &v) {
return stddev;
}

void callback(const std::unique_ptr<uint8_t[]> &data, uint32_t size) {
auto *msg = reinterpret_cast<Message *>(data.get());
void callback(shm::memory::Ptr *shm_ptr) {
auto *msg = reinterpret_cast<Message *>(shm_ptr->ptr);
++count;
++total_count;
lag += std::chrono::duration_cast<TIMESCALE>(
Expand All @@ -75,7 +77,8 @@ int main() {
std::vector<int> counts;
std::vector<double> lags;
std::this_thread::sleep_for(std::chrono::seconds(1));
shm::pubsub::SubscriberBin<QUEUE_SIZE> sub(topic, callback);
shm::memory::DefaultCopier cpy;
shm::pubsub::SubscriberBin<QUEUE_SIZE> sub(topic, &cpy, callback);
auto start = std::chrono::system_clock::now();
int seconds = 0;
while (true) {
Expand Down Expand Up @@ -114,7 +117,8 @@ int main() {
std::cout << "Lag: " << mean_lag << " ± " << stdd_lag << TIMESCALE_NAME
<< std::endl;
} else {
shm::pubsub::PublisherBin<QUEUE_SIZE> pub(topic);
shm::memory::DefaultCopier cpy;
shm::pubsub::PublisherBin<QUEUE_SIZE> pub(topic, &cpy);

Message *msg = reinterpret_cast<Message *>(malloc(VECTOR_SIZE));

Expand Down

0 comments on commit 22dc762

Please sign in to comment.