Skip to content

Commit

Permalink
move message acknowledgment for pubsub to be done after the ingestion…
Browse files Browse the repository at this point in the history
… has occured (#1753)

Signed-off-by: pxp928 <parth.psu@gmail.com>
  • Loading branch information
pxp928 committed Mar 6, 2024
1 parent d4a9a96 commit 6985a57
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 27 deletions.
15 changes: 10 additions & 5 deletions pkg/certifier/certify/certify_test.go
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/guacsec/guac/pkg/events"
"github.com/guacsec/guac/pkg/handler/processor"
"github.com/guacsec/guac/pkg/logging"
"gocloud.dev/pubsub"
)

type mockQuery struct {
Expand Down Expand Up @@ -264,27 +265,27 @@ func Test_Publish(t *testing.T) {

err = testSubscribe(ctx, transportFunc, blobStore, pubsub)
if err != nil {
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
if !errors.Is(err, context.DeadlineExceeded) {
t.Errorf("nats emitter Subscribe test errored = %v", err)
}
}
}

func testSubscribe(ctx context.Context, transportFunc func(processor.DocumentTree) error, blobStore *blob.BlobStore, pubsub *emitter.EmitterPubSub) error {
func testSubscribe(ctx context.Context, transportFunc func(processor.DocumentTree) error, blobStore *blob.BlobStore, emPubSub *emitter.EmitterPubSub) error {
logger := logging.FromContext(ctx)

uuid, err := uuid.NewV4()
if err != nil {
return fmt.Errorf("failed to get uuid with the following error: %w", err)
}
uuidString := uuid.String()
sub, err := pubsub.Subscribe(ctx, uuidString)
sub, err := emPubSub.Subscribe(ctx, uuidString)
if err != nil {
return err
}
processFunc := func(d []byte) error {
processFunc := func(d *pubsub.Message) error {

blobStoreKey, err := events.DecodeEventSubject(ctx, d)
blobStoreKey, err := events.DecodeEventSubject(ctx, d.Body)
if err != nil {
logger.Errorf("[processor: %s] failed decode event: %v", uuidString, err)
return nil
Expand Down Expand Up @@ -317,6 +318,10 @@ func testSubscribe(ctx context.Context, transportFunc func(processor.DocumentTre
}

logger.Infof("[processor: %s] docTree Processed: %+v", uuidString, docTree.Document.SourceInformation)
// ack the message from the queue once the ingestion has occurred
d.Ack()
logger.Infof("[processor: %s] message acknowledged in pusbub", uuidString)

return nil
}

Expand Down
13 changes: 6 additions & 7 deletions pkg/emitter/emitter.go
Expand Up @@ -38,13 +38,13 @@ type EmitterPubSub struct {
ServiceURL string
}

// DataFunc determines how the data return from NATS is transformed based on implementation per module
type DataFunc func([]byte) error
// DataFunc determines how the data return from the pubsub is transformed based on implementation per module
type DataFunc func(*pubsub.Message) error

// subscriber provides dataChan to read the collected data from the stream, errChan for any error that return and
// the pubsub.Subscription to close the subscription once complete
type subscriber struct {
dataChan <-chan []byte
dataChan <-chan *pubsub.Message
errChan <-chan error
subscription *pubsub.Subscription
}
Expand Down Expand Up @@ -157,9 +157,9 @@ func (s *subscriber) CloseSubscriber(ctx context.Context) error {
}

// createSubscriber receives from the subscription and use the dataChan and errChan to continuously send collected data or errors
func createSubscriber(ctx context.Context, subscription *pubsub.Subscription, id string) (<-chan []byte, <-chan error, error) {
func createSubscriber(ctx context.Context, subscription *pubsub.Subscription, id string) (<-chan *pubsub.Message, <-chan error, error) {
// docChan to collect artifacts
dataChan := make(chan []byte, bufferChannelSize)
dataChan := make(chan *pubsub.Message, bufferChannelSize)
// errChan to receive error from collectors
errChan := make(chan error, 1)
go func() {
Expand All @@ -185,8 +185,7 @@ func createSubscriber(ctx context.Context, subscription *pubsub.Subscription, id
return
}
}
msg.Ack()
dataChan <- msg.Body
dataChan <- msg
}
}()
return dataChan, errChan, nil
Expand Down
15 changes: 10 additions & 5 deletions pkg/emitter/nats_emitter_test.go
Expand Up @@ -22,6 +22,7 @@ import (
"testing"

jsoniter "github.com/json-iterator/go"
"gocloud.dev/pubsub"

uuid "github.com/gofrs/uuid"
nats_test "github.com/guacsec/guac/internal/testing/nats"
Expand Down Expand Up @@ -112,7 +113,7 @@ func TestNatsEmitter_RecreateStream(t *testing.T) {
t.Errorf("failed to delete stream: %v", err)
}
_, err = jetStream.js.StreamInfo(streamName)
if err == nil || (err != nil) && !errors.Is(err, tt.wantErrMessage) {
if err != nil && !errors.Is(err, tt.wantErrMessage) {
t.Errorf("RecreateStream() error = %v, wantErr %v", err, tt.wantErrMessage)
return
}
Expand Down Expand Up @@ -145,21 +146,21 @@ func testPublish(ctx context.Context, d *processor.Document, pubsub *EmitterPubS
return nil
}

func testSubscribe(ctx context.Context, transportFunc func(processor.DocumentTree) error, pubsub *EmitterPubSub) error {
func testSubscribe(ctx context.Context, transportFunc func(processor.DocumentTree) error, emPubSub *EmitterPubSub) error {
logger := logging.FromContext(ctx)

uuid, err := uuid.NewV4()
if err != nil {
return fmt.Errorf("failed to get uuid with the following error: %w", err)
}
uuidString := uuid.String()
sub, err := pubsub.Subscribe(ctx, uuidString)
sub, err := emPubSub.Subscribe(ctx, uuidString)
if err != nil {
return err
}
processFunc := func(d []byte) error {
processFunc := func(d *pubsub.Message) error {
doc := processor.Document{}
err := json.Unmarshal(d, &doc)
err := json.Unmarshal(d.Body, &doc)
if err != nil {
fmtErrString := fmt.Sprintf("[processor: %s] failed unmarshal the document bytes", uuidString)
logger.Errorf(fmtErrString+": %v", err)
Expand All @@ -179,6 +180,10 @@ func testSubscribe(ctx context.Context, transportFunc func(processor.DocumentTre
return fmt.Errorf(fmtErrString+": %w", err)
}
logger.Infof("[processor: %s] docTree Processed: %+v", uuidString, docTree.Document.SourceInformation)
// ack the message from the queue once the ingestion has occurred
d.Ack()
logger.Infof("[processor: %s] message acknowledged in pusbub", uuidString)

return nil
}

Expand Down
13 changes: 9 additions & 4 deletions pkg/handler/collector/collector_test.go
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/guacsec/guac/pkg/handler/collector/file"
"github.com/guacsec/guac/pkg/handler/processor"
"github.com/guacsec/guac/pkg/logging"
"gocloud.dev/pubsub"
)

func TestCollect(t *testing.T) {
Expand Down Expand Up @@ -143,21 +144,21 @@ func Test_Publish(t *testing.T) {
}
}

func testSubscribe(ctx context.Context, transportFunc func(processor.DocumentTree) error, blobStore *blob.BlobStore, pubsub *emitter.EmitterPubSub) error {
func testSubscribe(ctx context.Context, transportFunc func(processor.DocumentTree) error, blobStore *blob.BlobStore, emPubSub *emitter.EmitterPubSub) error {
logger := logging.FromContext(ctx)

uuid, err := uuid.NewV4()
if err != nil {
return fmt.Errorf("failed to get uuid with the following error: %w", err)
}
uuidString := uuid.String()
sub, err := pubsub.Subscribe(ctx, uuidString)
sub, err := emPubSub.Subscribe(ctx, uuidString)
if err != nil {
return err
}
processFunc := func(d []byte) error {
processFunc := func(d *pubsub.Message) error {

blobStoreKey, err := events.DecodeEventSubject(ctx, d)
blobStoreKey, err := events.DecodeEventSubject(ctx, d.Body)
if err != nil {
logger.Errorf("[processor: %s] failed decode event: %v", uuidString, err)
return nil
Expand Down Expand Up @@ -189,6 +190,10 @@ func testSubscribe(ctx context.Context, transportFunc func(processor.DocumentTre
return fmt.Errorf(fmtErrString+": %w", err)
}
logger.Infof("[processor: %s] docTree Processed: %+v", uuidString, docTree.Document.SourceInformation)
// ack the message from the queue once the ingestion has occurred
d.Ack()
logger.Infof("[processor: %s] message acknowledged in pusbub", uuidString)

return nil
}

Expand Down
17 changes: 11 additions & 6 deletions pkg/handler/processor/process/process.go
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/guacsec/guac/pkg/logging"
jsoniter "github.com/json-iterator/go"
"github.com/klauspost/compress/zstd"
"gocloud.dev/pubsub"
)

var (
Expand Down Expand Up @@ -74,22 +75,22 @@ func RegisterDocumentProcessor(p processor.DocumentProcessor, d processor.Docume

// Subscribe receives the CD event and decodes the event to obtain the blob store key.
// The key is used to retrieve the "document" from the blob store to be processed and ingested.
func Subscribe(ctx context.Context, em collector.Emitter, blobStore *blob.BlobStore, pubsub *emitter.EmitterPubSub) error {
func Subscribe(ctx context.Context, em collector.Emitter, blobStore *blob.BlobStore, emPubSub *emitter.EmitterPubSub) error {
logger := logging.FromContext(ctx)

uuid, err := uuid.NewV4()
if err != nil {
return fmt.Errorf("failed to get uuid with the following error: %w", err)
}
uuidString := uuid.String()
sub, err := pubsub.Subscribe(ctx, uuidString)
sub, err := emPubSub.Subscribe(ctx, uuidString)
if err != nil {
return fmt.Errorf("[processor: %s] failed to create new pubsub: %w", uuidString, err)
}
// should still continue if there are errors since problem is with individual documents
processFunc := func(d []byte) error {
processFunc := func(d *pubsub.Message) error {

blobStoreKey, err := events.DecodeEventSubject(ctx, d)
blobStoreKey, err := events.DecodeEventSubject(ctx, d.Body)
if err != nil {
logger.Errorf("[processor: %s] failed decode event: %v", uuidString, err)
return nil
Expand All @@ -111,16 +112,20 @@ func Subscribe(ctx context.Context, em collector.Emitter, blobStore *blob.BlobSt
logger.Error("[processor: %s] failed transportFunc: %v", uuidString, err)
return nil
}
// ack the message from the queue once the ingestion has occurred via the Emitter (em) function specified above
d.Ack()
logger.Infof("[processor: %s] message acknowledged in pusbub", uuidString)

return nil
}

err = sub.GetDataFromSubscriber(ctx, processFunc)
if err != nil {
return fmt.Errorf("[processor: %s] failed to get data from %s: %w", uuidString, pubsub.ServiceURL, err)
return fmt.Errorf("[processor: %s] failed to get data from %s: %w", uuidString, emPubSub.ServiceURL, err)
}

if err := sub.CloseSubscriber(ctx); err != nil {
return fmt.Errorf("[processor: %s] failed to close subscriber: %s, with error: %w", uuidString, pubsub.ServiceURL, err)
return fmt.Errorf("[processor: %s] failed to close subscriber: %s, with error: %w", uuidString, emPubSub.ServiceURL, err)
}

return nil
Expand Down

0 comments on commit 6985a57

Please sign in to comment.