diff --git a/internal/memdb/memdb.go b/internal/memdb/memdb.go index 2907d4cb4e..5acac68741 100644 --- a/internal/memdb/memdb.go +++ b/internal/memdb/memdb.go @@ -18,7 +18,6 @@ import ( // client is mostly used for testing the destination plugin. type client struct { - schema.DefaultTransformer spec specs.Destination memoryDB map[string][]arrow.Record tables map[string]*arrow.Schema @@ -87,8 +86,10 @@ func (c *client) overwrite(table *arrow.Schema, data arrow.Record) { } } if found { + tmp := c.memoryDB[tableName][i] c.memoryDB[tableName] = append(c.memoryDB[tableName][:i], c.memoryDB[tableName][i+1:]...) c.memoryDB[tableName] = append(c.memoryDB[tableName], data) + tmp.Release() return } } @@ -109,6 +110,11 @@ func (c *client) Migrate(_ context.Context, tables schema.Schemas) error { if changes == nil { continue } + for _, t := range c.memoryDB { + for _, row := range t { + row.Release() + } + } c.memoryDB[tableName] = make([]arrow.Record, 0) c.tables[tableName] = table } @@ -198,6 +204,11 @@ func (*client) Metrics() destination.Metrics { } func (c *client) Close(context.Context) error { + for _, table := range c.memoryDB { + for _, row := range table { + row.Release() + } + } c.memoryDB = nil return nil } @@ -219,6 +230,8 @@ func (c *client) deleteStaleTable(_ context.Context, table *arrow.Schema, source rowSyncTime := row.Column(syncColIndex).(*array.Timestamp).Value(0).ToTime(arrow.Microsecond).UTC() if !rowSyncTime.Before(syncTime) { filteredTable = append(filteredTable, c.memoryDB[tableName][i]) + } else { + c.memoryDB[tableName][i].Release() } } } diff --git a/plugins/destination/plugin_testing.go b/plugins/destination/plugin_testing.go index fddbbb84ce..ca5fc42423 100644 --- a/plugins/destination/plugin_testing.go +++ b/plugins/destination/plugin_testing.go @@ -14,6 +14,7 @@ import ( "github.com/apache/arrow/go/v12/arrow/memory" "github.com/cloudquery/plugin-sdk/v2/schema" "github.com/cloudquery/plugin-sdk/v2/specs" + "github.com/cloudquery/plugin-sdk/v2/types" "github.com/rs/zerolog" ) @@ -214,8 +215,16 @@ func PluginTestSuiteRunner(t *testing.T, newPlugin NewPluginFunc, destSpec specs func sortRecordsBySyncTime(table *schema.Table, records []arrow.Record) { syncTimeIndex := table.Columns.Index(schema.CqSyncTimeColumn.Name) + cqIDIndex := table.Columns.Index(schema.CqIDColumn.Name) sort.Slice(records, func(i, j int) bool { // sort by sync time, then UUID - return records[i].Column(syncTimeIndex).(*array.Timestamp).Value(0).ToTime(arrow.Millisecond).Before(records[j].Column(syncTimeIndex).(*array.Timestamp).Value(0).ToTime(arrow.Millisecond)) + first := records[i].Column(syncTimeIndex).(*array.Timestamp).Value(0).ToTime(arrow.Millisecond) + second := records[j].Column(syncTimeIndex).(*array.Timestamp).Value(0).ToTime(arrow.Millisecond) + if first.Equal(second) { + firstUUID := records[i].Column(cqIDIndex).(*types.UUIDArray).Value(0).String() + secondUUID := records[j].Column(cqIDIndex).(*types.UUIDArray).Value(0).String() + return strings.Compare(firstUUID, secondUUID) < 0 + } + return first.Before(second) }) } diff --git a/plugins/destination/plugin_testing_migrate.go b/plugins/destination/plugin_testing_migrate.go index c7ed0b6587..5cb42a7016 100644 --- a/plugins/destination/plugin_testing_migrate.go +++ b/plugins/destination/plugin_testing_migrate.go @@ -42,6 +42,7 @@ func testMigration(ctx context.Context, mem memory.Allocator, _ *testing.T, p *P MaxRows: 1, } resource1 := testdata.GenTestData(mem, source, opts)[0] + resource1.Retain() defer resource1.Release() if err := p.writeOne(ctx, sourceSpec, syncTime, resource1); err != nil { return fmt.Errorf("failed to write one: %w", err) @@ -51,6 +52,7 @@ func testMigration(ctx context.Context, mem memory.Allocator, _ *testing.T, p *P return fmt.Errorf("failed to migrate existing table: %w", err) } resource2 := testdata.GenTestData(mem, target, opts)[0] + resource2.Retain() defer resource2.Release() if err := p.writeOne(ctx, sourceSpec, syncTime, resource2); err != nil { return fmt.Errorf("failed to write one after migration: %w", err) diff --git a/plugins/destination/plugin_testing_overwrite.go b/plugins/destination/plugin_testing_overwrite.go index 08adcbd5fc..8c58ae554c 100644 --- a/plugins/destination/plugin_testing_overwrite.go +++ b/plugins/destination/plugin_testing_overwrite.go @@ -42,6 +42,9 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwrite(ctx context.Context, MaxRows: 2, } resources := testdata.GenTestData(mem, schema.CQSchemaToArrow(table), opts) + for _, r := range resources { + r.Retain() + } defer func() { for _, r := range resources { r.Release() @@ -83,6 +86,7 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwrite(ctx context.Context, StableUUID: *u, } updatedResource := testdata.GenTestData(mem, schema.CQSchemaToArrow(table), opts)[0] + updatedResource.Retain() defer updatedResource.Release() // write second time if err := p.writeOne(ctx, sourceSpec, secondSyncTime, updatedResource); err != nil { diff --git a/plugins/destination/plugin_testing_overwrite_delete_stale.go b/plugins/destination/plugin_testing_overwrite_delete_stale.go index 752d85a618..39ae0e99f2 100644 --- a/plugins/destination/plugin_testing_overwrite_delete_stale.go +++ b/plugins/destination/plugin_testing_overwrite_delete_stale.go @@ -48,6 +48,9 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx conte incResources := testdata.GenTestData(mem, incTable.ToArrowSchema(), opts) allResources := resources allResources = append(allResources, incResources...) + for _, r := range allResources { + r.Retain() + } defer func() { for _, r := range allResources { r.Release() @@ -96,6 +99,7 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx conte MaxRows: 1, } updatedResources := testdata.GenTestData(mem, table.ToArrowSchema(), opts)[0] + updatedResources.Retain() defer updatedResources.Release() if err := p.writeOne(ctx, sourceSpec, secondSyncTime, updatedResources); err != nil { diff --git a/plugins/destination/plugin_testing_write_append.go b/plugins/destination/plugin_testing_write_append.go index 368a764817..746bf34d26 100644 --- a/plugins/destination/plugin_testing_write_append.go +++ b/plugins/destination/plugin_testing_write_append.go @@ -40,6 +40,7 @@ func (s *PluginTestSuite) destinationPluginTestWriteAppend(ctx context.Context, MaxRows: 1, } record1 := testdata.GenTestData(mem, table.ToArrowSchema(), opts)[0] + record1.Retain() defer record1.Release() if err := p.writeOne(ctx, specSource, syncTime, record1); err != nil { return fmt.Errorf("failed to write one second time: %w", err) @@ -48,6 +49,7 @@ func (s *PluginTestSuite) destinationPluginTestWriteAppend(ctx context.Context, secondSyncTime := syncTime.Add(10 * time.Second).UTC() opts.SyncTime = secondSyncTime record2 := testdata.GenTestData(mem, table.ToArrowSchema(), opts)[0] + record2.Retain() defer record2.Release() if !s.tests.SkipSecondAppend {