Skip to content

Commit

Permalink
fix : grpc server content type choose
Browse files Browse the repository at this point in the history
  • Loading branch information
hxms committed Jan 21, 2024
1 parent 23bf2d9 commit 79cd53a
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 34 deletions.
10 changes: 1 addition & 9 deletions api/handler/rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package rpc

import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
Expand Down Expand Up @@ -103,16 +102,11 @@ func (h *rpcHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {

ct := r.Header.Get("Content-Type")

fmt.Println("Content-Type: ", ct)

// Strip charset from Content-Type (like `application/json; charset=UTF-8`)
if idx := strings.IndexRune(ct, ';'); idx >= 0 {
ct = ct[:idx]
}

fmt.Println("Content-Type: ", ct)
fmt.Println(ct, hasCodec(ct, protoCodecs), hasCodec(ct, jsonCodecs))

// micro client
c := h.opts.Client

Expand Down Expand Up @@ -201,7 +195,7 @@ func (h *rpcHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var request json.RawMessage
// if the extracted payload isn't empty lets use it
if len(br) > 0 {
request = json.RawMessage(br)
request = br
}

// create request/response
Expand All @@ -214,8 +208,6 @@ func (h *rpcHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
client.WithContentType(ct),
)

fmt.Println("default request", ct, req.ContentType())

// make the call
if err := c.Call(cx, req, &response, client.WithSelectOption(so)); err != nil {
writeError(w, r, err)
Expand Down
42 changes: 26 additions & 16 deletions client/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,10 @@ func (g *grpcClient) next(request client.Request, opts client.CallOptions) (sele
return next, nil
}

func (g *grpcClient) call(ctx context.Context, node *registry.Node, req client.Request, rsp interface{}, opts client.CallOptions) error {
func (g *grpcClient) call(
ctx context.Context, node *registry.Node, req client.Request, rsp interface{},
opts client.CallOptions,
) error {
address := node.Address

header := make(map[string]string)
Expand All @@ -114,12 +117,11 @@ func (g *grpcClient) call(ctx context.Context, node *registry.Node, req client.R
header["timeout"] = fmt.Sprintf("%d", opts.RequestTimeout)
// set the content type for the request
header["x-content-type"] = req.ContentType()

headersInterface := make(map[string]interface{})
headerInterface := make(map[string]interface{})
for k, v := range header {
headersInterface[k] = v
headerInterface[k] = v
}
headerProto, err := structpb.NewStruct(headersInterface)
headerProto, err := structpb.NewStruct(headerInterface)
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
Expand Down Expand Up @@ -188,7 +190,10 @@ func (g *grpcClient) call(ctx context.Context, node *registry.Node, req client.R
return grr
}

func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client.Request, rsp interface{}, opts client.CallOptions) error {
func (g *grpcClient) stream(
ctx context.Context, node *registry.Node, req client.Request, rsp interface{},
opts client.CallOptions,
) error {
address := node.Address
header := make(map[string]string)
if md, ok := metadata.FromContext(ctx); ok {
Expand All @@ -204,12 +209,11 @@ func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client
}
// set the content type for the request
header["x-content-type"] = req.ContentType()

headersInterface := make(map[string]interface{})
headerInterface := make(map[string]interface{})
for k, v := range header {
headersInterface[k] = v
headerInterface[k] = v
}
headerProto, err := structpb.NewStruct(headersInterface)
headerProto, err := structpb.NewStruct(headerInterface)
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
Expand Down Expand Up @@ -398,7 +402,9 @@ func (g *grpcClient) NewMessage(topic string, msg interface{}, opts ...client.Me
return newGRPCEvent(topic, msg, g.opts.ContentType, opts...)
}

func (g *grpcClient) NewRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request {
func (g *grpcClient) NewRequest(
service, method string, req interface{}, reqOpts ...client.RequestOption,
) client.Request {
return newGRPCRequest(service, method, req, g.opts.ContentType, reqOpts...)
}

Expand Down Expand Up @@ -561,7 +567,9 @@ func (g *grpcClient) Stream(ctx context.Context, req client.Request, opts ...cli
if err == selector.ErrNotFound {
return nil, errors.InternalServerError("go.micro.client", "service %s: %s", service, err.Error())
}
return nil, errors.InternalServerError("go.micro.client", "error selecting %s node: %s", service, err.Error())
return nil, errors.InternalServerError(
"go.micro.client", "error selecting %s node: %s", service, err.Error(),
)
}

// make the call
Expand Down Expand Up @@ -657,10 +665,12 @@ func (g *grpcClient) Publish(ctx context.Context, p client.Message, opts ...clie
topic = options.Exchange
}

return g.opts.Broker.Publish(topic, &broker.Message{
Header: md,
Body: body,
}, broker.PublishContext(options.Context))
return g.opts.Broker.Publish(
topic, &broker.Message{
Header: md,
Body: body,
}, broker.PublishContext(options.Context),
)
}

func (g *grpcClient) String() string {
Expand Down
27 changes: 18 additions & 9 deletions server/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,11 +239,10 @@ func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) error {

// get content type
ct := defaultContentType

if ctype, ok := md["x-content-type"]; ok {
if ctype, ok := md["content-type"]; ok {
ct = ctype
}
if ctype, ok := md["content-type"]; ok {
if ctype, ok := md["x-content-type"]; ok {
ct = ctype
}

Expand Down Expand Up @@ -342,7 +341,9 @@ func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) error {
return g.processStream(stream, service, mtype, ct, ctx)
}

func (g *grpcServer) processRequest(stream grpc.ServerStream, service *service, mtype *methodType, ct string, ctx context.Context) error {
func (g *grpcServer) processRequest(
stream grpc.ServerStream, service *service, mtype *methodType, ct string, ctx context.Context,
) error {
for {
var argv, replyv reflect.Value

Expand Down Expand Up @@ -399,7 +400,11 @@ func (g *grpcServer) processRequest(stream grpc.ServerStream, service *service,
err = errors.InternalServerError("go.micro.server", "panic recovered: %v", r)
}
}()
returnValues = function.Call([]reflect.Value{service.rcvr, mtype.prepareContext(ctx), reflect.ValueOf(argv.Interface()), reflect.ValueOf(rsp)})
returnValues = function.Call(
[]reflect.Value{
service.rcvr, mtype.prepareContext(ctx), reflect.ValueOf(argv.Interface()), reflect.ValueOf(rsp),
},
)

// The return value for the method is an error.
if rerr := returnValues[0].Interface(); rerr != nil {
Expand Down Expand Up @@ -452,7 +457,9 @@ func (g *grpcServer) processRequest(stream grpc.ServerStream, service *service,
}
}

func (g *grpcServer) processStream(stream grpc.ServerStream, service *service, mtype *methodType, ct string, ctx context.Context) error {
func (g *grpcServer) processStream(
stream grpc.ServerStream, service *service, mtype *methodType, ct string, ctx context.Context,
) error {
opts := g.opts

r := &rpcRequest{
Expand Down Expand Up @@ -691,9 +698,11 @@ func (g *grpcServer) Register() error {
subscriberList = append(subscriberList, e)
}
}
sort.Slice(subscriberList, func(i, j int) bool {
return subscriberList[i].topic > subscriberList[j].topic
})
sort.Slice(
subscriberList, func(i, j int) bool {
return subscriberList[i].topic > subscriberList[j].topic
},
)

endpoints := make([]*registry.Endpoint, 0, len(handlerList)+len(subscriberList))
for _, n := range handlerList {
Expand Down

0 comments on commit 79cd53a

Please sign in to comment.