Skip to content

Commit

Permalink
[Source-mongo] : Implement fixed chunk size (1million) (#37559)
Browse files Browse the repository at this point in the history
  • Loading branch information
akashkulk committed Apr 25, 2024
1 parent 135e623 commit 993aece
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e
dockerImageTag: 1.3.7
dockerImageTag: 1.3.8
dockerRepository: airbyte/source-mongodb-v2
documentationUrl: https://docs.airbyte.com/integrations/sources/mongodb-v2
githubIssueLabel: source-mongodb-v2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
public class InitialSnapshotHandler {

private static final Logger LOGGER = LoggerFactory.getLogger(InitialSnapshotHandler.class);
private static final int DEFAULT_CHUNK_SIZE = 1_000_000;

/**
* For each given stream configured as incremental sync it will output an iterator that will
Expand Down Expand Up @@ -67,7 +68,7 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIterators(
final Optional<MongoDbStreamState> existingState =
stateManager.getStreamState(airbyteStream.getStream().getName(), airbyteStream.getStream().getNamespace());

final var recordIterator = new MongoDbInitialLoadRecordIterator(collection, fields, existingState, isEnforceSchema);
final var recordIterator = new MongoDbInitialLoadRecordIterator(collection, fields, existingState, isEnforceSchema, DEFAULT_CHUNK_SIZE);
final var stateIterator =
new SourceStateIterator<>(recordIterator, airbyteStream, stateManager, new StateEmitFrequency(checkpointInterval,
MongoConstants.CHECKPOINT_DURATION));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public class MongoDbInitialLoadRecordIterator extends AbstractIterator<Document>
private final boolean isEnforceSchema;
private final MongoCollection<Document> collection;
private final Bson fields;
// Maximum number of documents to fetch with each query (applied as the query limit).
private final int chunkSize;

private Optional<MongoDbStreamState> currentState;
private MongoCursor<Document> currentIterator;
Expand All @@ -48,19 +50,22 @@ public class MongoDbInitialLoadRecordIterator extends AbstractIterator<Document>
/**
 * Creates the initial-load iterator and immediately opens the first query cursor.
 *
 * @param collection the MongoDB collection that each query is issued against
 * @param fields projection passed to the query when {@code isEnforceSchema} is true
 * @param existingState previously saved stream state, used as the starting value of
 *        {@code currentState}; may be empty
 * @param isEnforceSchema selects the query variant that applies the {@code fields} projection
 * @param chunkSize value passed to {@code limit()} on every query built by
 *        {@code buildNewQueryIterator()}
 */
MongoDbInitialLoadRecordIterator(final MongoCollection<Document> collection,
final Bson fields,
final Optional<MongoDbStreamState> existingState,
final boolean isEnforceSchema) {
final boolean isEnforceSchema,
final int chunkSize) {
this.collection = collection;
this.fields = fields;
this.currentState = existingState;
this.isEnforceSchema = isEnforceSchema;
this.chunkSize = chunkSize;
// Must run last: buildNewQueryIterator() is called only after all fields above are assigned.
this.currentIterator = buildNewQueryIterator();
}

@Override
protected Document computeNext() {
if (shouldBuildNextQuery()) {
try {
LOGGER.info("Finishing subquery number : {}", numSubqueries);
LOGGER.info("Finishing subquery number : {}, processing at id : {}", numSubqueries,
currentState.get() == null ? "starting null" : currentState.get().id());
currentIterator.close();
currentIterator = buildNewQueryIterator();
numSubqueries++;
Expand Down Expand Up @@ -98,11 +103,13 @@ private MongoCursor<Document> buildNewQueryIterator() {
return isEnforceSchema ? collection.find()
.filter(filter)
.projection(fields)
.limit(chunkSize)
.sort(Sorts.ascending(MongoConstants.ID_FIELD))
.allowDiskUse(true)
.cursor()
: collection.find()
.filter(filter)
.limit(chunkSize)
.sort(Sorts.ascending(MongoConstants.ID_FIELD))
.allowDiskUse(true)
.cursor();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.doReturn;
Expand Down Expand Up @@ -153,6 +154,7 @@ void setUp() {
when(mongoCollection.find()).thenReturn(findIterable);
when(findIterable.filter(any())).thenReturn(findIterable);
when(findIterable.projection(any())).thenReturn(findIterable);
when(findIterable.limit(anyInt())).thenReturn(findIterable);
when(findIterable.sort(any())).thenReturn(findIterable);
when(findIterable.cursor()).thenReturn(findCursor);
when(findCursor.hasNext()).thenReturn(true);
Expand Down
5 changes: 3 additions & 2 deletions docs/integrations/sources/mongodb-v2.md
Original file line number Diff line number Diff line change
Expand Up @@ -221,9 +221,10 @@ For more information regarding configuration parameters, please see [MongoDb Doc

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:----------------------------------------------------------------------------------------------------------|
| 1.3.8 | 2024-04-24 | [37559](https://github.com/airbytehq/airbyte/pull/37559) | Implement fixed-size chunking while performing initial load. |
| 1.3.7 | 2024-04-24 | [37557](https://github.com/airbytehq/airbyte/pull/37557) | Fix bug in resume token validity check. |
| 1.3.6 | 2024-04-24 | [37525](https://github.com/airbytehq/airbyte/pull/37525) | Internal refactor. |
| 1.3.5 | 2024-04-22 | [37348](https://github.com/airbytehq/airbyte/pull/37348) | Do not send estimate trace if we do not have data. |
| 1.3.6 | 2024-04-24 | [37525](https://github.com/airbytehq/airbyte/pull/37525) | Internal refactor. |
| 1.3.5 | 2024-04-22 | [37348](https://github.com/airbytehq/airbyte/pull/37348) | Do not send estimate trace if we do not have data. |
| 1.3.4 | 2024-04-16 | [37348](https://github.com/airbytehq/airbyte/pull/37348) | Populate null values in airbyte record messages. |
| 1.3.3 | 2024-04-05 | [36872](https://github.com/airbytehq/airbyte/pull/36872) | Update to connector's metadata definition. |
| 1.3.2 | 2024-04-04 | [36845](https://github.com/airbytehq/airbyte/pull/36845) | Adopt Kotlin CDK. |
Expand Down

0 comments on commit 993aece

Please sign in to comment.