diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index c6911edddb..30c496be2c 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -189,30 +189,13 @@ func (s *Scheduler) Sync(ctx context.Context, client schema.ClientMeta, tables s // send migrate messages first for _, tableOriginal := range tables.FlattenTables() { - // var table *schema.Table - table := tableOriginal.Copy(nil) - if syncClient.deterministicCQID { - // No PK adjustment should occur if `_cq_id` is not present in the table - cqIDCol := table.Columns.Get(schema.CqIDColumn.Name) - if cqIDCol == nil { - continue - } - for i, c := range table.Columns { - if c.Name == schema.CqIDColumn.Name { - // Ensure that the cq_id column is the primary key - table.Columns[i].PrimaryKey = true - continue - } - if !c.PrimaryKey { - continue - } - table.Columns[i].PrimaryKey = false - } + migrateMessage := &message.SyncMigrateTable{ + Table: tableOriginal.Copy(nil), } - - res <- &message.SyncMigrateTable{ - Table: table, + if syncClient.deterministicCQID { + schema.CqIDAsPK(migrateMessage.Table) } + res <- migrateMessage } resources := make(chan *schema.Resource) diff --git a/scheduler/scheduler_test.go b/scheduler/scheduler_test.go index 31b3e1d8db..254c6b7d6f 100644 --- a/scheduler/scheduler_test.go +++ b/scheduler/scheduler_test.go @@ -45,7 +45,7 @@ func testColumnResolverPanic(context.Context, schema.ClientMeta, *schema.Resourc func testTableSuccessWithData(data []any) *schema.Table { return &schema.Table{ - Name: "test_table_success", + Name: "test_table_success_with_data", Resolver: func(_ context.Context, _ schema.ClientMeta, _ *schema.Resource, res chan<- any) error { res <- data return nil @@ -74,7 +74,7 @@ func testTableSuccess() *schema.Table { func testTableSuccessWithPK() *schema.Table { return &schema.Table{ - Name: "test_table_success", + Name: "test_table_success_pk", Resolver: testResolverSuccess, Columns: []schema.Column{ { @@ -88,7 +88,7 @@ func testTableSuccessWithPK() *schema.Table { func testTableSuccessWithCQIDPK() *schema.Table { return &schema.Table{ - Name: "test_table_success", + Name: "test_table_success_cq_id", Resolver: testResolverSuccess, Columns: []schema.Column{ schema.CqIDColumn, @@ -411,7 +411,7 @@ func testSyncTable(t *testing.T, tc syncTestCase, strategy Strategy, determinist initialTable := tables.Get(v.Table.Name) pks := migratedTable.PrimaryKeys() - if deterministicCQID { + if deterministicCQID && initialTable.Columns.Get(schema.CqIDColumn.Name) != nil { if len(pks) != 1 { t.Fatalf("expected 1 pk. got %d", len(pks)) } diff --git a/schema/table.go b/schema/table.go index 6338868379..ab058d7219 100644 --- a/schema/table.go +++ b/schema/table.go @@ -124,6 +124,26 @@ func AddCqIDs(table *Table) { } } +// CqIDAsPK sets the cq_id column as primary key if it exists +// and removes the primary key from all other columns +func CqIDAsPK(t *Table) { + cqIDCol := t.Columns.Get(CqIDColumn.Name) + if cqIDCol == nil { + return + } + for i, c := range t.Columns { + if c.Name == CqIDColumn.Name { + // Ensure that the cq_id column is the primary key + t.Columns[i].PrimaryKey = true + continue + } + if !c.PrimaryKey { + continue + } + t.Columns[i].PrimaryKey = false + } +} + func NewTablesFromArrowSchemas(schemas []*arrow.Schema) (Tables, error) { tables := make(Tables, len(schemas)) for i, schema := range schemas {