Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subscription execution #1

Merged
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 147 additions & 42 deletions subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,66 +5,161 @@ import (
"fmt"

"github.com/graphql-go/graphql/gqlerrors"
"github.com/graphql-go/graphql/language/ast"
"github.com/graphql-go/graphql/language/parser"
"github.com/graphql-go/graphql/language/source"
)

// SubscribeParams parameters for subscribing
type SubscribeParams struct {
Schema Schema
Document *ast.Document
RootValue interface{}
ContextValue context.Context
Schema Schema
RequestString string
RootValue interface{}
// ContextValue context.Context
VariableValues map[string]interface{}
OperationName string
FieldResolver FieldResolveFn
FieldSubscriber FieldResolveFn
}

// Subscribe performs a subscribe operation
func Subscribe(ctx context.Context, p SubscribeParams) chan *Result {
resultChannel := make(chan *Result)
// SubscriptableSchema implements `graphql-transport-ws` `GraphQLService` interface: https://github.com/graph-gophers/graphql-transport-ws/blob/40c0484322990a129cac2f2d2763c3315230280c/graphqlws/internal/connection/connection.go#L53
// you can pass `SubscriptableSchema` to `graphql-transport-ws` `NewHandlerFunc`
type SubscriptableSchema struct {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While I agree this is useful, there is no reason this needs to be part of the library. It can be implemented outside of the library as all the types it relies on are exported.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where should this type live? handler package?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i removed it, this will probably live in the handler package as a NewSubscriptionHandler function that uses graphql-transport-ws under the hood

If you want to try some subscirption i made an example here

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i removed it, this will probably live in the handler package as a NewSubscriptionHandler function that uses graphql-transport-ws under the hood

If you want to try some subscirption i made an example here

yes, a different package. Feel free to PR it into the graphql-go-tools if you don't have another place for it. My request there is that it be its own package in a subfolder

Schema Schema
RootObject map[string]interface{}
}

// Subscribe method let you use SubscriptableSchema with graphql-transport-ws https://github.com/graph-gophers/graphql-transport-ws
func (self *SubscriptableSchema) Subscribe(ctx context.Context, queryString string, operationName string, variables map[string]interface{}) (<-chan interface{}, error) {
c := Subscribe(Params{
Schema: self.Schema,
Context: ctx,
OperationName: operationName,
RequestString: queryString,
RootObject: self.RootObject,
VariableValues: variables,
})
to := make(chan interface{})
go func() {
defer close(to)
select {
case <-ctx.Done():
return
case res, more := <-c:
if !more {
return
}
to <- res
}
}()
return to, nil
}

// Subscribe performs a subscribe operation on the given query and schema
// currently does not support extensions hooks
func Subscribe(p Params) chan *Result {

source := source.NewSource(&source.Source{
Body: []byte(p.RequestString),
Name: "GraphQL request",
})

// TODO run extensions hooks

// parse the source
AST, err := parser.Parse(parser.ParseParams{Source: source})
if err != nil {

// merge the errors from extensions and the original error from parser
return sendOneResultAndClose(&Result{
Errors: gqlerrors.FormatErrors(err),
})
}

// validate document
validationResult := ValidateDocument(&p.Schema, AST, nil)

if !validationResult.IsValid {
// run validation finish functions for extensions
return sendOneResultAndClose(&Result{
Errors: validationResult.Errors,
})

}
return ExecuteSubscription(ExecuteParams{
Schema: p.Schema,
Root: p.RootObject,
AST: AST,
OperationName: p.OperationName,
Args: p.VariableValues,
Context: p.Context,
})
}

func sendOneResultAndClose(res *Result) chan *Result {
resultChannel := make(chan *Result, 1)
resultChannel <- res
close(resultChannel)
return resultChannel
}

// ExecuteSubscription is similar to graphql.Execute but returns a channel instead of a Result
// currently does not support extensions
func ExecuteSubscription(p ExecuteParams) chan *Result {

if p.Context == nil {
p.Context = context.Background()
remorses marked this conversation as resolved.
Show resolved Hide resolved
}

var mapSourceToResponse = func(payload interface{}) *Result {
return Execute(ExecuteParams{
Schema: p.Schema,
Root: payload,
AST: p.Document,
AST: p.AST,
OperationName: p.OperationName,
Args: p.VariableValues,
Context: p.ContextValue,
Args: p.Args,
Context: p.Context,
})
}

var resultChannel = make(chan *Result)
go func() {
result := &Result{}
defer close(resultChannel)
defer func() {
if err := recover(); err != nil {
result.Errors = append(result.Errors, gqlerrors.FormatError(err.(error)))
resultChannel <- result
e, ok := err.(error)
if !ok {
fmt.Println("strange program path")
return
}
resultChannel <- &Result{
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the resultChannel is closed before this runs, this will panic which is why I had it in this deferred function at the end

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the only way the channel can close is the defer close(resultChannel), this means the channel will be closed after ther error handling (defer functions are run from last to first)

I will add a test case for a panic

Errors: gqlerrors.FormatErrors(e),
}
}
close(resultChannel)
return
}()

exeContext, err := buildExecutionContext(buildExecutionCtxParams{
Schema: p.Schema,
Root: p.RootValue,
AST: p.Document,
Root: p.Root,
AST: p.AST,
OperationName: p.OperationName,
Args: p.VariableValues,
Result: result,
Context: p.ContextValue,
Args: p.Args,
Context: p.Context,
})

if err != nil {
result.Errors = append(result.Errors, gqlerrors.FormatError(err.(error)))
resultChannel <- result
resultChannel <- &Result{
Errors: gqlerrors.FormatErrors(err),
}

return
}

operationType, err := getOperationRootType(p.Schema, exeContext.Operation)
if err != nil {
result.Errors = append(result.Errors, gqlerrors.FormatError(err.(error)))
resultChannel <- result
resultChannel <- &Result{
Errors: gqlerrors.FormatErrors(err),
}

return
}

Expand All @@ -85,18 +180,20 @@ func Subscribe(ctx context.Context, p SubscribeParams) chan *Result {
fieldDef := getFieldDef(p.Schema, operationType, fieldName)

if fieldDef == nil {
err := fmt.Errorf("the subscription field %q is not defined", fieldName)
result.Errors = append(result.Errors, gqlerrors.FormatError(err.(error)))
resultChannel <- result
resultChannel <- &Result{
Errors: gqlerrors.FormatErrors(fmt.Errorf("the subscription field %q is not defined", fieldName)),
}

return
}

resolveFn := p.FieldSubscriber
resolveFn := fieldDef.Subscribe

if resolveFn == nil {
resolveFn = DefaultResolveFn
}
if fieldDef.Subscribe != nil {
resolveFn = fieldDef.Subscribe
resultChannel <- &Result{
Errors: gqlerrors.FormatErrors(fmt.Errorf("the subscription function %q is not defined", fieldName)),
}
return
}
fieldPath := &ResponsePath{
Key: responseName,
Expand All @@ -117,21 +214,24 @@ func Subscribe(ctx context.Context, p SubscribeParams) chan *Result {
}

fieldResult, err := resolveFn(ResolveParams{
Source: p.RootValue,
Source: p.Root,
Args: args,
Info: info,
Context: p.ContextValue,
Context: p.Context,
})
if err != nil {
result.Errors = append(result.Errors, gqlerrors.FormatError(err.(error)))
resultChannel <- result
resultChannel <- &Result{
Errors: gqlerrors.FormatErrors(err),
}

return
}

if fieldResult == nil {
err := fmt.Errorf("no field result")
result.Errors = append(result.Errors, gqlerrors.FormatError(err.(error)))
resultChannel <- result
resultChannel <- &Result{
Errors: gqlerrors.FormatErrors(fmt.Errorf("no field result")),
}

return
}

Expand All @@ -140,14 +240,19 @@ func Subscribe(ctx context.Context, p SubscribeParams) chan *Result {
sub := fieldResult.(chan interface{})
for {
select {
case <-ctx.Done():
case <-p.Context.Done():
// TODO send the context error to the resultchannel?
return

case res := <-sub:
case res, more := <-sub:
remorses marked this conversation as resolved.
Show resolved Hide resolved
if !more {
return
}
resultChannel <- mapSourceToResponse(res)
}
}
default:
fmt.Println(fieldResult)
remorses marked this conversation as resolved.
Show resolved Hide resolved
resultChannel <- mapSourceToResponse(fieldResult)
return
}
Expand Down
Loading