Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions internal/models/combined.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type CombinedTable struct {
// TableProperties contains properties of a table
type TableProperties struct {
Engine string
EngineArgs string
OrderBy []string
PrimaryKey []string
PartitionBy string
Expand Down Expand Up @@ -296,6 +297,7 @@ func tablesFromTarget(tables []Table) []CombinedTable {
func tableToProperties(t Table) *TableProperties {
return &TableProperties{
Engine: t.Engine,
EngineArgs: t.EngineArgs,
OrderBy: t.OrderBy,
PrimaryKey: t.PrimaryKey,
PartitionBy: t.PartitionBy,
Expand Down
1 change: 1 addition & 0 deletions internal/models/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type Database struct {
type Table struct {
Name string
Engine string
EngineArgs string
Columns []Column
OrderBy []string
PrimaryKey []string
Expand Down
18 changes: 18 additions & 0 deletions internal/models/schemabuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func (b *schemaBuilder) clone() *schemaBuilder {
cloned.schema.Databases[i].Tables[j] = Table{
Name: tbl.Name,
Engine: tbl.Engine,
EngineArgs: tbl.EngineArgs,
OrderBy: append([]string{}, tbl.OrderBy...),
PrimaryKey: append([]string{}, tbl.PrimaryKey...),
PartitionBy: tbl.PartitionBy,
Expand Down Expand Up @@ -243,6 +244,23 @@ func (b *schemaBuilder) setTableEngine(dbName, tableName, newEngine string) *sch
return b
}

// setTableEngineArgs overwrites the EngineArgs of the first table named
// tableName found in a database named dbName. EngineArgs is the text that goes
// between the engine's parentheses, e.g. "xo_received_at" for
// ReplacingMergeTree(xo_received_at).
//
// When no matching database/table pair exists the schema is left untouched.
// The builder itself is returned in both cases so calls can be chained.
func (b *schemaBuilder) setTableEngineArgs(dbName, tableName, args string) *schemaBuilder {
	for di := range b.schema.Databases {
		db := &b.schema.Databases[di]
		if db.Name == dbName {
			for ti := range db.Tables {
				if tbl := &db.Tables[ti]; tbl.Name == tableName {
					tbl.EngineArgs = args
					return b
				}
			}
		}
	}
	return b
}

// setTablePrimaryKey changes a table's PRIMARY KEY clause.
func (b *schemaBuilder) setTablePrimaryKey(dbName, tableName string, primaryKey []string) *schemaBuilder {
for i := range b.schema.Databases {
Expand Down
8 changes: 7 additions & 1 deletion internal/models/syncplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ func (g *SyncPlanGenerator) processTablesInDatabase(db CombinedDatabase) []Opera
// If ENGINE (name or arguments), ORDER BY, or PRIMARY KEY also changed, a rename is not enough: drop + recreate
needsRecreate := targetTable != nil && table.Source != nil && targetTable.Target != nil &&
(table.Source.Engine != targetTable.Target.Engine ||
table.Source.EngineArgs != targetTable.Target.EngineArgs ||
!equalStringSlices(table.Source.OrderBy, targetTable.Target.OrderBy) ||
!equalStringSlices(table.Source.PrimaryKey, targetTable.Target.PrimaryKey))

Expand Down Expand Up @@ -311,6 +312,7 @@ func (g *SyncPlanGenerator) processTableChanges(dbName string, table CombinedTab
// Check if table properties changed (engine, order by, primary key, etc.)
if table.Source != nil && table.Target != nil {
if table.Source.Engine != table.Target.Engine ||
table.Source.EngineArgs != table.Target.EngineArgs ||
!equalStringSlices(table.Source.OrderBy, table.Target.OrderBy) ||
!equalStringSlices(table.Source.PrimaryKey, table.Target.PrimaryKey) {
// ENGINE (name or arguments), ORDER BY, or PRIMARY KEY changed - requires drop + recreate
Expand Down Expand Up @@ -701,7 +703,11 @@ func buildCreateTableSQL(dbName string, table CombinedTable) string {
colDefs = append(colDefs, "PROJECTION "+n+" ("+t.Projections[n]+")")
}
}
sql := fmt.Sprintf("CREATE TABLE %s.%s (%s) ENGINE = %s", quoteIdent(dbName), quoteIdent(table.Name), strings.Join(colDefs, ", "), t.Engine)
engine := t.Engine
if t.EngineArgs != "" {
engine += "(" + t.EngineArgs + ")"
}
sql := fmt.Sprintf("CREATE TABLE %s.%s (%s) ENGINE = %s", quoteIdent(dbName), quoteIdent(table.Name), strings.Join(colDefs, ", "), engine)
if len(t.OrderBy) > 0 {
sql += " ORDER BY (" + strings.Join(t.OrderBy, ", ") + ")"
} else if strings.Contains(t.Engine, "MergeTree") {
Expand Down
20 changes: 20 additions & 0 deletions internal/models/syncplan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,26 @@ func TestSyncPlanGenerator_TableOperations(t *testing.T) {
}},
},
},
{
name: "table engine args added (ReplacingMergeTree gains version column)",
from: func() Schema {
return baseSchema().
setTableEngine("db1", "users", "ReplacingMergeTree").
build()
},
to: func() Schema {
return baseSchema().
setTableEngine("db1", "users", "ReplacingMergeTree").
setTableEngineArgs("db1", "users", "xo_received_at").
build()
},
wantOperations: []expectedOperation{
{level: LevelTable, action: ActionDrop, statements: []string{"DROP TABLE IF EXISTS `db1`.`users`;"}},
{level: LevelTable, action: ActionCreate, statements: []string{
"CREATE TABLE `db1`.`users` (`id` Int32, `name` String) ENGINE = ReplacingMergeTree(xo_received_at) ORDER BY (id);",
}},
},
},
{
name: "table order by changed",
from: func() Schema {
Expand Down
58 changes: 58 additions & 0 deletions internal/repositories/clickhouse/loadschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package clickhouse
import (
"context"
"fmt"
"regexp"
"strings"

"github.com/anytoe/chsync/internal/models"
Expand Down Expand Up @@ -224,6 +225,7 @@ func (c *Client) loadTables(ctx context.Context, databases []string, f Filter) (
table := models.Table{
Name: name,
Engine: models.NormalizeEngine(engine),
EngineArgs: parseEngineArgs(engineFull),
PartitionBy: partitionKey,
Settings: parseSettings(engineFull),
}
Expand All @@ -239,6 +241,62 @@ func (c *Client) loadTables(ctx context.Context, databases []string, f Filter) (
return tables, rows.Err()
}

// replicationParamsRe matches the two leading replication arguments at the
// start of an engine argument list, optionally followed by a comma:
//
//	'/clickhouse/tables/{uuid}/{shard}', '{replica}'[, <real args>]
//
// Only this exact macro form is recognised. The arguments describe where the
// table is replicated rather than its logical definition, so parseEngineArgs
// strips them from the value it returns.
var replicationParamsRe = regexp.MustCompile(`^'/clickhouse/tables/\{uuid\}/\{shard\}',\s*'\{replica\}'(?:,\s*)?`)

// parseEngineArgs extracts the parenthesized engine arguments from a
// system.tables engine_full string, which has the shape
//
//	<Engine>[(<args>)] [PARTITION BY ...] [ORDER BY ...] [SETTINGS ...]
//
// It returns the text between the engine's own parentheses with any leading
// replication parameters (see replicationParamsRe) removed, or "" when the
// engine has no argument list, the list is empty, or it is never closed.
//
// The argument list must directly follow the engine name (optionally after
// blanks). Parentheses that appear later in the string — e.g. in
// "MergeTree ORDER BY (id, name)" or "MergeTree PARTITION BY toYYYYMM(ts)" —
// belong to other clauses and are never treated as engine arguments.
//
// Parentheses inside single-quoted string literals are ignored, and a
// backslash inside a literal escapes the following byte so that '\'' does not
// end the literal early.
func parseEngineArgs(engineFull string) string {
	// Step over the engine name: a run of ASCII letters, digits and underscores.
	pos := 0
	for pos < len(engineFull) {
		c := engineFull[pos]
		isNameByte := c == '_' ||
			('a' <= c && c <= 'z') ||
			('A' <= c && c <= 'Z') ||
			('0' <= c && c <= '9')
		if !isNameByte {
			break
		}
		pos++
	}
	// Tolerate blanks between the name and its argument list.
	for pos < len(engineFull) && (engineFull[pos] == ' ' || engineFull[pos] == '\t') {
		pos++
	}
	// Anything other than '(' here means the engine carries no arguments; the
	// next token is a clause keyword such as ORDER BY.
	if pos >= len(engineFull) || engineFull[pos] != '(' {
		return ""
	}
	open := pos

	depth := 0
	inQuote := false
	for i := open; i < len(engineFull); i++ {
		ch := engineFull[i]
		if inQuote {
			switch ch {
			case '\\':
				i++ // skip the escaped byte so \' does not close the literal
			case '\'':
				inQuote = false
			}
			continue
		}
		switch ch {
		case '\'':
			inQuote = true
		case '(':
			depth++
		case ')':
			depth--
			if depth == 0 {
				args := engineFull[open+1 : i]
				return replicationParamsRe.ReplaceAllString(args, "")
			}
		}
	}
	// Unbalanced parentheses: no trustworthy argument list.
	return ""
}

// parseSettings extracts the SETTINGS clause from a system.tables engine_full
// string and returns it as a key→value map. ClickHouse formats engine_full as
// "<Engine>(...) [ORDER BY ...] [PARTITION BY ...] [SETTINGS k1 = v1, k2 = v2, ...]".
Expand Down
63 changes: 63 additions & 0 deletions internal/repositories/clickhouse/loadschema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,66 @@ func TestParseSettings(t *testing.T) {
})
}
}

// TestParseEngineArgs feeds representative engine_full strings to
// parseEngineArgs and checks the extracted argument list: engines without
// arguments, simple and nested argument lists, quoted strings that contain
// parentheses, and Shared* engines whose replication parameters are stripped.
func TestParseEngineArgs(t *testing.T) {
	type engineArgsCase struct {
		name       string
		engineFull string
		expected   string
	}

	cases := []engineArgsCase{
		{
			name:       "no parens at all",
			engineFull: "MergeTree ORDER BY id",
			expected:   "",
		},
		{
			name:       "empty parens",
			engineFull: "MergeTree() ORDER BY id SETTINGS index_granularity = 8192",
			expected:   "",
		},
		{
			name:       "ReplacingMergeTree with version column",
			engineFull: "ReplacingMergeTree(xo_received_at) ORDER BY id SETTINGS index_granularity = 8192",
			expected:   "xo_received_at",
		},
		{
			name:       "SummingMergeTree with column list",
			engineFull: "SummingMergeTree(a, b, c) ORDER BY id",
			expected:   "a, b, c",
		},
		{
			name:       "VersionedCollapsingMergeTree with two args",
			engineFull: "VersionedCollapsingMergeTree(sign, version) ORDER BY id",
			expected:   "sign, version",
		},
		{
			name:       "nested parens in args",
			engineFull: "ReplacingMergeTree(toUInt64(modified_at)) ORDER BY id",
			expected:   "toUInt64(modified_at)",
		},
		{
			name:       "quoted string with paren inside",
			engineFull: "Distributed('cluster', 'db', 'table_(2024)', rand())",
			expected:   "'cluster', 'db', 'table_(2024)', rand()",
		},
		{
			name:       "Shared engine strips replication params",
			engineFull: "SharedReplacingMergeTree('/clickhouse/tables/{uuid}/{shard}', '{replica}', xo_received_at) ORDER BY id",
			expected:   "xo_received_at",
		},
		{
			name:       "Shared engine with only replication params yields empty",
			engineFull: "SharedMergeTree('/clickhouse/tables/{uuid}/{shard}', '{replica}') ORDER BY id",
			expected:   "",
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			if actual := parseEngineArgs(tc.engineFull); actual != tc.expected {
				t.Errorf("parseEngineArgs(%q) = %q, want %q", tc.engineFull, actual, tc.expected)
			}
		})
	}
}