From 386f1e611e62875425b50272b4f51d267faf561b Mon Sep 17 00:00:00 2001 From: ajanikow <12255597+ajanikow@users.noreply.github.com> Date: Sun, 8 May 2022 19:44:31 +0000 Subject: [PATCH] [Feature] Async Requests Support --- CHANGELOG.md | 1 + Makefile | 2 +- pkg/util/arangod/conn/async.go | 102 +++++++++++ pkg/util/arangod/conn/async_errors.go | 75 ++++++++ pkg/util/arangod/conn/async_test.go | 78 +++++++++ pkg/util/arangod/conn/connection.pass.go | 72 ++++++++ pkg/util/arangod/conn/context.go | 37 ++++ pkg/util/constants/connection.go | 27 +++ pkg/util/tests/gen.go | 9 +- pkg/util/tests/inspector.go | 7 +- pkg/util/tests/server.go | 209 +++++++++++++++++++++++ pkg/util/tests/server_async.go | 114 +++++++++++++ 12 files changed, 725 insertions(+), 8 deletions(-) create mode 100644 pkg/util/arangod/conn/async.go create mode 100644 pkg/util/arangod/conn/async_errors.go create mode 100644 pkg/util/arangod/conn/async_test.go create mode 100644 pkg/util/arangod/conn/connection.pass.go create mode 100644 pkg/util/arangod/conn/context.go create mode 100644 pkg/util/constants/connection.go create mode 100644 pkg/util/tests/server.go create mode 100644 pkg/util/tests/server_async.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 937806ae1..857c735c2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ - (Feature) Recursive OwnerReference discovery - (Maintenance) Add check make targets - (Feature) Create support for local variables in actions. +- (Feature) Support for asynchronous ArangoD resquests. ## [1.2.11](https://github.com/arangodb/kube-arangodb/tree/1.2.11) (2022-04-30) - (Bugfix) Orphan PVC are not removed diff --git a/Makefile b/Makefile index ab329d414..9500a2d2c 100644 --- a/Makefile +++ b/Makefile @@ -150,7 +150,7 @@ ifdef VERBOSE TESTVERBOSEOPTIONS := -v endif -EXCLUDE_DIRS := tests vendor .gobuild deps tools +EXCLUDE_DIRS := vendor .gobuild deps tools SOURCES_QUERY := find ./ -type f -name '*.go' $(foreach EXCLUDE_DIR,$(EXCLUDE_DIRS), ! -path "*/$(EXCLUDE_DIR)/*") SOURCES := $(shell $(SOURCES_QUERY)) DASHBOARDSOURCES := $(shell find $(DASHBOARDDIR)/src -name '*.js') $(DASHBOARDDIR)/package.json diff --git a/pkg/util/arangod/conn/async.go b/pkg/util/arangod/conn/async.go new file mode 100644 index 000000000..49fb22b3b --- /dev/null +++ b/pkg/util/arangod/conn/async.go @@ -0,0 +1,102 @@ +// +// DISCLAIMER +// +// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package conn + +import ( + "context" + "net/http" + "path" + + "github.com/arangodb/go-driver" + "github.com/arangodb/kube-arangodb/pkg/util/constants" + "github.com/arangodb/kube-arangodb/pkg/util/errors" +) + +func NewAsyncConnection(c driver.Connection) driver.Connection { + return async{ + connectionPass: connectionPass{ + c: c, + wrap: asyncConnectionWrap, + }, + } +} + +func asyncConnectionWrap(c driver.Connection) (driver.Connection, error) { + return NewAsyncConnection(c), nil +} + +type async struct { + connectionPass +} + +func (a async) isAsyncIDSet(ctx context.Context) (string, bool) { + if ctx != nil { + if q := ctx.Value(asyncOperatorContextKey); q != nil { + if v, ok := q.(string); ok { + return v, true + } + } + } + + return "", false +} + +func (a async) Do(ctx context.Context, req driver.Request) (driver.Response, error) { + if id, ok := a.isAsyncIDSet(ctx); ok { + // We have ID Set, request should be done to fetch job id + req, err := a.c.NewRequest(http.MethodPut, path.Join("/_api/job", id)) + if err != nil { + return nil, err + } + + resp, err := a.c.Do(ctx, req) + if err != nil { + return nil, err + } + + switch resp.StatusCode() { + case http.StatusNotFound: + return nil, newAsyncErrorNotFound(id) + case http.StatusNoContent: + return nil, newAsyncJobInProgress(id) + default: + return resp, nil + } + } else { + req.SetHeader(constants.ArangoHeaderAsyncKey, constants.ArangoHeaderAsyncValue) + + resp, err := a.c.Do(ctx, req) + if err != nil { + return nil, err + } + + switch resp.StatusCode() { + case http.StatusAccepted: + if v := resp.Header(constants.ArangoHeaderAsyncIDKey); len(v) == 0 { + return nil, errors.Newf("Missing async key response") + } else { + return nil, newAsyncJobInProgress(v) + } + default: + return nil, resp.CheckStatus(http.StatusAccepted) + } + } +} diff --git a/pkg/util/arangod/conn/async_errors.go b/pkg/util/arangod/conn/async_errors.go new file mode 100644 index 000000000..81340e8a6 --- /dev/null +++ b/pkg/util/arangod/conn/async_errors.go @@ -0,0 +1,75 @@ +// +// DISCLAIMER +// +// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package conn + +import "fmt" + +func IsAsyncErrorNotFound(err error) bool { + if err == nil { + return false + } + + if _, ok := err.(asyncErrorNotFound); ok { + return true + } + + return false +} + +func newAsyncErrorNotFound(id string) error { + return asyncErrorNotFound{ + jobID: id, + } +} + +type asyncErrorNotFound struct { + jobID string +} + +func (a asyncErrorNotFound) Error() string { + return fmt.Sprintf("Job with ID %s not found", a.jobID) +} + +func IsAsyncJobInProgress(err error) (string, bool) { + if err == nil { + return "", false + } + + if v, ok := err.(asyncJobInProgress); ok { + return v.jobID, true + } + + return "", false +} + +func newAsyncJobInProgress(id string) error { + return asyncJobInProgress{ + jobID: id, + } +} + +type asyncJobInProgress struct { + jobID string +} + +func (a asyncJobInProgress) Error() string { + return fmt.Sprintf("Job with ID %s in progress", a.jobID) +} diff --git a/pkg/util/arangod/conn/async_test.go b/pkg/util/arangod/conn/async_test.go new file mode 100644 index 000000000..ffdc84765 --- /dev/null +++ b/pkg/util/arangod/conn/async_test.go @@ -0,0 +1,78 @@ +// +// DISCLAIMER +// +// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package conn + +import ( + "context" + "net/http" + "testing" + + "github.com/arangodb/go-driver" + "github.com/arangodb/kube-arangodb/pkg/util/tests" + "github.com/stretchr/testify/require" +) + +func Test_Async(t *testing.T) { + s := tests.NewServer(t) + + c := s.NewConnection() + + c = NewAsyncConnection(c) + + client, err := driver.NewClient(driver.ClientConfig{ + Connection: c, + }) + require.NoError(t, err) + + a := tests.NewAsyncHandler(t, s, http.MethodGet, "/_api/version", http.StatusOK, driver.VersionInfo{ + Server: "foo", + Version: "", + License: "", + Details: nil, + }) + + a.Start() + + _, err = client.Version(context.Background()) + require.Error(t, err) + id, ok := IsAsyncJobInProgress(err) + require.True(t, ok) + require.Equal(t, a.ID(), id) + + a.InProgress() + + ctx := WithAsyncID(context.TODO(), a.ID()) + + _, err = client.Version(ctx) + require.Error(t, err) + id, ok = IsAsyncJobInProgress(err) + require.True(t, ok) + require.Equal(t, a.ID(), id) + + a.Done() + + v, err := client.Version(ctx) + require.NoError(t, err) + + require.Equal(t, v.Server, "foo") + + defer s.Stop() +} diff --git a/pkg/util/arangod/conn/connection.pass.go b/pkg/util/arangod/conn/connection.pass.go new file mode 100644 index 000000000..051d8723d --- /dev/null +++ b/pkg/util/arangod/conn/connection.pass.go @@ -0,0 +1,72 @@ +// +// DISCLAIMER +// +// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package conn + +import ( + "context" + + "github.com/arangodb/go-driver" +) + +type connectionWrap func(c driver.Connection) (driver.Connection, error) + +var _ driver.Connection = connectionPass{} + +type connectionPass struct { + c driver.Connection + wrap connectionWrap +} + +func (c connectionPass) NewRequest(method, path string) (driver.Request, error) { + return c.c.NewRequest(method, path) +} + +func (c connectionPass) Do(ctx context.Context, req driver.Request) (driver.Response, error) { + return c.c.Do(ctx, req) +} + +func (c connectionPass) Unmarshal(data driver.RawObject, result interface{}) error { + return c.c.Unmarshal(data, result) +} + +func (c connectionPass) Endpoints() []string { + return c.c.Endpoints() +} + +func (c connectionPass) UpdateEndpoints(endpoints []string) error { + return c.c.UpdateEndpoints(endpoints) +} + +func (c connectionPass) SetAuthentication(authentication driver.Authentication) (driver.Connection, error) { + newC, err := c.c.SetAuthentication(authentication) + if err != nil { + return nil, err + } + + if f := c.wrap; f != nil { + return f(newC) + } + return newC, nil +} + +func (c connectionPass) Protocols() driver.ProtocolSet { + return c.c.Protocols() +} diff --git a/pkg/util/arangod/conn/context.go b/pkg/util/arangod/conn/context.go new file mode 100644 index 000000000..da19b0655 --- /dev/null +++ b/pkg/util/arangod/conn/context.go @@ -0,0 +1,37 @@ +// +// DISCLAIMER +// +// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package conn + +import "context" + +type ContextKey string + +const ( + asyncOperatorContextKey ContextKey = "operator-async-id" +) + +func WithAsyncID(ctx context.Context, id string) context.Context { + if ctx == nil { + ctx = context.Background() + } + + return context.WithValue(ctx, asyncOperatorContextKey, id) +} diff --git a/pkg/util/constants/connection.go b/pkg/util/constants/connection.go new file mode 100644 index 000000000..c2acd6bdc --- /dev/null +++ b/pkg/util/constants/connection.go @@ -0,0 +1,27 @@ +// +// DISCLAIMER +// +// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package constants + +const ( + ArangoHeaderAsyncIDKey = "x-arango-async-id" + ArangoHeaderAsyncKey = "x-arango-async" + ArangoHeaderAsyncValue = "store" +) diff --git a/pkg/util/tests/gen.go b/pkg/util/tests/gen.go index b024562d5..c35bf8696 100644 --- a/pkg/util/tests/gen.go +++ b/pkg/util/tests/gen.go @@ -21,13 +21,14 @@ package tests import ( + "context" + "testing" + api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1" - meta "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/uuid" "github.com/arangodb/kube-arangodb/pkg/util/kclient" - "testing" - "context" "github.com/stretchr/testify/require" + meta "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/uuid" ) func NewArangoDeployment(name string) *api.ArangoDeployment { diff --git a/pkg/util/tests/inspector.go b/pkg/util/tests/inspector.go index a096799b6..a461ab252 100644 --- a/pkg/util/tests/inspector.go +++ b/pkg/util/tests/inspector.go @@ -21,13 +21,14 @@ package tests import ( + "context" "testing" - "github.com/arangodb/kube-arangodb/pkg/util/kclient" + "github.com/arangodb/kube-arangodb/pkg/deployment/resources/inspector" - "context" - "github.com/stretchr/testify/require" inspectorInterface "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector" "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/throttle" + "github.com/arangodb/kube-arangodb/pkg/util/kclient" + "github.com/stretchr/testify/require" ) const FakeNamespace = "fake" diff --git a/pkg/util/tests/server.go b/pkg/util/tests/server.go new file mode 100644 index 000000000..d21b351d0 --- /dev/null +++ b/pkg/util/tests/server.go @@ -0,0 +1,209 @@ +// +// DISCLAIMER +// +// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package tests + +import ( + "encoding/json" + "fmt" + "net" + "net/http" + "sync" + "testing" + + "github.com/arangodb/go-driver" + httpdriver "github.com/arangodb/go-driver/http" + "github.com/stretchr/testify/require" +) + +func NewServer(t *testing.T) Server { + s := &server{ + t: t, + stop: make(chan struct{}), + stopped: make(chan struct{}), + started: make(chan struct{}), + done: make(chan struct{}), + } + + go s.run() + + <-s.started + + return s +} + +type Server interface { + NewConnection() driver.Connection + NewClient() driver.Client + + Handle(f http.HandlerFunc) + Addr() string + Stop() +} + +type server struct { + lock sync.Mutex + + t *testing.T + + handlers []http.HandlerFunc + + port int + + stop, stopped, started, done chan struct{} +} + +func (s *server) NewClient() driver.Client { + c, err := driver.NewClient(driver.ClientConfig{ + Connection: s.NewConnection(), + }) + require.NoError(s.t, err) + + return c +} + +func (s *server) NewConnection() driver.Connection { + c, err := httpdriver.NewConnection(httpdriver.ConnectionConfig{ + Endpoints: []string{ + s.Addr(), + }, + ContentType: driver.ContentTypeJSON, + }) + require.NoError(s.t, err) + + return c +} + +func (s *server) Handle(f http.HandlerFunc) { + s.lock.Lock() + defer s.lock.Unlock() + + s.handlers = append(s.handlers, f) +} + +func (s *server) Addr() string { + return fmt.Sprintf("http://127.0.0.1:%d", s.port) +} + +func (s *server) Stop() { + s.lock.Lock() + defer s.lock.Unlock() + + close(s.stop) + + <-s.done + + if q := len(s.handlers); q > 0 { + require.Failf(s.t, "Pending messages", "Count %d", q) + } +} + +func (s *server) run() { + defer close(s.done) + + listener, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(s.t, err) + + s.port = listener.Addr().(*net.TCPAddr).Port + + m := http.NewServeMux() + + m.HandleFunc("/", s.handle) + + server := http.Server{ + Handler: m, + } + + var serverErr error + + go func() { + defer close(s.stopped) + close(s.started) + + go func() { + <-s.stop + require.NoError(s.t, server.Close()) + }() + + serverErr = server.Serve(listener) + }() + + <-s.stopped + + if serverErr != http.ErrServerClosed { + require.NoError(s.t, serverErr) + } +} +func (s *server) handle(writer http.ResponseWriter, request *http.Request) { + s.lock.Lock() + defer s.lock.Unlock() + + var handler http.HandlerFunc + + switch len(s.handlers) { + case 0: + require.Fail(s.t, "No pending messages") + case 1: + handler = s.handlers[0] + s.handlers = nil + default: + handler = s.handlers[0] + s.handlers = s.handlers[1:] + } + + handler(writer, request) +} + +func NewSimpleHandler(t *testing.T, method string, path string, resp func(t *testing.T) (int, interface{})) http.HandlerFunc { + return NewCustomRequestHandler(t, method, path, nil, nil, resp) +} + +func NewCustomRequestHandler(t *testing.T, method string, path string, reqVerify func(t *testing.T, r *http.Request), respHeaders func(t *testing.T) map[string]string, resp func(t *testing.T) (int, interface{})) http.HandlerFunc { + return func(writer http.ResponseWriter, request *http.Request) { + require.Equal(t, method, request.Method) + require.Equal(t, path, request.RequestURI) + + if reqVerify != nil { + reqVerify(t, request) + } + + code, r := resp(t) + + writer.Header().Add("content-type", "application/json") + if respHeaders != nil { + h := respHeaders(t) + + for k, v := range h { + writer.Header().Add(k, v) + } + } + + writer.WriteHeader(code) + + if r != nil { + d, err := json.Marshal(r) + require.NoError(t, err) + + s, err := writer.Write(d) + require.NoError(t, err) + require.Equal(t, len(d), s) + } + } +} diff --git a/pkg/util/tests/server_async.go b/pkg/util/tests/server_async.go new file mode 100644 index 000000000..717faec0d --- /dev/null +++ b/pkg/util/tests/server_async.go @@ -0,0 +1,114 @@ +// +// DISCLAIMER +// +// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package tests + +import ( + "fmt" + "net/http" + "testing" + + "github.com/arangodb/kube-arangodb/pkg/util/constants" + "github.com/dchest/uniuri" + "github.com/stretchr/testify/require" +) + +func NewAsyncHandler(t *testing.T, s Server, method string, path string, retCode int, ret interface{}) AsyncHandler { + return &asyncHandler{ + t: t, + s: s, + ret: ret, + retCode: retCode, + method: method, + path: path, + id: uniuri.NewLen(32), + } +} + +type AsyncHandler interface { + ID() string + + Start() + InProgress() + Missing() + Done() +} + +type asyncHandler struct { + t *testing.T + s Server + + ret interface{} + retCode int + + method, path string + + id string +} + +func (a *asyncHandler) Missing() { + p := fmt.Sprintf("/_api/job/%s", a.id) + + a.s.Handle(NewCustomRequestHandler(a.t, http.MethodPut, p, func(t *testing.T, r *http.Request) { + v := r.Header.Get(constants.ArangoHeaderAsyncKey) + require.Equal(t, "", v) + }, nil, func(t *testing.T) (int, interface{}) { + return http.StatusNotFound, nil + })) +} + +func (a *asyncHandler) Start() { + a.s.Handle(NewCustomRequestHandler(a.t, a.method, a.path, func(t *testing.T, r *http.Request) { + v := r.Header.Get(constants.ArangoHeaderAsyncKey) + require.Equal(t, constants.ArangoHeaderAsyncValue, v) + }, func(t *testing.T) map[string]string { + return map[string]string{ + constants.ArangoHeaderAsyncIDKey: a.id, + } + }, func(t *testing.T) (int, interface{}) { + return http.StatusAccepted, nil + })) +} + +func (a *asyncHandler) InProgress() { + p := fmt.Sprintf("/_api/job/%s", a.id) + + a.s.Handle(NewCustomRequestHandler(a.t, http.MethodPut, p, func(t *testing.T, r *http.Request) { + v := r.Header.Get(constants.ArangoHeaderAsyncKey) + require.Equal(t, "", v) + }, nil, func(t *testing.T) (int, interface{}) { + return http.StatusNoContent, nil + })) +} + +func (a *asyncHandler) Done() { + p := fmt.Sprintf("/_api/job/%s", a.id) + + a.s.Handle(NewCustomRequestHandler(a.t, http.MethodPut, p, func(t *testing.T, r *http.Request) { + v := r.Header.Get(constants.ArangoHeaderAsyncKey) + require.Equal(t, "", v) + }, nil, func(t *testing.T) (int, interface{}) { + return a.retCode, a.ret + })) +} + +func (a *asyncHandler) ID() string { + return a.id +}