Skip to content

Commit

Permalink
Merge branch 'main' into pipe
Browse files Browse the repository at this point in the history
Signed-off-by: Alyssa Wilk <alyssar@chromium.org>
  • Loading branch information
alyssawilk committed Jun 13, 2024
2 parents edc3df4 + a0b2ec2 commit f495819
Show file tree
Hide file tree
Showing 15 changed files with 599 additions and 344 deletions.
3 changes: 1 addition & 2 deletions .bazelrc
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@ build:docs-ci --action_env=DOCS_RST_CHECK=1 --host_action_env=DOCS_RST_CHECK=1
build --incompatible_config_setting_private_default_visibility
build --incompatible_enforce_config_setting_visibility

test --bes_upload_mode=nowait_for_upload_complete
test --bes_timeout=30s
test --test_verbose_timeout_warnings

# Allow tags to influence execution requirements
Expand Down Expand Up @@ -511,6 +509,7 @@ build:rbe-google --config=cache-google

build:rbe-google-bes --bes_backend=grpcs://buildeventservice.googleapis.com
build:rbe-google-bes --bes_results_url=https://source.cloud.google.com/results/invocations/
build:rbe-google-bes --bes_upload_mode=fully_async

# RBE (Engflow mobile)
build:rbe-engflow --google_default_credentials=false
Expand Down
37 changes: 28 additions & 9 deletions contrib/generic_proxy/filters/network/source/codecs/dubbo/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,16 @@ class DubboRequest : public Request {
ASSERT(inner_metadata_ != nullptr);
ASSERT(inner_metadata_->hasContext());
ASSERT(inner_metadata_->hasRequest());

uint32_t frame_flags = FrameFlags::FLAG_END_STREAM; // Dubbo message only has one frame.
if (!inner_metadata_->context().isTwoWay()) {
frame_flags |= FrameFlags::FLAG_ONE_WAY;
}
if (inner_metadata_->context().heartbeat()) {
frame_flags |= FrameFlags::FLAG_HEARTBEAT;
}

stream_frame_flags_ = {static_cast<uint64_t>(inner_metadata_->requestId()), frame_flags};
}

// Request
Expand All @@ -43,9 +53,10 @@ class DubboRequest : public Request {
// StreamFrame
FrameFlags frameFlags() const override { return stream_frame_flags_; }

FrameFlags stream_frame_flags_;

Common::Dubbo::MessageMetadataSharedPtr inner_metadata_;

private:
FrameFlags stream_frame_flags_;
};

class DubboResponse : public Response {
Expand All @@ -56,6 +67,16 @@ class DubboResponse : public Response {
ASSERT(inner_metadata_->hasContext());
ASSERT(inner_metadata_->hasResponse());
refreshStatus();

uint32_t frame_flags = FrameFlags::FLAG_END_STREAM; // Dubbo message only has one frame.
if (!inner_metadata_->context().isTwoWay()) {
frame_flags |= FrameFlags::FLAG_ONE_WAY;
}
if (inner_metadata_->context().heartbeat()) {
frame_flags |= FrameFlags::FLAG_HEARTBEAT;
}

stream_frame_flags_ = {static_cast<uint64_t>(inner_metadata_->requestId()), frame_flags};
}

void refreshStatus();
Expand All @@ -67,10 +88,11 @@ class DubboResponse : public Response {
// StreamFrame
FrameFlags frameFlags() const override { return stream_frame_flags_; }

FrameFlags stream_frame_flags_;

StreamStatus status_;
Common::Dubbo::MessageMetadataSharedPtr inner_metadata_;

private:
FrameFlags stream_frame_flags_;
};

class DubboCodecBase : public Logger::Loggable<Logger::Id::connection> {
Expand Down Expand Up @@ -136,12 +158,9 @@ class DubboDecoderBase : public DubboCodecBase, public CodecType {
return Common::Dubbo::DecodeStatus::Success;
}

auto message = std::make_unique<DecoderMessageType>(metadata_);
message->stream_frame_flags_ = {{static_cast<uint64_t>(metadata_->requestId()),
!metadata_->context().isTwoWay(), false,
metadata_->context().heartbeat()},
true};
auto message = std::make_unique<DecoderMessageType>(std::move(metadata_));
metadata_.reset();

callback_->onDecodingSuccess(std::move(message));

return Common::Dubbo::DecodeStatus::Success;
Expand Down
14 changes: 11 additions & 3 deletions contrib/generic_proxy/filters/network/source/codecs/http1/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class HttpRequestFrame : public HttpHeaderFrame<StreamRequest> {
HttpRequestFrame(Http::RequestHeaderMapPtr request, bool end_stream)
: request_(std::move(request)) {
ASSERT(request_ != nullptr);
frame_flags_ = {StreamFlags{}, end_stream};
frame_flags_ = {0, end_stream ? FrameFlags::FLAG_END_STREAM : FrameFlags::FLAG_EMPTY};
}

absl::string_view host() const override { return request_->getHostValue(); }
Expand All @@ -80,7 +80,15 @@ class HttpResponseFrame : public HttpHeaderFrame<StreamResponse> {
const bool drain_close = Envoy::StringUtil::caseFindToken(
response_->getConnectionValue(), ",", Http::Headers::get().ConnectionValues.Close);

frame_flags_ = {StreamFlags{0, false, drain_close, false}, end_stream};
uint32_t flags = 0;
if (end_stream) {
flags |= FrameFlags::FLAG_END_STREAM;
}
if (drain_close) {
flags |= FrameFlags::FLAG_DRAIN_CLOSE;
}

frame_flags_ = {0, flags};
}

StreamStatus status() const override {
Expand All @@ -101,7 +109,7 @@ class HttpResponseFrame : public HttpHeaderFrame<StreamResponse> {
class HttpRawBodyFrame : public CommonFrame {
public:
HttpRawBodyFrame(Envoy::Buffer::Instance& buffer, bool end_stream)
: frame_flags_({StreamFlags{}, end_stream}) {
: frame_flags_(0, end_stream ? FrameFlags::FLAG_END_STREAM : FrameFlags::FLAG_EMPTY) {
buffer_.move(buffer);
}
FrameFlags frameFlags() const override { return frame_flags_; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ class KafkaRequestFrame : public GenericProxy::StreamRequest {
if (request_ == nullptr) {
return FrameFlags{};
}
return FrameFlags{
StreamFlags{static_cast<uint64_t>(request_->request_header_.correlation_id_)}};
return FrameFlags{static_cast<uint64_t>(request_->request_header_.correlation_id_)};
}

absl::string_view protocol() const override { return "kafka"; }
Expand All @@ -46,7 +45,7 @@ class KafkaResponseFrame : public GenericProxy::StreamResponse {
if (response_ == nullptr) {
return FrameFlags{};
}
return FrameFlags{StreamFlags{static_cast<uint64_t>(response_->metadata_.correlation_id_)}};
return FrameFlags{static_cast<uint64_t>(response_->metadata_.correlation_id_)};
}

absl::string_view protocol() const override { return "kafka"; }
Expand Down
116 changes: 41 additions & 75 deletions contrib/generic_proxy/filters/network/source/interface/stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,92 +17,35 @@ namespace Extensions {
namespace NetworkFilters {
namespace GenericProxy {

/**
* Stream flags from request or response to control the behavior of the
* generic proxy filter. This is mainly used as part of FrameFlags.
* All these flags could be ignored for the simple ping-pong use case.
*/
class StreamFlags {
public:
StreamFlags(uint64_t stream_id = 0, bool one_way_stream = false, bool drain_close = false,
bool is_heartbeat = false)
: stream_id_(stream_id), one_way_stream_(one_way_stream), drain_close_(drain_close),
is_heartbeat_(is_heartbeat) {}

/**
* @return the stream id of the request or response. This is used to match the
* downstream request with the upstream response.
* NOTE: In most cases, the stream id is not needed and will be ignored completely.
* The stream id is only used when we can't match the downstream request
* with the upstream response by the active stream instance self directly.
* For example, when the multiple downstream requests are multiplexed into one
* upstream connection.
*/
uint64_t streamId() const { return stream_id_; }

/**
* @return whether the stream is one way stream. If request is one way stream, the
* generic proxy filter will not wait for the response from the upstream.
*/
bool oneWayStream() const { return one_way_stream_; }

/**
* @return whether the downstream/upstream connection should be drained after
* current active stream are finished.
*/
bool drainClose() const { return drain_close_; }

/**
* @return whether the current request/response is a heartbeat request/response.
* NOTE: It would be better to handle heartbeat request/response by another L4
* filter. Then the generic proxy filter can be used for the simple ping-pong
* use case.
*/
bool isHeartbeat() const { return is_heartbeat_; }

private:
uint64_t stream_id_{0};

bool one_way_stream_{false};
bool drain_close_{false};
bool is_heartbeat_{false};
};

/**
* Flags of stream frame. This is used to control the behavior of the generic proxy filter.
* All these flags could be ignored for the simple ping-pong use case.
*/
class FrameFlags {
public:
static constexpr uint32_t FLAG_EMPTY = 0x0000;
static constexpr uint32_t FLAG_END_STREAM = 0x0001;
static constexpr uint32_t FLAG_ONE_WAY = 0x0002;
static constexpr uint32_t FLAG_DRAIN_CLOSE = 0x0004;
static constexpr uint32_t FLAG_HEARTBEAT = 0x0008;

/**
* Construct FrameFlags with stream flags and end stream flag. The stream flags MUST be
* same for all frames of the same stream.
* @param stream_flags StreamFlags of the stream.
* @param end_stream whether the current frame is the last frame of the request or response.
* @param stream_id the stream id of the request or response.
* @param flags flags of the current frame. Only the flags that defined in FrameFlags
* could be used. Multiple flags could be combined by bitwise OR.
* @param frame_tags frame tags of the current frame. The meaning of the frame tags is
* application protocol specific.
*/
FrameFlags(StreamFlags stream_flags = StreamFlags(), bool end_stream = true,
uint32_t frame_tags = 0)
: stream_flags_(stream_flags), frame_tags_(frame_tags), end_stream_(end_stream) {}
FrameFlags(uint64_t stream_id = 0, uint32_t flags = FLAG_END_STREAM, uint32_t frame_tags = 0)
: stream_id_(stream_id), flags_(flags), frame_tags_(frame_tags) {}

/**
* Get flags of stream that the frame belongs to. The flags MUST be same for all frames of the
* same stream. Copy semantics is used because the flags are lightweight (only 16 bytes for now).
* @return StreamFlags of the stream.
* @return the stream id of the request or response. All frames of the same stream
* MUST have the same stream id.
*/
StreamFlags streamFlags() const { return stream_flags_; }

/**
* @return the stream id of the request or response.
*/
uint64_t streamId() const { return stream_flags_.streamId(); }

/**
* @return whether the current frame is the last frame of the request or response.
*/
bool endStream() const { return end_stream_; }
uint64_t streamId() const { return stream_id_; }

/**
* @return frame tags of the current frame. The meaning of the frame tags is application
Expand All @@ -114,12 +57,35 @@ class FrameFlags {
*/
uint32_t frameTags() const { return frame_tags_; }

private:
StreamFlags stream_flags_{};
/**
* @return whether the current frame is the last frame of the request or response.
*/
bool endStream() const { return flags_ & FLAG_END_STREAM; }

/**
* @return whether the downstream/upstream connection should be drained after
* current active stream are finished.
* NOTE: Only the response header frame's drainClose() flag will be used.
*/
bool drainClose() const { return flags_ & FLAG_DRAIN_CLOSE; }

/**
* @return whether the stream is one way stream. If request is one way stream, the
* generic proxy filter will not wait for the response from the upstream.
* NOTE: Only the request header frame's oneWayStream() flag will be used.
*/
bool oneWayStream() const { return flags_ & FLAG_ONE_WAY; }

/**
* @return whether the current request/response is a heartbeat request/response.
* NOTE: Only the header frame's isHeartbeat() flag will be used.
*/
bool isHeartbeat() const { return flags_ & FLAG_HEARTBEAT; }

private:
uint64_t stream_id_{};
uint32_t flags_{};
uint32_t frame_tags_{};
// Default to true for backward compatibility.
bool end_stream_{true};
};

/**
Expand Down
50 changes: 41 additions & 9 deletions contrib/generic_proxy/filters/network/source/proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,27 @@ bool ActiveStream::sendFrameToDownstream(const StreamFrame& frame, bool header_f
return true;
}

// TODO(wbpcode): add the short_response_flags support to the sendLocalReply
// method.
void ActiveStream::sendLocalReply(Status status, absl::string_view data,
ResponseUpdateFunction func) {
// Clear possible response frames.
// To ensure only one response header frame is sent for local reply.
// If there is a previsous response header but has not been sent to the downstream,
// we should send the latest local reply to the downstream directly without any
// filter processing.
// If the previous response header has been sent to the downstream, it is an invalid
// situation that we cannot handle. Then, we should reset the stream and return.
// In other cases, we should continue the filter chain to process the local reply.

const bool has_prev_response = response_header_frame_ != nullptr;
const bool has_sent_response =
has_prev_response && encoder_filter_iter_header_ == encoder_filters_.end();

if (has_sent_response) {
ENVOY_LOG(error, "Generic proxy: invalid local reply: response is already sent.");
resetStream(DownstreamStreamResetReason::ProtocolError);
return;
}

// Clear possible response frames anyway now.
response_header_frame_ = nullptr;
response_common_frames_.clear();

Expand All @@ -167,14 +183,20 @@ void ActiveStream::sendLocalReply(Status status, absl::string_view data,
}

response_header_frame_ = std::move(response);
parent_.stream_drain_decision_ |= response_header_frame_->frameFlags().drainClose();
local_reply_ = true;
// status message will be used as response code details.
stream_info_.setResponseCodeDetails(status.message());
// Set the response code to the stream info.
stream_info_.setResponseCode(response_header_frame_->status().code());

// Send the response header frame to downstream.
sendFrameToDownstream(*response_header_frame_, true);
if (has_prev_response) {
// There was a previous response. Send the new response to the downstream directly.
sendFrameToDownstream(*response_header_frame_, true);
return;
} else {
continueEncoding();
}
}

void ActiveStream::processRequestHeaderFrame() {
Expand Down Expand Up @@ -277,7 +299,7 @@ void ActiveStream::processResponseHeaderFrame() {
response_header_frame_->frameFlags().endStream());

// Send the header frame to downstream.
if (!sendFrameToDownstream(*response_header_frame_, false)) {
if (!sendFrameToDownstream(*response_header_frame_, true)) {
stop_encoder_filter_chain_ = true;
}
};
Expand Down Expand Up @@ -465,15 +487,25 @@ void ActiveStream::onRequestCommonFrame(RequestCommonFramePtr request_common_fra
}

void ActiveStream::onResponseHeaderFrame(ResponsePtr response) {
ASSERT(response_header_frame_ == nullptr);
// To ensure only one response header frame is handled for upstream response.
// If there is a previous response header frame, no matter it is come from
// the upstream or the local reply, it is an invalid situation that we cannot
// handle. Then, we should reset the stream and return.
const bool has_prev_response = response_header_frame_ != nullptr;
if (has_prev_response) {
ENVOY_LOG(error, "Generic proxy: invalid response: has previous response.");
resetStream(DownstreamStreamResetReason::ProtocolError);
return;
}

response_header_frame_ = std::move(response);
ASSERT(response_header_frame_ != nullptr);
parent_.stream_drain_decision_ = response_header_frame_->frameFlags().streamFlags().drainClose();
parent_.stream_drain_decision_ |= response_header_frame_->frameFlags().drainClose();

// The response code details always be "via_upstream" for response from upstream.
stream_info_.setResponseCodeDetails("via_upstream");
// Set the response code to the stream info.
stream_info_.setResponseCode(response_header_frame_->status().code());

continueEncoding();
}

Expand Down
Loading

0 comments on commit f495819

Please sign in to comment.