Skip to content

Commit

Permalink
Add monitoring to the auto propagation (#6230)
Browse files Browse the repository at this point in the history
  • Loading branch information
benmoriceau committed May 3, 2023
1 parent b95335d commit 8cb1624
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@
import io.airbyte.featureflag.AutoPropagateSchema;
import io.airbyte.featureflag.FeatureFlagClient;
import io.airbyte.featureflag.Workspace;
import io.airbyte.metrics.lib.MetricAttribute;
import io.airbyte.metrics.lib.MetricClientFactory;
import io.airbyte.metrics.lib.MetricTags;
import io.airbyte.metrics.lib.OssMetricsRegistry;
import io.airbyte.persistence.job.JobPersistence;
import io.airbyte.persistence.job.WebUrlHelper;
import io.airbyte.persistence.job.models.Job;
Expand Down Expand Up @@ -508,6 +512,14 @@ private void generateCatalogDiffsAndDisableConnectionsIfNeeded(final SourceDisco
CatalogConverter.toConfiguredProtocol(currentAirbyteCatalog));
final boolean containsBreakingChange = containsBreakingChange(diff);

if (containsBreakingChange) {
MetricClientFactory.getMetricClient().count(OssMetricsRegistry.BREAKING_SCHEMA_CHANGE_DETECTED, 1,
new MetricAttribute(MetricTags.CONNECTION_ID, connectionRead.getConnectionId().toString()));
} else {
MetricClientFactory.getMetricClient().count(OssMetricsRegistry.NON_BREAKING_SCHEMA_CHANGE_DETECTED, 1,
new MetricAttribute(MetricTags.CONNECTION_ID, connectionRead.getConnectionId().toString()));
}

final ConnectionUpdate updateObject =
new ConnectionUpdate().breakingChange(containsBreakingChange).connectionId(connectionRead.getConnectionId());
final ConnectionStatus connectionStatus;
Expand All @@ -520,6 +532,7 @@ private void generateCatalogDiffsAndDisableConnectionsIfNeeded(final SourceDisco

if (!diff.getTransforms().isEmpty() && !containsBreakingChange) {
autoPropagateSchemaChange(workspaceId,
connectionRead.getConnectionId(),
updateObject,
currentAirbyteCatalog,
discoveredSchema.getCatalog(),
Expand All @@ -541,12 +554,15 @@ private void generateCatalogDiffsAndDisableConnectionsIfNeeded(final SourceDisco

@VisibleForTesting
void autoPropagateSchemaChange(final UUID workspaceId,
final UUID connectionId,
final ConnectionUpdate updateObject,
final io.airbyte.api.model.generated.AirbyteCatalog currentAirbyteCatalog,
final io.airbyte.api.model.generated.AirbyteCatalog newCatalog,
final List<StreamTransform> transformations,
final UUID sourceCatalogId) {
if (featureFlagClient.boolVariation(AutoPropagateSchema.INSTANCE, new Workspace(workspaceId))) {
MetricClientFactory.getMetricClient().count(OssMetricsRegistry.SCHEMA_CHANGE_AUTO_PROPAGATED, 1,
new MetricAttribute(MetricTags.CONNECTION_ID, connectionId.toString()));
io.airbyte.api.model.generated.AirbyteCatalog catalog = getUpdatedSchema(
currentAirbyteCatalog,
newCatalog,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1612,21 +1612,22 @@ void testCancelJob() throws IOException {
@Test
void testAutoPropagateChange() {
final UUID workspaceId = UUID.randomUUID();
final UUID connectionId = UUID.randomUUID();

ConnectionUpdate connectionUpdate = mock(ConnectionUpdate.class);
io.airbyte.api.model.generated.AirbyteCatalog catalog = mock(io.airbyte.api.model.generated.AirbyteCatalog.class);
UUID sourceCatalogId = UUID.randomUUID();
when(featureFlagClient.boolVariation(AutoPropagateSchema.INSTANCE, new Workspace(workspaceId)))
.thenReturn(true);
schedulerHandler.autoPropagateSchemaChange(workspaceId, connectionUpdate, catalog, catalog, List.of(), sourceCatalogId);
schedulerHandler.autoPropagateSchemaChange(workspaceId, connectionId, connectionUpdate, catalog, catalog, List.of(), sourceCatalogId);
verify(connectionUpdate).setSyncCatalog(any());
verify(connectionUpdate).setSourceCatalogId(sourceCatalogId);

connectionUpdate = mock(ConnectionUpdate.class);

when(featureFlagClient.boolVariation(AutoPropagateSchema.INSTANCE, new Workspace(workspaceId)))
.thenReturn(false);
schedulerHandler.autoPropagateSchemaChange(workspaceId, connectionUpdate, catalog, catalog, List.of(), sourceCatalogId);
schedulerHandler.autoPropagateSchemaChange(workspaceId, connectionId, connectionUpdate, catalog, catalog, List.of(), sourceCatalogId);
verifyNoInteractions(connectionUpdate);
}

Expand Down Expand Up @@ -1674,11 +1675,11 @@ void testDiscoverSchemaWithAutoDetectSchema() throws IOException, JsonValidation
false))
.thenReturn(discoverResponse);
when(envVariableFeatureFlags.autoDisablesFailingConnections()).thenReturn(false);
doNothing().when(schedulerHandler).autoPropagateSchemaChange(any(), any(), any(), any(), any(), any());
doNothing().when(schedulerHandler).autoPropagateSchemaChange(any(), any(), any(), any(), any(), any(), any());

schedulerHandler.discoverSchemaForSourceFromSourceId(request);

verify(schedulerHandler).autoPropagateSchemaChange(any(), any(), any(), any(), any(), any());
verify(schedulerHandler).autoPropagateSchemaChange(any(), any(), any(), any(), any(), any(), any());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,22 @@ public enum OssMetricsRegistry implements MetricsRegistry {
MetricTags.ATTEMPT_QUEUE,
MetricTags.ATTEMPT_OUTCOME,
MetricTags.FAILURE_ORIGIN, // only includes the first failure origin
MetricTags.FAILURE_TYPE); // only includes the first failure type
MetricTags.FAILURE_TYPE), // only includes the first failure type

BREAKING_SCHEMA_CHANGE_DETECTED(MetricEmittingApps.SERVER,
"breaking_change_detected",
"a breaking schema change has been detected",
MetricTags.CONNECTION_ID),

NON_BREAKING_SCHEMA_CHANGE_DETECTED(MetricEmittingApps.SERVER,
"non_breaking_change_detected",
"a non breaking schema change has been detected",
MetricTags.CONNECTION_ID),

SCHEMA_CHANGE_AUTO_PROPAGATED(MetricEmittingApps.SERVER,
"schema_change_auto_propagated",
"a schema change have been propagated",
MetricTags.CONNECTION_ID);

private final MetricEmittingApp application;
private final String metricName;
Expand Down

0 comments on commit 8cb1624

Please sign in to comment.