Skip to content

Commit

Permalink
Set breaking change to false when connection is fixed (#20315)
Browse files Browse the repository at this point in the history
* Set breaking change to false when connection is fixed
  • Loading branch information
alovew committed Dec 14, 2022
1 parent a718159 commit 3dae1f1
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1305,13 +1305,22 @@ public Optional<ActorCatalog> getActorCatalog(final UUID actorId,
return records.stream().findFirst().map(DbConverter::buildActorCatalog);
}

public Optional<ActorCatalog> getMostRecentActorCatalogForSource(final UUID sourceId) throws IOException {
final Result<Record> records = database.query(ctx -> ctx.select(ACTOR_CATALOG.asterisk())
.from(ACTOR_CATALOG)
.join(ACTOR_CATALOG_FETCH_EVENT)
.on(ACTOR_CATALOG_FETCH_EVENT.ACTOR_CATALOG_ID.eq(ACTOR_CATALOG.ID))
.where(ACTOR_CATALOG_FETCH_EVENT.ACTOR_ID.eq(sourceId))
.orderBy(ACTOR_CATALOG_FETCH_EVENT.CREATED_AT.desc()).limit(1).fetch());
return records.stream().findFirst().map(DbConverter::buildActorCatalog);
}

public Optional<ActorCatalogFetchEvent> getMostRecentActorCatalogFetchEventForSource(final UUID sourceId) throws IOException {

final Result<Record> records = database.query(ctx -> ctx.select(ACTOR_CATALOG_FETCH_EVENT.asterisk())
.from(ACTOR_CATALOG_FETCH_EVENT)
.where(ACTOR_CATALOG_FETCH_EVENT.ACTOR_ID.eq(sourceId))
.orderBy(ACTOR_CATALOG_FETCH_EVENT.CREATED_AT.desc()).limit(1).fetch());

return records.stream().findFirst().map(DbConverter::buildActorCatalogFetchEvent);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.airbyte.api.model.generated.ConnectionUpdate;
import io.airbyte.api.model.generated.DestinationIdRequestBody;
import io.airbyte.api.model.generated.DestinationRead;
import io.airbyte.api.model.generated.FieldTransform;
import io.airbyte.api.model.generated.JobRead;
import io.airbyte.api.model.generated.OperationCreate;
import io.airbyte.api.model.generated.OperationReadList;
Expand All @@ -32,6 +33,7 @@
import io.airbyte.api.model.generated.SourceRead;
import io.airbyte.api.model.generated.StreamDescriptor;
import io.airbyte.api.model.generated.StreamTransform;
import io.airbyte.api.model.generated.StreamTransform.TransformTypeEnum;
import io.airbyte.api.model.generated.WebBackendConnectionCreate;
import io.airbyte.api.model.generated.WebBackendConnectionListItem;
import io.airbyte.api.model.generated.WebBackendConnectionRead;
Expand All @@ -45,6 +47,7 @@
import io.airbyte.commons.enums.Enums;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.MoreBooleans;
import io.airbyte.config.ActorCatalog;
import io.airbyte.config.ActorCatalogFetchEvent;
import io.airbyte.config.StandardSync;
import io.airbyte.config.persistence.ConfigNotFoundException;
Expand Down Expand Up @@ -510,6 +513,25 @@ public WebBackendConnectionRead webBackendUpdateConnection(final WebBackendConne

final UUID connectionId = webBackendConnectionPatch.getConnectionId();
final ConnectionRead originalConnectionRead = connectionsHandler.getConnection(connectionId);
boolean breakingChange = originalConnectionRead.getBreakingChange() != null && originalConnectionRead.getBreakingChange();

// If there have been changes to the sync catalog, check whether these changes result in or fix a
// broken connection
if (webBackendConnectionPatch.getSyncCatalog() != null) {
// Get the most recent actor catalog fetched for this connection's source and the newly updated sync
// catalog
Optional<ActorCatalog> mostRecentActorCatalog = configRepository.getMostRecentActorCatalogForSource(originalConnectionRead.getSourceId());
AirbyteCatalog newAirbyteCatalog = webBackendConnectionPatch.getSyncCatalog();
// Get the diff between these two catalogs to check for breaking changes
if (mostRecentActorCatalog.isPresent()) {
final io.airbyte.protocol.models.AirbyteCatalog mostRecentAirbyteCatalog =
Jsons.object(mostRecentActorCatalog.get().getCatalog(), io.airbyte.protocol.models.AirbyteCatalog.class);
final CatalogDiff catalogDiff =
connectionsHandler.getDiff(newAirbyteCatalog, CatalogConverter.toApi(mostRecentAirbyteCatalog),
CatalogConverter.toProtocol(newAirbyteCatalog));
breakingChange = containsBreakingChange(catalogDiff);
}
}

// before doing any updates, fetch the existing catalog so that it can be diffed
// with the final catalog to determine which streams might need to be reset.
Expand All @@ -520,7 +542,7 @@ public WebBackendConnectionRead webBackendUpdateConnection(final WebBackendConne

// pass in operationIds because the patch object doesn't include operationIds that were just created
// above.
final ConnectionUpdate connectionPatch = toConnectionPatch(webBackendConnectionPatch, newAndExistingOperationIds);
final ConnectionUpdate connectionPatch = toConnectionPatch(webBackendConnectionPatch, newAndExistingOperationIds, breakingChange);

// persist the update and set the connectionRead to the updated form.
final ConnectionRead updatedConnectionRead = connectionsHandler.updateConnection(connectionPatch);
Expand Down Expand Up @@ -685,7 +707,8 @@ protected static ConnectionCreate toConnectionCreate(final WebBackendConnectionC
*/
@VisibleForTesting
protected static ConnectionUpdate toConnectionPatch(final WebBackendConnectionUpdate webBackendConnectionPatch,
final List<UUID> finalOperationIds) {
final List<UUID> finalOperationIds,
boolean breakingChange) {
final ConnectionUpdate connectionPatch = new ConnectionUpdate();

connectionPatch.connectionId(webBackendConnectionPatch.getConnectionId());
Expand All @@ -703,6 +726,7 @@ protected static ConnectionUpdate toConnectionPatch(final WebBackendConnectionUp
connectionPatch.geography(webBackendConnectionPatch.getGeography());
connectionPatch.notifySchemaChanges(webBackendConnectionPatch.getNotifySchemaChanges());
connectionPatch.nonBreakingChangesPreference(webBackendConnectionPatch.getNonBreakingChangesPreference());
connectionPatch.breakingChange(breakingChange);

connectionPatch.operationIds(finalOperationIds);

Expand All @@ -723,4 +747,19 @@ private record Stream(String name, String namespace) {

}

private boolean containsBreakingChange(final CatalogDiff diff) {
for (final StreamTransform streamTransform : diff.getTransforms()) {
if (streamTransform.getTransformType() != TransformTypeEnum.UPDATE_STREAM) {
continue;
}

final boolean anyBreakingFieldTransforms = streamTransform.getUpdateStream().stream().anyMatch(FieldTransform::getBreaking);
if (anyBreakingFieldTransforms) {
return true;
}
}

return false;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@
import java.util.stream.Collectors;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.InOrder;

class WebBackendConnectionsHandlerTest {
Expand Down Expand Up @@ -667,9 +668,10 @@ void testToConnectionPatch() throws IOException {
.syncCatalog(catalog)
.geography(Geography.US)
.nonBreakingChangesPreference(NonBreakingChangesPreference.DISABLE)
.notifySchemaChanges(false);
.notifySchemaChanges(false)
.breakingChange(false);

final ConnectionUpdate actual = WebBackendConnectionsHandler.toConnectionPatch(input, operationIds);
final ConnectionUpdate actual = WebBackendConnectionsHandler.toConnectionPatch(input, operationIds, false);

assertEquals(expected, actual);
}
Expand Down Expand Up @@ -1049,6 +1051,66 @@ void testUpdateConnectionWithSkipReset() throws JsonValidationException, ConfigN
verify(eventRunner, times(0)).resetConnection(any(), any(), eq(true));
}

@Test
void testUpdateConnectionFixingBreakingSchemaChange() throws JsonValidationException, ConfigNotFoundException, IOException {
final WebBackendConnectionUpdate updateBody = new WebBackendConnectionUpdate()
.namespaceDefinition(expected.getNamespaceDefinition())
.namespaceFormat(expected.getNamespaceFormat())
.prefix(expected.getPrefix())
.connectionId(expected.getConnectionId())
.schedule(expected.getSchedule())
.status(expected.getStatus())
.syncCatalog(expectedWithNewSchema.getSyncCatalog())
.skipReset(false)
.connectionId(expected.getConnectionId());

final UUID sourceId = UUID.randomUUID();

// existing connection has a breaking change
when(connectionsHandler.getConnection(expected.getConnectionId())).thenReturn(
new ConnectionRead().connectionId(expected.getConnectionId()).breakingChange(true).sourceId(sourceId));

final CatalogDiff catalogDiff = new CatalogDiff().transforms(List.of());
when(configRepository.getMostRecentActorCatalogForSource(sourceId)).thenReturn(Optional.of(new ActorCatalog().withCatalog(Jsons.deserialize(
"{\"streams\": [{\"name\": \"cat_names\", \"namespace\": \"public\", \"json_schema\": {\"type\": \"object\", \"properties\": {\"id\": {\"type\": \"number\", \"airbyte_type\": \"integer\"}}}}]}"))));
when(connectionsHandler.getDiff(any(), any(), any())).thenReturn(catalogDiff, catalogDiff);

when(configRepository.getConfiguredCatalogForConnection(expected.getConnectionId()))
.thenReturn(ConnectionHelpers.generateBasicConfiguredAirbyteCatalog());
when(operationsHandler.listOperationsForConnection(any())).thenReturn(operationReadList);

final ConnectionRead connectionRead = new ConnectionRead()
.connectionId(expected.getConnectionId())
.sourceId(expected.getSourceId())
.destinationId(expected.getDestinationId())
.name(expected.getName())
.namespaceDefinition(expected.getNamespaceDefinition())
.namespaceFormat(expected.getNamespaceFormat())
.prefix(expected.getPrefix())
.syncCatalog(expectedWithNewSchema.getSyncCatalog())
.status(expected.getStatus())
.schedule(expected.getSchedule())
.breakingChange(false);

when(connectionsHandler.updateConnection(any())).thenReturn(connectionRead);

final WebBackendConnectionRead result = wbHandler.webBackendUpdateConnection(updateBody);

assertEquals(expectedWithNewSchema.getSyncCatalog(), result.getSyncCatalog());

final ConnectionIdRequestBody connectionId = new ConnectionIdRequestBody().connectionId(result.getConnectionId());
ArgumentCaptor<ConnectionUpdate> expectedArgumentCaptor = ArgumentCaptor.forClass(ConnectionUpdate.class);
verify(connectionsHandler, times(1)).updateConnection(expectedArgumentCaptor.capture());
List<ConnectionUpdate> connectionUpdateValues = expectedArgumentCaptor.getAllValues();
// Expect the ConnectionUpdate object to have breakingChange: false
assertEquals(false, connectionUpdateValues.get(0).getBreakingChange());

verify(schedulerHandler, times(0)).resetConnection(connectionId);
verify(schedulerHandler, times(0)).syncConnection(connectionId);
verify(connectionsHandler, times(2)).getDiff(any(), any(), any());
verify(connectionsHandler, times(1)).updateConnection(any());
}

@Test
void testUpdateSchemaWithDiscoveryFromEmpty() {
final AirbyteCatalog original = new AirbyteCatalog().streams(List.of());
Expand Down

0 comments on commit 3dae1f1

Please sign in to comment.