Skip to content

Commit

Permalink
[source-postgres] Add test for legacy version of postgres (#35329)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaohansong authored Feb 15, 2024
1 parent 34ca067 commit 5c7e3b9
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,28 @@

package io.airbyte.integrations.source.postgres;

import io.airbyte.integrations.source.postgres.PostgresTestDatabase.BaseImage;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Order;

@Order(2)
public class CdcPostgresSourceLegacyCtidTest extends CdcPostgresSourceTest {

protected static String getServerImageName() {
return "debezium/postgres:13-bullseye";
@Override
protected void setBaseImage() {
this.postgresImage = BaseImage.POSTGRES_12;
}

@Override
@Disabled("https://github.com/airbytehq/airbyte/issues/35267")
public void newTableSnapshotTest() {

}

@Override
@Disabled("https://github.com/airbytehq/airbyte/issues/35267")
public void syncShouldIncrementLSN() {

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,16 @@
@Order(1)
public class CdcPostgresSourceTest extends CdcSourceTest<PostgresSource, PostgresTestDatabase> {

protected BaseImage postgresImage;

protected void setBaseImage() {
this.postgresImage = getServerImage();
}

@Override
protected PostgresTestDatabase createTestDatabase() {
return PostgresTestDatabase.in(getServerImage(), ContainerModifier.CONF).withReplicationSlot();
setBaseImage();
return PostgresTestDatabase.in(this.postgresImage, ContainerModifier.CONF).withReplicationSlot();
}

@Override
Expand All @@ -101,6 +108,15 @@ protected void setup() {
testdb.withPublicationForAllTables();
}

// For legacy Postgres we will call advanceLsn() after we retrieved target LSN, so that debezium
// would not drop any record.
// However, that might cause unexpected state and cause failure in the test. Thus we need to bypass
// some check if they are on legacy postgres
// versions.
private boolean isOnLegacyPostgres() {
return postgresImage.majorVersion < 15;
}

@Test
void testDebugMode() {
final JsonNode invalidDebugConfig = testdb.testConfigBuilder()
Expand Down Expand Up @@ -196,7 +212,12 @@ private void assertStateTypes(final List<AirbyteStateMessage> stateMessages, fin
if (Objects.isNull(sharedState)) {
sharedState = global.getSharedState();
} else {
assertEquals(sharedState, global.getSharedState());
// This validation is only true for versions on or after postgres 15. We execute
// EPHEMERAL_HEARTBEAT_CREATE_STATEMENTS for earlier versions of
// Postgres. See https://github.com/airbytehq/airbyte/pull/33605 for details.
if (!isOnLegacyPostgres()) {
assertEquals(sharedState, global.getSharedState());
}
}
assertEquals(1, global.getStreamStates().size());
final AirbyteStreamState streamState = global.getStreamStates().get(0);
Expand Down Expand Up @@ -324,7 +345,11 @@ public void testTwoStreamSync() throws Exception {
if (Objects.isNull(sharedState)) {
sharedState = global.getSharedState();
} else {
assertEquals(sharedState, global.getSharedState());
// LSN will be advanced for postgres version before 15. See
// https://github.com/airbytehq/airbyte/pull/33605
if (!isOnLegacyPostgres()) {
assertEquals(sharedState, global.getSharedState());
}
}

if (Objects.isNull(firstStreamInState)) {
Expand Down Expand Up @@ -755,7 +780,11 @@ protected void assertLsnPositionForSyncShouldIncrementLSN(final Long lsnPosition
if (syncNumber == 1) {
assertEquals(1, lsnPosition2.compareTo(lsnPosition1));
} else if (syncNumber == 2) {
assertEquals(0, lsnPosition2.compareTo(lsnPosition1));
// Earlier Postgres version will advance lsn even if there is no sync records. See
// https://github.com/airbytehq/airbyte/pull/33605.
if (!isOnLegacyPostgres()) {
assertEquals(0, lsnPosition2.compareTo(lsnPosition1));
}
} else {
throw new RuntimeException("Unknown sync number " + syncNumber);
}
Expand Down Expand Up @@ -791,7 +820,9 @@ protected void verifyCheckpointStatesByRecords() throws Exception {
.toListAndClose(secondBatchIterator);
assertEquals(recordsToCreate, extractRecordMessages(dataFromSecondBatch).size());
final List<AirbyteStateMessage> stateMessagesCDC = extractStateMessages(dataFromSecondBatch);
assertTrue(stateMessagesCDC.size() > 1, "Generated only the final state.");
if (!isOnLegacyPostgres()) {
assertTrue(stateMessagesCDC.size() > 1, "Generated only the final state.");
}
assertEquals(stateMessagesCDC.size(), stateMessagesCDC.stream().distinct().count(), "There are duplicated states.");
}

Expand Down Expand Up @@ -830,7 +861,9 @@ protected void verifyCheckpointStatesBySeconds() throws Exception {

assertEquals(recordsToCreate, extractRecordMessages(dataFromSecondBatch).size());
final List<AirbyteStateMessage> stateMessagesCDC = extractStateMessages(dataFromSecondBatch);
assertTrue(stateMessagesCDC.size() > 1, "Generated only the final state.");
if (!isOnLegacyPostgres()) {
assertTrue(stateMessagesCDC.size() > 1, "Generated only the final state.");
}
assertEquals(stateMessagesCDC.size(), stateMessagesCDC.stream().distinct().count(), "There are duplicated states.");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,17 @@ public class PostgresTestDatabase extends

public static enum BaseImage {

POSTGRES_16("postgres:16-bullseye"),
POSTGRES_12("postgres:12-bullseye"),
POSTGRES_9("postgres:9-alpine"),
POSTGRES_SSL_DEV("marcosmarxm/postgres-ssl:dev");
POSTGRES_16("postgres:16-bullseye", 16),
POSTGRES_12("postgres:12-bullseye", 12),
POSTGRES_9("postgres:9-alpine", 9),
POSTGRES_SSL_DEV("marcosmarxm/postgres-ssl:dev", 16);

private final String reference;
public final String reference;
public final int majorVersion;

private BaseImage(String reference) {
private BaseImage(String reference, int majorVersion) {
this.reference = reference;
this.majorVersion = majorVersion;
};

}
Expand Down

0 comments on commit 5c7e3b9

Please sign in to comment.