diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index d67b8c0..44b3684 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -1,11 +1,11 @@ -name: Cron Datax test +name: TEST CI on: pull_request: branches: [main] jobs: - build-datax: + build-connect: runs-on: [self-hosted, X64, Linux, 8c16g] steps: - name: Checkout diff --git a/pom.xml b/pom.xml index 0bb9897..caa92a0 100644 --- a/pom.xml +++ b/pom.xml @@ -41,6 +41,8 @@ + 1.8 + 1.8 10.14.2.0 2.4 0.11.1 @@ -64,6 +66,26 @@ 1.17.3 0.1.2 + + + confluent + Confluent + https://packages.confluent.io/maven/ + + + + cloudera-repo + + https://repository.cloudera.com/content/repositories/releases/ + + + true + + + true + + + @@ -271,7 +293,103 @@ com.databend databend-jdbc - 0.1.2 + 0.2.7 + + + org.apache.avro + avro + 1.11.3 + + + com.fasterxml.jackson.core + jackson-core + + + com.fasterxml.jackson.core + jackson-databind + + + org.apache.commons + commons-compress + + + org.slf4j + slf4j-api + + + + + io.confluent + kafka-avro-serializer + 7.2.1 + + + io.confluent + common-utils + + + + + + io.confluent + kafka-connect-avro-converter + 7.2.1 + + + com.google.guava + guava + 32.0.1-jre + + + com.google.guava + failureaccess + 1.0 + + + io.confluent + kafka-schema-registry-client + 7.2.1 + + + io.confluent + common-utils + + + io.confluent + common-config + + + io.swagger + swagger-annotations + + + io.swagger + swagger-core + + + + + org.apache.avro + avro + 1.11.3 + + + com.fasterxml.jackson.core + jackson-core + + + com.fasterxml.jackson.core + jackson-databind + + + org.apache.commons + commons-compress + + + org.slf4j + slf4j-api + + @@ -280,14 +398,10 @@ org.apache.maven.plugins maven-compiler-plugin - true + 3.7.0 - - -Xlint:all - -Werror - - 8 - 8 + 1.8 + 1.8 @@ -418,16 +532,16 @@ - - org.apache.maven.plugins - maven-release-plugin - 3.0.1 - - true - false - v@{project.version} - - + + + + + + + + + + diff --git a/src/main/java/com/databend/kafka/connect/databendclient/TableDefinitions.java b/src/main/java/com/databend/kafka/connect/databendclient/TableDefinitions.java index 3888b42..0e063a1 100644 --- a/src/main/java/com/databend/kafka/connect/databendclient/TableDefinitions.java +++ b/src/main/java/com/databend/kafka/connect/databendclient/TableDefinitions.java @@ -41,7 +41,9 @@ public TableDefinition get( ) throws SQLException { TableDefinition dbTable = cache.get(tableId); if (dbTable == null) { + log.info("Table {} not found in cache; checking database", tableId); if (dialect.tableExists(connection, tableId)) { + log.info("Table {} exists in database", tableId); dbTable = dialect.describeTable(connection, tableId); if (dbTable != null) { log.info("Setting metadata for table {} to {}", tableId, dbTable); diff --git a/src/main/java/com/databend/kafka/connect/sink/BufferedRecords.java b/src/main/java/com/databend/kafka/connect/sink/BufferedRecords.java index 126dca7..6e0a576 100644 --- a/src/main/java/com/databend/kafka/connect/sink/BufferedRecords.java +++ b/src/main/java/com/databend/kafka/connect/sink/BufferedRecords.java @@ -59,6 +59,7 @@ public BufferedRecords( } public List add(SinkRecord record) throws SQLException, TableAlterOrCreateException { + log.info("Adding record to buffer: {}", record); recordValidator.validate(record); final List flushed = new ArrayList<>(); diff --git a/src/main/java/com/databend/kafka/connect/sink/DatabendClient.java b/src/main/java/com/databend/kafka/connect/sink/DatabendClient.java index bf772ad..911791d 100644 --- 
a/src/main/java/com/databend/kafka/connect/sink/DatabendClient.java +++ b/src/main/java/com/databend/kafka/connect/sink/DatabendClient.java @@ -439,6 +439,7 @@ public boolean tableExists( String[] tableTypes = tableTypes(metadata, this.tableTypes); String tableTypeDisplay = displayableTableTypes(tableTypes, "/"); glog.info("Checking {} dialect for existence of {} {}", this, tableTypeDisplay, tableId); + glog.info("catalogName is {}, schemaName is {}, tableName is {}", tableId.catalogName(),tableId.schemaName(), tableId.tableName()); try (ResultSet rs = connection.getMetaData().getTables( tableId.catalogName(), tableId.schemaName(), @@ -1399,7 +1400,7 @@ public String buildCreateTableStatement( SQLExpressionBuilder builder = expressionBuilder(); final List pkFieldNames = extractPrimaryKeyFieldNames(fields); - builder.append("CREATE TABLE "); + builder.append("CREATE TABLE IF NOT EXISTS "); builder.append(table); builder.append(" ("); writeColumnsSpec(builder, fields); diff --git a/src/main/java/com/databend/kafka/connect/sink/DatabendSinkTask.java b/src/main/java/com/databend/kafka/connect/sink/DatabendSinkTask.java index fe5c909..1013b56 100644 --- a/src/main/java/com/databend/kafka/connect/sink/DatabendSinkTask.java +++ b/src/main/java/com/databend/kafka/connect/sink/DatabendSinkTask.java @@ -28,6 +28,9 @@ public class DatabendSinkTask extends SinkTask { boolean shouldTrimSensitiveLogs; + public DatabendSinkTask() { + } + @Override public void start(final Map props) { log.info("Starting Databend Sink task"); @@ -53,19 +56,41 @@ void initWriter() { log.info("Databend writer initialized"); } + @Override + public void open(final Collection partitions) { + // This method is called when the task's assigned partitions are changed. + // You can initialize resources related to the assigned partitions here. + // For now, we are just logging the assigned partitions. + + log.info("Opening Databend Sink task for the following partitions:"); + for (TopicPartition partition : partitions) { + log.info("Partition: {}", partition); + } + } + + @Override + public Map preCommit(Map offsets) throws RetriableException { + // You can add any processing you need to do before committing the offsets here. + // For now, we are just returning the offsets as is. + return offsets; + } + @Override public void put(Collection records) { + log.info("###: {}", records); + log.info("Received {} records", records.size()); if (records.isEmpty()) { return; } final SinkRecord first = records.iterator().next(); final int recordsCount = records.size(); - log.debug( + log.info( "Received {} records. First record kafka coordinates:({}-{}-{}). 
Writing them to the " + "database...", recordsCount, first.topic(), first.kafkaPartition(), first.kafkaOffset() ); try { + log.info("Writing {} records", records.size()); writer.write(records); } catch (TableAlterOrCreateException tace) { if (reporter != null) { diff --git a/src/main/java/com/databend/kafka/connect/sink/DatabendWriter.java b/src/main/java/com/databend/kafka/connect/sink/DatabendWriter.java index 2572ab9..fd0d0e3 100644 --- a/src/main/java/com/databend/kafka/connect/sink/DatabendWriter.java +++ b/src/main/java/com/databend/kafka/connect/sink/DatabendWriter.java @@ -46,9 +46,17 @@ protected void onConnect(final Connection connection) throws SQLException { void write(final Collection records) throws SQLException, TableAlterOrCreateException { final Connection connection = cachedConnectionProvider.getConnection(); + log.info("DatabendWriter Writing {} records", records.size()); + log.info("DatabendWriter Writing records is: {}", records); try { final Map bufferByTable = new HashMap<>(); for (SinkRecord record : records) { + log.info("DatabendWriter Writing record keySchema is: {}", record.keySchema()); + log.info("DatabendWriter Writing record valueSchema is: {}", record.valueSchema().fields()); + log.info("DatabendWriter Writing record key is: {}", record.key()); + log.info("DatabendWriter Writing record value is: {}", record.value()); + log.info("DatabendWriter Writing record topic is: {}", record.topic()); + log.info("DatabendWriter Writing record timestamp is: {}", record.timestamp()); final TableIdentity tableId = destinationTable(record.topic()); BufferedRecords buffer = bufferByTable.get(tableId); if (buffer == null) { @@ -64,7 +72,7 @@ void write(final Collection records) buffer.flush(); buffer.close(); } - connection.commit(); +// connection.commit(); } catch (SQLException | TableAlterOrCreateException e) { // e.addSuppressed(e); throw e; diff --git a/src/main/java/com/databend/kafka/connect/sink/DbStructure.java b/src/main/java/com/databend/kafka/connect/sink/DbStructure.java index 6f8e003..8902770 100644 --- a/src/main/java/com/databend/kafka/connect/sink/DbStructure.java +++ b/src/main/java/com/databend/kafka/connect/sink/DbStructure.java @@ -111,6 +111,8 @@ boolean amendIfNecessary( final int maxRetries ) throws SQLException, TableAlterOrCreateException { final TableDefinition tableDefn = tableDefns.get(connection, tableId); + log.info("tableDefn: {}", tableDefn); + log.info("Amending table {} with fieldsMetadata: {}", tableId, fieldsMetadata); final Set missingFields = missingFields( fieldsMetadata.allFields.values(), diff --git a/src/main/java/com/databend/kafka/connect/sink/metadata/FieldsMetadata.java b/src/main/java/com/databend/kafka/connect/sink/metadata/FieldsMetadata.java index e4bd898..dd2588a 100644 --- a/src/main/java/com/databend/kafka/connect/sink/metadata/FieldsMetadata.java +++ b/src/main/java/com/databend/kafka/connect/sink/metadata/FieldsMetadata.java @@ -4,6 +4,8 @@ import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.errors.ConnectException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.*; @@ -12,6 +14,7 @@ public class FieldsMetadata { public final Set keyFieldNames; public final Set nonKeyFieldNames; public final Map allFields; + private static final Logger LOGGER = LoggerFactory.getLogger(DatabendSinkConfig.class); // visible for testing public FieldsMetadata( @@ -86,6 +89,7 @@ public static FieldsMetadata extract( } final Set nonKeyFieldNames = 
new LinkedHashSet<>(); + LOGGER.info("@@Value schema is: {}", valueSchema); if (valueSchema != null) { for (Field field : valueSchema.fields()) { if (keyFieldNames.contains(field.name())) { @@ -246,6 +250,9 @@ private static void extractRecordValuePk( DatabendSinkConfig.PrimaryKeyMode.RECORD_VALUE) ); } + + LOGGER.info("Value schema is: {}", valueSchema.toString()); + LOGGER.info("Value fields are: {}", valueSchema.fields()); if (configuredPkFields.isEmpty()) { for (Field keyField : valueSchema.fields()) { keyFieldNames.add(keyField.name()); diff --git a/src/main/java/com/databend/kafka/connect/sink/records/AvroConverterConfig.java b/src/main/java/com/databend/kafka/connect/sink/records/AvroConverterConfig.java new file mode 100644 index 0000000..9fb8f54 --- /dev/null +++ b/src/main/java/com/databend/kafka/connect/sink/records/AvroConverterConfig.java @@ -0,0 +1,10 @@ +package com.databend.kafka.connect.sink.records; + +import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; +import java.util.Map; + +class AvroConverterConfig extends AbstractKafkaAvroSerDeConfig { + AvroConverterConfig(final Map props) { + super(baseConfigDef(), props); + } +} diff --git a/src/main/java/com/databend/kafka/connect/sink/records/DatabendAvroConverter.java b/src/main/java/com/databend/kafka/connect/sink/records/DatabendAvroConverter.java new file mode 100644 index 0000000..024e59b --- /dev/null +++ b/src/main/java/com/databend/kafka/connect/sink/records/DatabendAvroConverter.java @@ -0,0 +1,196 @@ +package com.databend.kafka.connect.sink.records; + +import com.databend.jdbc.com.fasterxml.jackson.databind.JsonNode; +import com.databend.kafka.connect.sink.DatabendSinkConfig; +import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Map; + +import org.apache.avro.Conversions; +import org.apache.avro.Schema; +import org.apache.avro.SchemaParseException; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DatabendAvroConverter extends DatabendConverter { + private SchemaRegistryClient schemaRegistry = null; + private static final Logger LOGGER = LoggerFactory.getLogger(DatabendSinkConfig.class); + public static final String BREAK_ON_SCHEMA_REGISTRY_ERROR = "break.on.schema.registry.error"; + public static final String READER_SCHEMA = "reader.schema"; + + // By default, we don't break when schema registry is not found + private boolean breakOnSchemaRegistryError = false; + /* By default, no reader schema is set. In this case, the writer schema of each item is also used + as the reader schema. 
See https://avro.apache.org/docs/1.9.2/spec.html#Schema+Resolution */ + private Schema readerSchema = null; + + @Override + public void configure(final Map configs, final boolean isKey) { + readBreakOnSchemaRegistryError(configs); + parseReaderSchema(configs); + try { + AvroConverterConfig avroConverterConfig = new AvroConverterConfig(configs); + schemaRegistry = + new CachedSchemaRegistryClient( + avroConverterConfig.getSchemaRegistryUrls(), + avroConverterConfig.getMaxSchemasPerSubject(), + configs); + } catch (Exception e) { + throw e; + } + } + + void readBreakOnSchemaRegistryError(final Map configs) { + try { + Object shouldBreak = configs.get(BREAK_ON_SCHEMA_REGISTRY_ERROR); + if (shouldBreak instanceof String) { + breakOnSchemaRegistryError = ((String) shouldBreak).toLowerCase().equals("true"); + } + } catch (Exception e) { + // do nothing + } + } + + /** + * Parse reader schema from config if provided + * + * @param configs configuration for converter + */ + void parseReaderSchema(final Map configs) { + Object readerSchemaFromConfig = configs.get(READER_SCHEMA); + + if (readerSchemaFromConfig == null) { + return; + } + + if (readerSchemaFromConfig instanceof String) { + try { + readerSchema = new Schema.Parser().parse((String) readerSchemaFromConfig); + } catch (SchemaParseException e) { + LOGGER.error( + "the string provided for reader.schema is no valid Avro schema: " + e.getMessage()); + throw e; + } + } else { + LOGGER.error("reader.schema has to be a string"); + } + } + + // for testing only + boolean getBreakOnSchemaRegistryError() { + return breakOnSchemaRegistryError; + } + + /** + * set a schema registry client for test use only + * + * @param schemaRegistryClient mock schema registry client + */ + void setSchemaRegistry(SchemaRegistryClient schemaRegistryClient) { + this.schemaRegistry = schemaRegistryClient; + } + public String bytesToString(byte[] bytes) { + return new String(bytes, StandardCharsets.ISO_8859_1); + } + /** + * cast bytes array to JsonNode array + * + * @param s topic, unused + * @param bytes input bytes array + * @return JsonNode array + */ + @Override + public SchemaAndValue toConnectData(final String s, final byte[] bytes) { + if (bytes == null) { + return new SchemaAndValue(new DatabendJsonSchema(), new DatabendRecordContent()); + } + ByteBuffer buffer; + int id; + try { + buffer = ByteBuffer.wrap(bytes); + if (buffer.get() != 0) { + throw new IOException("Unknown magic byte!"); + } + id = buffer.getInt(); + } catch (Exception e) { + return logErrorAndReturnBrokenRecord(e, bytes); + } + LOGGER.info("schema sjh: {}", s); + LOGGER.info("schema bytes str: {}", bytesToString(bytes)); + + // If there is any error while getting writer schema from schema registry, + // throw error and break the connector + Schema writerSchema; + try { + writerSchema = schemaRegistry.getById(id); + LOGGER.info("schema: {}", writerSchema.getFields()); + } catch (Exception e) { + return logErrorAndReturnBrokenRecord(e, bytes); + } + + try { + int length = buffer.limit() - 1 - 4; + byte[] data = new byte[length]; + buffer.get(data, 0, length); + + return new SchemaAndValue( + new DatabendJsonSchema(), + new DatabendRecordContent( + parseAvroWithSchema( + data, writerSchema, readerSchema == null ? 
writerSchema : readerSchema), + id)); + } catch (Exception e) { + + return logErrorAndReturnBrokenRecord(e, bytes); + } + } + + private SchemaAndValue logErrorAndReturnBrokenRecord(final Exception e, final byte[] bytes) { + + LOGGER.error("failed to parse AVRO record\n" + e.getMessage()); + return new SchemaAndValue(new DatabendJsonSchema(), new DatabendRecordContent(bytes)); + } + + /** + * Parse Avro record with a writer schema and a reader schema. The writer and the reader schema + * have to be compatible as described in + * https://avro.apache.org/docs/1.9.2/spec.html#Schema+Resolution + * + * @param data avro data + * @param writerSchema avro schema with which data got serialized + * @param readerSchema avro schema that describes the shape of the returned JsonNode + * @return JsonNode array + */ + private JsonNode parseAvroWithSchema(final byte[] data, Schema writerSchema, Schema readerSchema) + throws IOException { + final GenericData genericData = new GenericData(); + // Conversion for logical type Decimal. There are conversions for other logical types as well. + genericData.addLogicalTypeConversion(new Conversions.DecimalConversion()); + + InputStream is = new ByteArrayInputStream(data); + Decoder decoder = DecoderFactory.get().binaryDecoder(is, null); + DatumReader reader = + new GenericDatumReader<>(writerSchema, readerSchema, genericData); + GenericRecord datum = reader.read(null, decoder); + // For byte data without logical type, this toString method handles it this way: + // writeEscapedString(StandardCharsets.ISO_8859_1.decode(bytes), buffer); + // The generated string is escaped ISO_8859_1 decoded string. + LOGGER.info("datum string: {}", datum.toString()); // parse ok + return mapper.readTree(datum.toString()); + } +} + diff --git a/src/main/java/com/databend/kafka/connect/sink/records/DatabendConverter.java b/src/main/java/com/databend/kafka/connect/sink/records/DatabendConverter.java new file mode 100644 index 0000000..62f8595 --- /dev/null +++ b/src/main/java/com/databend/kafka/connect/sink/records/DatabendConverter.java @@ -0,0 +1,36 @@ +package com.databend.kafka.connect.sink.records; + +import com.databend.jdbc.com.fasterxml.jackson.databind.ObjectMapper; +import com.databend.kafka.connect.sink.DatabendSinkConfig; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.storage.Converter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +/** + * Databend Converter + */ +public abstract class DatabendConverter implements Converter { + + private static final Logger log = LoggerFactory.getLogger(DatabendSinkConfig.class); + final ObjectMapper mapper = new ObjectMapper(); + + /** + * unused + */ + @Override + public void configure(final Map map, final boolean b) { + // not necessary + } + + /** + * doesn't support data source connector + */ + @Override + public byte[] fromConnectData(final String s, final Schema schema, final Object o) { + throw new UnsupportedOperationException("DatabendConverter doesn't support data source connector"); + } +} + diff --git a/src/main/java/com/databend/kafka/connect/sink/records/DatabendJsonSchema.java b/src/main/java/com/databend/kafka/connect/sink/records/DatabendJsonSchema.java new file mode 100644 index 0000000..93ddd2d --- /dev/null +++ b/src/main/java/com/databend/kafka/connect/sink/records/DatabendJsonSchema.java @@ -0,0 +1,76 @@ +package com.databend.kafka.connect.sink.records; + +import java.util.List; +import java.util.Map; +import org.apache.kafka.connect.data.Field; 
+import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; + +/** databend json schema */ +public class DatabendJsonSchema implements Schema { + static String NAME = "DATABEND_JSON_SCHEMA"; + static int VERSION = 1; + + @Override + public Type type() { + return Type.STRUCT; + } + + @Override + public boolean isOptional() { + return false; + } + + @Override + public Object defaultValue() { + return null; + } + + @Override + public String name() { + return NAME; + } + + @Override + public Integer version() { + return VERSION; + } + + @Override + public String doc() { + return null; + } + + @Override + public Map parameters() { + return null; + } + + @Override + public Schema keySchema() { + // Create and return a Schema representing a string type + return null; + } + + @Override + public Schema valueSchema() { + // Create and return a Schema representing a string type + return null; + } + + @Override + public List fields() { + return null; + } + + @Override + public Field field(final String s) { + return null; + } + + @Override + public Schema schema() { + return null; + } +} + diff --git a/src/main/java/com/databend/kafka/connect/sink/records/DatabendRecordContent.java b/src/main/java/com/databend/kafka/connect/sink/records/DatabendRecordContent.java new file mode 100644 index 0000000..68e5bf6 --- /dev/null +++ b/src/main/java/com/databend/kafka/connect/sink/records/DatabendRecordContent.java @@ -0,0 +1,149 @@ +package com.databend.kafka.connect.sink.records; + +import com.databend.jdbc.com.fasterxml.jackson.databind.JsonNode; +import com.databend.jdbc.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.kafka.connect.data.Schema; + +public class DatabendRecordContent { + + private static ObjectMapper MAPPER = new ObjectMapper(); + public static int NON_AVRO_SCHEMA = -1; + private final JsonNode[] content; + private final byte[] brokenData; + private int schemaID; + private boolean isBroken; + + // We have to introduce this field so as to distinguish a null value record from a record whose + // actual contents are an empty json node. + // This is only set inside a constructor which is called when a byte value found in the record is + // null. + private boolean isNullValueRecord; + + /** + * Constructor for null value. + * + *
If we change this logic in future, we need to carefully modify how we handle tombstone + * records. + * + *
@see SnowflakeSinkServiceV1#shouldSkipNullValue(SinkRecord) + */ + public DatabendRecordContent() { + content = new JsonNode[1]; + content[0] = MAPPER.createObjectNode(); + brokenData = null; + isNullValueRecord = true; + } + +// /** +// * constructor for native json converter +// * +// * @param schema schema of the object +// * @param data object produced by native avro/json converters +// * @param isStreaming indicates whether this is part of snowpipe streaming +// */ +// public DatabendRecordContent(Schema schema, Object data, boolean isStreaming) { +// this.content = new JsonNode[1]; +// this.schemaID = NON_AVRO_SCHEMA; +// this.content[0] = RecordService.convertToJson(schema, data, isStreaming); +// this.isBroken = false; +// this.brokenData = null; +// } + + /** + * constructor for json converter + * + * @param data json node + */ + public DatabendRecordContent(JsonNode data) { + this.content = new JsonNode[1]; + this.content[0] = data; + this.isBroken = false; + this.schemaID = NON_AVRO_SCHEMA; + this.brokenData = null; + } + + /** + * constructor for avro converter without schema registry + * + * @param data json node array + */ + DatabendRecordContent(JsonNode[] data) { + this.content = data; + this.isBroken = false; + this.schemaID = NON_AVRO_SCHEMA; + this.brokenData = null; + } + + /** + * constructor for broken record + * + * @param data broken record + */ + public DatabendRecordContent(byte[] data) { + this.brokenData = data; + this.isBroken = true; + this.schemaID = NON_AVRO_SCHEMA; + this.content = null; + } + + /** + * constructor for avro converter + * + * @param data json node + * @param schemaID schema id + */ + DatabendRecordContent(JsonNode data, int schemaID) { + this(data); + this.schemaID = schemaID; + } + + /** + * @return true is record is broken + */ + public boolean isBroken() { + return this.isBroken; + } + + /** + * @return bytes array represents broken data + */ + public byte[] getBrokenData() { + if (!isBroken) { + throw new IllegalStateException("Record is not broken"); + } + assert this.brokenData != null; + return this.brokenData.clone(); + } + + /** + * @return schema id, -1 if not available + */ + int getSchemaID() { + return schemaID; + } + + public JsonNode[] getData() { + if (isBroken) { + System.out.println("ERROR_5011"); + } + assert content != null; + return content.clone(); + } + + /** + * Check if primary reason for this record content's value to be an empty json String, a null + * value? + * + *
i.e if value passed in by record is empty json node (`{}`), we don't interpret this as null + * value. + * + * @return true if content value is empty json node as well as isNullValueRecord is set to true. + */ + public boolean isRecordContentValueNull() { + if (content != null && content[0].isEmpty() && isNullValueRecord) { + return true; + } + return false; + } +} + diff --git a/target/checkstyle-cachefile b/target/checkstyle-cachefile deleted file mode 100644 index a94c452..0000000 --- a/target/checkstyle-cachefile +++ /dev/null @@ -1,4 +0,0 @@ -#Mon Aug 14 14:38:12 CST 2023 -configuration*?=15CB8BB226DF1A8F9AAD0E6A6C0B87E4D56E96A5 -module-resource*?\:checkstyle-xpath-suppressions.xml=CC80A51A485F793BBEE490A9FEC16EA43A8A7FFE -module-resource*?\:checkstyle-suppressions.xml=D02C429C31A78E743A32C7278B64010BF2033C76 diff --git a/target/checkstyle-checker.xml b/target/checkstyle-checker.xml deleted file mode 100644 index eed537b..0000000 --- a/target/checkstyle-checker.xml +++ /dev/null @@ -1,198 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/target/checkstyle-result.xml b/target/checkstyle-result.xml deleted file mode 100644 index b8ea60d..0000000 --- a/target/checkstyle-result.xml +++ /dev/null @@ -1,19 +0,0 @@ - - - - - - - - - - - - - - - - - - - diff --git a/target/checkstyle-suppressions.xml b/target/checkstyle-suppressions.xml deleted file mode 100644 index 18eec8e..0000000 --- a/target/checkstyle-suppressions.xml +++ /dev/null @@ -1,26 +0,0 @@ - - - - - - - - - - - - - - - - - - diff --git a/target/components/component-package.xml b/target/components/component-package.xml index 77d1e69..0f3aa3f 100644 --- a/target/components/component-package.xml +++ b/target/components/component-package.xml @@ -1,19 +1,12 @@ - component - - dir - zip - - true - /Users/hanshanjie/git-works/databend-kafka-connect @@ -55,7 +48,6 @@ * - @@ -73,5 +65,4 @@ - diff --git a/target/components/packages/databendCloud-databend-kafka-connect-0.0.1-SNAPSHOT/databendCloud-databend-kafka-connect-0.0.1-SNAPSHOT/manifest.json b/target/components/packages/databendCloud-databend-kafka-connect-0.0.1-SNAPSHOT/databendCloud-databend-kafka-connect-0.0.1-SNAPSHOT/manifest.json index 3feb088..6a48c1f 100644 --- a/target/components/packages/databendCloud-databend-kafka-connect-0.0.1-SNAPSHOT/databendCloud-databend-kafka-connect-0.0.1-SNAPSHOT/manifest.json +++ b/target/components/packages/databendCloud-databend-kafka-connect-0.0.1-SNAPSHOT/databendCloud-databend-kafka-connect-0.0.1-SNAPSHOT/manifest.json @@ -31,5 +31,5 @@ "url" : "https://www.apache.org/licenses/LICENSE-2.0" } ], "component_types" : [ "sink" ], - "release_date" : "2023-09-07" + "release_date" : "2024-07-13" } \ No newline at end of file diff --git a/target/manifest.json b/target/manifest.json index 3feb088..6a48c1f 100644 --- a/target/manifest.json +++ b/target/manifest.json @@ -31,5 +31,5 @@ "url" : "https://www.apache.org/licenses/LICENSE-2.0" } ], "component_types" : [ "sink" ], - "release_date" : "2023-09-07" + "release_date" : "2024-07-13" } \ No newline at end of file diff --git a/target/maven-status/maven-compiler-plugin/compile/default-compile/createdFiles.lst b/target/maven-status/maven-compiler-plugin/compile/default-compile/createdFiles.lst index 
65b0855..b26aec9 100644 --- a/target/maven-status/maven-compiler-plugin/compile/default-compile/createdFiles.lst +++ b/target/maven-status/maven-compiler-plugin/compile/default-compile/createdFiles.lst @@ -1 +1,61 @@ +com/databend/kafka/connect/util/StringUtils.class +com/databend/kafka/connect/databendclient/DatabendTypes.class +com/databend/kafka/connect/sink/PreparedStatementBinder$1.class +com/databend/kafka/connect/databendclient/DatabendConnection.class +com/databend/kafka/connect/util/DateTimeUtils.class +com/databend/kafka/connect/sink/DatabendSinkTask.class +com/databend/kafka/connect/databendclient/TableIdentity.class +com/databend/kafka/connect/util/BytesUtil.class +com/databend/kafka/connect/sink/records/DatabendJsonSchema.class +com/databend/kafka/connect/util/TimeZoneValidator.class +com/databend/kafka/connect/sink/records/DatabendConverter.class +com/databend/kafka/connect/sink/DatabendSinkConfig$EnumValidator.class +com/databend/kafka/connect/databendclient/ColumnDefinition$Mutability.class +com/databend/kafka/connect/sink/DatabendClient.class +com/databend/kafka/connect/sink/PreparedStatementBinder.class +com/databend/kafka/connect/sink/metadata/FieldsMetadata$1.class +com/databend/kafka/connect/util/DeleteEnabledRecommender.class +com/databend/kafka/connect/util/StringUtils$1.class +com/databend/kafka/connect/sink/DatabendWriter$1.class +com/databend/kafka/connect/sink/metadata/SinkRecordField.class +com/databend/kafka/connect/sink/TimestampIncrementingCriteria$CriteriaValues.class +com/databend/kafka/connect/databendclient/ConnectionProvider.class +com/databend/kafka/connect/sink/records/AvroConverterConfig.class +com/databend/kafka/connect/databendclient/ColumnDefinition$Nullability.class +com/databend/kafka/connect/databendclient/DatabendConnection$ColumnConverter.class +com/databend/kafka/connect/databendclient/SQLExpressionBuilder$BasicListBuilder.class +com/databend/kafka/connect/sink/DatabendWriter.class +com/databend/kafka/connect/databendclient/ColumnMapping.class +com/databend/kafka/connect/databendclient/DropOptions.class +com/databend/kafka/connect/databendclient/SQLExpressionBuilder$1.class +com/databend/kafka/connect/sink/DatabendSinkConfig$InsertMode.class +com/databend/kafka/connect/sink/records/DatabendAvroConverter.class +com/databend/kafka/connect/DatabendSinkConnector.class +com/databend/kafka/connect/util/QuoteWay.class +com/databend/kafka/connect/sink/RecordValidator$1.class +com/databend/kafka/connect/databendclient/SQLExpressionBuilder$ListBuilder.class +com/databend/kafka/connect/sink/BufferedRecords$1.class +com/databend/kafka/connect/databendclient/TableDefinitions.class +com/databend/kafka/connect/databendclient/TableType.class +com/databend/kafka/connect/sink/BufferedRecords.class +com/databend/kafka/connect/databendclient/SQLExpressionBuilder$Transform.class +com/databend/kafka/connect/util/IdentifierRules.class +com/databend/kafka/connect/sink/metadata/FieldsMetadata.class +com/databend/kafka/connect/sink/DatabendSinkConfig$PrimaryKeyMode.class +com/databend/kafka/connect/sink/RecordValidator.class +com/databend/kafka/connect/databendclient/DatabendConnection$StatementBinder.class +com/databend/kafka/connect/sink/TableAlterOrCreateException.class +com/databend/kafka/connect/sink/metadata/SchemaPair.class +com/databend/kafka/connect/util/Version.class +com/databend/kafka/connect/sink/DbStructure.class com/databend/kafka/connect/sink/DatabendSinkConfig.class +com/databend/kafka/connect/databendclient/ColumnIdentity.class 
+com/databend/kafka/connect/sink/DbStructure$1.class +com/databend/kafka/connect/databendclient/ColumnDefinition.class +com/databend/kafka/connect/databendclient/CachedConnectionProvider.class +com/databend/kafka/connect/databendclient/SQLExpressionBuilder.class +com/databend/kafka/connect/databendclient/SQLExpressionBuilder$Expressable.class +com/databend/kafka/connect/databendclient/TableDefinition.class +com/databend/kafka/connect/sink/DatabendClient$1.class +com/databend/kafka/connect/sink/TimestampIncrementingCriteria.class +com/databend/kafka/connect/sink/records/DatabendRecordContent.class diff --git a/target/maven-status/maven-compiler-plugin/compile/default-compile/inputFiles.lst b/target/maven-status/maven-compiler-plugin/compile/default-compile/inputFiles.lst index 9b2dbb5..f5d7eff 100644 --- a/target/maven-status/maven-compiler-plugin/compile/default-compile/inputFiles.lst +++ b/target/maven-status/maven-compiler-plugin/compile/default-compile/inputFiles.lst @@ -8,6 +8,8 @@ /Users/hanshanjie/git-works/databend-kafka-connect/src/main/java/com/databend/kafka/connect/util/TimeZoneValidator.java /Users/hanshanjie/git-works/databend-kafka-connect/src/main/java/com/databend/kafka/connect/sink/DatabendWriter.java /Users/hanshanjie/git-works/databend-kafka-connect/src/main/java/com/databend/kafka/connect/databendclient/ConnectionProvider.java +/Users/hanshanjie/git-works/databend-kafka-connect/src/main/java/com/databend/kafka/connect/sink/records/DatabendJsonSchema.java +/Users/hanshanjie/git-works/databend-kafka-connect/src/main/java/com/databend/kafka/connect/sink/records/DatabendRecordContent.java /Users/hanshanjie/git-works/databend-kafka-connect/src/main/java/com/databend/kafka/connect/util/QuoteWay.java /Users/hanshanjie/git-works/databend-kafka-connect/src/main/java/com/databend/kafka/connect/util/StringUtils.java /Users/hanshanjie/git-works/databend-kafka-connect/src/main/java/com/databend/kafka/connect/sink/BufferedRecords.java @@ -15,13 +17,16 @@ /Users/hanshanjie/git-works/databend-kafka-connect/src/main/java/com/databend/kafka/connect/sink/metadata/FieldsMetadata.java /Users/hanshanjie/git-works/databend-kafka-connect/src/main/java/com/databend/kafka/connect/DatabendSinkConnector.java /Users/hanshanjie/git-works/databend-kafka-connect/src/main/java/com/databend/kafka/connect/databendclient/DatabendConnection.java +/Users/hanshanjie/git-works/databend-kafka-connect/src/main/java/com/databend/kafka/connect/sink/records/DatabendAvroConverter.java /Users/hanshanjie/git-works/databend-kafka-connect/src/main/java/com/databend/kafka/connect/databendclient/TableIdentity.java /Users/hanshanjie/git-works/databend-kafka-connect/src/main/java/com/databend/kafka/connect/sink/DatabendSinkTask.java /Users/hanshanjie/git-works/databend-kafka-connect/src/main/java/com/databend/kafka/connect/sink/DatabendClient.java /Users/hanshanjie/git-works/databend-kafka-connect/src/main/java/com/databend/kafka/connect/util/Version.java +/Users/hanshanjie/git-works/databend-kafka-connect/src/main/java/com/databend/kafka/connect/sink/records/DatabendConverter.java /Users/hanshanjie/git-works/databend-kafka-connect/src/main/java/com/databend/kafka/connect/sink/PreparedStatementBinder.java /Users/hanshanjie/git-works/databend-kafka-connect/src/main/java/com/databend/kafka/connect/sink/TableAlterOrCreateException.java /Users/hanshanjie/git-works/databend-kafka-connect/src/main/java/com/databend/kafka/connect/databendclient/ColumnDefinition.java 
+/Users/hanshanjie/git-works/databend-kafka-connect/src/main/java/com/databend/kafka/connect/sink/records/AvroConverterConfig.java /Users/hanshanjie/git-works/databend-kafka-connect/src/main/java/com/databend/kafka/connect/util/BytesUtil.java /Users/hanshanjie/git-works/databend-kafka-connect/src/main/java/com/databend/kafka/connect/databendclient/TableType.java /Users/hanshanjie/git-works/databend-kafka-connect/src/main/java/com/databend/kafka/connect/sink/RecordValidator.java
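Note on the new Avro path above: DatabendAvroConverter.toConnectData walks the Confluent wire format -- one zero "magic" byte, a 4-byte schema-registry id, then the Avro-encoded body -- fetches the writer schema by id, and deserializes with a GenericDatumReader (optionally resolving against a configured reader.schema). The standalone sketch below only illustrates that byte layout; the class name and the hard-coded writer schema are assumptions for illustration and stand in for the CachedSchemaRegistryClient lookup the converter actually performs.

// Sketch only (not part of this diff): decode a Confluent wire-format Avro value the way
// DatabendAvroConverter.toConnectData does. Byte 0 is the magic byte, bytes 1-4 are the
// schema-registry id, and the rest is the Avro body. The writer schema is hard-coded here
// purely for illustration; the converter looks it up by id in the schema registry instead.
package sketch;

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;

public class ConfluentWireFormatSketch {

    // Assumed schema for the sketch; a real value's schema is registered in the schema registry.
    private static final Schema WRITER_SCHEMA = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Example\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"}]}");

    public static GenericRecord decode(byte[] value) throws IOException {
        ByteBuffer buffer = ByteBuffer.wrap(value);
        if (buffer.get() != 0) {                        // magic byte must be 0
            throw new IOException("Unknown magic byte!");
        }
        int schemaId = buffer.getInt();                 // schema-registry id (used for the lookup in real code)
        byte[] body = new byte[buffer.remaining()];     // remaining bytes are the Avro payload
        buffer.get(body);

        Decoder decoder = DecoderFactory.get().binaryDecoder(body, null);
        DatumReader<GenericRecord> reader = new GenericDatumReader<>(WRITER_SCHEMA);
        return reader.read(null, decoder);              // schemaId would select the writer schema in real use
    }
}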
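Similarly, DatabendRecordContent relies on its no-argument constructor plus the isNullValueRecord flag to tell a tombstone (null Kafka value) apart from a record whose payload really is an empty JSON object. A minimal usage sketch under assumed names (the sketch package and class are placeholders; the shaded Jackson ObjectMapper is the one the new classes themselves import):

package sketch;

import com.databend.jdbc.com.fasterxml.jackson.databind.ObjectMapper;
import com.databend.kafka.connect.sink.records.DatabendRecordContent;

public class RecordContentSketch {
    public static void main(String[] args) {
        // Null Kafka value: the no-arg constructor stores an empty ObjectNode and sets isNullValueRecord.
        DatabendRecordContent tombstone = new DatabendRecordContent();
        // A record whose actual payload is {}: same empty node, but the flag stays false.
        DatabendRecordContent emptyJson =
            new DatabendRecordContent(new ObjectMapper().createObjectNode());

        System.out.println(tombstone.isRecordContentValueNull()); // true  -> treat as a tombstone
        System.out.println(emptyJson.isRecordContentValueNull()); // false -> write the empty object
    }
}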