From 51d151f352cd22e1e095600595384788f4143ee0 Mon Sep 17 00:00:00 2001 From: candiduslynx Date: Thu, 26 Jan 2023 15:09:29 +0200 Subject: [PATCH 1/3] filter the duplicate pks --- internal/pk/pk.go | 20 ++++++++++ internal/pk/pk_test.go | 57 +++++++++++++++++++++++++++ plugins/destination/managed_writer.go | 45 +++++++++++++++++++++ 3 files changed, 122 insertions(+) create mode 100644 internal/pk/pk.go create mode 100644 internal/pk/pk_test.go diff --git a/internal/pk/pk.go b/internal/pk/pk.go new file mode 100644 index 0000000000..6759751bac --- /dev/null +++ b/internal/pk/pk.go @@ -0,0 +1,20 @@ +package pk + +import ( + "fmt" + "strings" + + "github.com/cloudquery/plugin-sdk/schema" +) + +func Convert(table *schema.Table, resource []any) string { + parts := make([]string, 0, len(table.PrimaryKeys())) + for i, col := range table.Columns { + if !col.CreationOptions.PrimaryKey { + continue + } + parts = append(parts, fmt.Sprint(resource[i])) + } + + return "(" + strings.Join(parts, ",") + ")" +} diff --git a/internal/pk/pk_test.go b/internal/pk/pk_test.go new file mode 100644 index 0000000000..ce2f3ff6e2 --- /dev/null +++ b/internal/pk/pk_test.go @@ -0,0 +1,57 @@ +package pk_test + +import ( + "testing" + + "github.com/cloudquery/plugin-sdk/internal/pk" + "github.com/cloudquery/plugin-sdk/schema" + "github.com/stretchr/testify/require" +) + +func TestConvert(t *testing.T) { + type testCase struct { + table *schema.Table + resource []any + expected string + } + + for _, tc := range []testCase{ + { + table: &schema.Table{ + Name: "int", + Columns: schema.ColumnList{ + { + Name: "col1", + Type: schema.TypeInt, + CreationOptions: schema.ColumnCreationOptions{PrimaryKey: true}, + }, + }, + }, + resource: []any{2}, + expected: "(2)", + }, + { + table: &schema.Table{ + Name: "int_str", + Columns: schema.ColumnList{ + { + Name: "col1", + Type: schema.TypeInt, + CreationOptions: schema.ColumnCreationOptions{PrimaryKey: true}, + }, + { + Name: "col2", + Type: schema.TypeString, + CreationOptions: schema.ColumnCreationOptions{PrimaryKey: true}, + }, + }, + }, + resource: []any{2, "some"}, + expected: "(2,some)", + }, + } { + t.Run(tc.table.Name, func(t *testing.T) { + require.Equal(t, tc.expected, pk.Convert(tc.table, tc.resource)) + }) + } +} diff --git a/plugins/destination/managed_writer.go b/plugins/destination/managed_writer.go index 3b98ae92d6..6deb2d1df1 100644 --- a/plugins/destination/managed_writer.go +++ b/plugins/destination/managed_writer.go @@ -2,12 +2,15 @@ package destination import ( "context" + "strings" "sync" "sync/atomic" "time" + "github.com/cloudquery/plugin-sdk/internal/pk" "github.com/cloudquery/plugin-sdk/schema" "github.com/cloudquery/plugin-sdk/specs" + "github.com/getsentry/sentry-go" ) type worker struct { @@ -54,6 +57,8 @@ func (p *Plugin) worker(ctx context.Context, metrics *Metrics, table *schema.Tab } func (p *Plugin) flush(ctx context.Context, metrics *Metrics, table *schema.Table, resources [][]any) { + resources = p.filterByPK(table, resources) + start := time.Now() batchSize := len(resources) if err := p.client.WriteTableBatch(ctx, table, resources); err != nil { @@ -66,6 +71,46 @@ func (p *Plugin) flush(ctx context.Context, metrics *Metrics, table *schema.Tabl } } +func (p *Plugin) filterByPK(table *schema.Table, resources [][]any) [][]any { + pks := make(map[string]bool, len(resources)) // value = reported already + res := make([][]any, 0, len(resources)) + + for _, r := range resources { + k := pk.Convert(table, r) + switch reported, ok := pks[k]; ok { + case true: + if reported { + // already reported + continue + } + pks[k] = true + + pkSpec := "(" + strings.Join(table.PrimaryKeys(), ",") + ")" + // log err + p.logger.Error(). + Str("table", table.Name). + Str("pk", pkSpec). + Str("value", pk.Convert(table, r)). + Msg("duplicate primary key") + + // send to Sentry only once per table, + // to avoid sending too many duplicate messages + sentry.WithScope(func(scope *sentry.Scope) { + scope.SetTag("plugin", p.name) + scope.SetTag("version", p.version) + scope.SetTag("table", table.Name) + scope.SetExtra("pk", pkSpec) + sentry.CurrentHub().CaptureMessage("duplicate primary key") + }) + default: + pks[k] = false + res = append(res, r) + } + } + + return res +} + func (p *Plugin) writeManagedTableBatch(ctx context.Context, sourceSpec specs.Source, tables schema.Tables, syncTime time.Time, res <-chan schema.DestinationResource) error { syncTime = syncTime.UTC() SetDestinationManagedCqColumns(tables) From fe2c3a66ab44a4a3f865bc2f32b21fc04df76e19 Mon Sep 17 00:00:00 2001 From: candiduslynx Date: Thu, 26 Jan 2023 15:55:44 +0200 Subject: [PATCH 2/3] less ident --- plugins/destination/managed_writer.go | 58 +++++++++++++-------------- 1 file changed, 29 insertions(+), 29 deletions(-) diff --git a/plugins/destination/managed_writer.go b/plugins/destination/managed_writer.go index 6deb2d1df1..30f4ebd66a 100644 --- a/plugins/destination/managed_writer.go +++ b/plugins/destination/managed_writer.go @@ -72,40 +72,40 @@ func (p *Plugin) flush(ctx context.Context, metrics *Metrics, table *schema.Tabl } func (p *Plugin) filterByPK(table *schema.Table, resources [][]any) [][]any { - pks := make(map[string]bool, len(resources)) // value = reported already + pks := make(map[string]struct{}, len(resources)) res := make([][]any, 0, len(resources)) - + var reported bool for _, r := range resources { k := pk.Convert(table, r) - switch reported, ok := pks[k]; ok { - case true: - if reported { - // already reported - continue - } - pks[k] = true - - pkSpec := "(" + strings.Join(table.PrimaryKeys(), ",") + ")" - // log err - p.logger.Error(). - Str("table", table.Name). - Str("pk", pkSpec). - Str("value", pk.Convert(table, r)). - Msg("duplicate primary key") - - // send to Sentry only once per table, - // to avoid sending too many duplicate messages - sentry.WithScope(func(scope *sentry.Scope) { - scope.SetTag("plugin", p.name) - scope.SetTag("version", p.version) - scope.SetTag("table", table.Name) - scope.SetExtra("pk", pkSpec) - sentry.CurrentHub().CaptureMessage("duplicate primary key") - }) - default: - pks[k] = false + _, ok := pks[k] + switch { + case !ok: + pks[k] = struct{}{} res = append(res, r) + continue + case reported: + continue } + + reported = true + pkSpec := "(" + strings.Join(table.PrimaryKeys(), ",") + ")" + + // log err + p.logger.Error(). + Str("table", table.Name). + Str("pk", pkSpec). + Str("value", pk.Convert(table, r)). + Msg("duplicate primary key") + + // send to Sentry only once per table, + // to avoid sending too many duplicate messages + sentry.WithScope(func(scope *sentry.Scope) { + scope.SetTag("plugin", p.name) + scope.SetTag("version", p.version) + scope.SetTag("table", table.Name) + scope.SetExtra("pk", pkSpec) + sentry.CurrentHub().CaptureMessage("duplicate primary key") + }) } return res From c76f8aaf9267b4c59f130c24562c663d206bb129 Mon Sep 17 00:00:00 2001 From: candiduslynx Date: Thu, 26 Jan 2023 17:00:15 +0200 Subject: [PATCH 3/3] review comments --- internal/pk/pk.go | 2 +- internal/pk/pk_test.go | 4 ++-- plugins/destination/managed_writer.go | 12 ++++++------ 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/internal/pk/pk.go b/internal/pk/pk.go index 6759751bac..0723d64178 100644 --- a/internal/pk/pk.go +++ b/internal/pk/pk.go @@ -7,7 +7,7 @@ import ( "github.com/cloudquery/plugin-sdk/schema" ) -func Convert(table *schema.Table, resource []any) string { +func String(table *schema.Table, resource []any) string { parts := make([]string, 0, len(table.PrimaryKeys())) for i, col := range table.Columns { if !col.CreationOptions.PrimaryKey { diff --git a/internal/pk/pk_test.go b/internal/pk/pk_test.go index ce2f3ff6e2..3bea20053c 100644 --- a/internal/pk/pk_test.go +++ b/internal/pk/pk_test.go @@ -8,7 +8,7 @@ import ( "github.com/stretchr/testify/require" ) -func TestConvert(t *testing.T) { +func TestString(t *testing.T) { type testCase struct { table *schema.Table resource []any @@ -51,7 +51,7 @@ func TestConvert(t *testing.T) { }, } { t.Run(tc.table.Name, func(t *testing.T) { - require.Equal(t, tc.expected, pk.Convert(tc.table, tc.resource)) + require.Equal(t, tc.expected, pk.String(tc.table, tc.resource)) }) } } diff --git a/plugins/destination/managed_writer.go b/plugins/destination/managed_writer.go index 30f4ebd66a..0dd5ab2f41 100644 --- a/plugins/destination/managed_writer.go +++ b/plugins/destination/managed_writer.go @@ -57,7 +57,7 @@ func (p *Plugin) worker(ctx context.Context, metrics *Metrics, table *schema.Tab } func (p *Plugin) flush(ctx context.Context, metrics *Metrics, table *schema.Table, resources [][]any) { - resources = p.filterByPK(table, resources) + resources = p.removeDuplicatesByPK(table, resources) start := time.Now() batchSize := len(resources) @@ -71,16 +71,16 @@ func (p *Plugin) flush(ctx context.Context, metrics *Metrics, table *schema.Tabl } } -func (p *Plugin) filterByPK(table *schema.Table, resources [][]any) [][]any { +func (p *Plugin) removeDuplicatesByPK(table *schema.Table, resources [][]any) [][]any { pks := make(map[string]struct{}, len(resources)) res := make([][]any, 0, len(resources)) var reported bool for _, r := range resources { - k := pk.Convert(table, r) - _, ok := pks[k] + key := pk.String(table, r) + _, ok := pks[key] switch { case !ok: - pks[k] = struct{}{} + pks[key] = struct{}{} res = append(res, r) continue case reported: @@ -94,7 +94,7 @@ func (p *Plugin) filterByPK(table *schema.Table, resources [][]any) [][]any { p.logger.Error(). Str("table", table.Name). Str("pk", pkSpec). - Str("value", pk.Convert(table, r)). + Str("value", key). Msg("duplicate primary key") // send to Sentry only once per table,