From b600f5c3f831bc0cf8aec5c631fe7be8fb51a925 Mon Sep 17 00:00:00 2001 From: bbernays Date: Fri, 26 Jan 2024 09:53:02 -0600 Subject: [PATCH 1/3] initial --- docs/json.go | 18 ++++++++++-------- schema/arrow.go | 9 +++++---- schema/column.go | 36 ++++++++++++++++++++++++++---------- schema/doc.go | 2 -- schema/resource.go | 24 +++++++++++++++++++----- schema/table.go | 11 +++++++++++ transformers/struct.go | 23 +++++++++++++++++++++++ 7 files changed, 94 insertions(+), 29 deletions(-) delete mode 100644 schema/doc.go diff --git a/docs/json.go b/docs/json.go index 8972a86b8c..2601ddb316 100644 --- a/docs/json.go +++ b/docs/json.go @@ -18,10 +18,11 @@ type jsonTable struct { } type jsonColumn struct { - Name string `json:"name"` - Type string `json:"type"` - IsPrimaryKey bool `json:"is_primary_key,omitempty"` - IsIncrementalKey bool `json:"is_incremental_key,omitempty"` + Name string `json:"name"` + Type string `json:"type"` + IsPrimaryKey bool `json:"is_primary_key,omitempty"` + IsVirtualPrimaryKey bool `json:"is_virtual_primary_key,omitempty"` + IsIncrementalKey bool `json:"is_incremental_key,omitempty"` } func (g *Generator) renderTablesAsJSON(dir string) error { @@ -44,10 +45,11 @@ func (g *Generator) jsonifyTables(tables schema.Tables) []jsonTable { jsonColumns := make([]jsonColumn, len(table.Columns)) for c, col := range table.Columns { jsonColumns[c] = jsonColumn{ - Name: col.Name, - Type: col.Type.String(), - IsPrimaryKey: col.PrimaryKey, - IsIncrementalKey: col.IncrementalKey, + Name: col.Name, + Type: col.Type.String(), + IsPrimaryKey: col.PrimaryKey, + IsVirtualPrimaryKey: col.VirtualPrimaryKey, + IsIncrementalKey: col.IncrementalKey, } } jsonTables[i] = jsonTable{ diff --git a/schema/arrow.go b/schema/arrow.go index e882625334..31af61c666 100644 --- a/schema/arrow.go +++ b/schema/arrow.go @@ -5,10 +5,11 @@ import ( ) const ( - MetadataUnique = "cq:extension:unique" - MetadataPrimaryKey = "cq:extension:primary_key" - MetadataConstraintName = "cq:extension:constraint_name" - MetadataIncremental = "cq:extension:incremental" + MetadataUnique = "cq:extension:unique" + MetadataPrimaryKey = "cq:extension:primary_key" + MetadataVirtualPrimaryKey = "cq:extension:virtual_primary_key" + MetadataConstraintName = "cq:extension:constraint_name" + MetadataIncremental = "cq:extension:incremental" MetadataTrue = "true" MetadataFalse = "false" diff --git a/schema/column.go b/schema/column.go index 3fd93e1f47..19993002e2 100644 --- a/schema/column.go +++ b/schema/column.go @@ -43,6 +43,9 @@ type Column struct { IncrementalKey bool `json:"incremental_key"` // Unique requires the destinations supporting this to mark this column as unique Unique bool `json:"unique"` + + // VirtualPrimaryKey is a flag that indicates if the column is used as part of the input to calculate the value of `_cq_id`. + VirtualPrimaryKey bool `json:"virtual_primary_key"` } // NewColumnFromArrowField creates a new Column from an arrow.Field @@ -64,14 +67,18 @@ func NewColumnFromArrowField(f arrow.Field) Column { v, ok = f.Metadata.GetValue(MetadataIncremental) column.IncrementalKey = ok && v == MetadataTrue + v, ok = f.Metadata.GetValue(MetadataVirtualPrimaryKey) + column.VirtualPrimaryKey = ok && v == MetadataTrue + return column } func (c Column) ToArrowField() arrow.Field { mdKV := map[string]string{ - MetadataPrimaryKey: MetadataFalse, - MetadataUnique: MetadataFalse, - MetadataIncremental: MetadataFalse, + MetadataPrimaryKey: MetadataFalse, + MetadataUnique: MetadataFalse, + MetadataIncremental: MetadataFalse, + MetadataVirtualPrimaryKey: MetadataFalse, } if c.PrimaryKey { mdKV[MetadataPrimaryKey] = MetadataTrue @@ -82,6 +89,9 @@ func (c Column) ToArrowField() arrow.Field { if c.IncrementalKey { mdKV[MetadataIncremental] = MetadataTrue } + if c.VirtualPrimaryKey { + mdKV[MetadataVirtualPrimaryKey] = MetadataTrue + } return arrow.Field{ Name: c.Name, @@ -93,13 +103,14 @@ func (c Column) ToArrowField() arrow.Field { func (c Column) MarshalJSON() ([]byte, error) { type Alias struct { - Name string `json:"name"` - Type string `json:"type"` - Description string `json:"description"` - PrimaryKey bool `json:"primary_key"` - NotNull bool `json:"not_null"` - Unique bool `json:"unique"` - IncrementalKey bool `json:"incremental_key"` + Name string `json:"name"` + Type string `json:"type"` + Description string `json:"description"` + PrimaryKey bool `json:"primary_key"` + NotNull bool `json:"not_null"` + Unique bool `json:"unique"` + IncrementalKey bool `json:"incremental_key"` + VirtualPrimaryKey bool `json:"virtual_primary_key"` } var alias Alias alias.Name = c.Name @@ -109,6 +120,7 @@ func (c Column) MarshalJSON() ([]byte, error) { alias.NotNull = c.NotNull alias.Unique = c.Unique alias.IncrementalKey = c.IncrementalKey + alias.VirtualPrimaryKey = c.VirtualPrimaryKey return json.Marshal(alias) } @@ -130,6 +142,10 @@ func (c Column) String() string { if c.IncrementalKey { sb.WriteString(":IncrementalKey") } + + if c.VirtualPrimaryKey { + sb.WriteString(":VirtualPrimaryKey") + } return sb.String() } diff --git a/schema/doc.go b/schema/doc.go deleted file mode 100644 index 1a1354872c..0000000000 --- a/schema/doc.go +++ /dev/null @@ -1,2 +0,0 @@ -// Package schema defines types supported by tables in source plugins -package schema diff --git a/schema/resource.go b/schema/resource.go index fa17c8ada7..b817dbed76 100644 --- a/schema/resource.go +++ b/schema/resource.go @@ -3,6 +3,7 @@ package schema import ( "crypto/sha256" "fmt" + "hash" "slices" "github.com/cloudquery/plugin-sdk/v4/scalar" @@ -79,21 +80,34 @@ func (r *Resource) GetValues() scalar.Vector { //nolint:revive func (r *Resource) CalculateCQID(deterministicCQID bool) error { + // if `virtualPK` is set, we calculate the CQID based on the virtual primary keys + virtualPrimaryKeys := r.Table.VirtualPrimaryKeys() + if len(virtualPrimaryKeys) > 0 { + return r.storeCQID(uuid.NewSHA1(uuid.UUID{}, calculateCqIDValue(r, virtualPrimaryKeys).Sum(nil))) + } + + // If deterministicCQID is false, we generate a random CQID if !deterministicCQID { return r.storeCQID(uuid.New()) } names := r.Table.PrimaryKeys() + // If there are no primary keys or if CQID is the only PK, we generate a random CQID if len(names) == 0 || (len(names) == 1 && names[0] == CqIDColumn.Name) { return r.storeCQID(uuid.New()) } - slices.Sort(names) + + return r.storeCQID(uuid.NewSHA1(uuid.UUID{}, calculateCqIDValue(r, names).Sum(nil))) +} + +func calculateCqIDValue(r *Resource, cols []string) hash.Hash { h := sha256.New() - for _, name := range names { + slices.Sort(cols) + for _, col := range cols { // We need to include the column name in the hash because the same value can be present in multiple columns and therefore lead to the same hash - h.Write([]byte(name)) - h.Write([]byte(r.Get(name).String())) + h.Write([]byte(col)) + h.Write([]byte(r.Get(col).String())) } - return r.storeCQID(uuid.NewSHA1(uuid.UUID{}, h.Sum(nil))) + return h } func (r *Resource) storeCQID(value uuid.UUID) error { diff --git a/schema/table.go b/schema/table.go index ab058d7219..989ee5ffc2 100644 --- a/schema/table.go +++ b/schema/table.go @@ -589,6 +589,17 @@ func (t *Table) IncrementalKeys() []string { return incrementalKeys } +func (t *Table) VirtualPrimaryKeys() []string { + var virtualPrimaryKeys []string + for _, c := range t.Columns { + if c.VirtualPrimaryKey { + virtualPrimaryKeys = append(virtualPrimaryKeys, c.Name) + } + } + + return virtualPrimaryKeys +} + func (t *Table) TableNames() []string { ret := []string{t.Name} for _, rel := range t.Relations { diff --git a/transformers/struct.go b/transformers/struct.go index 0148cb714d..ea73f4c6af 100644 --- a/transformers/struct.go +++ b/transformers/struct.go @@ -26,6 +26,8 @@ type structTransformer struct { structFieldsToUnwrap []string pkFields []string pkFieldsFound []string + virtualPKFields []string + virtualPKFieldsFound []string } type NameTransformer func(reflect.StructField) (string, error) @@ -117,6 +119,13 @@ func WithPrimaryKeys(fields ...string) StructTransformerOption { } } +// WithVirtualPrimaryKeys allows to specify what struct fields should be used as virtual primary keys +func WithVirtualPrimaryKeys(fields ...string) StructTransformerOption { + return func(t *structTransformer) { + t.virtualPKFields = fields + } +} + func TransformWithStruct(st any, opts ...StructTransformerOption) schema.Transform { t := &structTransformer{ nameTransformer: DefaultNameTransformer, @@ -159,6 +168,10 @@ func TransformWithStruct(st any, opts ...StructTransformerOption) schema.Transfo if diff := funk.SubtractString(t.pkFields, t.pkFieldsFound); len(diff) > 0 { return fmt.Errorf("failed to create all of the desired primary keys: %v", diff) } + + if diff := funk.SubtractString(t.virtualPKFields, t.virtualPKFieldsFound); len(diff) > 0 { + return fmt.Errorf("failed to create all of the desired virtual primary keys: %v", diff) + } return nil } } @@ -286,6 +299,16 @@ func (t *structTransformer) addColumnFromField(field reflect.StructField, parent } } + for _, pk := range t.virtualPKFields { + if pk == path { + // use path to allow the following + // 1. Don't duplicate the PK fields if the unwrapped struct contains a fields with the same name + // 2. Allow specifying the nested unwrapped field as part of the PK. + column.VirtualPrimaryKey = true + t.virtualPKFieldsFound = append(t.virtualPKFieldsFound, pk) + } + } + t.table.Columns = append(t.table.Columns, column) return nil From 1dc48eb51c4e8d68841284b080dbad178836e47e Mon Sep 17 00:00:00 2001 From: bbernays Date: Tue, 30 Jan 2024 10:37:57 -0600 Subject: [PATCH 2/3] Update scheduler_test.go --- scheduler/scheduler_test.go | 171 +++++++++++++++++++++--------------- 1 file changed, 99 insertions(+), 72 deletions(-) diff --git a/scheduler/scheduler_test.go b/scheduler/scheduler_test.go index 254c6b7d6f..f3e3ab158d 100644 --- a/scheduler/scheduler_test.go +++ b/scheduler/scheduler_test.go @@ -101,6 +101,23 @@ func testTableSuccessWithCQIDPK() *schema.Table { } } +func testTableSuccessWithCQIDPKVirtualPK() *schema.Table { + cqID := schema.CqIDColumn + cqID.PrimaryKey = true + return &schema.Table{ + Name: "test_table_succes_vpk__cq_id", + Resolver: testResolverSuccess, + Columns: []schema.Column{ + cqID, + { + Name: "test_column", + Type: arrow.PrimitiveTypes.Int64, + VirtualPrimaryKey: true, + }, + }, + } +} + func testTableResolverPanic() *schema.Table { return &schema.Table{ Name: "test_table_resolver_panic", @@ -270,6 +287,16 @@ var syncTestCases = []syncTestCase{ }, deterministicCQID: false, }, + { + table: testTableSuccessWithCQIDPKVirtualPK(), + data: []scalar.Vector{ + { + // This value will not be validated as it will be randomly set by the scheduler + &scalar.UUID{}, + &scalar.Int{Value: 3, Valid: true}, + }, + }, + }, } func TestScheduler(t *testing.T) { @@ -289,77 +316,6 @@ func TestScheduler(t *testing.T) { } } -func TestScheduler_Cancellation(t *testing.T) { - data := make([]any, 100) - - tests := []struct { - name string - data []any - cancel bool - messageCount int - }{ - { - name: "should consume all message", - data: data, - cancel: false, - messageCount: len(data) + 1, // 9 data + 1 migration message - }, - { - name: "should not consume all message on cancel", - data: data, - cancel: true, - messageCount: len(data) + 1, // 9 data + 1 migration message - }, - } - - for _, strategy := range AllStrategies { - strategy := strategy - for _, tc := range tests { - tc := tc - t.Run(fmt.Sprintf("%s_%s", tc.name, strategy.String()), func(t *testing.T) { - logger := zerolog.New(zerolog.NewTestWriter(t)) - if tc.cancel { - logger = zerolog.Nop() // FIXME without this, zerolog usage causes a race condition when tests are run with `-race -count=100` - } - sc := NewScheduler(WithLogger(logger), WithStrategy(strategy)) - - messages := make(chan message.SyncMessage) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - go func() { - err := sc.Sync( - ctx, - &testExecutionClient{}, - []*schema.Table{testTableSuccessWithData(tc.data)}, - messages, - ) - if tc.cancel { - assert.Equal(t, err, context.Canceled) - } else { - require.NoError(t, err) - } - close(messages) - }() - - messageConsumed := 0 - for range messages { - if tc.cancel { - cancel() - } - messageConsumed++ - } - - if tc.cancel { - assert.NotEqual(t, tc.messageCount, messageConsumed) - } else { - assert.Equal(t, tc.messageCount, messageConsumed) - } - }) - } - } -} - // nolint:revive func testSyncTable(t *testing.T, tc syncTestCase, strategy Strategy, deterministicCQID bool) { ctx := context.Background() @@ -411,7 +367,7 @@ func testSyncTable(t *testing.T, tc syncTestCase, strategy Strategy, determinist initialTable := tables.Get(v.Table.Name) pks := migratedTable.PrimaryKeys() - if deterministicCQID && initialTable.Columns.Get(schema.CqIDColumn.Name) != nil { + if (deterministicCQID || len(migratedTable.VirtualPrimaryKeys()) > 0) && initialTable.Columns.Get(schema.CqIDColumn.Name) != nil { if len(pks) != 1 { t.Fatalf("expected 1 pk. got %d", len(pks)) } @@ -433,3 +389,74 @@ func testSyncTable(t *testing.T, tc syncTestCase, strategy Strategy, determinist t.Fatalf("expected %d resources. got %d", len(tc.data), i) } } + +func TestScheduler_Cancellation(t *testing.T) { + data := make([]any, 100) + + tests := []struct { + name string + data []any + cancel bool + messageCount int + }{ + { + name: "should consume all message", + data: data, + cancel: false, + messageCount: len(data) + 1, // 9 data + 1 migration message + }, + { + name: "should not consume all message on cancel", + data: data, + cancel: true, + messageCount: len(data) + 1, // 9 data + 1 migration message + }, + } + + for _, strategy := range AllStrategies { + strategy := strategy + for _, tc := range tests { + tc := tc + t.Run(fmt.Sprintf("%s_%s", tc.name, strategy.String()), func(t *testing.T) { + logger := zerolog.New(zerolog.NewTestWriter(t)) + if tc.cancel { + logger = zerolog.Nop() // FIXME without this, zerolog usage causes a race condition when tests are run with `-race -count=100` + } + sc := NewScheduler(WithLogger(logger), WithStrategy(strategy)) + + messages := make(chan message.SyncMessage) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go func() { + err := sc.Sync( + ctx, + &testExecutionClient{}, + []*schema.Table{testTableSuccessWithData(tc.data)}, + messages, + ) + if tc.cancel { + assert.Equal(t, err, context.Canceled) + } else { + require.NoError(t, err) + } + close(messages) + }() + + messageConsumed := 0 + for range messages { + if tc.cancel { + cancel() + } + messageConsumed++ + } + + if tc.cancel { + assert.NotEqual(t, tc.messageCount, messageConsumed) + } else { + assert.Equal(t, tc.messageCount, messageConsumed) + } + }) + } + } +} From f195069e4e3e4829bd85d56dc3a18eaef95e7e28 Mon Sep 17 00:00:00 2001 From: bbernays Date: Tue, 30 Jan 2024 11:02:30 -0600 Subject: [PATCH 3/3] Update name from VirtualPK to PrimaryKeyComponents --- docs/json.go | 20 +++++++++--------- scheduler/scheduler_test.go | 12 +++++------ schema/arrow.go | 10 ++++----- schema/column.go | 42 ++++++++++++++++++------------------- schema/resource.go | 8 +++---- schema/table.go | 10 ++++----- transformers/struct.go | 20 +++++++++--------- 7 files changed, 61 insertions(+), 61 deletions(-) diff --git a/docs/json.go b/docs/json.go index 2601ddb316..922e733be1 100644 --- a/docs/json.go +++ b/docs/json.go @@ -18,11 +18,11 @@ type jsonTable struct { } type jsonColumn struct { - Name string `json:"name"` - Type string `json:"type"` - IsPrimaryKey bool `json:"is_primary_key,omitempty"` - IsVirtualPrimaryKey bool `json:"is_virtual_primary_key,omitempty"` - IsIncrementalKey bool `json:"is_incremental_key,omitempty"` + Name string `json:"name"` + Type string `json:"type"` + IsPrimaryKey bool `json:"is_primary_key,omitempty"` + IsPrimaryKeyComponent bool `json:"is_primary_key_component,omitempty"` + IsIncrementalKey bool `json:"is_incremental_key,omitempty"` } func (g *Generator) renderTablesAsJSON(dir string) error { @@ -45,11 +45,11 @@ func (g *Generator) jsonifyTables(tables schema.Tables) []jsonTable { jsonColumns := make([]jsonColumn, len(table.Columns)) for c, col := range table.Columns { jsonColumns[c] = jsonColumn{ - Name: col.Name, - Type: col.Type.String(), - IsPrimaryKey: col.PrimaryKey, - IsVirtualPrimaryKey: col.VirtualPrimaryKey, - IsIncrementalKey: col.IncrementalKey, + Name: col.Name, + Type: col.Type.String(), + IsPrimaryKey: col.PrimaryKey, + IsPrimaryKeyComponent: col.PrimaryKeyComponent, + IsIncrementalKey: col.IncrementalKey, } } jsonTables[i] = jsonTable{ diff --git a/scheduler/scheduler_test.go b/scheduler/scheduler_test.go index f3e3ab158d..2db5e5e3b8 100644 --- a/scheduler/scheduler_test.go +++ b/scheduler/scheduler_test.go @@ -101,7 +101,7 @@ func testTableSuccessWithCQIDPK() *schema.Table { } } -func testTableSuccessWithCQIDPKVirtualPK() *schema.Table { +func testTableSuccessWithPKComponents() *schema.Table { cqID := schema.CqIDColumn cqID.PrimaryKey = true return &schema.Table{ @@ -110,9 +110,9 @@ func testTableSuccessWithCQIDPKVirtualPK() *schema.Table { Columns: []schema.Column{ cqID, { - Name: "test_column", - Type: arrow.PrimitiveTypes.Int64, - VirtualPrimaryKey: true, + Name: "test_column", + Type: arrow.PrimitiveTypes.Int64, + PrimaryKeyComponent: true, }, }, } @@ -288,7 +288,7 @@ var syncTestCases = []syncTestCase{ deterministicCQID: false, }, { - table: testTableSuccessWithCQIDPKVirtualPK(), + table: testTableSuccessWithPKComponents(), data: []scalar.Vector{ { // This value will not be validated as it will be randomly set by the scheduler @@ -367,7 +367,7 @@ func testSyncTable(t *testing.T, tc syncTestCase, strategy Strategy, determinist initialTable := tables.Get(v.Table.Name) pks := migratedTable.PrimaryKeys() - if (deterministicCQID || len(migratedTable.VirtualPrimaryKeys()) > 0) && initialTable.Columns.Get(schema.CqIDColumn.Name) != nil { + if (deterministicCQID || len(migratedTable.PrimaryKeyComponents()) > 0) && initialTable.Columns.Get(schema.CqIDColumn.Name) != nil { if len(pks) != 1 { t.Fatalf("expected 1 pk. got %d", len(pks)) } diff --git a/schema/arrow.go b/schema/arrow.go index 31af61c666..f732f3adb2 100644 --- a/schema/arrow.go +++ b/schema/arrow.go @@ -5,11 +5,11 @@ import ( ) const ( - MetadataUnique = "cq:extension:unique" - MetadataPrimaryKey = "cq:extension:primary_key" - MetadataVirtualPrimaryKey = "cq:extension:virtual_primary_key" - MetadataConstraintName = "cq:extension:constraint_name" - MetadataIncremental = "cq:extension:incremental" + MetadataUnique = "cq:extension:unique" + MetadataPrimaryKey = "cq:extension:primary_key" + MetadataPrimaryKeyComponent = "cq:extension:primary_key_component" + MetadataConstraintName = "cq:extension:constraint_name" + MetadataIncremental = "cq:extension:incremental" MetadataTrue = "true" MetadataFalse = "false" diff --git a/schema/column.go b/schema/column.go index 19993002e2..6c4474cca2 100644 --- a/schema/column.go +++ b/schema/column.go @@ -44,8 +44,8 @@ type Column struct { // Unique requires the destinations supporting this to mark this column as unique Unique bool `json:"unique"` - // VirtualPrimaryKey is a flag that indicates if the column is used as part of the input to calculate the value of `_cq_id`. - VirtualPrimaryKey bool `json:"virtual_primary_key"` + // PrimaryKeyComponent is a flag that indicates if the column is used as part of the input to calculate the value of `_cq_id`. + PrimaryKeyComponent bool `json:"primary_key_component"` } // NewColumnFromArrowField creates a new Column from an arrow.Field @@ -67,18 +67,18 @@ func NewColumnFromArrowField(f arrow.Field) Column { v, ok = f.Metadata.GetValue(MetadataIncremental) column.IncrementalKey = ok && v == MetadataTrue - v, ok = f.Metadata.GetValue(MetadataVirtualPrimaryKey) - column.VirtualPrimaryKey = ok && v == MetadataTrue + v, ok = f.Metadata.GetValue(MetadataPrimaryKeyComponent) + column.PrimaryKeyComponent = ok && v == MetadataTrue return column } func (c Column) ToArrowField() arrow.Field { mdKV := map[string]string{ - MetadataPrimaryKey: MetadataFalse, - MetadataUnique: MetadataFalse, - MetadataIncremental: MetadataFalse, - MetadataVirtualPrimaryKey: MetadataFalse, + MetadataPrimaryKey: MetadataFalse, + MetadataUnique: MetadataFalse, + MetadataIncremental: MetadataFalse, + MetadataPrimaryKeyComponent: MetadataFalse, } if c.PrimaryKey { mdKV[MetadataPrimaryKey] = MetadataTrue @@ -89,8 +89,8 @@ func (c Column) ToArrowField() arrow.Field { if c.IncrementalKey { mdKV[MetadataIncremental] = MetadataTrue } - if c.VirtualPrimaryKey { - mdKV[MetadataVirtualPrimaryKey] = MetadataTrue + if c.PrimaryKeyComponent { + mdKV[MetadataPrimaryKeyComponent] = MetadataTrue } return arrow.Field{ @@ -103,14 +103,14 @@ func (c Column) ToArrowField() arrow.Field { func (c Column) MarshalJSON() ([]byte, error) { type Alias struct { - Name string `json:"name"` - Type string `json:"type"` - Description string `json:"description"` - PrimaryKey bool `json:"primary_key"` - NotNull bool `json:"not_null"` - Unique bool `json:"unique"` - IncrementalKey bool `json:"incremental_key"` - VirtualPrimaryKey bool `json:"virtual_primary_key"` + Name string `json:"name"` + Type string `json:"type"` + Description string `json:"description"` + PrimaryKey bool `json:"primary_key"` + NotNull bool `json:"not_null"` + Unique bool `json:"unique"` + IncrementalKey bool `json:"incremental_key"` + PrimaryKeyComponent bool `json:"primary_key_component"` } var alias Alias alias.Name = c.Name @@ -120,7 +120,7 @@ func (c Column) MarshalJSON() ([]byte, error) { alias.NotNull = c.NotNull alias.Unique = c.Unique alias.IncrementalKey = c.IncrementalKey - alias.VirtualPrimaryKey = c.VirtualPrimaryKey + alias.PrimaryKeyComponent = c.PrimaryKeyComponent return json.Marshal(alias) } @@ -143,8 +143,8 @@ func (c Column) String() string { sb.WriteString(":IncrementalKey") } - if c.VirtualPrimaryKey { - sb.WriteString(":VirtualPrimaryKey") + if c.PrimaryKeyComponent { + sb.WriteString(":PrimaryKeyComponent") } return sb.String() } diff --git a/schema/resource.go b/schema/resource.go index b817dbed76..3e826ba8ee 100644 --- a/schema/resource.go +++ b/schema/resource.go @@ -80,10 +80,10 @@ func (r *Resource) GetValues() scalar.Vector { //nolint:revive func (r *Resource) CalculateCQID(deterministicCQID bool) error { - // if `virtualPK` is set, we calculate the CQID based on the virtual primary keys - virtualPrimaryKeys := r.Table.VirtualPrimaryKeys() - if len(virtualPrimaryKeys) > 0 { - return r.storeCQID(uuid.NewSHA1(uuid.UUID{}, calculateCqIDValue(r, virtualPrimaryKeys).Sum(nil))) + // if `PrimaryKeyComponent` is set, we calculate the CQID based on those components + pkComponents := r.Table.PrimaryKeyComponents() + if len(pkComponents) > 0 { + return r.storeCQID(uuid.NewSHA1(uuid.UUID{}, calculateCqIDValue(r, pkComponents).Sum(nil))) } // If deterministicCQID is false, we generate a random CQID diff --git a/schema/table.go b/schema/table.go index 989ee5ffc2..10e2237454 100644 --- a/schema/table.go +++ b/schema/table.go @@ -589,15 +589,15 @@ func (t *Table) IncrementalKeys() []string { return incrementalKeys } -func (t *Table) VirtualPrimaryKeys() []string { - var virtualPrimaryKeys []string +func (t *Table) PrimaryKeyComponents() []string { + var primaryKeyComponents []string for _, c := range t.Columns { - if c.VirtualPrimaryKey { - virtualPrimaryKeys = append(virtualPrimaryKeys, c.Name) + if c.PrimaryKeyComponent { + primaryKeyComponents = append(primaryKeyComponents, c.Name) } } - return virtualPrimaryKeys + return primaryKeyComponents } func (t *Table) TableNames() []string { diff --git a/transformers/struct.go b/transformers/struct.go index ea73f4c6af..105924ed6a 100644 --- a/transformers/struct.go +++ b/transformers/struct.go @@ -26,8 +26,8 @@ type structTransformer struct { structFieldsToUnwrap []string pkFields []string pkFieldsFound []string - virtualPKFields []string - virtualPKFieldsFound []string + pkComponentFields []string + pkComponentFieldsFound []string } type NameTransformer func(reflect.StructField) (string, error) @@ -119,10 +119,10 @@ func WithPrimaryKeys(fields ...string) StructTransformerOption { } } -// WithVirtualPrimaryKeys allows to specify what struct fields should be used as virtual primary keys -func WithVirtualPrimaryKeys(fields ...string) StructTransformerOption { +// WithPrimaryKeyComponents allows to specify what struct fields should be used as primary key components +func WithPrimaryKeyComponents(fields ...string) StructTransformerOption { return func(t *structTransformer) { - t.virtualPKFields = fields + t.pkComponentFields = fields } } @@ -169,8 +169,8 @@ func TransformWithStruct(st any, opts ...StructTransformerOption) schema.Transfo return fmt.Errorf("failed to create all of the desired primary keys: %v", diff) } - if diff := funk.SubtractString(t.virtualPKFields, t.virtualPKFieldsFound); len(diff) > 0 { - return fmt.Errorf("failed to create all of the desired virtual primary keys: %v", diff) + if diff := funk.SubtractString(t.pkComponentFields, t.pkComponentFieldsFound); len(diff) > 0 { + return fmt.Errorf("failed to find all of the desired primary key components: %v", diff) } return nil } @@ -299,13 +299,13 @@ func (t *structTransformer) addColumnFromField(field reflect.StructField, parent } } - for _, pk := range t.virtualPKFields { + for _, pk := range t.pkComponentFields { if pk == path { // use path to allow the following // 1. Don't duplicate the PK fields if the unwrapped struct contains a fields with the same name // 2. Allow specifying the nested unwrapped field as part of the PK. - column.VirtualPrimaryKey = true - t.virtualPKFieldsFound = append(t.virtualPKFieldsFound, pk) + column.PrimaryKeyComponent = true + t.pkComponentFieldsFound = append(t.pkComponentFieldsFound, pk) } }