diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 9a886a7c6..a3c216f00 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -23,7 +23,7 @@ jobs: - name: Upload coverage artifact uses: actions/upload-artifact@v4 with: - name: coverage + name: coverage-${{ strategy.job-index }} path: coverage.out pretest: @@ -62,7 +62,7 @@ jobs: - name: Upload coverage artifact uses: actions/upload-artifact@v4 with: - name: coverage-plugins + name: coverage-plugins-${{ strategy.job-index }} path: coverage-plugins*.out coverage: @@ -79,11 +79,11 @@ jobs: - name: Download coverage uses: actions/download-artifact@v4 with: - name: coverage + name: coverage-${{ strategy.job-index }} - name: Download plugins coverage uses: actions/download-artifact@v4 with: - name: coverage-plugins + name: coverage-plugins-${{ strategy.job-index }} - name: Install goveralls and send coverage env: COVERALLS_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/plugins/sinks/compass/sink.go b/plugins/sinks/compass/sink.go index 218a13021..33f718066 100644 --- a/plugins/sinks/compass/sink.go +++ b/plugins/sinks/compass/sink.go @@ -20,6 +20,7 @@ import ( "github.com/goto/meteor/utils" "github.com/goto/salt/log" "github.com/pkg/errors" + "golang.org/x/sync/errgroup" "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/types/known/anypb" ) @@ -91,22 +92,33 @@ func (s *Sink) Init(ctx context.Context, config plugins.Config) error { } func (s *Sink) Sink(ctx context.Context, batch []models.Record) error { - for _, record := range batch { - asset := record.Data() - s.logger.Info("sinking record to compass", "record", asset.GetUrn()) + if len(batch) == 0 { + return nil + } - compassPayload, err := s.buildCompassPayload(asset) - if err != nil { - return fmt.Errorf("build compass payload: %w", err) - } - if err = s.send(ctx, compassPayload); err != nil { - return fmt.Errorf("send data: %w", err) - } + errGroup := errgroup.Group{} + errGroup.SetLimit(len(batch)) - s.logger.Info("successfully sinked record to compass", "record", asset.GetUrn()) + for _, record := range batch { + record := record + errGroup.Go(func() error { + asset := record.Data() + s.logger.Info("sinking record to compass", "record", asset.GetUrn()) + + compassPayload, err := s.buildCompassPayload(asset) + if err != nil { + return fmt.Errorf("build compass payload: %w", err) + } + if err := s.send(ctx, compassPayload); err != nil { + return fmt.Errorf("send data: %w", err) + } + + s.logger.Info("successfully sinked record to compass", "record", asset.GetUrn()) + return nil + }) } - return nil + return errGroup.Wait() } func (*Sink) Close() error { return nil }