Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TT-10227] add timeout to gRPC plugin #5561

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apidef/api_definitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,7 @@ type MiddlewareDefinition struct {
Path string `bson:"path" json:"path"`
RequireSession bool `bson:"require_session" json:"require_session"`
RawBodyOnly bool `bson:"raw_body_only" json:"raw_body_only"`
Timeout int `bson:"timeout" json:"timeout"`
}

// IDExtractorConfig specifies the configuration for ID extractor
Expand Down
4 changes: 3 additions & 1 deletion coprocess/dispatcher.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package coprocess

import (
context "context"

"github.com/TykTechnologies/tyk/apidef"
)

// Dispatcher defines a basic interface for the CP dispatcher, check PythonDispatcher for reference.
type Dispatcher interface {
// Dispatch takes and returns a pointer to a CoProcessMessage struct, see coprocess/api.h for details. This is used by CP bindings.
Dispatch(*Object) (*Object, error)
Dispatch(context.Context, *Object) (*Object, error)

// DispatchEvent takes an event JSON, as bytes. Doesn't return.
DispatchEvent([]byte)
Expand Down
8 changes: 4 additions & 4 deletions gateway/api_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ func (gw *Gateway) processSpec(spec *APISpec, apisByListen map[string]int,
)
} else if mwDriver != apidef.OttoDriver {
coprocessLog.Debug("Registering coprocess middleware, hook name: ", obj.Name, "hook type: Pre", ", driver: ", mwDriver)
gw.mwAppendEnabled(&chainArray, &CoProcessMiddleware{baseMid, coprocess.HookType_Pre, obj.Name, mwDriver, obj.RawBodyOnly, nil})
gw.mwAppendEnabled(&chainArray, &CoProcessMiddleware{baseMid, coprocess.HookType_Pre, obj.Name, mwDriver, obj.RawBodyOnly, obj.Timeout, nil})
} else {
chainArray = append(chainArray, gw.createDynamicMiddleware(obj.Name, true, obj.RequireSession, baseMid))
}
Expand Down Expand Up @@ -399,7 +399,7 @@ func (gw *Gateway) processSpec(spec *APISpec, apisByListen map[string]int,
coprocessLog.Debug("Registering coprocess middleware, hook name: ", mwAuthCheckFunc.Name, "hook type: CustomKeyCheck", ", driver: ", mwDriver)

newExtractor(spec, baseMid)
gw.mwAppendEnabled(&authArray, &CoProcessMiddleware{baseMid, coprocess.HookType_CustomKeyCheck, mwAuthCheckFunc.Name, mwDriver, mwAuthCheckFunc.RawBodyOnly, nil})
gw.mwAppendEnabled(&authArray, &CoProcessMiddleware{baseMid, coprocess.HookType_CustomKeyCheck, mwAuthCheckFunc.Name, mwDriver, mwAuthCheckFunc.RawBodyOnly, mwAuthCheckFunc.Timeout, nil})
}
}

Expand All @@ -423,7 +423,7 @@ func (gw *Gateway) processSpec(spec *APISpec, apisByListen map[string]int,
)
} else {
coprocessLog.Debug("Registering coprocess middleware, hook name: ", obj.Name, "hook type: Pre", ", driver: ", mwDriver)
gw.mwAppendEnabled(&chainArray, &CoProcessMiddleware{baseMid, coprocess.HookType_PostKeyAuth, obj.Name, mwDriver, obj.RawBodyOnly, nil})
gw.mwAppendEnabled(&chainArray, &CoProcessMiddleware{baseMid, coprocess.HookType_PostKeyAuth, obj.Name, mwDriver, obj.RawBodyOnly, obj.Timeout, nil})
}
}

Expand Down Expand Up @@ -470,7 +470,7 @@ func (gw *Gateway) processSpec(spec *APISpec, apisByListen map[string]int,
)
} else if mwDriver != apidef.OttoDriver {
coprocessLog.Debug("Registering coprocess middleware, hook name: ", obj.Name, "hook type: Post", ", driver: ", mwDriver)
gw.mwAppendEnabled(&chainArray, &CoProcessMiddleware{baseMid, coprocess.HookType_Post, obj.Name, mwDriver, obj.RawBodyOnly, nil})
gw.mwAppendEnabled(&chainArray, &CoProcessMiddleware{baseMid, coprocess.HookType_Post, obj.Name, mwDriver, obj.RawBodyOnly, obj.Timeout, nil})
} else {
chainArray = append(chainArray, gw.createDynamicMiddleware(obj.Name, false, obj.RequireSession, baseMid))
}
Expand Down
16 changes: 15 additions & 1 deletion gateway/coprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package gateway

import (
"bytes"
"context"
"encoding/json"
"net/url"
"reflect"
Expand Down Expand Up @@ -34,6 +35,7 @@ type CoProcessMiddleware struct {
HookName string
MiddlewareDriver apidef.MiddlewareDriver
RawBodyOnly bool
Timeout int

successHandler *SuccessHandler
}
Expand Down Expand Up @@ -642,7 +644,19 @@ func (c *CoProcessor) Dispatch(object *coprocess.Object) (*coprocess.Object, err
err := fmt.Errorf("Couldn't dispatch request, driver '%s' isn't available", c.Middleware.MiddlewareDriver)
return nil, err
}
newObject, err := dispatcher.Dispatch(object)
// Load CoProcess timeout
timeout := c.Middleware.Timeout
var ctx context.Context
var cancel context.CancelFunc
// Initialize context depending if the CoProcess has a configured timeout
if timeout > 0 {
ctx, cancel = context.WithTimeout(context.Background(), time.Duration(timeout)*time.Second)
} else {
ctx, cancel = context.WithCancel(context.Background())
}
defer cancel()

newObject, err := dispatcher.Dispatch(ctx, object)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions gateway/coprocess_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ func (gw *Gateway) dialer(addr string, timeout time.Duration) (net.Conn, error)
}

// Dispatch takes a CoProcessMessage and sends it to the CP.
func (d *GRPCDispatcher) Dispatch(object *coprocess.Object) (*coprocess.Object, error) {
return grpcClient.Dispatch(context.Background(), object)
func (d *GRPCDispatcher) Dispatch(ctx context.Context, object *coprocess.Object) (*coprocess.Object, error) {
return grpcClient.Dispatch(ctx, object)
}

// DispatchEvent dispatches a Tyk event.
Expand Down
3 changes: 2 additions & 1 deletion gateway/coprocess_python.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
python "github.com/TykTechnologies/tyk/dlpython"
)
import (
"context"
"os"
"sync"
)
Expand All @@ -39,7 +40,7 @@ type PythonDispatcher struct {
}

// Dispatch takes a CoProcessMessage and sends it to the CP.
func (d *PythonDispatcher) Dispatch(object *coprocess.Object) (*coprocess.Object, error) {
func (d *PythonDispatcher) Dispatch(ctx context.Context, object *coprocess.Object) (*coprocess.Object, error) {
// Prepare the PB object:
objectMsg, err := proto.Marshal(object)
if err != nil {
Expand Down