
Add support for Map data type in Flink-ClickHouse connector #55

@rafael-gumiero

Use case

We are using the Flink-ClickHouse connector to ingest OpenTelemetry logs into ClickHouse. OpenTelemetry represents resource attributes, scope attributes, and log attributes as key-value pairs, which the ClickHouse schema for OTel data stores in Map columns.
According to the connector documentation, the Map<K,V> data type is not currently supported, which blocks us from using this connector in our observability pipeline.

Our ClickHouse table schema includes several Map fields:

```sql
CREATE TABLE IF NOT EXISTS otel_logs (
    Timestamp DateTime64(9) CODEC(Delta(8), ZSTD(1)),
    TimestampTime DateTime DEFAULT toDateTime(Timestamp),
    TraceId String CODEC(ZSTD(1)),
    SpanId String CODEC(ZSTD(1)),
    TraceFlags UInt8,
    SeverityText LowCardinality(String) CODEC(ZSTD(1)),
    SeverityNumber UInt8,
    ServiceName LowCardinality(String) CODEC(ZSTD(1)),
    Body String CODEC(ZSTD(1)),
    ResourceSchemaUrl LowCardinality(String) CODEC(ZSTD(1)),
    ResourceAttributes Map(LowCardinality(String), String) CODEC(ZSTD(1)),
    ScopeSchemaUrl LowCardinality(String) CODEC(ZSTD(1)),
    ScopeName String CODEC(ZSTD(1)),
    ScopeVersion LowCardinality(String) CODEC(ZSTD(1)),
    ScopeAttributes Map(LowCardinality(String), String) CODEC(ZSTD(1)),
    LogAttributes Map(LowCardinality(String), String) CODEC(ZSTD(1)),

    INDEX idx_trace_id TraceId TYPE bloom_filter(0.001) GRANULARITY 1,
    INDEX idx_res_attr_key mapKeys(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
    INDEX idx_res_attr_value mapValues(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
    INDEX idx_scope_attr_key mapKeys(ScopeAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
    INDEX idx_scope_attr_value mapValues(ScopeAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
    INDEX idx_log_attr_key mapKeys(LogAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
    INDEX idx_log_attr_value mapValues(LogAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
    INDEX idx_body Body TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 8
) ENGINE = MergeTree()
PARTITION BY toDate(TimestampTime)
PRIMARY KEY (ServiceName, TimestampTime)
ORDER BY (ServiceName, TimestampTime, Timestamp)
TTL TimestampTime + toIntervalDay(180)
SETTINGS index_granularity = 8192, ttl_only_drop_parts = 1;
```

The three critical Map fields are:

- ResourceAttributes Map(LowCardinality(String), String)
- ScopeAttributes Map(LowCardinality(String), String)
- LogAttributes Map(LowCardinality(String), String)

Describe the solution you'd like

Add support for the Map<K,V> data type in the Flink-ClickHouse connector.

Minimum requirement (to unblock our use case):

- Support for Map<String, String> (basic string key-value maps)
- Proper serialization/deserialization between Flink's MapTypeInfo/Map<K, V> and ClickHouse's Map(K, V) type
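To make the serialization requirement concrete, here is a minimal sketch of how a `Map<String, String>` value could be encoded for ClickHouse's RowBinary input format. It assumes the connector writes rows as RowBinary (we have not checked which format it actually uses) and follows our understanding of that format: a Map is written like an `Array(Tuple(K, V))`, i.e. an unsigned LEB128 entry count followed by the key/value pairs, and each String is an unsigned LEB128 byte length followed by its UTF-8 bytes. We believe `LowCardinality(String)` keys are written the same way as plain `String` in RowBinary. `MapRowBinary` and its methods are hypothetical names, not part of the connector.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;

/**
 * Hypothetical helper: encodes a Map<String, String> the way we understand RowBinary
 * represents a Map(String, String) or Map(LowCardinality(String), String) value.
 */
final class MapRowBinary {
    private MapRowBinary() {}

    /** Writes an unsigned LEB128 varint, which RowBinary uses for lengths and counts. */
    static void writeVarUInt(ByteArrayOutputStream out, long value) {
        while ((value & ~0x7FL) != 0) {
            out.write((int) ((value & 0x7F) | 0x80)); // low 7 bits plus continuation bit
            value >>>= 7;
        }
        out.write((int) value); // last group, continuation bit clear
    }

    /** RowBinary String: varint byte length followed by the raw UTF-8 bytes. */
    static void writeString(ByteArrayOutputStream out, String s) {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        writeVarUInt(out, bytes.length);
        out.write(bytes, 0, bytes.length);
    }

    /** RowBinary Map: entry count, then each key followed by its value. */
    static byte[] encode(Map<String, String> map) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeVarUInt(out, map.size());
        for (Map.Entry<String, String> e : map.entrySet()) {
            writeString(out, e.getKey());   // null keys/values are not handled here
            writeString(out, e.getValue());
        }
        return out.toByteArray();
    }
}
```

For example, `{"a": "b"}` encodes to the bytes `1, 1, 'a', 1, 'b'` and an empty map to a single `0`. Since the Map columns in our table are not `Nullable`, a `null` Flink map would still need a policy (such as writing an empty map); the sketch leaves that to the caller.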

Nice to have (for full OpenTelemetry compatibility):

- Support for Maps with different value types (e.g., Map<String, Int32>, Map<String, Boolean>)
- Support for nested Map structures (e.g., Map<String, Map<String, String>>)
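For the nice-to-have items, the connector would also need to map nested Flink types onto ClickHouse type names. The sketch below uses a tiny hypothetical type model (`StringType`, `IntType`, `BooleanType`, `MapType`) in place of Flink's real `LogicalType` classes, and assumes the mapping STRING → `String`, INT → `Int32`, BOOLEAN → `Bool`, MAP<K, V> → `Map(K, V)`. It does not validate which key types a given ClickHouse version accepts, and it emits plain `String` keys, whereas our table declares `LowCardinality(String)` keys. It needs Java 16 or later (records and `instanceof` patterns).

```java
/** Hypothetical stand-in for Flink's logical types; not Flink's actual API. */
interface SqlType {}
record StringType() implements SqlType {}
record IntType() implements SqlType {}
record BooleanType() implements SqlType {}
record MapType(SqlType key, SqlType value) implements SqlType {}

/** Hypothetical mapping from the type model above to ClickHouse type names. */
final class ClickHouseTypes {
    private ClickHouseTypes() {}

    static String toClickHouse(SqlType t) {
        if (t instanceof StringType) return "String";
        if (t instanceof IntType) return "Int32";    // Flink INT is a 32-bit signed integer
        if (t instanceof BooleanType) return "Bool";
        if (t instanceof MapType m) {
            // Recurse so nested maps such as MAP<STRING, MAP<STRING, INT>> are covered
            return "Map(" + toClickHouse(m.key()) + ", " + toClickHouse(m.value()) + ")";
        }
        throw new IllegalArgumentException("unsupported type: " + t);
    }
}
```

With this sketch, `toClickHouse(new MapType(new StringType(), new MapType(new StringType(), new IntType())))` returns `Map(String, Map(String, Int32))`.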

The basic Map<String, String> support would already enable us to use this connector for OpenTelemetry logs, as we can convert all attribute values to strings during ingestion.
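The workaround described above, converting every attribute value to a string before it reaches the sink, could look roughly like the following. It is a sketch under our own conventions, not something the connector provides: scalar values go through `String.valueOf`, nested maps (OTel `kvlist` values) are flattened into dotted keys, lists are rendered with `List.toString()`, and `null` values are dropped because the target `Map(..., String)` columns do not hold NULLs. `AttributeFlattener` is a hypothetical name.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper that turns OTel-style attribute values into a Map<String, String>. */
final class AttributeFlattener {
    private AttributeFlattener() {}

    /** Returns a new map whose values are all strings, preserving input iteration order. */
    static Map<String, String> flatten(Map<String, ?> attributes) {
        Map<String, String> out = new LinkedHashMap<>();
        flattenInto("", attributes, out);
        return out;
    }

    private static void flattenInto(String prefix, Map<?, ?> attributes, Map<String, String> out) {
        for (Map.Entry<?, ?> e : attributes.entrySet()) {
            String key = prefix.isEmpty() ? String.valueOf(e.getKey()) : prefix + "." + e.getKey();
            Object value = e.getValue();
            if (value instanceof Map<?, ?> nested) {
                flattenInto(key, nested, out);          // kvlist: child keys become "key.child"
            } else if (value instanceof List<?> list) {
                out.put(key, list.toString());          // arrays: rendered as "[a, b]"
            } else if (value != null) {
                out.put(key, String.valueOf(value));    // strings, numbers, booleans
            }
            // null values are skipped: Map(..., String) cannot store NULL
        }
    }
}
```

Flattening can collide (a literal key `user.id` and a nested `user` → `id` both produce `user.id`); with this sketch the entry seen later wins.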

Additional context

- Flink versions in use: 1.20 and 2.1.0
- Environment: production observability pipeline processing high-volume log streams

Future data type needs: beyond Map support, we would also benefit from:

- Variant type support (for schema flexibility)
- JSON type support (for semi-structured data)

Related: Map support would make this connector compatible with the recommended ClickHouse schema for OTel data (the schema above), whose attribute columns hold the key-value pairs described by the OpenTelemetry semantic conventions.

Metadata

- Assignees: No one assigned
- Labels: enhancement (New feature or request)
- Type: No type
- Projects: No projects
- Milestone: No milestone
- Relationships: None yet
- Development: No branches or pull requests