From 94234e57c62a86ffec0d696c58cc090f3c4fb0bb Mon Sep 17 00:00:00 2001 From: Akash Kulkarni <113392464+akashkulk@users.noreply.github.com> Date: Mon, 17 Jun 2024 13:57:57 -0700 Subject: [PATCH] [Source-mongodb-v2] : Adopt new CDK (#39530) --- .../connectors/source-mongodb-v2/build.gradle | 2 +- .../source-mongodb-v2/metadata.yaml | 2 +- .../mongodb/cdc/MongoDbCdcInitializer.java | 6 ++- .../cdc/MongoDbDebeziumPropertiesManager.java | 12 ++--- .../mongodb/cdc/MongoDbDebeziumStateUtil.java | 3 +- .../MongoDbDebeziumPropertiesManagerTest.java | 45 +++++++++++++------ docs/integrations/sources/mongodb-v2.md | 1 + 7 files changed, 49 insertions(+), 22 deletions(-) diff --git a/airbyte-integrations/connectors/source-mongodb-v2/build.gradle b/airbyte-integrations/connectors/source-mongodb-v2/build.gradle index 9daad40b20d133..53b41209594aa1 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/build.gradle +++ b/airbyte-integrations/connectors/source-mongodb-v2/build.gradle @@ -3,7 +3,7 @@ plugins { } airbyteJavaConnector { - cdkVersionRequired = '0.37.2' + cdkVersionRequired = '0.38.1' features = ['db-sources', 'datastore-mongo'] useLocalCdk = false } diff --git a/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml b/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml index c737206f9f9ed7..412e0255bc63d3 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml +++ b/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml @@ -8,7 +8,7 @@ data: connectorSubtype: database connectorType: source definitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e - dockerImageTag: 1.4.0 + dockerImageTag: 1.4.1 dockerRepository: airbyte/source-mongodb-v2 documentationUrl: https://docs.airbyte.com/integrations/sources/mongodb-v2 githubIssueLabel: source-mongodb-v2 diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java index 1032ab73355030..187036395aa044 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java @@ -153,8 +153,12 @@ public List> createCdcIterators( new MongoDbCdcTargetPosition(initialResumeToken), false, firstRecordWaitTime, queueSize, false); final MongoDbCdcStateHandler mongoDbCdcStateHandler = new MongoDbCdcStateHandler(stateManager); final MongoDbCdcSavedInfoFetcher cdcSavedInfoFetcher = new MongoDbCdcSavedInfoFetcher(stateToBeUsed); + final var cdcStreamList = incrementalOnlyStreamsCatalog.getStreams().stream() + .filter(stream -> stream.getSyncMode() == SyncMode.INCREMENTAL) + .map(s -> s.getStream().getNamespace() + "\\." + s.getStream().getName()) + .toList(); final var propertiesManager = - new MongoDbDebeziumPropertiesManager(defaultDebeziumProperties, config.getDatabaseConfig(), incrementalOnlyStreamsCatalog); + new MongoDbDebeziumPropertiesManager(defaultDebeziumProperties, config.getDatabaseConfig(), incrementalOnlyStreamsCatalog, cdcStreamList); final var eventConverter = new MongoDbDebeziumEventConverter(cdcMetadataInjector, incrementalOnlyStreamsCatalog, emittedAt, config.getDatabaseConfig()); diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbDebeziumPropertiesManager.java b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbDebeziumPropertiesManager.java index 5d07a928646b0e..f789717e347007 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbDebeziumPropertiesManager.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbDebeziumPropertiesManager.java @@ -49,8 +49,9 @@ public class MongoDbDebeziumPropertiesManager extends DebeziumPropertiesManager public MongoDbDebeziumPropertiesManager(final Properties properties, final JsonNode config, - final ConfiguredAirbyteCatalog catalog) { - super(properties, config, catalog); + final ConfiguredAirbyteCatalog catalog, + final List streamNames) { + super(properties, config, catalog, streamNames); } @Override @@ -82,20 +83,21 @@ protected String getName(final JsonNode config) { } @Override - protected Properties getIncludeConfiguration(final ConfiguredAirbyteCatalog catalog, final JsonNode config) { + protected Properties getIncludeConfiguration(final ConfiguredAirbyteCatalog catalog, final JsonNode config, final List cdcStreamNames) { final Properties properties = new Properties(); // Database/collection selection - properties.setProperty(COLLECTION_INCLUDE_LIST_KEY, createCollectionIncludeString(catalog.getStreams())); + properties.setProperty(COLLECTION_INCLUDE_LIST_KEY, createCollectionIncludeString(catalog.getStreams(), cdcStreamNames)); properties.setProperty(DATABASE_INCLUDE_LIST_KEY, config.get(DATABASE_CONFIGURATION_KEY).asText()); properties.setProperty(CAPTURE_TARGET_KEY, config.get(DATABASE_CONFIGURATION_KEY).asText()); return properties; } - protected String createCollectionIncludeString(final List streams) { + protected String createCollectionIncludeString(final List streams, final List cdcStreamNames) { return streams.stream() .map(s -> s.getStream().getNamespace() + "\\." + s.getStream().getName()) + .filter(s -> cdcStreamNames.contains(s)) .collect(Collectors.joining(",")); } diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbDebeziumStateUtil.java b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbDebeziumStateUtil.java index adb895bda02e94..d6405476e2b87c 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbDebeziumStateUtil.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbDebeziumStateUtil.java @@ -146,7 +146,8 @@ public Optional savedOffset(final Properties baseProperties, final JsonNode config) { LOGGER.debug("Initializing file offset backing store with state '{}'...", cdcState); final var offsetManager = AirbyteFileOffsetBackingStore.initializeState(cdcState, Optional.empty()); - final DebeziumPropertiesManager debeziumPropertiesManager = new MongoDbDebeziumPropertiesManager(baseProperties, config, catalog); + final DebeziumPropertiesManager debeziumPropertiesManager = + new MongoDbDebeziumPropertiesManager(baseProperties, config, catalog, Collections.emptyList()); final Properties debeziumProperties = debeziumPropertiesManager.getDebeziumProperties(offsetManager); LOGGER.info("properties: " + debeziumProperties); return parseSavedOffset(debeziumProperties); diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbDebeziumPropertiesManagerTest.java b/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbDebeziumPropertiesManagerTest.java index 21b6908f796303..beb2fcd856c055 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbDebeziumPropertiesManagerTest.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbDebeziumPropertiesManagerTest.java @@ -40,6 +40,7 @@ import io.airbyte.protocol.models.v0.AirbyteStream; import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; +import io.airbyte.protocol.models.v0.SyncMode; import java.nio.file.Path; import java.util.ArrayList; import java.util.HashMap; @@ -66,8 +67,8 @@ void testDebeziumProperties() { final Properties cdcProperties = new Properties(); cdcProperties.put("test", "value"); - - final var debeziumPropertiesManager = new MongoDbDebeziumPropertiesManager(cdcProperties, config, catalog); + final var cdcStreamList = createCdcStreamList(catalog); + final var debeziumPropertiesManager = new MongoDbDebeziumPropertiesManager(cdcProperties, config, catalog, cdcStreamList); final Properties debeziumProperties = debeziumPropertiesManager.getDebeziumProperties(offsetManager); assertEquals(21 + cdcProperties.size(), debeziumProperties.size()); @@ -79,7 +80,8 @@ void testDebeziumProperties() { assertEquals(config.get(PASSWORD_CONFIGURATION_KEY).asText(), debeziumProperties.get(MONGODB_PASSWORD_KEY)); assertEquals(config.get(AUTH_SOURCE_CONFIGURATION_KEY).asText(), debeziumProperties.get(MONGODB_AUTHSOURCE_KEY)); assertEquals(MONGODB_SSL_ENABLED_VALUE, debeziumProperties.get(MONGODB_SSL_ENABLED_KEY)); - assertEquals(debeziumPropertiesManager.createCollectionIncludeString(streams), debeziumProperties.get(COLLECTION_INCLUDE_LIST_KEY)); + assertEquals(debeziumPropertiesManager.createCollectionIncludeString(streams, cdcStreamList), + debeziumProperties.get(COLLECTION_INCLUDE_LIST_KEY)); assertEquals(DATABASE_NAME, debeziumProperties.get(DATABASE_INCLUDE_LIST_KEY)); } @@ -96,7 +98,8 @@ void testDebeziumProperties_captureMode_lookup() { final Properties cdcProperties = new Properties(); cdcProperties.put("test", "value"); - final var debeziumPropertiesManager = new MongoDbDebeziumPropertiesManager(cdcProperties, config, catalog); + final var cdcStreamList = createCdcStreamList(catalog); + final var debeziumPropertiesManager = new MongoDbDebeziumPropertiesManager(cdcProperties, config, catalog, cdcStreamList); final Properties debeziumProperties = debeziumPropertiesManager.getDebeziumProperties(offsetManager); assertEquals(21 + cdcProperties.size(), debeziumProperties.size()); @@ -108,7 +111,8 @@ void testDebeziumProperties_captureMode_lookup() { assertEquals(config.get(PASSWORD_CONFIGURATION_KEY).asText(), debeziumProperties.get(MONGODB_PASSWORD_KEY)); assertEquals(config.get(AUTH_SOURCE_CONFIGURATION_KEY).asText(), debeziumProperties.get(MONGODB_AUTHSOURCE_KEY)); assertEquals(MONGODB_SSL_ENABLED_VALUE, debeziumProperties.get(MONGODB_SSL_ENABLED_KEY)); - assertEquals(debeziumPropertiesManager.createCollectionIncludeString(streams), debeziumProperties.get(COLLECTION_INCLUDE_LIST_KEY)); + assertEquals(debeziumPropertiesManager.createCollectionIncludeString(streams, cdcStreamList), + debeziumProperties.get(COLLECTION_INCLUDE_LIST_KEY)); assertEquals(DATABASE_NAME, debeziumProperties.get(DATABASE_INCLUDE_LIST_KEY)); } @@ -125,7 +129,8 @@ void testDebeziumProperties_captureMode_postImage() { final Properties cdcProperties = new Properties(); cdcProperties.put("test", "value"); - final var debeziumPropertiesManager = new MongoDbDebeziumPropertiesManager(cdcProperties, config, catalog); + final var cdcStreamList = createCdcStreamList(catalog); + final var debeziumPropertiesManager = new MongoDbDebeziumPropertiesManager(cdcProperties, config, catalog, cdcStreamList); final Properties debeziumProperties = debeziumPropertiesManager.getDebeziumProperties(offsetManager); assertEquals(22 + cdcProperties.size(), debeziumProperties.size()); @@ -137,7 +142,8 @@ void testDebeziumProperties_captureMode_postImage() { assertEquals(config.get(PASSWORD_CONFIGURATION_KEY).asText(), debeziumProperties.get(MONGODB_PASSWORD_KEY)); assertEquals(config.get(AUTH_SOURCE_CONFIGURATION_KEY).asText(), debeziumProperties.get(MONGODB_AUTHSOURCE_KEY)); assertEquals(MONGODB_SSL_ENABLED_VALUE, debeziumProperties.get(MONGODB_SSL_ENABLED_KEY)); - assertEquals(debeziumPropertiesManager.createCollectionIncludeString(streams), debeziumProperties.get(COLLECTION_INCLUDE_LIST_KEY)); + assertEquals(debeziumPropertiesManager.createCollectionIncludeString(streams, cdcStreamList), + debeziumProperties.get(COLLECTION_INCLUDE_LIST_KEY)); assertEquals(DATABASE_NAME, debeziumProperties.get(DATABASE_INCLUDE_LIST_KEY)); assertEquals(MONGODB_POST_IMAGE_VALUE, debeziumProperties.get(MONGODB_POST_IMAGE_KEY)); } @@ -156,7 +162,8 @@ void testDebeziumPropertiesConnectionStringCredentialsPlaceholder() { final Properties cdcProperties = new Properties(); cdcProperties.put("test", "value"); - final var debeziumPropertiesManager = new MongoDbDebeziumPropertiesManager(cdcProperties, config, catalog); + final var cdcStreamList = createCdcStreamList(catalog); + final var debeziumPropertiesManager = new MongoDbDebeziumPropertiesManager(cdcProperties, config, catalog, cdcStreamList); final Properties debeziumProperties = debeziumPropertiesManager.getDebeziumProperties(offsetManager); assertEquals(21 + cdcProperties.size(), debeziumProperties.size()); @@ -168,7 +175,8 @@ void testDebeziumPropertiesConnectionStringCredentialsPlaceholder() { assertEquals(config.get(PASSWORD_CONFIGURATION_KEY).asText(), debeziumProperties.get(MONGODB_PASSWORD_KEY)); assertEquals(config.get(AUTH_SOURCE_CONFIGURATION_KEY).asText(), debeziumProperties.get(MONGODB_AUTHSOURCE_KEY)); assertEquals(MONGODB_SSL_ENABLED_VALUE, debeziumProperties.get(MONGODB_SSL_ENABLED_KEY)); - assertEquals(debeziumPropertiesManager.createCollectionIncludeString(streams), debeziumProperties.get(COLLECTION_INCLUDE_LIST_KEY)); + assertEquals(debeziumPropertiesManager.createCollectionIncludeString(streams, cdcStreamList), + debeziumProperties.get(COLLECTION_INCLUDE_LIST_KEY)); assertEquals(DATABASE_NAME, debeziumProperties.get(DATABASE_INCLUDE_LIST_KEY)); } @@ -185,7 +193,8 @@ void testDebeziumPropertiesQuotedConnectionString() { final Properties cdcProperties = new Properties(); cdcProperties.put("test", "value"); - final var debeziumPropertiesManager = new MongoDbDebeziumPropertiesManager(cdcProperties, config, catalog); + final var cdcStreamList = createCdcStreamList(catalog); + final var debeziumPropertiesManager = new MongoDbDebeziumPropertiesManager(cdcProperties, config, catalog, cdcStreamList); final Properties debeziumProperties = debeziumPropertiesManager.getDebeziumProperties(offsetManager); assertEquals(21 + cdcProperties.size(), debeziumProperties.size()); @@ -197,7 +206,8 @@ void testDebeziumPropertiesQuotedConnectionString() { assertEquals(config.get(PASSWORD_CONFIGURATION_KEY).asText(), debeziumProperties.get(MONGODB_PASSWORD_KEY)); assertEquals(config.get(AUTH_SOURCE_CONFIGURATION_KEY).asText(), debeziumProperties.get(MONGODB_AUTHSOURCE_KEY)); assertEquals(MONGODB_SSL_ENABLED_VALUE, debeziumProperties.get(MONGODB_SSL_ENABLED_KEY)); - assertEquals(debeziumPropertiesManager.createCollectionIncludeString(streams), debeziumProperties.get(COLLECTION_INCLUDE_LIST_KEY)); + assertEquals(debeziumPropertiesManager.createCollectionIncludeString(streams, cdcStreamList), + debeziumProperties.get(COLLECTION_INCLUDE_LIST_KEY)); assertEquals(DATABASE_NAME, debeziumProperties.get(DATABASE_INCLUDE_LIST_KEY)); } @@ -214,7 +224,8 @@ void testDebeziumPropertiesNoCredentials() { final Properties cdcProperties = new Properties(); cdcProperties.put("test", "value"); - final var debeziumPropertiesManager = new MongoDbDebeziumPropertiesManager(cdcProperties, config, catalog); + final var cdcStreamList = createCdcStreamList(catalog); + final var debeziumPropertiesManager = new MongoDbDebeziumPropertiesManager(cdcProperties, config, catalog, cdcStreamList); final Properties debeziumProperties = debeziumPropertiesManager.getDebeziumProperties(offsetManager); assertEquals(18 + cdcProperties.size(), debeziumProperties.size()); @@ -226,7 +237,8 @@ void testDebeziumPropertiesNoCredentials() { assertFalse(debeziumProperties.containsKey(MONGODB_PASSWORD_KEY)); assertFalse(debeziumProperties.containsKey(MONGODB_AUTHSOURCE_KEY)); assertEquals(MONGODB_SSL_ENABLED_VALUE, debeziumProperties.get(MONGODB_SSL_ENABLED_KEY)); - assertEquals(debeziumPropertiesManager.createCollectionIncludeString(streams), debeziumProperties.get(COLLECTION_INCLUDE_LIST_KEY)); + assertEquals(debeziumPropertiesManager.createCollectionIncludeString(streams, cdcStreamList), + debeziumProperties.get(COLLECTION_INCLUDE_LIST_KEY)); assertEquals(DATABASE_NAME, debeziumProperties.get(DATABASE_INCLUDE_LIST_KEY)); } @@ -292,4 +304,11 @@ private List createStreams(final int numberOfStreams) { return streams; } + private List createCdcStreamList(final ConfiguredAirbyteCatalog catalog) { + return catalog.getStreams().stream() + .filter(stream -> stream.getSyncMode() == SyncMode.INCREMENTAL) + .map(s -> s.getStream().getNamespace() + "\\." + s.getStream().getName()) + .toList(); + } + } diff --git a/docs/integrations/sources/mongodb-v2.md b/docs/integrations/sources/mongodb-v2.md index abc8fb5b9b0d82..c365ca32b1b35f 100644 --- a/docs/integrations/sources/mongodb-v2.md +++ b/docs/integrations/sources/mongodb-v2.md @@ -199,6 +199,7 @@ For more information regarding configuration parameters, please see [MongoDb Doc | Version | Date | Pull Request | Subject | |:--------|:-----------| :------------------------------------------------------- |:----------------------------------------------------------------------------------------------------------| +| 1.4.1 | 2024-06-11 | [39530](https://github.com/airbytehq/airbyte/pull/39530) | Adopt new CDK. | | 1.4.0 | 2024-06-11 | [38238](https://github.com/airbytehq/airbyte/pull/38238) | Update mongodbv2 to use dbz 2.6.2 | | 1.3.15 | 2024-05-30 | [38781](https://github.com/airbytehq/airbyte/pull/38781) | Sync sending trace status messages indicating progress. | | 1.3.14 | 2024-05-29 | [38584](https://github.com/airbytehq/airbyte/pull/38584) | Set is_resumable flag in discover. |