Skip to content

Commit

Permalink
Message::SetUsedSize: add optional alignment argument, to avoid stori…
Browse files Browse the repository at this point in the history
…ng alignment with the msg object
  • Loading branch information
rbx committed Oct 19, 2023
1 parent 1b7532a commit 3c714fd
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 5 deletions.
2 changes: 1 addition & 1 deletion fairmq/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ struct Message
virtual void* GetData() const = 0;
virtual size_t GetSize() const = 0;

virtual bool SetUsedSize(size_t size) = 0;
virtual bool SetUsedSize(size_t size, Alignment alignment = Alignment{0}) = 0;

virtual Transport GetType() const = 0;
TransportFactory* GetTransport() { return fTransport; }
Expand Down
10 changes: 7 additions & 3 deletions fairmq/shmem/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ class Message final : public fair::mq::Message

size_t GetSize() const override { return fSize; }

bool SetUsedSize(size_t newSize) override
bool SetUsedSize(size_t newSize, Alignment alignment = Alignment{0}) override
{
if (newSize == fSize) {
return true;
Expand All @@ -266,8 +266,8 @@ class Message final : public fair::mq::Message
return true;
} else if (newSize <= fSize) {
try {
char* oldPtr = fManager.GetAddressFromHandle(fHandle, fSegmentId);
try {
char* oldPtr = fManager.GetAddressFromHandle(fHandle, fSegmentId);
uint16_t userOffset = ShmHeader::UserOffset(oldPtr);
char* ptr = fManager.ShrinkInPlace(userOffset + newSize, oldPtr, fSegmentId);
fLocalPtr = ShmHeader::UserPtr(ptr);
Expand All @@ -278,7 +278,11 @@ class Message final : public fair::mq::Message
// unused size >= 1000000 bytes: reallocate fully
// unused size < 1000000 bytes: simply reset the size and keep the rest of the buffer until message destruction
if (fSize - newSize >= 1000000) {
char* ptr = fManager.Allocate(newSize, fAlignment);
if (alignment.alignment == 0) {
// if no alignment is provided, take the minimum alignment of the old pointer, but no more than 4096
alignment.alignment = 1 << std::min(__builtin_ctz(reinterpret_cast<size_t>(oldPtr)), 12);
}
char* ptr = fManager.Allocate(newSize, alignment.alignment);
char* userPtr = ShmHeader::UserPtr(ptr);
std::memcpy(userPtr, fLocalPtr, newSize);
fManager.Deallocate(fHandle, fSegmentId);
Expand Down
2 changes: 1 addition & 1 deletion fairmq/zeromq/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ class Message final : public fair::mq::Message
// destroyed. Used size is applied only once in ApplyUsedSize, which is called by the socket
// before sending. This function just updates the desired size until the actual "resizing"
// happens.
bool SetUsedSize(size_t size) override
bool SetUsedSize(size_t size, Alignment /* alignment */ = Alignment{0}) override
{
if (size == GetSize()) {
// nothing to do
Expand Down

0 comments on commit 3c714fd

Please sign in to comment.