Skip to content

Commit

Permalink
feat: add ServiceProxyResponseQueue (philips-software#338)
Browse files Browse the repository at this point in the history
* feat: add ServiceProxyResponseQueue

* chore: resolved pr comments

* chore: resolved pr comments

* chore: updated feature with additional RequestSend overload to also add requestedSize

* feat: added test for EchoServiceResponseQueue
  • Loading branch information
daantimmer committed Jul 17, 2023
1 parent 879f608 commit 88cb197
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 5 deletions.
4 changes: 1 addition & 3 deletions protobuf/echo/Echo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@ namespace services

void ServiceProxy::RequestSend(infra::Function<void()> onGranted)
{
this->onGranted = onGranted;
currentRequestedSize = MaxMessageSize();
echo.RequestSend(*this);
RequestSend(onGranted, MaxMessageSize());
}

void ServiceProxy::RequestSend(infra::Function<void()> onGranted, uint32_t requestedSize)
Expand Down
77 changes: 75 additions & 2 deletions protobuf/echo/Echo.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#include "infra/syntax/ProtoFormatter.hpp"
#include "infra/syntax/ProtoParser.hpp"
#include "infra/util/BoundedDeque.hpp"
#include "infra/util/Compatibility.hpp"
#include "infra/util/Function.hpp"
#include "infra/util/Optional.hpp"
Expand Down Expand Up @@ -197,8 +198,8 @@ namespace services
ServiceProxy(Echo& echo, uint32_t maxMessageSize);

Echo& Rpc();
void RequestSend(infra::Function<void()> onGranted);
void RequestSend(infra::Function<void()> onGranted, uint32_t requestedSize);
virtual void RequestSend(infra::Function<void()> onGranted);
virtual void RequestSend(infra::Function<void()> onGranted, uint32_t requestedSize);
void GrantSend();
uint32_t MaxMessageSize() const;
uint32_t CurrentRequestedSize() const;
Expand All @@ -210,6 +211,37 @@ namespace services
uint32_t currentRequestedSize = 0;
};

template<class ServiceProxyType>
class ServiceProxyResponseQueue
: public ServiceProxyType
{
public:
struct Request
{
infra::Function<void()> onRequestGranted;
uint32_t requestedSize;
};

using Container = infra::BoundedDeque<Request>;

template<std::size_t Max>
using WithStorage = infra::WithStorage<ServiceProxyResponseQueue, typename Container::template WithMaxSize<Max>>;

template<class... Args>
explicit ServiceProxyResponseQueue(Container& container, Args&&... args);

void RequestSend(infra::Function<void()> onRequestGranted) override;
void RequestSend(infra::Function<void()> onRequestGranted, uint32_t requestedSize) override;

private:
void ProcessSendQueue();

private:
Container& container;

bool responseInProgress{ false };
};

class EchoOnStreams
: public Echo
, public infra::EnableSharedFromThis<EchoOnStreams>
Expand Down Expand Up @@ -497,6 +529,47 @@ namespace services
if (field.first.Is<infra::ProtoLengthDelimited>())
field.first.Get<infra::ProtoLengthDelimited>().GetStringReference(value);
}

template<class ServiceProxyType>
template<class... Args>
ServiceProxyResponseQueue<ServiceProxyType>::ServiceProxyResponseQueue(Container& container, Args&&... args)
: ServiceProxyType{ std::forward<Args>(args)... }
, container{ container }
{}

template<class ServiceProxyType>
void ServiceProxyResponseQueue<ServiceProxyType>::RequestSend(infra::Function<void()> onRequestGranted)
{
RequestSend(onRequestGranted, ServiceProxyType::MaxMessageSize());
}

template<class ServiceProxyType>
void ServiceProxyResponseQueue<ServiceProxyType>::RequestSend(infra::Function<void()> onRequestGranted, uint32_t requestedSize)
{
if (container.full())
return;

container.push_back({ onRequestGranted, requestedSize });
ProcessSendQueue();
}

template<class ServiceProxyType>
void ServiceProxyResponseQueue<ServiceProxyType>::ProcessSendQueue()
{
if (!responseInProgress && !container.empty())
{
responseInProgress = true;
ServiceProxyType::RequestSend([this]
{
container.front().onRequestGranted();
container.pop_front();

responseInProgress = false;
ProcessSendQueue();
},
container.front().requestedSize);
}
}
}

#endif
2 changes: 2 additions & 0 deletions protobuf/echo/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ emil_build_for(protobuf.echo_test BOOL EMIL_BUILD_TESTS)
emil_add_test(protobuf.echo_test)

target_sources(protobuf.echo_test PRIVATE
TestEchoServiceResponseQueue.cpp
TestServiceForwarder.cpp
)

Expand All @@ -11,4 +12,5 @@ target_link_libraries(protobuf.echo_test PUBLIC
infra.event_test_helper
infra.util_test_helper
protobuf.echo
protobuf.test_doubles
)
66 changes: 66 additions & 0 deletions protobuf/echo/test/TestEchoServiceResponseQueue.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#include "infra/util/Function.hpp"
#include "infra/util/test_helper/MockCallback.hpp"
#include "protobuf/echo/Echo.hpp"
#include "protobuf/echo/test_doubles/EchoMock.hpp"
#include "protobuf/echo/test_doubles/ServiceStub.hpp"
#include "gmock/gmock.h"
#include "gtest/gtest.h"

namespace services
{
class EchoServiceResponseQueueTest
: public testing::Test
{
public:
testing::StrictMock<EchoMock> echo;
testing::StrictMock<ServiceProxyResponseQueue<ServiceStubProxy>>::WithStorage<2> serviceProxy{ echo };
};

TEST_F(EchoServiceResponseQueueTest, RequestSendWhileEmptyImmediatelyForwardsRequestToEcho)
{
EXPECT_CALL(echo, RequestSend(testing::Ref(serviceProxy)));

serviceProxy.RequestSend(infra::emptyFunction);
}

TEST_F(EchoServiceResponseQueueTest, RequestsNextRequestAfterGrantingAccess)
{
infra::VerifyingFunctionMock<void()> request1{};
infra::VerifyingFunctionMock<void()> request2{};

EXPECT_CALL(echo, RequestSend(testing::Ref(serviceProxy))).Times(2);

serviceProxy.RequestSend(request1, 128);
serviceProxy.RequestSend(request2, 50);

EXPECT_THAT(serviceProxy.CurrentRequestedSize(), testing::Eq(128));
serviceProxy.GrantSend();

EXPECT_THAT(serviceProxy.CurrentRequestedSize(), testing::Eq(50));
serviceProxy.GrantSend();
}

TEST_F(EchoServiceResponseQueueTest, DiscardsRequestWhenQueueFull)
{
infra::VerifyingFunctionMock<void()> request1{};
infra::VerifyingFunctionMock<void()> request2{};
infra::VerifyingFunctionMock<void()> request4{};

EXPECT_CALL(echo, RequestSend(testing::Ref(serviceProxy)))
.Times(3);

serviceProxy.RequestSend(request1);
serviceProxy.RequestSend(request2);

serviceProxy.RequestSend([]
{
FAIL();
});

serviceProxy.GrantSend();
serviceProxy.GrantSend();

serviceProxy.RequestSend(request4);
serviceProxy.GrantSend();
}
}

0 comments on commit 88cb197

Please sign in to comment.