diff --git a/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml b/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml index 4f73156e25222..7937002b71778 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml +++ b/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml @@ -8,7 +8,7 @@ data: connectorSubtype: database connectorType: source definitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e - dockerImageTag: 1.3.7 + dockerImageTag: 1.3.8 dockerRepository: airbyte/source-mongodb-v2 documentationUrl: https://docs.airbyte.com/integrations/sources/mongodb-v2 githubIssueLabel: source-mongodb-v2 diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/InitialSnapshotHandler.java b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/InitialSnapshotHandler.java index a0f924cc843ca..edab8408daf88 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/InitialSnapshotHandler.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/InitialSnapshotHandler.java @@ -31,6 +31,7 @@ public class InitialSnapshotHandler { private static final Logger LOGGER = LoggerFactory.getLogger(InitialSnapshotHandler.class); + private static final int DEFAULT_CHUNK_SIZE = 1_000_000; /** * For each given stream configured as incremental sync it will output an iterator that will @@ -67,7 +68,7 @@ public List> getIterators( final Optional existingState = stateManager.getStreamState(airbyteStream.getStream().getName(), airbyteStream.getStream().getNamespace()); - final var recordIterator = new MongoDbInitialLoadRecordIterator(collection, fields, existingState, isEnforceSchema); + final var recordIterator = new MongoDbInitialLoadRecordIterator(collection, fields, existingState, isEnforceSchema, DEFAULT_CHUNK_SIZE); final var stateIterator = new SourceStateIterator<>(recordIterator, airbyteStream, stateManager, new StateEmitFrequency(checkpointInterval, MongoConstants.CHECKPOINT_DURATION)); diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoDbInitialLoadRecordIterator.java b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoDbInitialLoadRecordIterator.java index 298e0d1d28baf..254fa38148eab 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoDbInitialLoadRecordIterator.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoDbInitialLoadRecordIterator.java @@ -39,6 +39,8 @@ public class MongoDbInitialLoadRecordIterator extends AbstractIterator private final boolean isEnforceSchema; private final MongoCollection collection; private final Bson fields; + // Represents the number of rows to get with each query. + private final int chunkSize; private Optional currentState; private MongoCursor currentIterator; @@ -48,11 +50,13 @@ public class MongoDbInitialLoadRecordIterator extends AbstractIterator MongoDbInitialLoadRecordIterator(final MongoCollection collection, final Bson fields, final Optional existingState, - final boolean isEnforceSchema) { + final boolean isEnforceSchema, + final int chunkSize) { this.collection = collection; this.fields = fields; this.currentState = existingState; this.isEnforceSchema = isEnforceSchema; + this.chunkSize = chunkSize; this.currentIterator = buildNewQueryIterator(); } @@ -60,7 +64,8 @@ public class MongoDbInitialLoadRecordIterator extends AbstractIterator protected Document computeNext() { if (shouldBuildNextQuery()) { try { - LOGGER.info("Finishing subquery number : {}", numSubqueries); + LOGGER.info("Finishing subquery number : {}, processing at id : {}", numSubqueries, + currentState.get() == null ? "starting null" : currentState.get().id()); currentIterator.close(); currentIterator = buildNewQueryIterator(); numSubqueries++; @@ -98,11 +103,13 @@ private MongoCursor buildNewQueryIterator() { return isEnforceSchema ? collection.find() .filter(filter) .projection(fields) + .limit(chunkSize) .sort(Sorts.ascending(MongoConstants.ID_FIELD)) .allowDiskUse(true) .cursor() : collection.find() .filter(filter) + .limit(chunkSize) .sort(Sorts.ascending(MongoConstants.ID_FIELD)) .allowDiskUse(true) .cursor(); diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializerTest.java b/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializerTest.java index 0417f119fe92c..558a35ec2aea1 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializerTest.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializerTest.java @@ -14,6 +14,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.Mockito.doCallRealMethod; import static org.mockito.Mockito.doReturn; @@ -153,6 +154,7 @@ void setUp() { when(mongoCollection.find()).thenReturn(findIterable); when(findIterable.filter(any())).thenReturn(findIterable); when(findIterable.projection(any())).thenReturn(findIterable); + when(findIterable.limit(anyInt())).thenReturn(findIterable); when(findIterable.sort(any())).thenReturn(findIterable); when(findIterable.cursor()).thenReturn(findCursor); when(findCursor.hasNext()).thenReturn(true); diff --git a/docs/integrations/sources/mongodb-v2.md b/docs/integrations/sources/mongodb-v2.md index 608a21e340e7b..c34b15e801f8d 100644 --- a/docs/integrations/sources/mongodb-v2.md +++ b/docs/integrations/sources/mongodb-v2.md @@ -221,9 +221,10 @@ For more information regarding configuration parameters, please see [MongoDb Doc | Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:----------------------------------------------------------------------------------------------------------| +| 1.3.8 | 2024-04-24 | [37559](https://github.com/airbytehq/airbyte/pull/37559) | Implement fixed-size chunking while performing initial load. | | 1.3.7 | 2024-04-24 | [37557](https://github.com/airbytehq/airbyte/pull/37557) | Change bug in resume token validity check. | -| 1.3.6 | 2024-04-24 | [37525](https://github.com/airbytehq/airbyte/pull/37525) | Internal refactor. | -| 1.3.5 | 2024-04-22 | [37348](https://github.com/airbytehq/airbyte/pull/37348) | Do not send estimate trace if we do not have data. | +| 1.3.6 | 2024-04-24 | [37525](https://github.com/airbytehq/airbyte/pull/37525) | Internal refactor. | +| 1.3.5 | 2024-04-22 | [37348](https://github.com/airbytehq/airbyte/pull/37348) | Do not send estimate trace if we do not have data. | | 1.3.4 | 2024-04-16 | [37348](https://github.com/airbytehq/airbyte/pull/37348) | Populate null values in airbyte record messages. | | 1.3.3 | 2024-04-05 | [36872](https://github.com/airbytehq/airbyte/pull/36872) | Update to connector's metadat definition. | | 1.3.2 | 2024-04-04 | [36845](https://github.com/airbytehq/airbyte/pull/36845) | Adopt Kotlin CDK. |