Skip to content

Commit

Permalink
calculate diff from most recent schema fetch in update endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
alovew committed Dec 12, 2022
1 parent 8669c75 commit d45e124
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1304,16 +1304,6 @@ public Optional<ActorCatalog> getActorCatalog(final UUID actorId,

return records.stream().findFirst().map(DbConverter::buildActorCatalog);
}

public Optional<ActorCatalog> getMostRecentActorCatalogForSource(final UUID sourceId) throws IOException {
final Result<Record> records = database.query(ctx -> ctx.select(ACTOR_CATALOG.asterisk())
.from(ACTOR_CATALOG)
.join(ACTOR_CATALOG_FETCH_EVENT)
.on(ACTOR_CATALOG_FETCH_EVENT.ACTOR_CATALOG_ID.eq(ACTOR_CATALOG.ID))
.where(ACTOR_CATALOG_FETCH_EVENT.ACTOR_ID.eq(sourceId))
.orderBy(ACTOR_CATALOG_FETCH_EVENT.CREATED_AT.desc()).limit(1).fetch());
return records.stream().findFirst().map(DbConverter::buildActorCatalog);
}

public Optional<ActorCatalog> getMostRecentActorCatalogForSource(final UUID sourceId) throws IOException {
final Result<Record> records = database.query(ctx -> ctx.select(ACTOR_CATALOG.asterisk())
Expand All @@ -1327,7 +1317,6 @@ public Optional<ActorCatalog> getMostRecentActorCatalogForSource(final UUID sour

public Optional<ActorCatalogFetchEvent> getMostRecentActorCatalogFetchEventForSource(final UUID sourceId) throws IOException {


final Result<Record> records = database.query(ctx -> ctx.select(ACTOR_CATALOG_FETCH_EVENT.asterisk())
.from(ACTOR_CATALOG_FETCH_EVENT)
.where(ACTOR_CATALOG_FETCH_EVENT.ACTOR_ID.eq(sourceId))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.airbyte.api.model.generated.ConnectionUpdate;
import io.airbyte.api.model.generated.DestinationIdRequestBody;
import io.airbyte.api.model.generated.DestinationRead;
import io.airbyte.api.model.generated.FieldTransform;
import io.airbyte.api.model.generated.JobRead;
import io.airbyte.api.model.generated.OperationCreate;
import io.airbyte.api.model.generated.OperationReadList;
Expand All @@ -32,6 +33,7 @@
import io.airbyte.api.model.generated.SourceRead;
import io.airbyte.api.model.generated.StreamDescriptor;
import io.airbyte.api.model.generated.StreamTransform;
import io.airbyte.api.model.generated.StreamTransform.TransformTypeEnum;
import io.airbyte.api.model.generated.WebBackendConnectionCreate;
import io.airbyte.api.model.generated.WebBackendConnectionListItem;
import io.airbyte.api.model.generated.WebBackendConnectionRead;
Expand All @@ -45,6 +47,7 @@
import io.airbyte.commons.enums.Enums;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.MoreBooleans;
import io.airbyte.config.ActorCatalog;
import io.airbyte.config.ActorCatalogFetchEvent;
import io.airbyte.config.StandardSync;
import io.airbyte.config.persistence.ConfigNotFoundException;
Expand Down Expand Up @@ -478,6 +481,28 @@ public WebBackendConnectionRead webBackendUpdateConnection(final WebBackendConne

final UUID connectionId = webBackendConnectionPatch.getConnectionId();
final ConnectionRead originalConnectionRead = connectionsHandler.getConnection(connectionId);
boolean breakingChange = originalConnectionRead.getBreakingChange();

// If there have been changes to the sync catalog, check whether these changes result in or fix a
// broken connection
if (webBackendConnectionPatch.getSyncCatalog() != null) {
// Get the most recent actor catalog fetched for this connection's source and the newly updated sync
// catalog
Optional<ActorCatalog> mostRecentActorCatalog = configRepository.getMostRecentActorCatalogForSource(originalConnectionRead.getSourceId());
AirbyteCatalog newActorCatalog = webBackendConnectionPatch.getSyncCatalog();
// Get the diff between these two catalogs to check for breaking changes
if (!mostRecentActorCatalog.isEmpty()) {
final io.airbyte.api.model.generated.AirbyteCatalog mostRecentAirbyteCatalog =
Jsons.object(mostRecentActorCatalog.get().getCatalog(), io.airbyte.api.model.generated.AirbyteCatalog.class);
final CatalogDiff catalogDiff =
connectionsHandler.getDiff(mostRecentAirbyteCatalog, newActorCatalog, CatalogConverter.toProtocol(newActorCatalog));
if (containsBreakingChange(catalogDiff)) {
breakingChange = true;
} else {
breakingChange = false;
}
}
}

// before doing any updates, fetch the existing catalog so that it can be diffed
// with the final catalog to determine which streams might need to be reset.
Expand All @@ -488,7 +513,7 @@ public WebBackendConnectionRead webBackendUpdateConnection(final WebBackendConne

// pass in operationIds because the patch object doesn't include operationIds that were just created
// above.
final ConnectionUpdate connectionPatch = toConnectionPatch(webBackendConnectionPatch, newAndExistingOperationIds);
final ConnectionUpdate connectionPatch = toConnectionPatch(webBackendConnectionPatch, newAndExistingOperationIds, breakingChange);

// persist the update and set the connectionRead to the updated form.
final ConnectionRead updatedConnectionRead = connectionsHandler.updateConnection(connectionPatch);
Expand Down Expand Up @@ -651,7 +676,8 @@ protected static ConnectionCreate toConnectionCreate(final WebBackendConnectionC
*/
@VisibleForTesting
protected static ConnectionUpdate toConnectionPatch(final WebBackendConnectionUpdate webBackendConnectionPatch,
final List<UUID> finalOperationIds) {
final List<UUID> finalOperationIds,
boolean breakingChange) {
final ConnectionUpdate connectionPatch = new ConnectionUpdate();

connectionPatch.connectionId(webBackendConnectionPatch.getConnectionId());
Expand All @@ -669,6 +695,7 @@ protected static ConnectionUpdate toConnectionPatch(final WebBackendConnectionUp
connectionPatch.geography(webBackendConnectionPatch.getGeography());
connectionPatch.notifySchemaChanges(webBackendConnectionPatch.getNotifySchemaChanges());
connectionPatch.nonBreakingChangesPreference(webBackendConnectionPatch.getNonBreakingChangesPreference());
connectionPatch.breakingChange(breakingChange);

connectionPatch.operationIds(finalOperationIds);

Expand All @@ -689,4 +716,19 @@ private record Stream(String name, String namespace) {

}

private boolean containsBreakingChange(final CatalogDiff diff) {
for (final StreamTransform streamTransform : diff.getTransforms()) {
if (streamTransform.getTransformType() != TransformTypeEnum.UPDATE_STREAM) {
continue;
}

final boolean anyBreakingFieldTransforms = streamTransform.getUpdateStream().stream().anyMatch(FieldTransform::getBreaking);
if (anyBreakingFieldTransforms) {
return true;
}
}

return false;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -562,9 +562,10 @@ void testToConnectionPatch() throws IOException {
.syncCatalog(catalog)
.geography(Geography.US)
.nonBreakingChangesPreference(NonBreakingChangesPreference.DISABLE)
.notifySchemaChanges(false);
.notifySchemaChanges(false)
.breakingChange(false);

final ConnectionUpdate actual = WebBackendConnectionsHandler.toConnectionPatch(input, operationIds);
final ConnectionUpdate actual = WebBackendConnectionsHandler.toConnectionPatch(input, operationIds, false);

assertEquals(expected, actual);
}
Expand Down

0 comments on commit d45e124

Please sign in to comment.