Skip to content

Commit

Permalink
feat: Implement Tombstone Message Handling
Browse files Browse the repository at this point in the history
- Added support for handling tombstone messages in the JDBC sink connector.
- Implemented the ability to delete rows based on tombstone messages.
- Introduced a new parameter, `delete.enabled`, to control delete behavior.
- Aligned functionality with the documented approach for processing tombstones,
  similar to Confluent JDBC driver behavior.

Signed-off-by: Joel Hanson <joelhanson025@gmail.com>
  • Loading branch information
Joel-hanson authored and Joel Hanson committed May 2, 2024
1 parent 39babfb commit d7e51af
Show file tree
Hide file tree
Showing 11 changed files with 843 additions and 47 deletions.
3 changes: 3 additions & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ dependencies {

runtimeOnly("org.xerial:sqlite-jdbc:3.45.2.0")
runtimeOnly("org.postgresql:postgresql:42.7.3")
runtimeOnly("com.oracle.database.jdbc:ojdbc8:19.3.0.0")
runtimeOnly("net.sourceforge.jtds:jtds:1.3.1")
runtimeOnly("net.snowflake:snowflake-jdbc:3.14.2")
runtimeOnly("com.microsoft.sqlserver:mssql-jdbc:12.6.1.jre11")
Expand Down Expand Up @@ -204,6 +205,8 @@ dependencies {
integrationTestImplementation("org.testcontainers:kafka:$testcontainersVersion") // this is not Kafka version
integrationTestImplementation("org.testcontainers:testcontainers:$testcontainersVersion")
integrationTestImplementation("org.testcontainers:postgresql:$testcontainersVersion")
integrationTestImplementation("org.testcontainers:oracle-free:$testcontainersVersion")

integrationTestImplementation("org.awaitility:awaitility:$awaitilityVersion")
integrationTestImplementation("org.assertj:assertj-db:2.0.2")

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright 2024 Aiven Oy and jdbc-connector-for-apache-kafka project contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.connect.jdbc.oracle;

import javax.sql.DataSource;

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;

import io.aiven.kafka.connect.jdbc.AbstractIT;

import oracle.jdbc.pool.OracleDataSource;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.oracle.OracleContainer;
import org.testcontainers.utility.DockerImageName;

public class AbstractOracleIT extends AbstractIT {

public static final String DEFAULT_ORACLE_TAG = "slim-faststart";
private static final DockerImageName DEFAULT_ORACLE_IMAGE_NAME =
DockerImageName.parse("gvenzl/oracle-free")
.withTag(DEFAULT_ORACLE_TAG);
@Container
public static final OracleContainer ORACLE_CONTAINER = new OracleContainer(DEFAULT_ORACLE_IMAGE_NAME);

protected void executeSqlStatement(final String sqlStatement) throws SQLException {
try (final Connection connection = getDatasource().getConnection();
final Statement statement = connection.createStatement()) {
statement.executeUpdate(sqlStatement);
}
}

protected DataSource getDatasource() throws SQLException {
final OracleDataSource dataSource = new OracleDataSource();
dataSource.setServerName(ORACLE_CONTAINER.getHost());
// Assuming the default Oracle port is 1521
dataSource.setPortNumber(ORACLE_CONTAINER.getMappedPort(1521));
// Or use setDatabaseName() if that's how your Oracle is configured
dataSource.setServiceName(ORACLE_CONTAINER.getDatabaseName());
dataSource.setUser(ORACLE_CONTAINER.getUsername());
dataSource.setPassword(ORACLE_CONTAINER.getPassword());
dataSource.setDriverType("thin");
return dataSource;
}


protected Map<String, String> basicConnectorConfig() {
final HashMap<String, String> config = new HashMap<>();
config.put("tasks.max", "1");
config.put("connection.url", ORACLE_CONTAINER.getJdbcUrl());
config.put("connection.user", ORACLE_CONTAINER.getUsername());
config.put("connection.password", ORACLE_CONTAINER.getPassword());
config.put("dialect.name", "OracleDatabaseDialect");
config.put("key.converter", "io.confluent.connect.avro.AvroConverter");
config.put("key.converter.schema.registry.url", schemaRegistryContainer.getSchemaRegistryUrl());
config.put("value.converter", "io.confluent.connect.avro.AvroConverter");
config.put("value.converter.schema.registry.url", schemaRegistryContainer.getSchemaRegistryUrl());
return config;
}
}

0 comments on commit d7e51af

Please sign in to comment.