Skip to content

Commit

Permalink
[source-postgres] Fix final state message on state (#38171)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaohansong committed May 13, 2024
1 parent 8fdd981 commit 59cdc36
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 83 deletions.
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.35.0
version=0.35.1
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,15 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
// Do nothing.
}

private fun assertStateDoNotHaveDuplicateStreams(stateMessage: AirbyteStateMessage) {
val dedupedStreamStates =
stateMessage.global.streamStates
.stream()
.map { streamState: AirbyteStreamState -> streamState.streamDescriptor }
.collect(Collectors.toSet())
Assertions.assertEquals(dedupedStreamStates.size, stateMessage.global.streamStates.size)
}

@BeforeEach
protected open fun setup() {
testdb = createTestDatabase()
Expand Down Expand Up @@ -616,6 +625,7 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {

val recordMessages1 = extractRecordMessages(actualRecords1)
val stateMessages1 = extractStateMessages(actualRecords1)
stateMessages1.map { state -> assertStateDoNotHaveDuplicateStreams(state) }
val names = HashSet(STREAM_NAMES)
names.add(MODELS_STREAM_NAME_2)

Expand Down Expand Up @@ -657,7 +667,7 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
} else {
assertExpectedStateMessageCountMatches(
stateMessages1,
MODEL_RECORDS.size.toLong() + MODEL_RECORDS_2.size.toLong()
MODEL_RECORDS.size.toLong() + MODEL_RECORDS_2.size.toLong(),
)
assertExpectedRecords(
Streams.concat(MODEL_RECORDS_2.stream(), MODEL_RECORDS.stream())
Expand Down Expand Up @@ -1236,8 +1246,9 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {

assertExpectedStateMessageCountMatches(
stateAfterFirstBatch,
MODEL_RECORDS.size.toLong() + MODEL_RECORDS_2.size.toLong()
MODEL_RECORDS.size.toLong() + MODEL_RECORDS_2.size.toLong(),
)
stateAfterFirstBatch.map { state -> assertStateDoNotHaveDuplicateStreams(state) }
}

protected open fun assertStateMessagesForNewTableSnapshotTest(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ java {
}

airbyteJavaConnector {
cdkVersionRequired = '0.34.2'
cdkVersionRequired = '0.35.1'
features = ['db-sources', 'datastore-postgres']
useLocalCdk = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerImageTag: 3.4.1
dockerImageTag: 3.4.2
dockerRepository: airbyte/source-postgres
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
githubIssueLabel: source-postgres
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public AirbyteStateMessage createFinalStateMessage(final AirbyteStreamNameNamesp

resumableFullRefreshStreams.forEach(stream -> {
final CtidStatus ctidStatusForFullRefreshStream = generateCtidStatusForState(pair);
streamStates.add(getAirbyteStreamState(pair, Jsons.jsonNode(ctidStatusForFullRefreshStream)));
streamStates.add(getAirbyteStreamState(stream, Jsons.jsonNode(ctidStatusForFullRefreshStream)));
});

return new AirbyteStateMessage()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,11 @@ private void assertStateTypes(final List<? extends AirbyteStateMessage> stateMes
}
}

@Override
protected void validateStreamStateInResumableFullRefresh(final JsonNode streamStateToBeTested) {
assertEquals("ctid", streamStateToBeTested.get("state_type").asText());
}

@Override
@Test
protected void testCdcAndNonResumableFullRefreshInSameSync() throws Exception {}
Expand Down

0 comments on commit 59cdc36

Please sign in to comment.