-
Notifications
You must be signed in to change notification settings - Fork 876
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
bidirectional gRPC outputs + fix server streaming gRPC outputs #1241
Changes from all commits
fa9ba19
b4022fb
15ef0cb
e09509c
47f830c
2e562fa
8e2a50e
fa41be7
93eb326
3e774f6
c345af3
2c9c583
8764bba
47e71a2
1f6dcaa
f5688f1
759aaa4
b7155bf
cd858c0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,12 +16,12 @@ limitations under the License. | |
|
||
#pragma once | ||
|
||
#include "output.pb.h" | ||
#include "outputs.pb.h" | ||
#include "tbb/concurrent_queue.h" | ||
|
||
namespace falco | ||
{ | ||
namespace output | ||
namespace outputs | ||
{ | ||
typedef tbb::concurrent_queue<response> response_cq; | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It looks like The output queue is going to be very important as Falco gains adoptions so spending a few minutes to write down how others should use them might make sense. Traditionally I have also made it a point to at the minimum do this in the public members of the header files, but however we decide to do it would be great! Feel free to ignore this if we want to bring up a broader coding style/documentation discussion as a community. Just sharing thoughts as I look at the code. |
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -36,7 +36,7 @@ class context | |
{ | ||
public: | ||
context(::grpc::ServerContext* ctx); | ||
~context() = default; | ||
virtual ~context() = default; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same here with context, why virtual now? and what does There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The rationale is: When a base class destructor is not virtual and you have a variable pointing to an object derived from the base class, deleting the derived instance has undefined behaviour. Which can lead to memory leaks. |
||
|
||
void get_metadata(std::string key, std::string& val); | ||
|
||
|
@@ -50,7 +50,7 @@ class stream_context : public context | |
public: | ||
stream_context(::grpc::ServerContext* ctx): | ||
context(ctx){}; | ||
~stream_context() = default; | ||
virtual ~stream_context() = default; | ||
|
||
enum : char | ||
{ | ||
|
@@ -61,6 +61,15 @@ class stream_context : public context | |
|
||
mutable void* m_stream = nullptr; // todo(fntlnz, leodido) > useful in the future | ||
mutable bool m_has_more = false; | ||
mutable bool m_is_running = true; | ||
}; | ||
|
||
class bidi_context : public stream_context | ||
{ | ||
public: | ||
bidi_context(::grpc::ServerContext* ctx): | ||
stream_context(ctx){}; | ||
virtual ~bidi_context() = default; | ||
}; | ||
|
||
} // namespace grpc | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,12 +24,12 @@ namespace grpc | |
{ | ||
|
||
template<> | ||
void request_stream_context<falco::output::service, falco::output::request, falco::output::response>::start(server* srv) | ||
void request_stream_context<outputs::service, outputs::request, outputs::response>::start(server* srv) | ||
{ | ||
m_state = request_context_base::REQUEST; | ||
m_srv_ctx.reset(new ::grpc::ServerContext); | ||
auto srvctx = m_srv_ctx.get(); | ||
m_res_writer.reset(new ::grpc::ServerAsyncWriter<output::response>(srvctx)); | ||
m_res_writer.reset(new ::grpc::ServerAsyncWriter<outputs::response>(srvctx)); | ||
m_stream_ctx.reset(); | ||
m_req.Clear(); | ||
auto cq = srv->m_completion_queue.get(); | ||
|
@@ -38,7 +38,7 @@ void request_stream_context<falco::output::service, falco::output::request, falc | |
} | ||
|
||
template<> | ||
void request_stream_context<falco::output::service, falco::output::request, falco::output::response>::process(server* srv) | ||
void request_stream_context<outputs::service, outputs::request, outputs::response>::process(server* srv) | ||
{ | ||
// When it is the 1st process call | ||
if(m_state == request_context_base::REQUEST) | ||
|
@@ -48,40 +48,46 @@ void request_stream_context<falco::output::service, falco::output::request, falc | |
} | ||
|
||
// Processing | ||
output::response res; | ||
(srv->*m_process_func)(*m_stream_ctx, m_req, res); // subscribe() | ||
outputs::response res; | ||
(srv->*m_process_func)(*m_stream_ctx, m_req, res); // get() | ||
|
||
if(!m_stream_ctx->m_is_running) | ||
{ | ||
m_state = request_context_base::FINISH; | ||
m_res_writer->Finish(::grpc::Status::OK, this); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why is There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is a method from the gRPC engine, not defined by us. |
||
return; | ||
} | ||
|
||
// When there are still more responses to stream | ||
if(m_stream_ctx->m_has_more) | ||
{ | ||
// todo(leodido) > log "write: tag=this, state=m_state" | ||
m_res_writer->Write(res, this); | ||
return; | ||
} | ||
|
||
// No more responses to stream | ||
else | ||
{ | ||
// Communicate to the gRPC runtime that we have finished. | ||
// The memory address of "this" instance uniquely identifies the event. | ||
m_state = request_context_base::FINISH; | ||
// todo(leodido) > log "finish: tag=this, state=m_state" | ||
m_res_writer->Finish(::grpc::Status::OK, this); | ||
} | ||
// Communicate to the gRPC runtime that we have finished. | ||
// The memory address of "this" instance uniquely identifies the event. | ||
m_state = request_context_base::FINISH; | ||
// todo(leodido) > log "finish: tag=this, state=m_state" | ||
m_res_writer->Finish(::grpc::Status::OK, this); | ||
} | ||
|
||
template<> | ||
void request_stream_context<falco::output::service, falco::output::request, falco::output::response>::end(server* srv, bool errored) | ||
void request_stream_context<outputs::service, outputs::request, outputs::response>::end(server* srv, bool error) | ||
{ | ||
if(m_stream_ctx) | ||
{ | ||
if(errored) | ||
if(error) | ||
{ | ||
// todo(leodido) > log error "error streaming: tag=this, state=m_state, stream=m_stream_ctx->m_stream" | ||
} | ||
m_stream_ctx->m_status = errored ? stream_context::ERROR : stream_context::SUCCESS; | ||
m_stream_ctx->m_status = error ? stream_context::ERROR : stream_context::SUCCESS; | ||
|
||
// Complete the processing | ||
output::response res; | ||
(srv->*m_process_func)(*m_stream_ctx, m_req, res); // subscribe() | ||
outputs::response res; | ||
(srv->*m_process_func)(*m_stream_ctx, m_req, res); // get() | ||
} | ||
else | ||
{ | ||
|
@@ -98,7 +104,7 @@ void request_stream_context<falco::output::service, falco::output::request, falc | |
} | ||
|
||
template<> | ||
void falco::grpc::request_context<falco::version::service, falco::version::request, falco::version::response>::start(server* srv) | ||
void request_context<version::service, version::request, version::response>::start(server* srv) | ||
{ | ||
m_state = request_context_base::REQUEST; | ||
m_srv_ctx.reset(new ::grpc::ServerContext); | ||
|
@@ -113,7 +119,7 @@ void falco::grpc::request_context<falco::version::service, falco::version::reque | |
} | ||
|
||
template<> | ||
void falco::grpc::request_context<falco::version::service, falco::version::request, falco::version::response>::process(server* srv) | ||
void request_context<version::service, version::request, version::response>::process(server* srv) | ||
{ | ||
version::response res; | ||
(srv->*m_process_func)(m_srv_ctx.get(), m_req, res); | ||
|
@@ -125,13 +131,85 @@ void falco::grpc::request_context<falco::version::service, falco::version::reque | |
} | ||
|
||
template<> | ||
void falco::grpc::request_context<falco::version::service, falco::version::request, falco::version::response>::end(server* srv, bool errored) | ||
void request_context<version::service, version::request, version::response>::end(server* srv, bool error) | ||
{ | ||
// todo(leodido) > handle processing errors here | ||
|
||
// Ask to start processing requests | ||
start(srv); | ||
} | ||
|
||
template<> | ||
void request_bidi_context<outputs::service, outputs::request, outputs::response>::start(server* srv) | ||
{ | ||
m_state = request_context_base::REQUEST; | ||
m_srv_ctx.reset(new ::grpc::ServerContext); | ||
auto srvctx = m_srv_ctx.get(); | ||
m_reader_writer.reset(new ::grpc::ServerAsyncReaderWriter<outputs::response, outputs::request>(srvctx)); | ||
m_req.Clear(); | ||
auto cq = srv->m_completion_queue.get(); | ||
// Request to start processing given requests. | ||
// Using "this" - ie., the memory address of this context - as the tag that uniquely identifies the request. | ||
// In this way, different contexts can serve different requests concurrently. | ||
(srv->m_output_svc.*m_request_func)(srvctx, m_reader_writer.get(), cq, cq, this); | ||
}; | ||
|
||
template<> | ||
void request_bidi_context<outputs::service, outputs::request, outputs::response>::process(server* srv) | ||
{ | ||
switch(m_state) | ||
{ | ||
case request_context_base::REQUEST: | ||
m_bidi_ctx.reset(new bidi_context(m_srv_ctx.get())); | ||
m_bidi_ctx->m_status = bidi_context::STREAMING; | ||
m_state = request_context_base::WRITE; | ||
m_reader_writer->Read(&m_req, this); | ||
return; | ||
case request_context_base::WRITE: | ||
// Processing | ||
{ | ||
outputs::response res; | ||
(srv->*m_process_func)(*m_bidi_ctx, m_req, res); // sub() | ||
|
||
if(!m_bidi_ctx->m_is_running) | ||
{ | ||
m_state = request_context_base::FINISH; | ||
m_reader_writer->Finish(::grpc::Status::OK, this); | ||
return; | ||
} | ||
|
||
if(m_bidi_ctx->m_has_more) | ||
{ | ||
m_state = request_context_base::WRITE; | ||
m_reader_writer->Write(res, this); | ||
return; | ||
} | ||
|
||
m_state = request_context_base::WRITE; | ||
m_reader_writer->Read(&m_req, this); | ||
} | ||
|
||
return; | ||
default: | ||
return; | ||
} | ||
}; | ||
|
||
template<> | ||
void request_bidi_context<outputs::service, outputs::request, outputs::response>::end(server* srv, bool error) | ||
{ | ||
if(m_bidi_ctx) | ||
{ | ||
m_bidi_ctx->m_status = error ? bidi_context::ERROR : bidi_context::SUCCESS; | ||
|
||
// Complete the processing | ||
outputs::response res; | ||
(srv->*m_process_func)(*m_bidi_ctx, m_req, res); // sub() | ||
} | ||
|
||
// Ask to start processing requests | ||
start(srv); | ||
}; | ||
|
||
} // namespace grpc | ||
} // namespace falco | ||
} // namespace falco |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Knowing you @leodido you put a lot of thought into this.
Can you share your thinking behind the change. As I start digging more and more into the code I will be opening up more PRs and would like to understand your thinking before suggesting changes.