Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,6 @@ go.work
*.swp
*.swo
*~

# Local planning notes (not for upstream)
plans/
60 changes: 58 additions & 2 deletions internal/models/combined.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,23 @@ const (

// CombinedSchema represents a merged view of two schemas
type CombinedSchema struct {
	Databases []CombinedDatabase
	Functions []CombinedFunction
	Databases []CombinedDatabase
	Functions []CombinedFunction
	// Dictionaries is the merged dictionary list; NewCombinedSchema fills it
	// from compareDictionaries(from.Dictionaries, to.Dictionaries).
	Dictionaries []CombinedDictionary
}

// CombinedDictionary represents a dictionary that may exist in source, target, or both
type CombinedDictionary struct {
	// Database is the name of the database the dictionary belongs to.
	Database string
	// Name is the dictionary name within Database.
	Name string
	// Presence records on which side(s) the dictionary was found:
	// Source, Target, or Both.
	Presence Presence
	// Source holds the source-side properties. compareDictionaries leaves it
	// nil when Presence is Target.
	Source *DictionaryProperties
	// Target holds the target-side properties. compareDictionaries leaves it
	// nil when Presence is Source.
	Target *DictionaryProperties
}

// DictionaryProperties contains properties of a dictionary
type DictionaryProperties struct {
	// CreateQuery is the dictionary's CREATE statement, copied unchanged from
	// Dictionary.CreateQuery by compareDictionaries.
	CreateQuery string
}

// CombinedFunction represents a SQL UDF that may exist in source, target, or both
Expand Down Expand Up @@ -120,10 +135,51 @@ func NewCombinedSchema(from, to Schema) *CombinedSchema {
}

cs.Functions = compareFunctions(from.Functions, to.Functions)
cs.Dictionaries = compareDictionaries(from.Dictionaries, to.Dictionaries)

return cs
}

// compareDictionaries merges the dictionaries of two schemas into one list
// annotated with the side(s) each dictionary was found on.
//
// Dictionaries are matched on the (Database, Name) pair. The result lists
// every entry of toDicts first, in input order: Presence is Both when a
// dictionary with the same identity exists in fromDicts, Target otherwise.
// Entries that exist only in fromDicts follow, in input order, with Presence
// Source. Source and Target are set only for the sides the dictionary exists
// on. The result is nil when both inputs are empty.
func compareDictionaries(fromDicts, toDicts []Dictionary) []CombinedDictionary {
	// A struct key keeps the two identifier parts separate. Joining them with
	// "." would make ("a.b", "c") and ("a", "b.c") collide and pair up two
	// unrelated dictionaries.
	type dictKey struct{ database, name string }
	keyOf := func(d Dictionary) dictKey {
		return dictKey{database: d.Database, name: d.Name}
	}

	fromByKey := make(map[dictKey]Dictionary, len(fromDicts))
	for _, d := range fromDicts {
		fromByKey[keyOf(d)] = d
	}

	inTo := make(map[dictKey]struct{}, len(toDicts))
	var result []CombinedDictionary

	for _, toDict := range toDicts {
		k := keyOf(toDict)
		inTo[k] = struct{}{}

		// Every entry produced by this loop exists in the target; it is
		// upgraded to Both when the source has the same dictionary.
		cd := CombinedDictionary{
			Database: toDict.Database,
			Name:     toDict.Name,
			Presence: Target,
			Target:   &DictionaryProperties{CreateQuery: toDict.CreateQuery},
		}
		if fromDict, ok := fromByKey[k]; ok {
			cd.Presence = Both
			cd.Source = &DictionaryProperties{CreateQuery: fromDict.CreateQuery}
		}
		result = append(result, cd)
	}

	// Whatever the target did not mention exists only in the source.
	for _, fromDict := range fromDicts {
		if _, seen := inTo[keyOf(fromDict)]; seen {
			continue
		}
		result = append(result, CombinedDictionary{
			Database: fromDict.Database,
			Name:     fromDict.Name,
			Presence: Source,
			Source:   &DictionaryProperties{CreateQuery: fromDict.CreateQuery},
		})
	}

	return result
}

func compareFunctions(fromFuncs, toFuncs []Function) []CombinedFunction {
fromMap := make(map[string]Function)
for _, f := range fromFuncs {
Expand Down
14 changes: 12 additions & 2 deletions internal/models/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ import (

// Schema represents a complete ClickHouse schema
type Schema struct {
	Databases []Database
	Functions []Function
	Databases []Database
	Functions []Function
	// Dictionaries lists the schema's dictionaries, each carrying its
	// database, name and verbatim CREATE query.
	Dictionaries []Dictionary
}

// Function represents a SQL user-defined function
Expand All @@ -18,6 +19,15 @@ type Function struct {
CreateQuery string
}

// Dictionary represents a ClickHouse dictionary. The full CREATE DICTIONARY
// statement is kept verbatim because dictionary bodies (SOURCE, LAYOUT,
// LIFETIME, ...) are not parsed; diffs compare the raw query.
type Dictionary struct {
	// Database is the name of the database that contains the dictionary.
	Database string
	// Name is the dictionary name within Database.
	Name string
	// CreateQuery is the unparsed CREATE DICTIONARY statement.
	CreateQuery string
}

// Database represents a ClickHouse database with its tables
type Database struct {
Name string
Expand Down
26 changes: 24 additions & 2 deletions internal/models/schemabuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,9 @@ func (b *schemaBuilder) build() Schema {
func (b *schemaBuilder) clone() *schemaBuilder {
cloned := &schemaBuilder{
schema: Schema{
Databases: make([]Database, len(b.schema.Databases)),
Functions: append([]Function{}, b.schema.Functions...),
Databases: make([]Database, len(b.schema.Databases)),
Functions: append([]Function{}, b.schema.Functions...),
Dictionaries: append([]Dictionary{}, b.schema.Dictionaries...),
},
}
for i, db := range b.schema.Databases {
Expand Down Expand Up @@ -295,6 +296,27 @@ func (b *schemaBuilder) removeFunction(name string) *schemaBuilder {
return b
}

// addDictionary appends a dictionary identified by dbName and name to the
// schema under construction, storing createQuery unchanged, and returns the
// builder so calls can be chained.
func (b *schemaBuilder) addDictionary(dbName, name, createQuery string) *schemaBuilder {
	dict := Dictionary{Database: dbName, Name: name, CreateQuery: createQuery}
	b.schema.Dictionaries = append(b.schema.Dictionaries, dict)
	return b
}

// removeDictionary deletes the first dictionary whose database and name match
// dbName and name, leaving the schema untouched when there is no match. It
// returns the builder so calls can be chained.
func (b *schemaBuilder) removeDictionary(dbName, name string) *schemaBuilder {
	dicts := b.schema.Dictionaries
	for idx := range dicts {
		if dicts[idx].Database != dbName || dicts[idx].Name != name {
			continue
		}
		// Splice the match out in place and stop: only the first hit is removed.
		b.schema.Dictionaries = append(dicts[:idx], dicts[idx+1:]...)
		break
	}
	return b
}

// removeDatabase removes a database.
func (b *schemaBuilder) removeDatabase(dbName string) *schemaBuilder {
for i, db := range b.schema.Databases {
Expand Down
54 changes: 49 additions & 5 deletions internal/models/syncplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,12 @@ type Operation struct {
type OperationLevel string

// OperationLevel values, one per kind of schema object an Operation's Level
// field can name.
const (
	LevelDatabase OperationLevel = "database"
	LevelTable OperationLevel = "table"
	LevelColumn OperationLevel = "column"
	LevelIndex OperationLevel = "index"
	LevelFunction OperationLevel = "function"
	LevelDatabase OperationLevel = "database"
	LevelTable OperationLevel = "table"
	LevelColumn OperationLevel = "column"
	LevelIndex OperationLevel = "index"
	LevelFunction OperationLevel = "function"
	// LevelDictionary marks operations that create or drop a dictionary.
	LevelDictionary OperationLevel = "dictionary"
)

// OperationAction identifies what action a schema operation performs.
Expand Down Expand Up @@ -128,6 +129,49 @@ func (g *SyncPlanGenerator) buildHybridStrategy(combined *CombinedSchema) Strate
}
}

// Process dictionary-level operations. Dictionaries are emitted after table
// operations because CLICKHOUSE-source dictionaries reference tables that
// must exist first. Body changes drop+recreate — chsync does not parse the
// CREATE DICTIONARY body.
for _, d := range combined.Dictionaries {
qname := quoteIdent(d.Database) + "." + quoteIdent(d.Name)
switch d.Presence {
case Target:
operations = append(operations, Operation{
Level: LevelDictionary,
Action: ActionCreate,
CanLoseData: false,
Statements: []string{d.Target.CreateQuery + ";"},
Explanation: "Create dictionary " + d.Name,
})
case Source:
operations = append(operations, Operation{
Level: LevelDictionary,
Action: ActionDrop,
CanLoseData: true,
Statements: []string{"DROP DICTIONARY IF EXISTS " + qname + ";"},
Explanation: "Drop dictionary " + d.Name,
})
case Both:
if d.Source.CreateQuery != d.Target.CreateQuery {
operations = append(operations, Operation{
Level: LevelDictionary,
Action: ActionDrop,
CanLoseData: true,
Statements: []string{"DROP DICTIONARY IF EXISTS " + qname + ";"},
Explanation: "Drop dictionary " + d.Name + " (body changed)",
})
operations = append(operations, Operation{
Level: LevelDictionary,
Action: ActionCreate,
CanLoseData: false,
Statements: []string{d.Target.CreateQuery + ";"},
Explanation: "Create dictionary " + d.Name,
})
}
}
}

// Process function-level operations
for _, fn := range combined.Functions {
switch fn.Presence {
Expand Down
60 changes: 60 additions & 0 deletions internal/models/syncplan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,66 @@ func TestSyncPlanGenerator_FunctionOperations(t *testing.T) {
runTests(t, tests)
}

// TestSyncPlanGenerator_DictionaryOperations checks the operations planned
// when a dictionary is added, removed, changed, or left unchanged between the
// source and target schemas.
func TestSyncPlanGenerator_DictionaryOperations(t *testing.T) {
	const (
		dictV1 = "CREATE DICTIONARY db1.d (`id` UUID) PRIMARY KEY id SOURCE(CLICKHOUSE(QUERY 'SELECT id FROM db1.users')) LIFETIME(MIN 0 MAX 0) LAYOUT(COMPLEX_KEY_HASHED())"
		dictV2 = "CREATE DICTIONARY db1.d (`id` UUID) PRIMARY KEY id SOURCE(CLICKHOUSE(QUERY 'SELECT id FROM db1.orders')) LIFETIME(MIN 0 MAX 0) LAYOUT(COMPLEX_KEY_HASHED())"
		dropStmt = "DROP DICTIONARY IF EXISTS `db1`.`d`;"
	)

	boolPtr := func(b bool) *bool { return &b }

	// Schema factories: the base schema alone, or with dictionary db1.d
	// defined by the given CREATE query.
	withoutDict := func() Schema { return baseSchema().build() }
	withDict := func(createQuery string) func() Schema {
		return func() Schema {
			return baseSchema().addDictionary("db1", "d", createQuery).build()
		}
	}

	// Expected-operation factories; each call builds fresh values so the
	// cases do not share slices or pointers.
	wantCreate := func(createQuery string) expectedOperation {
		return expectedOperation{
			level:       LevelDictionary,
			action:      ActionCreate,
			canLoseData: boolPtr(false),
			statements:  []string{createQuery + ";"},
		}
	}
	wantDrop := func() expectedOperation {
		return expectedOperation{
			level:       LevelDictionary,
			action:      ActionDrop,
			canLoseData: boolPtr(true),
			statements:  []string{dropStmt},
		}
	}

	tests := []struct {
		name           string
		from           func() Schema
		to             func() Schema
		wantOperations []expectedOperation
	}{
		{
			name:           "dictionary added",
			from:           withoutDict,
			to:             withDict(dictV1),
			wantOperations: []expectedOperation{wantCreate(dictV1)},
		},
		{
			name:           "dictionary removed",
			from:           withDict(dictV1),
			to:             withoutDict,
			wantOperations: []expectedOperation{wantDrop()},
		},
		{
			name:           "dictionary body changed",
			from:           withDict(dictV1),
			to:             withDict(dictV2),
			wantOperations: []expectedOperation{wantDrop(), wantCreate(dictV2)},
		},
		{
			name:           "dictionary unchanged",
			from:           withDict(dictV1),
			to:             withDict(dictV1),
			wantOperations: []expectedOperation{},
		},
	}

	runTests(t, tests)
}

func TestSyncPlanGenerator_MixedOperations(t *testing.T) {
tests := []struct {
name string
Expand Down
39 changes: 38 additions & 1 deletion internal/repositories/clickhouse/loadschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ func (c *Client) LoadSchema(ctx context.Context, f Filter) (*models.Schema, erro
return nil, fmt.Errorf("load functions: %w", err)
}

dictionaries, err := c.loadDictionaries(ctx, databases, f)
if err != nil {
return nil, fmt.Errorf("load dictionaries: %w", err)
}

// Assign columns to their tables
for key, cols := range columns {
if t, ok := tables[key]; ok {
Expand All @@ -92,10 +97,42 @@ func (c *Client) LoadSchema(ctx context.Context, f Filter) (*models.Schema, erro
}

schema.Functions = functions
schema.Dictionaries = dictionaries

return schema, nil
}

// loadDictionaries reads CREATE DICTIONARY statements verbatim from
// system.tables (where engine = 'Dictionary') for the given databases.
// The full create_table_query is kept; bodies are not parsed.
//
// Rows are restricted to the given databases plus whatever extra clauses
// tableFilterClauses produces for f on the name column, and are returned
// ordered by database, then name. When databases is empty no query is issued
// and the result is nil. On any error the returned slice is nil.
func (c *Client) loadDictionaries(ctx context.Context, databases []string, f Filter) ([]models.Dictionary, error) {
	if len(databases) == 0 {
		return nil, nil
	}

	cond := fmt.Sprintf("database IN (%s) AND engine = 'Dictionary'", quoted(databases))
	cond += tableFilterClauses(f, "name")

	query := fmt.Sprintf(
		"SELECT database, name, create_table_query FROM system.tables WHERE %s ORDER BY database, name",
		cond,
	)
	rows, err := c.Query(ctx, query)
	if err != nil {
		return nil, fmt.Errorf("query dictionaries: %w", err)
	}
	defer rows.Close()

	var dicts []models.Dictionary
	for rows.Next() {
		var d models.Dictionary
		if err := rows.Scan(&d.Database, &d.Name, &d.CreateQuery); err != nil {
			return nil, fmt.Errorf("scan dictionary: %w", err)
		}
		dicts = append(dicts, d)
	}
	// Report an iteration failure with the same kind of context as the query
	// and scan errors, and without handing back a partially read list.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("iterate dictionaries: %w", err)
	}
	return dicts, nil
}

// loadFunctions loads all SQL user-defined functions.
func (c *Client) loadFunctions(ctx context.Context) ([]models.Function, error) {
rows, err := c.Query(ctx, "SELECT name, create_query FROM system.functions WHERE origin = 'SQLUserDefined' ORDER BY name")
Expand Down Expand Up @@ -152,7 +189,7 @@ func (c *Client) loadDatabases(ctx context.Context, f Filter) ([]string, error)
// loadTables loads all tables for the given databases in a single query.
// Returns a map keyed by "database.table".
func (c *Client) loadTables(ctx context.Context, databases []string, f Filter) (map[string]models.Table, error) {
cond := fmt.Sprintf("database IN (%s)", quoted(databases))
cond := fmt.Sprintf("database IN (%s) AND engine != 'Dictionary'", quoted(databases))
cond += tableFilterClauses(f, "name")

query := fmt.Sprintf(
Expand Down