Skip to content

Commit

Permalink
Disable all broken connections when source is refreshed (#20208)
Browse files Browse the repository at this point in the history
  • Loading branch information
alovew committed Dec 9, 2022
1 parent c4c26c0 commit 0a8fc9e
Show file tree
Hide file tree
Showing 5 changed files with 163 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -865,6 +865,20 @@ public List<StandardSync> listWorkspaceStandardSyncs(final UUID workspaceId, fin
return getStandardSyncsFromResult(connectionAndOperationIdsResult);
}

public List<StandardSync> listConnectionsBySource(final UUID sourceId, final boolean includeDeleted) throws IOException {
final Result<Record> connectionAndOperationIdsResult = database.query(ctx -> ctx
.select(
CONNECTION.asterisk(),
groupConcat(CONNECTION_OPERATION.OPERATION_ID).separator(OPERATION_IDS_AGG_DELIMITER).as(OPERATION_IDS_AGG_FIELD))
.from(CONNECTION)
.leftJoin(CONNECTION_OPERATION).on(CONNECTION_OPERATION.CONNECTION_ID.eq(CONNECTION.ID))
.where(CONNECTION.SOURCE_ID.eq(sourceId)
.and(includeDeleted ? noCondition() : CONNECTION.STATUS.notEqual(StatusType.deprecated)))
.groupBy(CONNECTION.ID)).fetch();

return getStandardSyncsFromResult(connectionAndOperationIdsResult);
}

private List<StandardSync> getStandardSyncsFromResult(final Result<Record> connectionAndOperationIdsResult) {
final List<StandardSync> standardSyncs = new ArrayList<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,14 @@ public ConnectionReadList listConnectionsForWorkspace(final WorkspaceIdRequestBo
return new ConnectionReadList().connections(connectionReads);
}

public ConnectionReadList listConnectionsForSource(final UUID sourceId, final boolean includeDeleted) throws IOException {
final List<ConnectionRead> connectionReads = Lists.newArrayList();
for (final StandardSync standardSync : configRepository.listConnectionsBySource(sourceId, includeDeleted)) {
connectionReads.add(ApiPojoConverters.internalToConnectionRead(standardSync));
}
return new ConnectionReadList().connections(connectionReads);
}

public ConnectionReadList listConnections() throws JsonValidationException, ConfigNotFoundException, IOException {
final List<ConnectionRead> connectionReads = Lists.newArrayList();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.airbyte.api.model.generated.CheckConnectionRead.StatusEnum;
import io.airbyte.api.model.generated.ConnectionIdRequestBody;
import io.airbyte.api.model.generated.ConnectionRead;
import io.airbyte.api.model.generated.ConnectionReadList;
import io.airbyte.api.model.generated.ConnectionStatus;
import io.airbyte.api.model.generated.ConnectionUpdate;
import io.airbyte.api.model.generated.DestinationCoreConfig;
Expand Down Expand Up @@ -264,7 +265,8 @@ public SourceDiscoverSchemaRead discoverSchemaForSourceFromSourceId(final Source
final SourceDiscoverSchemaRead discoveredSchema = retrieveDiscoveredSchema(persistedCatalogId);

if (discoverSchemaRequestBody.getConnectionId() != null) {
discoveredSchemaWithCatalogDiff(discoveredSchema, discoverSchemaRequestBody);
// modify discoveredSchema object to add CatalogDiff, containsBreakingChange, and connectionStatus
generateCatalogDiffsAndDisableConnectionsIfNeeded(discoveredSchema, discoverSchemaRequestBody);
}

return discoveredSchema;
Expand Down Expand Up @@ -383,30 +385,37 @@ public JobInfoRead cancelJob(final JobIdRequestBody jobIdRequestBody) throws IOE
return submitCancellationToWorker(jobIdRequestBody.getId());
}

private void discoveredSchemaWithCatalogDiff(final SourceDiscoverSchemaRead discoveredSchema,
final SourceDiscoverSchemaRequestBody discoverSchemaRequestBody)
// Find all connections that use the source from the SourceDiscoverSchemaRequestBody. For each one,
// determine whether 1. the source schema change resulted in a broken connection or 2. the user
// wants the connection disabled when non-breaking changes are detected. If so, disable that
// connection. Modify the current discoveredSchema object to add a CatalogDiff,
// containsBreakingChange paramter, and connectionStatus parameter.
private void generateCatalogDiffsAndDisableConnectionsIfNeeded(SourceDiscoverSchemaRead discoveredSchema,
SourceDiscoverSchemaRequestBody discoverSchemaRequestBody)
throws JsonValidationException, ConfigNotFoundException, IOException {
final Optional<io.airbyte.api.model.generated.AirbyteCatalog> catalogUsedToMakeConfiguredCatalog = connectionsHandler
.getConnectionAirbyteCatalog(discoverSchemaRequestBody.getConnectionId());
final ConnectionRead connectionRead = connectionsHandler.getConnection(discoverSchemaRequestBody.getConnectionId());
final io.airbyte.api.model.generated.@NotNull AirbyteCatalog currentAirbyteCatalog =
connectionRead.getSyncCatalog();
final CatalogDiff diff =
connectionsHandler.getDiff(catalogUsedToMakeConfiguredCatalog.orElse(currentAirbyteCatalog), discoveredSchema.getCatalog(),
CatalogConverter.toProtocol(currentAirbyteCatalog));
final boolean containsBreakingChange = containsBreakingChange(diff);
final ConnectionUpdate updateObject =
new ConnectionUpdate().breakingChange(containsBreakingChange).connectionId(discoverSchemaRequestBody.getConnectionId());
final ConnectionStatus connectionStatus;
if (shouldDisableConnection(containsBreakingChange, connectionRead.getNonBreakingChangesPreference(), diff)) {
connectionStatus = ConnectionStatus.INACTIVE;
} else {
connectionStatus = connectionRead.getStatus();
final ConnectionReadList connectionsForSource = connectionsHandler.listConnectionsForSource(discoverSchemaRequestBody.getSourceId(), false);
for (final ConnectionRead connectionRead : connectionsForSource.getConnections()) {
final Optional<io.airbyte.api.model.generated.AirbyteCatalog> catalogUsedToMakeConfiguredCatalog = connectionsHandler
.getConnectionAirbyteCatalog(connectionRead.getConnectionId());
final io.airbyte.api.model.generated.@NotNull AirbyteCatalog currentAirbyteCatalog =
connectionRead.getSyncCatalog();
CatalogDiff diff = connectionsHandler.getDiff(catalogUsedToMakeConfiguredCatalog.orElse(currentAirbyteCatalog), discoveredSchema.getCatalog(),
CatalogConverter.toProtocol(currentAirbyteCatalog));
boolean containsBreakingChange = containsBreakingChange(diff);
ConnectionUpdate updateObject =
new ConnectionUpdate().breakingChange(containsBreakingChange).connectionId(connectionRead.getConnectionId());
ConnectionStatus connectionStatus;
if (shouldDisableConnection(containsBreakingChange, connectionRead.getNonBreakingChangesPreference(), diff)) {
connectionStatus = ConnectionStatus.INACTIVE;
} else {
connectionStatus = connectionRead.getStatus();
}
updateObject.status(connectionStatus);
connectionsHandler.updateConnection(updateObject);
if (connectionRead.getConnectionId().equals(discoverSchemaRequestBody.getConnectionId())) {
discoveredSchema.catalogDiff(diff).breakingChange(containsBreakingChange).connectionStatus(connectionStatus);
}
}
updateObject.status(connectionStatus);
connectionsHandler.updateConnection(updateObject);
discoveredSchema.catalogDiff(diff).breakingChange(containsBreakingChange).connectionStatus(connectionStatus);

}

private boolean shouldDisableConnection(final boolean containsBreakingChange,
Expand Down
Loading

0 comments on commit 0a8fc9e

Please sign in to comment.