Skip to content

Commit

Permalink
Support interactions in eb thread model
Browse files Browse the repository at this point in the history
Summary: In this model the tile must be constructed and destroyed in the eb thread in addition to the function processing as usual. This only makes sense for homogenous interactions, so we use the previously-deleted viral annotation `process_in_event_base` on the interaction to make this behavior clear.

Reviewed By: rhodo

Differential Revision: D24507398

fbshipit-source-id: 97e64af8c332386b455178743537d130371dccc0
  • Loading branch information
iahs authored and facebook-github-bot committed Oct 28, 2020
1 parent 1d02b84 commit 27b5c70
Show file tree
Hide file tree
Showing 14 changed files with 987 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

%><%#service:functions%><%#function:returnType%><%^function:starts_interaction?%>

<%^service:interaction?%><%^function:eb%>
<%^function:eb%><%^service:interaction?%>
<% > service_common/function_return_type %> <%service:name%>SvIf::<%function:cpp_name%>(<% > service_common/function_return_param %><% > service_common/function_param_list_commented_out%>) {
apache::thrift::detail::si::throw_app_exn_unimplemented("<%function:name%>");
}
Expand Down Expand Up @@ -47,7 +47,7 @@ folly::coro::Task<<% > service_common/callback_type %>> <%service:name%>SvIf::co
}
#endif // FOLLY_HAS_COROUTINES
<%/function:coroutine?%>
<%/function:eb%><%/service:interaction?%>
<%/service:interaction?%>
<%#service:interaction?%>
#if FOLLY_HAS_COROUTINES
folly::coro::Task<<% > service_common/callback_type %>> <%service:parent_service_name%>SvIf::<%service:name%>If::co_<%function:cpp_name%>(<% > service_common/function_param_list%>) {
Expand All @@ -61,7 +61,7 @@ folly::coro::Task<<% > service_common/callback_type %>> <%service:parent_service
folly::SemiFuture<<% > service_common/async_return_type %>> <%service:parent_service_name%>SvIf::<%service:name%>If::semifuture_<%function:cpp_name%>(<% > service_common/function_param_list_commented_out%>) {
apache::thrift::detail::si::throw_app_exn_unimplemented("semifuture_<%function:name%>");
}
<%/service:interaction?%>
<%/service:interaction?%><%/function:eb%>
<%#function:oneway?%>
<%^function:eb%>
<%^function:coroutine?%>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@
<%/service:interaction?%>
<%/function:eb%>
<%#function:oneway?%>
void async_<%#function:eb%>eb<%/function:eb%><%^function:eb%>tm<%/function:eb%>_<%function:cpp_name%>(std::unique_ptr<apache::thrift::HandlerCallbackBase> callback<%function:comma%><% > service_common/function_param_list%>)<%^service:interaction?%> override<%/service:interaction?%>;
<%#service:interaction?%>virtual <%/service:interaction?%>void async_<%#function:eb%>eb<%/function:eb%><%^function:eb%>tm<%/function:eb%>_<%function:cpp_name%>(std::unique_ptr<apache::thrift::HandlerCallbackBase> callback<%function:comma%><% > service_common/function_param_list%>)<%^service:interaction?%> override<%/service:interaction?%>;
<%/function:oneway?%>
<%^function:oneway?%>
void async_<%#function:eb%>eb<%/function:eb%><%^function:eb%>tm<%/function:eb%>_<%function:cpp_name%>(std::unique_ptr<apache::thrift::HandlerCallback<<% > types/unique_ptr_type%>>> callback<%function:comma%><% > service_common/function_param_list%>)<%^service:interaction?%> override<%/service:interaction?%>;
<%#service:interaction?%>virtual <%/service:interaction?%>void async_<%#function:eb%>eb<%/function:eb%><%^function:eb%>tm<%/function:eb%>_<%function:cpp_name%>(std::unique_ptr<apache::thrift::HandlerCallback<<% > types/unique_ptr_type%>>> callback<%function:comma%><% > service_common/function_param_list%>)<%^service:interaction?%> override<%/service:interaction?%>;
<%/function:oneway?%>
<%/function:starts_interaction?%><%#function:starts_interaction?%>
virtual std::unique_ptr<<%type:name%>If> <%function:cpp_name%>();
Expand Down
18 changes: 17 additions & 1 deletion thrift/compiler/parse/thrifty.yy
Original file line number Diff line number Diff line change
Expand Up @@ -1046,7 +1046,7 @@ Interaction:
{
lineno_stack.push(LineType::kService, driver.scanner->get_lineno());
}
tok_identifier "{" FlagArgs FunctionList UnflagArgs "}"
tok_identifier "{" FlagArgs FunctionList UnflagArgs "}" TypeAnnotations
{
driver.debug("Interaction -> tok_interaction tok_identifier { FunctionList }");
$$ = $6;
Expand All @@ -1055,6 +1055,22 @@ Interaction:
$$->set_lineno(lineno_stack.pop(LineType::kService));
for (auto* func : $$->get_functions()) {
func->set_is_interaction_member();
if (func->annotations_.count("thread")) {
driver.failure("Interaction methods cannot be individually annotated with "
"thread='eb'. Use process_in_event_base on the interaction instead.");
}
}

if ($9) {
for (const auto& it : $9->annotations_) {
if (it.first == "process_in_event_base") {
for (auto* func : $$->get_functions()) {
func->annotations_["thread"] = "eb";
}
} else {
driver.failure("Unknown interaction annotation '%s'", it.first.c_str());
}
}
}
}

Expand Down
28 changes: 28 additions & 0 deletions thrift/compiler/test/fixtures/interactions/gen-cpp2/MyService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ std::unique_ptr<apache::thrift::AsyncProcessor> MyServiceSvIf::getProcessor() {
std::unique_ptr<MyServiceSvIf::MyInteractionIf> MyServiceSvIf::createMyInteraction() {
apache::thrift::detail::si::throw_app_exn_unimplemented("createMyInteraction");
}
std::unique_ptr<MyServiceSvIf::MyInteractionFastIf> MyServiceSvIf::createMyInteractionFast() {
apache::thrift::detail::si::throw_app_exn_unimplemented("createMyInteractionFast");
}

void MyServiceSvIf::foo() {
apache::thrift::detail::si::throw_app_exn_unimplemented("foo");
Expand Down Expand Up @@ -201,6 +204,22 @@ void MyServiceSvIf::MyInteractionIf::async_tm_encode(std::unique_ptr<apache::thr
#endif // FOLLY_HAS_COROUTINES
}

void MyServiceSvIf::MyInteractionFastIf::async_eb_frobnicate(std::unique_ptr<apache::thrift::HandlerCallback<int32_t>> callback) {
callback->exception(apache::thrift::TApplicationException("Function frobnicate is unimplemented"));
}

void MyServiceSvIf::MyInteractionFastIf::async_eb_ping(std::unique_ptr<apache::thrift::HandlerCallbackBase> /*callback*/) {
LOG(DFATAL) << "Function ping is unimplemented";
}

void MyServiceSvIf::MyInteractionFastIf::async_eb_truthify(std::unique_ptr<apache::thrift::HandlerCallback<apache::thrift::ServerStream<bool>>> callback) {
callback->exception(apache::thrift::TApplicationException("Function truthify is unimplemented"));
}

void MyServiceSvIf::MyInteractionFastIf::async_eb_encode(std::unique_ptr<apache::thrift::HandlerCallback<apache::thrift::ResponseAndSinkConsumer<::std::set<float>,::std::string,::std::string>>> callback) {
callback->exception(apache::thrift::TApplicationException("Function encode is unimplemented"));
}

const char* MyServiceAsyncProcessor::getServiceName() {
return "MyService";
}
Expand All @@ -227,6 +246,10 @@ const MyServiceAsyncProcessor::ProcessMap MyServiceAsyncProcessor::binaryProcess
{"MyInteraction.ping", &MyServiceAsyncProcessor::setUpAndProcess_MyInteraction_ping<apache::thrift::BinaryProtocolReader, apache::thrift::BinaryProtocolWriter>},
{"MyInteraction.truthify", &MyServiceAsyncProcessor::setUpAndProcess_MyInteraction_truthify<apache::thrift::BinaryProtocolReader, apache::thrift::BinaryProtocolWriter>},
{"MyInteraction.encode", &MyServiceAsyncProcessor::setUpAndProcess_MyInteraction_encode<apache::thrift::BinaryProtocolReader, apache::thrift::BinaryProtocolWriter>},
{"MyInteractionFast.frobnicate", &MyServiceAsyncProcessor::setUpAndProcess_MyInteractionFast_frobnicate<apache::thrift::BinaryProtocolReader, apache::thrift::BinaryProtocolWriter>},
{"MyInteractionFast.ping", &MyServiceAsyncProcessor::setUpAndProcess_MyInteractionFast_ping<apache::thrift::BinaryProtocolReader, apache::thrift::BinaryProtocolWriter>},
{"MyInteractionFast.truthify", &MyServiceAsyncProcessor::setUpAndProcess_MyInteractionFast_truthify<apache::thrift::BinaryProtocolReader, apache::thrift::BinaryProtocolWriter>},
{"MyInteractionFast.encode", &MyServiceAsyncProcessor::setUpAndProcess_MyInteractionFast_encode<apache::thrift::BinaryProtocolReader, apache::thrift::BinaryProtocolWriter>},
};

const MyServiceAsyncProcessor::ProcessMap& MyServiceAsyncProcessor::getCompactProtocolProcessMap() {
Expand All @@ -239,6 +262,10 @@ const MyServiceAsyncProcessor::ProcessMap MyServiceAsyncProcessor::compactProces
{"MyInteraction.ping", &MyServiceAsyncProcessor::setUpAndProcess_MyInteraction_ping<apache::thrift::CompactProtocolReader, apache::thrift::CompactProtocolWriter>},
{"MyInteraction.truthify", &MyServiceAsyncProcessor::setUpAndProcess_MyInteraction_truthify<apache::thrift::CompactProtocolReader, apache::thrift::CompactProtocolWriter>},
{"MyInteraction.encode", &MyServiceAsyncProcessor::setUpAndProcess_MyInteraction_encode<apache::thrift::CompactProtocolReader, apache::thrift::CompactProtocolWriter>},
{"MyInteractionFast.frobnicate", &MyServiceAsyncProcessor::setUpAndProcess_MyInteractionFast_frobnicate<apache::thrift::CompactProtocolReader, apache::thrift::CompactProtocolWriter>},
{"MyInteractionFast.ping", &MyServiceAsyncProcessor::setUpAndProcess_MyInteractionFast_ping<apache::thrift::CompactProtocolReader, apache::thrift::CompactProtocolWriter>},
{"MyInteractionFast.truthify", &MyServiceAsyncProcessor::setUpAndProcess_MyInteractionFast_truthify<apache::thrift::CompactProtocolReader, apache::thrift::CompactProtocolWriter>},
{"MyInteractionFast.encode", &MyServiceAsyncProcessor::setUpAndProcess_MyInteractionFast_encode<apache::thrift::CompactProtocolReader, apache::thrift::CompactProtocolWriter>},
};

const MyServiceAsyncProcessor::InteractionConstructorMap& MyServiceAsyncProcessor::getInteractionConstructorMap() {
Expand All @@ -247,6 +274,7 @@ const MyServiceAsyncProcessor::InteractionConstructorMap& MyServiceAsyncProcesso

const MyServiceAsyncProcessor::InteractionConstructorMap MyServiceAsyncProcessor::interactionConstructorMap_ {
{"MyInteraction", &MyServiceAsyncProcessor::createMyInteraction},
{"MyInteractionFast", &MyServiceAsyncProcessor::createMyInteractionFast},
};

std::unique_ptr<apache::thrift::Tile> MyServiceAsyncProcessor::createInteractionImpl(const std::string& name) {
Expand Down
51 changes: 47 additions & 4 deletions thrift/compiler/test/fixtures/interactions/gen-cpp2/MyService.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,27 +53,39 @@ class MyInteractionIf : public apache::thrift::Tile, public apache::thrift::Serv
virtual folly::coro::Task<int32_t> co_frobnicate(apache::thrift::RequestParams params);
#endif
virtual folly::SemiFuture<int32_t> semifuture_frobnicate();
void async_tm_frobnicate(std::unique_ptr<apache::thrift::HandlerCallback<int32_t>> callback);
virtual void async_tm_frobnicate(std::unique_ptr<apache::thrift::HandlerCallback<int32_t>> callback);
#if FOLLY_HAS_COROUTINES
virtual folly::coro::Task<void> co_ping();
virtual folly::coro::Task<void> co_ping(apache::thrift::RequestParams params);
#endif
virtual folly::SemiFuture<folly::Unit> semifuture_ping();
void async_tm_ping(std::unique_ptr<apache::thrift::HandlerCallbackBase> callback);
virtual void async_tm_ping(std::unique_ptr<apache::thrift::HandlerCallbackBase> callback);
#if FOLLY_HAS_COROUTINES
virtual folly::coro::Task<apache::thrift::ServerStream<bool>> co_truthify();
virtual folly::coro::Task<apache::thrift::ServerStream<bool>> co_truthify(apache::thrift::RequestParams params);
#endif
virtual folly::SemiFuture<apache::thrift::ServerStream<bool>> semifuture_truthify();
void async_tm_truthify(std::unique_ptr<apache::thrift::HandlerCallback<apache::thrift::ServerStream<bool>>> callback);
virtual void async_tm_truthify(std::unique_ptr<apache::thrift::HandlerCallback<apache::thrift::ServerStream<bool>>> callback);
#if FOLLY_HAS_COROUTINES
virtual folly::coro::Task<apache::thrift::ResponseAndSinkConsumer<::std::set<float>,::std::string,::std::string>> co_encode();
virtual folly::coro::Task<apache::thrift::ResponseAndSinkConsumer<::std::set<float>,::std::string,::std::string>> co_encode(apache::thrift::RequestParams params);
#endif
virtual folly::SemiFuture<apache::thrift::ResponseAndSinkConsumer<::std::set<float>,::std::string,::std::string>> semifuture_encode();
void async_tm_encode(std::unique_ptr<apache::thrift::HandlerCallback<apache::thrift::ResponseAndSinkConsumer<::std::set<float>,::std::string,::std::string>>> callback);
virtual void async_tm_encode(std::unique_ptr<apache::thrift::HandlerCallback<apache::thrift::ResponseAndSinkConsumer<::std::set<float>,::std::string,::std::string>>> callback);
};class MyInteractionFastIf : public apache::thrift::Tile, public apache::thrift::ServerInterface {
public:
typedef MyServiceAsyncProcessor ProcessorType;
virtual ~MyInteractionFastIf() = default;
std::unique_ptr<apache::thrift::AsyncProcessor> getProcessor() override {
std::terminate();
}
virtual void async_eb_frobnicate(std::unique_ptr<apache::thrift::HandlerCallback<int32_t>> callback);
virtual void async_eb_ping(std::unique_ptr<apache::thrift::HandlerCallbackBase> callback);
virtual void async_eb_truthify(std::unique_ptr<apache::thrift::HandlerCallback<apache::thrift::ServerStream<bool>>> callback);
virtual void async_eb_encode(std::unique_ptr<apache::thrift::HandlerCallback<apache::thrift::ResponseAndSinkConsumer<::std::set<float>,::std::string,::std::string>>> callback);
};
virtual std::unique_ptr<MyInteractionIf> createMyInteraction();
virtual std::unique_ptr<MyInteractionFastIf> createMyInteractionFast();
virtual void foo();
folly::Future<folly::Unit> future_foo() override;
folly::SemiFuture<folly::Unit> semifuture_foo() override;
Expand Down Expand Up @@ -113,6 +125,9 @@ class MyServiceAsyncProcessor : public ::apache::thrift::GeneratedAsyncProcessor
std::unique_ptr<apache::thrift::Tile> createMyInteraction() {
return iface_->createMyInteraction();
}
std::unique_ptr<apache::thrift::Tile> createMyInteractionFast() {
return iface_->createMyInteractionFast();
}
template <typename ProtocolIn_, typename ProtocolOut_>
void setUpAndProcess_foo(apache::thrift::ResponseChannelRequest::UniquePtr req, apache::thrift::SerializedRequest&& serializedRequest, apache::thrift::Cpp2RequestContext* ctx, folly::EventBase* eb, apache::thrift::concurrency::ThreadManager* tm);
template <typename ProtocolIn_, typename ProtocolOut_>
Expand Down Expand Up @@ -149,6 +164,34 @@ class MyServiceAsyncProcessor : public ::apache::thrift::GeneratedAsyncProcessor
static std::pair<folly::IOBufQueue, apache::thrift::detail::SinkConsumerImpl> return_MyInteraction_encode(apache::thrift::ContextStack* ctx, apache::thrift::ResponseAndSinkConsumer<::std::set<float>,::std::string,::std::string>&& _return, folly::Executor::KeepAlive<> executor);
template <class ProtocolIn_, class ProtocolOut_>
static void throw_wrapped_MyInteraction_encode(apache::thrift::ResponseChannelRequest::UniquePtr req,int32_t protoSeqId,apache::thrift::ContextStack* ctx,folly::exception_wrapper ew,apache::thrift::Cpp2RequestContext* reqCtx);
template <typename ProtocolIn_, typename ProtocolOut_>
void setUpAndProcess_MyInteractionFast_frobnicate(apache::thrift::ResponseChannelRequest::UniquePtr req, apache::thrift::SerializedRequest&& serializedRequest, apache::thrift::Cpp2RequestContext* ctx, folly::EventBase* eb, apache::thrift::concurrency::ThreadManager* tm);
template <typename ProtocolIn_, typename ProtocolOut_>
void process_MyInteractionFast_frobnicate(apache::thrift::ResponseChannelRequest::UniquePtr req, apache::thrift::SerializedRequest&& serializedRequest, apache::thrift::Cpp2RequestContext* ctx,folly::EventBase* eb, apache::thrift::concurrency::ThreadManager* tm);
template <class ProtocolIn_, class ProtocolOut_>
static folly::IOBufQueue return_MyInteractionFast_frobnicate(int32_t protoSeqId, apache::thrift::ContextStack* ctx, int32_t const& _return);
template <class ProtocolIn_, class ProtocolOut_>
static void throw_wrapped_MyInteractionFast_frobnicate(apache::thrift::ResponseChannelRequest::UniquePtr req,int32_t protoSeqId,apache::thrift::ContextStack* ctx,folly::exception_wrapper ew,apache::thrift::Cpp2RequestContext* reqCtx);
template <typename ProtocolIn_, typename ProtocolOut_>
void setUpAndProcess_MyInteractionFast_ping(apache::thrift::ResponseChannelRequest::UniquePtr req, apache::thrift::SerializedRequest&& serializedRequest, apache::thrift::Cpp2RequestContext* ctx, folly::EventBase* eb, apache::thrift::concurrency::ThreadManager* tm);
template <typename ProtocolIn_, typename ProtocolOut_>
void process_MyInteractionFast_ping(apache::thrift::ResponseChannelRequest::UniquePtr req, apache::thrift::SerializedRequest&& serializedRequest, apache::thrift::Cpp2RequestContext* ctx,folly::EventBase* eb, apache::thrift::concurrency::ThreadManager* tm);
template <typename ProtocolIn_, typename ProtocolOut_>
void setUpAndProcess_MyInteractionFast_truthify(apache::thrift::ResponseChannelRequest::UniquePtr req, apache::thrift::SerializedRequest&& serializedRequest, apache::thrift::Cpp2RequestContext* ctx, folly::EventBase* eb, apache::thrift::concurrency::ThreadManager* tm);
template <typename ProtocolIn_, typename ProtocolOut_>
void process_MyInteractionFast_truthify(apache::thrift::ResponseChannelRequest::UniquePtr req, apache::thrift::SerializedRequest&& serializedRequest, apache::thrift::Cpp2RequestContext* ctx,folly::EventBase* eb, apache::thrift::concurrency::ThreadManager* tm);
template <class ProtocolIn_, class ProtocolOut_>
static apache::thrift::ResponseAndServerStreamFactory return_MyInteractionFast_truthify(int32_t protoSeqId, apache::thrift::ContextStack* ctx, folly::Executor::KeepAlive<> executor, apache::thrift::ServerStream<bool> _return);
template <class ProtocolIn_, class ProtocolOut_>
static void throw_wrapped_MyInteractionFast_truthify(apache::thrift::ResponseChannelRequest::UniquePtr req,int32_t protoSeqId,apache::thrift::ContextStack* ctx,folly::exception_wrapper ew,apache::thrift::Cpp2RequestContext* reqCtx);
template <typename ProtocolIn_, typename ProtocolOut_>
void setUpAndProcess_MyInteractionFast_encode(apache::thrift::ResponseChannelRequest::UniquePtr req, apache::thrift::SerializedRequest&& serializedRequest, apache::thrift::Cpp2RequestContext* ctx, folly::EventBase* eb, apache::thrift::concurrency::ThreadManager* tm);
template <typename ProtocolIn_, typename ProtocolOut_>
void process_MyInteractionFast_encode(apache::thrift::ResponseChannelRequest::UniquePtr req, apache::thrift::SerializedRequest&& serializedRequest, apache::thrift::Cpp2RequestContext* ctx,folly::EventBase* eb, apache::thrift::concurrency::ThreadManager* tm);
template <class ProtocolIn_, class ProtocolOut_>
static std::pair<folly::IOBufQueue, apache::thrift::detail::SinkConsumerImpl> return_MyInteractionFast_encode(apache::thrift::ContextStack* ctx, apache::thrift::ResponseAndSinkConsumer<::std::set<float>,::std::string,::std::string>&& _return, folly::Executor::KeepAlive<> executor);
template <class ProtocolIn_, class ProtocolOut_>
static void throw_wrapped_MyInteractionFast_encode(apache::thrift::ResponseChannelRequest::UniquePtr req,int32_t protoSeqId,apache::thrift::ContextStack* ctx,folly::exception_wrapper ew,apache::thrift::Cpp2RequestContext* reqCtx);
public:
MyServiceAsyncProcessor(MyServiceSvIf* iface) :
iface_(iface) {}
Expand Down
Loading

0 comments on commit 27b5c70

Please sign in to comment.