Skip to content

Commit

Permalink
[source-mongodb-v2] : Fail sync if initial snapshot for any stream fa…
Browse files Browse the repository at this point in the history
…ils (#34759)
  • Loading branch information
akashkulk committed Feb 5, 2024
1 parent 247bc17 commit c7c51ea
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e
dockerImageTag: 1.2.6
dockerImageTag: 1.2.7
dockerRepository: airbyte/source-mongodb-v2
documentationUrl: https://docs.airbyte.com/integrations/sources/mongodb-v2
githubIssueLabel: source-mongodb-v2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,15 @@ public class MongoDbStateIterator implements Iterator<AirbyteMessage> {
private boolean finalStateNext = false;

/**
* Tracks if the underlying iterator threw an exception. This helps to determine the final state
* status emitted from the final next call.
* Tracks if the underlying iterator threw an exception, indicating that the snapshot for this
* stream failed. This helps to determine the final state status emitted from the final next call.
*/
private boolean iterThrewException = false;
private boolean initialSnapshotFailed = false;

/**
* Tracks the exception thrown if there initial snapshot has failed.
*/
private Exception initialSnapshotException;

/**
* Constructor.
Expand Down Expand Up @@ -111,14 +116,24 @@ public MongoDbStateIterator(final MongoCursor<Document> iter,
@Override
public boolean hasNext() {
LOGGER.debug("Checking hasNext() for stream {}...", getStream());
if (initialSnapshotFailed) {
// If the initial snapshot is incomplete for this stream, throw an exception failing the sync. This
// will ensure the platform retry logic
// kicks in and keeps retrying the sync until the initial snapshot is complete.
throw new RuntimeException(initialSnapshotException);
}
try {
if (iter.hasNext()) {
return true;
}
} catch (final MongoException e) {
// If hasNext throws an exception, log it and then treat it as if hasNext returned false.
iterThrewException = true;
// If hasNext throws an exception, log it and set the flag to indicate that the initial snapshot
// failed. This indicates to the main iterator
// to emit state associated with what has been processed so far.
initialSnapshotFailed = true;
initialSnapshotException = e;
LOGGER.info("hasNext threw an exception for stream {}: {}", getStream(), e.getMessage(), e);
return true;
}

// no more records in cursor + no record messages have been emitted => collection is empty
Expand All @@ -145,9 +160,9 @@ public AirbyteMessage next() {
// Should a state message be emitted based on then last time a state message was emitted?
final var emitStateDueToDuration = count > 0 && Duration.between(lastCheckpoint, Instant.now()).compareTo(checkpointDuration) > 0;

if (finalStateNext) {
if (finalStateNext || initialSnapshotFailed) {
LOGGER.debug("Emitting final state status for stream {}:{}...", stream.getStream().getNamespace(), stream.getStream().getName());
final var finalStateStatus = iterThrewException ? InitialSnapshotStatus.IN_PROGRESS : InitialSnapshotStatus.COMPLETE;
final var finalStateStatus = initialSnapshotFailed ? InitialSnapshotStatus.IN_PROGRESS : InitialSnapshotStatus.COMPLETE;
final var idType = IdType.findByJavaType(lastId.getClass().getSimpleName())
.orElseThrow(() -> new ConfigErrorException("Unsupported _id type " + lastId.getClass().getSimpleName()));
final var state = new MongoDbStreamState(lastId.toString(), finalStateStatus, idType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ void treatHasNextExceptionAsFalse() {
message.getState().getGlobal().getStreamStates().get(0).getStreamState().get("status").asText(),
"state status should be in_progress");

assertFalse(iter.hasNext(), "should have no more records");
assertThrows(RuntimeException.class, iter::hasNext, "next iteration should throw exception to fail the sync");
}

@Test
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/mongodb-v2.md
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ For more information regarding configuration parameters, please see [MongoDb Doc

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:----------------------------------------------------------------------------------------------------------|
| 1.2.7 | 2024-02-01 | [34759](https://github.com/airbytehq/airbyte/pull/34759) | Fail sync if initial snapshot for any stream fails. |
| 1.2.6 | 2024-01-31 | [34594](https://github.com/airbytehq/airbyte/pull/34594) | Scope initial resume token to streams of interest. |
| 1.2.5 | 2024-01-29 | [34641](https://github.com/airbytehq/airbyte/pull/34641) | Allow resuming an initial snapshot when Id type is not of default ObjectId . |
| 1.2.4 | 2024-01-26 | [34573](https://github.com/airbytehq/airbyte/pull/34573) | Adopt CDK v0.16.0. |
Expand Down

0 comments on commit c7c51ea

Please sign in to comment.