diff --git a/plugins/destination/plugin.go b/plugins/destination/plugin.go index 31ab725094..1d40f6af80 100644 --- a/plugins/destination/plugin.go +++ b/plugins/destination/plugin.go @@ -269,6 +269,7 @@ func (p *Plugin) Write(ctx context.Context, sourceSpec specs.Source, tables sche if p.spec.WriteMode == specs.WriteModeOverwriteDeleteStale { tablesToDelete := tables if sourceSpec.Backend != specs.BackendNone { + tablesToDelete = make(schema.Tables, 0, len(tables)) for _, t := range tables { if !t.IsIncremental { tablesToDelete = append(tablesToDelete, t) diff --git a/plugins/destination/plugin_testing_overwrite_delete_stale.go b/plugins/destination/plugin_testing_overwrite_delete_stale.go index 06d92761fa..41b1dad121 100644 --- a/plugins/destination/plugin_testing_overwrite_delete_stale.go +++ b/plugins/destination/plugin_testing_overwrite_delete_stale.go @@ -89,10 +89,13 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx conte StableUUID: u, MaxRows: 1, } - updatedResources := schema.GenTestData(table, opts)[0] + updatedResources := schema.GenTestData(table, opts) + updatedIncResources := schema.GenTestData(incTable, opts) + allUpdatedResources := updatedResources + allUpdatedResources = append(allUpdatedResources, updatedIncResources...) - if err := p.writeOne(ctx, sourceSpec, secondSyncTime, updatedResources); err != nil { - return fmt.Errorf("failed to write one second time: %w", err) + if err := p.writeAll(ctx, sourceSpec, secondSyncTime, allUpdatedResources); err != nil { + return fmt.Errorf("failed to write all second time: %w", err) } resourcesRead, err = p.readAll(ctx, table, sourceName) @@ -108,7 +111,7 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx conte return fmt.Errorf("after overwrite expected first resource to be different. diff: %s", diff) } - resourcesRead, err = p.readAll(ctx, tables[0], sourceName) + resourcesRead, err = p.readAll(ctx, table, sourceName) if err != nil { return fmt.Errorf("failed to read all second time: %w", err) } @@ -117,19 +120,19 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx conte } // we expect the only resource returned to match the updated resource we wrote - if !array.RecordApproxEqual(updatedResources, resourcesRead[0]) { - diff := RecordDiff(updatedResources, resourcesRead[0]) + if !array.RecordApproxEqual(updatedResources[0], resourcesRead[0]) { + diff := RecordDiff(updatedResources[0], resourcesRead[0]) return fmt.Errorf("after delete stale expected resource to be equal. diff: %s", diff) } - // we expect the incremental table to still have 2 resources, because delete-stale should + // we expect the incremental table to still have 3 resources, because delete-stale should // not apply there - resourcesRead, err = p.readAll(ctx, tables[1], sourceName) + resourcesRead, err = p.readAll(ctx, incTable, sourceName) if err != nil { return fmt.Errorf("failed to read all from incremental table: %w", err) } - if len(resourcesRead) != 2 { - return fmt.Errorf("expected 2 resources in incremental table after delete-stale, got %d", len(resourcesRead)) + if len(resourcesRead) != 3 { + return fmt.Errorf("expected 3 resources in incremental table after delete-stale, got %d", len(resourcesRead)) } return nil diff --git a/schema/table.go b/schema/table.go index a0a0940f23..ed774f3b39 100644 --- a/schema/table.go +++ b/schema/table.go @@ -116,21 +116,20 @@ func NewTableFromArrowSchema(sc *arrow.Schema) (*Table, error) { return nil, fmt.Errorf("missing table name") } description, _ := tableMD.GetValue(MetadataTableDescription) + constraintName, _ := tableMD.GetValue(MetadataConstraintName) fields := sc.Fields() columns := make(ColumnList, len(fields)) for i, field := range fields { columns[i] = NewColumnFromArrowField(field) } table := &Table{ - Name: name, - Description: description, - Columns: columns, + Name: name, + Description: description, + PkConstraintName: constraintName, + Columns: columns, } - if constraintName, found := tableMD.GetValue(MetadataConstraintName); found { - table.PkConstraintName = constraintName - } - if title, found := tableMD.GetValue(MetadataIncremental); found { - table.Title = title + if isIncremental, found := tableMD.GetValue(MetadataIncremental); found { + table.IsIncremental = isIncremental == MetadataTrue } return table, nil } @@ -369,9 +368,16 @@ func (t *Table) PrimaryKeysIndexes() []int { func (t *Table) ToArrowSchema() *arrow.Schema { fields := make([]arrow.Field, len(t.Columns)) - schemaMd := arrow.MetadataFrom(map[string]string{ - MetadataTableName: t.Name, - }) + md := map[string]string{ + MetadataTableName: t.Name, + MetadataTableDescription: t.Description, + MetadataConstraintName: t.PkConstraintName, + MetadataIncremental: MetadataFalse, + } + if t.IsIncremental { + md[MetadataIncremental] = MetadataTrue + } + schemaMd := arrow.MetadataFrom(md) for i, c := range t.Columns { fields[i] = c.ToArrowField() }