Skip to content

Commit

Permalink
feat: custom settings support in native protocol (#993)
Browse files Browse the repository at this point in the history
  • Loading branch information
jkaflik committed May 12, 2023
1 parent 2e92636 commit 412b809
Show file tree
Hide file tree
Showing 7 changed files with 175 additions and 14 deletions.
25 changes: 17 additions & 8 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,18 +131,27 @@ type connect struct {
}

func (c *connect) settings(querySettings Settings) []proto.Setting {
settingToProtoSetting := func(k string, v any) proto.Setting {
isCustom := false
if cv, ok := v.(CustomSetting); ok {
v = cv.Value
isCustom = true
}

return proto.Setting{
Key: k,
Value: v,
Important: !isCustom,
Custom: isCustom,
}
}

settings := make([]proto.Setting, 0, len(c.opt.Settings)+len(querySettings))
for k, v := range c.opt.Settings {
settings = append(settings, proto.Setting{
Key: k,
Value: v,
})
settings = append(settings, settingToProtoSetting(k, v))
}
for k, v := range querySettings {
settings = append(settings, proto.Setting{
Key: k,
Value: v,
})
settings = append(settings, settingToProtoSetting(k, v))
}
return settings
}
Expand Down
7 changes: 7 additions & 0 deletions conn_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,10 @@ func dialHttp(ctx context.Context, addr string, num int, opt *Options) (*httpCon
}

for k, v := range opt.Settings {
if cv, ok := v.(CustomSetting); ok {
v = cv.Value
}

query.Set(k, fmt.Sprint(v))
}

Expand Down Expand Up @@ -463,6 +467,9 @@ func (h *httpConnect) createRequest(ctx context.Context, requestUrl string, read
if key == "default_format" {
continue
}
if cv, ok := value.(CustomSetting); ok {
value = cv.Value
}
query.Set(key, fmt.Sprint(value))
}
for key, value := range options.parameters {
Expand Down
8 changes: 8 additions & 0 deletions context.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ var _contextOptionKey = &QueryOptions{
}

type Settings map[string]any

// CustomSetting is a helper struct to distinguish custom settings from important ones.
// For native protocol, is_important flag is set to value 0x02 (see https://github.com/ClickHouse/ClickHouse/blob/c873560fe7185f45eed56520ec7d033a7beb1551/src/Core/BaseSettings.h#L516-L521)
// Only string value is supported until formatting logic that exists in ClickHouse is implemented in clickhouse-go. (https://github.com/ClickHouse/ClickHouse/blob/master/src/Core/Field.cpp#L312 and https://github.com/ClickHouse/clickhouse-go/issues/992)
type CustomSetting struct {
Value string
}

type Parameters map[string]string
type (
QueryOption func(*QueryOptions) error
Expand Down
58 changes: 52 additions & 6 deletions lib/proto/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,17 @@ func (q *Query) encodeClientInfo(buffer *chproto.Buffer, revision uint64) error
type Settings []Setting

type Setting struct {
Key string
Value any
Key string
Value any
Important bool
Custom bool
}

const (
settingFlagImportant = 0x01
settingFlagCustom = 0x02
)

func (s Settings) Encode(buffer *chproto.Buffer, revision uint64) error {
for _, s := range s {
if err := s.encode(buffer, revision); err != nil {
Expand All @@ -172,8 +179,29 @@ func (s *Setting) encode(buffer *chproto.Buffer, revision uint64) error {
buffer.PutUVarInt(value)
return nil
}
buffer.PutBool(true) // is_important
buffer.PutString(fmt.Sprint(s.Value))

{
var flags uint64
if s.Important {
flags |= settingFlagImportant
}
if s.Custom {
flags |= settingFlagCustom
}
buffer.PutUVarInt(flags)
}

if s.Custom {
fieldDump, err := encodeFieldDump(s.Value)
if err != nil {
return err
}

buffer.PutString(fieldDump)
} else {
buffer.PutString(fmt.Sprint(s.Value))
}

return nil
}

Expand All @@ -195,8 +223,26 @@ func (s Parameters) Encode(buffer *chproto.Buffer, revision uint64) error {

func (s *Parameter) encode(buffer *chproto.Buffer, revision uint64) error {
buffer.PutString(s.Key)
buffer.PutUVarInt(uint64(0x02))
buffer.PutString(fmt.Sprintf("'%v'", strings.ReplaceAll(s.Value, "'", "\\'")))
buffer.PutUVarInt(uint64(settingFlagCustom))

fieldDump, err := encodeFieldDump(s.Value)
if err != nil {
return err
}

buffer.PutString(fieldDump)

return nil
}

// encodes a field dump with an appropriate type format
// implements the same logic as in ClickHouse Field::restoreFromDump (https://github.com/ClickHouse/ClickHouse/blob/master/src/Core/Field.cpp#L312)
// currently, only string type is supported
func encodeFieldDump(value any) (string, error) {
switch v := value.(type) {
case string:
return fmt.Sprintf("'%v'", strings.ReplaceAll(v, "'", "\\'")), nil
}

return "", fmt.Errorf("unsupported field type %T", value)
}
41 changes: 41 additions & 0 deletions tests/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,3 +305,44 @@ func TestEmptyDatabaseConfig(t *testing.T) {
err = anotherConn.Ping(context.Background())
require.NoError(t, err)
}

func TestCustomSettings(t *testing.T) {
runInDocker, _ := strconv.ParseBool(GetEnv("CLICKHOUSE_USE_DOCKER", "true"))
if !runInDocker {
t.Skip("Skip test in cloud environment.") // todo configure cloud instance with custom settings
}

conn, err := GetNativeConnection(clickhouse.Settings{
"custom_setting": clickhouse.CustomSetting{"custom_value"},
}, nil, &clickhouse.Compression{
Method: clickhouse.CompressionLZ4,
})
require.NoError(t, err)

t.Run("get existing custom setting value", func(t *testing.T) {
row := conn.QueryRow(context.Background(), "SELECT getSetting('custom_setting')")
require.NoError(t, row.Err())

var setting string
require.NoError(t, row.Scan(&setting))
require.Equal(t, "custom_value", setting)
})

t.Run("get non-existing custom setting value", func(t *testing.T) {
row := conn.QueryRow(context.Background(), "SELECT getSetting('custom_non_existing_setting')")
require.ErrorContains(t, row.Err(), "Unknown setting custom_non_existing_setting")
})

t.Run("get custom setting value from query context", func(t *testing.T) {
ctx := clickhouse.Context(context.Background(), clickhouse.WithSettings(clickhouse.Settings{
"custom_query_setting": clickhouse.CustomSetting{"custom_query_value"},
}))

row := conn.QueryRow(ctx, "SELECT getSetting('custom_query_setting')")
require.NoError(t, row.Err())

var setting string
require.NoError(t, row.Scan(&setting))
require.Equal(t, "custom_query_value", setting)
})
}
1 change: 1 addition & 0 deletions tests/resources/custom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,5 @@
</client>
</openSSL>
<display_name>clickhouse</display_name>
<custom_settings_prefixes>custom_</custom_settings_prefixes>
</clickhouse>
49 changes: 49 additions & 0 deletions tests/std/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,3 +418,52 @@ func TestHTTPProxy(t *testing.T) {
return strings.Contains(text, fmt.Sprintf("Established connection to host \"%s\"", clickHouseHost))
}, 60*time.Second, time.Millisecond, "proxy logs should contain clickhouse.cloud instance host")
}

func TestCustomSettings(t *testing.T) {
runInDocker, _ := strconv.ParseBool(clickhouse_tests.GetEnv("CLICKHOUSE_USE_DOCKER", "true"))
if !runInDocker {
t.Skip("Skip test in cloud environment.")
}

dsns := map[string]clickhouse.Protocol{"Native": clickhouse.Native, "Http": clickhouse.HTTP}
for name, protocol := range dsns {
t.Run(fmt.Sprintf("%s Protocol", name), func(t *testing.T) {
conn, err := GetStdOpenDBConnection(
protocol,
clickhouse.Settings{
"custom_setting": clickhouse.CustomSetting{"custom_value"},
},
nil,
nil,
)
require.NoError(t, err)

t.Run("get existing custom setting value", func(t *testing.T) {
row := conn.QueryRowContext(context.Background(), "SELECT getSetting('custom_setting')")
require.NoError(t, row.Err())

var setting string
require.NoError(t, row.Scan(&setting))
require.Equal(t, "custom_value", setting)
})

t.Run("get non-existing custom setting value", func(t *testing.T) {
row := conn.QueryRowContext(context.Background(), "SELECT getSetting('custom_non_existing_setting')")
require.ErrorContains(t, row.Err(), "Unknown setting custom_non_existing_setting")
})

t.Run("get custom setting value from query context", func(t *testing.T) {
ctx := clickhouse.Context(context.Background(), clickhouse.WithSettings(clickhouse.Settings{
"custom_query_setting": clickhouse.CustomSetting{"custom_query_value"},
}))

row := conn.QueryRowContext(ctx, "SELECT getSetting('custom_query_setting')")
require.NoError(t, row.Err())

var setting string
require.NoError(t, row.Scan(&setting))
require.Equal(t, "custom_query_value", setting)
})
})
}
}

0 comments on commit 412b809

Please sign in to comment.