Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 5 additions & 22 deletions scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,30 +189,13 @@ func (s *Scheduler) Sync(ctx context.Context, client schema.ClientMeta, tables s

// send migrate messages first
for _, tableOriginal := range tables.FlattenTables() {
// var table *schema.Table
table := tableOriginal.Copy(nil)
if syncClient.deterministicCQID {
// No PK adjustment should occur if `_cq_id` is not present in the table
cqIDCol := table.Columns.Get(schema.CqIDColumn.Name)
if cqIDCol == nil {
continue
}
Comment on lines -197 to -199
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When this occurred, no syncMigrateMessage would be sent

for i, c := range table.Columns {
if c.Name == schema.CqIDColumn.Name {
// Ensure that the cq_id column is the primary key
table.Columns[i].PrimaryKey = true
continue
}
if !c.PrimaryKey {
continue
}
table.Columns[i].PrimaryKey = false
}
migrateMessage := &message.SyncMigrateTable{
Table: tableOriginal.Copy(nil),
}

res <- &message.SyncMigrateTable{
Table: table,
if syncClient.deterministicCQID {
schema.CqIDAsPK(migrateMessage.Table)
}
res <- migrateMessage
}

resources := make(chan *schema.Resource)
Expand Down
8 changes: 4 additions & 4 deletions scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func testColumnResolverPanic(context.Context, schema.ClientMeta, *schema.Resourc

func testTableSuccessWithData(data []any) *schema.Table {
return &schema.Table{
Name: "test_table_success",
Name: "test_table_success_with_data",
Resolver: func(_ context.Context, _ schema.ClientMeta, _ *schema.Resource, res chan<- any) error {
res <- data
return nil
Expand Down Expand Up @@ -74,7 +74,7 @@ func testTableSuccess() *schema.Table {

func testTableSuccessWithPK() *schema.Table {
return &schema.Table{
Name: "test_table_success",
Name: "test_table_success_pk",
Resolver: testResolverSuccess,
Columns: []schema.Column{
{
Expand All @@ -88,7 +88,7 @@ func testTableSuccessWithPK() *schema.Table {

func testTableSuccessWithCQIDPK() *schema.Table {
return &schema.Table{
Name: "test_table_success",
Name: "test_table_success_cq_id",
Resolver: testResolverSuccess,
Columns: []schema.Column{
schema.CqIDColumn,
Expand Down Expand Up @@ -411,7 +411,7 @@ func testSyncTable(t *testing.T, tc syncTestCase, strategy Strategy, determinist
initialTable := tables.Get(v.Table.Name)

pks := migratedTable.PrimaryKeys()
if deterministicCQID {
if deterministicCQID && initialTable.Columns.Get(schema.CqIDColumn.Name) != nil {
if len(pks) != 1 {
t.Fatalf("expected 1 pk. got %d", len(pks))
}
Expand Down
20 changes: 20 additions & 0 deletions schema/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,26 @@ func AddCqIDs(table *Table) {
}
}

// CqIDAsPK sets the cq_id column as primary key if it exists
// and removes the primary key from all other columns
func CqIDAsPK(t *Table) {
cqIDCol := t.Columns.Get(CqIDColumn.Name)
if cqIDCol == nil {
return
}
for i, c := range t.Columns {
if c.Name == CqIDColumn.Name {
// Ensure that the cq_id column is the primary key
t.Columns[i].PrimaryKey = true
continue
}
if !c.PrimaryKey {
continue
}
t.Columns[i].PrimaryKey = false
}
}

func NewTablesFromArrowSchemas(schemas []*arrow.Schema) (Tables, error) {
tables := make(Tables, len(schemas))
for i, schema := range schemas {
Expand Down