Skip to content

Commit

Permalink
Postgres on Resumable full refresh (#37112)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaohansong committed May 10, 2024
1 parent 715bdae commit 80920d1
Show file tree
Hide file tree
Showing 25 changed files with 843 additions and 331 deletions.
@@ -1 +1 @@
version=0.34.1
version=0.34.2
Expand Up @@ -715,7 +715,7 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
// sync, the
// data is replicated as expected.
@Throws(Exception::class)
fun testCdcAndNonResumableFullRefreshInSameSync() {
protected open fun testCdcAndNonResumableFullRefreshInSameSync() {
val configuredCatalog = Jsons.clone(configuredCatalog)

val MODEL_RECORDS_2: List<JsonNode> =
Expand All @@ -734,7 +734,7 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
createTableSqlFmt(),
modelsSchema(),
MODELS_STREAM_NAME_2,
columnClause(columns, Optional.of(COL_ID)),
columnClause(columns, Optional.empty()),
)

for (recordJson in MODEL_RECORDS_2) {
Expand Down
Expand Up @@ -420,9 +420,10 @@ abstract class JdbcSourceAcceptanceTest<S : Source, T : TestDatabase<*, T, *>> {
setEmittedAtToNull(actualMessages)

val expectedMessages = airbyteMessagesReadOneColumn
Assertions.assertEquals(expectedMessages.size, actualMessages.size)
Assertions.assertTrue(expectedMessages.containsAll(actualMessages))
Assertions.assertTrue(actualMessages.containsAll(expectedMessages))
val actualRecordMessages = filterRecords(actualMessages)
Assertions.assertEquals(expectedMessages.size, actualRecordMessages.size)
Assertions.assertTrue(expectedMessages.containsAll(actualRecordMessages))
Assertions.assertTrue(actualRecordMessages.containsAll(expectedMessages))
}

protected open val airbyteMessagesReadOneColumn: List<AirbyteMessage>
Expand Down Expand Up @@ -507,8 +508,6 @@ abstract class JdbcSourceAcceptanceTest<S : Source, T : TestDatabase<*, T, *>> {

expectedMessages.addAll(getAirbyteMessagesSecondSync(streamName2))

System.out.println("catalog: " + catalog)

val actualMessages = MoreIterators.toList(source()!!.read(config(), catalog, null))
val actualRecordMessages = filterRecords(actualMessages)

Expand Down
Expand Up @@ -12,7 +12,7 @@ java {
}

airbyteJavaConnector {
cdkVersionRequired = '0.34.1'
cdkVersionRequired = '0.34.2'
features = ['db-sources', 'datastore-postgres']
useLocalCdk = false
}
Expand Down
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerImageTag: 3.3.33
dockerImageTag: 3.4.0
dockerRepository: airbyte/source-postgres
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
githubIssueLabel: source-postgres
Expand Down

Large diffs are not rendered by default.

Expand Up @@ -23,7 +23,7 @@ public class PostgresCdcConnectorMetadataInjector implements CdcMetadataInjector
this.lsn = null;
}

PostgresCdcConnectorMetadataInjector(final String transactionTimestamp, final Long lsn) {
public PostgresCdcConnectorMetadataInjector(final String transactionTimestamp, final Long lsn) {
this.transactionTimestamp = transactionTimestamp;
this.lsn = lsn;
}
Expand Down

Large diffs are not rendered by default.

Expand Up @@ -16,7 +16,6 @@
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.v0.StreamDescriptor;
import io.airbyte.protocol.models.v0.SyncMode;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -34,7 +33,6 @@ public static CtidStreams streamsToSyncViaCtid(final CdcStateManager stateManage
return new CtidStreams(
fullCatalog.getStreams()
.stream()
.filter(c -> c.getSyncMode() == SyncMode.INCREMENTAL)
.collect(Collectors.toList()),
new HashMap<>());
}
Expand Down Expand Up @@ -78,7 +76,6 @@ private static List<ConfiguredAirbyteStream> identifyStreamsToSnapshot(final Con
final Set<AirbyteStreamNameNamespacePair> allStreams = AirbyteStreamNameNamespacePair.fromConfiguredCatalog(catalog);
final Set<AirbyteStreamNameNamespacePair> newlyAddedStreams = new HashSet<>(Sets.difference(allStreams, alreadySyncedStreams));
return catalog.getStreams().stream()
.filter(c -> c.getSyncMode() == SyncMode.INCREMENTAL)
.filter(stream -> newlyAddedStreams.contains(AirbyteStreamNameNamespacePair.fromAirbyteStream(stream.getStream()))).map(Jsons::clone)
.collect(Collectors.toList());
}
Expand Down
Expand Up @@ -7,6 +7,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.cdk.integrations.source.relationaldb.models.CdcState;
import io.airbyte.cdk.integrations.source.relationaldb.models.DbStreamState;
import io.airbyte.cdk.integrations.source.relationaldb.state.StateManager;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.source.postgres.cdc.PostgresCdcCtidUtils.CtidStreams;
import io.airbyte.integrations.source.postgres.internal.models.CtidStatus;
Expand All @@ -33,29 +34,41 @@ public class CtidGlobalStateManager extends CtidStateManager {

private static final Logger LOGGER = LoggerFactory.getLogger(CtidGlobalStateManager.class);

private final CdcState cdcState;
private final Set<AirbyteStreamNameNamespacePair> streamsThatHaveCompletedSnapshot;
private final StateManager stateManager;
private Set<AirbyteStreamNameNamespacePair> resumableFullRefreshStreams;
private Set<AirbyteStreamNameNamespacePair> streamsThatHaveCompletedSnapshot;
private final boolean savedOffsetAfterReplicationSlotLSN;
private final CdcState defaultCdcState;

public CtidGlobalStateManager(final CtidStreams ctidStreams,
final FileNodeHandler fileNodeHandler,
final CdcState cdcState,
final ConfiguredAirbyteCatalog catalog) {
final StateManager stateManager,
final ConfiguredAirbyteCatalog catalog,
final boolean savedOffsetAfterReplicationSlotLSN,
final CdcState defaultCdcState) {
super(filterOutExpiredFileNodes(ctidStreams.pairToCtidStatus(), fileNodeHandler));
this.cdcState = cdcState;
this.streamsThatHaveCompletedSnapshot = initStreamsCompletedSnapshot(ctidStreams, catalog);
this.stateManager = stateManager;
this.savedOffsetAfterReplicationSlotLSN = savedOffsetAfterReplicationSlotLSN;
this.defaultCdcState = defaultCdcState;
initStream(ctidStreams, catalog);
this.fileNodeHandler = fileNodeHandler;
}

private static Set<AirbyteStreamNameNamespacePair> initStreamsCompletedSnapshot(final CtidStreams ctidStreams,
final ConfiguredAirbyteCatalog catalog) {
final Set<AirbyteStreamNameNamespacePair> streamsThatHaveCompletedSnapshot = new HashSet<>();
private void initStream(final CtidStreams ctidStreams,
final ConfiguredAirbyteCatalog catalog) {
this.streamsThatHaveCompletedSnapshot = new HashSet<>();
this.resumableFullRefreshStreams = new HashSet<>();
catalog.getStreams().forEach(configuredAirbyteStream -> {
if (ctidStreams.streamsForCtidSync().contains(configuredAirbyteStream) || configuredAirbyteStream.getSyncMode() != SyncMode.INCREMENTAL) {
return;
if (!ctidStreams.streamsForCtidSync().contains(configuredAirbyteStream) && configuredAirbyteStream.getSyncMode() == SyncMode.INCREMENTAL) {
streamsThatHaveCompletedSnapshot.add(
new AirbyteStreamNameNamespacePair(configuredAirbyteStream.getStream().getName(), configuredAirbyteStream.getStream().getNamespace()));
}
if (ctidStreams.streamsForCtidSync().contains(configuredAirbyteStream)
&& configuredAirbyteStream.getSyncMode() == SyncMode.FULL_REFRESH) {
this.resumableFullRefreshStreams.add(
new AirbyteStreamNameNamespacePair(configuredAirbyteStream.getStream().getName(), configuredAirbyteStream.getStream().getNamespace()));
}
streamsThatHaveCompletedSnapshot.add(
new AirbyteStreamNameNamespacePair(configuredAirbyteStream.getStream().getName(), configuredAirbyteStream.getStream().getNamespace()));
});
return streamsThatHaveCompletedSnapshot;
}

private static Map<AirbyteStreamNameNamespacePair, CtidStatus> filterOutExpiredFileNodes(
Expand All @@ -79,37 +92,65 @@ private static Map<AirbyteStreamNameNamespacePair, CtidStatus> filterOutExpiredF
public AirbyteStateMessage createCtidStateMessage(final AirbyteStreamNameNamespacePair pair, final CtidStatus ctidStatus) {
pairToCtidStatus.put(pair, ctidStatus);
final List<AirbyteStreamState> streamStates = new ArrayList<>();

streamsThatHaveCompletedSnapshot.forEach(stream -> {
final DbStreamState state = getFinalState(stream);
streamStates.add(getAirbyteStreamState(stream, Jsons.jsonNode(state)));

});
streamStates.add(getAirbyteStreamState(pair, (Jsons.jsonNode(ctidStatus))));
final AirbyteGlobalState globalState = new AirbyteGlobalState();
globalState.setSharedState(Jsons.jsonNode(cdcState));
globalState.setStreamStates(streamStates);

resumableFullRefreshStreams.forEach(stream -> {
final CtidStatus ctidStatusForFullRefreshStream = generateCtidStatusForState(stream);
streamStates.add(getAirbyteStreamState(stream, (Jsons.jsonNode(ctidStatusForFullRefreshStream))));
});

if (!resumableFullRefreshStreams.contains(pair)) {
streamStates.add(getAirbyteStreamState(pair, (Jsons.jsonNode(ctidStatus))));
}

return new AirbyteStateMessage()
.withType(AirbyteStateType.GLOBAL)
.withGlobal(globalState);
.withGlobal(generateGlobalState(streamStates));
}

/**
 * Builds the global state for a state message.
 *
 * @param streamStates per-stream states to attach to the global state
 * @return a new {@link AirbyteGlobalState} whose shared state is the JSON form of
 *         {@link #getCdcState()} and whose stream states are {@code streamStates}
 */
public AirbyteGlobalState generateGlobalState(final List<AirbyteStreamState> streamStates) {
  // Resolve the shared CDC state first, then assemble the global state around it.
  final JsonNode sharedState = Jsons.jsonNode(getCdcState());
  final AirbyteGlobalState result = new AirbyteGlobalState();
  result.setSharedState(sharedState);
  result.setStreamStates(streamStates);
  return result;
}

/**
 * Selects which CDC state to report.
 *
 * @return the CDC state held by the state manager when {@code savedOffsetAfterReplicationSlotLSN}
 *         is true and both that state and its inner state are non-null; otherwise
 *         {@code defaultCdcState}
 */
public CdcState getCdcState() {
  final CdcState fromStateManager = stateManager.getCdcStateManager().getCdcState();

  // The state manager's value is only used when the saved offset is flagged as valid and
  // there is actually something stored; every other case falls back to the default.
  final boolean useStateManagerValue = savedOffsetAfterReplicationSlotLSN
      && fromStateManager != null
      && fromStateManager.getState() != null;

  return useStateManagerValue ? fromStateManager : defaultCdcState;
}

@Override
public AirbyteStateMessage createFinalStateMessage(final AirbyteStreamNameNamespacePair pair, final JsonNode streamStateForIncrementalRun) {
streamsThatHaveCompletedSnapshot.add(pair);
// Only incremental streams can be transformed into the next phase.
if (!resumableFullRefreshStreams.contains(pair)) {
streamsThatHaveCompletedSnapshot.add(pair);
}
final List<AirbyteStreamState> streamStates = new ArrayList<>();
streamsThatHaveCompletedSnapshot.forEach(stream -> {
final DbStreamState state = getFinalState(stream);
streamStates.add(getAirbyteStreamState(stream, Jsons.jsonNode(state)));
});

final AirbyteGlobalState globalState = new AirbyteGlobalState();
globalState.setSharedState(Jsons.jsonNode(cdcState));
globalState.setStreamStates(streamStates);
// Emit one ctid state entry per resumable full refresh stream. The status and the stream
// descriptor must be built from the stream being iterated (as createCtidStateMessage does),
// not from `pair`; using `pair` here would add the same entry once per full refresh stream
// and omit the others.
resumableFullRefreshStreams.forEach(stream -> {
  final CtidStatus ctidStatusForFullRefreshStream = generateCtidStatusForState(stream);
  streamStates.add(getAirbyteStreamState(stream, Jsons.jsonNode(ctidStatusForFullRefreshStream)));
});

return new AirbyteStateMessage()
.withType(AirbyteStateType.GLOBAL)
.withGlobal(globalState);
.withGlobal(generateGlobalState(streamStates));
}

private AirbyteStreamState getAirbyteStreamState(final AirbyteStreamNameNamespacePair pair, final JsonNode stateData) {
Expand Down
Expand Up @@ -81,6 +81,11 @@ public AirbyteStateMessage createCtidStateMessage(final AirbyteStreamNameNamespa

/**
 * Creates the state message emitted when the ctid read of {@code pair} finishes.
 *
 * If {@code streamStateForIncrementalRun} is null or reports {@code isEmpty()}, a ctid state
 * message is returned instead, built from {@code generateCtidStatusForState(pair)} via
 * {@code createCtidStateMessage}. Otherwise the node is deserialized into an {@link XminStatus}
 * and turned into a state message by {@code XminStateManager.getAirbyteStateMessage}.
 *
 * @param pair name/namespace of the stream the state belongs to
 * @param streamStateForIncrementalRun state for the follow-up incremental run; may be null or empty
 * @return the state message for {@code pair}
 */
@Override
public AirbyteStateMessage createFinalStateMessage(final AirbyteStreamNameNamespacePair pair, final JsonNode streamStateForIncrementalRun) {
if (streamStateForIncrementalRun == null || streamStateForIncrementalRun.isEmpty()) {
// Resumable full refresh for cursor based stream: there is no incremental state to hand over,
// so keep emitting a ctid state.
var ctidStatus = generateCtidStatusForState(pair);
return createCtidStateMessage(pair, ctidStatus);
}
return XminStateManager.getAirbyteStateMessage(pair, Jsons.object(streamStateForIncrementalRun, XminStatus.class));
}

Expand Down
Expand Up @@ -6,6 +6,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.cdk.integrations.source.relationaldb.state.SourceStateMessageProducer;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.source.postgres.internal.models.CtidStatus;
import io.airbyte.integrations.source.postgres.internal.models.InternalModels.StateType;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
Expand All @@ -27,13 +28,14 @@ public abstract class CtidStateManager implements SourceStateMessageProducer<Air
public static final String STATE_TYPE_KEY = "state_type";

protected final Map<AirbyteStreamNameNamespacePair, CtidStatus> pairToCtidStatus;
private Function<AirbyteStreamNameNamespacePair, JsonNode> streamStateForIncrementalRunSupplier;
protected Function<AirbyteStreamNameNamespacePair, JsonNode> streamStateForIncrementalRunSupplier;

private String lastCtid;
private FileNodeHandler fileNodeHandler;
protected String lastCtid;
protected FileNodeHandler fileNodeHandler;

protected CtidStateManager(final Map<AirbyteStreamNameNamespacePair, CtidStatus> pairToCtidStatus) {
this.pairToCtidStatus = pairToCtidStatus;
this.streamStateForIncrementalRunSupplier = namespacePair -> Jsons.emptyObject();
}

public CtidStatus getCtidStatus(final AirbyteStreamNameNamespacePair pair) {
Expand All @@ -55,26 +57,39 @@ public static boolean validateRelationFileNode(final CtidStatus ctidstatus,

public abstract AirbyteStateMessage createFinalStateMessage(final AirbyteStreamNameNamespacePair pair, final JsonNode streamStateForIncrementalRun);

public void setStreamStateIteratorFields(Function<AirbyteStreamNameNamespacePair, JsonNode> streamStateForIncrementalRunSupplier,
FileNodeHandler fileNodeHandler) {
public void setStreamStateIteratorFields(Function<AirbyteStreamNameNamespacePair, JsonNode> streamStateForIncrementalRunSupplier) {
this.streamStateForIncrementalRunSupplier = streamStateForIncrementalRunSupplier;
}

public void setFileNodeHandler(final FileNodeHandler fileNodeHandler) {
this.fileNodeHandler = fileNodeHandler;
}

public FileNodeHandler getFileNodeHandler() {
return fileNodeHandler;
}

@Override
public AirbyteStateMessage generateStateMessageAtCheckpoint(final ConfiguredAirbyteStream stream) {
final AirbyteStreamNameNamespacePair pair = new AirbyteStreamNameNamespacePair(stream.getStream().getName(),
stream.getStream().getNamespace());
final CtidStatus ctidStatus = generateCtidStatusForState(pair);
LOGGER.info("Emitting ctid state for stream {}, state is {}", pair, ctidStatus);
return createCtidStateMessage(pair, ctidStatus);
}

protected CtidStatus generateCtidStatusForState(final AirbyteStreamNameNamespacePair pair) {
final Long fileNode = fileNodeHandler.getFileNode(pair);
assert fileNode != null;
final CtidStatus ctidStatus = new CtidStatus()
// If the table is empty, lastCtid will be set to zero for the final state message.
final String lastCtidInState = (Objects.nonNull(lastCtid)
&& StringUtils.isNotBlank(lastCtid)) ? lastCtid : Ctid.ZERO.toString();
return new CtidStatus()
.withVersion(CTID_STATUS_VERSION)
.withStateType(StateType.CTID)
.withCtid(lastCtid)
.withCtid(lastCtidInState)
.withIncrementalState(getStreamState(pair))
.withRelationFilenode(fileNode);
LOGGER.info("Emitting ctid state for stream {}, state is {}", pair, ctidStatus);
return createCtidStateMessage(pair, ctidStatus);
}

/**
Expand Down Expand Up @@ -112,6 +127,7 @@ public boolean shouldEmitStateMessage(final ConfiguredAirbyteStream stream) {

/**
 * Resolves the incremental-run state to store alongside the ctid status of {@code pair}.
 *
 * @param pair name/namespace of the stream
 * @return the incremental state already recorded in the stream's ctid status when present;
 *         otherwise the value produced by {@code streamStateForIncrementalRunSupplier}
 */
private JsonNode getStreamState(final AirbyteStreamNameNamespacePair pair) {
  final CtidStatus existingStatus = getCtidStatus(pair);

  // Prefer the state that is already recorded for this stream.
  if (existingStatus != null && existingStatus.getIncrementalState() != null) {
    return existingStatus.getIncrementalState();
  }
  return streamStateForIncrementalRunSupplier.apply(pair);
}
Expand Down

0 comments on commit 80920d1

Please sign in to comment.