Skip to content

Commit

Permalink
Merge branch 'master' into graphql-operation-builder
Browse files Browse the repository at this point in the history
  • Loading branch information
hgiasac committed Jan 13, 2024
2 parents 5c080fe + 48aa45c commit 3784341
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 22 deletions.
13 changes: 8 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,9 @@ client.
WithExitWhenNoSubscription(false).
// WithRetryStatusCodes allow retry the subscription connection when receiving one of these codes
// the input parameter can be number string or range, e.g 4000-5000
WithRetryStatusCodes("4000", "4000-4050")
WithRetryStatusCodes("4000", "4000-4050").
// WithSyncMode subscription messages are executed in sequence (without goroutine)
WithSyncMode(true)
```
#### Subscription Protocols
Expand Down Expand Up @@ -934,23 +936,24 @@ Enable debug mode with the `WithDebug` function. If the request is failed, the r
}
```
Because the GraphQL query string is generated in runtime using reflection, it isn't really safe. To assure the GraphQL query is expected, it's necessary to write some unit test for query construction.
For debugging queries, you can use `Construct*` functions to see what the generated query looks like:
```go
// ConstructQuery build GraphQL query string from struct and variables
func ConstructQuery(v interface{}, variables map[string]interface{}, options ...Option) (string, error)

// ConstructQuery build GraphQL mutation string from struct and variables
// ConstructMutation build GraphQL mutation string from struct and variables
func ConstructMutation(v interface{}, variables map[string]interface{}, options ...Option) (string, error)

// ConstructSubscription build GraphQL subscription string from struct and variables
func ConstructSubscription(v interface{}, variables map[string]interface{}, options ...Option) (string, error)
func ConstructSubscription(v interface{}, variables map[string]interface{}, options ...Option) (string, string, error)

// UnmarshalGraphQL parses the JSON-encoded GraphQL response data and stores
// the result in the GraphQL query data structure pointed to by v.
func UnmarshalGraphQL(data []byte, v interface{}) error
```
Because the GraphQL query string is generated in runtime using reflection, it isn't really safe. To assure the GraphQL query is expected, it's necessary to write some unit test for query construction.
Directories
-----------
Expand Down
15 changes: 8 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,18 @@ module github.com/hasura/go-graphql-client
go 1.20

require (
github.com/google/uuid v1.3.0
github.com/google/uuid v1.5.0
github.com/graph-gophers/graphql-go v1.5.0
github.com/graph-gophers/graphql-transport-ws v0.0.2
nhooyr.io/websocket v1.8.7
nhooyr.io/websocket v1.8.10
)

require (
github.com/gorilla/websocket v1.5.0 // indirect
github.com/klauspost/compress v1.16.7 // indirect
golang.org/x/crypto v0.0.0-20220314234659-1baeb1ce4c0b // indirect
golang.org/x/sys v0.0.0-20220412211240-33da011f77ad // indirect
github.com/gorilla/websocket v1.5.1 // indirect
github.com/klauspost/compress v1.17.4 // indirect
golang.org/x/crypto v0.17.0 // indirect
golang.org/x/net v0.19.0 // indirect
golang.org/x/sys v0.16.0 // indirect
)

replace github.com/gin-gonic/gin v1.6.3 => github.com/gin-gonic/gin v1.7.7
replace github.com/gin-gonic/gin v1.6.3 => github.com/gin-gonic/gin v1.9.1
17 changes: 17 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down Expand Up @@ -30,10 +31,14 @@ github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU=
github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.4.1 h1:q7AeDBpnBk8AogcD4DSag/Ukw/KV+YhzLj2bP5HvKCM=
github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY=
github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY=
github.com/graph-gophers/graphql-go v1.5.0 h1:fDqblo50TEpD0LY7RXk/LFVYEVqo3+tXMNMPSVXA1yc=
github.com/graph-gophers/graphql-go v1.5.0/go.mod h1:YtmJZDLbF1YYNrlNAuiO5zAStUWc3XZT07iGsVqe1Os=
github.com/graph-gophers/graphql-transport-ws v0.0.2 h1:DbmSkbIGzj8SvHei6n8Mh9eLQin8PtA8xY9eCzjRpvo=
Expand All @@ -44,6 +49,8 @@ github.com/klauspost/compress v1.10.3 h1:OP96hzwJVBIHYU52pVTI6CczrxPvrGfgqF9N5eT
github.com/klauspost/compress v1.10.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I=
github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/compress v1.17.4 h1:Ej5ixsIri7BrIjBkRZLTo6ghwrEtHFk7ijlczPW4fZ4=
github.com/klauspost/compress v1.17.4/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM=
github.com/leodido/go-urn v1.2.0 h1:hpXL4XnriNwQ/ABnpepYM/1vCLWNDfUNts8dX3xTG6Y=
github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII=
github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY=
Expand Down Expand Up @@ -71,8 +78,13 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20220314234659-1baeb1ce4c0b h1:Qwe1rC8PSniVfAFPFJeyUkB+zcysC3RgJBAGk7eqBEU=
golang.org/x/crypto v0.0.0-20220314234659-1baeb1ce4c0b/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.16.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k=
golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c=
golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand All @@ -81,6 +93,9 @@ golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220412211240-33da011f77ad h1:ntjMns5wyP/fN65tdBD4g8J5w8n015+iIIs9rtjXkY0=
golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
Expand All @@ -97,3 +112,5 @@ gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
nhooyr.io/websocket v1.8.7 h1:usjR2uOr/zjjkVMy0lW+PPohFok7PCow5sDjLgX4P4g=
nhooyr.io/websocket v1.8.7/go.mod h1:B70DZP8IakI65RVQ51MsWP/8jndNma26DVA/nFSCgW0=
nhooyr.io/websocket v1.8.10 h1:mv4p+MnGrLDcPlBoWsvPP7XCzTYMXP9F9eIGoKbgx7Q=
nhooyr.io/websocket v1.8.10/go.mod h1:rN9OFWIUwuxg4fR5tELlYC04bXYowCP9GX47ivo2l+c=
83 changes: 74 additions & 9 deletions subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"io"
"net"
"net/http"
"strconv"
"strings"
Expand Down Expand Up @@ -102,6 +103,7 @@ func (om OperationMessage) String() string {
type WebsocketConn interface {
ReadJSON(v interface{}) error
WriteJSON(v interface{}) error
Ping() error
Close() error
// SetReadLimit sets the maximum size in bytes for a message read from the peer. If a
// message exceeds the limit, the connection sends a close message to the peer
Expand Down Expand Up @@ -292,6 +294,9 @@ func (sc *SubscriptionContext) Close() error {

sc.Cancel()

if errors.Is(err, net.ErrClosed) {
return nil
}
return err
}

Expand Down Expand Up @@ -363,6 +368,9 @@ type SubscriptionClient struct {
onError func(sc *SubscriptionClient, err error) error
errorChan chan error
exitWhenNoSubscription bool
syncMode bool
keepAliveInterval time.Duration
retryDelay time.Duration
mutex sync.Mutex
}

Expand All @@ -377,6 +385,8 @@ func NewSubscriptionClient(url string) *SubscriptionClient {
errorChan: make(chan error),
protocol: &subscriptionsTransportWS{},
exitWhenNoSubscription: true,
keepAliveInterval: 0 * time.Second,
retryDelay: 1 * time.Second,
context: &SubscriptionContext{
subscriptions: make(map[string]Subscription),
},
Expand Down Expand Up @@ -459,6 +469,45 @@ func (sc *SubscriptionClient) WithExitWhenNoSubscription(value bool) *Subscripti
return sc
}

// WithSyncMode subscription messages are executed in sequence (without goroutine)
func (sc *SubscriptionClient) WithSyncMode(value bool) *SubscriptionClient {
sc.syncMode = value
return sc
}

// Keep alive subroutine to send ping on specified interval
func startKeepAlive(ctx context.Context, c WebsocketConn, interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
// Ping the websocket. You might want to handle any potential errors.
err := c.Ping()
if err != nil {
fmt.Printf("%s => Failed to ping server\n", time.Now().Format(time.TimeOnly))
// Handle the error, maybe log it, close the connection, etc.
}
case <-ctx.Done():
// If the context is cancelled, stop the pinging.
return
}
}
}

// WithKeepAlive programs the websocket to ping on the specified interval
func (sc *SubscriptionClient) WithKeepAlive(interval time.Duration) *SubscriptionClient {
sc.keepAliveInterval = interval
return sc
}

// WithRetryDelay set the delay time before retrying the connection
func (sc *SubscriptionClient) WithRetryDelay(delay time.Duration) *SubscriptionClient {
sc.retryDelay = delay
return sc
}

// WithLog sets logging function to print out received messages. By default, nothing is printed
func (sc *SubscriptionClient) WithLog(logger func(args ...interface{})) *SubscriptionClient {
sc.context.log = logger
Expand Down Expand Up @@ -580,8 +629,8 @@ func (sc *SubscriptionClient) init() error {
}
return err
}
ctx.Log(fmt.Sprintf("%s. retry in second...", err.Error()), "client", GQLInternal)
time.Sleep(time.Second)
ctx.Log(fmt.Sprintf("%s. retry in %d second...", err.Error(), sc.retryDelay/time.Second), "client", GQLInternal)
time.Sleep(sc.retryDelay)
}
}

Expand Down Expand Up @@ -709,6 +758,10 @@ func (sc *SubscriptionClient) Run() error {
sc.setClientStatus(scStatusRunning)
ctx := subContext.GetContext()

if sc.keepAliveInterval > 0 {
go startKeepAlive(ctx, conn, sc.keepAliveInterval)
}

go func() {
for {
select {
Expand All @@ -718,7 +771,7 @@ func (sc *SubscriptionClient) Run() error {
var message OperationMessage
if err := conn.ReadJSON(&message); err != nil {
// manual EOF check
if err == io.EOF || strings.Contains(err.Error(), "EOF") || strings.Contains(err.Error(), "connection reset by peer") {
if err == io.EOF || strings.Contains(err.Error(), "EOF") || errors.Is(err, net.ErrClosed) || strings.Contains(err.Error(), "connection reset by peer") {
sc.errorChan <- errRetry
return
}
Expand Down Expand Up @@ -764,13 +817,20 @@ func (sc *SubscriptionClient) Run() error {
if sub == nil {
sub = &Subscription{}
}
go func() {

execMessage := func() {
if err := sc.protocol.OnMessage(subContext, *sub, message); err != nil {
sc.errorChan <- err
}

sc.checkSubscriptionStatuses(subContext)
}()
}

if sc.syncMode {
execMessage()
} else {
go execMessage()
}
}
}
}()
Expand Down Expand Up @@ -866,7 +926,7 @@ func (sc *SubscriptionClient) close(ctx *SubscriptionContext) (err error) {
continue
}
if sub.status == SubscriptionRunning {
if err := sc.protocol.Unsubscribe(ctx, sub); err != nil {
if err := sc.protocol.Unsubscribe(ctx, sub); err != nil && !errors.Is(err, net.ErrClosed) {
unsubscribeErrors[key] = err
}
}
Expand Down Expand Up @@ -966,6 +1026,13 @@ func (wh *WebsocketHandler) ReadJSON(v interface{}) error {
return wsjson.Read(ctx, wh.Conn, v)
}

// Ping sends a ping to the peer and waits for a pong
func (wh *WebsocketHandler) Ping() error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
return wh.Conn.Ping(ctx)
}

// Close implements the function to close the websocket connection
func (wh *WebsocketHandler) Close() error {
return wh.Conn.Close(websocket.StatusNormalClosure, "close websocket")
Expand All @@ -977,9 +1044,7 @@ func (wh *WebsocketHandler) GetCloseStatus(err error) int32 {
// context timeout error returned from ReadJSON or WriteJSON
// try to ping the server, if failed return abnormal closeure error
if errors.Is(err, context.DeadlineExceeded) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if pingErr := wh.Ping(ctx); pingErr != nil {
if pingErr := wh.Ping(); pingErr != nil {
return int32(websocket.StatusNoStatusRcvd)
}
return -1
Expand Down
12 changes: 11 additions & 1 deletion subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import (
"nhooyr.io/websocket"
)

func TestSubscription_LifeCycleEvents(t *testing.T) {
func testSubscription_LifeCycleEvents(t *testing.T, syncMode bool) {

server := subscription_setupServer(8082)
client, subscriptionClient := subscription_setupClients(8082)
msg := randomID()
Expand Down Expand Up @@ -84,6 +85,7 @@ func TestSubscription_LifeCycleEvents(t *testing.T) {
subscriptionClient = subscriptionClient.
WithExitWhenNoSubscription(false).
WithTimeout(3 * time.Second).
WithSyncMode(syncMode).
OnConnected(func() {
lock.Lock()
defer lock.Unlock()
Expand Down Expand Up @@ -200,6 +202,14 @@ func TestSubscription_LifeCycleEvents(t *testing.T) {
}
}

func TestSubscription_LifeCycleEvents(t *testing.T) {
testSubscription_LifeCycleEvents(t, false)
}

func TestSubscription_WithSyncMode(t *testing.T) {
testSubscription_LifeCycleEvents(t, true)
}

func TestSubscription_WithRetryStatusCodes(t *testing.T) {
stop := make(chan bool)
msg := randomID()
Expand Down

0 comments on commit 3784341

Please sign in to comment.