From 821806d178be7e57188e477a757fadcefe0ac759 Mon Sep 17 00:00:00 2001
From: anytoe
Date: Wed, 13 May 2026 22:07:59 +0100
Subject: [PATCH] diff dictionaries as their own object kind instead of as tables

---
 .gitignore                                    |  3 +
 internal/models/combined.go                   | 67 ++++++++++++++++++-
 internal/models/schema.go                     | 14 +++-
 internal/models/schemabuilder.go              | 26 ++++++-
 internal/models/syncplan.go                   | 57 ++++++++++++++--
 internal/models/syncplan_test.go              | 60 +++++++++++++++++
 .../repositories/clickhouse/loadschema.go     | 39 ++++++++++-
 7 files changed, 254 insertions(+), 12 deletions(-)

diff --git a/.gitignore b/.gitignore
index 3d314bd..79e7b51 100644
--- a/.gitignore
+++ b/.gitignore
@@ -27,3 +27,6 @@ go.work
 *.swp
 *.swo
 *~
+
+# Local planning notes (not for upstream)
+plans/
diff --git a/internal/models/combined.go b/internal/models/combined.go
index 13d771c..656e521 100644
--- a/internal/models/combined.go
+++ b/internal/models/combined.go
@@ -11,8 +11,23 @@ const (
 
 // CombinedSchema represents a merged view of two schemas
 type CombinedSchema struct {
-	Databases []CombinedDatabase
-	Functions []CombinedFunction
+	Databases    []CombinedDatabase
+	Functions    []CombinedFunction
+	Dictionaries []CombinedDictionary
+}
+
+// CombinedDictionary represents a dictionary that may exist in source, target, or both
+type CombinedDictionary struct {
+	Database string
+	Name     string
+	Presence Presence
+	Source   *DictionaryProperties
+	Target   *DictionaryProperties
+}
+
+// DictionaryProperties contains properties of a dictionary
+type DictionaryProperties struct {
+	CreateQuery string
 }
 
 // CombinedFunction represents a SQL UDF that may exist in source, target, or both
@@ -120,10 +135,58 @@ func NewCombinedSchema(from, to Schema) *CombinedSchema {
 	}
 
 	cs.Functions = compareFunctions(from.Functions, to.Functions)
+	cs.Dictionaries = compareDictionaries(from.Dictionaries, to.Dictionaries)
 
 	return cs
 }
 
+// compareDictionaries merges the dictionaries of two schemas into one list.
+// Dictionaries found in toDicts come first, in toDicts order (Presence Target,
+// or Both when also in fromDicts); source-only ones follow in fromDicts order.
+func compareDictionaries(fromDicts, toDicts []Dictionary) []CombinedDictionary {
+	// Key on the (database, name) pair instead of a "db.name" string: a dot
+	// inside either identifier would otherwise let two distinct dictionaries
+	// share one key.
+	type dictKey struct{ database, name string }
+	key := func(d Dictionary) dictKey { return dictKey{d.Database, d.Name} }
+
+	fromMap := make(map[dictKey]Dictionary)
+	for _, d := range fromDicts {
+		fromMap[key(d)] = d
+	}
+
+	inTo := make(map[dictKey]bool)
+	var result []CombinedDictionary
+
+	for _, toDict := range toDicts {
+		k := key(toDict)
+		inTo[k] = true
+		cd := CombinedDictionary{Database: toDict.Database, Name: toDict.Name}
+		if fromDict, inFrom := fromMap[k]; inFrom {
+			cd.Presence = Both
+			cd.Source = &DictionaryProperties{CreateQuery: fromDict.CreateQuery}
+			cd.Target = &DictionaryProperties{CreateQuery: toDict.CreateQuery}
+		} else {
+			cd.Presence = Target
+			cd.Target = &DictionaryProperties{CreateQuery: toDict.CreateQuery}
+		}
+		result = append(result, cd)
+	}
+
+	for _, fromDict := range fromDicts {
+		if !inTo[key(fromDict)] {
+			result = append(result, CombinedDictionary{
+				Database: fromDict.Database,
+				Name:     fromDict.Name,
+				Presence: Source,
+				Source:   &DictionaryProperties{CreateQuery: fromDict.CreateQuery},
+			})
+		}
+	}
+
+	return result
+}
+
 func compareFunctions(fromFuncs, toFuncs []Function) []CombinedFunction {
 	fromMap := make(map[string]Function)
 	for _, f := range fromFuncs {
diff --git a/internal/models/schema.go b/internal/models/schema.go
index 1b0b234..b23c965 100644
--- a/internal/models/schema.go
+++ b/internal/models/schema.go
@@ -8,8 +8,9 @@ import (
 
 // Schema represents a complete ClickHouse schema
 type Schema struct {
-	Databases []Database
-	Functions []Function
+	Databases    []Database
+	Functions    []Function
+	Dictionaries []Dictionary
 }
 
 // Function represents a SQL user-defined function
@@ -18,6 +19,15 @@ type Function struct {
 	CreateQuery string
 }
 
+// Dictionary represents a ClickHouse dictionary. The full CREATE DICTIONARY
+// statement is kept verbatim because dictionary bodies (SOURCE, LAYOUT,
+// LIFETIME, ...) are not parsed; diffs compare the raw query.
+type Dictionary struct {
+	Database    string
+	Name        string
+	CreateQuery string
+}
+
 // Database represents a ClickHouse database with its tables
 type Database struct {
 	Name string
diff --git a/internal/models/schemabuilder.go b/internal/models/schemabuilder.go
index 0e907e7..cca75ad 100644
--- a/internal/models/schemabuilder.go
+++ b/internal/models/schemabuilder.go
@@ -52,8 +52,9 @@ func (b *schemaBuilder) build() Schema {
 func (b *schemaBuilder) clone() *schemaBuilder {
 	cloned := &schemaBuilder{
 		schema: Schema{
-			Databases: make([]Database, len(b.schema.Databases)),
-			Functions: append([]Function{}, b.schema.Functions...),
+			Databases:    make([]Database, len(b.schema.Databases)),
+			Functions:    append([]Function{}, b.schema.Functions...),
+			Dictionaries: append([]Dictionary{}, b.schema.Dictionaries...),
 		},
 	}
 	for i, db := range b.schema.Databases {
@@ -295,6 +296,27 @@ func (b *schemaBuilder) removeFunction(name string) *schemaBuilder {
 	return b
 }
 
+// addDictionary adds a dictionary with a verbatim CREATE DICTIONARY query.
+func (b *schemaBuilder) addDictionary(dbName, name, createQuery string) *schemaBuilder {
+	b.schema.Dictionaries = append(b.schema.Dictionaries, Dictionary{
+		Database:    dbName,
+		Name:        name,
+		CreateQuery: createQuery,
+	})
+	return b
+}
+
+// removeDictionary removes a dictionary by database + name.
+func (b *schemaBuilder) removeDictionary(dbName, name string) *schemaBuilder {
+	for i, d := range b.schema.Dictionaries {
+		if d.Database == dbName && d.Name == name {
+			b.schema.Dictionaries = append(b.schema.Dictionaries[:i], b.schema.Dictionaries[i+1:]...)
+			return b
+		}
+	}
+	return b
+}
+
 // removeDatabase removes a database.
 func (b *schemaBuilder) removeDatabase(dbName string) *schemaBuilder {
 	for i, db := range b.schema.Databases {
diff --git a/internal/models/syncplan.go b/internal/models/syncplan.go
index b39655c..e6ca68e 100644
--- a/internal/models/syncplan.go
+++ b/internal/models/syncplan.go
@@ -44,11 +44,12 @@ type Operation struct {
 type OperationLevel string
 
 const (
-	LevelDatabase OperationLevel = "database"
-	LevelTable    OperationLevel = "table"
-	LevelColumn   OperationLevel = "column"
-	LevelIndex    OperationLevel = "index"
-	LevelFunction OperationLevel = "function"
+	LevelDatabase   OperationLevel = "database"
+	LevelTable      OperationLevel = "table"
+	LevelColumn     OperationLevel = "column"
+	LevelIndex      OperationLevel = "index"
+	LevelFunction   OperationLevel = "function"
+	LevelDictionary OperationLevel = "dictionary"
 )
 
 // OperationAction identifies what action a schema operation performs.
@@ -128,6 +129,52 @@ func (g *SyncPlanGenerator) buildHybridStrategy(combined *CombinedSchema) Strate
 		}
 	}
 
+	// Process dictionary-level operations. Dictionaries are emitted after table
+	// operations because CLICKHOUSE-source dictionaries reference tables that
+	// must exist first. Body changes drop+recreate — chsync does not parse the
+	// CREATE DICTIONARY body.
+	// NOTE(review): drops are emitted here too, i.e. after table drops. If the
+	// server refuses to drop a table that a dictionary still depends on, drop
+	// operations need to run before the table operations — TODO confirm.
+	for _, d := range combined.Dictionaries {
+		qname := quoteIdent(d.Database) + "." + quoteIdent(d.Name)
+		switch d.Presence {
+		case Target:
+			operations = append(operations, Operation{
+				Level:       LevelDictionary,
+				Action:      ActionCreate,
+				CanLoseData: false,
+				Statements:  []string{d.Target.CreateQuery + ";"},
+				Explanation: "Create dictionary " + d.Name,
+			})
+		case Source:
+			operations = append(operations, Operation{
+				Level:       LevelDictionary,
+				Action:      ActionDrop,
+				CanLoseData: true,
+				Statements:  []string{"DROP DICTIONARY IF EXISTS " + qname + ";"},
+				Explanation: "Drop dictionary " + d.Name,
+			})
+		case Both:
+			if d.Source.CreateQuery != d.Target.CreateQuery {
+				operations = append(operations, Operation{
+					Level:       LevelDictionary,
+					Action:      ActionDrop,
+					CanLoseData: true,
+					Statements:  []string{"DROP DICTIONARY IF EXISTS " + qname + ";"},
+					Explanation: "Drop dictionary " + d.Name + " (body changed)",
+				})
+				operations = append(operations, Operation{
+					Level:       LevelDictionary,
+					Action:      ActionCreate,
+					CanLoseData: false,
+					Statements:  []string{d.Target.CreateQuery + ";"},
+					Explanation: "Create dictionary " + d.Name,
+				})
+			}
+		}
+	}
+
 	// Process function-level operations
 	for _, fn := range combined.Functions {
 		switch fn.Presence {
diff --git a/internal/models/syncplan_test.go b/internal/models/syncplan_test.go
index 13a814e..c69784c 100644
--- a/internal/models/syncplan_test.go
+++ b/internal/models/syncplan_test.go
@@ -535,6 +535,66 @@ func TestSyncPlanGenerator_FunctionOperations(t *testing.T) {
 	runTests(t, tests)
 }
 
+func TestSyncPlanGenerator_DictionaryOperations(t *testing.T) {
+	boolPtr := func(b bool) *bool { return &b }
+
+	dictV1 := "CREATE DICTIONARY db1.d (`id` UUID) PRIMARY KEY id SOURCE(CLICKHOUSE(QUERY 'SELECT id FROM db1.users')) LIFETIME(MIN 0 MAX 0) LAYOUT(COMPLEX_KEY_HASHED())"
+	dictV2 := "CREATE DICTIONARY db1.d (`id` UUID) PRIMARY KEY id SOURCE(CLICKHOUSE(QUERY 'SELECT id FROM db1.orders')) LIFETIME(MIN 0 MAX 0) LAYOUT(COMPLEX_KEY_HASHED())"
+
+	tests := []struct {
+		name           string
+		from           func() Schema
+		to             func() Schema
+		wantOperations []expectedOperation
+	}{
+		{
+			name: "dictionary added",
+			from: func() Schema { return baseSchema().build() },
+			to: func() Schema {
+				return baseSchema().addDictionary("db1", "d", dictV1).build()
+			},
+			wantOperations: []expectedOperation{
+				{level: LevelDictionary, action: ActionCreate, canLoseData: boolPtr(false), statements: []string{dictV1 + ";"}},
+			},
+		},
+		{
+			name: "dictionary removed",
+			from: func() Schema {
+				return baseSchema().addDictionary("db1", "d", dictV1).build()
+			},
+			to: func() Schema { return baseSchema().build() },
+			wantOperations: []expectedOperation{
+				{level: LevelDictionary, action: ActionDrop, canLoseData: boolPtr(true), statements: []string{"DROP DICTIONARY IF EXISTS `db1`.`d`;"}},
+			},
+		},
+		{
+			name: "dictionary body changed",
+			from: func() Schema {
+				return baseSchema().addDictionary("db1", "d", dictV1).build()
+			},
+			to: func() Schema {
+				return baseSchema().addDictionary("db1", "d", dictV2).build()
+			},
+			wantOperations: []expectedOperation{
+				{level: LevelDictionary, action: ActionDrop, canLoseData: boolPtr(true), statements: []string{"DROP DICTIONARY IF EXISTS `db1`.`d`;"}},
+				{level: LevelDictionary, action: ActionCreate, canLoseData: boolPtr(false), statements: []string{dictV2 + ";"}},
+			},
+		},
+		{
+			name: "dictionary unchanged",
+			from: func() Schema {
+				return baseSchema().addDictionary("db1", "d", dictV1).build()
+			},
+			to: func() Schema {
+				return baseSchema().addDictionary("db1", "d", dictV1).build()
+			},
+			wantOperations: []expectedOperation{},
+		},
+	}
+
+	runTests(t, tests)
+}
+
 func TestSyncPlanGenerator_MixedOperations(t *testing.T) {
 	tests := []struct {
 		name           string
diff --git a/internal/repositories/clickhouse/loadschema.go b/internal/repositories/clickhouse/loadschema.go
index 0e9c6de..b2b871c 100644
--- a/internal/repositories/clickhouse/loadschema.go
+++ b/internal/repositories/clickhouse/loadschema.go
@@ -71,6 +71,11 @@ func (c *Client) LoadSchema(ctx context.Context, f Filter) (*models.Schema, erro
 		return nil, fmt.Errorf("load functions: %w", err)
 	}
 
+	dictionaries, err := c.loadDictionaries(ctx, databases, f)
+	if err != nil {
+		return nil, fmt.Errorf("load dictionaries: %w", err)
+	}
+
 	// Assign columns to their tables
 	for key, cols := range columns {
 		if t, ok := tables[key]; ok {
@@ -92,10 +97,42 @@ func (c *Client) LoadSchema(ctx context.Context, f Filter) (*models.Schema, erro
 	}
 
 	schema.Functions = functions
+	schema.Dictionaries = dictionaries
 
 	return schema, nil
 }
 
+// loadDictionaries reads CREATE DICTIONARY statements verbatim from
+// system.tables (where engine = 'Dictionary') for the given databases.
+// The full create_table_query is kept; bodies are not parsed.
+func (c *Client) loadDictionaries(ctx context.Context, databases []string, f Filter) ([]models.Dictionary, error) {
+	if len(databases) == 0 {
+		return nil, nil
+	}
+	cond := fmt.Sprintf("database IN (%s) AND engine = 'Dictionary'", quoted(databases))
+	cond += tableFilterClauses(f, "name")
+
+	query := fmt.Sprintf(
+		"SELECT database, name, create_table_query FROM system.tables WHERE %s ORDER BY database, name",
+		cond,
+	)
+	rows, err := c.Query(ctx, query)
+	if err != nil {
+		return nil, fmt.Errorf("query dictionaries: %w", err)
+	}
+	defer rows.Close()
+
+	var dicts []models.Dictionary
+	for rows.Next() {
+		var d models.Dictionary
+		if err := rows.Scan(&d.Database, &d.Name, &d.CreateQuery); err != nil {
+			return nil, fmt.Errorf("scan dictionary: %w", err)
+		}
+		dicts = append(dicts, d)
+	}
+	return dicts, rows.Err()
+}
+
 // loadFunctions loads all SQL user-defined functions.
 func (c *Client) loadFunctions(ctx context.Context) ([]models.Function, error) {
 	rows, err := c.Query(ctx, "SELECT name, create_query FROM system.functions WHERE origin = 'SQLUserDefined' ORDER BY name")
@@ -152,7 +189,7 @@ func (c *Client) loadDatabases(ctx context.Context, f Filter) ([]string, error)
 // loadTables loads all tables for the given databases in a single query.
 // Returns a map keyed by "database.table".
 func (c *Client) loadTables(ctx context.Context, databases []string, f Filter) (map[string]models.Table, error) {
-	cond := fmt.Sprintf("database IN (%s)", quoted(databases))
+	cond := fmt.Sprintf("database IN (%s) AND engine != 'Dictionary'", quoted(databases))
 	cond += tableFilterClauses(f, "name")
 
 	query := fmt.Sprintf(