Skip to content

Commit

Permalink
Revert "Bmoric/api flag not running auto propagation" (#6558)
Browse files Browse the repository at this point in the history
  • Loading branch information
benmoriceau committed May 12, 2023
1 parent 8129132 commit bf16d5a
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 268 deletions.
48 changes: 3 additions & 45 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -954,27 +954,7 @@ paths:
$ref: "#/components/responses/NotFoundResponse"
"422":
$ref: "#/components/responses/InvalidInputResponse"
/v1/sources/apply_schema_changes:
post:
tags:
- source
summary:
Auto propagate the change on a catalog to a catalog saved in the DB. It will fetch all the connections linked to
a source id and apply the provided diff to their catalog.
operationId: applySchemaChangeForSource
requestBody:
content:
application/json:
schema:
$ref: "#/components/schemas/SourceAutoPropagateChange"
required: true
responses:
"204":
description: The schema was properly auto propagate
"404":
$ref: "#/components/responses/NotFoundResponse"
"422":
$ref: "#/components/responses/InvalidInputResponse"

/v1/sources/write_discover_catalog_result:
post:
tags:
Expand Down Expand Up @@ -3457,28 +3437,6 @@ components:
type: boolean
connectionStatus:
$ref: "#/components/schemas/ConnectionStatus"
SourceAutoPropagateChange:
description:
Input of the source propagation, it contains the discovered catalog and a list of diff that need to be applied
to the existing catalog.
type: object
required:
- catalog
- catalogId
- sourceId
- workspaceId
properties:
catalog:
$ref: "#/components/schemas/AirbyteCatalog"
catalogId:
type: string
format: uuid
sourceId:
type: string
format: uuid
workspaceId:
type: string
format: uuid
SourceSearch:
type: object
properties:
Expand Down Expand Up @@ -5617,8 +5575,8 @@ components:
$ref: "#/components/schemas/NonBreakingChangesPreference"
NonBreakingChangesPreference:
enum:
- ignore # do nothing if we detect a schema change
- disable # disable the connection, pausing the sync
- ignore
- disable
type: string
WebBackendConnectionReadList:
type: object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import io.airbyte.api.model.generated.JobInfoRead;
import io.airbyte.api.model.generated.LogRead;
import io.airbyte.api.model.generated.NonBreakingChangesPreference;
import io.airbyte.api.model.generated.SourceAutoPropagateChange;
import io.airbyte.api.model.generated.SourceCoreConfig;
import io.airbyte.api.model.generated.SourceDefinitionIdWithWorkspaceId;
import io.airbyte.api.model.generated.SourceDefinitionSpecificationRead;
Expand Down Expand Up @@ -347,35 +346,6 @@ public SourceDiscoverSchemaRead discoverSchemaForSourceFromSourceId(final Source
.catalogId(currentCatalog.get().getId());
}

public void applySchemaChangeForSource(final SourceAutoPropagateChange sourceAutoPropagateChange)
throws IOException, JsonValidationException, ConfigNotFoundException {
final ConnectionReadList connectionsForSource =
connectionsHandler.listConnectionsForSource(sourceAutoPropagateChange.getSourceId(), false);
for (final ConnectionRead connectionRead : connectionsForSource.getConnections()) {
final Optional<io.airbyte.api.model.generated.AirbyteCatalog> catalogUsedToMakeConfiguredCatalog = connectionsHandler
.getConnectionAirbyteCatalog(connectionRead.getConnectionId());
final io.airbyte.api.model.generated.@NotNull AirbyteCatalog currentAirbyteCatalog =
connectionRead.getSyncCatalog();
final CatalogDiff diff =
connectionsHandler.getDiff(catalogUsedToMakeConfiguredCatalog.orElse(currentAirbyteCatalog),
sourceAutoPropagateChange.getCatalog(),
CatalogConverter.toConfiguredProtocol(currentAirbyteCatalog));

final ConnectionUpdate updateObject =
new ConnectionUpdate().connectionId(connectionRead.getConnectionId());

if (shouldAutoPropagate(diff, sourceAutoPropagateChange.getWorkspaceId(), connectionRead)) {
applySchemaChange(sourceAutoPropagateChange.getWorkspaceId(),
updateObject,
currentAirbyteCatalog,
sourceAutoPropagateChange.getCatalog(),
diff.getTransforms(),
sourceAutoPropagateChange.getCatalogId());
}
connectionsHandler.updateConnection(updateObject);
}
}

public SourceDiscoverSchemaRead discoverSchemaForSourceFromSourceCreate(final SourceCoreConfig sourceCreate)
throws ConfigNotFoundException, IOException, JsonValidationException {
final StandardSourceDefinition sourceDef = configRepository.getStandardSourceDefinition(sourceCreate.getSourceDefinitionId());
Expand Down Expand Up @@ -474,8 +444,7 @@ public DestinationDefinitionSpecificationRead getSpecificationForDestinationId(f
}

@SuppressWarnings("LineLength")
public DestinationDefinitionSpecificationRead getDestinationSpecification(
final DestinationDefinitionIdWithWorkspaceId destinationDefinitionIdWithWorkspaceId)
public DestinationDefinitionSpecificationRead getDestinationSpecification(final DestinationDefinitionIdWithWorkspaceId destinationDefinitionIdWithWorkspaceId)
throws ConfigNotFoundException, IOException, JsonValidationException {
final UUID destinationDefinitionId = destinationDefinitionIdWithWorkspaceId.getDestinationDefinitionId();
final StandardDestinationDefinition destination = configRepository.getStandardDestinationDefinition(destinationDefinitionId);
Expand Down Expand Up @@ -553,7 +522,6 @@ private void generateCatalogDiffsAndDisableConnectionsIfNeeded(final SourceDisco

final ConnectionUpdate updateObject =
new ConnectionUpdate().breakingChange(containsBreakingChange).connectionId(connectionRead.getConnectionId());

final ConnectionStatus connectionStatus;
if (shouldDisableConnection(containsBreakingChange, connectionRead.getNonBreakingChangesPreference(), diff)) {
connectionStatus = ConnectionStatus.INACTIVE;
Expand All @@ -562,6 +530,16 @@ private void generateCatalogDiffsAndDisableConnectionsIfNeeded(final SourceDisco
}
updateObject.status(connectionStatus);

if (!diff.getTransforms().isEmpty() && !containsBreakingChange) {
autoPropagateSchemaChange(workspaceId,
connectionRead.getConnectionId(),
updateObject,
currentAirbyteCatalog,
discoveredSchema.getCatalog(),
diff.getTransforms(),
discoveredSchema.getCatalogId());
}

connectionsHandler.updateConnection(updateObject);

if (shouldNotifySchemaChange(diff, connectionRead, discoverSchemaRequestBody)) {
Expand All @@ -574,29 +552,24 @@ private void generateCatalogDiffsAndDisableConnectionsIfNeeded(final SourceDisco
}
}

private boolean shouldAutoPropagate(final CatalogDiff diff, final UUID workspaceId, final ConnectionRead connectionRead) {
final boolean hasDiff = !diff.getTransforms().isEmpty();
final boolean nonBreakingChange = !containsBreakingChange(diff);
final boolean autoPropagationIsEnabledForWorkspace = featureFlagClient.boolVariation(AutoPropagateSchema.INSTANCE, new Workspace(workspaceId));
final boolean autoPropagationIsEnabledForConnection =
connectionRead.getNonBreakingChangesPreference() != null;
return hasDiff && nonBreakingChange && autoPropagationIsEnabledForWorkspace && autoPropagationIsEnabledForConnection;
}

private void applySchemaChange(final UUID connectionId,
@VisibleForTesting
void autoPropagateSchemaChange(final UUID workspaceId,
final UUID connectionId,
final ConnectionUpdate updateObject,
final io.airbyte.api.model.generated.AirbyteCatalog currentAirbyteCatalog,
final io.airbyte.api.model.generated.AirbyteCatalog newCatalog,
final List<StreamTransform> transformations,
final UUID sourceCatalogId) {
MetricClientFactory.getMetricClient().count(OssMetricsRegistry.SCHEMA_CHANGE_AUTO_PROPAGATED, 1,
new MetricAttribute(MetricTags.CONNECTION_ID, connectionId.toString()));
final io.airbyte.api.model.generated.AirbyteCatalog catalog = getUpdatedSchema(
currentAirbyteCatalog,
newCatalog,
transformations);
updateObject.setSyncCatalog(catalog);
updateObject.setSourceCatalogId(sourceCatalogId);
if (featureFlagClient.boolVariation(AutoPropagateSchema.INSTANCE, new Workspace(workspaceId))) {
MetricClientFactory.getMetricClient().count(OssMetricsRegistry.SCHEMA_CHANGE_AUTO_PROPAGATED, 1,
new MetricAttribute(MetricTags.CONNECTION_ID, connectionId.toString()));
io.airbyte.api.model.generated.AirbyteCatalog catalog = getUpdatedSchema(
currentAirbyteCatalog,
newCatalog,
transformations);
updateObject.setSyncCatalog(catalog);
updateObject.setSourceCatalogId(sourceCatalogId);
}
}

private boolean shouldNotifySchemaChange(final CatalogDiff diff,
Expand Down
Loading

0 comments on commit bf16d5a

Please sign in to comment.