diff --git a/airbyte-cdk/java/airbyte-cdk/README.md b/airbyte-cdk/java/airbyte-cdk/README.md index f6a818c9219b2a..82c67a79fd31ae 100644 --- a/airbyte-cdk/java/airbyte-cdk/README.md +++ b/airbyte-cdk/java/airbyte-cdk/README.md @@ -167,6 +167,7 @@ MavenLocal debugging steps: | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------| | 0.14.0 | 2023-12-21 | [\#33606](https://github.com/airbytehq/airbyte/pull/33606) | Add updated Initial and Incremental Stream State definitions for DB Sources | +| 0.13.2 | 2024-01-18 | [\#34364](https://github.com/airbytehq/airbyte/pull/34364) | Better logging in mongo db source connector | | 0.13.1 | 2024-01-18 | [\#34236](https://github.com/airbytehq/airbyte/pull/34236) | Add postCreateTable hook in destination JdbcSqlGenerator | | 0.13.0 | 2024-01-16 | [\#34177](https://github.com/airbytehq/airbyte/pull/34177) | Add `useExpensiveSafeCasting` param in JdbcSqlGenerator methods; add JdbcTypingDedupingTest fixture; other DV2-related changes | | 0.12.1 | 2024-01-11 | [\#34186](https://github.com/airbytehq/airbyte/pull/34186) | Add hook for additional destination specific checks to JDBC destination check method | diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/debezium/internals/mongodb/MongoDbDebeziumStateUtil.java b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/debezium/internals/mongodb/MongoDbDebeziumStateUtil.java index 1bceb20940e65a..6e646d53e7c0e4 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/debezium/internals/mongodb/MongoDbDebeziumStateUtil.java +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/debezium/internals/mongodb/MongoDbDebeziumStateUtil.java @@ -56,7 +56,8 @@ public class MongoDbDebeziumStateUtil implements DebeziumStateUtil { */ public JsonNode constructInitialDebeziumState(final BsonDocument resumeToken, final MongoClient mongoClient, final String serverId) { final String replicaSet = getReplicaSetName(mongoClient); - LOGGER.info("Initial resume token '{}' constructed", ResumeTokens.getData(resumeToken).asString().getValue()); + LOGGER.info("Initial resume token '{}' constructed, corresponding to timestamp (seconds after epoch) {}", + ResumeTokens.getData(resumeToken).asString().getValue(), ResumeTokens.getTimestamp(resumeToken).getTime()); final JsonNode state = formatState(serverId, replicaSet, ((BsonString) ResumeTokens.getData(resumeToken)).getValue()); LOGGER.info("Initial Debezium state constructed: {}", state); return state; @@ -113,12 +114,14 @@ public boolean isValidResumeToken(final BsonDocument savedOffset, final MongoCli final ChangeStreamIterable stream = mongoClient.watch(BsonDocument.class); stream.resumeAfter(savedOffset); try (final var ignored = stream.cursor()) { - LOGGER.info("Valid resume token '{}' present. Incremental sync will be performed for up-to-date streams.", - ResumeTokens.getData(savedOffset).asString().getValue()); + LOGGER.info("Valid resume token '{}' present, corresponding to timestamp (seconds after epoch) : {}. Incremental sync will be performed for " + + "up-to-date streams.", + ResumeTokens.getData(savedOffset).asString().getValue(), ResumeTokens.getTimestamp(savedOffset).getTime()); return true; } catch (final MongoCommandException | MongoChangeStreamException e) { - LOGGER.info("Invalid resume token '{}' present. Initial snapshot will be performed for all streams.", - ResumeTokens.getData(savedOffset).asString().getValue()); + LOGGER.info("Invalid resume token '{}' present, corresponding to timestamp (seconds after epoch) : {}. Initial snapshot will be performed for " + + "all streams.", + ResumeTokens.getData(savedOffset).asString().getValue(), ResumeTokens.getTimestamp(savedOffset).getTime()); return false; } } diff --git a/airbyte-integrations/connectors/source-mongodb-v2/build.gradle b/airbyte-integrations/connectors/source-mongodb-v2/build.gradle index 0c079534e55246..ed08fabcc6837f 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/build.gradle +++ b/airbyte-integrations/connectors/source-mongodb-v2/build.gradle @@ -5,7 +5,7 @@ plugins { } airbyteJavaConnector { - cdkVersionRequired = '0.7.9' + cdkVersionRequired = '0.13.2' features = ['db-sources'] useLocalCdk = false } diff --git a/airbyte-integrations/connectors/source-mongodb-v2/integration_tests/expected_spec.json b/airbyte-integrations/connectors/source-mongodb-v2/integration_tests/expected_spec.json index 8de6fd3bc30ea8..e77e9d63278003 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/integration_tests/expected_spec.json +++ b/airbyte-integrations/connectors/source-mongodb-v2/integration_tests/expected_spec.json @@ -164,7 +164,7 @@ "description": "The maximum number of documents to sample when attempting to discover the unique fields for a collection.", "default": 10000, "order": 10, - "minimum": 100, + "minimum": 10, "maximum": 100000, "group": "advanced" } diff --git a/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml b/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml index 086852451f42df..3bba900cd50355 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml +++ b/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml @@ -5,7 +5,7 @@ data: connectorSubtype: database connectorType: source definitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e - dockerImageTag: 1.2.2 + dockerImageTag: 1.2.3 dockerRepository: airbyte/source-mongodb-v2 documentationUrl: https://docs.airbyte.com/integrations/sources/mongodb-v2 githubIssueLabel: source-mongodb-v2 diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java index ce979bc13353fe..bc8957c98179d2 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java @@ -92,9 +92,9 @@ public List> createCdcIterators( final boolean isEnforceSchema = config.getEnforceSchema(); final Properties defaultDebeziumProperties = MongoDbCdcProperties.getDebeziumProperties(); logOplogInfo(mongoClient); - final BsonDocument resumeToken = MongoDbResumeTokenHelper.getMostRecentResumeToken(mongoClient); + final BsonDocument initialResumeToken = MongoDbResumeTokenHelper.getMostRecentResumeToken(mongoClient); final JsonNode initialDebeziumState = - mongoDbDebeziumStateUtil.constructInitialDebeziumState(resumeToken, mongoClient, databaseName); + mongoDbDebeziumStateUtil.constructInitialDebeziumState(initialResumeToken, mongoClient, databaseName); final MongoDbCdcState cdcState = (stateManager.getCdcState() == null || stateManager.getCdcState().state() == null) ? new MongoDbCdcState(initialDebeziumState, isEnforceSchema) : new MongoDbCdcState(Jsons.clone(stateManager.getCdcState().state()), stateManager.getCdcState().schema_enforced()); @@ -137,7 +137,7 @@ public List> createCdcIterators( emittedAt, config.getCheckpointInterval(), isEnforceSchema); final AirbyteDebeziumHandler handler = new AirbyteDebeziumHandler<>(config.rawConfig(), - new MongoDbCdcTargetPosition(resumeToken), false, firstRecordWaitTime, subsequentRecordWaitTime, queueSize); + new MongoDbCdcTargetPosition(initialResumeToken), false, firstRecordWaitTime, subsequentRecordWaitTime, queueSize); final MongoDbCdcStateHandler mongoDbCdcStateHandler = new MongoDbCdcStateHandler(stateManager); final MongoDbCdcSavedInfoFetcher cdcSavedInfoFetcher = new MongoDbCdcSavedInfoFetcher(stateToBeUsed); @@ -164,8 +164,8 @@ private void logOplogInfo(final MongoClient mongoClient) { final Document command = new Document("collStats", "oplog.rs"); final Document result = localDatabase.runCommand(command); if (result != null) { - LOGGER.info("Max oplog size is {} bytes", result.getInteger("maxSize")); - LOGGER.info("Free space in oplog is {} bytes", result.getInteger("freeStorageSize")); + LOGGER.info("Max oplog size is {} bytes", result.getLong("maxSize")); + LOGGER.info("Free space in oplog is {} bytes", result.getLong("freeStorageSize")); } } catch (final Exception e) { LOGGER.warn("Unable to query for op log stats, exception: {}" + e.getMessage()); diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/main/resources/spec.json b/airbyte-integrations/connectors/source-mongodb-v2/src/main/resources/spec.json index 4acd5c67d25f8c..9c4af4f046c9a0 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/main/resources/spec.json @@ -164,7 +164,7 @@ "description": "The maximum number of documents to sample when attempting to discover the unique fields for a collection.", "default": 10000, "order": 10, - "minimum": 100, + "minimum": 10, "maximum": 100000, "group": "advanced" } diff --git a/docs/integrations/sources/mongodb-v2.md b/docs/integrations/sources/mongodb-v2.md index c7821542174bba..4ff1a9a2f0fb01 100644 --- a/docs/integrations/sources/mongodb-v2.md +++ b/docs/integrations/sources/mongodb-v2.md @@ -214,6 +214,7 @@ For more information regarding configuration parameters, please see [MongoDb Doc | Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:----------------------------------------------------------------------------------------------------------| +| 1.2.3 | 2024-01-18 | [34364](https://github.com/airbytehq/airbyte/pull/34364) | Add additional logging for resume token + reduce discovery size to 10. | | 1.2.2 | 2024-01-16 | [34314](https://github.com/airbytehq/airbyte/pull/34314) | Reduce minimum document discovery size to 100. | | 1.2.1 | 2023-12-18 | [33549](https://github.com/airbytehq/airbyte/pull/33549) | Add logging to understand op log size. | | 1.2.0 | 2023-12-18 | [33438](https://github.com/airbytehq/airbyte/pull/33438) | Remove LEGACY state flag |