Skip to content

Commit

Permalink
fix : break change for grpc metadata transfer
Browse files Browse the repository at this point in the history
  • Loading branch information
hxms committed Jan 12, 2024
1 parent 797bca6 commit b7eb3d4
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 11 deletions.
41 changes: 32 additions & 9 deletions client/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,17 @@ package grpc
import (
"context"
"crypto/tls"
"encoding/base64"
"fmt"
"net"
"reflect"
"strings"
"sync/atomic"
"time"

"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/structpb"

"c-z.dev/go-micro/broker"
"c-z.dev/go-micro/client"
"c-z.dev/go-micro/client/selector"
Expand Down Expand Up @@ -111,13 +115,21 @@ func (g *grpcClient) call(ctx context.Context, node *registry.Node, req client.R
// set the content type for the request
header["x-content-type"] = req.ContentType()

// fix : grpc error "stream terminated by RST_STREAM with error code: PROTOCOL_ERROR"
delete(header, "connection")

fmt.Printf("grpcClient.call: header: %v\n", header)
header = map[string]string{}
headersInterface := make(map[string]interface{})
for k, v := range header {
headersInterface[k] = v
}
headerProto, err := structpb.NewStruct(headersInterface)
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
headerData, err := proto.Marshal(headerProto)
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
embeddedHeader := base64.StdEncoding.EncodeToString(headerData)

md := gmetadata.New(header)
md := gmetadata.New(map[string]string{"X-Micro-Metadata": embeddedHeader})
ctx = gmetadata.NewOutgoingContext(ctx, md)

cf, err := g.newGRPCCodec(req.ContentType())
Expand Down Expand Up @@ -193,10 +205,21 @@ func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client
// set the content type for the request
header["x-content-type"] = req.ContentType()

// fix : grpc error "stream terminated by RST_STREAM with error code: PROTOCOL_ERROR"
delete(header, "connection")
headersInterface := make(map[string]interface{})
for k, v := range header {
headersInterface[k] = v
}
headerProto, err := structpb.NewStruct(headersInterface)
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
headerData, err := proto.Marshal(headerProto)
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
embeddedHeader := base64.StdEncoding.EncodeToString(headerData)

md := gmetadata.New(header)
md := gmetadata.New(map[string]string{"X-Micro-Metadata": embeddedHeader})
ctx = gmetadata.NewOutgoingContext(ctx, md)

cf, err := g.newGRPCCodec(req.ContentType())
Expand Down
24 changes: 22 additions & 2 deletions server/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package grpc
import (
"context"
"crypto/tls"
"encoding/base64"
"fmt"
"net"
"reflect"
Expand All @@ -14,6 +15,9 @@ import (
"sync"
"time"

"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/structpb"

"c-z.dev/go-micro/broker"
"c-z.dev/go-micro/errors"
"c-z.dev/go-micro/logger"
Expand Down Expand Up @@ -208,10 +212,26 @@ func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) error {
gmd = metadata.MD{}
}

// extract embedded metadata
embeddedHeader := ""
if embeddedHeaders := gmd.Get("X-Micro-Metadata"); len(embeddedHeaders) > 0 {
embeddedHeader = embeddedHeaders[0]
}

// copy the metadata to go-micro.metadata
md := meta.Metadata{}
for k, v := range gmd {
md[k] = strings.Join(v, ", ")
if embeddedHeader != "" {
headerData, err := base64.StdEncoding.DecodeString(embeddedHeader)
if err != nil {
return status.New(codes.InvalidArgument, err.Error()).Err()
}
var s structpb.Struct
if err = proto.Unmarshal(headerData, &s); err != nil {
return status.New(codes.InvalidArgument, err.Error()).Err()
}
for k, v := range s.Fields {
md[k] = v.GetStringValue()
}
}

// timeout for server deadline
Expand Down

0 comments on commit b7eb3d4

Please sign in to comment.