How to insert DateTime values? #57

Closed
thilo-behnke opened this issue Feb 13, 2023 · 1 comment

thilo-behnke commented Feb 13, 2023

I need to insert a Unix timestamp into a DateTime column.
Kafka Connect has no DateTime type, so how should DateTime values be inserted with the ClickHouse connector?

One option is to insert the timestamp as a string, int, or long, which ClickHouse accepts directly in SQL:

create table `dev.thilo.timestamp_str` (timestamp DateTime) ENGINE = TinyLog;

insert into `dev.thilo.timestamp_str` (timestamp) values ('1676283600');

SELECT * FROM `dev.thilo.timestamp_str`

Query id: f247748f-0178-47b9-a25d-fb9e06071f7e

┌───────────timestamp─┐
│ 2023-02-13 10:20:00 │
└─────────────────────┘
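The inserted value is a count of seconds since the Unix epoch. As a cross-check, this short Java sketch (standard library only) renders the same number as 2023-02-13 10:20:00 in UTC. That matches the query output above assuming the ClickHouse server runs in UTC; with a different server time zone, the displayed value would shift accordingly.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class EpochCheck {
    // ClickHouse prints DateTime values in this layout.
    static final DateTimeFormatter CLICKHOUSE_STYLE = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Renders epoch seconds as a UTC wall-clock time.
    static String formatUtc(long epochSeconds) {
        return Instant.ofEpochSecond(epochSeconds).atOffset(ZoneOffset.UTC).format(CLICKHOUSE_STYLE);
    }

    public static void main(String[] args) {
        // Same value as the string inserted in the SQL example above.
        System.out.println(formatUtc(Long.parseLong("1676283600"))); // prints 2023-02-13 10:20:00
    }
}
```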

However, this does not work with the ClickHouse connector. Mapping a string or int field to a DateTime column logs the following error:

loc_clickhouse_sink  | [2023-02-13 10:15:23,434] ERROR [clickhouse-sink-13|task-1] Table column name [timestamp] type [NONE] is not matching data column type [STRING] (com.clickhouse.kafka.connect.sink.db.ClickHouseWriter:154)

The connector could support mapping int, long, and string fields to DateTime columns. What are your thoughts on this?
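To make the proposed mapping concrete, here is a minimal sketch of a hypothetical helper (`toDateTimeSeconds` is an illustrative name, not part of the connector) that normalizes Integer, Long, or numeric String values to epoch seconds. ClickHouse's DateTime stores a UInt32 Unix timestamp, so the sketch rejects anything outside 0 to 4294967295 seconds (1970-01-01 00:00:00 to 2106-02-07 06:28:15 UTC). It assumes string values hold epoch seconds, as in the SQL example above; date-formatted strings are not handled.

```java
public class DateTimeMapping {
    // DateTime is stored as a UInt32 count of seconds, so this is its upper bound.
    static final long MAX_DATETIME_SECONDS = 0xFFFFFFFFL;

    // Hypothetical helper: maps an int, long, or numeric string field to epoch seconds.
    static long toDateTimeSeconds(Object value) {
        long seconds;
        if (value instanceof Integer || value instanceof Long) {
            seconds = ((Number) value).longValue();
        } else if (value instanceof String) {
            // Assumes the string holds epoch seconds, e.g. "1676283600".
            // Long.parseLong throws NumberFormatException (an IllegalArgumentException) otherwise.
            seconds = Long.parseLong(((String) value).trim());
        } else {
            throw new IllegalArgumentException("Unsupported type for DateTime: "
                    + (value == null ? "null" : value.getClass().getName()));
        }
        if (seconds < 0 || seconds > MAX_DATETIME_SECONDS) {
            throw new IllegalArgumentException("Out of DateTime range: " + seconds);
        }
        return seconds;
    }

    public static void main(String[] args) {
        System.out.println(toDateTimeSeconds("1676283600")); // 1676283600
        System.out.println(toDateTimeSeconds(1676283600L));  // 1676283600
    }
}
```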

Workaround: with the current connector version (v0.0.8), I can use a MATERIALIZED column to map timestamps to a DateTime column:

create table `dev.thilo.materialize_test` (timestamp Int64, date DateTime MATERIALIZED toDateTime(timestamp), also_timestamp Int64 ALIAS timestamp) ENGINE = TinyLog;

More details:

Full Error:

loc_clickhouse_sink  | [2023-02-13 10:15:23,434] ERROR [clickhouse-sink-13|task-1] Table column name [timestamp] type [NONE] is not matching data column type [STRING] (com.clickhouse.kafka.connect.sink.db.ClickHouseWriter:154)
...
loc_clickhouse_sink  | org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
loc_clickhouse_sink  |  at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)
loc_clickhouse_sink  |  at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
loc_clickhouse_sink  |  at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
loc_clickhouse_sink  |  at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
loc_clickhouse_sink  |  at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
loc_clickhouse_sink  |  at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
loc_clickhouse_sink  |  at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
loc_clickhouse_sink  |  at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
loc_clickhouse_sink  |  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
loc_clickhouse_sink  |  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
loc_clickhouse_sink  |  at java.base/java.lang.Thread.run(Thread.java:829)
loc_clickhouse_sink  | Caused by: java.lang.RuntimeException
loc_clickhouse_sink  |  at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doInsertRawBinary(ClickHouseWriter.java:267)
loc_clickhouse_sink  |  at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doInsert(ClickHouseWriter.java:128)
loc_clickhouse_sink  |  at com.clickhouse.kafka.connect.sink.processing.Processing.doInsert(Processing.java:45)
loc_clickhouse_sink  |  at com.clickhouse.kafka.connect.sink.processing.Processing.doLogic(Processing.java:102)
loc_clickhouse_sink  |  at com.clickhouse.kafka.connect.sink.ProxySinkTask.put(ProxySinkTask.java:80)
loc_clickhouse_sink  |  at com.clickhouse.kafka.connect.sink.ClickHouseSinkTask.put(ClickHouseSinkTask.java:60)
loc_clickhouse_sink  |  at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)

Avro schema:

{
  "schema": {
    "type": "record",
    "name": "timestamp_str",
    "namespace": "dev.thilo",
    "fields": [
      {
        "name": "timestamp",
        "type": "string"
      }
    ]
  }
}

Connector:

{
    "name": "clickhouse-sink",
    "config": {
        "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
        "topics": "dev.thilo.timestamp_str",
        "exactlyOnce": false,
        "hostname": "clickhouse",
        "port": "8123",
        "database": "default",
        "ssl": false,
        "username": "default",
        "tasks.max": "2",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://schema-registry:8081"
    }
}
Collaborator

mzitnik commented Mar 2, 2023

I would like to handle this with an implicit conversion. Kafka Connect does not support a TIMESTAMP type (https://kafka.apache.org/20/javadoc/org/apache/kafka/connect/data/Schema.html), so the best approach looks like sending an INT64 and converting it when the column type is DateTime.
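A minimal sketch of the INT64 approach, assuming the writer knows the target column's type name and sends rows in ClickHouse's RowBinary format (the stack trace above passes through `doInsertRawBinary`). In RowBinary, a DateTime is a UInt32 Unix timestamp written little-endian, so an INT64 of seconds can be range-checked and narrowed to four bytes. The class and method names are hypothetical, and `Nullable(DateTime)`, `DateTime('timezone')`, and other column types are deliberately left out.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class Int64ToDateTime {
    static final long MAX_DATETIME_SECONDS = 0xFFFFFFFFL; // UInt32 upper bound

    // Hypothetical writer step: encode an INT64 field for a column of the given ClickHouse type.
    static byte[] encodeInt64(String columnType, long value) {
        ByteBuffer buf;
        if ("DateTime".equals(columnType)) {
            // Implicit conversion: INT64 seconds -> DateTime (UInt32, little-endian).
            if (value < 0 || value > MAX_DATETIME_SECONDS) {
                throw new IllegalArgumentException("Value " + value + " is outside the DateTime range");
            }
            buf = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN);
            buf.putInt((int) value); // the low 32 bits carry the UInt32 value
        } else if ("Int64".equals(columnType)) {
            // No conversion needed: Int64 is written as 8 little-endian bytes.
            buf = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN);
            buf.putLong(value);
        } else {
            throw new IllegalArgumentException("INT64 data does not match column type " + columnType);
        }
        return buf.array();
    }

    public static void main(String[] args) {
        byte[] bytes = encodeInt64("DateTime", 1676283600L);
        System.out.println(bytes.length); // 4
    }
}
```

Kafka Connect's `Timestamp` logical type (`org.apache.kafka.connect.data.Timestamp`) is also carried as an INT64, but it counts milliseconds since the epoch, so a converter that accepts it would need to divide by 1000 before the range check.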
