Commit

✨ Source MongoDB V2: Refactor common offset loading logic (airbytehq#30029)

Co-authored-by: jdpgrailsdev <jdpgrailsdev@users.noreply.github.com>
Co-authored-by: Jose Pefaur <jose.pefaur@gmail.com>
Co-authored-by: Cole Snodgrass <cole@airbyte.io>
Co-authored-by: colesnodgrass <colesnodgrass@users.noreply.github.com>
Co-authored-by: subodh <subodh1810@gmail.com>
Co-authored-by: Ben Church <ben@airbyte.io>
7 people authored and ariesgun committed Oct 20, 2023
1 parent 24ee6c6 commit cc26174
Showing 4 changed files with 105 additions and 62 deletions.
DebeziumStateUtil.java (new file)
@@ -0,0 +1,88 @@
/*
 * Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 */

package io.airbyte.cdk.integrations.debezium.internals;

import io.debezium.config.Configuration;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.json.JsonConverterConfig;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.storage.FileOffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;

/**
 * Represents a utility class that assists with the parsing of Debezium offset state.
 */
public interface DebeziumStateUtil {

  /**
   * The name of the Debezium property that contains the unique name for the Debezium connector.
   */
  String CONNECTOR_NAME_PROPERTY = "name";

  /**
   * Configuration for offset state key/value converters.
   */
  Map<String, String> INTERNAL_CONVERTER_CONFIG = Map.of(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, Boolean.FALSE.toString());

  /**
   * Creates and starts a {@link FileOffsetBackingStore} that is used to store the tracked Debezium
   * offset state.
   *
   * @param properties The Debezium configuration properties for the selected Debezium connector.
   * @return A configured and started {@link FileOffsetBackingStore} instance.
   */
  default FileOffsetBackingStore getFileOffsetBackingStore(final Properties properties) {
    final FileOffsetBackingStore fileOffsetBackingStore = new FileOffsetBackingStore();
    final Map<String, String> propertiesMap = Configuration.from(properties).asMap();
    propertiesMap.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());
    propertiesMap.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());
    fileOffsetBackingStore.configure(new StandaloneConfig(propertiesMap));
    fileOffsetBackingStore.start();
    return fileOffsetBackingStore;
  }

  /**
   * Creates and returns a {@link JsonConverter} that can be used to parse keys in the Debezium
   * offset state storage.
   *
   * @return A {@link JsonConverter} for key conversion.
   */
  default JsonConverter getKeyConverter() {
    final JsonConverter keyConverter = new JsonConverter();
    keyConverter.configure(INTERNAL_CONVERTER_CONFIG, true);
    return keyConverter;
  }

  /**
   * Creates and returns an {@link OffsetStorageReaderImpl} instance that can be used to load offset
   * state from the offset file storage.
   *
   * @param fileOffsetBackingStore The {@link FileOffsetBackingStore} that contains the offset state
   *        saved to disk.
   * @param properties The Debezium configuration properties for the selected Debezium connector.
   * @return An {@link OffsetStorageReaderImpl} instance that can be used to load the offset state
   *         from the offset file storage.
   */
  default OffsetStorageReaderImpl getOffsetStorageReader(final FileOffsetBackingStore fileOffsetBackingStore, final Properties properties) {
    return new OffsetStorageReaderImpl(fileOffsetBackingStore, properties.getProperty(CONNECTOR_NAME_PROPERTY), getKeyConverter(),
        getValueConverter());
  }

  /**
   * Creates and returns a {@link JsonConverter} that can be used to parse values in the Debezium
   * offset state storage.
   *
   * @return A {@link JsonConverter} for value conversion.
   */
  default JsonConverter getValueConverter() {
    final JsonConverter valueConverter = new JsonConverter();
    valueConverter.configure(INTERNAL_CONVERTER_CONFIG, false);
    return valueConverter;
  }

}
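The interface above is the extracted common logic; each connector-specific state utility inherits these defaults. A minimal sketch of how an implementing class might drive them, assuming the supplied properties already carry the connector name and offset file location (the class and readOffsets method below are hypothetical, not part of this commit):

import java.util.Collection;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.connect.storage.FileOffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;

public class ExampleDebeziumStateUtil implements DebeziumStateUtil {

  // Loads the raw offset maps for the given partitions, then releases both resources.
  public Map<Map<String, String>, Map<String, Object>> readOffsets(final Properties properties,
                                                                   final Collection<Map<String, String>> partitions) {
    FileOffsetBackingStore store = null;
    OffsetStorageReaderImpl reader = null;
    try {
      store = getFileOffsetBackingStore(properties); // inherited default: configures and starts the file store
      reader = getOffsetStorageReader(store, properties); // inherited default: wires the key/value JSON converters
      return reader.offsets(partitions);
    } finally {
      if (reader != null) {
        reader.close();
      }
      if (store != null) {
        store.stop();
      }
    }
  }

}

Closing the reader before stopping the store matches the order suggested by the log message in the refactored classes' finally blocks below.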
MongoDbDebeziumStateUtil.java
@@ -11,6 +11,7 @@
 import com.mongodb.client.MongoClient;
 import io.airbyte.cdk.integrations.debezium.internals.AirbyteFileOffsetBackingStore;
 import io.airbyte.cdk.integrations.debezium.internals.DebeziumPropertiesManager;
+import io.airbyte.cdk.integrations.debezium.internals.DebeziumStateUtil;
 import io.airbyte.commons.json.Jsons;
 import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
 import io.debezium.config.Configuration;
@@ -22,18 +23,13 @@
 import io.debezium.connector.mongodb.ReplicaSets;
 import io.debezium.connector.mongodb.ResumeTokens;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
-import org.apache.kafka.connect.json.JsonConverter;
-import org.apache.kafka.connect.json.JsonConverterConfig;
-import org.apache.kafka.connect.runtime.WorkerConfig;
-import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.storage.FileOffsetBackingStore;
 import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
 import org.bson.BsonDocument;
@@ -45,7 +41,7 @@
 /**
  * Collection of utility methods related to the Debezium offset state.
  */
-public class MongoDbDebeziumStateUtil {
+public class MongoDbDebeziumStateUtil implements DebeziumStateUtil {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbDebeziumStateUtil.class);

@@ -162,19 +158,10 @@ public Optional<BsonDocument> savedOffset(final Properties baseProperties,
   private Optional<BsonDocument> parseSavedOffset(final Properties properties, final MongoClient mongoClient) {
     FileOffsetBackingStore fileOffsetBackingStore = null;
     OffsetStorageReaderImpl offsetStorageReader = null;
 
     try {
-      fileOffsetBackingStore = new FileOffsetBackingStore();
-      final Map<String, String> propertiesMap = Configuration.from(properties).asMap();
-      propertiesMap.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());
-      propertiesMap.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());
-      fileOffsetBackingStore.configure(new StandaloneConfig(propertiesMap));
-      fileOffsetBackingStore.start();
-
-      final Map<String, String> internalConverterConfig = Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false");
-      final JsonConverter keyConverter = new JsonConverter();
-      keyConverter.configure(internalConverterConfig, true);
-      final JsonConverter valueConverter = new JsonConverter();
-      valueConverter.configure(internalConverterConfig, false);
+      fileOffsetBackingStore = getFileOffsetBackingStore(properties);
+      offsetStorageReader = getOffsetStorageReader(fileOffsetBackingStore, properties);
 
       final Configuration config = Configuration.from(properties);
       final MongoDbTaskContext taskContext = new MongoDbTaskContext(config);
@@ -183,8 +170,6 @@ private Optional<BsonDocument> parseSavedOffset(final Properties properties, fin

       LOGGER.debug("Parsing saved offset state for replica set '{}' and server ID '{}'...", replicaSets.all().get(0), properties.getProperty("name"));
 
-      offsetStorageReader = new OffsetStorageReaderImpl(fileOffsetBackingStore, properties.getProperty("name"), keyConverter, valueConverter);
-
       final MongoDbOffsetContext.Loader loader = new MongoDbCustomLoader(mongoDbConnectorConfig, replicaSets);
       final Collection<Map<String, String>> partitions = loader.getPartitions();
       final Map<Map<String, String>, Map<String, Object>> offsets = offsetStorageReader.offsets(partitions);
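For orientation, the Properties these helpers consume use standard Debezium/Kafka Connect keys. A minimal illustrative sketch; the values, including the file path, are hypothetical:

import java.util.Properties;

final Properties properties = new Properties();
// Read via CONNECTOR_NAME_PROPERTY when building the offset storage reader.
properties.setProperty("name", "mongodb-source");
// Standard Kafka Connect settings for file-backed offset storage.
properties.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
properties.setProperty("offset.storage.file.filename", "/tmp/cdc-state-offsets.dat");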
MySqlDebeziumStateUtil.java
@@ -16,6 +16,7 @@
 import io.airbyte.cdk.integrations.debezium.internals.AirbyteSchemaHistoryStorage.SchemaHistory;
 import io.airbyte.cdk.integrations.debezium.internals.DebeziumPropertiesManager;
 import io.airbyte.cdk.integrations.debezium.internals.DebeziumRecordPublisher;
+import io.airbyte.cdk.integrations.debezium.internals.DebeziumStateUtil;
 import io.airbyte.cdk.integrations.debezium.internals.RelationalDbDebeziumPropertiesManager;
 import io.airbyte.commons.json.Jsons;
 import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
@@ -44,16 +45,12 @@
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
-import org.apache.kafka.connect.json.JsonConverter;
-import org.apache.kafka.connect.json.JsonConverterConfig;
-import org.apache.kafka.connect.runtime.WorkerConfig;
-import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.storage.FileOffsetBackingStore;
 import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class MySqlDebeziumStateUtil {
+public class MySqlDebeziumStateUtil implements DebeziumStateUtil {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(MySqlDebeziumStateUtil.class);

@@ -179,36 +176,23 @@ public Optional<MysqlDebeziumStateAttributes> savedOffset(final Properties baseP
   }
 
   private Optional<MysqlDebeziumStateAttributes> parseSavedOffset(final Properties properties) {
-
     FileOffsetBackingStore fileOffsetBackingStore = null;
     OffsetStorageReaderImpl offsetStorageReader = null;
 
     try {
-      fileOffsetBackingStore = new FileOffsetBackingStore();
-      final Map<String, String> propertiesMap = Configuration.from(properties).asMap();
-      propertiesMap.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());
-      propertiesMap.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());
-      fileOffsetBackingStore.configure(new StandaloneConfig(propertiesMap));
-      fileOffsetBackingStore.start();
-
-      final Map<String, String> internalConverterConfig = Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false");
-      final JsonConverter keyConverter = new JsonConverter();
-      keyConverter.configure(internalConverterConfig, true);
-      final JsonConverter valueConverter = new JsonConverter();
-      valueConverter.configure(internalConverterConfig, false);
+      fileOffsetBackingStore = getFileOffsetBackingStore(properties);
+      offsetStorageReader = getOffsetStorageReader(fileOffsetBackingStore, properties);
 
       final MySqlConnectorConfig connectorConfig = new MySqlConnectorConfig(Configuration.from(properties));
       final MySqlOffsetContext.Loader loader = new MySqlOffsetContext.Loader(connectorConfig);
       final Set<Partition> partitions =
           Collections.singleton(new MySqlPartition(connectorConfig.getLogicalName(), properties.getProperty(DATABASE_NAME.name())));
 
-      offsetStorageReader = new OffsetStorageReaderImpl(fileOffsetBackingStore, properties.getProperty("name"), keyConverter,
-          valueConverter);
       final OffsetReader<Partition, MySqlOffsetContext, Loader> offsetReader = new OffsetReader<>(offsetStorageReader,
           loader);
       final Map<Partition, MySqlOffsetContext> offsets = offsetReader.offsets(partitions);
 
       return extractStateAttributes(partitions, offsets);

     } finally {
       LOGGER.info("Closing offsetStorageReader and fileOffsetBackingStore");
       if (offsetStorageReader != null) {
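The finally block is collapsed in this view, but its log line indicates that both resources are released there. A standalone sketch of that cleanup pattern (the helper class and method names are hypothetical, not from this commit):

import org.apache.kafka.connect.storage.FileOffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;

final class OffsetResourceCleanup {

  private OffsetResourceCleanup() {}

  // Closes the reader before stopping its backing store, tolerating partially-initialised state.
  static void closeOffsetResources(final OffsetStorageReaderImpl offsetStorageReader, final FileOffsetBackingStore fileOffsetBackingStore) {
    if (offsetStorageReader != null) {
      offsetStorageReader.close();
    }
    if (fileOffsetBackingStore != null) {
      fileOffsetBackingStore.stop();
    }
  }

}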
PostgresDebeziumStateUtil.java
@@ -15,6 +15,7 @@
 import io.airbyte.cdk.db.jdbc.JdbcDatabase;
 import io.airbyte.cdk.integrations.debezium.internals.AirbyteFileOffsetBackingStore;
 import io.airbyte.cdk.integrations.debezium.internals.DebeziumPropertiesManager;
+import io.airbyte.cdk.integrations.debezium.internals.DebeziumStateUtil;
 import io.airbyte.cdk.integrations.debezium.internals.RelationalDbDebeziumPropertiesManager;
 import io.airbyte.commons.json.Jsons;
 import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
@@ -39,10 +40,6 @@
 import java.util.OptionalLong;
 import java.util.Properties;
 import java.util.Set;
-import org.apache.kafka.connect.json.JsonConverter;
-import org.apache.kafka.connect.json.JsonConverterConfig;
-import org.apache.kafka.connect.runtime.WorkerConfig;
-import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.storage.FileOffsetBackingStore;
 import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
 import org.postgresql.core.BaseConnection;
@@ -56,7 +53,7 @@
  * This class is inspired by Debezium's Postgres connector internal implementation on how it parses
  * the state
  */
-public class PostgresDebeziumStateUtil {
+public class PostgresDebeziumStateUtil implements DebeziumStateUtil {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(PostgresDebeziumStateUtil.class);

@@ -152,41 +149,30 @@ private ChainedLogicalStreamBuilder addSlotOption(final String publicationName,
   }
 
   /**
    * Loads the offset data from the saved Debezium offset file.
    *
    * @param properties Properties should contain the relevant properties like path to the debezium
    *        state file, etc. It's assumed that the state file is already initialised with the saved
    *        state
    * @return Returns the LSN that Airbyte has acknowledged in the source database server
    */
   private OptionalLong parseSavedOffset(final Properties properties) {
-
     FileOffsetBackingStore fileOffsetBackingStore = null;
     OffsetStorageReaderImpl offsetStorageReader = null;
 
     try {
-      fileOffsetBackingStore = new FileOffsetBackingStore();
-      final Map<String, String> propertiesMap = Configuration.from(properties).asMap();
-      propertiesMap.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());
-      propertiesMap.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());
-      fileOffsetBackingStore.configure(new StandaloneConfig(propertiesMap));
-      fileOffsetBackingStore.start();
-
-      final Map<String, String> internalConverterConfig = Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false");
-      final JsonConverter keyConverter = new JsonConverter();
-      keyConverter.configure(internalConverterConfig, true);
-      final JsonConverter valueConverter = new JsonConverter();
-      valueConverter.configure(internalConverterConfig, false);
+      fileOffsetBackingStore = getFileOffsetBackingStore(properties);
+      offsetStorageReader = getOffsetStorageReader(fileOffsetBackingStore, properties);
 
       final PostgresConnectorConfig postgresConnectorConfig = new PostgresConnectorConfig(Configuration.from(properties));
       final PostgresCustomLoader loader = new PostgresCustomLoader(postgresConnectorConfig);
       final Set<Partition> partitions =
           Collections.singleton(new PostgresPartition(postgresConnectorConfig.getLogicalName(), properties.getProperty(DATABASE_NAME.name())));
-      offsetStorageReader = new OffsetStorageReaderImpl(fileOffsetBackingStore, properties.getProperty("name"), keyConverter,
-          valueConverter);
 
       final OffsetReader<Partition, PostgresOffsetContext, Loader> offsetReader = new OffsetReader<>(offsetStorageReader, loader);
       final Map<Partition, PostgresOffsetContext> offsets = offsetReader.offsets(partitions);
 
       return extractLsn(partitions, offsets, loader);

     } finally {
       LOGGER.info("Closing offsetStorageReader and fileOffsetBackingStore");
       if (offsetStorageReader != null) {
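Downstream of this method, extractLsn (collapsed in this view) reduces the loaded offsets to the LSN Airbyte last acknowledged. A rough illustration of that step, assuming the usual Debezium Postgres offset keys ("lsn_commit" with a fallback to "lsn"); both the key names and the helper are assumptions, not taken from this commit:

import java.util.Map;
import java.util.OptionalLong;

final class LsnSketch {

  private LsnSketch() {}

  // Pulls an acknowledged LSN out of a single Debezium offset value map, if one is present.
  static OptionalLong lsnFromOffset(final Map<String, Object> offsetValue) {
    final Object lsn = offsetValue.getOrDefault("lsn_commit", offsetValue.get("lsn"));
    return lsn instanceof Number number ? OptionalLong.of(number.longValue()) : OptionalLong.empty();
  }

}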
