From c94beb36f554e65b134f4067f1873994d5eb0a11 Mon Sep 17 00:00:00 2001 From: mzitnik Date: Sun, 9 Nov 2025 18:12:57 +0200 Subject: [PATCH 1/3] Fix serialize and add test. --- .../sink/ClickHouseAsyncSinkSerializer.java | 14 ++++++--- .../sink/ClickHouseSinkStateTests.java | 29 +++++++++++++++++++ .../sink/ClickHouseAsyncSinkSerializer.java | 14 ++++++--- .../sink/ClickHouseSinkStateTests.java | 29 +++++++++++++++++++ .../flink/EmbeddedFlinkClusterForTests.java | 5 ++++ 5 files changed, 83 insertions(+), 8 deletions(-) create mode 100644 flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkStateTests.java create mode 100644 flink-connector-clickhouse-2.0.0/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkStateTests.java diff --git a/flink-connector-clickhouse-1.17/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncSinkSerializer.java b/flink-connector-clickhouse-1.17/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncSinkSerializer.java index 423a6b3..79bfa9b 100644 --- a/flink-connector-clickhouse-1.17/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncSinkSerializer.java +++ b/flink-connector-clickhouse-1.17/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncSinkSerializer.java @@ -16,6 +16,7 @@ public class ClickHouseAsyncSinkSerializer extends AsyncSinkWriterStateSerialize protected void serializeRequestToStream(ClickHousePayload clickHousePayload, DataOutputStream dataOutputStream) throws IOException { byte[] bytes = clickHousePayload.getPayload(); if (bytes != null) { + dataOutputStream.writeInt(V1); dataOutputStream.writeInt(bytes.length); dataOutputStream.write(bytes); } else { @@ -34,11 +35,16 @@ private ClickHousePayload deserializeV1(DataInputStream dataInputStream) throws } @Override - protected ClickHousePayload deserializeRequestFromStream(long version, DataInputStream dataInputStream) throws IOException { - if (version == V1) { - return deserializeV1(dataInputStream); + protected ClickHousePayload deserializeRequestFromStream(long requestSize, DataInputStream dataInputStream) throws IOException { + if (requestSize > 0) { + int version = dataInputStream.readInt(); + if (version == V1) { + return deserializeV1(dataInputStream); + } else { + throw new IOException("Unsupported version " + version); + } } else { - throw new IOException("Unsupported version: " + version); + throw new IOException("request size < 0"); } } diff --git a/flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkStateTests.java b/flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkStateTests.java new file mode 100644 index 0000000..ba9c76a --- /dev/null +++ b/flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkStateTests.java @@ -0,0 +1,29 @@ +package org.apache.flink.connector.clickhouse.sink; + +import org.apache.flink.connector.clickhouse.data.ClickHousePayload; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; + +public class ClickHouseSinkStateTests { + + @Test + void SerializerTest() throws Exception { + byte[] data = {'H', 'e', 'l', 'l', 'o', 'W', 'o', 'r', 'l', 'd'}; + ClickHousePayload clickHousePayload = new ClickHousePayload(data); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos); + ClickHouseAsyncSinkSerializer serializer = new ClickHouseAsyncSinkSerializer(); + serializer.serializeRequestToStream(clickHousePayload, dos); + DataInputStream dis = new DataInputStream(new ByteArrayInputStream(baos.toByteArray())); + + ClickHousePayload clickHousePayload1 = serializer.deserializeRequestFromStream(dos.size(), dis); + Assertions.assertEquals(clickHousePayload.getPayloadLength(), clickHousePayload1.getPayloadLength()); + Assertions.assertArrayEquals(clickHousePayload.getPayload(), clickHousePayload1.getPayload()); + } +} diff --git a/flink-connector-clickhouse-2.0.0/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncSinkSerializer.java b/flink-connector-clickhouse-2.0.0/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncSinkSerializer.java index 423a6b3..79bfa9b 100644 --- a/flink-connector-clickhouse-2.0.0/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncSinkSerializer.java +++ b/flink-connector-clickhouse-2.0.0/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncSinkSerializer.java @@ -16,6 +16,7 @@ public class ClickHouseAsyncSinkSerializer extends AsyncSinkWriterStateSerialize protected void serializeRequestToStream(ClickHousePayload clickHousePayload, DataOutputStream dataOutputStream) throws IOException { byte[] bytes = clickHousePayload.getPayload(); if (bytes != null) { + dataOutputStream.writeInt(V1); dataOutputStream.writeInt(bytes.length); dataOutputStream.write(bytes); } else { @@ -34,11 +35,16 @@ private ClickHousePayload deserializeV1(DataInputStream dataInputStream) throws } @Override - protected ClickHousePayload deserializeRequestFromStream(long version, DataInputStream dataInputStream) throws IOException { - if (version == V1) { - return deserializeV1(dataInputStream); + protected ClickHousePayload deserializeRequestFromStream(long requestSize, DataInputStream dataInputStream) throws IOException { + if (requestSize > 0) { + int version = dataInputStream.readInt(); + if (version == V1) { + return deserializeV1(dataInputStream); + } else { + throw new IOException("Unsupported version " + version); + } } else { - throw new IOException("Unsupported version: " + version); + throw new IOException("request size < 0"); } } diff --git a/flink-connector-clickhouse-2.0.0/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkStateTests.java b/flink-connector-clickhouse-2.0.0/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkStateTests.java new file mode 100644 index 0000000..ba9c76a --- /dev/null +++ b/flink-connector-clickhouse-2.0.0/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkStateTests.java @@ -0,0 +1,29 @@ +package org.apache.flink.connector.clickhouse.sink; + +import org.apache.flink.connector.clickhouse.data.ClickHousePayload; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; + +public class ClickHouseSinkStateTests { + + @Test + void SerializerTest() throws Exception { + byte[] data = {'H', 'e', 'l', 'l', 'o', 'W', 'o', 'r', 'l', 'd'}; + ClickHousePayload clickHousePayload = new ClickHousePayload(data); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos); + ClickHouseAsyncSinkSerializer serializer = new ClickHouseAsyncSinkSerializer(); + serializer.serializeRequestToStream(clickHousePayload, dos); + DataInputStream dis = new DataInputStream(new ByteArrayInputStream(baos.toByteArray())); + + ClickHousePayload clickHousePayload1 = serializer.deserializeRequestFromStream(dos.size(), dis); + Assertions.assertEquals(clickHousePayload.getPayloadLength(), clickHousePayload1.getPayloadLength()); + Assertions.assertArrayEquals(clickHousePayload.getPayload(), clickHousePayload1.getPayload()); + } +} diff --git a/flink-connector-clickhouse-2.0.0/src/test/java/org/apache/flink/connector/test/embedded/flink/EmbeddedFlinkClusterForTests.java b/flink-connector-clickhouse-2.0.0/src/test/java/org/apache/flink/connector/test/embedded/flink/EmbeddedFlinkClusterForTests.java index c8a73f6..5bb2bc9 100644 --- a/flink-connector-clickhouse-2.0.0/src/test/java/org/apache/flink/connector/test/embedded/flink/EmbeddedFlinkClusterForTests.java +++ b/flink-connector-clickhouse-2.0.0/src/test/java/org/apache/flink/connector/test/embedded/flink/EmbeddedFlinkClusterForTests.java @@ -21,6 +21,10 @@ static int getFromEnvOrDefault(String key, int defaultValue) { public static void setUp() throws Exception { Configuration config = new Configuration(); + setUp(config); + } + + public static void setUp(Configuration config) throws Exception { config.set(RestOptions.PORT, REST_PORT); // web UI port (optional) config.set(TaskManagerOptions.NUM_TASK_SLOTS, NUM_TASK_SLOTS); flinkCluster = new MiniClusterWithClientResource( @@ -31,6 +35,7 @@ public static void setUp() throws Exception { .build()); flinkCluster.before(); } + public static void tearDown() { if (flinkCluster != null) { flinkCluster.after(); From 0e5ea14888a370a453d5449da779a4c02807af3b Mon Sep 17 00:00:00 2001 From: mzitnik Date: Sun, 9 Nov 2025 18:57:18 +0200 Subject: [PATCH 2/3] Fix exception message & extract retry to const. --- .../clickhouse/sink/ClickHouseAsyncSinkSerializer.java | 2 +- .../flink/connector/clickhouse/sink/ClickHouseSinkTests.java | 3 ++- .../clickhouse/sink/ClickHouseAsyncSinkSerializer.java | 2 +- .../flink/connector/clickhouse/sink/ClickHouseSinkTests.java | 3 ++- 4 files changed, 6 insertions(+), 4 deletions(-) diff --git a/flink-connector-clickhouse-1.17/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncSinkSerializer.java b/flink-connector-clickhouse-1.17/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncSinkSerializer.java index 79bfa9b..4e7e2f3 100644 --- a/flink-connector-clickhouse-1.17/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncSinkSerializer.java +++ b/flink-connector-clickhouse-1.17/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncSinkSerializer.java @@ -44,7 +44,7 @@ protected ClickHousePayload deserializeRequestFromStream(long requestSize, DataI throw new IOException("Unsupported version " + version); } } else { - throw new IOException("request size < 0"); + throw new IOException("Request size: " + requestSize); } } diff --git a/flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java b/flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java index a4be3e6..2ae143d 100644 --- a/flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java +++ b/flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java @@ -45,6 +45,7 @@ public class ClickHouseSinkTests extends FlinkClusterTests { static final long MAX_RECORD_SIZE_IN_BYTES = 1000; static final int STREAM_PARALLELISM = 5; + static final int NUMBER_OF_RETRIES = 10; private String createSimplePOJOTableSQL(String database, String tableName, int parts_to_throw_insert) { String createTable = createSimplePOJOTableSQL(database, tableName); @@ -511,7 +512,7 @@ void SimplePOJODataTooManyPartsTest() throws Exception { env.setParallelism(STREAM_PARALLELISM); ClickHouseClientConfig clickHouseClientConfig = new ClickHouseClientConfig(getServerURL(), getUsername(), getPassword(), getDatabase(), tableName); - clickHouseClientConfig.setNumberOfRetries(10); + clickHouseClientConfig.setNumberOfRetries(NUMBER_OF_RETRIES); clickHouseClientConfig.setSupportDefault(simpleTableSchema.hasDefaults()); ElementConverter convertorCovid = new ClickHouseConvertor<>(SimplePOJO.class, simplePOJOConvertor); diff --git a/flink-connector-clickhouse-2.0.0/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncSinkSerializer.java b/flink-connector-clickhouse-2.0.0/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncSinkSerializer.java index 79bfa9b..4e7e2f3 100644 --- a/flink-connector-clickhouse-2.0.0/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncSinkSerializer.java +++ b/flink-connector-clickhouse-2.0.0/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncSinkSerializer.java @@ -44,7 +44,7 @@ protected ClickHousePayload deserializeRequestFromStream(long requestSize, DataI throw new IOException("Unsupported version " + version); } } else { - throw new IOException("request size < 0"); + throw new IOException("Request size: " + requestSize); } } diff --git a/flink-connector-clickhouse-2.0.0/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java b/flink-connector-clickhouse-2.0.0/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java index 7528bac..b9ff4bf 100644 --- a/flink-connector-clickhouse-2.0.0/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java +++ b/flink-connector-clickhouse-2.0.0/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java @@ -45,6 +45,7 @@ public class ClickHouseSinkTests extends FlinkClusterTests { static final long MAX_RECORD_SIZE_IN_BYTES = 1000; static final int STREAM_PARALLELISM = 5; + static final int NUMBER_OF_RETRIES = 10; private String createSimplePOJOTableSQL(String database, String tableName, int parts_to_throw_insert) { String createTable = createSimplePOJOTableSQL(database, tableName); @@ -508,7 +509,7 @@ void SimplePOJODataTooManyPartsTest() throws Exception { env.setParallelism(STREAM_PARALLELISM); ClickHouseClientConfig clickHouseClientConfig = new ClickHouseClientConfig(getServerURL(), getUsername(), getPassword(), getDatabase(), tableName); - clickHouseClientConfig.setNumberOfRetries(10); + clickHouseClientConfig.setNumberOfRetries(NUMBER_OF_RETRIES); clickHouseClientConfig.setSupportDefault(simpleTableSchema.hasDefaults()); ElementConverter convertorCovid = new ClickHouseConvertor<>(SimplePOJO.class, simplePOJOConvertor); From 998d80bca97cd85674dd253e6b0e32055e1d5404 Mon Sep 17 00:00:00 2001 From: mzitnik Date: Thu, 13 Nov 2025 10:06:48 +0200 Subject: [PATCH 3/3] Adding mote test & fixing edge case --- .../sink/ClickHouseAsyncSinkSerializer.java | 5 ++- .../sink/ClickHouseSinkStateTests.java | 39 ++++++++++++++++--- .../sink/ClickHouseAsyncSinkSerializer.java | 5 ++- .../sink/ClickHouseSinkStateTests.java | 38 +++++++++++++++--- .../clickhouse/data/ClickHousePayload.java | 5 ++- 5 files changed, 77 insertions(+), 15 deletions(-) diff --git a/flink-connector-clickhouse-1.17/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncSinkSerializer.java b/flink-connector-clickhouse-1.17/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncSinkSerializer.java index 4e7e2f3..098ffa1 100644 --- a/flink-connector-clickhouse-1.17/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncSinkSerializer.java +++ b/flink-connector-clickhouse-1.17/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncSinkSerializer.java @@ -20,6 +20,7 @@ protected void serializeRequestToStream(ClickHousePayload clickHousePayload, Dat dataOutputStream.writeInt(bytes.length); dataOutputStream.write(bytes); } else { + dataOutputStream.writeInt(V1); dataOutputStream.writeInt(-1); } @@ -41,10 +42,10 @@ protected ClickHousePayload deserializeRequestFromStream(long requestSize, DataI if (version == V1) { return deserializeV1(dataInputStream); } else { - throw new IOException("Unsupported version " + version); + throw new IOException("Unsupported serialization version: " + version); } } else { - throw new IOException("Request size: " + requestSize); + throw new IOException("Invalid request size: " + requestSize); } } diff --git a/flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkStateTests.java b/flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkStateTests.java index ba9c76a..def419e 100644 --- a/flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkStateTests.java +++ b/flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkStateTests.java @@ -4,15 +4,12 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; +import java.io.*; public class ClickHouseSinkStateTests { @Test - void SerializerTest() throws Exception { + void testSerializeAndDeserializePayload() throws Exception { byte[] data = {'H', 'e', 'l', 'l', 'o', 'W', 'o', 'r', 'l', 'd'}; ClickHousePayload clickHousePayload = new ClickHousePayload(data); @@ -26,4 +23,36 @@ void SerializerTest() throws Exception { Assertions.assertEquals(clickHousePayload.getPayloadLength(), clickHousePayload1.getPayloadLength()); Assertions.assertArrayEquals(clickHousePayload.getPayload(), clickHousePayload1.getPayload()); } + + @Test + void testSerializeAndDeserializeEmptyPayload() throws Exception { + ClickHousePayload clickHousePayload = new ClickHousePayload(null); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos); + ClickHouseAsyncSinkSerializer serializer = new ClickHouseAsyncSinkSerializer(); + serializer.serializeRequestToStream(clickHousePayload, dos); + DataInputStream dis = new DataInputStream(new ByteArrayInputStream(baos.toByteArray())); + ClickHousePayload clickHousePayload1 = serializer.deserializeRequestFromStream(dos.size(), dis); + Assertions.assertEquals(clickHousePayload.getPayloadLength(), clickHousePayload1.getPayloadLength()); + Assertions.assertArrayEquals(clickHousePayload.getPayload(), clickHousePayload1.getPayload()); + } + + @Test + void testDeserializePayloadWithUnsuportedVersion() throws IOException { + byte[] data = {'H', 'e', 'l', 'l', 'o', 'W', 'o', 'r', 'l', 'd'}; + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos); + DataOutputStream dataOutputStream = new DataOutputStream(baos); + int V2 = 2; + dataOutputStream.writeInt(V2); + dataOutputStream.writeInt(data.length); + dataOutputStream.write(data); + DataInputStream dis = new DataInputStream(new ByteArrayInputStream(baos.toByteArray())); + + ClickHouseAsyncSinkSerializer serializer = new ClickHouseAsyncSinkSerializer(); + Exception exception = Assertions.assertThrows(IOException.class, () -> { + serializer.deserializeRequestFromStream(dataOutputStream.size(), dis); + }); + Assertions.assertEquals("Unsupported serialization version: 2", exception.getMessage()); + } } diff --git a/flink-connector-clickhouse-2.0.0/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncSinkSerializer.java b/flink-connector-clickhouse-2.0.0/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncSinkSerializer.java index 4e7e2f3..098ffa1 100644 --- a/flink-connector-clickhouse-2.0.0/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncSinkSerializer.java +++ b/flink-connector-clickhouse-2.0.0/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncSinkSerializer.java @@ -20,6 +20,7 @@ protected void serializeRequestToStream(ClickHousePayload clickHousePayload, Dat dataOutputStream.writeInt(bytes.length); dataOutputStream.write(bytes); } else { + dataOutputStream.writeInt(V1); dataOutputStream.writeInt(-1); } @@ -41,10 +42,10 @@ protected ClickHousePayload deserializeRequestFromStream(long requestSize, DataI if (version == V1) { return deserializeV1(dataInputStream); } else { - throw new IOException("Unsupported version " + version); + throw new IOException("Unsupported serialization version: " + version); } } else { - throw new IOException("Request size: " + requestSize); + throw new IOException("Invalid request size: " + requestSize); } } diff --git a/flink-connector-clickhouse-2.0.0/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkStateTests.java b/flink-connector-clickhouse-2.0.0/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkStateTests.java index ba9c76a..35bdb5c 100644 --- a/flink-connector-clickhouse-2.0.0/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkStateTests.java +++ b/flink-connector-clickhouse-2.0.0/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkStateTests.java @@ -4,15 +4,12 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; +import java.io.*; public class ClickHouseSinkStateTests { @Test - void SerializerTest() throws Exception { + void testSerializeAndDeserializePayload() throws Exception { byte[] data = {'H', 'e', 'l', 'l', 'o', 'W', 'o', 'r', 'l', 'd'}; ClickHousePayload clickHousePayload = new ClickHousePayload(data); @@ -21,9 +18,40 @@ void SerializerTest() throws Exception { ClickHouseAsyncSinkSerializer serializer = new ClickHouseAsyncSinkSerializer(); serializer.serializeRequestToStream(clickHousePayload, dos); DataInputStream dis = new DataInputStream(new ByteArrayInputStream(baos.toByteArray())); + ClickHousePayload clickHousePayload1 = serializer.deserializeRequestFromStream(dos.size(), dis); + Assertions.assertEquals(clickHousePayload.getPayloadLength(), clickHousePayload1.getPayloadLength()); + Assertions.assertArrayEquals(clickHousePayload.getPayload(), clickHousePayload1.getPayload()); + } + @Test + void testSerializeAndDeserializeEmptyPayload() throws Exception { + ClickHousePayload clickHousePayload = new ClickHousePayload(null); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos); + ClickHouseAsyncSinkSerializer serializer = new ClickHouseAsyncSinkSerializer(); + serializer.serializeRequestToStream(clickHousePayload, dos); + DataInputStream dis = new DataInputStream(new ByteArrayInputStream(baos.toByteArray())); ClickHousePayload clickHousePayload1 = serializer.deserializeRequestFromStream(dos.size(), dis); Assertions.assertEquals(clickHousePayload.getPayloadLength(), clickHousePayload1.getPayloadLength()); Assertions.assertArrayEquals(clickHousePayload.getPayload(), clickHousePayload1.getPayload()); } + + @Test + void testDeserializePayloadWithUnsuportedVersion() throws IOException { + byte[] data = {'H', 'e', 'l', 'l', 'o', 'W', 'o', 'r', 'l', 'd'}; + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos); + DataOutputStream dataOutputStream = new DataOutputStream(baos); + int V2 = 2; + dataOutputStream.writeInt(V2); + dataOutputStream.writeInt(data.length); + dataOutputStream.write(data); + DataInputStream dis = new DataInputStream(new ByteArrayInputStream(baos.toByteArray())); + + ClickHouseAsyncSinkSerializer serializer = new ClickHouseAsyncSinkSerializer(); + Exception exception = Assertions.assertThrows(IOException.class, () -> { + serializer.deserializeRequestFromStream(dataOutputStream.size(), dis); + }); + Assertions.assertEquals("Unsupported serialization version: 2", exception.getMessage()); + } } diff --git a/flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/data/ClickHousePayload.java b/flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/data/ClickHousePayload.java index 24f6f08..de6a7d3 100644 --- a/flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/data/ClickHousePayload.java +++ b/flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/data/ClickHousePayload.java @@ -15,7 +15,10 @@ public ClickHousePayload(byte[] payload) { this.payload = payload; } public byte[] getPayload() { return payload; } - public int getPayloadLength() { return payload.length; } + public int getPayloadLength() { + if (payload == null) return -1; + return payload.length; + } public int getAttemptCount() { return attemptCount; } public void incrementAttempts() { attemptCount++; } }