Skip to content

Commit

Permalink
mod for dubbo response header mutation (#113)
Browse files Browse the repository at this point in the history
Signed-off-by: root <lihuang@alauda.io>
  • Loading branch information
huanghuangzym committed Jul 26, 2023
1 parent d2258d3 commit 57327a0
Show file tree
Hide file tree
Showing 7 changed files with 171 additions and 13 deletions.
72 changes: 72 additions & 0 deletions src/application_protocols/dubbo/dubbo_codec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ void DubboCodec::encode(const MetaProtocolProxy::Metadata& metadata,
break;
}
case MetaProtocolProxy::MessageType::Response: {
encodeResponse(metadata, mutation, buffer);
break;
}
case MetaProtocolProxy::MessageType::Error: {
Expand All @@ -77,6 +78,55 @@ void DubboCodec::encode(const MetaProtocolProxy::Metadata& metadata,
}
}

void DubboCodec::encodeResponse(const MetaProtocolProxy::Metadata& metadata,
const MetaProtocolProxy::Mutation& mutation,
Buffer::Instance& buffer) {

MessageMetadata msgMetadata;
toMsgMetadata(metadata, msgMetadata);
ENVOY_LOG(
debug,
"dubbo: msgdata is hasRpcResultInfo {}, hasResponseStatus {}, headersize {}, bodysize {}",
msgMetadata.hasRpcResultInfo(), msgMetadata.hasResponseStatus(), metadata.getHeaderSize(),
metadata.getBodySize());

bool has_mutation = false;
if (msgMetadata.hasRpcResultInfo()) {
auto* result = const_cast<RpcResultImpl*>(
dynamic_cast<const RpcResultImpl*>(&msgMetadata.rpcResultInfo()));
ENVOY_LOG(debug, "dubbo: codec result hasException {},result body {}", result->hasException(),
result->getRspBody());
if (result->attachment_ != nullptr) {
ENVOY_LOG(debug, "dubbo: codec result attachment_ not null offset {}",
result->attachment_->attachmentOffset(), result->getRspBody());
}

for (const auto& keyValue : mutation) {
ENVOY_LOG(debug, "dubbo: encodeResponse codec mutation {} : {}", keyValue.first,
keyValue.second);
if (msgMetadata.hasRpcResultInfo()) {
result->attachment_->remove(keyValue.first);
result->attachment_->insert(keyValue.first, keyValue.second);
}
has_mutation = true;
}

ENVOY_LOG(debug, "dubbo: encodeResponse codec attachment is {}",
result->attachment_->attachment().toDebugString());
}

if (has_mutation) {
// upstream server has mutation header: x-envoy-peer-metadata-id x-envoy-peer-metadata
// add the two headers add response
ContextImpl ctx;
ctx.setHeaderSize(metadata.getHeaderSize());
ctx.setBodySize(metadata.getBodySize());
if (!protocol_->encode(buffer, msgMetadata, ctx, "addheader")) {
throw EnvoyException("failed to encode request message");
}
}
}

void DubboCodec::onError(const MetaProtocolProxy::Metadata& metadata,
const MetaProtocolProxy::Error& error, Buffer::Instance& buffer) {
ASSERT(buffer.length() == 0);
Expand Down Expand Up @@ -136,6 +186,19 @@ void DubboCodec::toMetadata(const MessageMetadata& msgMetadata,
if (msgMetadata.hasResponseStatus()) {
metadata.put("ResponseStatus", msgMetadata.responseStatus());
}
if (msgMetadata.hasRpcResultInfo()) {
auto* invo = const_cast<RpcResultImpl*>(
dynamic_cast<const RpcResultImpl*>(&msgMetadata.rpcResultInfo()));
for (const auto& pair : invo->attachment_->attachment()) {
const auto key = pair.first->toString();
const auto value = pair.second->toString();
if (!key.has_value() || !value.has_value()) {
continue;
}
metadata.putString(key.value(), value.value());
}
metadata.put("RpcResultInfo", msgMetadata.rpcResultInfoPtr());
}

switch (msgMetadata.messageType()) {
case MessageType::Request:
Expand Down Expand Up @@ -184,6 +247,12 @@ void DubboCodec::toMsgMetadata(const MetaProtocolProxy::Metadata& metadata,
msgMetadata.setInvocationInfo(std::any_cast<RpcInvocationSharedPtr>(invo));
}

ref = metadata.get("RpcResultInfo");
if (ref.has_value()) {
const auto& result = ref.value();
msgMetadata.setRpcResultInfo(std::any_cast<RpcResultSharedPtr>(result));
}

ref = metadata.get("ProtocolType");
assert(ref.has_value());
const auto& proto_type = ref.value();
Expand Down Expand Up @@ -244,6 +313,9 @@ void DubboCodec::encodeRequest(const MetaProtocolProxy::Metadata& metadata,
invo->attachment().remove(keyValue.first);
invo->attachment().insert(keyValue.first, keyValue.second);
}

ENVOY_LOG(debug, "dubbo: codec attachment is {}",
invo->attachment().attachment().toDebugString());
}
ContextImpl ctx;
ctx.setHeaderSize(metadata.getHeaderSize());
Expand Down
2 changes: 2 additions & 0 deletions src/application_protocols/dubbo/dubbo_codec.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ class DubboCodec : public MetaProtocolProxy::Codec, public Logger::Loggable<Logg
void encodeHeartbeat(const MetaProtocolProxy::Metadata& metadata, Buffer::Instance& buffer);
void encodeRequest(const MetaProtocolProxy::Metadata& metadata,
const MetaProtocolProxy::Mutation& mutation, Buffer::Instance& buffer);
void encodeResponse(const MetaProtocolProxy::Metadata& metadata,
const MetaProtocolProxy::Mutation& mutation, Buffer::Instance& buffer);

ProtocolPtr protocol_;
DecoderStateMachinePtr state_machine_;
Expand Down
38 changes: 38 additions & 0 deletions src/application_protocols/dubbo/dubbo_hessian2_serializer_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,24 +102,62 @@ DubboHessian2SerializerImpl::deserializeRpcResult(Buffer::Instance& buffer,

RpcResponseType type = static_cast<RpcResponseType>(*type_value);

bool has_attachment = false;
bool has_value = false;
switch (type) {
case RpcResponseType::ResponseWithException:
has_value_or_attachment = false;
result->setException(true);
break;
case RpcResponseType::ResponseWithExceptionWithAttachments:
has_value_or_attachment = false;
result->setException(true);
has_attachment = true;
break;
case RpcResponseType::ResponseWithNullValue:
has_value_or_attachment = false;
has_value = false;
has_attachment = false;
FALLTHRU;
case RpcResponseType::ResponseNullValueWithAttachments:
has_value = false;
has_attachment = true;
result->setException(false);
break;
case RpcResponseType::ResponseWithValue:
has_value = true;
has_attachment = false;
result->setException(false);
break;
case RpcResponseType::ResponseValueWithAttachments:
has_value = true;
has_attachment = true;
result->setException(false);
break;
default:
throw EnvoyException(fmt::format("not supported return type {}", static_cast<uint8_t>(type)));
}

size_t total_size = decoder.offset();
// decode response body
if (has_value) {
auto responesebody = decoder.decode<std::string>();
result->setRspBody(*responesebody);
}

if (has_attachment) {
size_t offset = context->headerSize() + decoder.offset();
auto attachResult = decoder.decode<Hessian2::Object>();
if (attachResult != nullptr && attachResult->type() == Hessian2::Object::Type::UntypedMap) {
result->attachment_ = std::make_unique<RpcInvocationImpl::Attachment>(
RpcInvocationImpl::Attachment::MapPtr{
dynamic_cast<RpcInvocationImpl::Attachment::Map*>(attachResult.release())},
offset);
} else {
result->attachment_ = std::make_unique<RpcInvocationImpl::Attachment>(
std::make_unique<RpcInvocationImpl::Attachment::Map>(), offset);
}
}

if (context->bodySize() < total_size) {
throw EnvoyException(fmt::format("RpcResult size({}) large than body size({})", total_size,
Expand Down
57 changes: 44 additions & 13 deletions src/application_protocols/dubbo/dubbo_protocol_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ bool DubboProtocolImpl::decodeData(Buffer::Instance& buffer, ContextSharedPtr co
if (!ret.second) {
return false;
}
metadata->setRpcResultInfo(ret.first);
if (ret.first->hasException()) {
metadata->setMessageType(MessageType::Exception);
}
Expand All @@ -175,9 +176,46 @@ bool DubboProtocolImpl::decodeData(Buffer::Instance& buffer, ContextSharedPtr co
return true;
}

void DubboProtocolImpl::rspheaderMutation(Buffer::Instance& buffer, const MessageMetadata& metadata,
const Context& ctx) {
if (metadata.hasRpcResultInfo()) {
auto* result =
const_cast<RpcResultImpl*>(dynamic_cast<const RpcResultImpl*>(&metadata.rpcResultInfo()));
if (result->attachment_ != nullptr) {
Buffer::OwnedImpl origin_buffer;
origin_buffer.move(buffer, buffer.length());

constexpr size_t body_length_size = sizeof(uint32_t);

const size_t attachment_offset = result->attachment_->attachmentOffset();
const size_t request_header_size = ctx.headerSize();
ASSERT(attachment_offset <= origin_buffer.length());

// Move the other parts of the request headers except the body size to the upstream request
// buffer.
buffer.move(origin_buffer, request_header_size - body_length_size);
// Discard the old body size.
origin_buffer.drain(body_length_size);

// Re-serialize the updated attachment.
Buffer::OwnedImpl attachment_buffer;
Hessian2::Encoder encoder(std::make_unique<BufferWriter>(attachment_buffer));
encoder.encode(result->attachment_->attachment());

size_t new_body_size = attachment_offset - request_header_size + attachment_buffer.length();

buffer.writeBEInt<uint32_t>(new_body_size);
buffer.move(origin_buffer, attachment_offset - request_header_size);
buffer.move(attachment_buffer);

origin_buffer.drain(origin_buffer.length());
}
}
}

bool DubboProtocolImpl::encode(Buffer::Instance& buffer, const MessageMetadata& metadata,
const Context& ctx, const std::string& content,
RpcResponseType type) {
RpcResponseType /*type*/) {
ASSERT(serializer_);

switch (metadata.messageType()) {
Expand All @@ -200,18 +238,11 @@ bool DubboProtocolImpl::encode(Buffer::Instance& buffer, const MessageMetadata&
}
case MessageType::Response: {
ASSERT(metadata.hasResponseStatus());
ASSERT(!content.empty());
buffer.drain(buffer.length());
Buffer::OwnedImpl body_buffer;
size_t serialized_body_size = serializer_->serializeRpcResult(body_buffer, content, type);

buffer.writeBEInt<uint16_t>(MagicNumber);
buffer.writeByte(static_cast<uint8_t>(metadata.serializationType()));
buffer.writeByte(static_cast<uint8_t>(metadata.responseStatus()));
buffer.writeBEInt<uint64_t>(metadata.requestId());
buffer.writeBEInt<uint32_t>(serialized_body_size);

buffer.move(body_buffer, serialized_body_size);
if (content == "addheader") {
// only when server sidecar response,we need add header
// client sidecar response not need
rspheaderMutation(buffer, metadata, ctx);
}
return true;
}
case MessageType::Request: {
Expand Down
2 changes: 2 additions & 0 deletions src/application_protocols/dubbo/dubbo_protocol_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ class DubboProtocolImpl : public Protocol {
private:
void headerMutation(Buffer::Instance& buffer, const MessageMetadata& metadata,
const Context& context);
void rspheaderMutation(Buffer::Instance& buffer, const MessageMetadata& metadata,
const Context& context);
};

} // namespace Dubbo
Expand Down
6 changes: 6 additions & 0 deletions src/application_protocols/dubbo/message_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,14 @@ class RpcResultImpl : public RpcResult {
bool hasException() const override { return has_exception_; }
void setException(bool has_exception) { has_exception_ = has_exception; }

std::string getRspBody() { return bodyrsp; }
void setRspBody(std::string body) { bodyrsp = body; }

mutable std::unique_ptr<RpcInvocationImpl::Attachment> attachment_{};

private:
bool has_exception_{false};
std::string bodyrsp;
};

} // namespace Dubbo
Expand Down
7 changes: 7 additions & 0 deletions src/application_protocols/dubbo/metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ class MessageMetadata {
const RpcInvocation& invocationInfo() const { return *invocation_info_; }
const RpcInvocationSharedPtr invocationInfoPtr() const { return invocation_info_; }

void setRpcResultInfo(RpcResultSharedPtr result_info) { result_info_ = result_info; }
bool hasRpcResultInfo() const { return result_info_ != nullptr; }
const RpcResult& rpcResultInfo() const { return *result_info_; }
const RpcResultSharedPtr rpcResultInfoPtr() const { return result_info_; }

void setProtocolType(ProtocolType type) { proto_type_ = type; }
ProtocolType protocolType() const { return proto_type_; }

Expand Down Expand Up @@ -74,6 +79,8 @@ class MessageMetadata {

RpcInvocationSharedPtr invocation_info_;

RpcResultSharedPtr result_info_;

uint8_t serialization_type_{static_cast<uint8_t>(SerializationType::Hessian2)};
uint8_t protocol_version_{1};
int64_t request_id_ = 0;
Expand Down

0 comments on commit 57327a0

Please sign in to comment.