Skip to content

Commit

Permalink
[Source-mssql] : Add meta error handling in initial load path (#37325)
Browse files Browse the repository at this point in the history
  • Loading branch information
akashkulk committed Apr 15, 2024
1 parent 63d4d5e commit ca394d2
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 15 deletions.
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mssql/metadata.yaml
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
dockerImageTag: 4.0.10
dockerImageTag: 4.0.11
dockerRepository: airbyte/source-mssql
documentationUrl: https://docs.airbyte.com/integrations/sources/mssql
githubIssueLabel: source-mssql
Expand Down
Expand Up @@ -16,6 +16,7 @@
import com.microsoft.sqlserver.jdbc.Geography;
import com.microsoft.sqlserver.jdbc.Geometry;
import com.microsoft.sqlserver.jdbc.SQLServerResultSetMetaData;
import io.airbyte.cdk.db.jdbc.AirbyteRecordData;
import io.airbyte.cdk.db.jdbc.JdbcSourceOperations;
import io.airbyte.integrations.source.mssql.initialsync.CdcMetadataInjector;
import io.airbyte.protocol.models.JsonSchemaType;
Expand Down Expand Up @@ -51,13 +52,14 @@ public MssqlSourceOperations(final Optional<CdcMetadataInjector> metadataInjecto
}

@Override
public JsonNode rowToJson(final ResultSet queryContext) throws SQLException {
final ObjectNode jsonNode = (ObjectNode) super.rowToJson(queryContext);
public AirbyteRecordData convertDatabaseRowToAirbyteRecordData(final ResultSet queryContext) throws SQLException {
final AirbyteRecordData recordData = super.convertDatabaseRowToAirbyteRecordData(queryContext);
final ObjectNode jsonNode = (ObjectNode) recordData.rawRowData();
if (!metadataInjector.isPresent()) {
return jsonNode;
return recordData;
}
metadataInjector.get().inject(jsonNode);
return jsonNode;
return new AirbyteRecordData(jsonNode, recordData.meta());
}

/**
Expand Down
Expand Up @@ -15,6 +15,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import io.airbyte.cdk.db.SqlDatabase;
import io.airbyte.cdk.db.jdbc.AirbyteRecordData;
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.db.jdbc.JdbcUtils;
import io.airbyte.cdk.integrations.debezium.DebeziumIteratorConstants;
Expand All @@ -32,6 +33,7 @@
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteMessage.Type;
import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta;
import io.airbyte.protocol.models.v0.AirbyteStream;
import io.airbyte.protocol.models.v0.CatalogHelpers;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
Expand Down Expand Up @@ -166,7 +168,7 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(
}
});

final AutoCloseableIterator<JsonNode> queryStream =
final AutoCloseableIterator<AirbyteRecordData> queryStream =
new MssqlInitialLoadRecordIterator(database, sourceOperations, quoteString, initialLoadStateManager, selectedDatabaseFields, pair,
calculateChunkSize(tableSizeInfoMap.get(pair), pair), isCompositePrimaryKey(airbyteStream));
final AutoCloseableIterator<AirbyteMessage> recordIterator =
Expand All @@ -180,7 +182,7 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(

// Transforms the given iterator to create an {@link AirbyteRecordMessage}
private AutoCloseableIterator<AirbyteMessage> getRecordIterator(
final AutoCloseableIterator<JsonNode> recordIterator,
final AutoCloseableIterator<AirbyteRecordData> recordIterator,
final String streamName,
final String namespace,
final long emittedAt) {
Expand All @@ -190,7 +192,12 @@ private AutoCloseableIterator<AirbyteMessage> getRecordIterator(
.withStream(streamName)
.withNamespace(namespace)
.withEmittedAt(emittedAt)
.withData(r)));
.withData(r.rawRowData())
.withMeta(isMetaChangesEmptyOrNull(r.meta()) ? null : r.meta())));
}

private boolean isMetaChangesEmptyOrNull(AirbyteRecordMessageMeta meta) {
return meta == null || meta.getChanges() == null || meta.getChanges().isEmpty();
}

// Augments the given iterator with record count logs.
Expand Down
Expand Up @@ -7,9 +7,9 @@
import static io.airbyte.cdk.integrations.source.relationaldb.RelationalDbQueryUtils.enquoteIdentifier;
import static io.airbyte.cdk.integrations.source.relationaldb.RelationalDbQueryUtils.getFullyQualifiedTableNameWithQuoting;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.AbstractIterator;
import io.airbyte.cdk.db.JdbcCompatibleSourceOperations;
import io.airbyte.cdk.db.jdbc.AirbyteRecordData;
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.integrations.source.relationaldb.models.OrderedColumnLoadStatus;
import io.airbyte.commons.util.AutoCloseableIterator;
Expand All @@ -28,12 +28,12 @@
import org.slf4j.LoggerFactory;

@SuppressWarnings("try")
public class MssqlInitialLoadRecordIterator extends AbstractIterator<JsonNode>
implements AutoCloseableIterator<JsonNode> {
public class MssqlInitialLoadRecordIterator extends AbstractIterator<AirbyteRecordData>
implements AutoCloseableIterator<AirbyteRecordData> {

private static final Logger LOGGER = LoggerFactory.getLogger(MssqlInitialLoadRecordIterator.class);

private AutoCloseableIterator<JsonNode> currentIterator;
private AutoCloseableIterator<AirbyteRecordData> currentIterator;
private final JdbcDatabase database;
private int numSubqueries = 0;
private final String quoteString;
Expand Down Expand Up @@ -67,7 +67,7 @@ public class MssqlInitialLoadRecordIterator extends AbstractIterator<JsonNode>

@CheckForNull
@Override
protected JsonNode computeNext() {
protected AirbyteRecordData computeNext() {
if (shouldBuildNextSubquery()) {
try {
// We will only issue one query for a composite key load. If we have already processed all the data
Expand All @@ -82,8 +82,8 @@ protected JsonNode computeNext() {
}

LOGGER.info("Subquery number : {}", numSubqueries);
final Stream<JsonNode> stream = database.unsafeQuery(
this::getOcPreparedStatement, sourceOperations::rowToJson);
final Stream<AirbyteRecordData> stream = database.unsafeQuery(
this::getOcPreparedStatement, sourceOperations::convertDatabaseRowToAirbyteRecordData);
currentIterator = AutoCloseableIterators.fromStream(stream, pair);
numSubqueries++;
// If the current subquery has no records associated with it, the entire stream has been read.
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/mssql.md
Expand Up @@ -327,6 +327,7 @@ WHERE actor_definition_id ='b5ea17b1-f170-46dc-bc31-cc744ca984c1' AND (configura

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:------------------------------------------------------------------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------|
| 4.0.11 | 2024-04-15 | [37325](https://github.com/airbytehq/airbyte/pull/37325) | Populate airbyte_meta.changes + error handling. |
| 4.0.10 | 2024-04-15 | [37110](https://github.com/airbytehq/airbyte/pull/37110) | Internal cleanup. |
| 4.0.9 | 2024-04-10 | [36919](https://github.com/airbytehq/airbyte/pull/36919) | Fix a bug in conversion of null values. |
| 4.0.8 | 2024-04-05 | [36872](https://github.com/airbytehq/airbyte/pull/36872) | Update to connector's metadat definition. |
Expand Down

0 comments on commit ca394d2

Please sign in to comment.