Skip to content

Commit

Permalink
upgrade debezium version to 1.9.6 (#17459)
Browse files Browse the repository at this point in the history
* upgrade debezium version to 1.9.6

* test override not required

* bump version

* auto-bump connector version [ci skip]

* auto-bump connector version [ci skip]

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
subodh1810 and octavia-squidington-iii committed Oct 1, 2022
1 parent aaa94f1 commit 0c1823d
Show file tree
Hide file tree
Showing 45 changed files with 83 additions and 156 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -678,7 +678,7 @@
- name: MySQL
sourceDefinitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerRepository: airbyte/source-mysql
dockerImageTag: 1.0.0
dockerImageTag: 1.0.1
documentationUrl: https://docs.airbyte.io/integrations/sources/mysql
icon: mysql.svg
sourceType: database
Expand Down Expand Up @@ -836,7 +836,7 @@
- name: Postgres
sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerRepository: airbyte/source-postgres
dockerImageTag: 1.0.12
dockerImageTag: 1.0.13
documentationUrl: https://docs.airbyte.io/integrations/sources/postgres
icon: postgresql.svg
sourceType: database
Expand Down
4 changes: 2 additions & 2 deletions airbyte-config/init/src/main/resources/seed/source_specs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6965,7 +6965,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-mysql:1.0.0"
- dockerImage: "airbyte/source-mysql:1.0.1"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/mysql"
connectionSpecification:
Expand Down Expand Up @@ -8578,7 +8578,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-postgres:1.0.12"
- dockerImage: "airbyte/source-postgres:1.0.13"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ dependencies {
implementation project(':airbyte-protocol:protocol-models')
implementation project(':airbyte-db:db-lib')

implementation 'io.debezium:debezium-api:1.9.2.Final'
implementation 'io.debezium:debezium-embedded:1.9.2.Final'
implementation 'io.debezium:debezium-api:1.9.6.Final'
implementation 'io.debezium:debezium-embedded:1.9.6.Final'
// implementation 'io.debezium:debezium-connector-sqlserver:1.9.2.Final'
implementation 'io.debezium:debezium-connector-mysql:1.9.2.Final'
implementation 'io.debezium:debezium-connector-postgres:1.9.2.Final'
implementation 'io.debezium:debezium-connector-mysql:1.9.6.Final'
implementation 'io.debezium:debezium-connector-postgres:1.9.6.Final'
implementation 'org.codehaus.plexus:plexus-utils:3.4.2'

testFixturesImplementation project(':airbyte-db:db-lib')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,6 @@ ENV APPLICATION source-mysql-strict-encrypt
COPY --from=build /airbyte /airbyte


LABEL io.airbyte.version=1.0.0
LABEL io.airbyte.version=1.0.1

LABEL io.airbyte.name=airbyte/source-mysql-strict-encrypt
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@ ENV APPLICATION source-mysql

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.0
LABEL io.airbyte.version=1.0.1

LABEL io.airbyte.name=airbyte/source-mysql
4 changes: 2 additions & 2 deletions airbyte-integrations/connectors/source-mysql/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ application {
dependencies {
implementation project(':airbyte-db:db-lib')
implementation project(':airbyte-integrations:bases:base-java')
implementation project(':airbyte-integrations:bases:debezium-v1-9-2')
implementation project(':airbyte-integrations:bases:debezium-v1-9-6')
implementation project(':airbyte-integrations:connectors:source-jdbc')
implementation project(':airbyte-protocol:protocol-models')
implementation project(':airbyte-integrations:connectors:source-relational-db')

implementation 'mysql:mysql-connector-java:8.0.22'
implementation 'org.apache.commons:commons-lang3:3.11'

testImplementation testFixtures(project(':airbyte-integrations:bases:debezium-v1-9-2'))
testImplementation testFixtures(project(':airbyte-integrations:bases:debezium-v1-9-6'))
testImplementation testFixtures(project(':airbyte-integrations:connectors:source-jdbc'))
testImplementation 'org.apache.commons:commons-lang3:3.11'
testImplementation 'org.hamcrest:hamcrest-all:1.3'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.12
LABEL io.airbyte.version=1.0.13
LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-postgres/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.12
LABEL io.airbyte.version=1.0.13
LABEL io.airbyte.name=airbyte/source-postgres
4 changes: 2 additions & 2 deletions airbyte-integrations/connectors/source-postgres/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ application {
dependencies {
implementation project(':airbyte-db:db-lib')
implementation project(':airbyte-integrations:bases:base-java')
implementation project(':airbyte-integrations:bases:debezium-v1-9-2')
implementation project(':airbyte-integrations:bases:debezium-v1-9-6')
implementation project(':airbyte-protocol:protocol-models')
implementation project(':airbyte-integrations:connectors:source-jdbc')
implementation project(':airbyte-integrations:connectors:source-relational-db')

implementation 'org.apache.commons:commons-lang3:3.11'
implementation libs.postgresql

testImplementation testFixtures(project(':airbyte-integrations:bases:debezium-v1-9-2'))
testImplementation testFixtures(project(':airbyte-integrations:bases:debezium-v1-9-6'))
testImplementation testFixtures(project(':airbyte-integrations:connectors:source-jdbc'))
testImplementation project(":airbyte-json-validation")
testImplementation project(':airbyte-test-utils')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import static io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest.setEnv;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
Expand All @@ -21,7 +20,6 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
Expand All @@ -45,23 +43,16 @@
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStateMessage;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.AirbyteStreamState;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.StreamDescriptor;
import io.airbyte.protocol.models.SyncMode;
import io.airbyte.test.utils.PostgreSQLContainerHelper;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.jooq.DSLContext;
import org.jooq.SQLDialect;
import org.junit.jupiter.api.AfterEach;
Expand Down Expand Up @@ -278,72 +269,6 @@ public String createSchemaQuery(final String schemaName) {
return "CREATE SCHEMA " + schemaName + ";";
}

@Override
@Test
public void testRecordsProducedDuringAndAfterSync() throws Exception {

final int recordsToCreate = 20;
// first batch of records. 20 created here and 6 created in setup method.
for (int recordsCreated = 0; recordsCreated < recordsToCreate; recordsCreated++) {
final JsonNode record =
Jsons.jsonNode(ImmutableMap
.of(COL_ID, 100 + recordsCreated, COL_MAKE_ID, 1, COL_MODEL,
"F-" + recordsCreated));
writeModelRecord(record);
}

final AutoCloseableIterator<AirbyteMessage> firstBatchIterator = getSource()
.read(getConfig(), CONFIGURED_CATALOG, null);
final List<AirbyteMessage> dataFromFirstBatch = AutoCloseableIterators
.toListAndClose(firstBatchIterator);
final List<AirbyteStateMessage> stateAfterFirstBatch = extractStateMessages(dataFromFirstBatch);
assertEquals(1, stateAfterFirstBatch.size());
assertNotNull(stateAfterFirstBatch.get(0).getData());
assertExpectedStateMessages(stateAfterFirstBatch);
final Set<AirbyteRecordMessage> recordsFromFirstBatch = extractRecordMessages(
dataFromFirstBatch);
assertEquals((MODEL_RECORDS.size() + recordsToCreate), recordsFromFirstBatch.size());

// second batch of records again 20 being created
for (int recordsCreated = 0; recordsCreated < recordsToCreate; recordsCreated++) {
final JsonNode record =
Jsons.jsonNode(ImmutableMap
.of(COL_ID, 200 + recordsCreated, COL_MAKE_ID, 1, COL_MODEL,
"F-" + recordsCreated));
writeModelRecord(record);
}

final JsonNode state = Jsons.jsonNode(stateAfterFirstBatch);
final AutoCloseableIterator<AirbyteMessage> secondBatchIterator = getSource()
.read(getConfig(), CONFIGURED_CATALOG, state);
final List<AirbyteMessage> dataFromSecondBatch = AutoCloseableIterators
.toListAndClose(secondBatchIterator);

final List<AirbyteStateMessage> stateAfterSecondBatch = extractStateMessages(dataFromSecondBatch);
assertEquals(1, stateAfterSecondBatch.size());
assertNotNull(stateAfterSecondBatch.get(0).getData());
assertExpectedStateMessages(stateAfterSecondBatch);

final Set<AirbyteRecordMessage> recordsFromSecondBatch = extractRecordMessages(
dataFromSecondBatch);
assertEquals(recordsToCreate * 2, recordsFromSecondBatch.size(),
"Expected 40 records to be replicated in the second sync.");

// sometimes there can be more than one of these at the end of the snapshot and just before the
// first incremental.
final Set<AirbyteRecordMessage> recordsFromFirstBatchWithoutDuplicates = removeDuplicates(
recordsFromFirstBatch);
final Set<AirbyteRecordMessage> recordsFromSecondBatchWithoutDuplicates = removeDuplicates(
recordsFromSecondBatch);

final int recordsCreatedBeforeTestCount = MODEL_RECORDS.size();
assertTrue(recordsCreatedBeforeTestCount < recordsFromFirstBatchWithoutDuplicates.size(),
"Expected first sync to include records created while the test was running.");
assertEquals((recordsToCreate * 3) + recordsCreatedBeforeTestCount,
recordsFromFirstBatchWithoutDuplicates.size() + recordsFromSecondBatchWithoutDuplicates
.size());
}

@Override
protected String randomTableSchema() {
return MODELS_SCHEMA + "_random";
Expand Down

0 comments on commit 0c1823d

Please sign in to comment.