Skip to content

Commit

Permalink
fix for dubbo source metric destination label unknown
Browse files Browse the repository at this point in the history
  • Loading branch information
huanghuangzym committed Jun 12, 2023
1 parent bb29799 commit f0ef0df
Show file tree
Hide file tree
Showing 10 changed files with 213 additions and 13 deletions.
74 changes: 74 additions & 0 deletions src/application_protocols/dubbo/dubbo_codec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ void DubboCodec::encode(const MetaProtocolProxy::Metadata& metadata,
break;
}
case MetaProtocolProxy::MessageType::Response: {
encodeResponse(metadata, mutation, buffer);
break;
}
case MetaProtocolProxy::MessageType::Error: {
Expand All @@ -77,6 +78,57 @@ void DubboCodec::encode(const MetaProtocolProxy::Metadata& metadata,
}
}

void DubboCodec::encodeResponse(const MetaProtocolProxy::Metadata& metadata,
const MetaProtocolProxy::Mutation& mutation,
Buffer::Instance& buffer) {

MessageMetadata msgMetadata;
toMsgMetadata(metadata, msgMetadata);
ENVOY_LOG(
debug,
"dubbo: msgdata is hasRpcResultInfo {}, hasResponseStatus {}, headersize {}, bodysize {}",
msgMetadata.hasRpcResultInfo(), msgMetadata.hasResponseStatus(), metadata.getHeaderSize(),
metadata.getBodySize());

bool has_mutation = false;
if (msgMetadata.hasRpcResultInfo()) {
auto* result = const_cast<RpcResultImpl*>(
dynamic_cast<const RpcResultImpl*>(&msgMetadata.rpcResultInfo()));
ENVOY_LOG(debug, "dubbo: codec result hasException {},result body {}", result->hasException(),
result->getRspBody());
if (result->attachment_ != nullptr) {
ENVOY_LOG(debug, "dubbo: codec result attachment_ not null offset {}",
result->attachment_->attachmentOffset(), result->getRspBody());
}

for (const auto& keyValue : mutation) {
ENVOY_LOG(debug, "dubbo: encodeResponse codec mutation {} : {}", keyValue.first,
keyValue.second);
if (msgMetadata.hasRpcResultInfo()) {
result->attachment_->remove(keyValue.first);
result->attachment_->insert(keyValue.first, keyValue.second);
}
has_mutation = true;
}

ENVOY_LOG(debug, "dubbo: encodeResponse codec attachment is {}",
result->attachment_->attachment().toDebugString());
}

if (has_mutation) {
//服务端的response包含了mutation header
// x-envoy-peer-metadata-id
// x-envoy-peer-metadata
//那么这里,吧这两个header 加到response header里
ContextImpl ctx;
ctx.setHeaderSize(metadata.getHeaderSize());
ctx.setBodySize(metadata.getBodySize());
if (!protocol_->encode(buffer, msgMetadata, ctx, "addheader")) {
throw EnvoyException("failed to encode request message");
}
}
}

void DubboCodec::onError(const MetaProtocolProxy::Metadata& metadata,
const MetaProtocolProxy::Error& error, Buffer::Instance& buffer) {
ASSERT(buffer.length() == 0);
Expand Down Expand Up @@ -136,6 +188,19 @@ void DubboCodec::toMetadata(const MessageMetadata& msgMetadata,
if (msgMetadata.hasResponseStatus()) {
metadata.put("ResponseStatus", msgMetadata.responseStatus());
}
if (msgMetadata.hasRpcResultInfo()) {
auto* invo = const_cast<RpcResultImpl*>(
dynamic_cast<const RpcResultImpl*>(&msgMetadata.rpcResultInfo()));
for (const auto& pair : invo->attachment_->attachment()) {
const auto key = pair.first->toString();
const auto value = pair.second->toString();
if (!key.has_value() || !value.has_value()) {
continue;
}
metadata.putString(key.value(), value.value());
}
metadata.put("RpcResultInfo", msgMetadata.rpcResultInfoPtr());
}

switch (msgMetadata.messageType()) {
case MessageType::Request:
Expand Down Expand Up @@ -184,6 +249,12 @@ void DubboCodec::toMsgMetadata(const MetaProtocolProxy::Metadata& metadata,
msgMetadata.setInvocationInfo(std::any_cast<RpcInvocationSharedPtr>(invo));
}

ref = metadata.get("RpcResultInfo");
if (ref.has_value()) {
const auto& result = ref.value();
msgMetadata.setRpcResultInfo(std::any_cast<RpcResultSharedPtr>(result));
}

ref = metadata.get("ProtocolType");
assert(ref.has_value());
const auto& proto_type = ref.value();
Expand Down Expand Up @@ -244,6 +315,9 @@ void DubboCodec::encodeRequest(const MetaProtocolProxy::Metadata& metadata,
invo->attachment().remove(keyValue.first);
invo->attachment().insert(keyValue.first, keyValue.second);
}

ENVOY_LOG(debug, "dubbo: codec attachment is {}",
invo->attachment().attachment().toDebugString());
}
ContextImpl ctx;
ctx.setHeaderSize(metadata.getHeaderSize());
Expand Down
2 changes: 2 additions & 0 deletions src/application_protocols/dubbo/dubbo_codec.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ class DubboCodec : public MetaProtocolProxy::Codec, public Logger::Loggable<Logg
void encodeHeartbeat(const MetaProtocolProxy::Metadata& metadata, Buffer::Instance& buffer);
void encodeRequest(const MetaProtocolProxy::Metadata& metadata,
const MetaProtocolProxy::Mutation& mutation, Buffer::Instance& buffer);
void encodeResponse(const MetaProtocolProxy::Metadata& metadata,
const MetaProtocolProxy::Mutation& mutation, Buffer::Instance& buffer);

ProtocolPtr protocol_;
DecoderStateMachinePtr state_machine_;
Expand Down
45 changes: 43 additions & 2 deletions src/application_protocols/dubbo/dubbo_hessian2_serializer_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,25 +102,66 @@ DubboHessian2SerializerImpl::deserializeRpcResult(Buffer::Instance& buffer,

RpcResponseType type = static_cast<RpcResponseType>(*type_value);

bool has_attachment = false;
bool has_value = false;
switch (type) {
case RpcResponseType::ResponseWithException:
has_value_or_attachment = false;
result->setException(true);
break;
case RpcResponseType::ResponseWithExceptionWithAttachments:
has_value_or_attachment = false;
result->setException(true);
has_attachment = true;
break;
case RpcResponseType::ResponseWithNullValue:
has_value_or_attachment = false;
has_value = false;
has_attachment = false;
FALLTHRU;
case RpcResponseType::ResponseNullValueWithAttachments:
has_value = false;
has_attachment = true;
result->setException(false);
break;
case RpcResponseType::ResponseWithValue:
has_value = true;
has_attachment = false;
result->setException(false);
break;
case RpcResponseType::ResponseValueWithAttachments:
has_value = true;
has_attachment = true;
result->setException(false);
break;
default:
throw EnvoyException(fmt::format("not supported return type {}", static_cast<uint8_t>(type)));
}

size_t total_size = decoder.offset();
// size_t total_size = decoder.offset();
//解析出返回体
if (has_value) {
auto responesebody = decoder.decode<std::string>();
result->setRspBody(*responesebody);
}

if (has_attachment) {
size_t offset = context->headerSize() + decoder.offset();
auto attachResult = decoder.decode<Hessian2::Object>();
if (attachResult != nullptr && attachResult->type() == Hessian2::Object::Type::UntypedMap) {
result->attachment_ = std::make_unique<RpcInvocationImpl::Attachment>(
RpcInvocationImpl::Attachment::MapPtr{
dynamic_cast<RpcInvocationImpl::Attachment::Map*>(attachResult.release())},
offset);
} else {
result->attachment_ = std::make_unique<RpcInvocationImpl::Attachment>(
std::make_unique<RpcInvocationImpl::Attachment::Map>(), offset);
}
}

//解析出响应的attachment

/*
if (context->bodySize() < total_size) {
throw EnvoyException(fmt::format("RpcResult size({}) large than body size({})", total_size,
context->bodySize()));
Expand All @@ -130,7 +171,7 @@ DubboHessian2SerializerImpl::deserializeRpcResult(Buffer::Instance& buffer,
throw EnvoyException(
fmt::format("RpcResult is no value, but the rest of the body size({}) not equal 0",
(context->bodySize() - total_size)));
}
}*/

return std::pair<RpcResultSharedPtr, bool>(result, true);
}
Expand Down
48 changes: 46 additions & 2 deletions src/application_protocols/dubbo/dubbo_protocol_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ bool DubboProtocolImpl::decodeData(Buffer::Instance& buffer, ContextSharedPtr co
if (!ret.second) {
return false;
}
metadata->setRpcResultInfo(ret.first);
if (ret.first->hasException()) {
metadata->setMessageType(MessageType::Exception);
}
Expand All @@ -175,9 +176,46 @@ bool DubboProtocolImpl::decodeData(Buffer::Instance& buffer, ContextSharedPtr co
return true;
}

void DubboProtocolImpl::rspheaderMutation(Buffer::Instance& buffer, const MessageMetadata& metadata,
const Context& ctx) {
if (metadata.hasRpcResultInfo()) {
auto* result =
const_cast<RpcResultImpl*>(dynamic_cast<const RpcResultImpl*>(&metadata.rpcResultInfo()));
if (result->attachment_ != nullptr) {
Buffer::OwnedImpl origin_buffer;
origin_buffer.move(buffer, buffer.length());

constexpr size_t body_length_size = sizeof(uint32_t);

const size_t attachment_offset = result->attachment_->attachmentOffset();
const size_t request_header_size = ctx.headerSize();
ASSERT(attachment_offset <= origin_buffer.length());

// Move the other parts of the request headers except the body size to the upstream request
// buffer.
buffer.move(origin_buffer, request_header_size - body_length_size);
// Discard the old body size.
origin_buffer.drain(body_length_size);

// Re-serialize the updated attachment.
Buffer::OwnedImpl attachment_buffer;
Hessian2::Encoder encoder(std::make_unique<BufferWriter>(attachment_buffer));
encoder.encode(result->attachment_->attachment());

size_t new_body_size = attachment_offset - request_header_size + attachment_buffer.length();

buffer.writeBEInt<uint32_t>(new_body_size);
buffer.move(origin_buffer, attachment_offset - request_header_size);
buffer.move(attachment_buffer);

origin_buffer.drain(origin_buffer.length());
}
}
}

bool DubboProtocolImpl::encode(Buffer::Instance& buffer, const MessageMetadata& metadata,
const Context& ctx, const std::string& content,
RpcResponseType type) {
RpcResponseType /*type*/) {
ASSERT(serializer_);

switch (metadata.messageType()) {
Expand All @@ -200,7 +238,12 @@ bool DubboProtocolImpl::encode(Buffer::Instance& buffer, const MessageMetadata&
}
case MessageType::Response: {
ASSERT(metadata.hasResponseStatus());
ASSERT(!content.empty());
if (content == "addheader") {
//只要server端的sidecar在响应的时候,需要吧header加上
rspheaderMutation(buffer, metadata, ctx);
}
// ASSERT(!content.empty());
/*
buffer.drain(buffer.length());
Buffer::OwnedImpl body_buffer;
size_t serialized_body_size = serializer_->serializeRpcResult(body_buffer, content, type);
Expand All @@ -212,6 +255,7 @@ bool DubboProtocolImpl::encode(Buffer::Instance& buffer, const MessageMetadata&
buffer.writeBEInt<uint32_t>(serialized_body_size);
buffer.move(body_buffer, serialized_body_size);
*/
return true;
}
case MessageType::Request: {
Expand Down
2 changes: 2 additions & 0 deletions src/application_protocols/dubbo/dubbo_protocol_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ class DubboProtocolImpl : public Protocol {
private:
void headerMutation(Buffer::Instance& buffer, const MessageMetadata& metadata,
const Context& context);
void rspheaderMutation(Buffer::Instance& buffer, const MessageMetadata& metadata,
const Context& context);
};

} // namespace Dubbo
Expand Down
6 changes: 6 additions & 0 deletions src/application_protocols/dubbo/message_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,14 @@ class RpcResultImpl : public RpcResult {
bool hasException() const override { return has_exception_; }
void setException(bool has_exception) { has_exception_ = has_exception; }

std::string getRspBody() { return bodyrsp; }
void setRspBody(std::string body) { bodyrsp = body; }

mutable std::unique_ptr<RpcInvocationImpl::Attachment> attachment_{};

private:
bool has_exception_{false};
std::string bodyrsp;
};

} // namespace Dubbo
Expand Down
7 changes: 7 additions & 0 deletions src/application_protocols/dubbo/metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ class MessageMetadata {
const RpcInvocation& invocationInfo() const { return *invocation_info_; }
const RpcInvocationSharedPtr invocationInfoPtr() const { return invocation_info_; }

void setRpcResultInfo(RpcResultSharedPtr result_info) { result_info_ = result_info; }
bool hasRpcResultInfo() const { return result_info_ != nullptr; }
const RpcResult& rpcResultInfo() const { return *result_info_; }
const RpcResultSharedPtr rpcResultInfoPtr() const { return result_info_; }

void setProtocolType(ProtocolType type) { proto_type_ = type; }
ProtocolType protocolType() const { return proto_type_; }

Expand Down Expand Up @@ -74,6 +79,8 @@ class MessageMetadata {

RpcInvocationSharedPtr invocation_info_;

RpcResultSharedPtr result_info_;

uint8_t serialization_type_{static_cast<uint8_t>(SerializationType::Hessian2)};
uint8_t protocol_version_{1};
int64_t request_id_ = 0;
Expand Down
7 changes: 4 additions & 3 deletions src/meta_protocol_proxy/filters/istio_stats/istio_stats.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ IstioStats::IstioStats(Server::Configuration::FactoryContext& context,
response_flags_(pool_.add("response_flags")),
connection_security_policy_(pool_.add("connection_security_policy")),
response_code_(pool_.add("response_code")) {
traffic_direction_ = traffic_direction;
straffic_direction_ = traffic_direction;
local_node_info_ = Wasm::Common::extractEmptyNodeFlatBuffer();
if (context.localInfo().node().has_metadata()) {
local_node_info_ =
Expand All @@ -62,12 +62,13 @@ static inline absl::string_view GetFromFbStringView(const flatbuffers::String* s
}

void IstioStats::report(const Wasm::Common::FlatNode& peer_node, MetadataSharedPtr metadata,
const std::string& destination_service) {
const std::string& destination_service,
envoy::config::core::v3::TrafficDirection direction) {
Stats::StatNameTagVector tags;
tags.reserve(25);
const auto& local_node = *flatbuffers::GetRoot<Wasm::Common::FlatNode>(local_node_info_.data());

if (traffic_direction_ == envoy::config::core::v3::TrafficDirection::INBOUND) {
if (direction == envoy::config::core::v3::TrafficDirection::INBOUND) {
tags.push_back({reporter_, destination_});
populateSourceNodeTags(peer_node, tags);
populateDestinationNodeTags(local_node, tags);
Expand Down
5 changes: 3 additions & 2 deletions src/meta_protocol_proxy/filters/istio_stats/istio_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,15 @@ class IstioStats {
envoy::config::core::v3::TrafficDirection traffic_direction);

void report(const ::Wasm::Common::FlatNode& node, MetadataSharedPtr metadata,
const std::string& destination_service);
const std::string& destination_service,
envoy::config::core::v3::TrafficDirection direction);

private:
void populateSourceNodeTags(const Wasm::Common::FlatNode& node, Stats::StatNameTagVector& tags);
void populateDestinationNodeTags(const Wasm::Common::FlatNode& node,
Stats::StatNameTagVector& tags);
// traffic direction, inbound or outbound
envoy::config::core::v3::TrafficDirection traffic_direction_;
envoy::config::core::v3::TrafficDirection straffic_direction_;
flatbuffers::DetachedBuffer local_node_info_;
Stats::Scope& scope_;
Stats::StatNameDynamicPool pool_;
Expand Down

0 comments on commit f0ef0df

Please sign in to comment.