diff --git a/api/pkg/di/container.go b/api/pkg/di/container.go index 6a6e8cc7..5aba256a 100644 --- a/api/pkg/di/container.go +++ b/api/pkg/di/container.go @@ -305,6 +305,15 @@ func (container *Container) EventsQueueConfiguration() (config services.PushQueu func (container *Container) EventsQueue() (queue services.PushQueue) { container.logger.Debug("creating events services.PushQueue") + if isLocal() { + return services.NewInMemoryPushQueue( + container.Logger(), + container.Tracer(), + container.EventsQueueConfiguration(), + func() *services.EventDispatcher { return container.eventDispatcher }, + ) + } + return services.NewGooglePushQueue( container.Logger(), container.Tracer(), diff --git a/api/pkg/services/in_memory_push_queue.go b/api/pkg/services/in_memory_push_queue.go new file mode 100644 index 00000000..c57ba40f --- /dev/null +++ b/api/pkg/services/in_memory_push_queue.go @@ -0,0 +1,59 @@ +package services + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "github.com/NdoleStudio/httpsms/pkg/telemetry" + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/google/uuid" +) + +type inMemoryPushQueue struct { + getEventDispatcher func() *EventDispatcher + queueConfig PushQueueConfig + logger telemetry.Logger + tracer telemetry.Tracer +} + +// NewGooglePushQueue creates a new googlePushQueue +func NewInMemoryPushQueue( + logger telemetry.Logger, + tracer telemetry.Tracer, + queueConfig PushQueueConfig, + getEventDispatcher func() *EventDispatcher, +) PushQueue { + return &inMemoryPushQueue{ + tracer: tracer, + logger: logger, + queueConfig: queueConfig, + getEventDispatcher: getEventDispatcher, + } +} + +// Enqueue a task to the queue +func (queue *inMemoryPushQueue) Enqueue(ctx context.Context, task *PushQueueTask, timeout time.Duration) (queueID string, err error) { + ctx, span := queue.tracer.Start(ctx) + defer span.End() + + ctxLogger := queue.tracer.CtxLogger(queue.logger, span) + queueID = uuid.New().String() + + go func() { + time.Sleep(timeout) + var event cloudevents.Event + json.Unmarshal(task.Body, &event) + queue.getEventDispatcher().DispatchSync(ctx, event) + }() + + ctxLogger.Info(fmt.Sprintf( + "item added to [%s] queue with id [%s] and schedule [%s]", + queue.queueConfig.Name, + queueID, + time.Now().UTC().Add(timeout), + )) + + return queueID, nil +}