Skip to content

Commit

Permalink
source-postgres: Remove strict check for stream states of unknown typ…
Browse files Browse the repository at this point in the history
…es (#40559)
  • Loading branch information
evantahler committed Jun 26, 2024
1 parent ffca542 commit 0cfaf79
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerImageTag: 3.4.19
dockerImageTag: 3.4.20
dockerRepository: airbyte/source-postgres
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
githubIssueLabel: source-postgres
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,29 +55,26 @@ public static StreamsCategorised<XminStreams> categoriseStreams(final StateManag
rawStateMessages.forEach(stateMessage -> {
final JsonNode streamState = stateMessage.getStream().getStreamState();
final StreamDescriptor streamDescriptor = stateMessage.getStream().getStreamDescriptor();
if (streamState == null || streamDescriptor == null) {
if (streamState == null || streamDescriptor == null || !streamState.has(STATE_TYPE_KEY)) {
return;
}

if (streamState.has(STATE_TYPE_KEY)) {
if (streamState.get(STATE_TYPE_KEY).asText().equalsIgnoreCase("ctid")) {
statesFromCtidSync.add(stateMessage);
streamsStillInCtidSync.add(new AirbyteStreamNameNamespacePair(streamDescriptor.getName(), streamDescriptor.getNamespace()));
} else if (streamState.get(STATE_TYPE_KEY).asText().equalsIgnoreCase("xmin")) {
if (shouldPerformFullSync(currentXminStatus, streamState)) {
final AirbyteStreamNameNamespacePair pair = new AirbyteStreamNameNamespacePair(streamDescriptor.getName(),
streamDescriptor.getNamespace());
LOGGER.info("Detected multiple wraparounds. Will perform a full sync for {}", pair);
streamsStillInCtidSync.add(pair);
} else {
statesFromXminSync.add(stateMessage);
}
if (streamState.get(STATE_TYPE_KEY).asText().equalsIgnoreCase("ctid")) {
statesFromCtidSync.add(stateMessage);
streamsStillInCtidSync.add(new AirbyteStreamNameNamespacePair(streamDescriptor.getName(), streamDescriptor.getNamespace()));
} else if (streamState.get(STATE_TYPE_KEY).asText().equalsIgnoreCase("xmin")) {
if (shouldPerformFullSync(currentXminStatus, streamState)) {
final AirbyteStreamNameNamespacePair pair = new AirbyteStreamNameNamespacePair(streamDescriptor.getName(),
streamDescriptor.getNamespace());
LOGGER.info("Detected multiple wraparounds. Will perform a full sync for {}", pair);
streamsStillInCtidSync.add(pair);
} else {
throw new ConfigErrorException("You've changed replication modes - please reset the streams in this connector");
statesFromXminSync.add(stateMessage);
}
} else {
throw new RuntimeException("State type not present");
throw new ConfigErrorException("You've changed replication modes - please reset the streams in this connector");
}

alreadySeenStreams.add(new AirbyteStreamNameNamespacePair(streamDescriptor.getName(), streamDescriptor.getNamespace()));
});
}
Expand Down
7 changes: 4 additions & 3 deletions docs/integrations/sources/postgres.md
Original file line number Diff line number Diff line change
Expand Up @@ -310,9 +310,10 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp
<summary>Expand to review</summary>

| Version | Date | Pull Request | Subject |
|---------| ---------- | -------------------------------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| 3.4.19 | 2024-06-23 | [40223](https://github.com/airbytehq/airbyte/pull/40223) | Revert the changes introduced in version 3.4.15. |
| 3.4.18 | 2024-06-14 | [39349](https://github.com/airbytehq/airbyte/pull/39349) | Full refresh stream sending internal count metadata. |
| ------- | ---------- | -------------------------------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| 3.4.20 | 2024-06-23 | [40559](https://github.com/airbytehq/airbyte/pull/40559) | Remove strict check for stream states of unknown types |
| 3.4.19 | 2024-06-23 | [40223](https://github.com/airbytehq/airbyte/pull/40223) | Revert the changes introduced in version 3.4.15. |
| 3.4.18 | 2024-06-14 | [39349](https://github.com/airbytehq/airbyte/pull/39349) | Full refresh stream sending internal count metadata. |
| 3.4.17 | 2024-06-13 | [39460](https://github.com/airbytehq/airbyte/pull/39460) | Bump postgres JDBC driver version |
| 3.4.16 | 2024-05-29 | [39474](https://github.com/airbytehq/airbyte/pull/39474) | Adopt latest CDK. |
| 3.4.15 | 2024-05-29 | [38773](https://github.com/airbytehq/airbyte/pull/38773) | Connect with adaptiveFetch=true. |
Expand Down

0 comments on commit 0cfaf79

Please sign in to comment.