Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion internal/memdb/memdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (

// client is mostly used for testing the destination plugin.
type client struct {
schema.DefaultTransformer
spec specs.Destination
memoryDB map[string][]arrow.Record
tables map[string]*arrow.Schema
Expand Down Expand Up @@ -87,8 +86,10 @@ func (c *client) overwrite(table *arrow.Schema, data arrow.Record) {
}
}
if found {
tmp := c.memoryDB[tableName][i]
c.memoryDB[tableName] = append(c.memoryDB[tableName][:i], c.memoryDB[tableName][i+1:]...)
c.memoryDB[tableName] = append(c.memoryDB[tableName], data)
tmp.Release()
return
}
}
Expand All @@ -109,6 +110,11 @@ func (c *client) Migrate(_ context.Context, tables schema.Schemas) error {
if changes == nil {
continue
}
for _, t := range c.memoryDB {
for _, row := range t {
row.Release()
}
}
c.memoryDB[tableName] = make([]arrow.Record, 0)
c.tables[tableName] = table
}
Expand Down Expand Up @@ -198,6 +204,11 @@ func (*client) Metrics() destination.Metrics {
}

func (c *client) Close(context.Context) error {
for _, table := range c.memoryDB {
for _, row := range table {
row.Release()
}
}
c.memoryDB = nil
return nil
}
Expand All @@ -219,6 +230,8 @@ func (c *client) deleteStaleTable(_ context.Context, table *arrow.Schema, source
rowSyncTime := row.Column(syncColIndex).(*array.Timestamp).Value(0).ToTime(arrow.Microsecond).UTC()
if !rowSyncTime.Before(syncTime) {
filteredTable = append(filteredTable, c.memoryDB[tableName][i])
} else {
c.memoryDB[tableName][i].Release()
}
}
}
Expand Down
11 changes: 10 additions & 1 deletion plugins/destination/plugin_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/apache/arrow/go/v12/arrow/memory"
"github.com/cloudquery/plugin-sdk/v2/schema"
"github.com/cloudquery/plugin-sdk/v2/specs"
"github.com/cloudquery/plugin-sdk/v2/types"
"github.com/rs/zerolog"
)

Expand Down Expand Up @@ -214,8 +215,16 @@ func PluginTestSuiteRunner(t *testing.T, newPlugin NewPluginFunc, destSpec specs

func sortRecordsBySyncTime(table *schema.Table, records []arrow.Record) {
syncTimeIndex := table.Columns.Index(schema.CqSyncTimeColumn.Name)
cqIDIndex := table.Columns.Index(schema.CqIDColumn.Name)
sort.Slice(records, func(i, j int) bool {
// sort by sync time, then UUID
return records[i].Column(syncTimeIndex).(*array.Timestamp).Value(0).ToTime(arrow.Millisecond).Before(records[j].Column(syncTimeIndex).(*array.Timestamp).Value(0).ToTime(arrow.Millisecond))
first := records[i].Column(syncTimeIndex).(*array.Timestamp).Value(0).ToTime(arrow.Millisecond)
second := records[j].Column(syncTimeIndex).(*array.Timestamp).Value(0).ToTime(arrow.Millisecond)
if first.Equal(second) {
firstUUID := records[i].Column(cqIDIndex).(*types.UUIDArray).Value(0).String()
secondUUID := records[j].Column(cqIDIndex).(*types.UUIDArray).Value(0).String()
return strings.Compare(firstUUID, secondUUID) < 0
}
return first.Before(second)
})
}
2 changes: 2 additions & 0 deletions plugins/destination/plugin_testing_migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func testMigration(ctx context.Context, mem memory.Allocator, _ *testing.T, p *P
MaxRows: 1,
}
resource1 := testdata.GenTestData(mem, source, opts)[0]
resource1.Retain()
defer resource1.Release()
if err := p.writeOne(ctx, sourceSpec, syncTime, resource1); err != nil {
return fmt.Errorf("failed to write one: %w", err)
Expand All @@ -51,6 +52,7 @@ func testMigration(ctx context.Context, mem memory.Allocator, _ *testing.T, p *P
return fmt.Errorf("failed to migrate existing table: %w", err)
}
resource2 := testdata.GenTestData(mem, target, opts)[0]
resource2.Retain()
defer resource2.Release()
if err := p.writeOne(ctx, sourceSpec, syncTime, resource2); err != nil {
return fmt.Errorf("failed to write one after migration: %w", err)
Expand Down
4 changes: 4 additions & 0 deletions plugins/destination/plugin_testing_overwrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwrite(ctx context.Context,
MaxRows: 2,
}
resources := testdata.GenTestData(mem, schema.CQSchemaToArrow(table), opts)
for _, r := range resources {
r.Retain()
}
defer func() {
for _, r := range resources {
r.Release()
Expand Down Expand Up @@ -83,6 +86,7 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwrite(ctx context.Context,
StableUUID: *u,
}
updatedResource := testdata.GenTestData(mem, schema.CQSchemaToArrow(table), opts)[0]
updatedResource.Retain()
defer updatedResource.Release()
// write second time
if err := p.writeOne(ctx, sourceSpec, secondSyncTime, updatedResource); err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx conte
incResources := testdata.GenTestData(mem, incTable.ToArrowSchema(), opts)
allResources := resources
allResources = append(allResources, incResources...)
for _, r := range allResources {
r.Retain()
}
defer func() {
for _, r := range allResources {
r.Release()
Expand Down Expand Up @@ -96,6 +99,7 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx conte
MaxRows: 1,
}
updatedResources := testdata.GenTestData(mem, table.ToArrowSchema(), opts)[0]
updatedResources.Retain()
defer updatedResources.Release()

if err := p.writeOne(ctx, sourceSpec, secondSyncTime, updatedResources); err != nil {
Expand Down
2 changes: 2 additions & 0 deletions plugins/destination/plugin_testing_write_append.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func (s *PluginTestSuite) destinationPluginTestWriteAppend(ctx context.Context,
MaxRows: 1,
}
record1 := testdata.GenTestData(mem, table.ToArrowSchema(), opts)[0]
record1.Retain()
defer record1.Release()
if err := p.writeOne(ctx, specSource, syncTime, record1); err != nil {
return fmt.Errorf("failed to write one second time: %w", err)
Expand All @@ -48,6 +49,7 @@ func (s *PluginTestSuite) destinationPluginTestWriteAppend(ctx context.Context,
secondSyncTime := syncTime.Add(10 * time.Second).UTC()
opts.SyncTime = secondSyncTime
record2 := testdata.GenTestData(mem, table.ToArrowSchema(), opts)[0]
record2.Retain()
defer record2.Release()

if !s.tests.SkipSecondAppend {
Expand Down