Skip to content
Permalink
Browse files
refactor: fix method name
  • Loading branch information
PhilYue committed Dec 5, 2021
1 parent bd1e81a commit 0e87534d63c5a6453ced599a1428f438f784d13f
Showing 1 changed file with 51 additions and 77 deletions.
@@ -23,8 +23,6 @@ import (
"io"
"io/ioutil"
stdHttp "net/http"
"os"
"path/filepath"
"strings"
"sync"
)
@@ -58,10 +56,9 @@ const (
Kind = constant.HTTPGrpcProxyFilter

loggerHeader = "[grpc-proxy]"
)

var (
fsrc fileSource
// DescriptorSourceKey current ds
DescriptorSourceKey = "DescriptorSource"
)

func init() {
@@ -75,6 +72,7 @@ type (

// Filter is grpc filter instance
Filter struct {
Descriptor
cfg *Config
// hold grpc.ClientConns, key format: cluster name + "." + endpoint
pools map[string]*sync.Pool
@@ -85,6 +83,7 @@ type (

// Config describe the config of AccessFilter
Config struct {
//DescriptorSourceStrategy string `yaml:"descriptor_source_strategy" json:"descriptor_source_strategy"`
Path string `yaml:"path" json:"path"`
Rules []*Rule `yaml:"rules" json:"rules"` //nolint
}
@@ -133,9 +132,46 @@ func getServiceAndMethod(path string) (string, string) {

// Handle use the default http to grpc transcoding strategy https://cloud.google.com/endpoints/docs/grpc/transcoding
func (f *Filter) Handle(c *http.HttpContext) {

svc, mth := getServiceAndMethod(c.GetUrl())

dscp, err := fsrc.FindSymbol(svc)
var clientConn *grpc.ClientConn
var err error

re := c.GetRouteEntry()
logger.Debugf("%s client choose endpoint from cluster :%v", loggerHeader, re.Cluster)

e := server.GetClusterManager().PickEndpoint(re.Cluster)
if e == nil {
logger.Errorf("%s err {cluster not exists}", loggerHeader)
c.Err = perrors.New("cluster not exists")
c.Next()
return
}

ep := e.Address.GetAddress()

p, ok := f.pools[strings.Join([]string{re.Cluster, ep}, ".")]
if !ok {
p = &sync.Pool{}
}

clientConn, ok = p.Get().(*grpc.ClientConn)
if !ok || clientConn == nil {
// TODO(Kenway): Support Credential and TLS
clientConn, err = grpc.DialContext(c.Ctx, ep, grpc.WithInsecure())
if err != nil || clientConn == nil {
logger.Errorf("%s err {failed to connect to grpc service provider}", loggerHeader)
c.Err = err
c.Next()
return
}
}

// get DescriptorSource, contain file and reflection
source := f.getDescriptorByGrpcReflect(c.Ctx, clientConn)

dscp, err := source.FindSymbol(svc)
if err != nil {
logger.Errorf("%s err {%s}", loggerHeader, "request path invalid")
c.Err = perrors.New("method not allow")
@@ -153,7 +189,7 @@ func (f *Filter) Handle(c *http.HttpContext) {

mthDesc := svcDesc.FindMethodByName(mth)

err = f.registerExtension(mthDesc)
err = f.registerExtension(source, mthDesc)
if err != nil {
logger.Errorf("%s err {%s}", loggerHeader, "register extension failed")
c.Err = err
@@ -172,37 +208,6 @@ func (f *Filter) Handle(c *http.HttpContext) {
return
}

var clientConn *grpc.ClientConn
re := c.GetRouteEntry()
logger.Debugf("%s client choose endpoint from cluster :%v", loggerHeader, re.Cluster)

e := server.GetClusterManager().PickEndpoint(re.Cluster)
if e == nil {
logger.Errorf("%s err {cluster not exists}", loggerHeader)
c.Err = perrors.New("cluster not exists")
c.Next()
return
}

ep := e.Address.GetAddress()

p, ok := f.pools[strings.Join([]string{re.Cluster, ep}, ".")]
if !ok {
p = &sync.Pool{}
}

clientConn, ok = p.Get().(*grpc.ClientConn)
if !ok || clientConn == nil {
// TODO(Kenway): Support Credential and TLS
clientConn, err = grpc.DialContext(c.Ctx, ep, grpc.WithInsecure())
if err != nil || clientConn == nil {
logger.Errorf("%s err {failed to connect to grpc service provider}", loggerHeader)
c.Err = err
c.Next()
return
}
}

stub := grpcdynamic.NewStubWithMessageFactory(clientConn, msgFac)

// metadata in grpc has the same feature in http
@@ -244,27 +249,27 @@ func (f *Filter) Handle(c *http.HttpContext) {
c.Next()
}

func (f *Filter) registerExtension(mthDesc *desc.MethodDescriptor) error {
err := RegisterExtension(&f.extReg, mthDesc.GetInputType(), f.registered)
func (f *Filter) registerExtension(source DescriptorSource, mthDesc *desc.MethodDescriptor) error {
err := RegisterExtension(source, &f.extReg, mthDesc.GetInputType(), f.registered)
if err != nil {
return perrors.New("register extension failed")
}

err = RegisterExtension(&f.extReg, mthDesc.GetOutputType(), f.registered)
err = RegisterExtension(source, &f.extReg, mthDesc.GetOutputType(), f.registered)
if err != nil {
return perrors.New("register extension failed")
}
return nil
}

func RegisterExtension(extReg *dynamic.ExtensionRegistry, msgDesc *desc.MessageDescriptor, registered map[string]bool) error {
func RegisterExtension(source DescriptorSource, extReg *dynamic.ExtensionRegistry, msgDesc *desc.MessageDescriptor, registered map[string]bool) error {
msgType := msgDesc.GetFullyQualifiedName()
if _, ok := registered[msgType]; ok {
return nil
}

if len(msgDesc.GetExtensionRanges()) > 0 {
fds, err := fsrc.AllExtensionsForType(msgType)
fds, err := source.AllExtensionsForType(msgType)
if err != nil {
return fmt.Errorf("failed to find msg type {%s} in file source", msgType)
}
@@ -277,7 +282,7 @@ func RegisterExtension(extReg *dynamic.ExtensionRegistry, msgDesc *desc.MessageD

for _, fd := range msgDesc.GetFields() {
if fd.GetMessageType() != nil {
err := RegisterExtension(extReg, fd.GetMessageType(), registered)
err := RegisterExtension(source, extReg, fd.GetMessageType(), registered)
if err != nil {
return err
}
@@ -328,40 +333,9 @@ func (f *Filter) Config() interface{} {
}

func (f *Filter) Apply() error {
gc := f.cfg

cur := gc.Path
if !filepath.IsAbs(cur) {
ex, err := os.Executable()
if err != nil {
return err
}
cur = filepath.Dir(ex) + string(os.PathSeparator) + gc.Path
}
f.initDescriptorSource(f.cfg)

logger.Infof("%s load proto files from %s", loggerHeader, cur)
fileLists := make([]string, 0)
items, err := ioutil.ReadDir(cur)
if err != nil {
return err
}

for _, item := range items {
if !item.IsDir() {
sp := strings.Split(item.Name(), ".")
length := len(sp)
if length >= 2 && sp[length-1] == "proto" {
fileLists = append(fileLists, item.Name())
}
}
}

if err != nil {
return err
}
err = f.initFromFileDescriptor([]string{gc.Path}, fileLists...)
if err != nil {
return err
}
return nil
}

0 comments on commit 0e87534

Please sign in to comment.