Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
## 0.1.1
* Add support for Array, Map, Tuple #57
* Add support for FixedString, Date/Date32/DateTime/DateTime64, UInt8/16/32/64/128/256, Decimal, UUID
## 0.1.0
* ClickHouse Sink supports Apache Flink 1.17+
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,9 @@ Planned for a future release — a complete end-to-end example will be added onc
| int/Integer | Enum16 | ✅ | Serialize.writeInt16 |
| java.util.UUID | UUID | ✅ | Serialize.writeUUID |
| String | JSON | ❌ | N/A |
| Array<Type> | Array<Type> | | N/A |
| Map<K,V> | Map<K,V> | | N/A |
| Tuple<Type,..> | Map<T1,T2,..> | ❌ | N/A |
| Array<Type> | Array<Type> | ✅ | Serialize.writeArray |
| Map<K,V> | Map<K,V> | ✅ | Serialize.writeMap |
| Tuple<Type,..> | Tuple<T1,T2,..> | ✅ | Serialize.writeTuple |
| Object | Variant | ❌ | N/A |

* A ZoneId must also be provided when performing date operations.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ private String createSimplePOJOTableSQL(String database, String tableName) {
"v_dateTime DateTime," +
"v_dateTime64 DateTime64," +
"uuid UUID," +
"stringList Array(String)," +
"longList Array(Int64)," +
"mapOfStrings Map(String,String)," +
"tupleOfObjects Tuple(String,Int64,Boolean)," +
") " +
"ENGINE = MergeTree " +
"ORDER BY (longPrimitive); ";
Expand Down Expand Up @@ -283,6 +287,7 @@ void SimplePOJODataTest() throws Exception {
simplePOJOs.sinkTo(simplePOJOSink);
int rows = executeAsyncJob(env, tableName, 10, EXPECTED_ROWS);
Assertions.assertEquals(EXPECTED_ROWS, rows);
// ClickHouseServerForTests.showData(tableName);
}

@Test
Expand Down Expand Up @@ -532,6 +537,7 @@ void SimplePOJODataTooManyPartsTest() throws Exception {
simplePOJOs.sinkTo(simplePOJOSink);
int rows = executeAsyncJob(env, tableName, 100, EXPECTED_ROWS);
Assertions.assertEquals(EXPECTED_ROWS, rows);
// ClickHouseServerForTests.showData("simple_too_many_parts_pojo");
//ClickHouseServerForTests.executeSql(String.format("SYSTEM START MERGES `%s.%s`", getDatabase(), tableName));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.apache.flink.connector.clickhouse.sink.convertor;

import com.clickhouse.data.ClickHouseColumn;
import com.clickhouse.data.ClickHouseDataType;
import com.clickhouse.utils.Serialize;
import org.apache.flink.connector.clickhouse.convertor.POJOConvertor;
Expand Down Expand Up @@ -66,5 +67,14 @@ public void instrument(OutputStream out, SimplePOJO input) throws IOException {
Serialize.writeTimeDate64(out, input.getDateTime64(), false, false, ClickHouseDataType.DateTime64, false, "v_dateTime64", 1);

Serialize.writeUUID(out, input.getUuid(), false, false, ClickHouseDataType.UUID, false, "uuid");

Serialize.writeArray(out, input.getStringList(), ClickHouseColumn.of("stringList", ClickHouseDataType.Array, false, ClickHouseColumn.of("", ClickHouseDataType.String.toString())));

Serialize.writeArray(out, input.getLongList(), ClickHouseColumn.of("longList", ClickHouseDataType.Array, false, ClickHouseColumn.of("", ClickHouseDataType.Int64.toString())));

Serialize.writeMap(out, input.getMapOfStrings(), ClickHouseColumn.of("mapOfStrings", ClickHouseDataType.Map, false, ClickHouseColumn.of("", ClickHouseDataType.String.toString()), ClickHouseColumn.of("", ClickHouseDataType.String.toString())));

Serialize.writeTuple(out, input.getTupleOfObjects(), ClickHouseColumn.of("tupleOfObjects", ClickHouseDataType.Tuple, false, ClickHouseColumn.of("", ClickHouseDataType.String.toString()), ClickHouseColumn.of("", ClickHouseDataType.Int64.toString()), ClickHouseColumn.of("", ClickHouseDataType.Bool.toString()) ));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import java.math.BigInteger;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.UUID;
import java.util.*;

public class SimplePOJO {

Expand Down Expand Up @@ -64,6 +64,13 @@ public class SimplePOJO {

private UUID uuid;

private List<String> stringList;
private List<Long> longList;

private Map<String, String> mapOfStrings;

private List<Object> tupleOfObjects;

public SimplePOJO(int index) {
this.bytePrimitive = Byte.MIN_VALUE;
this.byteObject = Byte.MAX_VALUE;
Expand Down Expand Up @@ -119,6 +126,27 @@ public SimplePOJO(int index) {
this.dateTime64 = LocalDateTime.now();

this.uuid = UUID.randomUUID();

this.stringList = new ArrayList<>();
this.stringList.add("a");
this.stringList.add("b");
this.stringList.add("c");
this.stringList.add("d");

this.longList = new ArrayList<>();
this.longList.add(1L);
this.longList.add(2L);
this.longList.add(3L);
this.longList.add(4L);

this.mapOfStrings = new HashMap<>();
this.mapOfStrings.put("a", "a");
this.mapOfStrings.put("b", "b");

this.tupleOfObjects = new ArrayList<>();
this.tupleOfObjects.add("test");
this.tupleOfObjects.add(1L);
this.tupleOfObjects.add(true);
}

public byte getBytePrimitive() {
Expand Down Expand Up @@ -221,4 +249,11 @@ public Double getDoubleObject() {

/**
 * Returns the value of the {@code uuid} field.
 *
 * @return the stored UUID
 */
public UUID getUuid() {
    return this.uuid;
}

/**
 * Returns the value of the {@code stringList} field.
 *
 * @return the stored list of strings (the field itself, not a copy)
 */
public List<String> getStringList() {
    return this.stringList;
}

/**
 * Returns the value of the {@code longList} field.
 *
 * @return the stored list of longs (the field itself, not a copy)
 */
public List<Long> getLongList() {
    return this.longList;
}

/**
 * Returns the value of the {@code mapOfStrings} field.
 *
 * @return the stored string-to-string map (the field itself, not a copy)
 */
public Map<String, String> getMapOfStrings() {
    return this.mapOfStrings;
}

/**
 * Returns the value of the {@code tupleOfObjects} field.
 *
 * @return the stored list of tuple elements (the field itself, not a copy)
 */
public List<Object> getTupleOfObjects() {
    return this.tupleOfObjects;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,18 @@ public static void executeSql(String sql) throws ExecutionException, Interrupted
}
}

/**
 * Prints every row of the given table to standard output; a debugging aid for tests.
 *
 * <p>Each row starts on a new line and every column value is prefixed with {@code " | "}.
 *
 * @param tableName name of the table to dump; it is quoted as an identifier with backticks,
 *                  matching how the other statements in the test sources quote identifiers
 * @throws ExecutionException   declared in the signature; NOTE(review): none of the calls visible
 *                              here obviously throws it — presumably kept for parity with executeSql
 * @throws InterruptedException declared in the signature; same caveat as above
 */
public static void showData(String tableName) throws ExecutionException, InterruptedException {
    // Backticks quote an identifier; single quotes would denote a string literal, not a table name.
    String showDataSql = String.format("select * from `%s`", tableName);
    Client client = ClickHouseTestHelpers.getClient(host, port, isSSL, username, password);
    List<GenericRecord> content = client.queryAll(showDataSql);
    for (GenericRecord record : content) {
        System.out.println();
        // Resolve the column count once per row instead of copying the column list on every iteration.
        int columnCount = record.getSchema().getColumns().size();
        // getObject is called with positions 1..columnCount, as in the original loop (i + 1).
        for (int position = 1; position <= columnCount; position++) {
            System.out.print(" | " + record.getObject(position));
        }
    }
}

public static int countParts(String table) {
String countPartsSql = String.format("SELECT count(*) FROM system.parts WHERE table = '%s' and active = 1", table);
Client client = ClickHouseTestHelpers.getClient(host, port, isSSL, username, password);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,60 @@ public class ClickHouseSinkTests extends FlinkClusterTests {

static final int STREAM_PARALLELISM = 5;

/**
 * Builds the SimplePOJO CREATE TABLE statement with a {@code parts_to_throw_insert} setting.
 *
 * <p>Takes the statement produced by the two-argument overload, trims it, drops its last
 * character and appends the SETTINGS clause.
 *
 * @param database              database the table is created in
 * @param tableName             name of the table to create
 * @param parts_to_throw_insert value written into the {@code parts_to_throw_insert} setting
 * @return the CREATE TABLE statement ending with the SETTINGS clause
 */
private String createSimplePOJOTableSQL(String database, String tableName, int parts_to_throw_insert) {
    String baseDdl = createSimplePOJOTableSQL(database, tableName).trim();
    // Drop the final character of the trimmed statement so the SETTINGS clause can follow it.
    String withoutLastChar = baseDdl.substring(0, baseDdl.length() - 1);
    String settingsClause = String.format("SETTINGS parts_to_throw_insert = %d;", parts_to_throw_insert);
    return withoutLastChar + " " + settingsClause;
}

/**
 * Builds the CREATE TABLE statement for the SimplePOJO test table.
 *
 * <p>The table uses the MergeTree engine ordered by {@code longPrimitive}. Every column
 * definition, including the last one, is followed by a comma.
 *
 * @param database  database the table is created in; wrapped in backticks
 * @param tableName name of the table to create; wrapped in backticks
 * @return the complete CREATE TABLE statement
 */
private String createSimplePOJOTableSQL(String database, String tableName) {
    final String[] columnDefinitions = {
        "bytePrimitive Int8",
        "byteObject Int8",
        "shortPrimitive Int16",
        "shortObject Int16",
        "intPrimitive Int32",
        "integerObject Int32",
        "longPrimitive Int64",
        "longObject Int64",
        "bigInteger128 Int128",
        "bigInteger256 Int256",
        "uint8Primitive UInt8",
        "uint8Object UInt8",
        "uint16Primitive UInt16",
        "uint16Object UInt16",
        "uint32Primitive UInt32",
        "uint32Object UInt32",
        "uint64Primitive UInt64",
        "uint64Object UInt64",
        "uint128Object UInt128",
        "uint256Object UInt256",
        "decimal Decimal(10,5)",
        "decimal32 Decimal32(9)",
        "decimal64 Decimal64(18)",
        "decimal128 Decimal128(38)",
        "decimal256 Decimal256(76)",
        "floatPrimitive Float",
        "floatObject Float",
        "doublePrimitive Double",
        "doubleObject Double",
        "booleanPrimitive Boolean",
        "booleanObject Boolean",
        "str String",
        "fixedStr FixedString(10)",
        "v_date Date",
        "v_date32 Date32",
        "v_dateTime DateTime",
        "v_dateTime64 DateTime64",
        "uuid UUID",
        "stringList Array(String)",
        "longList Array(Int64)",
        "mapOfStrings Map(String,String)",
        "tupleOfObjects Tuple(String,Int64,Boolean)"
    };

    StringBuilder ddl = new StringBuilder();
    ddl.append("CREATE TABLE `").append(database).append("`.`").append(tableName).append("` (");
    for (String columnDefinition : columnDefinitions) {
        // Each definition is followed by a comma, the last one included.
        ddl.append(columnDefinition).append(",");
    }
    ddl.append(") ");
    ddl.append("ENGINE = MergeTree ");
    ddl.append("ORDER BY (longPrimitive); ");
    return ddl.toString();
}

private int executeAsyncJob(StreamExecutionEnvironment env, String tableName, int numIterations, int expectedRows) throws Exception {
JobClient jobClient = env.executeAsync("Read GZipped CSV with FileSource");
int rows = 0;
Expand Down Expand Up @@ -197,22 +251,7 @@ void SimplePOJODataTest() throws Exception {
String dropTable = String.format("DROP TABLE IF EXISTS `%s`.`%s`", getDatabase(), tableName);
ClickHouseServerForTests.executeSql(dropTable);
// create table
String tableSql = "CREATE TABLE `" + getDatabase() + "`.`" + tableName + "` (" +
"bytePrimitive Int8," +
"byteObject Int8," +
"shortPrimitive Int16," +
"shortObject Int16," +
"intPrimitive Int32," +
"integerObject Int32," +
"longPrimitive Int64," +
"longObject Int64," +
"floatPrimitive Float," +
"floatObject Float," +
"doublePrimitive Double," +
"doubleObject Double," +
") " +
"ENGINE = MergeTree " +
"ORDER BY (longPrimitive); ";
String tableSql = createSimplePOJOTableSQL(getDatabase(), tableName);
ClickHouseServerForTests.executeSql(tableSql);


Expand Down Expand Up @@ -458,23 +497,7 @@ void SimplePOJODataTooManyPartsTest() throws Exception {
String dropTable = String.format("DROP TABLE IF EXISTS `%s`.`%s`", getDatabase(), tableName);
ClickHouseServerForTests.executeSql(dropTable);
// create table
String tableSql = "CREATE TABLE `" + getDatabase() + "`.`" + tableName + "` (" +
"bytePrimitive Int8," +
"byteObject Int8," +
"shortPrimitive Int16," +
"shortObject Int16," +
"intPrimitive Int32," +
"integerObject Int32," +
"longPrimitive Int64," +
"longObject Int64," +
"floatPrimitive Float," +
"floatObject Float," +
"doublePrimitive Double," +
"doubleObject Double," +
") " +
"ENGINE = MergeTree " +
"ORDER BY (longPrimitive) " +
"SETTINGS parts_to_throw_insert = 10;";
String tableSql = createSimplePOJOTableSQL(getDatabase(), tableName, 10);
ClickHouseServerForTests.executeSql(tableSql);
//ClickHouseServerForTests.executeSql(String.format("SYSTEM STOP MERGES `%s.%s`", getDatabase(), tableName));

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.apache.flink.connector.clickhouse.sink.convertor;

import com.clickhouse.data.ClickHouseColumn;
import com.clickhouse.data.ClickHouseDataType;
import com.clickhouse.utils.Serialize;
import org.apache.flink.connector.clickhouse.convertor.POJOConvertor;
Expand All @@ -23,10 +24,57 @@ public void instrument(OutputStream out, SimplePOJO input) throws IOException {
Serialize.writeInt64(out, input.getLongPrimitive(), false, false, ClickHouseDataType.Int64, false, "longPrimitive");
Serialize.writeInt64(out, input.getLongObject(), false, false, ClickHouseDataType.Int64, false, "longObject");

Serialize.writeInt128(out, input.getBigInteger128(), false, false, ClickHouseDataType.Int128, false, "bigInteger128");
Serialize.writeInt256(out, input.getBigInteger256(), false, false, ClickHouseDataType.Int256, false, "bigInteger256");

// UIntX
Serialize.writeUInt8(out, input.getUint8Primitive(), false, false, ClickHouseDataType.UInt8, false, "uint8Primitive");
Serialize.writeUInt8(out, input.getUint8Object(), false, false, ClickHouseDataType.UInt8, false, "uint8Object");

Serialize.writeUInt16(out, input.getUint16Primitive(), false, false, ClickHouseDataType.UInt16, false, "uint8Primitive");
Serialize.writeUInt16(out, input.getUint16Object(), false, false, ClickHouseDataType.UInt16, false, "uint8Object");

Serialize.writeUInt32(out, input.getUint32Primitive(), false, false, ClickHouseDataType.UInt32, false, "uint8Primitive");
Serialize.writeUInt32(out, input.getUint32Object(), false, false, ClickHouseDataType.UInt32, false, "uint8Object");

Serialize.writeUInt64(out, input.getUint64Primitive(), false, false, ClickHouseDataType.UInt64, false, "uint8Primitive");
Serialize.writeUInt64(out, input.getUint64Object(), false, false, ClickHouseDataType.UInt64, false, "uint8Object");
Comment on lines +34 to +41
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a copy-paste error in the field names for UInt16, UInt32, and UInt64 serialization. They're all using 'uint8Primitive' and 'uint8Object' instead of their respective field names.

For example:

Suggested change
Serialize.writeUInt16(out, input.getUint16Primitive(), false, false, ClickHouseDataType.UInt16, false, "uint8Primitive");
Serialize.writeUInt16(out, input.getUint16Object(), false, false, ClickHouseDataType.UInt16, false, "uint8Object");
Serialize.writeUInt32(out, input.getUint32Primitive(), false, false, ClickHouseDataType.UInt32, false, "uint8Primitive");
Serialize.writeUInt32(out, input.getUint32Object(), false, false, ClickHouseDataType.UInt32, false, "uint8Object");
Serialize.writeUInt64(out, input.getUint64Primitive(), false, false, ClickHouseDataType.UInt64, false, "uint8Primitive");
Serialize.writeUInt64(out, input.getUint64Object(), false, false, ClickHouseDataType.UInt64, false, "uint8Object");
Serialize.writeUInt16(out, input.getUint16Primitive(), false, false, ClickHouseDataType.UInt16, false, "uint16Primitive");
Serialize.writeUInt16(out, input.getUint16Object(), false, false, ClickHouseDataType.UInt16, false, "uint16Object");

Similar corrections are needed for UInt32 and UInt64 calls.


Serialize.writeUInt128(out, input.getUint128Object(), false, false, ClickHouseDataType.UInt128, false, "bigInteger128");
Serialize.writeUInt256(out, input.getUint256Object(), false, false, ClickHouseDataType.UInt256, false, "bigInteger256");
Comment on lines +43 to +44
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The field names for UInt128 and UInt256 serialization are incorrect. They're using 'bigInteger128' and 'bigInteger256' which are likely the field names for the signed integer types.

Suggested change
Serialize.writeUInt128(out, input.getUint128Object(), false, false, ClickHouseDataType.UInt128, false, "bigInteger128");
Serialize.writeUInt256(out, input.getUint256Object(), false, false, ClickHouseDataType.UInt256, false, "bigInteger256");
Serialize.writeUInt128(out, input.getUint128Object(), false, false, ClickHouseDataType.UInt128, false, "uint128Object");
Serialize.writeUInt256(out, input.getUint256Object(), false, false, ClickHouseDataType.UInt256, false, "uint256Object");


Serialize.writeDecimal(out, input.getBigDecimal(), false, false, ClickHouseDataType.Decimal, false, "decimal", 10, 5);
Serialize.writeDecimal(out, input.getBigDecimal(), false, false, ClickHouseDataType.Decimal32, false, "decimal32", 9, 1);
Serialize.writeDecimal(out, input.getBigDecimal(), false, false, ClickHouseDataType.Decimal64, false, "decimal64", 18, 10);
Serialize.writeDecimal(out, input.getBigDecimal(), false, false, ClickHouseDataType.Decimal128, false, "decimal128", 38, 19);
Serialize.writeDecimal(out, input.getBigDecimal(), false, false, ClickHouseDataType.Decimal256, false, "decimal256", 76, 39);

Serialize.writeFloat32(out, input.getFloatPrimitive(), false, false, ClickHouseDataType.Float32, false, "floatPrimitive");
Serialize.writeFloat32(out, input.getFloatObject(), false, false, ClickHouseDataType.Float32, false, "floatObject");

Serialize.writeFloat64(out, input.getDoublePrimitive(), false, false, ClickHouseDataType.Float64, false, "doublePrimitive");
Serialize.writeFloat64(out, input.getDoubleObject(), false, false, ClickHouseDataType.Float64, false, "doubleObject");

Serialize.writeBoolean(out, input.isBooleanPrimitive(), false, false, ClickHouseDataType.Bool, false, "booleanPrimitive");
Serialize.writeBoolean(out, input.getBooleanObject(), false, false, ClickHouseDataType.Bool, false, "booleanObject");

Serialize.writeString(out, input.getStr(), false, false, ClickHouseDataType.String, false, "str");
Serialize.writeFixedString(out, input.getFixedStr(), false, false, ClickHouseDataType.FixedString, false, "fixedStr", 10);

Serialize.writeDate(out, input.getDate(), false, false, ClickHouseDataType.Date, false, "v_date");
Serialize.writeDate32(out, input.getDate32(), false, false, ClickHouseDataType.Date32, false, "v_date32");
Serialize.writeTimeDate(out, input.getDateTime(), false, false, ClickHouseDataType.DateTime, false, "v_dateTime");
Serialize.writeTimeDate64(out, input.getDateTime64(), false, false, ClickHouseDataType.DateTime64, false, "v_dateTime64", 1);

Serialize.writeUUID(out, input.getUuid(), false, false, ClickHouseDataType.UUID, false, "uuid");

Serialize.writeArray(out, input.getStringList(), ClickHouseColumn.of("stringList", ClickHouseDataType.Array, false, ClickHouseColumn.of("", ClickHouseDataType.String.toString())));

Serialize.writeArray(out, input.getLongList(), ClickHouseColumn.of("longList", ClickHouseDataType.Array, false, ClickHouseColumn.of("", ClickHouseDataType.Int64.toString())));

Serialize.writeMap(out, input.getMapOfStrings(), ClickHouseColumn.of("mapOfStrings", ClickHouseDataType.Map, false, ClickHouseColumn.of("", ClickHouseDataType.String.toString()), ClickHouseColumn.of("", ClickHouseDataType.String.toString())));

Serialize.writeTuple(out, input.getTupleOfObjects(), ClickHouseColumn.of("tupleOfObjects", ClickHouseDataType.Tuple, false, ClickHouseColumn.of("", ClickHouseDataType.String.toString()), ClickHouseColumn.of("", ClickHouseDataType.Int64.toString()), ClickHouseColumn.of("", ClickHouseDataType.Bool.toString()) ));
}

}
Loading
Loading