Skip to content

Commit

Permalink
Attempt to fix flakiness (#36258)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaohansong committed Mar 19, 2024
1 parent e5aea95 commit bb478d5
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,11 @@ acceptance_tests:
- config_path: "secrets/config.json"
expect_records:
path: "integration_tests/expected_records.txt"
validate_state_messages: false
- config_path: "secrets/config_cdc.json"
expect_records:
path: "integration_tests/expected_records.txt"
validate_state_messages: false
full_refresh:
tests:
- config_path: "secrets/config.json"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
import static io.airbyte.integrations.source.postgres.xmin.XminTestConstants.RECORD_MESSAGE_2;
import static io.airbyte.integrations.source.postgres.xmin.XminTestConstants.RECORD_MESSAGE_3;
import static io.airbyte.integrations.source.postgres.xmin.XminTestConstants.STREAM_NAME1;
import static io.airbyte.integrations.source.postgres.xmin.XminTestConstants.XMIN_STATE_MESSAGE_1;
import static io.airbyte.integrations.source.postgres.xmin.XminTestConstants.XMIN_STATUS1;
import static io.airbyte.integrations.source.postgres.xmin.XminTestConstants.createStateMessage1WithCount;
import static org.junit.Assert.assertThrows;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
Expand All @@ -19,7 +19,6 @@
import io.airbyte.cdk.integrations.source.relationaldb.state.StateEmitFrequency;
import io.airbyte.commons.util.MoreIterators;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteStateStats;
import io.airbyte.protocol.models.v0.AirbyteStream;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import java.sql.SQLException;
Expand Down Expand Up @@ -70,12 +69,9 @@ void testSuccessfulSync() {
manager,
new StateEmitFrequency(0L, Duration.ofSeconds(1L)));

var expectedStateMessage =
XMIN_STATE_MESSAGE_1.withState(XMIN_STATE_MESSAGE_1.getState().withSourceStats(new AirbyteStateStats().withRecordCount(2.0)));

assertEquals(RECORD_MESSAGE_1, iterator.next());
assertEquals(RECORD_MESSAGE_2, iterator.next());
assertEquals(expectedStateMessage, iterator.next());
assertEquals(createStateMessage1WithCount(2.0), iterator.next());
assertFalse(iterator.hasNext());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
import io.airbyte.protocol.models.v0.AirbyteStateMessage;
import io.airbyte.protocol.models.v0.AirbyteStateMessage.AirbyteStateType;
import io.airbyte.protocol.models.v0.AirbyteStateStats;
import io.airbyte.protocol.models.v0.AirbyteStreamState;
import io.airbyte.protocol.models.v0.StreamDescriptor;
import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -72,4 +73,18 @@ public static AirbyteMessage createRecordMessage(final String recordValue) {
.withData(Jsons.jsonNode(ImmutableMap.of(UUID_FIELD_NAME, recordValue))));
}

public static AirbyteMessage createStateMessage1WithCount(final double count) {
return new AirbyteMessage()
.withType(Type.STATE)
.withState(new AirbyteStateMessage()
.withType(AirbyteStateType.STREAM)
.withStream(new AirbyteStreamState()
.withStreamDescriptor(
new StreamDescriptor()
.withName(PAIR1.getName())
.withNamespace(PAIR1.getNamespace()))
.withStreamState(new ObjectMapper().valueToTree(XMIN_STATUS1)))
.withSourceStats(new AirbyteStateStats().withRecordCount(count)));
}

}

0 comments on commit bb478d5

Please sign in to comment.