Skip to content

Commit

Permalink
introduce CustomExecutionEngineV2 (#346)
Browse files Browse the repository at this point in the history
Changes in this PR are based on @buraksezer 's awesome research in this
commit:
23f7686

This PR introduces interfaces for a `CustomExecutionEngineV2` including
execution stages.
By introducing those changes it will be possible to provide its own
`ExecutionEngineV2` or decorate the already implemented
`ExecutionEngineV2`.

Changes:
- added a benchmark test for `ExecutionEngineV2.Execute` to track
performance impacts with this change (none so far)
 - introduced the following interfaces:
   - `CustomExecutionEngineV2NormalizerStage` (optional)
   - `CustomExecutionEngineV2ValidatorStage` (optional)
   - `CustomExecutionEngineV2ResolverStage` (required)
   - `ExecutionEngineV2Executor`
- added a default implementation for `ExecutionEngineV2Executor` called
`CustomExecutionEngineV2Executor`
- `ExecutionEngineV2` now implements all interfaces mentioned before and
decorates the `CustomExecutionEngineV2Executor` (so it is basically a
`CustomExecutionEngineV2`)
- moved the `internalExecutionContext` to `CustomExecutionEngineV2` as
it is an implementation detail
- `ExecutionOptionsV2` are now decoupled from `internalExecutionContext`
so that implementing them outside of the `graphql` package is now
possible
  • Loading branch information
pvormste committed Apr 26, 2023
1 parent 4dff07f commit 0d8ec19
Show file tree
Hide file tree
Showing 4 changed files with 473 additions and 58 deletions.
116 changes: 64 additions & 52 deletions pkg/graphql/execution_engine_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,47 +139,47 @@ func (e *internalExecutionContext) reset() {
}

type ExecutionEngineV2 struct {
logger abstractlogger.Logger
config EngineV2Configuration
planner *plan.Planner
plannerMu sync.Mutex
resolver *resolve.Resolver
internalExecutionContextPool sync.Pool
executionPlanCache *lru.Cache
logger abstractlogger.Logger
config EngineV2Configuration
planner *plan.Planner
plannerMu sync.Mutex
resolver *resolve.Resolver
executionPlanCache *lru.Cache
customExecutionEngineExecutor *CustomExecutionEngineV2Executor
}

type WebsocketBeforeStartHook interface {
OnBeforeStart(reqCtx context.Context, operation *Request) error
}

type ExecutionOptionsV2 func(ctx *internalExecutionContext)
type ExecutionOptionsV2 func(postProcessor *postprocess.Processor, resolveContext *resolve.Context)

func WithBeforeFetchHook(hook resolve.BeforeFetchHook) ExecutionOptionsV2 {
return func(ctx *internalExecutionContext) {
ctx.resolveContext.SetBeforeFetchHook(hook)
return func(postProcessor *postprocess.Processor, resolveContext *resolve.Context) {
resolveContext.SetBeforeFetchHook(hook)
}
}

func WithUpstreamHeaders(header http.Header) ExecutionOptionsV2 {
return func(ctx *internalExecutionContext) {
ctx.postProcessor.AddPostProcessor(postprocess.NewProcessInjectHeader(header))
return func(postProcessor *postprocess.Processor, resolveContext *resolve.Context) {
postProcessor.AddPostProcessor(postprocess.NewProcessInjectHeader(header))
}
}

func WithAfterFetchHook(hook resolve.AfterFetchHook) ExecutionOptionsV2 {
return func(ctx *internalExecutionContext) {
ctx.resolveContext.SetAfterFetchHook(hook)
return func(postProcessor *postprocess.Processor, resolveContext *resolve.Context) {
resolveContext.SetAfterFetchHook(hook)
}
}

func WithAdditionalHttpHeaders(headers http.Header, excludeByKeys ...string) ExecutionOptionsV2 {
return func(ctx *internalExecutionContext) {
return func(postProcessor *postprocess.Processor, resolveContext *resolve.Context) {
if len(headers) == 0 {
return
}

if ctx.resolveContext.Request.Header == nil {
ctx.resolveContext.Request.Header = make(http.Header)
if resolveContext.Request.Header == nil {
resolveContext.Request.Header = make(http.Header)
}

excludeMap := make(map[string]bool)
Expand All @@ -193,7 +193,7 @@ func WithAdditionalHttpHeaders(headers http.Header, excludeByKeys ...string) Exe
}

for _, headerValue := range headerValues {
ctx.resolveContext.Request.Header.Add(headerKey, headerValue)
resolveContext.Request.Header.Add(headerKey, headerValue)
}
}
}
Expand All @@ -216,21 +216,23 @@ func NewExecutionEngineV2(ctx context.Context, logger abstractlogger.Logger, eng
engineConfig.AddFieldConfiguration(fieldCfg)
}

return &ExecutionEngineV2{
logger: logger,
config: engineConfig,
planner: plan.NewPlanner(ctx, engineConfig.plannerConfig),
resolver: resolve.New(ctx, fetcher, engineConfig.dataLoaderConfig.EnableDataLoader),
internalExecutionContextPool: sync.Pool{
New: func() interface{} {
return newInternalExecutionContext()
},
},
executionEngine := &ExecutionEngineV2{
logger: logger,
config: engineConfig,
planner: plan.NewPlanner(ctx, engineConfig.plannerConfig),
resolver: resolve.New(ctx, fetcher, engineConfig.dataLoaderConfig.EnableDataLoader),
executionPlanCache: executionPlanCache,
}, nil
}

executor, err := NewCustomExecutionEngineV2Executor(executionEngine)
if err != nil {
return nil, err
}
executionEngine.customExecutionEngineExecutor = executor
return executionEngine, nil
}

func (e *ExecutionEngineV2) Execute(ctx context.Context, operation *Request, writer resolve.FlushWriter, options ...ExecutionOptionsV2) error {
func (e *ExecutionEngineV2) Normalize(operation *Request) error {
if !operation.IsNormalized() {
result, err := operation.Normalize(e.config.schema)
if err != nil {
Expand All @@ -241,43 +243,56 @@ func (e *ExecutionEngineV2) Execute(ctx context.Context, operation *Request, wri
return result.Errors
}
}
return nil
}

func (e *ExecutionEngineV2) ValidateForSchema(operation *Request) error {
result, err := operation.ValidateForSchema(e.config.schema)
if err != nil {
return err
}
if !result.Valid {
return result.Errors
}
return nil
}

execContext := e.getExecutionCtx()
defer e.putExecutionCtx(execContext)

execContext.prepare(ctx, operation.Variables, operation.request)

func (e *ExecutionEngineV2) Setup(ctx context.Context, postProcessor *postprocess.Processor, resolveContext *resolve.Context, operation *Request, options ...ExecutionOptionsV2) {
for i := range options {
options[i](execContext)
options[i](postProcessor, resolveContext)
}
}

var report operationreport.Report
cachedPlan := e.getCachedPlan(execContext, &operation.document, &e.config.schema.document, operation.OperationName, &report)
func (e *ExecutionEngineV2) Plan(postProcessor *postprocess.Processor, operation *Request, report *operationreport.Report) (plan.Plan, error) {
cachedPlan := e.getCachedPlan(postProcessor, &operation.document, &e.config.schema.document, operation.OperationName, report)
if report.HasErrors() {
return report
return nil, report
}
return cachedPlan, nil
}

switch p := cachedPlan.(type) {
func (e *ExecutionEngineV2) Resolve(resolveContext *resolve.Context, planResult plan.Plan, writer resolve.FlushWriter) error {
var err error
switch p := planResult.(type) {
case *plan.SynchronousResponsePlan:
err = e.resolver.ResolveGraphQLResponse(execContext.resolveContext, p.Response, nil, writer)
err = e.resolver.ResolveGraphQLResponse(resolveContext, p.Response, nil, writer)
case *plan.SubscriptionResponsePlan:
err = e.resolver.ResolveGraphQLSubscription(execContext.resolveContext, p.Response, writer)
err = e.resolver.ResolveGraphQLSubscription(resolveContext, p.Response, writer)
default:
return errors.New("execution of operation is not possible")
}

return err
}

func (e *ExecutionEngineV2) getCachedPlan(ctx *internalExecutionContext, operation, definition *ast.Document, operationName string, report *operationreport.Report) plan.Plan {
func (e *ExecutionEngineV2) Teardown() {
}

func (e *ExecutionEngineV2) Execute(ctx context.Context, operation *Request, writer resolve.FlushWriter, options ...ExecutionOptionsV2) error {
return e.customExecutionEngineExecutor.Execute(ctx, operation, writer, options...)
}

func (e *ExecutionEngineV2) getCachedPlan(postProcessor *postprocess.Processor, operation, definition *ast.Document, operationName string, report *operationreport.Report) plan.Plan {

hash := pool.Hash64.Get()
hash.Reset()
Expand All @@ -303,7 +318,7 @@ func (e *ExecutionEngineV2) getCachedPlan(ctx *internalExecutionContext, operati
return nil
}

p := ctx.postProcessor.Process(planResult)
p := postProcessor.Process(planResult)
e.executionPlanCache.Add(cacheKey, p)
return p
}
Expand All @@ -312,11 +327,8 @@ func (e *ExecutionEngineV2) GetWebsocketBeforeStartHook() WebsocketBeforeStartHo
return e.config.websocketBeforeStartHook
}

func (e *ExecutionEngineV2) getExecutionCtx() *internalExecutionContext {
return e.internalExecutionContextPool.Get().(*internalExecutionContext)
}

func (e *ExecutionEngineV2) putExecutionCtx(ctx *internalExecutionContext) {
ctx.reset()
e.internalExecutionContextPool.Put(ctx)
}
// Interface Guards
var (
_ CustomExecutionEngineV2 = (*ExecutionEngineV2)(nil)
_ ExecutionEngineV2Executor = (*ExecutionEngineV2)(nil)
)
145 changes: 145 additions & 0 deletions pkg/graphql/execution_engine_v2_custom.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package graphql

import (
"context"
"errors"
"sync"

"github.com/TykTechnologies/graphql-go-tools/pkg/engine/plan"
"github.com/TykTechnologies/graphql-go-tools/pkg/engine/resolve"
"github.com/TykTechnologies/graphql-go-tools/pkg/operationreport"
"github.com/TykTechnologies/graphql-go-tools/pkg/postprocess"
)

var (
ErrRequiredStagesMissing = errors.New("required stages for custom execution engine v2 are missing")
)

type CustomExecutionEngineV2NormalizerStage interface {
Normalize(operation *Request) error
}

type CustomExecutionEngineV2ValidatorStage interface {
ValidateForSchema(operation *Request) error
}

type CustomExecutionEngineV2ResolverStage interface {
Setup(ctx context.Context, postProcessor *postprocess.Processor, resolveContext *resolve.Context, operation *Request, options ...ExecutionOptionsV2)
Plan(postProcessor *postprocess.Processor, operation *Request, report *operationreport.Report) (plan.Plan, error)
Resolve(resolveContext *resolve.Context, planResult plan.Plan, writer resolve.FlushWriter) error
Teardown()
}

type CustomExecutionEngineV2 interface {
CustomExecutionEngineV2NormalizerStage
CustomExecutionEngineV2ValidatorStage
CustomExecutionEngineV2ResolverStage
}

type ExecutionEngineV2Executor interface {
Execute(ctx context.Context, operation *Request, writer resolve.FlushWriter, options ...ExecutionOptionsV2) error
}

type CustomExecutionEngineV2Stages struct {
RequiredStages CustomExecutionEngineV2RequiredStages
OptionalStages *CustomExecutionEngineV2OptionalStages
}

func (c *CustomExecutionEngineV2Stages) AllRequiredStagesProvided() bool {
return c.RequiredStages.ResolverStage != nil
}

type CustomExecutionEngineV2RequiredStages struct {
ResolverStage CustomExecutionEngineV2ResolverStage
}

type CustomExecutionEngineV2OptionalStages struct {
NormalizerStage CustomExecutionEngineV2NormalizerStage
ValidatorStage CustomExecutionEngineV2ValidatorStage
}

type CustomExecutionEngineV2Executor struct {
ExecutionStages CustomExecutionEngineV2Stages
internalExecutionContextPool sync.Pool
}

func NewCustomExecutionEngineV2Executor(executionEngineV2 CustomExecutionEngineV2) (*CustomExecutionEngineV2Executor, error) {
executionStages := CustomExecutionEngineV2Stages{
RequiredStages: CustomExecutionEngineV2RequiredStages{
ResolverStage: executionEngineV2,
},
OptionalStages: &CustomExecutionEngineV2OptionalStages{
NormalizerStage: executionEngineV2,
ValidatorStage: executionEngineV2,
},
}

return NewCustomExecutionEngineV2ExecutorByStages(executionStages)
}

func NewCustomExecutionEngineV2ExecutorByStages(executionStages CustomExecutionEngineV2Stages) (*CustomExecutionEngineV2Executor, error) {
return &CustomExecutionEngineV2Executor{
ExecutionStages: executionStages,
internalExecutionContextPool: sync.Pool{
New: func() interface{} {
return newInternalExecutionContext()
},
},
}, nil
}

func (c *CustomExecutionEngineV2Executor) getExecutionCtx() *internalExecutionContext {
return c.internalExecutionContextPool.Get().(*internalExecutionContext)
}

func (c *CustomExecutionEngineV2Executor) putExecutionCtx(ctx *internalExecutionContext) {
ctx.reset()
c.internalExecutionContextPool.Put(ctx)
}

func (c *CustomExecutionEngineV2Executor) Execute(ctx context.Context, operation *Request, writer resolve.FlushWriter, options ...ExecutionOptionsV2) error {
if !c.ExecutionStages.AllRequiredStagesProvided() {
return ErrRequiredStagesMissing
}

var err error
if c.ExecutionStages.OptionalStages != nil && c.ExecutionStages.OptionalStages.NormalizerStage != nil {
err = c.ExecutionStages.OptionalStages.NormalizerStage.Normalize(operation)
if err != nil {
return err
}
}

if c.ExecutionStages.OptionalStages != nil && c.ExecutionStages.OptionalStages.ValidatorStage != nil {
err = c.ExecutionStages.OptionalStages.ValidatorStage.ValidateForSchema(operation)
if err != nil {
return err
}
}

execContext := c.getExecutionCtx()
defer c.putExecutionCtx(execContext)
execContext.prepare(ctx, operation.Variables, operation.request)
c.ExecutionStages.RequiredStages.ResolverStage.Setup(ctx, execContext.postProcessor, execContext.resolveContext, operation, options...)

var report operationreport.Report
planResult, err := c.ExecutionStages.RequiredStages.ResolverStage.Plan(execContext.postProcessor, operation, &report)
if err != nil {
return err
} else if report.HasErrors() {
return report
}

err = c.ExecutionStages.RequiredStages.ResolverStage.Resolve(execContext.resolveContext, planResult, writer)
if err != nil {
return err
}

c.ExecutionStages.RequiredStages.ResolverStage.Teardown()
return nil
}

// Interface Guards
var (
_ ExecutionEngineV2Executor = (*CustomExecutionEngineV2Executor)(nil)
)
Loading

0 comments on commit 0d8ec19

Please sign in to comment.