diff --git a/internal/pk/pk.go b/internal/pk/pk.go new file mode 100644 index 0000000000..0723d64178 --- /dev/null +++ b/internal/pk/pk.go @@ -0,0 +1,20 @@ +package pk + +import ( + "fmt" + "strings" + + "github.com/cloudquery/plugin-sdk/schema" +) + +func String(table *schema.Table, resource []any) string { + parts := make([]string, 0, len(table.PrimaryKeys())) + for i, col := range table.Columns { + if !col.CreationOptions.PrimaryKey { + continue + } + parts = append(parts, fmt.Sprint(resource[i])) + } + + return "(" + strings.Join(parts, ",") + ")" +} diff --git a/internal/pk/pk_test.go b/internal/pk/pk_test.go new file mode 100644 index 0000000000..3bea20053c --- /dev/null +++ b/internal/pk/pk_test.go @@ -0,0 +1,57 @@ +package pk_test + +import ( + "testing" + + "github.com/cloudquery/plugin-sdk/internal/pk" + "github.com/cloudquery/plugin-sdk/schema" + "github.com/stretchr/testify/require" +) + +func TestString(t *testing.T) { + type testCase struct { + table *schema.Table + resource []any + expected string + } + + for _, tc := range []testCase{ + { + table: &schema.Table{ + Name: "int", + Columns: schema.ColumnList{ + { + Name: "col1", + Type: schema.TypeInt, + CreationOptions: schema.ColumnCreationOptions{PrimaryKey: true}, + }, + }, + }, + resource: []any{2}, + expected: "(2)", + }, + { + table: &schema.Table{ + Name: "int_str", + Columns: schema.ColumnList{ + { + Name: "col1", + Type: schema.TypeInt, + CreationOptions: schema.ColumnCreationOptions{PrimaryKey: true}, + }, + { + Name: "col2", + Type: schema.TypeString, + CreationOptions: schema.ColumnCreationOptions{PrimaryKey: true}, + }, + }, + }, + resource: []any{2, "some"}, + expected: "(2,some)", + }, + } { + t.Run(tc.table.Name, func(t *testing.T) { + require.Equal(t, tc.expected, pk.String(tc.table, tc.resource)) + }) + } +} diff --git a/plugins/destination/managed_writer.go b/plugins/destination/managed_writer.go index 3b98ae92d6..0dd5ab2f41 100644 --- a/plugins/destination/managed_writer.go +++ b/plugins/destination/managed_writer.go @@ -2,12 +2,15 @@ package destination import ( "context" + "strings" "sync" "sync/atomic" "time" + "github.com/cloudquery/plugin-sdk/internal/pk" "github.com/cloudquery/plugin-sdk/schema" "github.com/cloudquery/plugin-sdk/specs" + "github.com/getsentry/sentry-go" ) type worker struct { @@ -54,6 +57,8 @@ func (p *Plugin) worker(ctx context.Context, metrics *Metrics, table *schema.Tab } func (p *Plugin) flush(ctx context.Context, metrics *Metrics, table *schema.Table, resources [][]any) { + resources = p.removeDuplicatesByPK(table, resources) + start := time.Now() batchSize := len(resources) if err := p.client.WriteTableBatch(ctx, table, resources); err != nil { @@ -66,6 +71,46 @@ func (p *Plugin) flush(ctx context.Context, metrics *Metrics, table *schema.Tabl } } +func (p *Plugin) removeDuplicatesByPK(table *schema.Table, resources [][]any) [][]any { + pks := make(map[string]struct{}, len(resources)) + res := make([][]any, 0, len(resources)) + var reported bool + for _, r := range resources { + key := pk.String(table, r) + _, ok := pks[key] + switch { + case !ok: + pks[key] = struct{}{} + res = append(res, r) + continue + case reported: + continue + } + + reported = true + pkSpec := "(" + strings.Join(table.PrimaryKeys(), ",") + ")" + + // log err + p.logger.Error(). + Str("table", table.Name). + Str("pk", pkSpec). + Str("value", key). + Msg("duplicate primary key") + + // send to Sentry only once per table, + // to avoid sending too many duplicate messages + sentry.WithScope(func(scope *sentry.Scope) { + scope.SetTag("plugin", p.name) + scope.SetTag("version", p.version) + scope.SetTag("table", table.Name) + scope.SetExtra("pk", pkSpec) + sentry.CurrentHub().CaptureMessage("duplicate primary key") + }) + } + + return res +} + func (p *Plugin) writeManagedTableBatch(ctx context.Context, sourceSpec specs.Source, tables schema.Tables, syncTime time.Time, res <-chan schema.DestinationResource) error { syncTime = syncTime.UTC() SetDestinationManagedCqColumns(tables)