From 6985a579a535a8a2a834565b88285e4313e446d1 Mon Sep 17 00:00:00 2001 From: Parth Patel <88045217+pxp928@users.noreply.github.com> Date: Wed, 6 Mar 2024 12:49:59 -0500 Subject: [PATCH] move message acknowledgment for pubsub to be done after the ingestion has occured (#1753) Signed-off-by: pxp928 --- pkg/certifier/certify/certify_test.go | 15 ++++++++++----- pkg/emitter/emitter.go | 13 ++++++------- pkg/emitter/nats_emitter_test.go | 15 ++++++++++----- pkg/handler/collector/collector_test.go | 13 +++++++++---- pkg/handler/processor/process/process.go | 17 +++++++++++------ 5 files changed, 46 insertions(+), 27 deletions(-) diff --git a/pkg/certifier/certify/certify_test.go b/pkg/certifier/certify/certify_test.go index da568457d3..3c6d9b3f4d 100644 --- a/pkg/certifier/certify/certify_test.go +++ b/pkg/certifier/certify/certify_test.go @@ -34,6 +34,7 @@ import ( "github.com/guacsec/guac/pkg/events" "github.com/guacsec/guac/pkg/handler/processor" "github.com/guacsec/guac/pkg/logging" + "gocloud.dev/pubsub" ) type mockQuery struct { @@ -264,13 +265,13 @@ func Test_Publish(t *testing.T) { err = testSubscribe(ctx, transportFunc, blobStore, pubsub) if err != nil { - if err != nil && !errors.Is(err, context.DeadlineExceeded) { + if !errors.Is(err, context.DeadlineExceeded) { t.Errorf("nats emitter Subscribe test errored = %v", err) } } } -func testSubscribe(ctx context.Context, transportFunc func(processor.DocumentTree) error, blobStore *blob.BlobStore, pubsub *emitter.EmitterPubSub) error { +func testSubscribe(ctx context.Context, transportFunc func(processor.DocumentTree) error, blobStore *blob.BlobStore, emPubSub *emitter.EmitterPubSub) error { logger := logging.FromContext(ctx) uuid, err := uuid.NewV4() @@ -278,13 +279,13 @@ func testSubscribe(ctx context.Context, transportFunc func(processor.DocumentTre return fmt.Errorf("failed to get uuid with the following error: %w", err) } uuidString := uuid.String() - sub, err := pubsub.Subscribe(ctx, uuidString) + sub, err := emPubSub.Subscribe(ctx, uuidString) if err != nil { return err } - processFunc := func(d []byte) error { + processFunc := func(d *pubsub.Message) error { - blobStoreKey, err := events.DecodeEventSubject(ctx, d) + blobStoreKey, err := events.DecodeEventSubject(ctx, d.Body) if err != nil { logger.Errorf("[processor: %s] failed decode event: %v", uuidString, err) return nil @@ -317,6 +318,10 @@ func testSubscribe(ctx context.Context, transportFunc func(processor.DocumentTre } logger.Infof("[processor: %s] docTree Processed: %+v", uuidString, docTree.Document.SourceInformation) + // ack the message from the queue once the ingestion has occurred + d.Ack() + logger.Infof("[processor: %s] message acknowledged in pusbub", uuidString) + return nil } diff --git a/pkg/emitter/emitter.go b/pkg/emitter/emitter.go index 99ea718030..d7375f7405 100644 --- a/pkg/emitter/emitter.go +++ b/pkg/emitter/emitter.go @@ -38,13 +38,13 @@ type EmitterPubSub struct { ServiceURL string } -// DataFunc determines how the data return from NATS is transformed based on implementation per module -type DataFunc func([]byte) error +// DataFunc determines how the data return from the pubsub is transformed based on implementation per module +type DataFunc func(*pubsub.Message) error // subscriber provides dataChan to read the collected data from the stream, errChan for any error that return and // the pubsub.Subscription to close the subscription once complete type subscriber struct { - dataChan <-chan []byte + dataChan <-chan *pubsub.Message errChan <-chan error subscription *pubsub.Subscription } @@ -157,9 +157,9 @@ func (s *subscriber) CloseSubscriber(ctx context.Context) error { } // createSubscriber receives from the subscription and use the dataChan and errChan to continuously send collected data or errors -func createSubscriber(ctx context.Context, subscription *pubsub.Subscription, id string) (<-chan []byte, <-chan error, error) { +func createSubscriber(ctx context.Context, subscription *pubsub.Subscription, id string) (<-chan *pubsub.Message, <-chan error, error) { // docChan to collect artifacts - dataChan := make(chan []byte, bufferChannelSize) + dataChan := make(chan *pubsub.Message, bufferChannelSize) // errChan to receive error from collectors errChan := make(chan error, 1) go func() { @@ -185,8 +185,7 @@ func createSubscriber(ctx context.Context, subscription *pubsub.Subscription, id return } } - msg.Ack() - dataChan <- msg.Body + dataChan <- msg } }() return dataChan, errChan, nil diff --git a/pkg/emitter/nats_emitter_test.go b/pkg/emitter/nats_emitter_test.go index b528152712..99c63e0e21 100644 --- a/pkg/emitter/nats_emitter_test.go +++ b/pkg/emitter/nats_emitter_test.go @@ -22,6 +22,7 @@ import ( "testing" jsoniter "github.com/json-iterator/go" + "gocloud.dev/pubsub" uuid "github.com/gofrs/uuid" nats_test "github.com/guacsec/guac/internal/testing/nats" @@ -112,7 +113,7 @@ func TestNatsEmitter_RecreateStream(t *testing.T) { t.Errorf("failed to delete stream: %v", err) } _, err = jetStream.js.StreamInfo(streamName) - if err == nil || (err != nil) && !errors.Is(err, tt.wantErrMessage) { + if err != nil && !errors.Is(err, tt.wantErrMessage) { t.Errorf("RecreateStream() error = %v, wantErr %v", err, tt.wantErrMessage) return } @@ -145,7 +146,7 @@ func testPublish(ctx context.Context, d *processor.Document, pubsub *EmitterPubS return nil } -func testSubscribe(ctx context.Context, transportFunc func(processor.DocumentTree) error, pubsub *EmitterPubSub) error { +func testSubscribe(ctx context.Context, transportFunc func(processor.DocumentTree) error, emPubSub *EmitterPubSub) error { logger := logging.FromContext(ctx) uuid, err := uuid.NewV4() @@ -153,13 +154,13 @@ func testSubscribe(ctx context.Context, transportFunc func(processor.DocumentTre return fmt.Errorf("failed to get uuid with the following error: %w", err) } uuidString := uuid.String() - sub, err := pubsub.Subscribe(ctx, uuidString) + sub, err := emPubSub.Subscribe(ctx, uuidString) if err != nil { return err } - processFunc := func(d []byte) error { + processFunc := func(d *pubsub.Message) error { doc := processor.Document{} - err := json.Unmarshal(d, &doc) + err := json.Unmarshal(d.Body, &doc) if err != nil { fmtErrString := fmt.Sprintf("[processor: %s] failed unmarshal the document bytes", uuidString) logger.Errorf(fmtErrString+": %v", err) @@ -179,6 +180,10 @@ func testSubscribe(ctx context.Context, transportFunc func(processor.DocumentTre return fmt.Errorf(fmtErrString+": %w", err) } logger.Infof("[processor: %s] docTree Processed: %+v", uuidString, docTree.Document.SourceInformation) + // ack the message from the queue once the ingestion has occurred + d.Ack() + logger.Infof("[processor: %s] message acknowledged in pusbub", uuidString) + return nil } diff --git a/pkg/handler/collector/collector_test.go b/pkg/handler/collector/collector_test.go index c069852ecc..f610b042dc 100644 --- a/pkg/handler/collector/collector_test.go +++ b/pkg/handler/collector/collector_test.go @@ -33,6 +33,7 @@ import ( "github.com/guacsec/guac/pkg/handler/collector/file" "github.com/guacsec/guac/pkg/handler/processor" "github.com/guacsec/guac/pkg/logging" + "gocloud.dev/pubsub" ) func TestCollect(t *testing.T) { @@ -143,7 +144,7 @@ func Test_Publish(t *testing.T) { } } -func testSubscribe(ctx context.Context, transportFunc func(processor.DocumentTree) error, blobStore *blob.BlobStore, pubsub *emitter.EmitterPubSub) error { +func testSubscribe(ctx context.Context, transportFunc func(processor.DocumentTree) error, blobStore *blob.BlobStore, emPubSub *emitter.EmitterPubSub) error { logger := logging.FromContext(ctx) uuid, err := uuid.NewV4() @@ -151,13 +152,13 @@ func testSubscribe(ctx context.Context, transportFunc func(processor.DocumentTre return fmt.Errorf("failed to get uuid with the following error: %w", err) } uuidString := uuid.String() - sub, err := pubsub.Subscribe(ctx, uuidString) + sub, err := emPubSub.Subscribe(ctx, uuidString) if err != nil { return err } - processFunc := func(d []byte) error { + processFunc := func(d *pubsub.Message) error { - blobStoreKey, err := events.DecodeEventSubject(ctx, d) + blobStoreKey, err := events.DecodeEventSubject(ctx, d.Body) if err != nil { logger.Errorf("[processor: %s] failed decode event: %v", uuidString, err) return nil @@ -189,6 +190,10 @@ func testSubscribe(ctx context.Context, transportFunc func(processor.DocumentTre return fmt.Errorf(fmtErrString+": %w", err) } logger.Infof("[processor: %s] docTree Processed: %+v", uuidString, docTree.Document.SourceInformation) + // ack the message from the queue once the ingestion has occurred + d.Ack() + logger.Infof("[processor: %s] message acknowledged in pusbub", uuidString) + return nil } diff --git a/pkg/handler/processor/process/process.go b/pkg/handler/processor/process/process.go index a700599dd5..757f0a7701 100644 --- a/pkg/handler/processor/process/process.go +++ b/pkg/handler/processor/process/process.go @@ -43,6 +43,7 @@ import ( "github.com/guacsec/guac/pkg/logging" jsoniter "github.com/json-iterator/go" "github.com/klauspost/compress/zstd" + "gocloud.dev/pubsub" ) var ( @@ -74,7 +75,7 @@ func RegisterDocumentProcessor(p processor.DocumentProcessor, d processor.Docume // Subscribe receives the CD event and decodes the event to obtain the blob store key. // The key is used to retrieve the "document" from the blob store to be processed and ingested. -func Subscribe(ctx context.Context, em collector.Emitter, blobStore *blob.BlobStore, pubsub *emitter.EmitterPubSub) error { +func Subscribe(ctx context.Context, em collector.Emitter, blobStore *blob.BlobStore, emPubSub *emitter.EmitterPubSub) error { logger := logging.FromContext(ctx) uuid, err := uuid.NewV4() @@ -82,14 +83,14 @@ func Subscribe(ctx context.Context, em collector.Emitter, blobStore *blob.BlobSt return fmt.Errorf("failed to get uuid with the following error: %w", err) } uuidString := uuid.String() - sub, err := pubsub.Subscribe(ctx, uuidString) + sub, err := emPubSub.Subscribe(ctx, uuidString) if err != nil { return fmt.Errorf("[processor: %s] failed to create new pubsub: %w", uuidString, err) } // should still continue if there are errors since problem is with individual documents - processFunc := func(d []byte) error { + processFunc := func(d *pubsub.Message) error { - blobStoreKey, err := events.DecodeEventSubject(ctx, d) + blobStoreKey, err := events.DecodeEventSubject(ctx, d.Body) if err != nil { logger.Errorf("[processor: %s] failed decode event: %v", uuidString, err) return nil @@ -111,16 +112,20 @@ func Subscribe(ctx context.Context, em collector.Emitter, blobStore *blob.BlobSt logger.Error("[processor: %s] failed transportFunc: %v", uuidString, err) return nil } + // ack the message from the queue once the ingestion has occurred via the Emitter (em) function specified above + d.Ack() + logger.Infof("[processor: %s] message acknowledged in pusbub", uuidString) + return nil } err = sub.GetDataFromSubscriber(ctx, processFunc) if err != nil { - return fmt.Errorf("[processor: %s] failed to get data from %s: %w", uuidString, pubsub.ServiceURL, err) + return fmt.Errorf("[processor: %s] failed to get data from %s: %w", uuidString, emPubSub.ServiceURL, err) } if err := sub.CloseSubscriber(ctx); err != nil { - return fmt.Errorf("[processor: %s] failed to close subscriber: %s, with error: %w", uuidString, pubsub.ServiceURL, err) + return fmt.Errorf("[processor: %s] failed to close subscriber: %s, with error: %w", uuidString, emPubSub.ServiceURL, err) } return nil