From e1c8c105a8801937611c43ee3782a61585681956 Mon Sep 17 00:00:00 2001 From: Eriksson Monteiro Date: Wed, 15 Mar 2023 18:30:10 +0000 Subject: [PATCH 1/3] handle send event in a local setup (dev. localhost) --- api/pkg/di/container.go | 4 +++ api/pkg/services/event_dispatcher_service.go | 34 ++++++++++++++------ 2 files changed, 29 insertions(+), 9 deletions(-) diff --git a/api/pkg/di/container.go b/api/pkg/di/container.go index 6a6e8cc7..82a60503 100644 --- a/api/pkg/di/container.go +++ b/api/pkg/di/container.go @@ -305,6 +305,10 @@ func (container *Container) EventsQueueConfiguration() (config services.PushQueu func (container *Container) EventsQueue() (queue services.PushQueue) { container.logger.Debug("creating events services.PushQueue") + if isLocal() { + return nil + } + return services.NewGooglePushQueue( container.Logger(), container.Tracer(), diff --git a/api/pkg/services/event_dispatcher_service.go b/api/pkg/services/event_dispatcher_service.go index 43b4d1a9..35d5c4de 100644 --- a/api/pkg/services/event_dispatcher_service.go +++ b/api/pkg/services/event_dispatcher_service.go @@ -13,6 +13,7 @@ import ( "github.com/NdoleStudio/httpsms/pkg/telemetry" cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/palantir/stacktrace" + "github.com/google/uuid" ) // EventDispatcher dispatches a new event @@ -72,15 +73,30 @@ func (dispatcher *EventDispatcher) DispatchWithTimeout(ctx context.Context, even return queueID, dispatcher.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) } - task, err := dispatcher.createCloudTask(event) - if err != nil { - msg := fmt.Sprintf("cannot create cloud task for event [%s] with id [%s]", event.Type(), event.ID()) - return queueID, dispatcher.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) - } - - if queueID, err = dispatcher.queue.Enqueue(ctx, task, timeout); err != nil { - msg := fmt.Sprintf("cannot enqueue event with ID [%s] and type [%s]", event.ID(), event.Type()) - return queueID, dispatcher.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) + if dispatcher.queue == nil { + ctxLogger := dispatcher.tracer.CtxLogger(dispatcher.logger, span) + queueID := uuid.New() + go func() { + time.Sleep(timeout) + dispatcher.DispatchSync(ctx, event) + }() + ctxLogger.Info(fmt.Sprintf( + "item added to [%s] queue with id [%s] and schedule [%s]", + dispatcher.queueConfig.Name, + queueID, + time.Now().UTC().Add(timeout), + )) + } else { + task, err := dispatcher.createCloudTask(event) + if err != nil { + msg := fmt.Sprintf("cannot create cloud task for event [%s] with id [%s]", event.Type(), event.ID()) + return queueID, dispatcher.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) + } + + if queueID, err = dispatcher.queue.Enqueue(ctx, task, timeout); err != nil { + msg := fmt.Sprintf("cannot enqueue event with ID [%s] and type [%s]", event.ID(), event.Type()) + return queueID, dispatcher.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) + } } return queueID, nil From 6e4e47318c5d7f4d02a11df0542d8888635bb215 Mon Sep 17 00:00:00 2001 From: Eriksson Monteiro Date: Sun, 19 Mar 2023 21:58:59 +0000 Subject: [PATCH 2/3] refactor code and create a local implementation of the PushQueue interface: InMemoryPushQueue --- api/pkg/di/container.go | 7 ++- api/pkg/services/event_dispatcher_service.go | 34 ++++-------- api/pkg/services/in_memory_push_queue.go | 57 ++++++++++++++++++++ 3 files changed, 72 insertions(+), 26 deletions(-) create mode 100644 api/pkg/services/in_memory_push_queue.go diff --git a/api/pkg/di/container.go b/api/pkg/di/container.go index 82a60503..5aba256a 100644 --- a/api/pkg/di/container.go +++ b/api/pkg/di/container.go @@ -306,7 +306,12 @@ func (container *Container) EventsQueue() (queue services.PushQueue) { container.logger.Debug("creating events services.PushQueue") if isLocal() { - return nil + return services.NewInMemoryPushQueue( + container.Logger(), + container.Tracer(), + container.EventsQueueConfiguration(), + func() *services.EventDispatcher { return container.eventDispatcher }, + ) } return services.NewGooglePushQueue( diff --git a/api/pkg/services/event_dispatcher_service.go b/api/pkg/services/event_dispatcher_service.go index 35d5c4de..43b4d1a9 100644 --- a/api/pkg/services/event_dispatcher_service.go +++ b/api/pkg/services/event_dispatcher_service.go @@ -13,7 +13,6 @@ import ( "github.com/NdoleStudio/httpsms/pkg/telemetry" cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/palantir/stacktrace" - "github.com/google/uuid" ) // EventDispatcher dispatches a new event @@ -73,30 +72,15 @@ func (dispatcher *EventDispatcher) DispatchWithTimeout(ctx context.Context, even return queueID, dispatcher.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) } - if dispatcher.queue == nil { - ctxLogger := dispatcher.tracer.CtxLogger(dispatcher.logger, span) - queueID := uuid.New() - go func() { - time.Sleep(timeout) - dispatcher.DispatchSync(ctx, event) - }() - ctxLogger.Info(fmt.Sprintf( - "item added to [%s] queue with id [%s] and schedule [%s]", - dispatcher.queueConfig.Name, - queueID, - time.Now().UTC().Add(timeout), - )) - } else { - task, err := dispatcher.createCloudTask(event) - if err != nil { - msg := fmt.Sprintf("cannot create cloud task for event [%s] with id [%s]", event.Type(), event.ID()) - return queueID, dispatcher.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) - } - - if queueID, err = dispatcher.queue.Enqueue(ctx, task, timeout); err != nil { - msg := fmt.Sprintf("cannot enqueue event with ID [%s] and type [%s]", event.ID(), event.Type()) - return queueID, dispatcher.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) - } + task, err := dispatcher.createCloudTask(event) + if err != nil { + msg := fmt.Sprintf("cannot create cloud task for event [%s] with id [%s]", event.Type(), event.ID()) + return queueID, dispatcher.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) + } + + if queueID, err = dispatcher.queue.Enqueue(ctx, task, timeout); err != nil { + msg := fmt.Sprintf("cannot enqueue event with ID [%s] and type [%s]", event.ID(), event.Type()) + return queueID, dispatcher.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) } return queueID, nil diff --git a/api/pkg/services/in_memory_push_queue.go b/api/pkg/services/in_memory_push_queue.go new file mode 100644 index 00000000..b984d067 --- /dev/null +++ b/api/pkg/services/in_memory_push_queue.go @@ -0,0 +1,57 @@ +package services + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "github.com/NdoleStudio/httpsms/pkg/telemetry" + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/google/uuid" +) + +type inMemoryPushQueue struct { + getEventDispatcher func() *EventDispatcher + queueConfig PushQueueConfig + logger telemetry.Logger + tracer telemetry.Tracer +} + +// NewGooglePushQueue creates a new googlePushQueue +func NewInMemoryPushQueue( + logger telemetry.Logger, + tracer telemetry.Tracer, + queueConfig PushQueueConfig, + getEventDispatcher func() *EventDispatcher, +) PushQueue { + return &inMemoryPushQueue{ + tracer: tracer, + logger: logger, + queueConfig: queueConfig, + getEventDispatcher: getEventDispatcher, + } +} + +// Enqueue a task to the queue +func (queue *inMemoryPushQueue) Enqueue(ctx context.Context, task *PushQueueTask, timeout time.Duration) (queueID string, err error) { + ctx, span := queue.tracer.Start(ctx) + ctxLogger := queue.tracer.CtxLogger(queue.logger, span) + queueID = uuid.New().String() + + go func() { + time.Sleep(timeout) + var event cloudevents.Event + json.Unmarshal(task.Body, &event) + queue.getEventDispatcher().DispatchSync(ctx, event) + }() + + ctxLogger.Info(fmt.Sprintf( + "item added to [%s] queue with id [%s] and schedule [%s]", + queue.queueConfig.Name, + queueID, + time.Now().UTC().Add(timeout), + )) + + return queueID, nil +} From 36dc244cbf75cdd0127dc285a83c118fbdaccbe4 Mon Sep 17 00:00:00 2001 From: Eriksson Monteiro Date: Sun, 19 Mar 2023 22:40:11 +0000 Subject: [PATCH 3/3] add missing span.End() --- api/pkg/services/in_memory_push_queue.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/api/pkg/services/in_memory_push_queue.go b/api/pkg/services/in_memory_push_queue.go index b984d067..c57ba40f 100644 --- a/api/pkg/services/in_memory_push_queue.go +++ b/api/pkg/services/in_memory_push_queue.go @@ -36,6 +36,8 @@ func NewInMemoryPushQueue( // Enqueue a task to the queue func (queue *inMemoryPushQueue) Enqueue(ctx context.Context, task *PushQueueTask, timeout time.Duration) (queueID string, err error) { ctx, span := queue.tracer.Start(ctx) + defer span.End() + ctxLogger := queue.tracer.CtxLogger(queue.logger, span) queueID = uuid.New().String()