Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Source MSSQL: Added event_serial_no to cdc metadata #16798

Merged
merged 9 commits into from
Feb 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1082,7 +1082,7 @@
- name: Microsoft SQL Server (MSSQL)
sourceDefinitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
dockerRepository: airbyte/source-mssql
dockerImageTag: 0.4.28
dockerImageTag: 0.4.29
documentationUrl: https://docs.airbyte.com/integrations/sources/mssql
icon: mssql.svg
sourceType: database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8234,7 +8234,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-mssql:0.4.28"
- dockerImage: "airbyte/source-mssql:0.4.29"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/destinations/mssql"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public abstract class SourceAcceptanceTest extends AbstractSourceConnectorTest {
public static final String CDC_DELETED_AT = "_ab_cdc_deleted_at";
public static final String CDC_LOG_FILE = "_ab_cdc_log_file";
public static final String CDC_LOG_POS = "_ab_cdc_log_pos";
public static final String CDC_EVENT_SERIAL_NO = "_ab_cdc_event_serial_no";

private static final Logger LOGGER = LoggerFactory.getLogger(SourceAcceptanceTest.class);

Expand Down Expand Up @@ -357,6 +358,7 @@ private AirbyteRecordMessage pruneCdcMetadata(final AirbyteRecordMessage m) {
((ObjectNode) clone.getData()).remove(CDC_LOG_POS);
((ObjectNode) clone.getData()).remove(CDC_UPDATED_AT);
((ObjectNode) clone.getData()).remove(CDC_DELETED_AT);
((ObjectNode) clone.getData()).remove(CDC_EVENT_SERIAL_NO);
return clone;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-mssql-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.4.28
LABEL io.airbyte.version=0.4.29
LABEL io.airbyte.name=airbyte/source-mssql-strict-encrypt
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mssql/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-mssql

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.4.28
LABEL io.airbyte.version=0.4.29
LABEL io.airbyte.name=airbyte/source-mssql
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.integrations.source.mssql;

import static io.airbyte.integrations.source.mssql.MssqlSource.CDC_LSN;
import static io.airbyte.integrations.source.mssql.MssqlSource.CDC_EVENT_SERIAL_NO;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
Expand All @@ -15,7 +16,9 @@ public class MssqlCdcConnectorMetadataInjector implements CdcMetadataInjector {
@Override
public void addMetaData(final ObjectNode event, final JsonNode source) {
final String commitLsn = source.get("commit_lsn").asText();
final String eventSerialNo = source.get("event_serial_no").asText();
event.put(CDC_LSN, commitLsn);
event.put(CDC_EVENT_SERIAL_NO, eventSerialNo);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public class MssqlSource extends AbstractJdbcSource<JDBCType> implements Source
public static final String MSSQL_CDC_OFFSET = "mssql_cdc_offset";
public static final String MSSQL_DB_HISTORY = "mssql_db_history";
public static final String CDC_LSN = "_ab_cdc_lsn";
public static final String CDC_EVENT_SERIAL_NO = "_ab_cdc_event_serial_no";
private static final String HIERARCHYID = "hierarchyid";
private static final int INTERMEDIATE_STATE_EMISSION_FREQUENCY = 10_000;
private List<String> schemas;
Expand Down Expand Up @@ -426,6 +427,7 @@ private static AirbyteStream addCdcMetadataColumns(final AirbyteStream stream) {
properties.set(CDC_LSN, stringType);
properties.set(CDC_UPDATED_AT, stringType);
properties.set(CDC_DELETED_AT, stringType);
properties.set(CDC_EVENT_SERIAL_NO, stringType);

return stream;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_DELETED_AT;
import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_UPDATED_AT;
import static io.airbyte.integrations.source.mssql.MssqlSource.CDC_LSN;
import static io.airbyte.integrations.source.mssql.MssqlSource.CDC_EVENT_SERIAL_NO;
import static io.airbyte.integrations.source.mssql.MssqlSource.DRIVER_CLASS;
import static io.airbyte.integrations.source.mssql.MssqlSource.MSSQL_CDC_OFFSET;
import static io.airbyte.integrations.source.mssql.MssqlSource.MSSQL_DB_HISTORY;
Expand Down Expand Up @@ -356,6 +357,7 @@ protected void removeCDCColumns(final ObjectNode data) {
data.remove(CDC_LSN);
data.remove(CDC_UPDATED_AT);
data.remove(CDC_DELETED_AT);
data.remove(CDC_EVENT_SERIAL_NO);
}

@Override
Expand Down Expand Up @@ -389,11 +391,13 @@ protected void assertNullCdcMetaData(final JsonNode data) {
assertNull(data.get(CDC_LSN));
assertNull(data.get(CDC_UPDATED_AT));
assertNull(data.get(CDC_DELETED_AT));
assertNull(data.get(CDC_EVENT_SERIAL_NO));
}

@Override
protected void assertCdcMetaData(final JsonNode data, final boolean deletedAtNull) {
assertNotNull(data.get(CDC_LSN));
assertNotNull(data.get(CDC_EVENT_SERIAL_NO));
assertNotNull(data.get(CDC_UPDATED_AT));
if (deletedAtNull) {
assertTrue(data.get(CDC_DELETED_AT).isNull());
Expand All @@ -411,6 +415,7 @@ protected void addCdcMetadataColumns(final AirbyteStream stream) {
properties.set(CDC_LSN, stringType);
properties.set(CDC_UPDATED_AT, stringType);
properties.set(CDC_DELETED_AT, stringType);
properties.set(CDC_EVENT_SERIAL_NO, stringType);

}

Expand Down
2 changes: 1 addition & 1 deletion connectors.md
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@
| **Marketo** | <img alt="Marketo icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/marketo.svg" height="30" height="30"/> | Source | airbyte/source-marketo:1.0.2 | generally_available | [link](https://docs.airbyte.com/integrations/sources/marketo) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-marketo) | <small>`9e0556f4-69df-4522-a3fb-03264d36b348`</small> |
| **Metabase** | <img alt="Metabase icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/metabase.svg" height="30" height="30"/> | Source | airbyte/source-metabase:0.3.1 | beta | [link](https://docs.airbyte.com/integrations/sources/metabase) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-metabase) | <small>`c7cb421b-942e-4468-99ee-e369bcabaec5`</small> |
| **Microsoft Dataverse** | <img alt="Microsoft Dataverse icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/microsoftdataverse.svg" height="30" height="30"/> | Source | airbyte/source-microsoft-dataverse:0.1.0 | alpha | [link](https://docs.airbyte.com/integrations/sources/microsoft-dataverse) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-microsoft-dataverse) | <small>`9220e3de-3b60-4bb2-a46f-046d59ea235a`</small> |
| **Microsoft SQL Server (MSSQL)** | <img alt="Microsoft SQL Server (MSSQL) icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/mssql.svg" height="30" height="30"/> | Source | airbyte/source-mssql:0.4.28 | alpha | [link](https://docs.airbyte.com/integrations/sources/mssql) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-mssql) | <small>`b5ea17b1-f170-46dc-bc31-cc744ca984c1`</small> |
| **Microsoft SQL Server (MSSQL)** | <img alt="Microsoft SQL Server (MSSQL) icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/mssql.svg" height="30" height="30"/> | Source | airbyte/source-mssql:0.4.29 | alpha | [link](https://docs.airbyte.com/integrations/sources/mssql) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-mssql) | <small>`b5ea17b1-f170-46dc-bc31-cc744ca984c1`</small> |
| **Microsoft teams** | <img alt="Microsoft teams icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/microsoft-teams.svg" height="30" height="30"/> | Source | airbyte/source-microsoft-teams:0.2.5 | alpha | [link](https://docs.airbyte.com/integrations/sources/microsoft-teams) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-microsoft-teams) | <small>`eaf50f04-21dd-4620-913b-2a83f5635227`</small> |
| **Mixpanel** | <img alt="Mixpanel icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/mixpanel.svg" height="30" height="30"/> | Source | airbyte/source-mixpanel:0.1.30 | generally_available | [link](https://docs.airbyte.com/integrations/sources/mixpanel) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-mixpanel) | <small>`12928b32-bf0a-4f1e-964f-07e12e37153a`</small> |
| **Monday** | <img alt="Monday icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/monday.svg" height="30" height="30"/> | Source | airbyte/source-monday:0.2.2 | beta | [link](https://docs.airbyte.com/integrations/sources/monday) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-monday) | <small>`80a54ea2-9959-4040-aac1-eee42423ec9b`</small> |
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/mssql.md
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,7 @@ WHERE actor_definition_id ='b5ea17b1-f170-46dc-bc31-cc744ca984c1' AND (configura

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:------------------------------------------------------------------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.4.29 | 2023-02-24 | [16798](https://github.com/airbytehq/airbyte/pull/16798) | Add event_serial_no to cdc metadata |
| 0.4.28 | 2023-01-18 | [21348](https://github.com/airbytehq/airbyte/pull/21348) | Fix error introduced in [18959](https://github.com/airbytehq/airbyte/pull/18959) in which option `initial_waiting_seconds` was removed |
| 0.4.27 | 2022-12-14 | [20436](https://github.com/airbytehq/airbyte/pull/20346) | Consolidate date/time values mapping for JDBC sources |
| 0.4.26 | 2022-12-12 | [18959](https://github.com/airbytehq/airbyte/pull/18959) | CDC : Don't timeout if snapshot is not complete. |
Expand Down