diff --git a/.golangci.yaml b/.golangci.yaml index 0440af6bb..6591b88d8 100644 --- a/.golangci.yaml +++ b/.golangci.yaml @@ -37,6 +37,10 @@ linters-settings: pkg: github.com/arangodb/kube-arangodb/integrations/meta/v1 - alias: pbMetaV1 pkg: github.com/arangodb/kube-arangodb/integrations/meta/v1/definition + - alias: pbImplEventsV1 + pkg: github.com/arangodb/kube-arangodb/integrations/events/v1 + - alias: pbEventsV1 + pkg: github.com/arangodb/kube-arangodb/integrations/events/v1/definition - alias: pbImplAuthenticationV1 pkg: github.com/arangodb/kube-arangodb/integrations/authentication/v1 - alias: pbAuthenticationV1 diff --git a/CHANGELOG.md b/CHANGELOG.md index 306be2c12..51f6d17b1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ - (Feature) (License) Activation CLI - (Bugfix) (DP) Propagate Timeout Across Subcommands - (Maintenance) Bump Dependencies +- (Feature) (Platform) EventsV1 Integration ## [1.3.1](https://github.com/arangodb/kube-arangodb/tree/1.3.1) (2025-10-07) - (Documentation) Add ArangoPlatformStorage Docs & Examples diff --git a/README.md b/README.md index 5c428ff67..71ac0cfdd 100644 --- a/README.md +++ b/README.md @@ -204,7 +204,7 @@ Flags: --kubernetes.qps float32 Number of queries per second for k8s API. If set to 0 or less, API calls won't be throttled (default 32) --leader.label.skip Skips Leader Label for the Pod --log.format string Set log format. Allowed values: 'pretty', 'JSON'. If empty, default format is used (default "pretty") - --log.level stringArray Set log levels in format or =. Possible loggers: action, agency, api-server, assertion, backup-operator, chaos-monkey, crd, deployment, deployment-ci, deployment-reconcile, deployment-replication, deployment-resilience, deployment-resources, deployment-storage, deployment-storage-pc, deployment-storage-service, generic-parent-operator, helm, http, inspector, integration-authn-v1, integration-config-v1, integration-envoy-auth-v3, integration-envoy-auth-v3-impl-auth-bearer, integration-envoy-auth-v3-impl-auth-cookie, integration-envoy-auth-v3-impl-custom-openid, integration-envoy-auth-v3-impl-pass-mode, integration-meta-v1, integration-scheduler-v2, integration-shutdown-v1, integration-storage-v1-s3, integration-storage-v2, integrations, k8s-client, kubernetes, kubernetes-access, kubernetes-client, kubernetes-informer, monitor, networking-route-operator, operator, operator-arangojob-handler, operator-v2, operator-v2-event, operator-v2-worker, panics, platform-chart-operator, platform-pod-shutdown, platform-service-operator, platform-storage-operator, pod_compare, root, root-event-recorder, scheduler-batchjob-operator, scheduler-cronjob-operator, scheduler-deployment-operator, scheduler-pod-operator, scheduler-profile-operator, server, server-authentication, webhook (default [info]) + --log.level stringArray Set log levels in format or =. Possible loggers: action, agency, api-server, assertion, backup-operator, chaos-monkey, crd, deployment, deployment-ci, deployment-reconcile, deployment-replication, deployment-resilience, deployment-resources, deployment-storage, deployment-storage-pc, deployment-storage-service, generic-parent-operator, helm, http, inspector, integration-authn-v1, integration-config-v1, integration-envoy-auth-v3, integration-envoy-auth-v3-impl-auth-bearer, integration-envoy-auth-v3-impl-auth-cookie, integration-envoy-auth-v3-impl-custom-openid, integration-envoy-auth-v3-impl-pass-mode, integration-events-v1, integration-meta-v1, integration-pong-v1, integration-scheduler-v2, integration-shutdown-v1, integration-storage-v1-s3, integration-storage-v2, integrations, k8s-client, kubernetes, kubernetes-access, kubernetes-client, kubernetes-informer, monitor, networking-route-operator, operator, operator-arangojob-handler, operator-v2, operator-v2-event, operator-v2-worker, panics, platform-chart-operator, platform-pod-shutdown, platform-service-operator, platform-storage-operator, pod_compare, root, root-event-recorder, scheduler-batchjob-operator, scheduler-cronjob-operator, scheduler-deployment-operator, scheduler-pod-operator, scheduler-profile-operator, server, server-authentication, webhook (default [info]) --log.sampling If true, operator will try to minimize duplication of logging events (default true) --log.stdout If true, operator will log to the stdout (default true) --memory-limit uint Define memory limit for hard shutdown and the dump of goroutines. Used for testing diff --git a/docs/cli/arangodb_operator.md b/docs/cli/arangodb_operator.md index e5a635305..7e3a7a169 100644 --- a/docs/cli/arangodb_operator.md +++ b/docs/cli/arangodb_operator.md @@ -86,7 +86,7 @@ Flags: --kubernetes.qps float32 Number of queries per second for k8s API. If set to 0 or less, API calls won't be throttled (default 32) --leader.label.skip Skips Leader Label for the Pod --log.format string Set log format. Allowed values: 'pretty', 'JSON'. If empty, default format is used (default "pretty") - --log.level stringArray Set log levels in format or =. Possible loggers: action, agency, api-server, assertion, backup-operator, chaos-monkey, crd, deployment, deployment-ci, deployment-reconcile, deployment-replication, deployment-resilience, deployment-resources, deployment-storage, deployment-storage-pc, deployment-storage-service, generic-parent-operator, helm, http, inspector, integration-authn-v1, integration-config-v1, integration-envoy-auth-v3, integration-envoy-auth-v3-impl-auth-bearer, integration-envoy-auth-v3-impl-auth-cookie, integration-envoy-auth-v3-impl-custom-openid, integration-envoy-auth-v3-impl-pass-mode, integration-meta-v1, integration-scheduler-v2, integration-shutdown-v1, integration-storage-v1-s3, integration-storage-v2, integrations, k8s-client, kubernetes, kubernetes-access, kubernetes-client, kubernetes-informer, monitor, networking-route-operator, operator, operator-arangojob-handler, operator-v2, operator-v2-event, operator-v2-worker, panics, platform-chart-operator, platform-pod-shutdown, platform-service-operator, platform-storage-operator, pod_compare, root, root-event-recorder, scheduler-batchjob-operator, scheduler-cronjob-operator, scheduler-deployment-operator, scheduler-pod-operator, scheduler-profile-operator, server, server-authentication, webhook (default [info]) + --log.level stringArray Set log levels in format or =. Possible loggers: action, agency, api-server, assertion, backup-operator, chaos-monkey, crd, deployment, deployment-ci, deployment-reconcile, deployment-replication, deployment-resilience, deployment-resources, deployment-storage, deployment-storage-pc, deployment-storage-service, generic-parent-operator, helm, http, inspector, integration-authn-v1, integration-config-v1, integration-envoy-auth-v3, integration-envoy-auth-v3-impl-auth-bearer, integration-envoy-auth-v3-impl-auth-cookie, integration-envoy-auth-v3-impl-custom-openid, integration-envoy-auth-v3-impl-pass-mode, integration-events-v1, integration-meta-v1, integration-pong-v1, integration-scheduler-v2, integration-shutdown-v1, integration-storage-v1-s3, integration-storage-v2, integrations, k8s-client, kubernetes, kubernetes-access, kubernetes-client, kubernetes-informer, monitor, networking-route-operator, operator, operator-arangojob-handler, operator-v2, operator-v2-event, operator-v2-worker, panics, platform-chart-operator, platform-pod-shutdown, platform-service-operator, platform-storage-operator, pod_compare, root, root-event-recorder, scheduler-batchjob-operator, scheduler-cronjob-operator, scheduler-deployment-operator, scheduler-pod-operator, scheduler-profile-operator, server, server-authentication, webhook (default [info]) --log.sampling If true, operator will try to minimize duplication of logging events (default true) --log.stdout If true, operator will log to the stdout (default true) --memory-limit uint Define memory limit for hard shutdown and the dump of goroutines. Used for testing diff --git a/docs/cli/arangodb_operator_integration.md b/docs/cli/arangodb_operator_integration.md index ee4ad1191..db5492d9d 100644 --- a/docs/cli/arangodb_operator_integration.md +++ b/docs/cli/arangodb_operator_integration.md @@ -58,6 +58,13 @@ Flags: --integration.envoy.auth.v3.extensions.users.create Defines if UserCreation extension is enabled (Env: INTEGRATION_ENVOY_AUTH_V3_EXTENSIONS_USERS_CREATE) --integration.envoy.auth.v3.external Defines if External access to service envoy.auth.v3 is enabled (Env: INTEGRATION_ENVOY_AUTH_V3_EXTERNAL) --integration.envoy.auth.v3.internal Defines if Internal access to service envoy.auth.v3 is enabled (Env: INTEGRATION_ENVOY_AUTH_V3_INTERNAL) (default true) + --integration.events.v1 Enable EventsV1 Integration Service (Env: INTEGRATION_EVENTS_V1) + --integration.events.v1.async Enables async injection of the events (Env: INTEGRATION_EVENTS_V1_ASYNC) (default true) + --integration.events.v1.async.retry.delay duration Delay of the retries (Env: INTEGRATION_EVENTS_V1_ASYNC_RETRY_DELAY) (default 1s) + --integration.events.v1.async.retry.timeout duration Timeout for the event injection (Env: INTEGRATION_EVENTS_V1_ASYNC_RETRY_TIMEOUT) (default 1m0s) + --integration.events.v1.async.size int Size of the async queue (Env: INTEGRATION_EVENTS_V1_ASYNC_SIZE) (default 16) + --integration.events.v1.external Defines if External access to service events.v1 is enabled (Env: INTEGRATION_EVENTS_V1_EXTERNAL) + --integration.events.v1.internal Defines if Internal access to service events.v1 is enabled (Env: INTEGRATION_EVENTS_V1_INTERNAL) (default true) --integration.meta.v1 Enable MetaV1 Integration Service (Env: INTEGRATION_META_V1) --integration.meta.v1.external Defines if External access to service meta.v1 is enabled (Env: INTEGRATION_META_V1_EXTERNAL) --integration.meta.v1.internal Defines if Internal access to service meta.v1 is enabled (Env: INTEGRATION_META_V1_INTERNAL) (default true) diff --git a/docs/integration-sidecar.md b/docs/integration-sidecar.md index eb200c98e..614242156 100644 --- a/docs/integration-sidecar.md +++ b/docs/integration-sidecar.md @@ -12,14 +12,9 @@ nav_order: 2 ### Resource Types -Integration Sidecar is supported in a few resources managed by Operator: +Integration Sidecar is supported in a basic resources managed by Kubernetes: -- ArangoSchedulerDeployment (scheduler.arangodb.com/v1beta1) -- ArangoSchedulerBatchJob (scheduler.arangodb.com/v1beta1) -- ArangoSchedulerCronJob (scheduler.arangodb.com/v1beta1) -- ArangoSchedulerPod (scheduler.arangodb.com/v1beta1) - -Standard Kubernetes Resources (like Pod) are also supported with Webhook extension is enabled. +- Pod To enable integration sidecar for specific deployment label needs to be defined: @@ -140,6 +135,18 @@ metadata: integration.profiles.arangodb.com/meta: v1 ``` +#### [Events V1](./integration/events.v1.md) + +Events Integration Sidecar + +To enable: + +```yaml +metadata: + labels: + integration.profiles.arangodb.com/events: v1 +``` + ### Envs #### INTEGRATION_API_ADDRESS diff --git a/docs/integration/authentication.v1.md b/docs/integration/authentication.v1.md index 194b1cd9c..f19f79c00 100644 --- a/docs/integration/authentication.v1.md +++ b/docs/integration/authentication.v1.md @@ -9,4 +9,4 @@ parent: Integration Sidecars Definitions: -- [Service](https://github.com/arangodb/kube-arangodb/blob/1.3.0/integrations/authentication/v1/definition/definition.proto) +- [Service](https://github.com/arangodb/kube-arangodb/blob/1.3.1/integrations/authentication/v1/definition/definition.proto) diff --git a/docs/integration/authorization.v0.md b/docs/integration/authorization.v0.md index 6c32bbcf6..fba0fcd36 100644 --- a/docs/integration/authorization.v0.md +++ b/docs/integration/authorization.v0.md @@ -9,5 +9,5 @@ parent: Integration Sidecars Definitions: -- [Service](https://github.com/arangodb/kube-arangodb/blob/1.3.0/integrations/authorization/v0/definition/definition.proto) +- [Service](https://github.com/arangodb/kube-arangodb/blob/1.3.1/integrations/authorization/v0/definition/definition.proto) diff --git a/docs/integration/events.v1.md b/docs/integration/events.v1.md new file mode 100644 index 000000000..497699009 --- /dev/null +++ b/docs/integration/events.v1.md @@ -0,0 +1,12 @@ +--- +layout: page +title: Integration Sidecar Meta V1 +grand_parent: ArangoDBPlatform +parent: Integration Sidecars +--- + +# Events V1 + +Definitions: + +- [Service](https://github.com/arangodb/kube-arangodb/blob/1.3.1/integrations/events/v1/definition/definition.proto) diff --git a/docs/integration/meta.v1.md b/docs/integration/meta.v1.md index 6cbe5a07e..1500473e0 100644 --- a/docs/integration/meta.v1.md +++ b/docs/integration/meta.v1.md @@ -9,4 +9,4 @@ parent: Integration Sidecars Definitions: -- [Service](https://github.com/arangodb/kube-arangodb/blob/1.3.0/integrations/meta/v1/definition/definition.proto) +- [Service](https://github.com/arangodb/kube-arangodb/blob/1.3.1/integrations/meta/v1/definition/definition.proto) diff --git a/docs/integration/scheduler.v1.md b/docs/integration/scheduler.v1.md index bc38c8f7a..8f5ef87e8 100644 --- a/docs/integration/scheduler.v1.md +++ b/docs/integration/scheduler.v1.md @@ -9,5 +9,5 @@ parent: Integration Sidecars Definitions: -- [Service](https://github.com/arangodb/kube-arangodb/blob/1.3.0/integrations/scheduler/v1/definition/definition.proto) +- [Service](https://github.com/arangodb/kube-arangodb/blob/1.3.1/integrations/scheduler/v1/definition/definition.proto) diff --git a/docs/integration/scheduler.v2.md b/docs/integration/scheduler.v2.md index 55b26c127..5da0422d1 100644 --- a/docs/integration/scheduler.v2.md +++ b/docs/integration/scheduler.v2.md @@ -9,5 +9,5 @@ parent: Integration Sidecars Definitions: -- [Service](https://github.com/arangodb/kube-arangodb/blob/1.3.0/integrations/scheduler/v2/definition/definition.proto) +- [Service](https://github.com/arangodb/kube-arangodb/blob/1.3.1/integrations/scheduler/v2/definition/definition.proto) diff --git a/docs/integration/shutdown.v1.md b/docs/integration/shutdown.v1.md index c529b72ef..fb301437e 100644 --- a/docs/integration/shutdown.v1.md +++ b/docs/integration/shutdown.v1.md @@ -9,7 +9,7 @@ parent: Integration Sidecars Definitions: -- [Service](https://github.com/arangodb/kube-arangodb/blob/1.3.0/integrations/shutdown/v1/definition/shutdown.proto) +- [Service](https://github.com/arangodb/kube-arangodb/blob/1.3.1/integrations/shutdown/v1/definition/shutdown.proto) Operator will send shutdown request once all containers marked with annotation are stopped. diff --git a/docs/integration/storage.v2.md b/docs/integration/storage.v2.md index b649dcfc1..25f666e41 100644 --- a/docs/integration/storage.v2.md +++ b/docs/integration/storage.v2.md @@ -9,7 +9,7 @@ parent: Integration Sidecars Definitions: -- [Service](https://github.com/arangodb/kube-arangodb/blob/1.3.0/integrations/storage/v2/definition/storage.proto) +- [Service](https://github.com/arangodb/kube-arangodb/blob/1.3.1/integrations/storage/v2/definition/storage.proto) ## Configuration diff --git a/docs/platform/storage/gcs.md b/docs/platform/storage/gcs.md index ffc16b98f..aefb1f9d0 100644 --- a/docs/platform/storage/gcs.md +++ b/docs/platform/storage/gcs.md @@ -2,6 +2,7 @@ layout: page title: Google Cloud Storage parent: Storage +grand_parent: ArangoDBPlatform nav_order: 2 --- diff --git a/docs/platform/storage/minio.md b/docs/platform/storage/minio.md index a416ba005..b083cde1f 100644 --- a/docs/platform/storage/minio.md +++ b/docs/platform/storage/minio.md @@ -2,6 +2,7 @@ layout: page title: MinIO parent: Storage +grand_parent: ArangoDBPlatform nav_order: 3 --- diff --git a/docs/platform/storage/s3.md b/docs/platform/storage/s3.md index ac82a5f4a..7a5526d56 100644 --- a/docs/platform/storage/s3.md +++ b/docs/platform/storage/s3.md @@ -2,6 +2,7 @@ layout: page title: AWS S3 parent: Storage +grand_parent: ArangoDBPlatform nav_order: 1 --- diff --git a/integrations/authentication/v1/definition/definition.pb.gw.go b/integrations/authentication/v1/definition/definition.pb.gw.go index 13e404863..bd440d2bb 100644 --- a/integrations/authentication/v1/definition/definition.pb.gw.go +++ b/integrations/authentication/v1/definition/definition.pb.gw.go @@ -14,7 +14,7 @@ import ( "io" "net/http" - definition_8 "github.com/arangodb/kube-arangodb/integrations/shared/v1/definition" + definition_9 "github.com/arangodb/kube-arangodb/integrations/shared/v1/definition" "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" "github.com/grpc-ecosystem/grpc-gateway/v2/utilities" "google.golang.org/grpc" @@ -92,7 +92,7 @@ func local_request_AuthenticationV1_CreateToken_0(ctx context.Context, marshaler func request_AuthenticationV1_Identity_0(ctx context.Context, marshaler runtime.Marshaler, client AuthenticationV1Client, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { var ( - protoReq definition_8.Empty + protoReq definition_9.Empty metadata runtime.ServerMetadata ) if req.Body != nil { @@ -104,7 +104,7 @@ func request_AuthenticationV1_Identity_0(ctx context.Context, marshaler runtime. func local_request_AuthenticationV1_Identity_0(ctx context.Context, marshaler runtime.Marshaler, server AuthenticationV1Server, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { var ( - protoReq definition_8.Empty + protoReq definition_9.Empty metadata runtime.ServerMetadata ) msg, err := server.Identity(ctx, &protoReq) diff --git a/integrations/events/v1/async.go b/integrations/events/v1/async.go new file mode 100644 index 000000000..7f051567d --- /dev/null +++ b/integrations/events/v1/async.go @@ -0,0 +1,122 @@ +// +// DISCLAIMER +// +// Copyright 2025 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package v1 + +import ( + "context" + "time" + + "github.com/pkg/errors" + "google.golang.org/protobuf/proto" + + "github.com/arangodb/kube-arangodb/pkg/util/globals" +) + +func WithAsync[IN proto.Message, H RemoteStore[IN]](in H, size int, timeout time.Duration, delay time.Duration) RemoteStore[IN] { + return &asyncRemoteWriter[IN, H]{ + upstream: in, + cache: make(chan []IN, size), + timeout: timeout, + delay: delay, + } +} + +type asyncRemoteWriter[IN proto.Message, H RemoteStore[IN]] struct { + upstream H + + cache chan []IN + + timeout time.Duration + delay time.Duration +} + +func (a *asyncRemoteWriter[IN, H]) Init(ctx context.Context) error { + return a.upstream.Init(ctx) +} + +func (a *asyncRemoteWriter[IN, H]) Background(ctx context.Context) { + logger.Info("Async background started") + defer func() { + logger.Info("Async background completed") + }() + + for { + select { + case <-ctx.Done(): + close(a.cache) + for events := range a.cache { + // Cleanup the queue + a.emitEvents(events...) + } + return + case events := <-a.cache: + a.emitEvents(events...) + } + } +} + +func (a *asyncRemoteWriter[IN, H]) emitEvents(events ...IN) { + if len(events) == 0 { + return + } + + timeoutTimer := time.NewTimer(a.timeout) + defer timeoutTimer.Stop() + + delayTimer := time.NewTicker(a.delay) + defer delayTimer.Stop() + + for { + err := globals.GetGlobals().Timeouts().ArangoD().RunWithTimeout(context.Background(), func(ctxChild context.Context) error { + return a.upstream.Emit(ctxChild, events...) + }) + if err != nil { + logger.Err(err).Warn("Unable to send events batch, retry") + } else { + logger.Debug("Batch sent") + return + } + + select { + case <-delayTimer.C: + continue + case <-timeoutTimer.C: + logger.Error("Unable to send events in expected time") + return + } + } +} + +func (a *asyncRemoteWriter[IN, H]) Emit(ctx context.Context, events ...IN) error { + if len(events) == 0 { + return nil + } + + timeout := time.NewTimer(time.Second) + defer timeout.Stop() + + select { + case a.cache <- events: + return nil + case <-timeout.C: + return errors.Errorf("timeout waiting for events to be scheduled") + } +} diff --git a/integrations/events/v1/collection.go b/integrations/events/v1/collection.go new file mode 100644 index 000000000..d58d998bf --- /dev/null +++ b/integrations/events/v1/collection.go @@ -0,0 +1,47 @@ +// +// DISCLAIMER +// +// Copyright 2025 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package v1 + +import ( + "context" + "time" + + "github.com/arangodb/go-driver/v2/arangodb" + + "github.com/arangodb/kube-arangodb/pkg/util/cache" +) + +func withTTLIndex(in cache.Object[arangodb.Collection]) cache.Object[arangodb.Collection] { + return cache.NewObject(func(ctx context.Context) (arangodb.Collection, time.Duration, error) { + col, err := in.Get(ctx) + if err != nil { + return nil, 0, err + } + + if _, _, err := col.EnsureTTLIndex(ctx, []string{"created"}, int(DefaultTTL/time.Second), &arangodb.CreateTTLIndexOptions{ + Name: "system_events_created_ttl_index", + }); err != nil { + return nil, 0, err + } + + return col, time.Hour, nil + }) +} diff --git a/integrations/events/v1/configuration.go b/integrations/events/v1/configuration.go new file mode 100644 index 000000000..28c8f979c --- /dev/null +++ b/integrations/events/v1/configuration.go @@ -0,0 +1,105 @@ +// +// DISCLAIMER +// +// Copyright 2025 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package v1 + +import ( + "time" + + shared "github.com/arangodb/kube-arangodb/pkg/apis/shared" + integrationsShared "github.com/arangodb/kube-arangodb/pkg/integrations/shared" + "github.com/arangodb/kube-arangodb/pkg/util" + "github.com/arangodb/kube-arangodb/pkg/util/errors" +) + +func NewConfiguration() Configuration { + return Configuration{} +} + +type Configuration struct { + integrationsShared.Endpoint + integrationsShared.Database + + Async ConfigurationAsync +} + +func (c Configuration) Validate() error { + return errors.Errors( + shared.PrefixResourceError("async", c.Async.Validate()), + shared.PrefixResourceError("endpoint", c.Endpoint.Validate()), + shared.PrefixResourceError("database", c.Database.Validate()), + ) +} + +type ConfigurationAsync struct { + Enabled bool + Size int + Retry ConfigurationRetry +} + +func (c ConfigurationAsync) Validate() error { + if !c.Enabled { + return nil + } + return errors.Errors( + shared.PrefixResourceErrorFunc("size", func() error { + if c.Size <= 0 { + return errors.Errorf("size must be greater than zero") + } + + return nil + }), + shared.PrefixResourceError("retry", c.Retry.Validate()), + ) +} + +type ConfigurationRetry struct { + Timeout time.Duration + Delay time.Duration +} + +func (c ConfigurationRetry) Validate() error { + return errors.Errors( + shared.PrefixResourceErrorFunc("timeout", func() error { + if c.Timeout <= 0 { + return errors.Errorf("timeout must be greater than zero") + } + + return nil + }), + shared.PrefixResourceErrorFunc("delay", func() error { + if c.Delay <= 0 { + return errors.Errorf("delay must be greater than zero") + } + + return nil + }), + ) +} + +func (c Configuration) With(mods ...util.ModR[Configuration]) Configuration { + n := c + + for _, mod := range mods { + n = mod(n) + } + + return n +} diff --git a/integrations/events/v1/consts.go b/integrations/events/v1/consts.go new file mode 100644 index 000000000..cb08474e5 --- /dev/null +++ b/integrations/events/v1/consts.go @@ -0,0 +1,28 @@ +// +// DISCLAIMER +// +// Copyright 2025 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package v1 + +import "time" + +const ( + MaxEventCount = 1024 + DefaultTTL = 60 * 24 * time.Hour +) diff --git a/integrations/events/v1/definition/consts.go b/integrations/events/v1/definition/consts.go new file mode 100644 index 000000000..563a86f3b --- /dev/null +++ b/integrations/events/v1/definition/consts.go @@ -0,0 +1,25 @@ +// +// DISCLAIMER +// +// Copyright 2025 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package definition + +const ( + Name = "events.v1" +) diff --git a/integrations/events/v1/definition/definition.pb.go b/integrations/events/v1/definition/definition.pb.go new file mode 100644 index 000000000..fd633f169 --- /dev/null +++ b/integrations/events/v1/definition/definition.pb.go @@ -0,0 +1,279 @@ +// +// DISCLAIMER +// +// Copyright 2025 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.33.0 +// protoc v6.33.0 +// source: integrations/events/v1/definition/definition.proto + +package definition + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + timestamppb "google.golang.org/protobuf/types/known/timestamppb" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// Event V1 Request +type EventsV1Request struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // List of the Events to produce as batch + Events []*Event `protobuf:"bytes,1,rep,name=events,proto3" json:"events,omitempty"` +} + +func (x *EventsV1Request) Reset() { + *x = EventsV1Request{} + if protoimpl.UnsafeEnabled { + mi := &file_integrations_events_v1_definition_definition_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *EventsV1Request) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*EventsV1Request) ProtoMessage() {} + +func (x *EventsV1Request) ProtoReflect() protoreflect.Message { + mi := &file_integrations_events_v1_definition_definition_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use EventsV1Request.ProtoReflect.Descriptor instead. +func (*EventsV1Request) Descriptor() ([]byte, []int) { + return file_integrations_events_v1_definition_definition_proto_rawDescGZIP(), []int{0} +} + +func (x *EventsV1Request) GetEvents() []*Event { + if x != nil { + return x.Events + } + return nil +} + +// Event V1 Response +type EventsV1Response struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Number of the processed Events + Processed int32 `protobuf:"varint,1,opt,name=processed,proto3" json:"processed,omitempty"` + // Defines if production was sync or async + Completed bool `protobuf:"varint,2,opt,name=completed,proto3" json:"completed,omitempty"` + // Creation Timestamp of the events. Defaults to the current timestamp + Created *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=created,proto3,oneof" json:"created,omitempty"` +} + +func (x *EventsV1Response) Reset() { + *x = EventsV1Response{} + if protoimpl.UnsafeEnabled { + mi := &file_integrations_events_v1_definition_definition_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *EventsV1Response) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*EventsV1Response) ProtoMessage() {} + +func (x *EventsV1Response) ProtoReflect() protoreflect.Message { + mi := &file_integrations_events_v1_definition_definition_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use EventsV1Response.ProtoReflect.Descriptor instead. +func (*EventsV1Response) Descriptor() ([]byte, []int) { + return file_integrations_events_v1_definition_definition_proto_rawDescGZIP(), []int{1} +} + +func (x *EventsV1Response) GetProcessed() int32 { + if x != nil { + return x.Processed + } + return 0 +} + +func (x *EventsV1Response) GetCompleted() bool { + if x != nil { + return x.Completed + } + return false +} + +func (x *EventsV1Response) GetCreated() *timestamppb.Timestamp { + if x != nil { + return x.Created + } + return nil +} + +var File_integrations_events_v1_definition_definition_proto protoreflect.FileDescriptor + +var file_integrations_events_v1_definition_definition_proto_rawDesc = []byte{ + 0x0a, 0x32, 0x69, 0x6e, 0x74, 0x65, 0x67, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2f, 0x65, + 0x76, 0x65, 0x6e, 0x74, 0x73, 0x2f, 0x76, 0x31, 0x2f, 0x64, 0x65, 0x66, 0x69, 0x6e, 0x69, 0x74, + 0x69, 0x6f, 0x6e, 0x2f, 0x64, 0x65, 0x66, 0x69, 0x6e, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x1a, 0x1f, 0x67, 0x6f, + 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, + 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x2d, 0x69, + 0x6e, 0x74, 0x65, 0x67, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2f, 0x65, 0x76, 0x65, 0x6e, + 0x74, 0x73, 0x2f, 0x76, 0x31, 0x2f, 0x64, 0x65, 0x66, 0x69, 0x6e, 0x69, 0x74, 0x69, 0x6f, 0x6e, + 0x2f, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x38, 0x0a, 0x0f, + 0x45, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x56, 0x31, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, + 0x25, 0x0a, 0x06, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, + 0x0d, 0x2e, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x06, + 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x22, 0x95, 0x01, 0x0a, 0x10, 0x45, 0x76, 0x65, 0x6e, 0x74, + 0x73, 0x56, 0x31, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1c, 0x0a, 0x09, 0x70, + 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x65, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x09, + 0x70, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x65, 0x64, 0x12, 0x1c, 0x0a, 0x09, 0x63, 0x6f, 0x6d, + 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x09, 0x63, 0x6f, + 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x12, 0x39, 0x0a, 0x07, 0x63, 0x72, 0x65, 0x61, 0x74, + 0x65, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, + 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, + 0x74, 0x61, 0x6d, 0x70, 0x48, 0x00, 0x52, 0x07, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x88, + 0x01, 0x01, 0x42, 0x0a, 0x0a, 0x08, 0x5f, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x32, 0x47, + 0x0a, 0x08, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x56, 0x31, 0x12, 0x3b, 0x0a, 0x04, 0x45, 0x6d, + 0x69, 0x74, 0x12, 0x17, 0x2e, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x2e, 0x45, 0x76, 0x65, 0x6e, + 0x74, 0x73, 0x56, 0x31, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x18, 0x2e, 0x65, 0x76, + 0x65, 0x6e, 0x74, 0x73, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x56, 0x31, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, 0x42, 0x45, 0x5a, 0x43, 0x67, 0x69, 0x74, 0x68, 0x75, + 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x61, 0x72, 0x61, 0x6e, 0x67, 0x6f, 0x64, 0x62, 0x2f, 0x6b, + 0x75, 0x62, 0x65, 0x2d, 0x61, 0x72, 0x61, 0x6e, 0x67, 0x6f, 0x64, 0x62, 0x2f, 0x69, 0x6e, 0x74, + 0x65, 0x67, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2f, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, + 0x2f, 0x76, 0x31, 0x2f, 0x64, 0x65, 0x66, 0x69, 0x6e, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x62, 0x06, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_integrations_events_v1_definition_definition_proto_rawDescOnce sync.Once + file_integrations_events_v1_definition_definition_proto_rawDescData = file_integrations_events_v1_definition_definition_proto_rawDesc +) + +func file_integrations_events_v1_definition_definition_proto_rawDescGZIP() []byte { + file_integrations_events_v1_definition_definition_proto_rawDescOnce.Do(func() { + file_integrations_events_v1_definition_definition_proto_rawDescData = protoimpl.X.CompressGZIP(file_integrations_events_v1_definition_definition_proto_rawDescData) + }) + return file_integrations_events_v1_definition_definition_proto_rawDescData +} + +var file_integrations_events_v1_definition_definition_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_integrations_events_v1_definition_definition_proto_goTypes = []interface{}{ + (*EventsV1Request)(nil), // 0: events.EventsV1Request + (*EventsV1Response)(nil), // 1: events.EventsV1Response + (*Event)(nil), // 2: events.Event + (*timestamppb.Timestamp)(nil), // 3: google.protobuf.Timestamp +} +var file_integrations_events_v1_definition_definition_proto_depIdxs = []int32{ + 2, // 0: events.EventsV1Request.events:type_name -> events.Event + 3, // 1: events.EventsV1Response.created:type_name -> google.protobuf.Timestamp + 0, // 2: events.EventsV1.Emit:input_type -> events.EventsV1Request + 1, // 3: events.EventsV1.Emit:output_type -> events.EventsV1Response + 3, // [3:4] is the sub-list for method output_type + 2, // [2:3] is the sub-list for method input_type + 2, // [2:2] is the sub-list for extension type_name + 2, // [2:2] is the sub-list for extension extendee + 0, // [0:2] is the sub-list for field type_name +} + +func init() { file_integrations_events_v1_definition_definition_proto_init() } +func file_integrations_events_v1_definition_definition_proto_init() { + if File_integrations_events_v1_definition_definition_proto != nil { + return + } + file_integrations_events_v1_definition_event_proto_init() + if !protoimpl.UnsafeEnabled { + file_integrations_events_v1_definition_definition_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*EventsV1Request); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_integrations_events_v1_definition_definition_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*EventsV1Response); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + file_integrations_events_v1_definition_definition_proto_msgTypes[1].OneofWrappers = []interface{}{} + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_integrations_events_v1_definition_definition_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_integrations_events_v1_definition_definition_proto_goTypes, + DependencyIndexes: file_integrations_events_v1_definition_definition_proto_depIdxs, + MessageInfos: file_integrations_events_v1_definition_definition_proto_msgTypes, + }.Build() + File_integrations_events_v1_definition_definition_proto = out.File + file_integrations_events_v1_definition_definition_proto_rawDesc = nil + file_integrations_events_v1_definition_definition_proto_goTypes = nil + file_integrations_events_v1_definition_definition_proto_depIdxs = nil +} diff --git a/integrations/events/v1/definition/definition.proto b/integrations/events/v1/definition/definition.proto new file mode 100644 index 000000000..af6d07415 --- /dev/null +++ b/integrations/events/v1/definition/definition.proto @@ -0,0 +1,54 @@ +// +// DISCLAIMER +// +// Copyright 2025 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +syntax = "proto3"; + +package events; + +import "google/protobuf/timestamp.proto"; +import "integrations/events/v1/definition/event.proto"; + +option go_package = "github.com/arangodb/kube-arangodb/integrations/events/v1/definition"; + +// EventsV1 Service implementation +service EventsV1 { + // Sends events to the server once the stream is closed + rpc Emit(stream EventsV1Request) returns (EventsV1Response); +} + +// Responses + +// Event V1 Request +message EventsV1Request { + // List of the Events to produce as batch + repeated Event events = 1; +} + +// Event V1 Response +message EventsV1Response { + // Number of the processed Events + int32 processed = 1; + + // Defines if production was sync or async + bool completed = 2; + + // Creation Timestamp of the events. Defaults to the current timestamp + optional google.protobuf.Timestamp created = 3; +} diff --git a/integrations/events/v1/definition/definition_grpc.pb.go b/integrations/events/v1/definition/definition_grpc.pb.go new file mode 100644 index 000000000..8ba84f724 --- /dev/null +++ b/integrations/events/v1/definition/definition_grpc.pb.go @@ -0,0 +1,140 @@ +// +// DISCLAIMER +// +// Copyright 2025 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.5.1 +// - protoc v6.33.0 +// source: integrations/events/v1/definition/definition.proto + +package definition + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.64.0 or later. +const _ = grpc.SupportPackageIsVersion9 + +const ( + EventsV1_Emit_FullMethodName = "/events.EventsV1/Emit" +) + +// EventsV1Client is the client API for EventsV1 service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +// +// EventsV1 Service implementation +type EventsV1Client interface { + // Sends events to the server once the stream is closed + Emit(ctx context.Context, opts ...grpc.CallOption) (grpc.ClientStreamingClient[EventsV1Request, EventsV1Response], error) +} + +type eventsV1Client struct { + cc grpc.ClientConnInterface +} + +func NewEventsV1Client(cc grpc.ClientConnInterface) EventsV1Client { + return &eventsV1Client{cc} +} + +func (c *eventsV1Client) Emit(ctx context.Context, opts ...grpc.CallOption) (grpc.ClientStreamingClient[EventsV1Request, EventsV1Response], error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + stream, err := c.cc.NewStream(ctx, &EventsV1_ServiceDesc.Streams[0], EventsV1_Emit_FullMethodName, cOpts...) + if err != nil { + return nil, err + } + x := &grpc.GenericClientStream[EventsV1Request, EventsV1Response]{ClientStream: stream} + return x, nil +} + +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type EventsV1_EmitClient = grpc.ClientStreamingClient[EventsV1Request, EventsV1Response] + +// EventsV1Server is the server API for EventsV1 service. +// All implementations must embed UnimplementedEventsV1Server +// for forward compatibility. +// +// EventsV1 Service implementation +type EventsV1Server interface { + // Sends events to the server once the stream is closed + Emit(grpc.ClientStreamingServer[EventsV1Request, EventsV1Response]) error + mustEmbedUnimplementedEventsV1Server() +} + +// UnimplementedEventsV1Server must be embedded to have +// forward compatible implementations. +// +// NOTE: this should be embedded by value instead of pointer to avoid a nil +// pointer dereference when methods are called. +type UnimplementedEventsV1Server struct{} + +func (UnimplementedEventsV1Server) Emit(grpc.ClientStreamingServer[EventsV1Request, EventsV1Response]) error { + return status.Errorf(codes.Unimplemented, "method Emit not implemented") +} +func (UnimplementedEventsV1Server) mustEmbedUnimplementedEventsV1Server() {} +func (UnimplementedEventsV1Server) testEmbeddedByValue() {} + +// UnsafeEventsV1Server may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to EventsV1Server will +// result in compilation errors. +type UnsafeEventsV1Server interface { + mustEmbedUnimplementedEventsV1Server() +} + +func RegisterEventsV1Server(s grpc.ServiceRegistrar, srv EventsV1Server) { + // If the following call pancis, it indicates UnimplementedEventsV1Server was + // embedded by pointer and is nil. This will cause panics if an + // unimplemented method is ever invoked, so we test this at initialization + // time to prevent it from happening at runtime later due to I/O. + if t, ok := srv.(interface{ testEmbeddedByValue() }); ok { + t.testEmbeddedByValue() + } + s.RegisterService(&EventsV1_ServiceDesc, srv) +} + +func _EventsV1_Emit_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(EventsV1Server).Emit(&grpc.GenericServerStream[EventsV1Request, EventsV1Response]{ServerStream: stream}) +} + +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type EventsV1_EmitServer = grpc.ClientStreamingServer[EventsV1Request, EventsV1Response] + +// EventsV1_ServiceDesc is the grpc.ServiceDesc for EventsV1 service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var EventsV1_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "events.EventsV1", + HandlerType: (*EventsV1Server)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "Emit", + Handler: _EventsV1_Emit_Handler, + ClientStreams: true, + }, + }, + Metadata: "integrations/events/v1/definition/definition.proto", +} diff --git a/integrations/events/v1/definition/event.pb.go b/integrations/events/v1/definition/event.pb.go new file mode 100644 index 000000000..7a35a1327 --- /dev/null +++ b/integrations/events/v1/definition/event.pb.go @@ -0,0 +1,235 @@ +// +// DISCLAIMER +// +// Copyright 2025 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.33.0 +// protoc v6.33.0 +// source: integrations/events/v1/definition/event.proto + +package definition + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + timestamppb "google.golang.org/protobuf/types/known/timestamppb" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// Event Message Definition +type Event struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Type of the Event + Type string `protobuf:"bytes,1,opt,name=type,proto3" json:"type,omitempty"` + // Creation Timestamp of the events. Defaults to the current timestamp + Created *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=created,proto3,oneof" json:"created,omitempty"` + // Service ID of the Event + ServiceId string `protobuf:"bytes,3,opt,name=service_id,json=serviceId,proto3" json:"service_id,omitempty"` + // Event Dimensions + Dimensions map[string]string `protobuf:"bytes,4,rep,name=dimensions,proto3" json:"dimensions,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + // Event Body values + Body map[string]float32 `protobuf:"bytes,5,rep,name=body,proto3" json:"body,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"fixed32,2,opt,name=value,proto3"` +} + +func (x *Event) Reset() { + *x = Event{} + if protoimpl.UnsafeEnabled { + mi := &file_integrations_events_v1_definition_event_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Event) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Event) ProtoMessage() {} + +func (x *Event) ProtoReflect() protoreflect.Message { + mi := &file_integrations_events_v1_definition_event_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Event.ProtoReflect.Descriptor instead. +func (*Event) Descriptor() ([]byte, []int) { + return file_integrations_events_v1_definition_event_proto_rawDescGZIP(), []int{0} +} + +func (x *Event) GetType() string { + if x != nil { + return x.Type + } + return "" +} + +func (x *Event) GetCreated() *timestamppb.Timestamp { + if x != nil { + return x.Created + } + return nil +} + +func (x *Event) GetServiceId() string { + if x != nil { + return x.ServiceId + } + return "" +} + +func (x *Event) GetDimensions() map[string]string { + if x != nil { + return x.Dimensions + } + return nil +} + +func (x *Event) GetBody() map[string]float32 { + if x != nil { + return x.Body + } + return nil +} + +var File_integrations_events_v1_definition_event_proto protoreflect.FileDescriptor + +var file_integrations_events_v1_definition_event_proto_rawDesc = []byte{ + 0x0a, 0x2d, 0x69, 0x6e, 0x74, 0x65, 0x67, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2f, 0x65, + 0x76, 0x65, 0x6e, 0x74, 0x73, 0x2f, 0x76, 0x31, 0x2f, 0x64, 0x65, 0x66, 0x69, 0x6e, 0x69, 0x74, + 0x69, 0x6f, 0x6e, 0x2f, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, + 0x06, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, + 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xe5, 0x02, 0x0a, 0x05, 0x45, 0x76, 0x65, + 0x6e, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x39, 0x0a, 0x07, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, + 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, + 0x61, 0x6d, 0x70, 0x48, 0x00, 0x52, 0x07, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x88, 0x01, + 0x01, 0x12, 0x1d, 0x0a, 0x0a, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x5f, 0x69, 0x64, 0x18, + 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x49, 0x64, + 0x12, 0x3d, 0x0a, 0x0a, 0x64, 0x69, 0x6d, 0x65, 0x6e, 0x73, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x04, + 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x2e, 0x45, 0x76, + 0x65, 0x6e, 0x74, 0x2e, 0x44, 0x69, 0x6d, 0x65, 0x6e, 0x73, 0x69, 0x6f, 0x6e, 0x73, 0x45, 0x6e, + 0x74, 0x72, 0x79, 0x52, 0x0a, 0x64, 0x69, 0x6d, 0x65, 0x6e, 0x73, 0x69, 0x6f, 0x6e, 0x73, 0x12, + 0x2b, 0x0a, 0x04, 0x62, 0x6f, 0x64, 0x79, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x17, 0x2e, + 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x2e, 0x42, 0x6f, 0x64, + 0x79, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x04, 0x62, 0x6f, 0x64, 0x79, 0x1a, 0x3d, 0x0a, 0x0f, + 0x44, 0x69, 0x6d, 0x65, 0x6e, 0x73, 0x69, 0x6f, 0x6e, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, + 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, + 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x1a, 0x37, 0x0a, 0x09, 0x42, + 0x6f, 0x64, 0x79, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, + 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x02, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, + 0x3a, 0x02, 0x38, 0x01, 0x42, 0x0a, 0x0a, 0x08, 0x5f, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, + 0x42, 0x45, 0x5a, 0x43, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x61, + 0x72, 0x61, 0x6e, 0x67, 0x6f, 0x64, 0x62, 0x2f, 0x6b, 0x75, 0x62, 0x65, 0x2d, 0x61, 0x72, 0x61, + 0x6e, 0x67, 0x6f, 0x64, 0x62, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x67, 0x72, 0x61, 0x74, 0x69, 0x6f, + 0x6e, 0x73, 0x2f, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x2f, 0x76, 0x31, 0x2f, 0x64, 0x65, 0x66, + 0x69, 0x6e, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_integrations_events_v1_definition_event_proto_rawDescOnce sync.Once + file_integrations_events_v1_definition_event_proto_rawDescData = file_integrations_events_v1_definition_event_proto_rawDesc +) + +func file_integrations_events_v1_definition_event_proto_rawDescGZIP() []byte { + file_integrations_events_v1_definition_event_proto_rawDescOnce.Do(func() { + file_integrations_events_v1_definition_event_proto_rawDescData = protoimpl.X.CompressGZIP(file_integrations_events_v1_definition_event_proto_rawDescData) + }) + return file_integrations_events_v1_definition_event_proto_rawDescData +} + +var file_integrations_events_v1_definition_event_proto_msgTypes = make([]protoimpl.MessageInfo, 3) +var file_integrations_events_v1_definition_event_proto_goTypes = []interface{}{ + (*Event)(nil), // 0: events.Event + nil, // 1: events.Event.DimensionsEntry + nil, // 2: events.Event.BodyEntry + (*timestamppb.Timestamp)(nil), // 3: google.protobuf.Timestamp +} +var file_integrations_events_v1_definition_event_proto_depIdxs = []int32{ + 3, // 0: events.Event.created:type_name -> google.protobuf.Timestamp + 1, // 1: events.Event.dimensions:type_name -> events.Event.DimensionsEntry + 2, // 2: events.Event.body:type_name -> events.Event.BodyEntry + 3, // [3:3] is the sub-list for method output_type + 3, // [3:3] is the sub-list for method input_type + 3, // [3:3] is the sub-list for extension type_name + 3, // [3:3] is the sub-list for extension extendee + 0, // [0:3] is the sub-list for field type_name +} + +func init() { file_integrations_events_v1_definition_event_proto_init() } +func file_integrations_events_v1_definition_event_proto_init() { + if File_integrations_events_v1_definition_event_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_integrations_events_v1_definition_event_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Event); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + file_integrations_events_v1_definition_event_proto_msgTypes[0].OneofWrappers = []interface{}{} + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_integrations_events_v1_definition_event_proto_rawDesc, + NumEnums: 0, + NumMessages: 3, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_integrations_events_v1_definition_event_proto_goTypes, + DependencyIndexes: file_integrations_events_v1_definition_event_proto_depIdxs, + MessageInfos: file_integrations_events_v1_definition_event_proto_msgTypes, + }.Build() + File_integrations_events_v1_definition_event_proto = out.File + file_integrations_events_v1_definition_event_proto_rawDesc = nil + file_integrations_events_v1_definition_event_proto_goTypes = nil + file_integrations_events_v1_definition_event_proto_depIdxs = nil +} diff --git a/integrations/events/v1/definition/event.proto b/integrations/events/v1/definition/event.proto new file mode 100644 index 000000000..b3e51e334 --- /dev/null +++ b/integrations/events/v1/definition/event.proto @@ -0,0 +1,41 @@ +// +// DISCLAIMER +// +// Copyright 2025 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +syntax = "proto3"; + +package events; +import "google/protobuf/timestamp.proto"; + +option go_package = "github.com/arangodb/kube-arangodb/integrations/events/v1/definition"; + +// Event Message Definition +message Event { + // Type of the Event + string type = 1; + // Creation Timestamp of the events. Defaults to the current timestamp + optional google.protobuf.Timestamp created = 2; + // Service ID of the Event + string service_id = 3; + // Event Dimensions + map dimensions = 4; + // Event Body values + map body = 5; +} + diff --git a/integrations/events/v1/events_test.go b/integrations/events/v1/events_test.go new file mode 100644 index 000000000..9b96dcc34 --- /dev/null +++ b/integrations/events/v1/events_test.go @@ -0,0 +1,146 @@ +// +// DISCLAIMER +// +// Copyright 2025 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package v1 + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/types/known/timestamppb" + + pbEventsV1 "github.com/arangodb/kube-arangodb/integrations/events/v1/definition" + "github.com/arangodb/kube-arangodb/pkg/util/shutdown" +) + +func Test_EventsStream(t *testing.T) { + ctx, c := context.WithCancel(shutdown.Context()) + defer c() + + client, cache := Client(t, ctx) + + em, err := client.Emit(ctx) + require.NoError(t, err) + + require.NoError(t, em.Send(&pbEventsV1.EventsV1Request{Events: []*pbEventsV1.Event{{ + Type: "TYPE", + }}})) + + require.NoError(t, em.Send(&pbEventsV1.EventsV1Request{Events: []*pbEventsV1.Event{{ + Type: "TYPE1", + }}})) + + require.NoError(t, em.Send(&pbEventsV1.EventsV1Request{Events: []*pbEventsV1.Event{{ + Type: "TYPE2", + }}})) + + resp, err := em.CloseAndRecv() + require.NoError(t, err) + + require.EqualValues(t, resp.Processed, 3) + + ret := cache.Events(t) + + require.Len(t, ret, 3) + require.Equal(t, "TYPE", ret[0].Type) + require.Equal(t, "TYPE1", ret[1].Type) + require.Equal(t, "TYPE2", ret[2].Type) +} + +func Test_EventsStream_Exceed(t *testing.T) { + ctx, c := context.WithCancel(shutdown.Context()) + defer c() + + client, cache := Client(t, ctx) + + em, err := client.Emit(ctx) + require.NoError(t, err) + + all := make([]*pbEventsV1.Event, MaxEventCount) + + for id := range all { + all[id] = &pbEventsV1.Event{ + Type: fmt.Sprintf("TYPE%d", id), + Dimensions: map[string]string{ + "ID": fmt.Sprintf("%d", id), + }, + ServiceId: fmt.Sprintf("TYPE%d", id), + Body: map[string]float32{ + "ID": float32(id), + }, + } + } + + require.NoError(t, em.Send(&pbEventsV1.EventsV1Request{Events: all})) + + require.NoError(t, em.Send(&pbEventsV1.EventsV1Request{Events: []*pbEventsV1.Event{all[0]}})) + + _, err = em.CloseAndRecv() + require.Error(t, err) + + require.Len(t, cache.Events(t), 0) +} + +func Test_EventsStream_Time(t *testing.T) { + ctx, c := context.WithCancel(shutdown.Context()) + defer c() + + start := time.Now() + + client, cache := Client(t, ctx) + + em, err := client.Emit(ctx) + require.NoError(t, err) + + require.NoError(t, em.Send(&pbEventsV1.EventsV1Request{Events: []*pbEventsV1.Event{{ + Type: "TYPE", + }}})) + + require.NoError(t, em.Send(&pbEventsV1.EventsV1Request{Events: []*pbEventsV1.Event{{ + Type: "TYPE1", + Created: timestamppb.New(start.Add(-time.Hour)), + }}})) + + time.Sleep(2 * time.Second) + + require.NoError(t, em.Send(&pbEventsV1.EventsV1Request{Events: []*pbEventsV1.Event{{ + Type: "TYPE2", + }}})) + + resp, err := em.CloseAndRecv() + require.NoError(t, err) + + require.EqualValues(t, resp.Processed, 3) + + ret := cache.Events(t) + + require.Len(t, ret, 3) + require.Equal(t, "TYPE", ret[0].Type) + require.Equal(t, resp.Created.AsTime().Unix(), ret[0].GetCreated().AsTime().Unix()) + require.Equal(t, "TYPE1", ret[1].Type) + require.Equal(t, start.Add(-time.Hour).Unix(), ret[1].GetCreated().AsTime().Unix()) + require.Equal(t, "TYPE2", ret[2].Type) + require.Equal(t, resp.Created.AsTime().Unix(), ret[2].GetCreated().AsTime().Unix()) + + require.Equal(t, ret[2].GetCreated().AsTime().Unix(), ret[0].GetCreated().AsTime().Unix()) +} diff --git a/integrations/events/v1/impl.go b/integrations/events/v1/impl.go new file mode 100644 index 000000000..14722e9d9 --- /dev/null +++ b/integrations/events/v1/impl.go @@ -0,0 +1,177 @@ +// +// DISCLAIMER +// +// Copyright 2025 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package v1 + +import ( + "context" + "io" + "time" + + "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/timestamppb" + + pbEventsV1 "github.com/arangodb/kube-arangodb/integrations/events/v1/definition" + "github.com/arangodb/kube-arangodb/pkg/util/errors" + "github.com/arangodb/kube-arangodb/pkg/util/svc" +) + +func New(cfg Configuration) (svc.Handler, error) { + if err := cfg.Validate(); err != nil { + return nil, err + } + + col := cfg.KVCollection(cfg.Endpoint, "_system", "_events") + + col = withTTLIndex(col) + + return newInternal(cfg, NewArangoRemoteStore[*pbEventsV1.Event](col)), nil +} + +func newInternal(cfg Configuration, c RemoteStore[*pbEventsV1.Event]) *implementation { + q := c + + if cfg.Async.Enabled { + q = WithAsync(q, cfg.Async.Size, cfg.Async.Retry.Timeout, cfg.Async.Retry.Delay) + } + + obj := &implementation{ + cfg: cfg, + remote: q, + } + + return obj +} + +var _ pbEventsV1.EventsV1Server = &implementation{} +var _ svc.Handler = &implementation{} + +type implementation struct { + pbEventsV1.UnimplementedEventsV1Server + + cfg Configuration + remote RemoteStore[*pbEventsV1.Event] +} + +func (i *implementation) Name() string { + return pbEventsV1.Name +} + +func (i *implementation) Health() svc.HealthState { + return svc.Healthy +} + +func (i *implementation) Register(registrar *grpc.Server) { + pbEventsV1.RegisterEventsV1Server(registrar, i) +} + +func (i *implementation) Gateway(ctx context.Context, mux *runtime.ServeMux) error { + return nil +} + +func (i *implementation) Background(ctx context.Context) { + i.init(ctx) + + svc.RunBackgroundSync(ctx, i.remote) +} + +func (i *implementation) init(ctx context.Context) { + time.Sleep(time.Second) + + timerT := time.NewTicker(time.Second) + defer timerT.Stop() + + for { + err := i.remote.Init(ctx) + if err == nil { + return + } + + logger.Err(err).Warn("Unable to init collection") + + select { + case <-timerT.C: + continue + case <-ctx.Done(): + return + } + } +} + +func (i *implementation) Emit(server pbEventsV1.EventsV1_EmitServer) error { + var events = make([]*pbEventsV1.Event, 0, MaxEventCount) + + start := time.Now().Truncate(time.Second) + + for { + msg, err := server.Recv() + + if errors.IsGRPCCode(err, codes.Canceled) { + return io.ErrUnexpectedEOF + } + + if errors.Is(err, io.EOF) { + break + } + + if err != nil { + return err + } + + if len(events)+len(msg.GetEvents()) > MaxEventCount { + return status.Error(codes.ResourceExhausted, "exceeded limit of the events per request") + } + + for _, ev := range msg.GetEvents() { + // Trim to seconds to keep cross-platform compatibility + if q := ev.Created; q == nil { + ev.Created = timestamppb.New(start) + } else { + ev.Created = timestamppb.New(q.AsTime().Truncate(time.Second)) + } + + events = append(events, ev) + } + } + + if len(events) == 0 { + return server.SendAndClose(&pbEventsV1.EventsV1Response{ + Created: timestamppb.New(start), + Processed: 0, + Completed: true, + }) + } + + if err := i.remote.Emit(server.Context(), events...); err != nil { + logger.Err(err).Int("events", len(events)).Warn("Failed to emit events") + return status.Error(codes.Internal, "Unable to emit events") + } + + logger.Int("events", len(events)).Info("Emitted events") + + return server.SendAndClose(&pbEventsV1.EventsV1Response{ + Created: timestamppb.New(start), + Processed: int32(len(events)), + Completed: !i.cfg.Async.Enabled, + }) +} diff --git a/integrations/events/v1/logger.go b/integrations/events/v1/logger.go new file mode 100644 index 000000000..97f655617 --- /dev/null +++ b/integrations/events/v1/logger.go @@ -0,0 +1,27 @@ +// +// DISCLAIMER +// +// Copyright 2025 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package v1 + +import ( + "github.com/arangodb/kube-arangodb/pkg/logging" +) + +var logger = logging.Global().RegisterAndGetLogger("integration-events-v1", logging.Info) diff --git a/integrations/events/v1/remote.go b/integrations/events/v1/remote.go new file mode 100644 index 000000000..a82418299 --- /dev/null +++ b/integrations/events/v1/remote.go @@ -0,0 +1,73 @@ +// +// DISCLAIMER +// +// Copyright 2025 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package v1 + +import ( + "context" + + "github.com/pkg/errors" + "google.golang.org/protobuf/proto" + + "github.com/arangodb/go-driver/v2/arangodb" + + "github.com/arangodb/kube-arangodb/pkg/util" + "github.com/arangodb/kube-arangodb/pkg/util/cache" + ugrpc "github.com/arangodb/kube-arangodb/pkg/util/grpc" +) + +type RemoteStore[IN proto.Message] interface { + Init(ctx context.Context) error + + Emit(ctx context.Context, events ...IN) error +} + +func NewArangoRemoteStore[IN proto.Message](client cache.Object[arangodb.Collection]) RemoteStore[IN] { + return &arangoRemoteStore[IN]{ + client: client, + } +} + +type arangoRemoteStore[IN proto.Message] struct { + client cache.Object[arangodb.Collection] +} + +func (a *arangoRemoteStore[IN]) Init(ctx context.Context) error { + return a.client.Init(ctx) +} + +func (a *arangoRemoteStore[IN]) Emit(ctx context.Context, events ...IN) error { + if len(events) == 0 { + return nil + } + + col, err := a.client.Get(ctx) + if err != nil { + return err + } + + _, err = col.CreateDocuments(ctx, util.FormatList(events, func(a IN) ugrpc.Object[IN] { + return ugrpc.NewObject(a) + })) + if err != nil { + return errors.Wrapf(err, "Unable to save events") + } + return err +} diff --git a/integrations/events/v1/remote_test.go b/integrations/events/v1/remote_test.go new file mode 100644 index 000000000..6678e6b25 --- /dev/null +++ b/integrations/events/v1/remote_test.go @@ -0,0 +1,92 @@ +// +// DISCLAIMER +// +// Copyright 2025 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package v1 + +import ( + "context" + "sync" + "testing" + + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" + + ugrpc "github.com/arangodb/kube-arangodb/pkg/util/grpc" +) + +type TestRemoteStore[IN proto.Message] interface { + RemoteStore[IN] + + Events(t *testing.T) []IN +} + +func NewArangoTestStore[IN proto.Message]() TestRemoteStore[IN] { + return &testRemoteStore[IN]{} +} + +type testRemoteStore[IN proto.Message] struct { + lock sync.Mutex + + events [][]byte +} + +func (r *testRemoteStore[IN]) Init(ctx context.Context) error { + return nil +} + +func (r *testRemoteStore[IN]) Events(t *testing.T) []IN { + r.lock.Lock() + defer r.lock.Unlock() + + var ret = make([]IN, len(r.events)) + + for i, e := range r.events { + v, err := ugrpc.Unmarshal[IN](e) + require.NoError(t, err) + ret[i] = v + } + + logger.Int("size", len(ret)).Info("Fetched Events") + + return ret +} + +func (r *testRemoteStore[IN]) Emit(ctx context.Context, events ...IN) error { + r.lock.Lock() + defer r.lock.Unlock() + + var res = make([][]byte, len(events)) + + logger.Info("Emitting events in testing") + + for id, z := range events { + data, err := ugrpc.Marshal(z) + if err != nil { + return err + } + + res[id] = data + } + + logger.Int("size", len(res)).Info("Emitted events in testing") + + r.events = append(r.events, res...) + return nil +} diff --git a/integrations/events/v1/service_test.go b/integrations/events/v1/service_test.go new file mode 100644 index 000000000..a842c9016 --- /dev/null +++ b/integrations/events/v1/service_test.go @@ -0,0 +1,74 @@ +// +// DISCLAIMER +// +// Copyright 2025 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package v1 + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + + pbEventsV1 "github.com/arangodb/kube-arangodb/integrations/events/v1/definition" + "github.com/arangodb/kube-arangodb/pkg/util" + "github.com/arangodb/kube-arangodb/pkg/util/svc" + "github.com/arangodb/kube-arangodb/pkg/util/tests/tgrpc" +) + +func Handler(remote RemoteStore[*pbEventsV1.Event], mods ...util.ModR[Configuration]) svc.Handler { + return newInternal(NewConfiguration().With(mods...), remote) +} + +func Server(t *testing.T, ctx context.Context, mods ...util.ModR[Configuration]) (svc.ServiceStarter, TestRemoteStore[*pbEventsV1.Event]) { + store := NewArangoTestStore[*pbEventsV1.Event]() + + var currentMods []util.ModR[Configuration] + + currentMods = append(currentMods, func(c Configuration) Configuration { + return c + }) + + currentMods = append(currentMods, mods...) + + local, err := svc.NewService(svc.Configuration{ + Address: "127.0.0.1:0", + Gateway: &svc.ConfigurationGateway{ + Address: "127.0.0.1:0", + }, + }, Handler(store, currentMods...)) + require.NoError(t, err) + + return local.Start(ctx), store +} + +func Client(t *testing.T, ctx context.Context, mods ...util.ModR[Configuration]) (pbEventsV1.EventsV1Client, TestRemoteStore[*pbEventsV1.Event]) { + start, store := Server(t, ctx, mods...) + + client := tgrpc.NewGRPCClient(t, ctx, pbEventsV1.NewEventsV1Client, start.Address()) + + return client, store +} + +func Test_Service(t *testing.T) { + ctx, c := context.WithCancel(context.Background()) + defer c() + + Client(t, ctx) +} diff --git a/integrations/inventory/v1/definition/service.pb.gw.go b/integrations/inventory/v1/definition/service.pb.gw.go index f2e5a6035..5203b1f0d 100644 --- a/integrations/inventory/v1/definition/service.pb.gw.go +++ b/integrations/inventory/v1/definition/service.pb.gw.go @@ -14,7 +14,7 @@ import ( "io" "net/http" - definition_8 "github.com/arangodb/kube-arangodb/integrations/shared/v1/definition" + definition_9 "github.com/arangodb/kube-arangodb/integrations/shared/v1/definition" "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" "github.com/grpc-ecosystem/grpc-gateway/v2/utilities" "google.golang.org/grpc" @@ -38,7 +38,7 @@ var ( func request_InventoryV1_Inventory_0(ctx context.Context, marshaler runtime.Marshaler, client InventoryV1Client, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { var ( - protoReq definition_8.Empty + protoReq definition_9.Empty metadata runtime.ServerMetadata ) if req.Body != nil { @@ -50,7 +50,7 @@ func request_InventoryV1_Inventory_0(ctx context.Context, marshaler runtime.Mars func local_request_InventoryV1_Inventory_0(ctx context.Context, marshaler runtime.Marshaler, server InventoryV1Server, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { var ( - protoReq definition_8.Empty + protoReq definition_9.Empty metadata runtime.ServerMetadata ) msg, err := server.Inventory(ctx, &protoReq) diff --git a/integrations/meta/v1/configuration.go b/integrations/meta/v1/configuration.go index 0de84eb7d..8fe264c86 100644 --- a/integrations/meta/v1/configuration.go +++ b/integrations/meta/v1/configuration.go @@ -24,8 +24,10 @@ import ( "fmt" "time" + shared "github.com/arangodb/kube-arangodb/pkg/apis/shared" integrationsShared "github.com/arangodb/kube-arangodb/pkg/integrations/shared" "github.com/arangodb/kube-arangodb/pkg/util" + "github.com/arangodb/kube-arangodb/pkg/util/errors" "github.com/arangodb/kube-arangodb/pkg/util/strings" ) @@ -61,5 +63,8 @@ func (c Configuration) Key(parts ...string) string { } func (c Configuration) Validate() error { - return nil + return errors.Errors( + shared.PrefixResourceError("endpoint", c.Endpoint.Validate()), + shared.PrefixResourceError("database", c.Database.Validate()), + ) } diff --git a/integrations/meta/v1/impl.go b/integrations/meta/v1/impl.go index e1901803f..d79aa53b6 100644 --- a/integrations/meta/v1/impl.go +++ b/integrations/meta/v1/impl.go @@ -42,26 +42,21 @@ import ( "github.com/arangodb/kube-arangodb/pkg/util/svc" ) -func New(ctx context.Context, cfg Configuration) (svc.Handler, error) { - return newInternal(ctx, cfg) -} - -func newInternal(ctx context.Context, cfg Configuration) (*implementation, error) { - return newInternalWithRemoteCache(ctx, cfg, cache.NewRemoteCacheWithTTL[*Object](cfg.KVCollection(cfg.Endpoint, "_system", "_meta_store"), cfg.TTL)) -} - -func newInternalWithRemoteCache(ctx context.Context, cfg Configuration, c cache.RemoteCache[*Object]) (*implementation, error) { +func New(cfg Configuration) (svc.Handler, error) { if err := cfg.Validate(); err != nil { return nil, err } - obj := &implementation{ + col := cfg.KVCollection(cfg.Endpoint, "_system", "_meta_store") + + return newInternal(cfg, cache.NewRemoteCacheWithTTL[*Object](col, cfg.TTL)), nil +} + +func newInternal(cfg Configuration, c cache.RemoteCache[*Object]) *implementation { + return &implementation{ cfg: cfg, - ctx: ctx, cache: c, } - - return obj, nil } var _ pbMetaV1.MetaV1Server = &implementation{} @@ -72,7 +67,6 @@ type implementation struct { lock sync.RWMutex - ctx context.Context cfg Configuration cache cache.RemoteCache[*Object] @@ -90,6 +84,33 @@ func (i *implementation) Register(registrar *grpc.Server) { pbMetaV1.RegisterMetaV1Server(registrar, i) } +func (i *implementation) Background(ctx context.Context) { + i.init(ctx) +} + +func (i *implementation) init(ctx context.Context) { + time.Sleep(time.Second) + + timerT := time.NewTicker(time.Second) + defer timerT.Stop() + + for { + err := i.cache.Init(ctx) + if err == nil { + return + } + + logger.Err(err).Warn("Unable to init collection") + + select { + case <-timerT.C: + continue + case <-ctx.Done(): + return + } + } +} + func (i *implementation) Gateway(ctx context.Context, mux *runtime.ServeMux) error { return nil } diff --git a/integrations/meta/v1/service_test.go b/integrations/meta/v1/service_test.go index 16fccfd6e..0db3e2c35 100644 --- a/integrations/meta/v1/service_test.go +++ b/integrations/meta/v1/service_test.go @@ -22,7 +22,6 @@ package v1 import ( "context" - "encoding/json" "testing" "github.com/stretchr/testify/require" @@ -34,34 +33,8 @@ import ( "github.com/arangodb/kube-arangodb/pkg/util/tests/tgrpc" ) -func Test_Types(t *testing.T) { - z := `{ - "meta": { - "updatedAt": "2025-07-07T15:13:09Z" - }, - "object": { - "Object": { - "type_url": "type.googleapis.com/arangodb_platform_internal.metadata_store.GenAiProjectNames", - "value": "ChV0ZXN0X3Byb2plY3RfNmVhYWM3MjM=" - } - } -}` - - var obj Object - - require.NoError(t, json.Unmarshal([]byte(z), &obj)) - - n, err := json.Marshal(obj) - require.NoError(t, err) - - require.JSONEq(t, z, string(n)) -} - -func Handler(t *testing.T, ctx context.Context, mods ...util.ModR[Configuration]) svc.Handler { - handler, err := newInternalWithRemoteCache(ctx, NewConfiguration().With(mods...), tcache.NewRemoteCache[*Object]()) - require.NoError(t, err) - - return handler +func Handler(mods ...util.ModR[Configuration]) svc.Handler { + return newInternal(NewConfiguration().With(mods...), tcache.NewRemoteCache[*Object]()) } func Server(t *testing.T, ctx context.Context, mods ...util.ModR[Configuration]) svc.ServiceStarter { @@ -78,7 +51,7 @@ func Server(t *testing.T, ctx context.Context, mods ...util.ModR[Configuration]) Gateway: &svc.ConfigurationGateway{ Address: "127.0.0.1:0", }, - }, Handler(t, ctx, currentMods...)) + }, Handler(currentMods...)) require.NoError(t, err) return local.Start(ctx) diff --git a/integrations/meta/v1/types_test.go b/integrations/meta/v1/types_test.go new file mode 100644 index 000000000..1f05770d9 --- /dev/null +++ b/integrations/meta/v1/types_test.go @@ -0,0 +1,51 @@ +// +// DISCLAIMER +// +// Copyright 2025 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package v1 + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/require" +) + +func Test_Types(t *testing.T) { + z := `{ + "meta": { + "updatedAt": "2025-07-07T15:13:09Z" + }, + "object": { + "Object": { + "type_url": "type.googleapis.com/arangodb_platform_internal.metadata_store.GenAiProjectNames", + "value": "ChV0ZXN0X3Byb2plY3RfNmVhYWM3MjM=" + } + } +}` + + var obj Object + + require.NoError(t, json.Unmarshal([]byte(z), &obj)) + + n, err := json.Marshal(obj) + require.NoError(t, err) + + require.JSONEq(t, z, string(n)) +} diff --git a/integrations/pong/v1/definition/pong.pb.go b/integrations/pong/v1/definition/pong.pb.go index ded9aa434..cf080ccb8 100644 --- a/integrations/pong/v1/definition/pong.pb.go +++ b/integrations/pong/v1/definition/pong.pb.go @@ -51,6 +51,8 @@ type PongV1PingResponse struct { // Current time in UTC Time *timestamppb.Timestamp `protobuf:"bytes,1,opt,name=time,proto3" json:"time,omitempty"` + // Ticks since the start + Ticks int32 `protobuf:"varint,2,opt,name=ticks,proto3" json:"ticks,omitempty"` } func (x *PongV1PingResponse) Reset() { @@ -92,6 +94,13 @@ func (x *PongV1PingResponse) GetTime() *timestamppb.Timestamp { return nil } +func (x *PongV1PingResponse) GetTicks() int32 { + if x != nil { + return x.Ticks + } + return 0 +} + // PongV1 Services Call Response type PongV1ServicesResponse struct { state protoimpl.MessageState @@ -221,35 +230,37 @@ var file_integrations_pong_v1_definition_pong_proto_rawDesc = []byte{ 0x6f, 0x1a, 0x2d, 0x69, 0x6e, 0x74, 0x65, 0x67, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2f, 0x73, 0x68, 0x61, 0x72, 0x65, 0x64, 0x2f, 0x76, 0x31, 0x2f, 0x64, 0x65, 0x66, 0x69, 0x6e, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x22, 0x44, 0x0a, 0x12, 0x50, 0x6f, 0x6e, 0x67, 0x56, 0x31, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, + 0x22, 0x5a, 0x0a, 0x12, 0x50, 0x6f, 0x6e, 0x67, 0x56, 0x31, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2e, 0x0a, 0x04, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, - 0x52, 0x04, 0x74, 0x69, 0x6d, 0x65, 0x22, 0x49, 0x0a, 0x16, 0x50, 0x6f, 0x6e, 0x67, 0x56, 0x31, - 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x12, 0x2f, 0x0a, 0x08, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, - 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x70, 0x6f, 0x6e, 0x67, 0x2e, 0x50, 0x6f, 0x6e, 0x67, 0x56, 0x31, - 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x52, 0x08, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, - 0x73, 0x22, 0x57, 0x0a, 0x0d, 0x50, 0x6f, 0x6e, 0x67, 0x56, 0x31, 0x53, 0x65, 0x72, 0x76, 0x69, - 0x63, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, - 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, - 0x12, 0x18, 0x0a, 0x07, 0x65, 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, - 0x08, 0x52, 0x07, 0x65, 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x64, 0x32, 0x96, 0x01, 0x0a, 0x06, 0x50, - 0x6f, 0x6e, 0x67, 0x56, 0x31, 0x12, 0x53, 0x0a, 0x04, 0x50, 0x69, 0x6e, 0x67, 0x12, 0x0d, 0x2e, - 0x73, 0x68, 0x61, 0x72, 0x65, 0x64, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x18, 0x2e, 0x70, - 0x6f, 0x6e, 0x67, 0x2e, 0x50, 0x6f, 0x6e, 0x67, 0x56, 0x31, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x22, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x1c, 0x12, 0x1a, - 0x2f, 0x5f, 0x69, 0x6e, 0x74, 0x65, 0x67, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2f, 0x70, 0x6f, - 0x6e, 0x67, 0x2f, 0x76, 0x31, 0x2f, 0x70, 0x69, 0x6e, 0x67, 0x12, 0x37, 0x0a, 0x08, 0x53, 0x65, - 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x12, 0x0d, 0x2e, 0x73, 0x68, 0x61, 0x72, 0x65, 0x64, 0x2e, - 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x1c, 0x2e, 0x70, 0x6f, 0x6e, 0x67, 0x2e, 0x50, 0x6f, 0x6e, - 0x67, 0x56, 0x31, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x42, 0x43, 0x5a, 0x41, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, - 0x6d, 0x2f, 0x61, 0x72, 0x61, 0x6e, 0x67, 0x6f, 0x64, 0x62, 0x2f, 0x6b, 0x75, 0x62, 0x65, 0x2d, - 0x61, 0x72, 0x61, 0x6e, 0x67, 0x6f, 0x64, 0x62, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x67, 0x72, 0x61, - 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2f, 0x70, 0x6f, 0x6e, 0x67, 0x2f, 0x76, 0x31, 0x2f, 0x64, 0x65, - 0x66, 0x69, 0x6e, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x52, 0x04, 0x74, 0x69, 0x6d, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x69, 0x63, 0x6b, 0x73, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x74, 0x69, 0x63, 0x6b, 0x73, 0x22, 0x49, 0x0a, 0x16, + 0x50, 0x6f, 0x6e, 0x67, 0x56, 0x31, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2f, 0x0a, 0x08, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, + 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x70, 0x6f, 0x6e, 0x67, 0x2e, + 0x50, 0x6f, 0x6e, 0x67, 0x56, 0x31, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x52, 0x08, 0x73, + 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x22, 0x57, 0x0a, 0x0d, 0x50, 0x6f, 0x6e, 0x67, 0x56, + 0x31, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x18, 0x0a, 0x07, + 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x76, + 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x18, 0x0a, 0x07, 0x65, 0x6e, 0x61, 0x62, 0x6c, 0x65, + 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, 0x65, 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x64, + 0x32, 0x96, 0x01, 0x0a, 0x06, 0x50, 0x6f, 0x6e, 0x67, 0x56, 0x31, 0x12, 0x53, 0x0a, 0x04, 0x50, + 0x69, 0x6e, 0x67, 0x12, 0x0d, 0x2e, 0x73, 0x68, 0x61, 0x72, 0x65, 0x64, 0x2e, 0x45, 0x6d, 0x70, + 0x74, 0x79, 0x1a, 0x18, 0x2e, 0x70, 0x6f, 0x6e, 0x67, 0x2e, 0x50, 0x6f, 0x6e, 0x67, 0x56, 0x31, + 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x22, 0x82, 0xd3, + 0xe4, 0x93, 0x02, 0x1c, 0x12, 0x1a, 0x2f, 0x5f, 0x69, 0x6e, 0x74, 0x65, 0x67, 0x72, 0x61, 0x74, + 0x69, 0x6f, 0x6e, 0x2f, 0x70, 0x6f, 0x6e, 0x67, 0x2f, 0x76, 0x31, 0x2f, 0x70, 0x69, 0x6e, 0x67, + 0x12, 0x37, 0x0a, 0x08, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x12, 0x0d, 0x2e, 0x73, + 0x68, 0x61, 0x72, 0x65, 0x64, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x1c, 0x2e, 0x70, 0x6f, + 0x6e, 0x67, 0x2e, 0x50, 0x6f, 0x6e, 0x67, 0x56, 0x31, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, + 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x43, 0x5a, 0x41, 0x67, 0x69, 0x74, + 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x61, 0x72, 0x61, 0x6e, 0x67, 0x6f, 0x64, 0x62, + 0x2f, 0x6b, 0x75, 0x62, 0x65, 0x2d, 0x61, 0x72, 0x61, 0x6e, 0x67, 0x6f, 0x64, 0x62, 0x2f, 0x69, + 0x6e, 0x74, 0x65, 0x67, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2f, 0x70, 0x6f, 0x6e, 0x67, + 0x2f, 0x76, 0x31, 0x2f, 0x64, 0x65, 0x66, 0x69, 0x6e, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x62, 0x06, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/integrations/pong/v1/definition/pong.pb.gw.go b/integrations/pong/v1/definition/pong.pb.gw.go index 9f2279ae7..19a1500af 100644 --- a/integrations/pong/v1/definition/pong.pb.gw.go +++ b/integrations/pong/v1/definition/pong.pb.gw.go @@ -14,7 +14,7 @@ import ( "io" "net/http" - definition_8 "github.com/arangodb/kube-arangodb/integrations/shared/v1/definition" + definition_9 "github.com/arangodb/kube-arangodb/integrations/shared/v1/definition" "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" "github.com/grpc-ecosystem/grpc-gateway/v2/utilities" "google.golang.org/grpc" @@ -38,7 +38,7 @@ var ( func request_PongV1_Ping_0(ctx context.Context, marshaler runtime.Marshaler, client PongV1Client, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { var ( - protoReq definition_8.Empty + protoReq definition_9.Empty metadata runtime.ServerMetadata ) if req.Body != nil { @@ -50,7 +50,7 @@ func request_PongV1_Ping_0(ctx context.Context, marshaler runtime.Marshaler, cli func local_request_PongV1_Ping_0(ctx context.Context, marshaler runtime.Marshaler, server PongV1Server, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { var ( - protoReq definition_8.Empty + protoReq definition_9.Empty metadata runtime.ServerMetadata ) msg, err := server.Ping(ctx, &protoReq) diff --git a/integrations/pong/v1/definition/pong.proto b/integrations/pong/v1/definition/pong.proto index 004b08d30..1a69cc14f 100644 --- a/integrations/pong/v1/definition/pong.proto +++ b/integrations/pong/v1/definition/pong.proto @@ -50,6 +50,9 @@ service PongV1 { message PongV1PingResponse { // Current time in UTC google.protobuf.Timestamp time = 1; + + // Ticks since the start + int32 ticks = 2; } // PongV1 Services Call Response diff --git a/integrations/pong/v1/impl.go b/integrations/pong/v1/impl.go index 1cf16afa1..3f5eee791 100644 --- a/integrations/pong/v1/impl.go +++ b/integrations/pong/v1/impl.go @@ -74,11 +74,33 @@ func New(services ...Service) (svc.Handler, error) { var _ pbPongV1.PongV1Server = &impl{} var _ svc.Handler = &impl{} +var _ svc.Background = &impl{} type impl struct { services []Service pbPongV1.UnimplementedPongV1Server + + ticks int32 +} + +func (i *impl) Background(ctx context.Context) { + logger.Info("Async background started") + defer func() { + logger.Info("Async background completed") + }() + + tickerT := time.NewTicker(time.Second) + defer tickerT.Stop() + + for { + select { + case <-tickerT.C: + i.ticks++ + case <-ctx.Done(): + return + } + } } func (i *impl) Name() string { @@ -98,7 +120,7 @@ func (i *impl) Gateway(ctx context.Context, mux *runtime.ServeMux) error { } func (i *impl) Ping(context.Context, *pbSharedV1.Empty) (*pbPongV1.PongV1PingResponse, error) { - return &pbPongV1.PongV1PingResponse{Time: timestamppb.New(time.Now().UTC())}, nil + return &pbPongV1.PongV1PingResponse{Time: timestamppb.New(time.Now().UTC()), Ticks: i.ticks}, nil } func (i *impl) Services(context.Context, *pbSharedV1.Empty) (*pbPongV1.PongV1ServicesResponse, error) { diff --git a/integrations/pong/v1/logger.go b/integrations/pong/v1/logger.go new file mode 100644 index 000000000..15d802508 --- /dev/null +++ b/integrations/pong/v1/logger.go @@ -0,0 +1,27 @@ +// +// DISCLAIMER +// +// Copyright 2025 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package v1 + +import ( + "github.com/arangodb/kube-arangodb/pkg/logging" +) + +var logger = logging.Global().RegisterAndGetLogger("integration-pong-v1", logging.Info) diff --git a/integrations/shutdown/v1/definition/shutdown.pb.gw.go b/integrations/shutdown/v1/definition/shutdown.pb.gw.go index 2cb88bde2..e00bf93ea 100644 --- a/integrations/shutdown/v1/definition/shutdown.pb.gw.go +++ b/integrations/shutdown/v1/definition/shutdown.pb.gw.go @@ -14,7 +14,7 @@ import ( "io" "net/http" - definition_8 "github.com/arangodb/kube-arangodb/integrations/shared/v1/definition" + definition_9 "github.com/arangodb/kube-arangodb/integrations/shared/v1/definition" "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" "github.com/grpc-ecosystem/grpc-gateway/v2/utilities" "google.golang.org/grpc" @@ -38,7 +38,7 @@ var ( func request_ShutdownV1_Shutdown_0(ctx context.Context, marshaler runtime.Marshaler, client ShutdownV1Client, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { var ( - protoReq definition_8.Empty + protoReq definition_9.Empty metadata runtime.ServerMetadata ) if req.Body != nil { @@ -50,7 +50,7 @@ func request_ShutdownV1_Shutdown_0(ctx context.Context, marshaler runtime.Marsha func local_request_ShutdownV1_Shutdown_0(ctx context.Context, marshaler runtime.Marshaler, server ShutdownV1Server, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { var ( - protoReq definition_8.Empty + protoReq definition_9.Empty metadata runtime.ServerMetadata ) msg, err := server.Shutdown(ctx, &protoReq) diff --git a/pkg/deployment/resources/arango_profiles.go b/pkg/deployment/resources/arango_profiles.go index 266f83345..ff09a210b 100644 --- a/pkg/deployment/resources/arango_profiles.go +++ b/pkg/deployment/resources/arango_profiles.go @@ -235,6 +235,7 @@ func (r *Resources) EnsureArangoProfiles(ctx context.Context, cachedStatus inspe return nil, false }), gen(utilConstants.ProfilesIntegrationMeta, utilConstants.ProfilesIntegrationV1, always(integrationsSidecar.IntegrationMetaV1{})), + gen(utilConstants.ProfilesIntegrationEvents, utilConstants.ProfilesIntegrationV1, always(integrationsSidecar.IntegrationEventsV1{})), gen(utilConstants.ProfilesIntegrationStorage, utilConstants.ProfilesIntegrationV2, func() (integrationsSidecar.Integration, bool) { if v, err := cachedStatus.ArangoPlatformStorage().V1Beta1(); err == nil { if p, ok := v.GetSimple(deploymentName); ok { diff --git a/pkg/integrations/events_v1.go b/pkg/integrations/events_v1.go new file mode 100644 index 000000000..ec1a579d6 --- /dev/null +++ b/pkg/integrations/events_v1.go @@ -0,0 +1,69 @@ +// +// DISCLAIMER +// +// Copyright 2024-2025 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package integrations + +import ( + "context" + "time" + + "github.com/spf13/cobra" + + pbImplEventsV1 "github.com/arangodb/kube-arangodb/integrations/events/v1" + pbEventsV1 "github.com/arangodb/kube-arangodb/integrations/events/v1/definition" + integrationsShared "github.com/arangodb/kube-arangodb/pkg/integrations/shared" + "github.com/arangodb/kube-arangodb/pkg/util/errors" + "github.com/arangodb/kube-arangodb/pkg/util/svc" +) + +func init() { + registerer.Register(pbEventsV1.Name, func() Integration { + return &eventsV1{} + }) +} + +type eventsV1 struct { + config pbImplEventsV1.Configuration +} + +func (a eventsV1) Name() string { + return pbEventsV1.Name +} + +func (a *eventsV1) Description() string { + return "Enable EventsV1 Integration Service" +} + +func (a *eventsV1) Register(cmd *cobra.Command, fs FlagEnvHandler) error { + return errors.Errors( + fs.BoolVar(&a.config.Async.Enabled, "async", true, "Enables async injection of the events"), + fs.IntVar(&a.config.Async.Size, "async.size", 16, "Size of the async queue"), + fs.DurationVar(&a.config.Async.Retry.Delay, "async.retry.delay", time.Second, "Delay of the retries"), + fs.DurationVar(&a.config.Async.Retry.Timeout, "async.retry.timeout", time.Minute, "Timeout for the event injection"), + ) +} + +func (a *eventsV1) Handler(ctx context.Context, cmd *cobra.Command) (svc.Handler, error) { + if err := integrationsShared.FillAll(cmd, &a.config.Endpoint, &a.config.Database); err != nil { + return nil, err + } + + return pbImplEventsV1.New(a.config) +} diff --git a/pkg/integrations/meta_v1.go b/pkg/integrations/meta_v1.go index 5bc2c6530..e50665e5d 100644 --- a/pkg/integrations/meta_v1.go +++ b/pkg/integrations/meta_v1.go @@ -62,5 +62,5 @@ func (a *metaV1) Handler(ctx context.Context, cmd *cobra.Command) (svc.Handler, return nil, err } - return pbImplMetaV1.New(ctx, a.config) + return pbImplMetaV1.New(a.config) } diff --git a/pkg/integrations/register.go b/pkg/integrations/register.go index 491313c72..49bec3610 100644 --- a/pkg/integrations/register.go +++ b/pkg/integrations/register.go @@ -203,7 +203,7 @@ func (c *configuration) runWithContext(ctx context.Context, cmd *cobra.Command) return errors.Wrapf(err, "Unable to parse external config") } - var internalHandlers, externalHandlers, healthHandlers []svc.Handler + var internalHandlers, externalHandlers, healthHandlers, allHandlers []svc.Handler var services []pbImplPongV1.Service @@ -215,6 +215,7 @@ func (c *configuration) runWithContext(ctx context.Context, cmd *cobra.Command) internalHandlers = append(internalHandlers, pong) externalHandlers = append(externalHandlers, pong) healthHandlers = append(healthHandlers, pong) + allHandlers = append(allHandlers, pong) for _, handler := range c.registered { if ok, err := cmd.Flags().GetBool(fmt.Sprintf("integration.%s", handler.Name())); err != nil { @@ -252,6 +253,8 @@ func (c *configuration) runWithContext(ctx context.Context, cmd *cobra.Command) if svc, err := handler.Handler(ctx, cmd); err != nil { return err } else { + allHandlers = append(allHandlers, svc) + if internalEnabled { internalHandlers = append(internalHandlers, svc) } @@ -280,6 +283,26 @@ func (c *configuration) runWithContext(ctx context.Context, cmd *cobra.Command) logger.Str("address", healthHandler.Address()).Bool("ssl", healthConfig.TLSOptions != nil).Info("Health handler started") + return c.startBackgroundersWithContext(ctx, health, internalConfig, externalConfig, allHandlers, internalHandlers, externalHandlers) +} +func (c *configuration) startBackgroundersWithContext(ctx context.Context, health svc.HealthService, internalConfig, externalConfig svc.Configuration, allHandlers, internalHandlers, externalHandlers []svc.Handler) error { + var wg sync.WaitGroup + + defer wg.Wait() + + for _, handler := range allHandlers { + wg.Add(1) + z := svc.RunBackground(handler) + defer func(in context.CancelFunc) { + defer wg.Done() + in() + }(z) + } + + return c.startServerWithContext(ctx, health, internalConfig, externalConfig, internalHandlers, externalHandlers) +} + +func (c *configuration) startServerWithContext(ctx context.Context, health svc.HealthService, internalConfig, externalConfig svc.Configuration, internalHandlers, externalHandlers []svc.Handler) error { var wg sync.WaitGroup var internal, external error diff --git a/pkg/integrations/shared/database.go b/pkg/integrations/shared/database.go index d89a31658..86265692b 100644 --- a/pkg/integrations/shared/database.go +++ b/pkg/integrations/shared/database.go @@ -184,6 +184,6 @@ func (d *Database) KVCollectionFromClient(clientO cache.Object[arangodb.Client], return nil, 0, err } - return col, 24 * time.Hour, nil + return col, time.Hour, nil }) } diff --git a/pkg/integrations/sidecar/integration.events.v1.go b/pkg/integrations/sidecar/integration.events.v1.go new file mode 100644 index 000000000..997240aa8 --- /dev/null +++ b/pkg/integrations/sidecar/integration.events.v1.go @@ -0,0 +1,56 @@ +// +// DISCLAIMER +// +// Copyright 2024-2025 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package sidecar + +import ( + core "k8s.io/api/core/v1" +) + +type IntegrationEventsV1 struct { + Core *Core +} + +func (i IntegrationEventsV1) Name() []string { + return []string{"EVENTS", "V1"} +} + +func (i IntegrationEventsV1) Validate() error { + return nil +} + +func (i IntegrationEventsV1) Envs() ([]core.EnvVar, error) { + var envs = []core.EnvVar{ + { + Name: "INTEGRATION_EVENTS_V1", + Value: "true", + }, + } + + return i.Core.Envs(i, envs...), nil +} + +func (i IntegrationEventsV1) GlobalEnvs() ([]core.EnvVar, error) { + return nil, nil +} + +func (i IntegrationEventsV1) Volumes() ([]core.Volume, []core.VolumeMount, error) { + return nil, nil, nil +} diff --git a/pkg/util/cache/object.go b/pkg/util/cache/object.go index 9ff84676e..73a0307c5 100644 --- a/pkg/util/cache/object.go +++ b/pkg/util/cache/object.go @@ -37,6 +37,7 @@ func NewObject[T any](caller ObjectFetcher[T]) Object[T] { type ObjectFetcher[T any] func(ctx context.Context) (T, time.Duration, error) type Object[T any] interface { + Init(context.Context) error Get(ctx context.Context) (T, error) } @@ -49,6 +50,11 @@ type object[T any] struct { obj T } +func (o *object[T]) Init(ctx context.Context) error { + _, err := o.Get(ctx) + return err +} + func (o *object[T]) Get(ctx context.Context) (T, error) { o.lock.Lock() defer o.lock.Unlock() diff --git a/pkg/util/cache/remote_cache.go b/pkg/util/cache/remote_cache.go index 8c358c0fc..3b607ffb9 100644 --- a/pkg/util/cache/remote_cache.go +++ b/pkg/util/cache/remote_cache.go @@ -85,6 +85,9 @@ type RemoteCache[T RemoteCacheObject] interface { // List lists the keys matching predicate from the server // Always misses the cache List(ctx context.Context, size int, prefix string) (util.NextIterator[[]string], error) + + // Init inits the cache + Init(ctx context.Context) error } type remoteCache[T RemoteCacheObject] struct { @@ -95,6 +98,10 @@ type remoteCache[T RemoteCacheObject] struct { cache Cache[string, T] } +func (r *remoteCache[T]) Init(ctx context.Context) error { + return r.collection.Init(ctx) +} + func (r *remoteCache[T]) Put(ctx context.Context, key string, obj T) error { r.lock.Lock() defer r.lock.Unlock() diff --git a/pkg/util/constants/profiles.go b/pkg/util/constants/profiles.go index 9d64062f1..f7f7afa05 100644 --- a/pkg/util/constants/profiles.go +++ b/pkg/util/constants/profiles.go @@ -41,6 +41,7 @@ const ( ProfilesIntegrationStorage = "storage" ProfilesIntegrationShutdown = "shutdown" ProfilesIntegrationMeta = "meta" + ProfilesIntegrationEvents = "events" ) const ( diff --git a/pkg/util/context.go b/pkg/util/context.go index 5b6d81077..dcab39b27 100644 --- a/pkg/util/context.go +++ b/pkg/util/context.go @@ -120,3 +120,19 @@ func WithKubernetesPatch[P1 meta.Object](ctx context.Context, obj string, client } type ContextKey string + +func RunContextAsync(ctx context.Context, in func(ctx context.Context)) context.CancelFunc { + ctx, cancel := context.WithCancel(ctx) + + done := make(chan struct{}) + + go func() { + defer close(done) + in(ctx) + }() + + return func() { + cancel() + <-done + } +} diff --git a/pkg/util/grpc/grpc.go b/pkg/util/grpc/grpc.go index 876c6470e..ee63f0f76 100644 --- a/pkg/util/grpc/grpc.go +++ b/pkg/util/grpc/grpc.go @@ -30,7 +30,6 @@ import ( "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/metadata" - "google.golang.org/protobuf/encoding/protojson" proto "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/anypb" @@ -145,35 +144,3 @@ func GRPCAnyCastAs[T proto.Message](in *anypb.Any, v T) error { return nil } - -func NewGRPC[T proto.Message](in T) GRPC[T] { - return GRPC[T]{ - Object: in, - } -} - -type GRPC[T proto.Message] struct { - Object T -} - -func (g *GRPC[T]) UnmarshalJSON(data []byte) error { - return g.UnmarshalJSONOpts(data) -} - -func (g *GRPC[T]) UnmarshalJSONOpts(data []byte, opts ...util.Mod[protojson.UnmarshalOptions]) error { - o, err := Unmarshal[T](data, opts...) - if err != nil { - return err - } - - g.Object = o - return nil -} - -func (g GRPC[T]) MarshalJSON() ([]byte, error) { - return g.MarshalJSONOpts() -} - -func (g GRPC[T]) MarshalJSONOpts(opts ...util.Mod[protojson.MarshalOptions]) ([]byte, error) { - return Marshal[T](g.Object, opts...) -} diff --git a/pkg/util/grpc/object.go b/pkg/util/grpc/object.go index d19449540..e19dcf2ce 100644 --- a/pkg/util/grpc/object.go +++ b/pkg/util/grpc/object.go @@ -20,7 +20,12 @@ package grpc -import "google.golang.org/protobuf/proto" +import ( + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/proto" + + "github.com/arangodb/kube-arangodb/pkg/util" +) func NewObject[IN proto.Message](in IN) Object[IN] { return Object[IN]{Object: in} @@ -30,17 +35,24 @@ type Object[IN proto.Message] struct { Object IN } -func (obj Object[IN]) MarshalJSON() ([]byte, error) { - return Marshal[IN](obj.Object) +func (g *Object[T]) UnmarshalJSON(data []byte) error { + return g.UnmarshalJSONOpts(data) } -func (obj *Object[IN]) UnmarshalJSON(data []byte) error { - z, err := Unmarshal[IN](data) +func (g *Object[T]) UnmarshalJSONOpts(data []byte, opts ...util.Mod[protojson.UnmarshalOptions]) error { + o, err := Unmarshal[T](data, opts...) if err != nil { return err } - obj.Object = z - + g.Object = o return nil } + +func (g Object[T]) MarshalJSON() ([]byte, error) { + return g.MarshalJSONOpts() +} + +func (g Object[T]) MarshalJSONOpts(opts ...util.Mod[protojson.MarshalOptions]) ([]byte, error) { + return Marshal[T](g.Object, opts...) +} diff --git a/pkg/util/svc/background.go b/pkg/util/svc/background.go new file mode 100644 index 000000000..577524871 --- /dev/null +++ b/pkg/util/svc/background.go @@ -0,0 +1,47 @@ +// +// DISCLAIMER +// +// Copyright 2025 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package svc + +import ( + "context" + + "github.com/arangodb/kube-arangodb/pkg/util" +) + +type Background interface { + Background(ctx context.Context) +} + +func RunBackgroundSync(ctx context.Context, in any) { + if h, ok := in.(Background); ok { + h.Background(ctx) + } +} + +func RunBackground(in any) context.CancelFunc { + if h, ok := in.(Background); ok { + return util.RunContextAsync(context.Background(), h.Background) + } + + return func() { + + } +} diff --git a/pkg/util/tests/cache/remote_cache.go b/pkg/util/tests/cache/remote_cache.go index 2af306260..447fe2296 100644 --- a/pkg/util/tests/cache/remote_cache.go +++ b/pkg/util/tests/cache/remote_cache.go @@ -42,6 +42,10 @@ type localRemoteCache[T cache.RemoteCacheObject] struct { objects map[string]json.RawMessage } +func (l *localRemoteCache[T]) Init(ctx context.Context) error { + return nil +} + func (l *localRemoteCache[T]) Put(ctx context.Context, key string, obj T) error { l.lock.Lock() defer l.lock.Unlock()