Skip to content

Commit

Permalink
Support resuming initial snapshot when id type is String, Int, Long (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
rodireich committed Jan 30, 2024
1 parent 966a9c3 commit f9df3df
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e
dockerImageTag: 1.2.4
dockerImageTag: 1.2.5
dockerRepository: airbyte/source-mongodb-v2
documentationUrl: https://docs.airbyte.com/integrations/sources/mongodb-v2
githubIssueLabel: source-mongodb-v2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@
import java.util.List;
import java.util.Optional;
import org.bson.BsonDocument;
import org.bson.BsonInt32;
import org.bson.BsonInt64;
import org.bson.BsonObjectId;
import org.bson.BsonString;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.bson.types.ObjectId;
Expand Down Expand Up @@ -86,8 +90,13 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIterators(
// "where _id > [last saved state] order by _id ASC".
// If no state exists, it will create a query akin to "where 1=1 order by _id ASC"
final Bson filter = existingState
// TODO add type support here when we add support for _id fields that are not ObjectId types
.map(state -> Filters.gt(MongoConstants.ID_FIELD, new ObjectId(state.id())))
.map(state -> Filters.gt(MongoConstants.ID_FIELD,
switch (state.idType()) {
case STRING -> new BsonString(state.id());
case OBJECT_ID -> new BsonObjectId(new ObjectId(state.id()));
case INT -> new BsonInt32(Integer.parseInt(state.id()));
case LONG -> new BsonInt64(Long.parseLong(state.id()));
}))
// if nothing was found, return a new BsonDocument
.orElseGet(BsonDocument::new);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,11 @@ class InitialSnapshotHandlerTest {
private static final String COLLECTION3 = "collection3";

private static final String OBJECT_ID1_STRING = "64c0029d95ad260d69ef28a1";
private static final String OBJECT_ID2_STRING = "64c0029d95ad260d69ef28a2";
private static final String OBJECT_ID3_STRING = "64c0029d95ad260d69ef28a3";
private static final ObjectId OBJECT_ID1 = new ObjectId(OBJECT_ID1_STRING);
private static final ObjectId OBJECT_ID2 = new ObjectId("64c0029d95ad260d69ef28a2");
private static final ObjectId OBJECT_ID3 = new ObjectId("64c0029d95ad260d69ef28a3");
private static final ObjectId OBJECT_ID2 = new ObjectId(OBJECT_ID2_STRING);
private static final ObjectId OBJECT_ID3 = new ObjectId(OBJECT_ID3_STRING);
private static final ObjectId OBJECT_ID4 = new ObjectId("64c0029d95ad260d69ef28a4");
private static final ObjectId OBJECT_ID5 = new ObjectId("64c0029d95ad260d69ef28a5");
private static final ObjectId OBJECT_ID6 = new ObjectId("64c0029d95ad260d69ef28a6");
Expand Down Expand Up @@ -332,4 +334,58 @@ void testGetIteratorsWithOneEmptyCollection() {
assertFalse(collection2.hasNext());
}

@Test
void testGetIteratorsWithInitialStateNonDefaultIdType() {
insertDocuments(COLLECTION1, List.of(
new Document(Map.of(
CURSOR_FIELD, OBJECT_ID1_STRING,
NAME_FIELD, NAME1)),
new Document(Map.of(
CURSOR_FIELD, OBJECT_ID2_STRING,
NAME_FIELD, NAME2))));

insertDocuments(COLLECTION2, List.of(
new Document(Map.of(
CURSOR_FIELD, OBJECT_ID3_STRING,
NAME_FIELD, NAME3))));

final InitialSnapshotHandler initialSnapshotHandler = new InitialSnapshotHandler();
final MongoDbStateManager stateManager = mock(MongoDbStateManager.class);
when(stateManager.getStreamState(COLLECTION1, NAMESPACE))
.thenReturn(Optional.of(new MongoDbStreamState(OBJECT_ID1_STRING, null, IdType.STRING)));
final List<AutoCloseableIterator<AirbyteMessage>> iterators =
initialSnapshotHandler.getIterators(STREAMS, stateManager, mongoClient.getDatabase(DB_NAME), null, Instant.now(),
MongoConstants.CHECKPOINT_INTERVAL, true);

assertEquals(iterators.size(), 2, "Only two streams are configured as incremental, full refresh streams should be ignored");

final AutoCloseableIterator<AirbyteMessage> collection1 = iterators.get(0);
final AutoCloseableIterator<AirbyteMessage> collection2 = iterators.get(1);

// collection1, first document should be skipped
final AirbyteMessage collection1StreamMessage1 = collection1.next();
assertEquals(Type.RECORD, collection1StreamMessage1.getType());
assertEquals(COLLECTION1, collection1StreamMessage1.getRecord().getStream());
assertEquals(OBJECT_ID2.toString(), collection1StreamMessage1.getRecord().getData().get(CURSOR_FIELD).asText());
assertEquals(NAME2, collection1StreamMessage1.getRecord().getData().get(NAME_FIELD).asText());
assertConfiguredFieldsEqualsRecordDataFields(Set.of(CURSOR_FIELD, NAME_FIELD), collection1StreamMessage1.getRecord().getData());

final AirbyteMessage collection1SateMessage = collection1.next();
assertEquals(Type.STATE, collection1SateMessage.getType(), "State message is expected after all records in a stream are emitted");

assertFalse(collection1.hasNext());

// collection2, no documents should be skipped
final AirbyteMessage collection2StreamMessage1 = collection2.next();
assertEquals(Type.RECORD, collection2StreamMessage1.getType());
assertEquals(COLLECTION2, collection2StreamMessage1.getRecord().getStream());
assertEquals(OBJECT_ID3.toString(), collection2StreamMessage1.getRecord().getData().get(CURSOR_FIELD).asText());
assertConfiguredFieldsEqualsRecordDataFields(Set.of(CURSOR_FIELD), collection2StreamMessage1.getRecord().getData());

final AirbyteMessage collection2SateMessage = collection2.next();
assertEquals(Type.STATE, collection2SateMessage.getType(), "State message is expected after all records in a stream are emitted");

assertFalse(collection2.hasNext());
}

}
3 changes: 2 additions & 1 deletion docs/integrations/sources/mongodb-v2.md
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,8 @@ For more information regarding configuration parameters, please see [MongoDb Doc

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:----------------------------------------------------------------------------------------------------------|
| 1.2.4 | 2024-01-26 | [34573](https://github.com/airbytehq/airbyte/pull/34573) | Adopt CDK v0.16.0. |
| 1.2.5 | 2024-01-29 | [34573](https://github.com/airbytehq/airbyte/pull/34573) | Allow resuming an initial snapshot when Id type is not of default ObjectId . |
| 1.2.4 | 2024-01-26 | [34573](https://github.com/airbytehq/airbyte/pull/34573) | Adopt CDK v0.16.0. |
| 1.2.3 | 2024-01-18 | [34364](https://github.com/airbytehq/airbyte/pull/34364) | Add additional logging for resume token + reduce discovery size to 10. |
| 1.2.2 | 2024-01-16 | [34314](https://github.com/airbytehq/airbyte/pull/34314) | Reduce minimum document discovery size to 100. |
| 1.2.1 | 2023-12-18 | [33549](https://github.com/airbytehq/airbyte/pull/33549) | Add logging to understand op log size. |
Expand Down

0 comments on commit f9df3df

Please sign in to comment.