Skip to content

Commit

Permalink
Upgrade mongodb to use dbz 2.6.2 (#38238)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaohansong committed Jun 11, 2024
1 parent 00e037b commit 3c9a612
Show file tree
Hide file tree
Showing 17 changed files with 114 additions and 210 deletions.
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.37.1
version=0.37.2
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ class AirbyteFileOffsetBackingStore(
Jsons.`object`(cdcState, MutableMap::class.java) as Map<String, String>
else emptyMap()

val updatedMap = updateStateForDebezium2_1(mapAsString)
var updatedMap = updateStateForDebezium2_1(mapAsString)
updatedMap = updateStateForDebezium2_6(updatedMap)

val mappedAsStrings: Map<ByteBuffer?, ByteBuffer?> =
updatedMap.entries.associate {
Expand All @@ -71,7 +72,7 @@ class AirbyteFileOffsetBackingStore(
return mapAsString
}

LOGGER.info { "Mutating sate to make it Debezium 2.1 compatible" }
LOGGER.info { "Mutating state to make it Debezium 2.1 compatible" }
val newKey =
if (dbName.isPresent)
SQL_SERVER_STATE_MUTATION.apply(key.substring(i, i1 + 1), dbName.get())
Expand All @@ -82,6 +83,28 @@ class AirbyteFileOffsetBackingStore(
return updatedMap
}

// Previously:
// {"["ci-test-database",{"rs":"atlas-pexnnq-shard-0","server_id":"ci-test-database"}]":"{"sec":1715722523,"ord":2,"transaction_id":null,"resume_token":"826643D91B000000022B0429296E1404"}"}
// Now:
// {["ci-test-database",{"server_id":"ci-test-database"}]={"sec":0,"ord":-1,"resume_token":"826643FA09000000022B0429296E1404"}}
private fun updateStateForDebezium2_6(mapAsString: Map<String, String>): Map<String, String> {
val updatedMap: MutableMap<String, String> = LinkedHashMap()
if (mapAsString.size > 0) {
val key = mapAsString.keys.stream().toList()[0]

if (!key.contains("\"rs\":")) {
// The state is Debezium 2.6 compatible. No need to change anything.
return mapAsString
}

LOGGER.info { "Mutating state to make it Debezium 2.6 compatible" }
val newKey = mongoShardMutation(key)
val value = mapAsString.getValue(key)
updatedMap[newKey] = value
}
return updatedMap
}

/**
* See FileOffsetBackingStore#load - logic is mostly borrowed from here. duplicated because this
* method is not public. Reduced the try catch block to only the read operation from original
Expand Down Expand Up @@ -164,6 +187,22 @@ class AirbyteFileOffsetBackingStore(
"\"" +
key.substring(key.length - 2))
}
private fun mongoShardMutation(input: String): String {
val jsonObjectStart = input.indexOf("{", input.indexOf("["))
val jsonObjectEnd = input.lastIndexOf("}")

// Extract the JSON object as a substring
val jsonObjectString = input.substring(jsonObjectStart, jsonObjectEnd + 1)

// Remove the "rs" key-value pair using a regex
val modifiedJsonObjectString =
jsonObjectString.replace(Regex("""("rs":\s*".+?",\s*)"""), "")

// Replace the old JSON object with the modified one in the input string
val finalString = input.replace(jsonObjectString, modifiedJsonObjectString)

return finalString
}

private fun byteBufferToString(byteBuffer: ByteBuffer?): String {
Preconditions.checkNotNull(byteBuffer)
Expand Down
10 changes: 5 additions & 5 deletions airbyte-integrations/connectors/source-mongodb-v2/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.36.3'
cdkVersionRequired = '0.37.2'
features = ['db-sources', 'datastore-mongo']
useLocalCdk = false
}
Expand Down Expand Up @@ -38,8 +38,8 @@ java {
}

dependencies {
implementation 'io.debezium:debezium-embedded:2.5.1.Final'
implementation 'io.debezium:debezium-connector-mongodb:2.5.1.Final'
implementation 'io.debezium:debezium-embedded:2.6.2.Final'
implementation 'io.debezium:debezium-connector-mongodb:2.6.2.Final'

testImplementation 'org.testcontainers:mongodb:1.19.0'

Expand All @@ -53,8 +53,8 @@ dependencies {
dataGeneratorImplementation 'org.jetbrains.kotlinx:kotlinx-cli-jvm:0.3.5'
dataGeneratorImplementation 'org.mongodb:mongodb-driver-sync:4.10.2'

debeziumTestImplementation 'io.debezium:debezium-embedded:2.5.1.Final'
debeziumTestImplementation 'io.debezium:debezium-connector-mongodb:2.5.1.Final'
debeziumTestImplementation 'io.debezium:debezium-embedded:2.6.0.Final'
debeziumTestImplementation 'io.debezium:debezium-connector-mongodb:2.6.0.Final'
debeziumTestImplementation 'org.jetbrains.kotlinx:kotlinx-cli-jvm:0.3.5'
debeziumTestImplementation 'com.github.spotbugs:spotbugs-annotations:4.7.3'
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e
dockerImageTag: 1.3.15
dockerImageTag: 1.4.0
dockerRepository: airbyte/source-mongodb-v2
documentationUrl: https://docs.airbyte.com/integrations/sources/mongodb-v2
githubIssueLabel: source-mongodb-v2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public MongoDbCdcInitializer() {
* @param mongoClient The {@link MongoClient} used to interact with the target MongoDB server.
* @param cdcMetadataInjector The {@link MongoDbCdcConnectorMetadataInjector} used to add metadata
* to generated records.
* @param streams The configured Airbyte catalog of streams for the source.
* @param stateManager The {@link MongoDbStateManager} that provides state information used for
* iterator selection.
* @param emittedAt The timestamp of the sync.
Expand Down Expand Up @@ -98,7 +99,7 @@ public List<AutoCloseableIterator<AirbyteMessage>> createCdcIterators(
final BsonDocument initialResumeToken =
MongoDbResumeTokenHelper.getMostRecentResumeToken(mongoClient, databaseName, incrementalOnlyStreamsCatalog);
final JsonNode initialDebeziumState =
mongoDbDebeziumStateUtil.constructInitialDebeziumState(initialResumeToken, mongoClient, databaseName);
mongoDbDebeziumStateUtil.constructInitialDebeziumState(initialResumeToken, databaseName);
final MongoDbCdcState cdcState =
(stateManager.getCdcState() == null || stateManager.getCdcState().state() == null || stateManager.getCdcState().state().isNull())
? new MongoDbCdcState(initialDebeziumState, isEnforceSchema)
Expand All @@ -107,8 +108,7 @@ public List<AutoCloseableIterator<AirbyteMessage>> createCdcIterators(
Jsons.clone(defaultDebeziumProperties),
incrementalOnlyStreamsCatalog,
cdcState.state(),
config.getDatabaseConfig(),
mongoClient);
config.getDatabaseConfig());

// We should always be able to extract offset out of state if it's not null
if (cdcState.state() != null && optSavedOffset.isEmpty()) {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ public static class Configuration {
*/
public static class OffsetState {

public static final String KEY_REPLICA_SET = SourceInfo.REPLICA_SET_NAME;
public static final String KEY_SERVER_ID = SourceInfo.SERVER_ID_KEY;
// public static final String KEY_REPLICA_SET = SourceInfo.REPLICA_SET_NAME;
public static final String KEY_SERVER_ID = "server_id";
public static final String VALUE_INCREMENT = SourceInfo.ORDER;
public static final String VALUE_RESUME_TOKEN = "resume_token";
public static final String VALUE_SECONDS = SourceInfo.TIMESTAMP;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class MongoDbDebeziumPropertiesManager extends DebeziumPropertiesManager
static final String DOUBLE_QUOTES_PATTERN = "\"";
static final String MONGODB_AUTHSOURCE_KEY = "mongodb.authsource";
static final String MONGODB_CONNECTION_MODE_KEY = "mongodb.connection.mode";
static final String MONGODB_CONNECTION_MODE_VALUE = "replica_set";
static final String MONGODB_CONNECTION_MODE_VALUE = "sharded";
static final String MONGODB_CONNECTION_STRING_KEY = "mongodb.connection.string";
static final String MONGODB_PASSWORD_KEY = "mongodb.password";
static final String MONGODB_SSL_ENABLED_KEY = "mongodb.ssl.enabled";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package io.airbyte.integrations.source.mongodb.cdc;

import static io.airbyte.integrations.source.mongodb.cdc.MongoDbDebeziumConstants.OffsetState.KEY_SERVER_ID;

import com.fasterxml.jackson.databind.JsonNode;
import com.mongodb.MongoChangeStreamException;
import com.mongodb.MongoCommandException;
Expand All @@ -17,14 +19,12 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.debezium.config.Configuration;
import io.debezium.connector.common.OffsetReader;
import io.debezium.connector.mongodb.MongoDbConnectorConfig;
import io.debezium.connector.mongodb.MongoDbOffsetContext;
import io.debezium.connector.mongodb.MongoDbTaskContext;
import io.debezium.connector.mongodb.MongoUtil;
import io.debezium.connector.mongodb.ReplicaSetDiscovery;
import io.debezium.connector.mongodb.ReplicaSets;
import io.debezium.connector.mongodb.MongoDbPartition;
import io.debezium.connector.mongodb.ResumeTokens;
import java.util.Collection;
import io.debezium.pipeline.spi.Partition;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedList;
Expand All @@ -33,6 +33,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.connect.storage.FileOffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
import org.bson.BsonDocument;
Expand All @@ -53,16 +54,14 @@ public class MongoDbDebeziumStateUtil implements DebeziumStateUtil {
* Constructs the initial Debezium offset state that will be used by the incremental CDC snapshot
* after an initial snapshot sync.
*
* @param mongoClient The {@link MongoClient} used to query the MongoDB server.
* @param serverId The ID of the target server.
* @return The initial Debezium offset state storage document as a {@link JsonNode}.
* @throws IllegalStateException if unable to determine the replica set.
*/
public JsonNode constructInitialDebeziumState(final BsonDocument resumeToken, final MongoClient mongoClient, final String serverId) {
final String replicaSet = getReplicaSetName(mongoClient);
public JsonNode constructInitialDebeziumState(final BsonDocument resumeToken, final String serverId) {
LOGGER.info("Initial resume token '{}' constructed, corresponding to timestamp (seconds after epoch) {}",
ResumeTokens.getData(resumeToken).asString().getValue(), ResumeTokens.getTimestamp(resumeToken).getTime());
final JsonNode state = formatState(serverId, replicaSet, ((BsonString) ResumeTokens.getData(resumeToken)).getValue());
final JsonNode state = formatState(serverId, ((BsonString) ResumeTokens.getData(resumeToken)).getValue());
LOGGER.info("Initial Debezium state constructed: {}", state);
return state;
}
Expand All @@ -71,36 +70,22 @@ public JsonNode constructInitialDebeziumState(final BsonDocument resumeToken, fi
* Formats the Debezium initial state into a format suitable for storage in the offset data file.
*
* @param serverId The ID target MongoDB database.
* @param replicaSet The name of the target MongoDB replica set.
* @param resumeTokenData The MongoDB resume token that represents the offset state.
* @return The offset state as a {@link JsonNode}.
*/
public static JsonNode formatState(final String serverId, final String replicaSet, final String resumeTokenData) {
public static JsonNode formatState(final String serverId, final String resumeTokenData) {
final BsonTimestamp timestamp = ResumeTokens.getTimestamp(ResumeTokens.fromData(resumeTokenData));

final List<Object> key = generateOffsetKey(serverId, replicaSet);
final List<Object> key = generateOffsetKey(serverId);

final Map<String, Object> value = new LinkedHashMap<>();
value.put(MongoDbDebeziumConstants.OffsetState.VALUE_SECONDS, timestamp.getTime());
value.put(MongoDbDebeziumConstants.OffsetState.VALUE_INCREMENT, timestamp.getInc());
value.put(MongoDbDebeziumConstants.OffsetState.VALUE_TRANSACTION_ID, null);
value.put(MongoDbDebeziumConstants.OffsetState.VALUE_RESUME_TOKEN, resumeTokenData);

return Jsons.jsonNode(Map.of(Jsons.serialize(key), Jsons.serialize(value)));
}

/**
* Retrieves the replica set name for the current connection.
*
* @param mongoClient The {@link MongoClient} used to retrieve the replica set name.
* @return The replica set name.
* @throws IllegalStateException if unable to determine the replica set.
*/
public static String getReplicaSetName(final MongoClient mongoClient) {
final Optional<String> replicaSetName = MongoUtil.replicaSetName(mongoClient.getClusterDescription());
return replicaSetName.orElseThrow(() -> new IllegalStateException("Unable to determine replica set."));
}

/**
* Test whether the retrieved saved offset resume token value is valid. A valid resume token is one
* that can be used to resume a change event stream in MongoDB.
Expand Down Expand Up @@ -158,13 +143,13 @@ public boolean isValidResumeToken(final BsonDocument savedOffset,
public Optional<BsonDocument> savedOffset(final Properties baseProperties,
final ConfiguredAirbyteCatalog catalog,
final JsonNode cdcState,
final JsonNode config,
final MongoClient mongoClient) {
final JsonNode config) {
LOGGER.debug("Initializing file offset backing store with state '{}'...", cdcState);
final var offsetManager = AirbyteFileOffsetBackingStore.initializeState(cdcState, Optional.empty());
final DebeziumPropertiesManager debeziumPropertiesManager = new MongoDbDebeziumPropertiesManager(baseProperties, config, catalog);
final Properties debeziumProperties = debeziumPropertiesManager.getDebeziumProperties(offsetManager);
return parseSavedOffset(debeziumProperties, mongoClient);
LOGGER.info("properties: " + debeziumProperties);
return parseSavedOffset(debeziumProperties);
}

/**
Expand All @@ -175,7 +160,7 @@ public Optional<BsonDocument> savedOffset(final Properties baseProperties,
* state
* @return Returns the resume token that Airbyte has acknowledged in the source database server.
*/
private Optional<BsonDocument> parseSavedOffset(final Properties properties, final MongoClient mongoClient) {
private Optional<BsonDocument> parseSavedOffset(final Properties properties) {
FileOffsetBackingStore fileOffsetBackingStore = null;
OffsetStorageReaderImpl offsetStorageReader = null;

Expand All @@ -184,31 +169,33 @@ private Optional<BsonDocument> parseSavedOffset(final Properties properties, fin
offsetStorageReader = getOffsetStorageReader(fileOffsetBackingStore, properties);

final Configuration config = Configuration.from(properties);
final MongoDbTaskContext taskContext = new MongoDbTaskContext(config);
final MongoDbConnectorConfig mongoDbConnectorConfig = new MongoDbConnectorConfig(config);
final ReplicaSets replicaSets = new ReplicaSetDiscovery(taskContext).getReplicaSets(mongoClient);

LOGGER.debug("Parsing saved offset state for replica set '{}' and server ID '{}'...", replicaSets.all().get(0), properties.getProperty("name"));

final MongoDbOffsetContext.Loader loader = new MongoDbCustomLoader(mongoDbConnectorConfig, replicaSets);
final Collection<Map<String, String>> partitions = loader.getPartitions();
final Map<Map<String, String>, Map<String, Object>> offsets = offsetStorageReader.offsets(partitions);

if (offsets != null && offsets.values().stream().anyMatch(Objects::nonNull)) {
final MongoDbOffsetContext offsetContext = loader.loadOffsets(offsets);
final Map<String, ?> offset = offsetContext.getReplicaSetOffsetContext(replicaSets.all().get(0)).getOffset();
final Object resumeTokenData = offset.get(MongoDbDebeziumConstants.OffsetState.VALUE_RESUME_TOKEN);
if (resumeTokenData != null) {
final BsonDocument resumeToken = ResumeTokens.fromData(resumeTokenData.toString());
return Optional.of(resumeToken);
} else {
LOGGER.warn("Offset data does not contain a resume token: {}", offset);
return Optional.empty();
}

final MongoDbOffsetContext.Loader loader = new MongoDbOffsetContext.Loader(mongoDbConnectorConfig);

final Partition mongoDbPartition = new MongoDbPartition(properties.getProperty(CONNECTOR_NAME_PROPERTY));

final Set<Partition> partitions =
Collections.singleton(mongoDbPartition);
final OffsetReader<Partition, MongoDbOffsetContext, MongoDbOffsetContext.Loader> offsetReader = new OffsetReader<>(offsetStorageReader, loader);
final Map<Partition, MongoDbOffsetContext> offsets = offsetReader.offsets(partitions);

if (offsets == null || offsets.values().stream().noneMatch(Objects::nonNull)) {
return Optional.empty();
}

final MongoDbOffsetContext context = offsets.get(mongoDbPartition);
final var offset = context.getOffset();

final Object resumeTokenData = offset.get(MongoDbDebeziumConstants.OffsetState.VALUE_RESUME_TOKEN);

if (resumeTokenData != null) {
final BsonDocument resumeToken = ResumeTokens.fromData(resumeTokenData.toString());
return Optional.of(resumeToken);
} else {
LOGGER.warn("Loaded offset data is null or empty: {}", offsets);
return Optional.empty();
}

} finally {
LOGGER.info("Closing offsetStorageReader and fileOffsetBackingStore");
if (offsetStorageReader != null) {
Expand All @@ -221,7 +208,7 @@ private Optional<BsonDocument> parseSavedOffset(final Properties properties, fin
}
}

private static List<Object> generateOffsetKey(final String serverId, final String replicaSet) {
private static List<Object> generateOffsetKey(final String serverId) {
/*
* N.B. The order of the keys in the sourceInfoMap and key list matters! DO NOT CHANGE the order
* unless you have verified that Debezium has changed its order of the key it builds when retrieving
Expand All @@ -230,8 +217,7 @@ private static List<Object> generateOffsetKey(final String serverId, final Strin
*/
final Map<String, String> sourceInfoMap = new LinkedHashMap<>();
final String normalizedServerId = MongoDbDebeziumPropertiesManager.normalizeName(serverId);
sourceInfoMap.put(MongoDbDebeziumConstants.OffsetState.KEY_REPLICA_SET, replicaSet);
sourceInfoMap.put(MongoDbDebeziumConstants.OffsetState.KEY_SERVER_ID, normalizedServerId);
sourceInfoMap.put(KEY_SERVER_ID, normalizedServerId);

final List<Object> key = new LinkedList<>();
key.add(normalizedServerId);
Expand Down
Loading

0 comments on commit 3c9a612

Please sign in to comment.