4 changes: 2 additions & 2 deletions .github/workflows/ci.yaml
@@ -1,11 +1,11 @@
name: Cron Datax test
name: TEST CI

on:
pull_request:
branches: [main]

jobs:
build-datax:
build-connect:
runs-on: [self-hosted, X64, Linux, 8c16g]
steps:
- name: Checkout
150 changes: 132 additions & 18 deletions pom.xml
@@ -41,6 +41,8 @@
</licenses>

<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<derby.version>10.14.2.0</derby.version>
<commons-io.version>2.4</commons-io.version>
<kafka.connect.maven.plugin.version>0.11.1</kafka.connect.maven.plugin.version>
@@ -64,6 +66,26 @@
<testcontainers.version>1.17.3</testcontainers.version>
<databend.jdbc.driver.version>0.1.2</databend.jdbc.driver.version>
</properties>
<repositories>
<repository>
<id>confluent</id>
<name>Confluent</name>
<url>https://packages.confluent.io/maven/</url>
</repository>

<repository>
<id>cloudera-repo</id>
<url>
https://repository.cloudera.com/content/repositories/releases/
</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>

<!-- <repositories>-->
<!-- <repository>-->
@@ -271,7 +293,103 @@
<dependency>
<groupId>com.databend</groupId>
<artifactId>databend-jdbc</artifactId>
<version>0.1.2</version>
<version>0.2.7</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.11.3</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>7.2.1</version>
<exclusions>
<exclusion>
<groupId>io.confluent</groupId>
<artifactId>common-utils</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/io.confluent/kafka-connect-avro-converter -->
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-connect-avro-converter</artifactId>
<version>7.2.1</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>32.0.1-jre</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>failureaccess</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry-client</artifactId>
<version>7.2.1</version>
<exclusions>
<exclusion>
<groupId>io.confluent</groupId>
<artifactId>common-utils</artifactId>
</exclusion>
<exclusion>
<groupId>io.confluent</groupId>
<artifactId>common-config</artifactId>
</exclusion>
<exclusion>
<groupId>io.swagger</groupId>
<artifactId>swagger-annotations</artifactId>
</exclusion>
<exclusion>
<groupId>io.swagger</groupId>
<artifactId>swagger-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.11.3</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

@@ -280,14 +398,10 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<inherited>true</inherited>
<version>3.7.0</version>
<configuration>
<compilerArgs>
<arg>-Xlint:all</arg>
<arg>-Werror</arg>
</compilerArgs>
<source>8</source>
<target>8</target>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
@@ -418,16 +532,16 @@
<!-- </execution>-->
<!-- </executions>-->
<!-- </plugin>-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-release-plugin</artifactId>
<version>3.0.1</version>
<configuration>
<autoVersionSubmodules>true</autoVersionSubmodules>
<remoteTagging>false</remoteTagging>
<tagNameFormat>v@{project.version}</tagNameFormat>
</configuration>
</plugin>
<!-- <plugin>-->
<!-- <groupId>org.apache.maven.plugins</groupId>-->
<!-- <artifactId>maven-release-plugin</artifactId>-->
<!-- <version>3.0.1</version>-->
<!-- <configuration>-->
<!-- <autoVersionSubmodules>true</autoVersionSubmodules>-->
<!-- <remoteTagging>false</remoteTagging>-->
<!-- <tagNameFormat>v@{project.version}</tagNameFormat>-->
<!-- </configuration>-->
<!-- </plugin>-->
</plugins>
<resources>
<resource>
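Aside (not part of the diff): the new Confluent artifacts (kafka-avro-serializer, kafka-connect-avro-converter, kafka-schema-registry-client) are what let this sink consume Avro records through a Schema Registry. A minimal sketch of connector properties that would exercise them follows; only the value.converter keys are standard Connect/Confluent names, while connector.class, connection.url, topics, and pk.mode are assumed placeholders and may differ in this repo.

// Hypothetical connector configuration for the Avro path added by this PR.
// Only the value.converter* entries are standard property names; the rest are assumptions.
import java.util.HashMap;
import java.util.Map;

public class DatabendSinkAvroConfigSketch {
    public static Map<String, String> exampleProps() {
        Map<String, String> props = new HashMap<>();
        props.put("connector.class", "com.databend.kafka.connect.DatabendSinkConnector"); // assumed class name
        props.put("topics", "orders");                                                    // made-up topic
        props.put("connection.url", "jdbc:databend://localhost:8000");                    // assumed key and URL
        props.put("value.converter", "io.confluent.connect.avro.AvroConverter");
        props.put("value.converter.schema.registry.url", "http://localhost:8081");
        props.put("pk.mode", "record_value");                                             // assumed key
        return props;
    }
}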
@@ -41,7 +41,9 @@ public TableDefinition get(
) throws SQLException {
TableDefinition dbTable = cache.get(tableId);
if (dbTable == null) {
log.info("Table {} not found in cache; checking database", tableId);
if (dialect.tableExists(connection, tableId)) {
log.info("Table {} exists in database", tableId);
dbTable = dialect.describeTable(connection, tableId);
if (dbTable != null) {
log.info("Setting metadata for table {} to {}", tableId, dbTable);
@@ -59,6 +59,7 @@ public BufferedRecords(
}

public List<SinkRecord> add(SinkRecord record) throws SQLException, TableAlterOrCreateException {
log.info("Adding record to buffer: {}", record);
recordValidator.validate(record);
final List<SinkRecord> flushed = new ArrayList<>();

@@ -439,6 +439,7 @@ public boolean tableExists(
String[] tableTypes = tableTypes(metadata, this.tableTypes);
String tableTypeDisplay = displayableTableTypes(tableTypes, "/");
glog.info("Checking {} dialect for existence of {} {}", this, tableTypeDisplay, tableId);
glog.info("catalogName is {}, schemaName is {}, tableName is {}", tableId.catalogName(),tableId.schemaName(), tableId.tableName());
try (ResultSet rs = connection.getMetaData().getTables(
tableId.catalogName(),
tableId.schemaName(),
@@ -1399,7 +1400,7 @@ public String buildCreateTableStatement(
SQLExpressionBuilder builder = expressionBuilder();

final List<String> pkFieldNames = extractPrimaryKeyFieldNames(fields);
builder.append("CREATE TABLE ");
builder.append("CREATE TABLE IF NOT EXISTS ");
builder.append(table);
builder.append(" (");
writeColumnsSpec(builder, fields);
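Aside (not part of the diff): emitting CREATE TABLE IF NOT EXISTS makes automatic table creation idempotent, so a task that races another task, or a pre-created table, no longer fails on the DDL step. A rough sketch of the statement shape the builder now produces, with made-up column names and types:

// Illustrative only; the real column list comes from writeColumnsSpec(builder, fields)
// and the quoting and type names are dialect-specific.
public class CreateTableDdlSketch {
    static final String EXAMPLE_DDL =
            "CREATE TABLE IF NOT EXISTS orders ("
                    + "id BIGINT NOT NULL, "
                    + "amount DOUBLE, "
                    + "PRIMARY KEY(id))";
}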
@@ -28,6 +28,9 @@ public class DatabendSinkTask extends SinkTask {

boolean shouldTrimSensitiveLogs;

public DatabendSinkTask() {
}

@Override
public void start(final Map<String, String> props) {
log.info("Starting Databend Sink task");
@@ -53,19 +56,41 @@ void initWriter() {
log.info("Databend writer initialized");
}

@Override
public void open(final Collection<TopicPartition> partitions) {
// This method is called when the task's assigned partitions are changed.
// You can initialize resources related to the assigned partitions here.
// For now, we are just logging the assigned partitions.

log.info("Opening Databend Sink task for the following partitions:");
for (TopicPartition partition : partitions) {
log.info("Partition: {}", partition);
}
}

@Override
public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> offsets) throws RetriableException {
// You can add any processing you need to do before committing the offsets here.
// For now, we are just returning the offsets as is.
return offsets;
}

@Override
public void put(Collection<SinkRecord> records) {
log.info("###: {}", records);
log.info("Received {} records", records.size());
if (records.isEmpty()) {
return;
}
final SinkRecord first = records.iterator().next();
final int recordsCount = records.size();
log.debug(
log.info(
"Received {} records. First record kafka coordinates:({}-{}-{}). Writing them to the "
+ "database...",
recordsCount, first.topic(), first.kafkaPartition(), first.kafkaOffset()
);
try {
log.info("Writing {} records", records.size());
writer.write(records);
} catch (TableAlterOrCreateException tace) {
if (reporter != null) {
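Aside (not part of the diff): a small sketch of how the methods touched here are driven. Only open() and preCommit() are exercised directly, because start() and put() need real connection settings and a reachable Databend instance; the topic name, offset, and the task's package are assumptions.

import com.databend.kafka.connect.sink.DatabendSinkTask; // package assumed from the surrounding diff
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.Map;

public class SinkTaskLifecycleSketch {
    public static void main(String[] args) {
        DatabendSinkTask task = new DatabendSinkTask();      // no-arg constructor added in this PR

        TopicPartition tp = new TopicPartition("orders", 0); // made-up topic and partition
        task.open(Collections.singletonList(tp));            // currently just logs the assignment

        Map<TopicPartition, OffsetAndMetadata> offsets =
                Collections.singletonMap(tp, new OffsetAndMetadata(42L));
        // preCommit currently hands the offsets straight back to the framework.
        System.out.println(task.preCommit(offsets));

        // start(props) and put(records) are omitted here: they require connection
        // properties and a live Databend endpoint.
    }
}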
@@ -46,9 +46,17 @@ protected void onConnect(final Connection connection) throws SQLException {
void write(final Collection<SinkRecord> records)
throws SQLException, TableAlterOrCreateException {
final Connection connection = cachedConnectionProvider.getConnection();
log.info("DatabendWriter Writing {} records", records.size());
log.info("DatabendWriter Writing records is: {}", records);
try {
final Map<TableIdentity, BufferedRecords> bufferByTable = new HashMap<>();
for (SinkRecord record : records) {
log.info("DatabendWriter Writing record keySchema is: {}", record.keySchema());
log.info("DatabendWriter Writing record valueSchema is: {}", record.valueSchema().fields());
log.info("DatabendWriter Writing record key is: {}", record.key());
log.info("DatabendWriter Writing record value is: {}", record.value());
log.info("DatabendWriter Writing record topic is: {}", record.topic());
log.info("DatabendWriter Writing record timestamp is: {}", record.timestamp());
final TableIdentity tableId = destinationTable(record.topic());
BufferedRecords buffer = bufferByTable.get(tableId);
if (buffer == null) {
@@ -64,7 +72,7 @@ void write(final Collection<SinkRecord> records)
buffer.flush();
buffer.close();
}
connection.commit();
// connection.commit();
} catch (SQLException | TableAlterOrCreateException e) {
// e.addSuppressed(e);
throw e;
@@ -111,6 +111,8 @@ boolean amendIfNecessary(
final int maxRetries
) throws SQLException, TableAlterOrCreateException {
final TableDefinition tableDefn = tableDefns.get(connection, tableId);
log.info("tableDefn: {}", tableDefn);
log.info("Amending table {} with fieldsMetadata: {}", tableId, fieldsMetadata);

final Set<SinkRecordField> missingFields = missingFields(
fieldsMetadata.allFields.values(),
@@ -4,6 +4,8 @@
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;

@@ -12,6 +14,7 @@ public class FieldsMetadata {
public final Set<String> keyFieldNames;
public final Set<String> nonKeyFieldNames;
public final Map<String, SinkRecordField> allFields;
private static final Logger LOGGER = LoggerFactory.getLogger(DatabendSinkConfig.class);

// visible for testing
public FieldsMetadata(
@@ -86,6 +89,7 @@ public static FieldsMetadata extract(
}

final Set<String> nonKeyFieldNames = new LinkedHashSet<>();
LOGGER.info("@@Value schema is: {}", valueSchema);
if (valueSchema != null) {
for (Field field : valueSchema.fields()) {
if (keyFieldNames.contains(field.name())) {
@@ -246,6 +250,9 @@ private static void extractRecordValuePk(
DatabendSinkConfig.PrimaryKeyMode.RECORD_VALUE)
);
}

LOGGER.info("Value schema is: {}", valueSchema.toString());
LOGGER.info("Value fields are: {}", valueSchema.fields());
if (configuredPkFields.isEmpty()) {
for (Field keyField : valueSchema.fields()) {
keyFieldNames.add(keyField.name());
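Aside (not part of the diff): the new LOGGER.info calls print the value schema and its fields right before extractRecordValuePk falls back to using every value field as a key field when no PK fields are configured. A small illustration of the kind of schema involved, with made-up field names:

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

public class ValueSchemaSketch {
    public static void main(String[] args) {
        Schema valueSchema = SchemaBuilder.struct().name("orders")
                .field("id", Schema.INT64_SCHEMA)
                .field("amount", Schema.FLOAT64_SCHEMA)
                .field("note", Schema.OPTIONAL_STRING_SCHEMA)
                .build();

        // valueSchema and valueSchema.fields() are what the added log lines print; with an
        // empty configuredPkFields list, every field name here would become a key field.
        valueSchema.fields().forEach(f -> System.out.println(f.name() + " -> " + f.schema().type()));
    }
}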
@@ -0,0 +1,10 @@
package com.databend.kafka.connect.sink.records;

import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import java.util.Map;

class AvroConverterConfig extends AbstractKafkaAvroSerDeConfig {
AvroConverterConfig(final Map<?, ?> props) {
super(baseConfigDef(), props);
}
}
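Aside (not part of the diff): AvroConverterConfig simply exposes Confluent's base Avro serde configuration (schema.registry.url and related settings) to this package. A hypothetical usage sketch, kept in the same package because the class and its constructor are package-private:

package com.databend.kafka.connect.sink.records;

import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;

import java.util.HashMap;
import java.util.Map;

public class AvroConverterConfigUsageSketch {
    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        // schema.registry.url must be provided; most other settings have defaults.
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");

        AvroConverterConfig config = new AvroConverterConfig(props);
        // getList(...) is inherited from Kafka's AbstractConfig; the URL setting is list-typed.
        System.out.println(config.getList(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG));
    }
}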