Skip to content

Commit

Permalink
fix(agw): Provides the fixes for the issue 1264 (magma#12474)
Browse files Browse the repository at this point in the history
Signed-off-by: Rashmi <rashmi.sarwad@radisys.com>
  • Loading branch information
rsarwad authored and emakeev committed Aug 5, 2022
1 parent d2ff397 commit 64a3130
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,17 @@ namespace magma {
magma::S6aProxyResponderAsyncService*
S6aProxyResponderAsyncService::s6a_async_service = 0;

magma::service303::MagmaService server(S6A_ASYNC_PROXY_SERVICE,
S6A_ASYNC_PROXY_VERSION);
magma::service303::MagmaService s6a_async_grpc_server(S6A_ASYNC_PROXY_SERVICE,
S6A_ASYNC_PROXY_VERSION);

void stop_async_s6a_grpc_server(void) {
magma::S6aProxyResponderAsyncService* s6a_proxy_async_instance =
magma::S6aProxyResponderAsyncService::getInstance();
magma::s6a_async_grpc_server.Stop(); // Stop async grpc server
s6a_proxy_async_instance->stop(); // stop queue after server shuts down
s6a_proxy_async_instance->async_grpc_message_receiver_thread.join();
delete s6a_proxy_async_instance;
}

AsyncService::AsyncService(std::unique_ptr<ServerCompletionQueue> cq)
: cq_(std::move(cq)) {}
Expand All @@ -61,18 +70,26 @@ void AsyncService::wait_for_requests() {
return;
}
if (!ok) {
OAILOG_ERROR(LOG_S6A, "encountered error while processing request");
delete static_cast<CallData*>(tag);
continue;
}
static_cast<CallData*>(tag)->proceed();
}
return;
}

void AsyncService::stop() {
running_ = false;
cq_->Shutdown();
// Pop all items in the queue until it is empty
void* tag;
bool ok;
while (cq_->Next(&tag, &ok)) {
OAILOG_INFO(LOG_S6A,
"CQ is shutdown, pop out all messages from CQ and release the "
"messages ");
delete static_cast<CallData*>(tag);
}
}

Expand Down Expand Up @@ -190,19 +207,18 @@ static void* grpc_async_service_thread(__attribute__((unused)) void* args) {
auto async_service_handler =
std::make_shared<magma::S6aProxyAsyncResponderHandler>();
s6a_proxy_async_instance->set_callback(
std::move(magma::server.GetNewCompletionQueue()), async_service_handler);
magma::server.AddServiceToServer(s6a_proxy_async_instance);
std::move(magma::s6a_async_grpc_server.GetNewCompletionQueue()),
async_service_handler);
magma::s6a_async_grpc_server.AddServiceToServer(s6a_proxy_async_instance);

magma::server.Start();
magma::s6a_async_grpc_server.Start();
OAILOG_INFO(LOG_S6A, "Started async grpc server for s6a interface \n");
std::thread proxy_thread([&]() {
s6a_proxy_async_instance
->wait_for_requests(); // block here instead of on server
s6a_proxy_async_instance->stop(); // stop queue after server shuts down
free(s6a_proxy_async_instance);
});
s6a_proxy_async_instance->async_grpc_message_receiver_thread =
std::thread([&]() {
s6a_proxy_async_instance
->wait_for_requests(); // block here instead of on server
});
zloop_start(grpc_async_service_task_zmq_ctx.event_loop);
proxy_thread.join();
AssertFatal(
0,
"Asserting as async grpc_service_thread should not be exiting on its "
Expand All @@ -223,6 +239,7 @@ extern "C" int grpc_async_service_init(void) {

static void grpc_async_service_exit(void) {
destroy_task_context(&grpc_async_service_task_zmq_ctx);
magma::stop_async_s6a_grpc_server();
OAI_FPRINTF_INFO("TASK_ASYNC_GRPC_SERVICE terminated\n");
pthread_exit(NULL);
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ class S6aProxyResponderAsyncService final
std::shared_ptr<S6aProxyAsyncResponderHandler> handler);

public:
std::thread async_grpc_message_receiver_thread;
static S6aProxyResponderAsyncService* getInstance() {
if (!s6a_async_service) {
s6a_async_service = new S6aProxyResponderAsyncService(nullptr, nullptr);
Expand Down

0 comments on commit 64a3130

Please sign in to comment.