[Source-postgres] : Throw exception if there are any streams actively undergoing vacuum #41651

Merged 3 commits on Jul 12, 2024
@@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
-dockerImageTag: 3.4.24
+dockerImageTag: 3.4.25
dockerRepository: airbyte/source-postgres
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
githubIssueLabel: source-postgres

@@ -102,11 +102,13 @@ public static CtidGlobalStateManager getCtidInitialLoadGlobalStateManager(final
streamsUnderVacuum.addAll(streamsUnderVacuum(database,
ctidStreams.streamsForCtidSync(), quoteString).result());

-final List<ConfiguredAirbyteStream> finalListOfStreamsToBeSyncedViaCtid =
-streamsUnderVacuum.isEmpty() ? ctidStreams.streamsForCtidSync()
-: ctidStreams.streamsForCtidSync().stream()
-.filter(c -> !streamsUnderVacuum.contains(AirbyteStreamNameNamespacePair.fromConfiguredAirbyteSteam(c)))
-.toList();
+if (!streamsUnderVacuum.isEmpty()) {
+throw new ConfigErrorException(
+"Postgres database is undergoing a full vacuum - cannot proceed with the sync. Please sync again when the vacuum is finished.");
+}
Contributor:

Do we really want this to be a ConfigErrorException? Don't we want to retry syncing later on?

Contributor Author:

Good point, will change to TransientErrorException
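
The distinction raised in this thread can be sketched as follows. This is a minimal illustration, not the connector's code: `ConfigError` and `TransientError` are hypothetical stand-ins for `ConfigErrorException` and `TransientErrorException` (whose packages and constructors are not shown in this diff), streams are reduced to plain strings, and the `isRetryable` policy is an assumption based on the reviewer's question about retrying later.

```java
import java.util.List;
import java.util.Set;

// Sketch only: ConfigError and TransientError are hypothetical stand-ins, not the Airbyte classes.
class VacuumGuardSketch {

  /** Stand-in for an error the user must fix before a retry can succeed. */
  static class ConfigError extends RuntimeException {
    ConfigError(final String message) {
      super(message);
    }
  }

  /** Stand-in for an error that is expected to clear on its own. */
  static class TransientError extends RuntimeException {
    TransientError(final String message) {
      super(message);
    }
  }

  /** Fails fast if any stream is under vacuum; otherwise returns every ctid stream unchanged. */
  static List<String> streamsToSyncViaCtid(final List<String> ctidStreams, final Set<String> streamsUnderVacuum) {
    if (!streamsUnderVacuum.isEmpty()) {
      throw new TransientError("Postgres database is undergoing a full vacuum - cannot proceed with the sync. "
          + "Please sync again when the vacuum is finished.");
    }
    return ctidStreams;
  }

  /** Assumed retry policy: only transient errors are worth retrying. */
  static boolean isRetryable(final RuntimeException e) {
    return e instanceof TransientError;
  }

  public static void main(final String[] args) {
    // No stream is under vacuum, so the full list is returned.
    System.out.println(streamsToSyncViaCtid(List.of("public.users"), Set.of()));
    try {
      // One stream is under vacuum, so the whole sync is rejected.
      streamsToSyncViaCtid(List.of("public.users"), Set.of("public.users"));
    } catch (final RuntimeException e) {
      System.out.println("retryable=" + isRetryable(e));
    }
  }
}
```

Under this assumed policy, a vacuum in progress is reported as a condition that a later attempt can get past, rather than as a configuration problem the user has to correct.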


+final List<ConfiguredAirbyteStream> finalListOfStreamsToBeSyncedViaCtid = ctidStreams.streamsForCtidSync();

LOGGER.info("Streams to be synced via ctid : {}", finalListOfStreamsToBeSyncedViaCtid.size());
LOGGER.info("Streams: {}", prettyPrintConfiguredAirbyteStreamList(finalListOfStreamsToBeSyncedViaCtid));
final FileNodeHandler fileNodeHandler = PostgresQueryUtils.fileNodeForStreams(database,
@@ -195,11 +197,12 @@ public static List<AutoCloseableIterator<AirbyteMessage>> cdcCtidIteratorsCombin
streamsUnderVacuum.addAll(streamsUnderVacuum(database,
ctidStreams.streamsForCtidSync(), quoteString).result());

-finalListOfStreamsToBeSyncedViaCtid =
-streamsUnderVacuum.isEmpty() ? ctidStreams.streamsForCtidSync()
-: ctidStreams.streamsForCtidSync().stream()
-.filter(c -> !streamsUnderVacuum.contains(AirbyteStreamNameNamespacePair.fromConfiguredAirbyteSteam(c)))
-.toList();
+if (!streamsUnderVacuum.isEmpty()) {
+throw new ConfigErrorException(
+"Postgres database is undergoing a full vacuum - cannot proceed with the sync. Please sync again when the vacuum is finished.");
+}
+
+finalListOfStreamsToBeSyncedViaCtid = ctidStreams.streamsForCtidSync();
final FileNodeHandler fileNodeHandler = PostgresQueryUtils.fileNodeForStreams(database,
finalListOfStreamsToBeSyncedViaCtid,
quoteString);
@@ -273,22 +276,16 @@ public static List<AutoCloseableIterator<AirbyteMessage>> cdcCtidIteratorsCombin
.collect(Collectors.toList());
}

-if (streamsUnderVacuum.isEmpty()) {
-// This starts processing the WAL as soon as initial sync is complete, this is a bit different from
-// the current cdc syncs.
-// We finish the current CDC once the initial snapshot is complete and the next sync starts
-// processing the WAL
-return Stream
-.of(initialSyncCtidIterators, cdcStreamsStartStatusEmitters,
-Collections.singletonList(AutoCloseableIterators.lazyIterator(incrementalIteratorSupplier, null)),
-allStreamsCompleteStatusEmitters)
-.flatMap(Collection::stream)
-.collect(Collectors.toList());
-} else {
-LOGGER.warn("Streams are under vacuuming, not going to process WAL");
-return Stream.of(initialSyncCtidIterators, cdcStreamsStartStatusEmitters, allStreamsCompleteStatusEmitters).flatMap(Collection::stream)
-.collect(Collectors.toList());
-}
+// This starts processing the WAL as soon as initial sync is complete, this is a bit different from
+// the current cdc syncs.
+// We finish the current CDC once the initial snapshot is complete and the next sync starts
+// processing the WAL
+return Stream
+.of(initialSyncCtidIterators, cdcStreamsStartStatusEmitters,
+Collections.singletonList(AutoCloseableIterators.lazyIterator(incrementalIteratorSupplier, null)),
+allStreamsCompleteStatusEmitters)
+.flatMap(Collection::stream)
+.collect(Collectors.toList());
}

public static CdcState getCdcState(final JdbcDatabase database,