diff --git a/build.gradle.kts b/build.gradle.kts index e79a6946..fe60ce61 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -164,6 +164,7 @@ dependencies { runtimeOnly("org.xerial:sqlite-jdbc:3.45.2.0") runtimeOnly("org.postgresql:postgresql:42.7.3") + runtimeOnly("com.oracle.database.jdbc:ojdbc8:19.3.0.0") runtimeOnly("net.sourceforge.jtds:jtds:1.3.1") runtimeOnly("net.snowflake:snowflake-jdbc:3.14.2") runtimeOnly("com.microsoft.sqlserver:mssql-jdbc:12.6.1.jre11") @@ -204,6 +205,8 @@ dependencies { integrationTestImplementation("org.testcontainers:kafka:$testcontainersVersion") // this is not Kafka version integrationTestImplementation("org.testcontainers:testcontainers:$testcontainersVersion") integrationTestImplementation("org.testcontainers:postgresql:$testcontainersVersion") + integrationTestImplementation("org.testcontainers:oracle-free:$testcontainersVersion") + integrationTestImplementation("org.awaitility:awaitility:$awaitilityVersion") integrationTestImplementation("org.assertj:assertj-db:2.0.2") diff --git a/src/integrationTest/java/io/aiven/kafka/connect/jdbc/oracle/AbstractOracleIT.java b/src/integrationTest/java/io/aiven/kafka/connect/jdbc/oracle/AbstractOracleIT.java new file mode 100644 index 00000000..94906818 --- /dev/null +++ b/src/integrationTest/java/io/aiven/kafka/connect/jdbc/oracle/AbstractOracleIT.java @@ -0,0 +1,77 @@ +/* + * Copyright 2024 Aiven Oy and jdbc-connector-for-apache-kafka project contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package io.aiven.kafka.connect.jdbc.oracle;
+
+import javax.sql.DataSource;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.Map;
+
+import io.aiven.kafka.connect.jdbc.AbstractIT;
+
+import oracle.jdbc.pool.OracleDataSource;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.oracle.OracleContainer;
+import org.testcontainers.utility.DockerImageName;
+
+public class AbstractOracleIT extends AbstractIT {
+
+    public static final String DEFAULT_ORACLE_TAG = "slim-faststart";
+    private static final DockerImageName DEFAULT_ORACLE_IMAGE_NAME =
+            DockerImageName.parse("gvenzl/oracle-free")
+                    .withTag(DEFAULT_ORACLE_TAG);
+    @Container
+    public static final OracleContainer ORACLE_CONTAINER = new OracleContainer(DEFAULT_ORACLE_IMAGE_NAME);
+
+    protected void executeSqlStatement(final String sqlStatement) throws SQLException {
+        try (final Connection connection = getDatasource().getConnection();
+             final Statement statement = connection.createStatement()) {
+            statement.executeUpdate(sqlStatement);
+        }
+    }
+
+    protected DataSource getDatasource() throws SQLException {
+        final OracleDataSource dataSource = new OracleDataSource();
+        dataSource.setServerName(ORACLE_CONTAINER.getHost());
+        // Connect through the container's mapped port for the default Oracle listener port 1521.
+        dataSource.setPortNumber(ORACLE_CONTAINER.getMappedPort(1521));
+        dataSource.setServiceName(ORACLE_CONTAINER.getDatabaseName());
+        dataSource.setUser(ORACLE_CONTAINER.getUsername());
+        dataSource.setPassword(ORACLE_CONTAINER.getPassword());
+        dataSource.setDriverType("thin");
+        return dataSource;
+    }
+
+    protected Map<String, String> basicConnectorConfig() {
+        final HashMap<String, String> config = new HashMap<>();
+        config.put("tasks.max", "1");
+        config.put("connection.url", ORACLE_CONTAINER.getJdbcUrl());
+        config.put("connection.user", ORACLE_CONTAINER.getUsername());
+        config.put("connection.password", ORACLE_CONTAINER.getPassword());
+        config.put("dialect.name", "OracleDatabaseDialect");
+        config.put("key.converter", "io.confluent.connect.avro.AvroConverter");
+        config.put("key.converter.schema.registry.url", schemaRegistryContainer.getSchemaRegistryUrl());
+        config.put("value.converter", "io.confluent.connect.avro.AvroConverter");
+        config.put("value.converter.schema.registry.url", schemaRegistryContainer.getSchemaRegistryUrl());
+        return config;
+    }
+}
diff --git a/src/integrationTest/java/io/aiven/kafka/connect/jdbc/oracle/VerifyDeleteIT.java b/src/integrationTest/java/io/aiven/kafka/connect/jdbc/oracle/VerifyDeleteIT.java
new file mode 100644
index 00000000..2d1c4e39
--- /dev/null
+++ b/src/integrationTest/java/io/aiven/kafka/connect/jdbc/oracle/VerifyDeleteIT.java
@@ -0,0 +1,317 @@
+/*
+ * Copyright 2024 Aiven Oy and jdbc-connector-for-apache-kafka project contributors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package io.aiven.kafka.connect.jdbc.oracle; + +import java.sql.SQLException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.TopicPartition; + +import io.aiven.connect.jdbc.JdbcSinkConnector; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.assertj.db.type.Table; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.assertj.db.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; +import static org.junit.Assert.assertNotSame; + +public class VerifyDeleteIT extends AbstractOracleIT { + + private static final String TEST_TOPIC_NAME = "SINK_TOPIC"; + private static final String CONNECTOR_NAME = "test-sink-connector"; + private static final int TEST_TOPIC_PARTITIONS = 1; + + private static final Schema VALUE_RECORD_SCHEMA = new Schema.Parser().parse("{\n" + + " \"type\": \"record\",\n" + + " \"name\": \"record\",\n" + + " \"fields\": [\n" + + " {\n" + + " \"name\": \"name\",\n" + + " \"type\": \"string\"\n" + + " },\n" + + " {\n" + + " \"name\": \"value\",\n" + + " \"type\": \"string\"\n" + + " }\n" + + " ]\n" + + "}"); + + private static final String DROP_TABLE = String.format("DROP TABLE IF EXISTS %s", TEST_TOPIC_NAME); + private static final String CREATE_TABLE = String.format("CREATE TABLE \"%s\" (\n" + + " \"id\" NUMBER NOT NULL,\n" + + " \"name\" VARCHAR2(255) NOT NULL,\n" + + " \"value\" VARCHAR2(255) NOT NULL,\n" + + "PRIMARY KEY(\"id\")" + + ")", TEST_TOPIC_NAME); + + private Map sinkConnectorConfigForDelete() { + final Map config = basicConnectorConfig(); + config.put("name", CONNECTOR_NAME); + config.put("connector.class", JdbcSinkConnector.class.getName()); + config.put("topics", TEST_TOPIC_NAME); + config.put("pk.mode", "record_key"); + config.put("pk.fields", "id"); // assigned name for the primitive key + config.put("delete.enabled", String.valueOf(true)); + return config; + } + + private void assertNotEmptyPoll(final Duration duration) { + final ConsumerRecords records = consumer.poll(duration); + assertNotSame(ConsumerRecords.empty(), records); + } + + private ProducerRecord createRecord( + final int id, final int partition, final String name, final String value) { + final GenericRecord record = new GenericData.Record(VALUE_RECORD_SCHEMA); + record.put("name", name); + record.put("value", value); + return new ProducerRecord<>(TEST_TOPIC_NAME, partition, String.valueOf(id), record); + } + + private ProducerRecord createTombstoneRecord( + final int id, final int partition) { + return new ProducerRecord<>(TEST_TOPIC_NAME, partition, String.valueOf(id), null); + } + + private void sendTestData(final int numberOfRecords) throws InterruptedException, ExecutionException { + final List> sendFutures = new ArrayList<>(); + for (int i = 0; i < numberOfRecords; i++) { + for (int partition = 0; partition < TEST_TOPIC_PARTITIONS; partition++) { + final String recordName = "user-" + i; + final String recordValue = "value-" + i; + final ProducerRecord msg = createRecord(i, partition, recordName, recordValue); + sendFutures.add(producer.send(msg)); + } + } + 
producer.flush();
+        for (final Future<RecordMetadata> sendFuture : sendFutures) {
+            sendFuture.get();
+        }
+    }
+
+    private void sendTestDataWithTombstone(final int numberOfRecords) throws InterruptedException, ExecutionException {
+        final List<Future<RecordMetadata>> sendFutures = new ArrayList<>();
+        for (int i = 0; i < numberOfRecords; i++) {
+            for (int partition = 0; partition < TEST_TOPIC_PARTITIONS; partition++) {
+                final ProducerRecord<String, GenericRecord> record = createTombstoneRecord(i, partition);
+                sendFutures.add(producer.send(record));
+            }
+        }
+        producer.flush();
+        for (final Future<RecordMetadata> sendFuture : sendFutures) {
+            sendFuture.get();
+        }
+    }
+
+    private void sendMixedTestDataWithTombstone(final int numberOfRecords, final int numberOfTombstoneRecords)
+            throws InterruptedException, ExecutionException {
+        final List<Future<RecordMetadata>> sendFutures = new ArrayList<>();
+        for (int i = 0; i < numberOfRecords; i++) {
+            for (int partition = 0; partition < TEST_TOPIC_PARTITIONS; partition++) {
+                final String recordName = "user-" + i;
+                final String recordValue = "value-" + i;
+                final ProducerRecord<String, GenericRecord> msg = createRecord(
+                        i, partition, recordName, recordValue);
+                sendFutures.add(producer.send(msg));
+                if (i < numberOfTombstoneRecords) {
+                    final ProducerRecord<String, GenericRecord> record = createTombstoneRecord(i, partition);
+                    sendFutures.add(producer.send(record));
+                }
+            }
+        }
+
+        producer.flush();
+        for (final Future<RecordMetadata> sendFuture : sendFutures) {
+            sendFuture.get();
+        }
+    }
+
+    @BeforeEach
+    public void beforeEach() throws SQLException {
+        executeSqlStatement(DROP_TABLE);
+        executeSqlStatement(CREATE_TABLE);
+    }
+
+    @Test
+    public void testDeleteTombstoneRecord() throws Exception {
+        createTopic(TEST_TOPIC_NAME, 1); // Create Kafka topic matching the table name
+        consumer.assign(Collections.singleton(new TopicPartition(TEST_TOPIC_NAME, 0)));
+
+        // Start the sink connector
+        connectRunner.createConnector(sinkConnectorConfigForDelete());
+
+        sendTestData(3);
+
+        await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofSeconds(18))
+                .untilAsserted(() -> {
+                    assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).hasNumberOfRows(3);
+                    assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).column("ID")
+                            .value().isEqualTo("0")
+                            .value().isEqualTo("1")
+                            .value().isEqualTo("2");
+                });
+
+        // Send test data to Kafka topic (including a tombstone record)
+        sendTestDataWithTombstone(1);
+
+        assertNotEmptyPoll(Duration.ofSeconds(50));
+
+        await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofSeconds(18))
+                .untilAsserted(() -> {
+                    assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).hasNumberOfRows(2);
+                    assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).column("ID")
+                            .value().isEqualTo("1")
+                            .value().isEqualTo("2");
+                });
+    }
+
+    @Test
+    public void testWithJustTombstoneRecordInInsertMode() throws Exception {
+        // Test logic is similar to previous tests, but with tombstone records.
+ createTopic(TEST_TOPIC_NAME, 1); // Create Kafka topic matching the table name + consumer.assign(Collections.singleton(new TopicPartition(TEST_TOPIC_NAME, 0))); + + // Start the sink connector + final Map config = sinkConnectorConfigForDelete(); + connectRunner.createConnector(config); + + sendTestDataWithTombstone(2); + + await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofSeconds(18)) + .untilAsserted(() -> { + assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).hasNumberOfRows(0); + }); + } + + @Test + public void testMultiInsertMode() throws Exception { + // Test logic is similar to previous tests, but with multi-insert mode enabled + createTopic(TEST_TOPIC_NAME, 1); // Create Kafka topic matching the table name + consumer.assign(Collections.singleton(new TopicPartition(TEST_TOPIC_NAME, 0))); + + // Start the sink connector + final Map config = sinkConnectorConfigForDelete(); + config.put("insert.mode", "MULTI"); + connectRunner.createConnector(config); + + sendTestData(5); + + await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofSeconds(18)) + .untilAsserted(() -> { + assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).hasNumberOfRows(5); + assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).column("ID") + .value().isEqualTo("0") + .value().isEqualTo("1") + .value().isEqualTo("2") + .value().isEqualTo("3") + .value().isEqualTo("4"); + }); + } + + @Test + public void testDeleteTombstoneRecordWithMultiMode() throws Exception { + // Test logic is similar to previous tests, but with multi-insert mode enabled and tombstone records included + createTopic(TEST_TOPIC_NAME, 1); // Create Kafka topic matching the table name + consumer.assign(Collections.singleton(new TopicPartition(TEST_TOPIC_NAME, 0))); + + // Start the sink connector + final Map config = sinkConnectorConfigForDelete(); + config.put("insert.mode", "MULTI"); + connectRunner.createConnector(config); + + sendTestData(5); + + await().atMost(Duration.ofSeconds(30)).pollInterval(Duration.ofSeconds(20)) + .untilAsserted(() -> { + assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).hasNumberOfRows(5); + assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).column("ID") + .value().isEqualTo("0") + .value().isEqualTo("1") + .value().isEqualTo("2") + .value().isEqualTo("3") + .value().isEqualTo("4"); + }); + + sendTestDataWithTombstone(1); + + await().atMost(Duration.ofSeconds(30)).pollInterval(Duration.ofSeconds(18)) + .untilAsserted(() -> { + assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).hasNumberOfRows(4); + assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).column("ID") + .value().isEqualTo("1") + .value().isEqualTo("2") + .value().isEqualTo("3") + .value().isEqualTo("4"); + }); + } + + @Test + public void testWithJustTombstoneRecordWithMultiMode() throws Exception { + // Test logic with multi-insert mode enabled and has only tombstone records + createTopic(TEST_TOPIC_NAME, 1); // Create Kafka topic matching the table name + consumer.assign(Collections.singleton(new TopicPartition(TEST_TOPIC_NAME, 0))); + + // Start the sink connector + final Map config = sinkConnectorConfigForDelete(); + config.put("insert.mode", "MULTI"); + connectRunner.createConnector(config); + + sendTestDataWithTombstone(2); + + await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofSeconds(18)) + .untilAsserted(() -> { + assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).hasNumberOfRows(0); + }); + } + + @Test + public void testMixTombstoneRecordsWithMultiMode() throws Exception { + // Test 
logic is similar to previous tests, but with mixed tombstone records and multi-insert mode + createTopic(TEST_TOPIC_NAME, 1); // Create Kafka topic matching the table name + consumer.assign(Collections.singleton(new TopicPartition(TEST_TOPIC_NAME, 0))); + + // Start the sink connector + final Map config = sinkConnectorConfigForDelete(); + config.put("insert.mode", "MULTI"); + connectRunner.createConnector(config); + + sendMixedTestDataWithTombstone(5, 2); + + await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofSeconds(19)) + .untilAsserted(() -> { + assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).hasNumberOfRows(3); + assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).column("ID") + .value().isEqualTo("2") + .value().isEqualTo("3") + .value().isEqualTo("4"); + }); + } +} diff --git a/src/integrationTest/java/io/aiven/kafka/connect/jdbc/oracle/VerifyInsertIT.java b/src/integrationTest/java/io/aiven/kafka/connect/jdbc/oracle/VerifyInsertIT.java new file mode 100644 index 00000000..32fb354a --- /dev/null +++ b/src/integrationTest/java/io/aiven/kafka/connect/jdbc/oracle/VerifyInsertIT.java @@ -0,0 +1,164 @@ +/* + * Copyright 2024 Aiven Oy and jdbc-connector-for-apache-kafka project contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.aiven.kafka.connect.jdbc.oracle; + +import java.sql.SQLException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.TopicPartition; + +import io.aiven.connect.jdbc.JdbcSinkConnector; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.assertj.db.type.Table; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +import static org.assertj.db.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + + +public class VerifyInsertIT extends AbstractOracleIT { + + private static final String TEST_TOPIC_NAME = "SINK_TOPIC"; + private static final String CONNECTOR_NAME = "test-sink-connector"; + private static final int TEST_TOPIC_PARTITIONS = 1; + private static final Schema VALUE_RECORD_SCHEMA = new Schema.Parser().parse("{\n" + + " \"type\": \"record\",\n" + + " \"name\": \"record\",\n" + + " \"fields\": [\n" + + " {\n" + + " \"name\": \"id\",\n" + + " \"type\": \"int\"\n" + + " },\n" + + " {\n" + + " \"name\": \"name\",\n" + + " \"type\": \"string\"\n" + + " },\n" + + " {\n" + + " \"name\": \"value\",\n" + + " \"type\": \"string\"\n" + + " }\n" + + " ]\n" + + "}"); + private static final String DROP_TABLE = String.format("DROP TABLE IF EXISTS %s", TEST_TOPIC_NAME); + private static final String CREATE_TABLE_WITH_PK = String.format("CREATE TABLE \"%s\" (\n" + + " \"id\" NUMBER NOT NULL,\n" + + " \"name\" VARCHAR2(255) NOT NULL,\n" + + " \"value\" VARCHAR2(255) NOT NULL,\n" + + "PRIMARY KEY(\"id\")" + + ")", TEST_TOPIC_NAME); + private static final String CREATE_TABLE = String.format("CREATE TABLE \"%s\" (\n" + + " \"id\" NUMBER NOT NULL,\n" + + " \"name\" VARCHAR2(255) NOT NULL,\n" + + " \"value\" VARCHAR2(255) NOT NULL\n" + + ")", TEST_TOPIC_NAME); + + + private Map basicSinkConnectorConfig() { + final Map config = basicConnectorConfig(); + config.put("name", CONNECTOR_NAME); + config.put("connector.class", JdbcSinkConnector.class.getName()); + config.put("topics", TEST_TOPIC_NAME); + return config; + } + + private Map sinkConnectorConfigWithPKModeRecordKey() { + final Map config = basicSinkConnectorConfig(); + config.put("pk.mode", "record_key"); + config.put("pk.fields", "id"); // assigned name for the primitive key + return config; + } + + private ProducerRecord createRecord( + final int id, final int partition, final String name, final String value) { + final GenericRecord record = new GenericData.Record(VALUE_RECORD_SCHEMA); + record.put("id", id); + record.put("name", name); + record.put("value", value); + return new ProducerRecord<>(TEST_TOPIC_NAME, partition, String.valueOf(id), record); + } + + private void sendTestData(final int numberOfRecords) throws InterruptedException, ExecutionException { + final List> sendFutures = new ArrayList<>(); + for (int i = 0; i < numberOfRecords; i++) { + for (int partition = 0; partition < TEST_TOPIC_PARTITIONS; partition++) { + final String recordName = "user-" + i; + final String recordValue = "value-" + i; + final ProducerRecord msg = createRecord(i, partition, recordName, recordValue); + sendFutures.add(producer.send(msg)); + } + } + producer.flush(); + for (final Future sendFuture : 
sendFutures) {
+            sendFuture.get();
+        }
+    }
+
+    @AfterEach
+    public void afterEach() throws SQLException {
+        executeSqlStatement(DROP_TABLE);
+    }
+
+    @Test
+    public void testSinkConnector() throws Exception {
+        createTopic(TEST_TOPIC_NAME, 1); // Create Kafka topic matching the table name
+        executeSqlStatement(CREATE_TABLE);
+        consumer.assign(Collections.singleton(new TopicPartition(TEST_TOPIC_NAME, 0)));
+        // Start the sink connector
+        connectRunner.createConnector(basicSinkConnectorConfig());
+
+        // Send test data to Kafka topic
+        sendTestData(1);
+
+        await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofSeconds(19))
+                .untilAsserted(() -> {
+                    assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).hasNumberOfRows(1);
+                    assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).column("ID")
+                            .value().isEqualTo("0");
+                });
+    }
+
+    @Test
+    public void testSinkWithPKModeRecordKeyConnector() throws Exception {
+        createTopic(TEST_TOPIC_NAME, 1); // Create Kafka topic matching the table name
+        executeSqlStatement(CREATE_TABLE_WITH_PK);
+        consumer.assign(Collections.singleton(new TopicPartition(TEST_TOPIC_NAME, 0)));
+        // Start the sink connector
+        connectRunner.createConnector(sinkConnectorConfigWithPKModeRecordKey());
+
+        // Send test data to Kafka topic
+        sendTestData(1);
+
+        await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofSeconds(19))
+                .untilAsserted(() -> {
+                    assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).hasNumberOfRows(1);
+                    assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).column("ID")
+                            .value().isEqualTo("0");
+                });
+    }
+}
diff --git a/src/main/java/io/aiven/connect/jdbc/dialect/DatabaseDialect.java b/src/main/java/io/aiven/connect/jdbc/dialect/DatabaseDialect.java
index 2a4befdc..93aac8b8 100644
--- a/src/main/java/io/aiven/connect/jdbc/dialect/DatabaseDialect.java
+++ b/src/main/java/io/aiven/connect/jdbc/dialect/DatabaseDialect.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2020 Aiven Oy and jdbc-connector-for-apache-kafka project contributors
+ * Copyright 2024 Aiven Oy and jdbc-connector-for-apache-kafka project contributors
  * Copyright 2018 Confluent Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
@@ -360,6 +360,21 @@ default String buildInsertStatement(TableId table,
         return buildInsertStatement(table, keyColumns, nonKeyColumns);
     }
 
+    /**
+     * Build the DELETE prepared statement expression for the given table and its columns.
+     *
+     * @param table      the identifier of the table; may not be null
+     * @param records    the number of tombstone records the statement will be executed for; must be positive
+     * @param keyColumns the identifiers of the columns in the primary/unique key; may not be null
+     *                   but may be empty
+     * @return the DELETE statement; may not be null
+     */
+    default String buildDeleteStatement(TableId table,
+                                        int records,
+                                        Collection<ColumnId> keyColumns) {
+        throw new UnsupportedOperationException("buildDeleteStatement is not supported by this dialect");
+    }
+
     /**
      * Build the UPDATE prepared statement expression for the given table and its columns. Variables
      * for each key column should also appear in the WHERE clause of the statement.
@@ -524,6 +539,16 @@ default void bindRecord(SinkRecord record) throws SQLException {
      * @throws SQLException if there is a problem binding values into the statement
      */
    int bindRecord(int index, SinkRecord record) throws SQLException;
+
+    /**
+     * Bind the key field values of the supplied tombstone record into the DELETE statement.
+     *
+     * @param record the sink record with values to be bound into the statement; never null
+     * @throws SQLException if there is a problem binding values into the statement
+     */
+    default void bindTombstoneRecord(SinkRecord record) throws SQLException {
+        throw new UnsupportedOperationException("bindTombstoneRecord is not supported by this binder");
+    }
  }

  /**
diff --git a/src/main/java/io/aiven/connect/jdbc/dialect/GenericDatabaseDialect.java b/src/main/java/io/aiven/connect/jdbc/dialect/GenericDatabaseDialect.java
index 1ccd2744..57acbec6 100644
--- a/src/main/java/io/aiven/connect/jdbc/dialect/GenericDatabaseDialect.java
+++ b/src/main/java/io/aiven/connect/jdbc/dialect/GenericDatabaseDialect.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2019 Aiven Oy and jdbc-connector-for-apache-kafka project contributors
+ * Copyright 2024 Aiven Oy and jdbc-connector-for-apache-kafka project contributors
  * Copyright 2018 Confluent Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
@@ -1398,6 +1398,30 @@ public String buildMultiInsertStatement(final TableId table,
         return insertStatement + allRowsPlaceholder;
     }
 
+    @Override
+    public String buildDeleteStatement(
+        final TableId table,
+        final int records,
+        final Collection<ColumnId> keyColumns
+    ) {
+        if (records < 1) {
+            throw new IllegalArgumentException("number of records must be a positive number, but got: " + records);
+        }
+        if (isEmpty(keyColumns)) {
+            throw new IllegalArgumentException("no columns specified");
+        }
+        requireNonNull(table, "table must not be null");
+        final ExpressionBuilder builder = expressionBuilder();
+        builder.append("DELETE FROM ");
+        builder.append(table);
+        builder.append(" WHERE ");
+        builder.appendList()
+            .delimitedBy(" AND ")
+            .transformedBy(ExpressionBuilder.columnNamesWith(" = ?"))
+            .of(keyColumns);
+        return builder.toString();
+    }
+
     @Override
     public String buildUpdateStatement(
         final TableId table,
diff --git a/src/main/java/io/aiven/connect/jdbc/dialect/OracleDatabaseDialect.java b/src/main/java/io/aiven/connect/jdbc/dialect/OracleDatabaseDialect.java
index 2ea50dec..770e5ac5 100644
--- a/src/main/java/io/aiven/connect/jdbc/dialect/OracleDatabaseDialect.java
+++ b/src/main/java/io/aiven/connect/jdbc/dialect/OracleDatabaseDialect.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2019 Aiven Oy and jdbc-connector-for-apache-kafka project contributors
+ * Copyright 2024 Aiven Oy and jdbc-connector-for-apache-kafka project contributors
  * Copyright 2018 Confluent Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/src/main/java/io/aiven/connect/jdbc/sink/BufferedRecords.java b/src/main/java/io/aiven/connect/jdbc/sink/BufferedRecords.java
index 9d93bdea..3a0a641d 100644
--- a/src/main/java/io/aiven/connect/jdbc/sink/BufferedRecords.java
+++ b/src/main/java/io/aiven/connect/jdbc/sink/BufferedRecords.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2020 Aiven Oy and jdbc-connector-for-apache-kafka project contributors
+ * Copyright 2024 Aiven Oy and jdbc-connector-for-apache-kafka project contributors
  * Copyright 2016 Confluent Inc.
* * Licensed under the Apache License, Version 2.0 (the "License"); @@ -53,12 +53,16 @@ public class BufferedRecords { private final Connection connection; private List records = new ArrayList<>(); + private List tombstoneRecords = new ArrayList<>(); private SchemaPair currentSchemaPair; private FieldsMetadata fieldsMetadata; private TableDefinition tableDefinition; private PreparedStatement preparedStatement; private StatementBinder preparedStatementBinder; + private PreparedStatement deletePreparedStatement; + private StatementBinder deletePreparedStatementBinder; + public BufferedRecords( final JdbcSinkConfig config, final TableId tableId, @@ -86,10 +90,15 @@ public List add(final SinkRecord record) throws SQLException { } final List flushed; - if (currentSchemaPair.equals(schemaPair)) { + // Skip the schemaPair check for all tombstone records or the current schema pair matches + if (record.valueSchema() == null || currentSchemaPair.equals(schemaPair)) { // Continue with current batch state - records.add(record); - if (records.size() >= config.batchSize) { + if (config.deleteEnabled && isTombstone(record)) { + tombstoneRecords.add(record); + } else { + records.add(record); + } + if (records.size() >= config.batchSize || tombstoneRecords.size() >= config.batchSize) { log.debug("Flushing buffered records after exceeding configured batch size {}.", config.batchSize); flushed = flush(); @@ -109,25 +118,32 @@ public List add(final SinkRecord record) throws SQLException { } private void prepareStatement() throws SQLException { - final String sql; - log.debug("Generating query for insert mode {} and {} records", config.insertMode, records.size()); - if (config.insertMode == MULTI) { - sql = getMultiInsertSql(); - } else { - sql = getInsertSql(); + close(); + if (!records.isEmpty()) { + final String insertSql = config.insertMode == MULTI ? 
getMultiInsertSql() : getInsertSql();
+            log.debug("Prepared SQL for insert mode {}: {}", config.insertMode, insertSql);
+            preparedStatement = connection.prepareStatement(insertSql);
+            preparedStatementBinder = dbDialect.statementBinder(
+                preparedStatement,
+                config.pkMode,
+                currentSchemaPair,
+                fieldsMetadata,
+                config.insertMode
+            );
         }
-        log.debug("Prepared SQL {} for insert mode {}", sql, config.insertMode);
-
-        close();
-        preparedStatement = connection.prepareStatement(sql);
-        preparedStatementBinder = dbDialect.statementBinder(
-            preparedStatement,
-            config.pkMode,
-            currentSchemaPair,
-            fieldsMetadata,
-            config.insertMode
-        );
+        if (!tombstoneRecords.isEmpty()) {
+            final String deleteSql = getDeleteSql();
+            log.debug("Prepared SQL for tombstone records: {}", deleteSql);
+            deletePreparedStatement = connection.prepareStatement(deleteSql);
+            deletePreparedStatementBinder = dbDialect.statementBinder(
+                deletePreparedStatement,
+                config.pkMode,
+                currentSchemaPair,
+                fieldsMetadata,
+                config.insertMode
+            );
+        }
     }
 
     /**
@@ -153,14 +169,14 @@ private void reInitialize(final SchemaPair schemaPair) throws SQLException {
     }
 
     public List<SinkRecord> flush() throws SQLException {
-        if (records.isEmpty()) {
-            log.debug("Records is empty");
+        if (records.isEmpty() && tombstoneRecords.isEmpty()) {
+            log.debug("No records to flush.");
             return new ArrayList<>();
         }
         prepareStatement();
         bindRecords();
 
-        int totalUpdateCount = 0;
+        int totalSuccessfulExecutionCount = 0;
         boolean successNoInfo = false;
 
         log.debug("Executing batch...");
@@ -169,25 +185,40 @@
                 successNoInfo = true;
                 continue;
             }
-            totalUpdateCount += updateCount;
+            totalSuccessfulExecutionCount += updateCount;
+        }
+        for (final int deleteCount : executeDeleteBatch()) {
+            if (deleteCount == Statement.SUCCESS_NO_INFO) {
+                successNoInfo = true;
+                continue;
+            }
+            totalSuccessfulExecutionCount += deleteCount;
         }
         log.debug("Done executing batch.");
 
-        if (totalUpdateCount != records.size() && !successNoInfo) {
+        verifySuccessfulExecutions(totalSuccessfulExecutionCount, successNoInfo);
+
+        final List<SinkRecord> flushedRecords = records;
+        records = new ArrayList<>();
+        tombstoneRecords = new ArrayList<>();
+        return flushedRecords;
+    }
+
+    private void verifySuccessfulExecutions(final int totalSuccessfulExecutionCount, final boolean successNoInfo) {
+        if (totalSuccessfulExecutionCount != records.size() + tombstoneRecords.size() && !successNoInfo) {
             switch (config.insertMode) {
                 case INSERT:
                 case MULTI:
                     throw new ConnectException(String.format(
-                        "Update count (%d) did not sum up to total number of records inserted (%d)",
-                        totalUpdateCount,
-                        records.size()
+                        "Update count (%d) did not sum up to total number of records (%d)",
+                        totalSuccessfulExecutionCount,
+                        records.size() + tombstoneRecords.size()
                    ));
                 case UPSERT:
                 case UPDATE:
                     log.debug(
-                        "{} records:{} resulting in in totalUpdateCount:{}",
+                        "{} records:{} resulting in totalSuccessfulExecutionCount:{}",
                         config.insertMode,
-                        records.size(),
-                        totalUpdateCount
+                        records.size() + tombstoneRecords.size(),
+                        totalSuccessfulExecutionCount
                     );
                     break;
                 default:
@@ -198,21 +229,28 @@
             log.info(
                 "{} records:{} , but no count of the number of rows it affected is available",
                 config.insertMode,
-                records.size()
+                records.size() + tombstoneRecords.size()
             );
         }
-
-        final List<SinkRecord> flushedRecords = records;
-        records = new ArrayList<>();
-        return flushedRecords;
     }
 
     private int[] executeBatch() throws SQLException {
-        if (config.insertMode == MULTI) {
-            preparedStatement.addBatch();
+        if (preparedStatement !=
null) { + if (config.insertMode == MULTI) { + preparedStatement.addBatch(); + } + log.debug("Executing batch with insert mode {}", config.insertMode); + return preparedStatement.executeBatch(); } - log.debug("Executing batch with insert mode {}", config.insertMode); - return preparedStatement.executeBatch(); + return new int[0]; + } + + private int[] executeDeleteBatch() throws SQLException { + if (deletePreparedStatement != null) { + log.debug("Executing batch delete"); + return deletePreparedStatement.executeBatch(); + } + return new int[0]; } private void bindRecords() throws SQLException { @@ -228,15 +266,29 @@ private void bindRecords() throws SQLException { preparedStatementBinder.bindRecord(record); } } + + for (final SinkRecord tombstoneRecord : tombstoneRecords) { + deletePreparedStatementBinder.bindTombstoneRecord(tombstoneRecord); + } log.debug("Done binding records."); } + private boolean isTombstone(final SinkRecord record) { + return record.value() == null; + } + public void close() throws SQLException { log.info("Closing BufferedRecords with preparedStatement: {}", preparedStatement); if (preparedStatement != null) { preparedStatement.close(); preparedStatement = null; } + + log.info("Closing BufferedRecords with deletePreparedStatement: {}", deletePreparedStatement); + if (deletePreparedStatement != null) { + deletePreparedStatement.close(); + deletePreparedStatement = null; + } } private String getMultiInsertSql() { @@ -305,6 +357,12 @@ private String getInsertSql() { } } + private String getDeleteSql() { + return dbDialect.buildDeleteStatement(tableId, + tombstoneRecords.size(), + asColumns(fieldsMetadata.keyFieldNames)); + } + private Collection asColumns(final Collection names) { return names.stream() .map(name -> new ColumnId(tableId, name)) diff --git a/src/main/java/io/aiven/connect/jdbc/sink/JdbcSinkConfig.java b/src/main/java/io/aiven/connect/jdbc/sink/JdbcSinkConfig.java index 04be94c1..1abb5bca 100644 --- a/src/main/java/io/aiven/connect/jdbc/sink/JdbcSinkConfig.java +++ b/src/main/java/io/aiven/connect/jdbc/sink/JdbcSinkConfig.java @@ -1,5 +1,5 @@ /* - * Copyright 2020 Aiven Oy and jdbc-connector-for-apache-kafka project contributors + * Copyright 2024 Aiven Oy and jdbc-connector-for-apache-kafka project contributors * Copyright 2016 Confluent Inc. 
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
@@ -35,7 +35,11 @@
 import io.aiven.connect.jdbc.config.JdbcConfig;
 import io.aiven.connect.jdbc.util.StringUtils;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 public class JdbcSinkConfig extends JdbcConfig {
+    private static final Logger log = LoggerFactory.getLogger(JdbcSinkConfig.class);
 
     public enum InsertMode {
         INSERT,
@@ -176,6 +180,12 @@ public enum PrimaryKeyMode {
         + " while this configuration is applicable for the other columns.";
     private static final String FIELDS_WHITELIST_DISPLAY = "Fields Whitelist";
 
+    public static final String DELETE_ENABLED = "delete.enabled";
+    private static final String DELETE_ENABLED_DEFAULT = "false";
+    private static final String DELETE_ENABLED_DOC =
+        "Whether to enable the deletion of rows in the target table on tombstone messages";
+    private static final String DELETE_ENABLED_DISPLAY = "Delete enabled";
+
     private static final ConfigDef.Range NON_NEGATIVE_INT_VALIDATOR = ConfigDef.Range.atLeast(0);
 
     private static final String WRITES_GROUP = "Writes";
@@ -218,7 +228,16 @@ public enum PrimaryKeyMode {
             BATCH_SIZE_DOC, WRITES_GROUP,
             2,
             ConfigDef.Width.SHORT,
-            BATCH_SIZE_DISPLAY);
+            BATCH_SIZE_DISPLAY)
+        .define(
+            DELETE_ENABLED,
+            ConfigDef.Type.BOOLEAN,
+            DELETE_ENABLED_DEFAULT,
+            ConfigDef.Importance.MEDIUM,
+            DELETE_ENABLED_DOC, WRITES_GROUP,
+            3,
+            ConfigDef.Width.SHORT,
+            DELETE_ENABLED_DISPLAY);
 
     // Data Mapping
     CONFIG_DEF
@@ -370,6 +389,7 @@ public void ensureValid(final String name, final Object value) {
     public final List<String> pkFields;
     public final Set<String> fieldsWhitelist;
     public final TimeZone timeZone;
+    public boolean deleteEnabled;
 
     public JdbcSinkConfig(final Map<?, ?> props) {
         super(CONFIG_DEF, props);
@@ -387,6 +407,17 @@ public JdbcSinkConfig(final Map<?, ?> props) {
         fieldsWhitelist = new HashSet<>(getList(FIELDS_WHITELIST));
         final String dbTimeZone = getString(DB_TIMEZONE_CONFIG);
         timeZone = TimeZone.getTimeZone(ZoneId.of(dbTimeZone));
+        if (pkMode.equals(PrimaryKeyMode.RECORD_KEY)) {
+            // Deletes can be enabled with delete.enabled=true,
+            // but only when pk.mode is set to record_key.
+            // This is because deleting a row from the table
+            // requires the primary key to be used as the criteria.
+            deleteEnabled = getBoolean(DELETE_ENABLED);
+        } else {
+            if (getBoolean(DELETE_ENABLED)) {
+                log.error("delete.enabled is only supported when pk.mode is set to record_key; deletes will not be performed");
+            }
+        }
     }
 
     static Map<String, String> topicToTableMapping(final List<String> value) {
diff --git a/src/main/java/io/aiven/connect/jdbc/sink/PreparedStatementBinder.java b/src/main/java/io/aiven/connect/jdbc/sink/PreparedStatementBinder.java
index f49e30a4..ecbf6cec 100644
--- a/src/main/java/io/aiven/connect/jdbc/sink/PreparedStatementBinder.java
+++ b/src/main/java/io/aiven/connect/jdbc/sink/PreparedStatementBinder.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2019 Aiven Oy and jdbc-connector-for-apache-kafka project contributors
+ * Copyright 2024 Aiven Oy and jdbc-connector-for-apache-kafka project contributors
  * Copyright 2016 Confluent Inc.
* * Licensed under the Apache License, Version 2.0 (the "License"); @@ -97,6 +97,11 @@ public int bindRecord(int index, final SinkRecord record) throws SQLException { return nextIndex; } + public void bindTombstoneRecord(final SinkRecord record) throws SQLException { + bindKeyFields(record, 1); + statement.addBatch(); + } + protected int bindKeyFields(final SinkRecord record, int index) throws SQLException { switch (pkMode) { case NONE: diff --git a/src/test/java/io/aiven/connect/jdbc/sink/BufferedRecordsTest.java b/src/test/java/io/aiven/connect/jdbc/sink/BufferedRecordsTest.java index ef63bd26..41e50536 100644 --- a/src/test/java/io/aiven/connect/jdbc/sink/BufferedRecordsTest.java +++ b/src/test/java/io/aiven/connect/jdbc/sink/BufferedRecordsTest.java @@ -45,6 +45,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.any; import static org.mockito.Mockito.anyString; +import static org.mockito.Mockito.contains; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -190,6 +191,98 @@ public void testInsertModeUpdate() throws SQLException { } + @Test + public void testInsertModeWithDeleteEnabled() throws SQLException { + final HashMap props = new HashMap<>(); + props.put("connection.url", ""); + props.put("auto.create", true); + props.put("auto.evolve", true); + props.put("batch.size", 1000); + props.put("pk.mode", "record_key"); + props.put("pk.fields", "id"); + props.put("delete.enabled", true); + final JdbcSinkConfig config = new JdbcSinkConfig(props); + + final DatabaseDialect dbDialect = DatabaseDialects.findBestFor(dbUrl, config); + final DbStructure dbStructureMock = mock(DbStructure.class); + when(dbStructureMock.createOrAmendIfNecessary(any(JdbcSinkConfig.class), + any(Connection.class), + any(TableId.class), + any(FieldsMetadata.class))) + .thenReturn(true); + + final Connection connectionMock = mock(Connection.class); + final PreparedStatement preparedStatement = mock(PreparedStatement.class); + when(connectionMock.prepareStatement(contains("INSERT"))).thenReturn(preparedStatement); + when(preparedStatement.executeBatch()).thenReturn(new int[0]); + + final PreparedStatement deletePreparedStatement = mock(PreparedStatement.class); + when(connectionMock.prepareStatement(contains("DELETE"))).thenReturn(deletePreparedStatement); + when(deletePreparedStatement.executeBatch()).thenReturn(new int[]{1}); + + final TableId tableId = new TableId(null, null, "dummy"); + final BufferedRecords buffer = new BufferedRecords(config, tableId, dbDialect, dbStructureMock, connectionMock); + final Schema keySchema = SchemaBuilder.struct() + .field("id", SchemaBuilder.INT64_SCHEMA); + + final Struct keyStruct = new Struct(keySchema).put("id", 0L); + final Schema valueSchema = null; + final Struct valueStruct = null; + + final SinkRecord recordA = new SinkRecord("dummy-topic", 0, keySchema, keyStruct, valueSchema, valueStruct, 0); + buffer.add(recordA); + buffer.flush(); + + verify(connectionMock).prepareStatement(contains("DELETE FROM \"dummy\" WHERE \"id\" = ?")); + } + + @Test + public void testMultiInsertModeWithDeleteEnabled() throws SQLException { + final HashMap props = new HashMap<>(); + props.put("connection.url", ""); + props.put("auto.create", true); + props.put("auto.evolve", true); + props.put("batch.size", 1000); + props.put("pk.mode", "record_key"); + props.put("pk.fields", "id"); + props.put("delete.enabled", true); + props.put("insert.mode", "MULTI"); + final JdbcSinkConfig 
config = new JdbcSinkConfig(props); + + final DatabaseDialect dbDialect = DatabaseDialects.findBestFor(dbUrl, config); + final DbStructure dbStructureMock = mock(DbStructure.class); + when(dbStructureMock.createOrAmendIfNecessary(any(JdbcSinkConfig.class), + any(Connection.class), + any(TableId.class), + any(FieldsMetadata.class))) + .thenReturn(true); + + final Connection connectionMock = mock(Connection.class); + final PreparedStatement preparedStatement = mock(PreparedStatement.class); + when(connectionMock.prepareStatement(contains("INSERT"))).thenReturn(preparedStatement); + when(preparedStatement.executeBatch()).thenReturn(new int[0]); + + final PreparedStatement deletePreparedStatement = mock(PreparedStatement.class); + when(connectionMock.prepareStatement(contains("DELETE"))).thenReturn(deletePreparedStatement); + when(deletePreparedStatement.executeBatch()).thenReturn(new int[]{1}); + + final TableId tableId = new TableId(null, null, "dummy"); + final BufferedRecords buffer = new BufferedRecords(config, tableId, dbDialect, dbStructureMock, connectionMock); + final Schema keySchema = SchemaBuilder.struct() + .field("id", SchemaBuilder.INT64_SCHEMA); + + final Struct keyStruct = new Struct(keySchema).put("id", 0L); + final Schema valueSchema = null; + final Struct valueStruct = null; + + final SinkRecord recordA = new SinkRecord("dummy-topic", 0, keySchema, keyStruct, valueSchema, valueStruct, 0); + buffer.add(recordA); + buffer.flush(); + + verify(connectionMock).prepareStatement(contains("DELETE FROM \"dummy\" WHERE \"id\" = ?")); + } + + @Test public void testInsertModeMultiAutomaticFlush() throws SQLException { final JdbcSinkConfig config = multiModeConfig(2);