Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

introduce CustomExecutionEngineV2 #346

Merged
merged 7 commits into from
Apr 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 64 additions & 52 deletions pkg/graphql/execution_engine_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,47 +139,47 @@ func (e *internalExecutionContext) reset() {
}

type ExecutionEngineV2 struct {
logger abstractlogger.Logger
config EngineV2Configuration
planner *plan.Planner
plannerMu sync.Mutex
resolver *resolve.Resolver
internalExecutionContextPool sync.Pool
executionPlanCache *lru.Cache
logger abstractlogger.Logger
config EngineV2Configuration
planner *plan.Planner
plannerMu sync.Mutex
resolver *resolve.Resolver
executionPlanCache *lru.Cache
customExecutionEngineExecutor *CustomExecutionEngineV2Executor
}

type WebsocketBeforeStartHook interface {
OnBeforeStart(reqCtx context.Context, operation *Request) error
}

type ExecutionOptionsV2 func(ctx *internalExecutionContext)
type ExecutionOptionsV2 func(postProcessor *postprocess.Processor, resolveContext *resolve.Context)

func WithBeforeFetchHook(hook resolve.BeforeFetchHook) ExecutionOptionsV2 {
return func(ctx *internalExecutionContext) {
ctx.resolveContext.SetBeforeFetchHook(hook)
return func(postProcessor *postprocess.Processor, resolveContext *resolve.Context) {
resolveContext.SetBeforeFetchHook(hook)
}
}

func WithUpstreamHeaders(header http.Header) ExecutionOptionsV2 {
return func(ctx *internalExecutionContext) {
ctx.postProcessor.AddPostProcessor(postprocess.NewProcessInjectHeader(header))
return func(postProcessor *postprocess.Processor, resolveContext *resolve.Context) {
postProcessor.AddPostProcessor(postprocess.NewProcessInjectHeader(header))
}
}

func WithAfterFetchHook(hook resolve.AfterFetchHook) ExecutionOptionsV2 {
return func(ctx *internalExecutionContext) {
ctx.resolveContext.SetAfterFetchHook(hook)
return func(postProcessor *postprocess.Processor, resolveContext *resolve.Context) {
resolveContext.SetAfterFetchHook(hook)
}
}

func WithAdditionalHttpHeaders(headers http.Header, excludeByKeys ...string) ExecutionOptionsV2 {
return func(ctx *internalExecutionContext) {
return func(postProcessor *postprocess.Processor, resolveContext *resolve.Context) {
if len(headers) == 0 {
return
}

if ctx.resolveContext.Request.Header == nil {
ctx.resolveContext.Request.Header = make(http.Header)
if resolveContext.Request.Header == nil {
resolveContext.Request.Header = make(http.Header)
}

excludeMap := make(map[string]bool)
Expand All @@ -193,7 +193,7 @@ func WithAdditionalHttpHeaders(headers http.Header, excludeByKeys ...string) Exe
}

for _, headerValue := range headerValues {
ctx.resolveContext.Request.Header.Add(headerKey, headerValue)
resolveContext.Request.Header.Add(headerKey, headerValue)
}
}
}
Expand All @@ -216,21 +216,23 @@ func NewExecutionEngineV2(ctx context.Context, logger abstractlogger.Logger, eng
engineConfig.AddFieldConfiguration(fieldCfg)
}

return &ExecutionEngineV2{
logger: logger,
config: engineConfig,
planner: plan.NewPlanner(ctx, engineConfig.plannerConfig),
resolver: resolve.New(ctx, fetcher, engineConfig.dataLoaderConfig.EnableDataLoader),
internalExecutionContextPool: sync.Pool{
New: func() interface{} {
return newInternalExecutionContext()
},
},
executionEngine := &ExecutionEngineV2{
logger: logger,
config: engineConfig,
planner: plan.NewPlanner(ctx, engineConfig.plannerConfig),
resolver: resolve.New(ctx, fetcher, engineConfig.dataLoaderConfig.EnableDataLoader),
executionPlanCache: executionPlanCache,
}, nil
}

executor, err := NewCustomExecutionEngineV2Executor(executionEngine)
if err != nil {
return nil, err
}
executionEngine.customExecutionEngineExecutor = executor
return executionEngine, nil
}

func (e *ExecutionEngineV2) Execute(ctx context.Context, operation *Request, writer resolve.FlushWriter, options ...ExecutionOptionsV2) error {
func (e *ExecutionEngineV2) Normalize(operation *Request) error {
if !operation.IsNormalized() {
result, err := operation.Normalize(e.config.schema)
if err != nil {
Expand All @@ -241,43 +243,56 @@ func (e *ExecutionEngineV2) Execute(ctx context.Context, operation *Request, wri
return result.Errors
}
}
return nil
}

func (e *ExecutionEngineV2) ValidateForSchema(operation *Request) error {
result, err := operation.ValidateForSchema(e.config.schema)
if err != nil {
return err
}
if !result.Valid {
return result.Errors
}
return nil
}

execContext := e.getExecutionCtx()
defer e.putExecutionCtx(execContext)

execContext.prepare(ctx, operation.Variables, operation.request)

func (e *ExecutionEngineV2) Setup(ctx context.Context, postProcessor *postprocess.Processor, resolveContext *resolve.Context, operation *Request, options ...ExecutionOptionsV2) {
for i := range options {
options[i](execContext)
options[i](postProcessor, resolveContext)
}
}

var report operationreport.Report
cachedPlan := e.getCachedPlan(execContext, &operation.document, &e.config.schema.document, operation.OperationName, &report)
func (e *ExecutionEngineV2) Plan(postProcessor *postprocess.Processor, operation *Request, report *operationreport.Report) (plan.Plan, error) {
cachedPlan := e.getCachedPlan(postProcessor, &operation.document, &e.config.schema.document, operation.OperationName, report)
if report.HasErrors() {
return report
return nil, report
}
return cachedPlan, nil
}

switch p := cachedPlan.(type) {
func (e *ExecutionEngineV2) Resolve(resolveContext *resolve.Context, planResult plan.Plan, writer resolve.FlushWriter) error {
var err error
switch p := planResult.(type) {
case *plan.SynchronousResponsePlan:
err = e.resolver.ResolveGraphQLResponse(execContext.resolveContext, p.Response, nil, writer)
err = e.resolver.ResolveGraphQLResponse(resolveContext, p.Response, nil, writer)
case *plan.SubscriptionResponsePlan:
err = e.resolver.ResolveGraphQLSubscription(execContext.resolveContext, p.Response, writer)
err = e.resolver.ResolveGraphQLSubscription(resolveContext, p.Response, writer)
default:
return errors.New("execution of operation is not possible")
}

return err
}

func (e *ExecutionEngineV2) getCachedPlan(ctx *internalExecutionContext, operation, definition *ast.Document, operationName string, report *operationreport.Report) plan.Plan {
func (e *ExecutionEngineV2) Teardown() {
}

func (e *ExecutionEngineV2) Execute(ctx context.Context, operation *Request, writer resolve.FlushWriter, options ...ExecutionOptionsV2) error {
return e.customExecutionEngineExecutor.Execute(ctx, operation, writer, options...)
}

func (e *ExecutionEngineV2) getCachedPlan(postProcessor *postprocess.Processor, operation, definition *ast.Document, operationName string, report *operationreport.Report) plan.Plan {

hash := pool.Hash64.Get()
hash.Reset()
Expand All @@ -303,7 +318,7 @@ func (e *ExecutionEngineV2) getCachedPlan(ctx *internalExecutionContext, operati
return nil
}

p := ctx.postProcessor.Process(planResult)
p := postProcessor.Process(planResult)
e.executionPlanCache.Add(cacheKey, p)
return p
}
Expand All @@ -312,11 +327,8 @@ func (e *ExecutionEngineV2) GetWebsocketBeforeStartHook() WebsocketBeforeStartHo
return e.config.websocketBeforeStartHook
}

func (e *ExecutionEngineV2) getExecutionCtx() *internalExecutionContext {
return e.internalExecutionContextPool.Get().(*internalExecutionContext)
}

func (e *ExecutionEngineV2) putExecutionCtx(ctx *internalExecutionContext) {
ctx.reset()
e.internalExecutionContextPool.Put(ctx)
}
// Interface Guards
var (
_ CustomExecutionEngineV2 = (*ExecutionEngineV2)(nil)
_ ExecutionEngineV2Executor = (*ExecutionEngineV2)(nil)
)
145 changes: 145 additions & 0 deletions pkg/graphql/execution_engine_v2_custom.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package graphql

import (
"context"
"errors"
"sync"

"github.com/TykTechnologies/graphql-go-tools/pkg/engine/plan"
"github.com/TykTechnologies/graphql-go-tools/pkg/engine/resolve"
"github.com/TykTechnologies/graphql-go-tools/pkg/operationreport"
"github.com/TykTechnologies/graphql-go-tools/pkg/postprocess"
)

var (
ErrRequiredStagesMissing = errors.New("required stages for custom execution engine v2 are missing")
)

type CustomExecutionEngineV2NormalizerStage interface {
Normalize(operation *Request) error
}

type CustomExecutionEngineV2ValidatorStage interface {
ValidateForSchema(operation *Request) error
}

type CustomExecutionEngineV2ResolverStage interface {
Setup(ctx context.Context, postProcessor *postprocess.Processor, resolveContext *resolve.Context, operation *Request, options ...ExecutionOptionsV2)
Plan(postProcessor *postprocess.Processor, operation *Request, report *operationreport.Report) (plan.Plan, error)
Resolve(resolveContext *resolve.Context, planResult plan.Plan, writer resolve.FlushWriter) error
Teardown()
}

type CustomExecutionEngineV2 interface {
CustomExecutionEngineV2NormalizerStage
CustomExecutionEngineV2ValidatorStage
CustomExecutionEngineV2ResolverStage
}

type ExecutionEngineV2Executor interface {
Execute(ctx context.Context, operation *Request, writer resolve.FlushWriter, options ...ExecutionOptionsV2) error
}

type CustomExecutionEngineV2Stages struct {
RequiredStages CustomExecutionEngineV2RequiredStages
OptionalStages *CustomExecutionEngineV2OptionalStages
}

func (c *CustomExecutionEngineV2Stages) AllRequiredStagesProvided() bool {
return c.RequiredStages.ResolverStage != nil
}

type CustomExecutionEngineV2RequiredStages struct {
ResolverStage CustomExecutionEngineV2ResolverStage
}

type CustomExecutionEngineV2OptionalStages struct {
NormalizerStage CustomExecutionEngineV2NormalizerStage
ValidatorStage CustomExecutionEngineV2ValidatorStage
}

type CustomExecutionEngineV2Executor struct {
ExecutionStages CustomExecutionEngineV2Stages
internalExecutionContextPool sync.Pool
}

func NewCustomExecutionEngineV2Executor(executionEngineV2 CustomExecutionEngineV2) (*CustomExecutionEngineV2Executor, error) {
executionStages := CustomExecutionEngineV2Stages{
RequiredStages: CustomExecutionEngineV2RequiredStages{
ResolverStage: executionEngineV2,
},
OptionalStages: &CustomExecutionEngineV2OptionalStages{
NormalizerStage: executionEngineV2,
ValidatorStage: executionEngineV2,
},
}

return NewCustomExecutionEngineV2ExecutorByStages(executionStages)
}

func NewCustomExecutionEngineV2ExecutorByStages(executionStages CustomExecutionEngineV2Stages) (*CustomExecutionEngineV2Executor, error) {
return &CustomExecutionEngineV2Executor{
ExecutionStages: executionStages,
internalExecutionContextPool: sync.Pool{
New: func() interface{} {
return newInternalExecutionContext()
},
},
}, nil
}

func (c *CustomExecutionEngineV2Executor) getExecutionCtx() *internalExecutionContext {
return c.internalExecutionContextPool.Get().(*internalExecutionContext)
}

func (c *CustomExecutionEngineV2Executor) putExecutionCtx(ctx *internalExecutionContext) {
ctx.reset()
c.internalExecutionContextPool.Put(ctx)
}

func (c *CustomExecutionEngineV2Executor) Execute(ctx context.Context, operation *Request, writer resolve.FlushWriter, options ...ExecutionOptionsV2) error {
if !c.ExecutionStages.AllRequiredStagesProvided() {
return ErrRequiredStagesMissing
}

var err error
if c.ExecutionStages.OptionalStages != nil && c.ExecutionStages.OptionalStages.NormalizerStage != nil {
err = c.ExecutionStages.OptionalStages.NormalizerStage.Normalize(operation)
if err != nil {
return err
}
}

if c.ExecutionStages.OptionalStages != nil && c.ExecutionStages.OptionalStages.ValidatorStage != nil {
err = c.ExecutionStages.OptionalStages.ValidatorStage.ValidateForSchema(operation)
if err != nil {
return err
}
}

execContext := c.getExecutionCtx()
defer c.putExecutionCtx(execContext)
execContext.prepare(ctx, operation.Variables, operation.request)
c.ExecutionStages.RequiredStages.ResolverStage.Setup(ctx, execContext.postProcessor, execContext.resolveContext, operation, options...)

var report operationreport.Report
planResult, err := c.ExecutionStages.RequiredStages.ResolverStage.Plan(execContext.postProcessor, operation, &report)
if err != nil {
return err
} else if report.HasErrors() {
return report
}

err = c.ExecutionStages.RequiredStages.ResolverStage.Resolve(execContext.resolveContext, planResult, writer)
if err != nil {
return err
}

c.ExecutionStages.RequiredStages.ResolverStage.Teardown()
return nil
}

// Interface Guards
var (
_ ExecutionEngineV2Executor = (*CustomExecutionEngineV2Executor)(nil)
)
Loading