Description
Flink 2.1.0, Java 17, ClickHouse 25.8.1 on Ubuntu 24
Steps to reproduce
- Tried to inject 3 billion records from Entities instantiated from CSV files. No errors in the conversion itself.
- Used 10 TaskManagers with env.setParallelism(40); a stripped-down sketch of the job topology is shown below.
- At some point the job fails with the error below, in my case after ~200 million Entities had been injected successfully.
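For reference, a stripped-down sketch of the topology named in the log below (Source: Gzipped CSV Source -> Map -> Filter -> Sink: Writer), assuming the standard DataStream API; the gzipped CSV source and the ClickHouse sink from the Code Example section are replaced by placeholders here:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Minimal sketch of the failing topology; the real job reads gzipped CSV files,
// maps each line to an Entity, filters, and ends in events.sinkTo(clickhouseSink)
// as shown under Code Example below.
public class PipelineSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(40); // same parallelism as in the failing job

        // stand-in for the gzipped CSV source
        DataStream<String> lines = env.fromData("a,1", "b,2", "c,3");

        lines.map(String::trim)               // stand-in for the CSV -> Entity mapping
             .filter(line -> !line.isEmpty()) // stand-in for the Filter step
             .print();                        // the real job calls sinkTo(clickhouseSink)

        env.execute("csv-to-clickhouse-sketch");
    }
}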
Error Log or Exception StackTrace
2025-09-09 14:21:46,187 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: Gzipped CSV Source -> Map -> Filter -> Sink: Writer (17/40)#81 (e01de927fad51df3c0211700c3d0219a_cbc357ccb763df2852fee8c4fc7d55f2_16_81) switched from INITIALIZING to FAILED with failure cause:
org.apache.flink.util.FlinkRuntimeException: Failed to deserialize value
at org.apache.flink.streaming.api.operators.util.SimpleVersionedListState$DeserializingIterator.next(SimpleVersionedListState.java:140) ~[flink-dist-2.1.0.jar:2.1.0]
at java.base/java.util.Iterator.forEachRemaining(Unknown Source) ~[?:?]
at org.apache.flink.util.CollectionUtil.iterableToList(CollectionUtil.java:119) ~[flink-dist-2.1.0.jar:2.1.0]
at org.apache.flink.streaming.runtime.operators.sink.StatefulSinkWriterStateHandler.createWriter(StatefulSinkWriterStateHandler.java:103) ~[flink-dist-2.1.0.jar:2.1.0]
at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.initializeState(SinkWriterOperator.java:171) ~[flink-dist-2.1.0.jar:2.1.0]
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:142) ~[flink-dist-2.1.0.jar:2.1.0]
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:304) ~[flink-dist-2.1.0.jar:2.1.0]
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) ~[flink-dist-2.1.0.jar:2.1.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858) ~[flink-dist-2.1.0.jar:2.1.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812) ~[flink-dist-2.1.0.jar:2.1.0]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) ~[flink-dist-2.1.0.jar:2.1.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812) ~[flink-dist-2.1.0.jar:2.1.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771) ~[flink-dist-2.1.0.jar:2.1.0]
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:963) ~[flink-dist-2.1.0.jar:2.1.0]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932) [flink-dist-2.1.0.jar:2.1.0]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:756) [flink-dist-2.1.0.jar:2.1.0]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:568) [flink-dist-2.1.0.jar:2.1.0]
at java.base/java.lang.Thread.run(Unknown Source) [?:?]
Caused by: java.io.IOException: Unsupported version: 1123
at org.apache.flink.connector.clickhouse.sink.ClickHouseAsyncSinkSerializer.deserializeRequestFromStream(ClickHouseAsyncSinkSerializer.java:41) ~[blob_p-ea3ad9f3732e1fa10aca39e3bd3cecfd6494fb2b-15c484d7f218d531026537aac0c41986:?]
at org.apache.flink.connector.clickhouse.sink.ClickHouseAsyncSinkSerializer.deserializeRequestFromStream(ClickHouseAsyncSinkSerializer.java:12) ~[blob_p-ea3ad9f3732e1fa10aca39e3bd3cecfd6494fb2b-15c484d7f218d531026537aac0c41986:?]
at org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer.deserialize(AsyncSinkWriterStateSerializer.java:81) ~[flink-connector-files-2.1.0.jar:2.1.0]
at org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer.deserialize(AsyncSinkWriterStateSerializer.java:39) ~[flink-connector-files-2.1.0.jar:2.1.0]
at org.apache.flink.core.io.SimpleVersionedSerialization.readVersionAndDeSerialize(SimpleVersionedSerialization.java:227) ~[flink-dist-2.1.0.jar:2.1.0]
at org.apache.flink.streaming.api.operators.util.SimpleVersionedListState$DeserializingIterator.next(SimpleVersionedListState.java:138) ~[flink-dist-2.1.0.jar:2.1.0]
... 17 more
$ kube logs flink10-taskmanager-6 | grep Exception | uniq
org.apache.flink.util.FlinkRuntimeException: Failed to deserialize value
Caused by: java.io.IOException: Unsupported version: 1327
org.apache.flink.util.FlinkRuntimeException: Failed to deserialize value
Caused by: java.io.IOException: Unsupported version: 1361
org.apache.flink.util.FlinkRuntimeException: Failed to deserialize value
Caused by: java.io.IOException: Unsupported version: 1123
org.apache.flink.util.FlinkRuntimeException: Failed to deserialize value
Caused by: java.io.IOException: Unsupported version: 1224
org.apache.flink.util.FlinkRuntimeException: Failed to deserialize value
Caused by: java.io.IOException: Unsupported version: 1257
org.apache.flink.util.FlinkRuntimeException: Failed to deserialize value
Caused by: java.io.IOException: Unsupported version: 1058
org.apache.flink.util.FlinkRuntimeException: Failed to deserialize value
Caused by: java.io.IOException: Unsupported version: 1176
org.apache.flink.util.FlinkRuntimeException: Failed to deserialize value
Caused by: java.io.IOException: Unsupported version: 1139
org.apache.flink.util.FlinkRuntimeException: Failed to deserialize value
Caused by: java.io.IOException: Unsupported version: 1256
org.apache.flink.util.FlinkRuntimeException: Failed to deserialize value
...
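The version numbers above (1123, 1327, 1361, ...) differ between subtasks and restore attempts, which looks less like a real serializer version and more like arbitrary payload bytes being interpreted as the version field. One possible cause, offered here only as a hypothesis, is that the writer and the reader of the buffered-request state consume a different number of bytes per entry, so the stream desynchronizes after the first entry. The toy snippet below is not the connector's code; it only demonstrates how such a framing mismatch turns payload bytes into nonsense version values:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Toy demonstration of a writer/reader framing mismatch (hypothetical, unrelated
// to the connector's real serializer): the writer appends an extra flag byte per
// record that the reader never consumes, so from the second record on the reader's
// "version" int is read from the middle of the data.
public class FramingMismatchDemo {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buffer);

        // writer: [version int][payload length int][payload bytes][extra flag byte]
        for (int i = 0; i < 2; i++) {
            out.writeInt(1);
            byte[] payload = ("record-" + i).getBytes(StandardCharsets.UTF_8);
            out.writeInt(payload.length);
            out.write(payload);
            out.writeBoolean(true); // the extra byte the reader does not know about
        }

        // reader: expects [version int][payload length int][payload bytes] only
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()));
        for (int i = 0; i < 2; i++) {
            int version = in.readInt();
            if (version != 1) {
                System.out.println("Unsupported version: " + version); // arbitrary number, as in the logs
                return;
            }
            byte[] payload = new byte[in.readInt()];
            in.readFully(payload);
            System.out.println("read " + new String(payload, StandardCharsets.UTF_8));
        }
    }
}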
Expected Behaviour
Code Example
Using POJOConvertor<T> only, with defaultsSupport = false and hasDefault = false. The exact order of columns in the table definition is respected.
Serialize.writeInt32(out, input.field1(), false, false, ClickHouseDataType.Int32, false, "field1");
Serialize.writeString(out, input.field2(), false, false, ClickHouseDataType.String, false, "field2");
Serialize.writeTimeDate(out, input.field3(), false, false, ClickHouseDataType.DateTime, false, "field3");
...
Configuration
Client Configuration
var clickHouseClientConfig = new ClickHouseClientConfig(
        // internal IP of ClickHouse instance
        url,
        "default",
        "_____________________________",
        "alerts",
        "alerts"
);
clickHouseClientConfig.setSupportDefault(false);

// convertor & sink
var entityConvertor = new EntityJsonOutputStreamConvertor();
var convertorEntity = new ClickHouseConvertor<>(EntityJsonRow.class, entityConvertor);
var clickhouseSink = new ClickHouseAsyncSink<>(
        convertorEntity,
        MAX_BATCH_SIZE,
        MAX_IN_FLIGHT_REQUESTS,
        MAX_BUFFERED_REQUESTS,
        MAX_BATCH_SIZE_IN_BYTES,
        MAX_TIME_IN_BUFFER_MS,
        MAX_RECORD_SIZE_IN_BYTES,
        clickHouseClientConfig
);
events.sinkTo(clickhouseSink);
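The MAX_* constants passed to ClickHouseAsyncSink above are not included in the report; they appear to correspond to the buffering parameters of Flink's AsyncSink base (the stack trace goes through AsyncSinkWriterStateSerializer from flink-connector-base). The values below are only illustrative placeholders, not the settings used in the failing job:

// Illustrative placeholder values for the AsyncSink buffering parameters used above;
// the actual values from the failing job are not part of this report.
public class SinkTuning {
    static final int  MAX_BATCH_SIZE           = 500;              // max entries per insert batch
    static final int  MAX_IN_FLIGHT_REQUESTS   = 4;                // max concurrent in-flight inserts per subtask
    static final int  MAX_BUFFERED_REQUESTS    = 10_000;           // max buffered entries before backpressure
    static final long MAX_BATCH_SIZE_IN_BYTES  = 8L * 1024 * 1024; // flush once a batch reaches ~8 MiB
    static final long MAX_TIME_IN_BUFFER_MS    = 5_000L;           // flush at least every 5 seconds
    static final long MAX_RECORD_SIZE_IN_BYTES = 1024L * 1024;     // largest single serialized entry accepted
}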
Environment
- Cloud
- Connector version: master, with the Flink dependencies changed to point to 2.1.0 (Java 17)
- Language version: Java 17
- OS: Ubuntu 24
ClickHouse Server
- ClickHouse Server version: 25.8.1
- ClickHouse Server non-default settings, if any:
- CREATE TABLE statements for tables involved:
- Sample data for all these tables, use clickhouse-obfuscator if necessary