Skip to content

Commit

Permalink
[Source-mongodb-v2] : Adopt new CDK (#39530)
Browse files Browse the repository at this point in the history
  • Loading branch information
akashkulk committed Jun 17, 2024
1 parent 3dfeb31 commit 94234e5
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.37.2'
cdkVersionRequired = '0.38.1'
features = ['db-sources', 'datastore-mongo']
useLocalCdk = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e
dockerImageTag: 1.4.0
dockerImageTag: 1.4.1
dockerRepository: airbyte/source-mongodb-v2
documentationUrl: https://docs.airbyte.com/integrations/sources/mongodb-v2
githubIssueLabel: source-mongodb-v2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,12 @@ public List<AutoCloseableIterator<AirbyteMessage>> createCdcIterators(
new MongoDbCdcTargetPosition(initialResumeToken), false, firstRecordWaitTime, queueSize, false);
final MongoDbCdcStateHandler mongoDbCdcStateHandler = new MongoDbCdcStateHandler(stateManager);
final MongoDbCdcSavedInfoFetcher cdcSavedInfoFetcher = new MongoDbCdcSavedInfoFetcher(stateToBeUsed);
final var cdcStreamList = incrementalOnlyStreamsCatalog.getStreams().stream()
.filter(stream -> stream.getSyncMode() == SyncMode.INCREMENTAL)
.map(s -> s.getStream().getNamespace() + "\\." + s.getStream().getName())
.toList();
final var propertiesManager =
new MongoDbDebeziumPropertiesManager(defaultDebeziumProperties, config.getDatabaseConfig(), incrementalOnlyStreamsCatalog);
new MongoDbDebeziumPropertiesManager(defaultDebeziumProperties, config.getDatabaseConfig(), incrementalOnlyStreamsCatalog, cdcStreamList);
final var eventConverter =
new MongoDbDebeziumEventConverter(cdcMetadataInjector, incrementalOnlyStreamsCatalog, emittedAt, config.getDatabaseConfig());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ public class MongoDbDebeziumPropertiesManager extends DebeziumPropertiesManager

public MongoDbDebeziumPropertiesManager(final Properties properties,
final JsonNode config,
final ConfiguredAirbyteCatalog catalog) {
super(properties, config, catalog);
final ConfiguredAirbyteCatalog catalog,
final List<String> streamNames) {
super(properties, config, catalog, streamNames);
}

@Override
Expand Down Expand Up @@ -82,20 +83,21 @@ protected String getName(final JsonNode config) {
}

@Override
protected Properties getIncludeConfiguration(final ConfiguredAirbyteCatalog catalog, final JsonNode config) {
protected Properties getIncludeConfiguration(final ConfiguredAirbyteCatalog catalog, final JsonNode config, final List<String> cdcStreamNames) {
final Properties properties = new Properties();

// Database/collection selection
properties.setProperty(COLLECTION_INCLUDE_LIST_KEY, createCollectionIncludeString(catalog.getStreams()));
properties.setProperty(COLLECTION_INCLUDE_LIST_KEY, createCollectionIncludeString(catalog.getStreams(), cdcStreamNames));
properties.setProperty(DATABASE_INCLUDE_LIST_KEY, config.get(DATABASE_CONFIGURATION_KEY).asText());
properties.setProperty(CAPTURE_TARGET_KEY, config.get(DATABASE_CONFIGURATION_KEY).asText());

return properties;
}

protected String createCollectionIncludeString(final List<ConfiguredAirbyteStream> streams) {
protected String createCollectionIncludeString(final List<ConfiguredAirbyteStream> streams, final List<String> cdcStreamNames) {
return streams.stream()
.map(s -> s.getStream().getNamespace() + "\\." + s.getStream().getName())
.filter(s -> cdcStreamNames.contains(s))
.collect(Collectors.joining(","));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ public Optional<BsonDocument> savedOffset(final Properties baseProperties,
final JsonNode config) {
LOGGER.debug("Initializing file offset backing store with state '{}'...", cdcState);
final var offsetManager = AirbyteFileOffsetBackingStore.initializeState(cdcState, Optional.empty());
final DebeziumPropertiesManager debeziumPropertiesManager = new MongoDbDebeziumPropertiesManager(baseProperties, config, catalog);
final DebeziumPropertiesManager debeziumPropertiesManager =
new MongoDbDebeziumPropertiesManager(baseProperties, config, catalog, Collections.emptyList());
final Properties debeziumProperties = debeziumPropertiesManager.getDebeziumProperties(offsetManager);
LOGGER.info("properties: " + debeziumProperties);
return parseSavedOffset(debeziumProperties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import io.airbyte.protocol.models.v0.AirbyteStream;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.v0.SyncMode;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -66,8 +67,8 @@ void testDebeziumProperties() {

final Properties cdcProperties = new Properties();
cdcProperties.put("test", "value");

final var debeziumPropertiesManager = new MongoDbDebeziumPropertiesManager(cdcProperties, config, catalog);
final var cdcStreamList = createCdcStreamList(catalog);
final var debeziumPropertiesManager = new MongoDbDebeziumPropertiesManager(cdcProperties, config, catalog, cdcStreamList);

final Properties debeziumProperties = debeziumPropertiesManager.getDebeziumProperties(offsetManager);
assertEquals(21 + cdcProperties.size(), debeziumProperties.size());
Expand All @@ -79,7 +80,8 @@ void testDebeziumProperties() {
assertEquals(config.get(PASSWORD_CONFIGURATION_KEY).asText(), debeziumProperties.get(MONGODB_PASSWORD_KEY));
assertEquals(config.get(AUTH_SOURCE_CONFIGURATION_KEY).asText(), debeziumProperties.get(MONGODB_AUTHSOURCE_KEY));
assertEquals(MONGODB_SSL_ENABLED_VALUE, debeziumProperties.get(MONGODB_SSL_ENABLED_KEY));
assertEquals(debeziumPropertiesManager.createCollectionIncludeString(streams), debeziumProperties.get(COLLECTION_INCLUDE_LIST_KEY));
assertEquals(debeziumPropertiesManager.createCollectionIncludeString(streams, cdcStreamList),
debeziumProperties.get(COLLECTION_INCLUDE_LIST_KEY));
assertEquals(DATABASE_NAME, debeziumProperties.get(DATABASE_INCLUDE_LIST_KEY));
}

Expand All @@ -96,7 +98,8 @@ void testDebeziumProperties_captureMode_lookup() {
final Properties cdcProperties = new Properties();
cdcProperties.put("test", "value");

final var debeziumPropertiesManager = new MongoDbDebeziumPropertiesManager(cdcProperties, config, catalog);
final var cdcStreamList = createCdcStreamList(catalog);
final var debeziumPropertiesManager = new MongoDbDebeziumPropertiesManager(cdcProperties, config, catalog, cdcStreamList);

final Properties debeziumProperties = debeziumPropertiesManager.getDebeziumProperties(offsetManager);
assertEquals(21 + cdcProperties.size(), debeziumProperties.size());
Expand All @@ -108,7 +111,8 @@ void testDebeziumProperties_captureMode_lookup() {
assertEquals(config.get(PASSWORD_CONFIGURATION_KEY).asText(), debeziumProperties.get(MONGODB_PASSWORD_KEY));
assertEquals(config.get(AUTH_SOURCE_CONFIGURATION_KEY).asText(), debeziumProperties.get(MONGODB_AUTHSOURCE_KEY));
assertEquals(MONGODB_SSL_ENABLED_VALUE, debeziumProperties.get(MONGODB_SSL_ENABLED_KEY));
assertEquals(debeziumPropertiesManager.createCollectionIncludeString(streams), debeziumProperties.get(COLLECTION_INCLUDE_LIST_KEY));
assertEquals(debeziumPropertiesManager.createCollectionIncludeString(streams, cdcStreamList),
debeziumProperties.get(COLLECTION_INCLUDE_LIST_KEY));
assertEquals(DATABASE_NAME, debeziumProperties.get(DATABASE_INCLUDE_LIST_KEY));
}

Expand All @@ -125,7 +129,8 @@ void testDebeziumProperties_captureMode_postImage() {
final Properties cdcProperties = new Properties();
cdcProperties.put("test", "value");

final var debeziumPropertiesManager = new MongoDbDebeziumPropertiesManager(cdcProperties, config, catalog);
final var cdcStreamList = createCdcStreamList(catalog);
final var debeziumPropertiesManager = new MongoDbDebeziumPropertiesManager(cdcProperties, config, catalog, cdcStreamList);

final Properties debeziumProperties = debeziumPropertiesManager.getDebeziumProperties(offsetManager);
assertEquals(22 + cdcProperties.size(), debeziumProperties.size());
Expand All @@ -137,7 +142,8 @@ void testDebeziumProperties_captureMode_postImage() {
assertEquals(config.get(PASSWORD_CONFIGURATION_KEY).asText(), debeziumProperties.get(MONGODB_PASSWORD_KEY));
assertEquals(config.get(AUTH_SOURCE_CONFIGURATION_KEY).asText(), debeziumProperties.get(MONGODB_AUTHSOURCE_KEY));
assertEquals(MONGODB_SSL_ENABLED_VALUE, debeziumProperties.get(MONGODB_SSL_ENABLED_KEY));
assertEquals(debeziumPropertiesManager.createCollectionIncludeString(streams), debeziumProperties.get(COLLECTION_INCLUDE_LIST_KEY));
assertEquals(debeziumPropertiesManager.createCollectionIncludeString(streams, cdcStreamList),
debeziumProperties.get(COLLECTION_INCLUDE_LIST_KEY));
assertEquals(DATABASE_NAME, debeziumProperties.get(DATABASE_INCLUDE_LIST_KEY));
assertEquals(MONGODB_POST_IMAGE_VALUE, debeziumProperties.get(MONGODB_POST_IMAGE_KEY));
}
Expand All @@ -156,7 +162,8 @@ void testDebeziumPropertiesConnectionStringCredentialsPlaceholder() {
final Properties cdcProperties = new Properties();
cdcProperties.put("test", "value");

final var debeziumPropertiesManager = new MongoDbDebeziumPropertiesManager(cdcProperties, config, catalog);
final var cdcStreamList = createCdcStreamList(catalog);
final var debeziumPropertiesManager = new MongoDbDebeziumPropertiesManager(cdcProperties, config, catalog, cdcStreamList);

final Properties debeziumProperties = debeziumPropertiesManager.getDebeziumProperties(offsetManager);
assertEquals(21 + cdcProperties.size(), debeziumProperties.size());
Expand All @@ -168,7 +175,8 @@ void testDebeziumPropertiesConnectionStringCredentialsPlaceholder() {
assertEquals(config.get(PASSWORD_CONFIGURATION_KEY).asText(), debeziumProperties.get(MONGODB_PASSWORD_KEY));
assertEquals(config.get(AUTH_SOURCE_CONFIGURATION_KEY).asText(), debeziumProperties.get(MONGODB_AUTHSOURCE_KEY));
assertEquals(MONGODB_SSL_ENABLED_VALUE, debeziumProperties.get(MONGODB_SSL_ENABLED_KEY));
assertEquals(debeziumPropertiesManager.createCollectionIncludeString(streams), debeziumProperties.get(COLLECTION_INCLUDE_LIST_KEY));
assertEquals(debeziumPropertiesManager.createCollectionIncludeString(streams, cdcStreamList),
debeziumProperties.get(COLLECTION_INCLUDE_LIST_KEY));
assertEquals(DATABASE_NAME, debeziumProperties.get(DATABASE_INCLUDE_LIST_KEY));
}

Expand All @@ -185,7 +193,8 @@ void testDebeziumPropertiesQuotedConnectionString() {
final Properties cdcProperties = new Properties();
cdcProperties.put("test", "value");

final var debeziumPropertiesManager = new MongoDbDebeziumPropertiesManager(cdcProperties, config, catalog);
final var cdcStreamList = createCdcStreamList(catalog);
final var debeziumPropertiesManager = new MongoDbDebeziumPropertiesManager(cdcProperties, config, catalog, cdcStreamList);

final Properties debeziumProperties = debeziumPropertiesManager.getDebeziumProperties(offsetManager);
assertEquals(21 + cdcProperties.size(), debeziumProperties.size());
Expand All @@ -197,7 +206,8 @@ void testDebeziumPropertiesQuotedConnectionString() {
assertEquals(config.get(PASSWORD_CONFIGURATION_KEY).asText(), debeziumProperties.get(MONGODB_PASSWORD_KEY));
assertEquals(config.get(AUTH_SOURCE_CONFIGURATION_KEY).asText(), debeziumProperties.get(MONGODB_AUTHSOURCE_KEY));
assertEquals(MONGODB_SSL_ENABLED_VALUE, debeziumProperties.get(MONGODB_SSL_ENABLED_KEY));
assertEquals(debeziumPropertiesManager.createCollectionIncludeString(streams), debeziumProperties.get(COLLECTION_INCLUDE_LIST_KEY));
assertEquals(debeziumPropertiesManager.createCollectionIncludeString(streams, cdcStreamList),
debeziumProperties.get(COLLECTION_INCLUDE_LIST_KEY));
assertEquals(DATABASE_NAME, debeziumProperties.get(DATABASE_INCLUDE_LIST_KEY));
}

Expand All @@ -214,7 +224,8 @@ void testDebeziumPropertiesNoCredentials() {
final Properties cdcProperties = new Properties();
cdcProperties.put("test", "value");

final var debeziumPropertiesManager = new MongoDbDebeziumPropertiesManager(cdcProperties, config, catalog);
final var cdcStreamList = createCdcStreamList(catalog);
final var debeziumPropertiesManager = new MongoDbDebeziumPropertiesManager(cdcProperties, config, catalog, cdcStreamList);

final Properties debeziumProperties = debeziumPropertiesManager.getDebeziumProperties(offsetManager);
assertEquals(18 + cdcProperties.size(), debeziumProperties.size());
Expand All @@ -226,7 +237,8 @@ void testDebeziumPropertiesNoCredentials() {
assertFalse(debeziumProperties.containsKey(MONGODB_PASSWORD_KEY));
assertFalse(debeziumProperties.containsKey(MONGODB_AUTHSOURCE_KEY));
assertEquals(MONGODB_SSL_ENABLED_VALUE, debeziumProperties.get(MONGODB_SSL_ENABLED_KEY));
assertEquals(debeziumPropertiesManager.createCollectionIncludeString(streams), debeziumProperties.get(COLLECTION_INCLUDE_LIST_KEY));
assertEquals(debeziumPropertiesManager.createCollectionIncludeString(streams, cdcStreamList),
debeziumProperties.get(COLLECTION_INCLUDE_LIST_KEY));
assertEquals(DATABASE_NAME, debeziumProperties.get(DATABASE_INCLUDE_LIST_KEY));
}

Expand Down Expand Up @@ -292,4 +304,11 @@ private List<ConfiguredAirbyteStream> createStreams(final int numberOfStreams) {
return streams;
}

private List<String> createCdcStreamList(final ConfiguredAirbyteCatalog catalog) {
return catalog.getStreams().stream()
.filter(stream -> stream.getSyncMode() == SyncMode.INCREMENTAL)
.map(s -> s.getStream().getNamespace() + "\\." + s.getStream().getName())
.toList();
}

}
1 change: 1 addition & 0 deletions docs/integrations/sources/mongodb-v2.md
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ For more information regarding configuration parameters, please see [MongoDb Doc

| Version | Date | Pull Request | Subject |
|:--------|:-----------| :------------------------------------------------------- |:----------------------------------------------------------------------------------------------------------|
| 1.4.1 | 2024-06-11 | [39530](https://github.com/airbytehq/airbyte/pull/39530) | Adopt new CDK. |
| 1.4.0 | 2024-06-11 | [38238](https://github.com/airbytehq/airbyte/pull/38238) | Update mongodbv2 to use dbz 2.6.2 |
| 1.3.15 | 2024-05-30 | [38781](https://github.com/airbytehq/airbyte/pull/38781) | Sync sending trace status messages indicating progress. |
| 1.3.14 | 2024-05-29 | [38584](https://github.com/airbytehq/airbyte/pull/38584) | Set is_resumable flag in discover. |
Expand Down

0 comments on commit 94234e5

Please sign in to comment.