Skip to content

Commit

Permalink
Auto propagate schema change (#6192)
Browse files Browse the repository at this point in the history
  • Loading branch information
benmoriceau committed May 2, 2023
1 parent 170af5c commit 69aac2a
Show file tree
Hide file tree
Showing 8 changed files with 319 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package io.airbyte.commons.server.handlers;

import static io.airbyte.commons.server.handlers.helpers.AutoPropagateSchemaChangeHelper.getUpdatedSchema;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableSet;
Expand Down Expand Up @@ -94,7 +96,6 @@
import java.util.stream.Collectors;
import javax.validation.constraints.NotNull;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.NotImplementedException;

/**
* ScheduleHandler. Javadocs suppressed because api docs should be used as source of truth.
Expand Down Expand Up @@ -506,6 +507,7 @@ private void generateCatalogDiffsAndDisableConnectionsIfNeeded(final SourceDisco
connectionsHandler.getDiff(catalogUsedToMakeConfiguredCatalog.orElse(currentAirbyteCatalog), discoveredSchema.getCatalog(),
CatalogConverter.toConfiguredProtocol(currentAirbyteCatalog));
final boolean containsBreakingChange = containsBreakingChange(diff);

final ConnectionUpdate updateObject =
new ConnectionUpdate().breakingChange(containsBreakingChange).connectionId(connectionRead.getConnectionId());
final ConnectionStatus connectionStatus;
Expand All @@ -515,6 +517,16 @@ private void generateCatalogDiffsAndDisableConnectionsIfNeeded(final SourceDisco
connectionStatus = connectionRead.getStatus();
}
updateObject.status(connectionStatus);

if (!diff.getTransforms().isEmpty() && !containsBreakingChange) {
autoPropagateSchemaChange(workspaceId,
updateObject,
currentAirbyteCatalog,
discoveredSchema.getCatalog(),
diff.getTransforms(),
discoveredSchema.getCatalogId());
}

connectionsHandler.updateConnection(updateObject);

if (shouldNotifySchemaChange(diff, connectionRead, discoverSchemaRequestBody)) {
Expand All @@ -528,9 +540,19 @@ private void generateCatalogDiffsAndDisableConnectionsIfNeeded(final SourceDisco
}

@VisibleForTesting
void autoPropagateSchemaChange(final UUID workspaceId) {
void autoPropagateSchemaChange(final UUID workspaceId,
final ConnectionUpdate updateObject,
final io.airbyte.api.model.generated.AirbyteCatalog currentAirbyteCatalog,
final io.airbyte.api.model.generated.AirbyteCatalog newCatalog,
final List<StreamTransform> transformations,
final UUID sourceCatalogId) {
if (featureFlagClient.boolVariation(AutoPropagateSchema.INSTANCE, new Workspace(workspaceId))) {
throw new NotImplementedException();
io.airbyte.api.model.generated.AirbyteCatalog catalog = getUpdatedSchema(
currentAirbyteCatalog,
newCatalog,
transformations);
updateObject.setSyncCatalog(catalog);
updateObject.setSourceCatalogId(sourceCatalogId);
}
}

Expand Down Expand Up @@ -626,7 +648,8 @@ private JobInfoRead readJobFromResult(final ManualOperationResult manualOperatio
return jobConverter.getJobInfoRead(job);
}

private boolean containsBreakingChange(final CatalogDiff diff) {
@VisibleForTesting
boolean containsBreakingChange(final CatalogDiff diff) {
for (final StreamTransform streamTransform : diff.getTransforms()) {
if (streamTransform.getTransformType() != TransformTypeEnum.UPDATE_STREAM) {
continue;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.commons.server.handlers.helpers;

import com.google.common.annotations.VisibleForTesting;
import io.airbyte.api.model.generated.AirbyteCatalog;
import io.airbyte.api.model.generated.AirbyteStreamAndConfiguration;
import io.airbyte.api.model.generated.StreamDescriptor;
import io.airbyte.api.model.generated.StreamTransform;
import io.airbyte.commons.json.Jsons;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.ws.rs.NotSupportedException;
import lombok.extern.slf4j.Slf4j;

/**
* Helper that allows to generate the catalogs to be auto propagated.
*/
@Slf4j
public class AutoPropagateSchemaChangeHelper {

/**
* This is auto propagating schema changes, it replaces the stream in the old catalog by using the
* ones from the new catalog. The list of transformations contains the information of which stream
* to update.
*
* @param oldCatalog the currently saved catalog
* @param newCatalog the new catalog, which contains all the stream even the unselected ones
* @param transformations list of transformation per stream
* @return an Airbyte catalog the changes being auto propagated
*/
public static AirbyteCatalog getUpdatedSchema(final AirbyteCatalog oldCatalog,
final AirbyteCatalog newCatalog,
final List<StreamTransform> transformations) {
AirbyteCatalog copiedOldCatalog = Jsons.clone(oldCatalog);
Map<StreamDescriptor, AirbyteStreamAndConfiguration> oldCatalogPerStream = extractStreamAndConfigPerStreamDescriptor(copiedOldCatalog);
Map<StreamDescriptor, AirbyteStreamAndConfiguration> newCatalogPerStream = extractStreamAndConfigPerStreamDescriptor(newCatalog);

transformations.forEach(transformation -> {
StreamDescriptor streamDescriptor = transformation.getStreamDescriptor();
switch (transformation.getTransformType()) {
case UPDATE_STREAM -> oldCatalogPerStream.get(streamDescriptor)
.stream(newCatalogPerStream.get(streamDescriptor).getStream());
case ADD_STREAM -> oldCatalogPerStream.put(streamDescriptor, newCatalogPerStream.get(streamDescriptor));
case REMOVE_STREAM -> oldCatalogPerStream.remove(streamDescriptor);
default -> throw new NotSupportedException("Not supported transformation.");
}
});

return new AirbyteCatalog().streams(List.copyOf(oldCatalogPerStream.values()));
}

@VisibleForTesting
static Map<StreamDescriptor, AirbyteStreamAndConfiguration> extractStreamAndConfigPerStreamDescriptor(AirbyteCatalog catalog) {
return catalog.getStreams().stream().collect(Collectors.toMap(
airbyteStreamAndConfiguration -> new StreamDescriptor().name(airbyteStreamAndConfiguration.getStream().getName())
.namespace(airbyteStreamAndConfiguration.getStream().getNamespace()),
airbyteStreamAndConfiguration -> airbyteStreamAndConfiguration));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
Expand Down Expand Up @@ -116,7 +117,6 @@
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Stream;
import org.apache.commons.lang3.NotImplementedException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
Expand Down Expand Up @@ -231,7 +231,7 @@ void setup() throws JsonValidationException, ConfigNotFoundException, IOExceptio

featureFlagClient = mock(TestClient.class);

schedulerHandler = new SchedulerHandler(
schedulerHandler = spy(new SchedulerHandler(
configRepository,
secretsRepositoryWriter,
synchronousSchedulerClient,
Expand All @@ -244,7 +244,7 @@ void setup() throws JsonValidationException, ConfigNotFoundException, IOExceptio
envVariableFeatureFlags,
webUrlHelper,
actorDefinitionVersionHelper,
featureFlagClient);
featureFlagClient));
}

@Test
Expand Down Expand Up @@ -1324,6 +1324,8 @@ void testDiscoverSchemaForSourceMultipleConnectionsFeatureFlagOn()
io.airbyte.protocol.models.AirbyteCatalog.class);
final io.airbyte.api.model.generated.AirbyteCatalog expectedActorCatalog = CatalogConverter.toApi(persistenceCatalog, sourceDef);

when(featureFlagClient.boolVariation(eq(AutoPropagateSchema.INSTANCE), any()))
.thenReturn(false);
final SourceDiscoverSchemaRead actual = schedulerHandler.discoverSchemaForSourceFromSourceId(request);
assertEquals(catalogDiff1, actual.getCatalogDiff());
assertEquals(expectedActorCatalog, actual.getCatalog());
Expand Down Expand Up @@ -1608,14 +1610,75 @@ void testCancelJob() throws IOException {
}

@Test
void testAutopropagateChange() {
void testAutoPropagateChange() {
final UUID workspaceId = UUID.randomUUID();
when(featureFlagClient.boolVariation(AutoPropagateSchema.INSTANCE, new Workspace(workspaceId))).thenReturn(true);
assertThrows(NotImplementedException.class, () -> schedulerHandler.autoPropagateSchemaChange(workspaceId));

when(featureFlagClient.boolVariation(AutoPropagateSchema.INSTANCE, new Workspace(workspaceId))).thenReturn(false);
// Test that it doesn't throw
schedulerHandler.autoPropagateSchemaChange(workspaceId);
ConnectionUpdate connectionUpdate = mock(ConnectionUpdate.class);
io.airbyte.api.model.generated.AirbyteCatalog catalog = mock(io.airbyte.api.model.generated.AirbyteCatalog.class);
UUID sourceCatalogId = UUID.randomUUID();
when(featureFlagClient.boolVariation(AutoPropagateSchema.INSTANCE, new Workspace(workspaceId)))
.thenReturn(true);
schedulerHandler.autoPropagateSchemaChange(workspaceId, connectionUpdate, catalog, catalog, List.of(), sourceCatalogId);
verify(connectionUpdate).setSyncCatalog(any());
verify(connectionUpdate).setSourceCatalogId(sourceCatalogId);

connectionUpdate = mock(ConnectionUpdate.class);

when(featureFlagClient.boolVariation(AutoPropagateSchema.INSTANCE, new Workspace(workspaceId)))
.thenReturn(false);
schedulerHandler.autoPropagateSchemaChange(workspaceId, connectionUpdate, catalog, catalog, List.of(), sourceCatalogId);
verifyNoInteractions(connectionUpdate);
}

@Test
void testDiscoverSchemaWithAutoDetectSchema() throws IOException, JsonValidationException, ConfigNotFoundException {
final SourceConnection source = SourceHelpers.generateSource(UUID.randomUUID());
final SourceDiscoverSchemaRequestBody request = new SourceDiscoverSchemaRequestBody().sourceId(source.getSourceId())
.connectionId(UUID.randomUUID());

final SynchronousResponse<UUID> discoverResponse = (SynchronousResponse<UUID>) jobResponse;
final SynchronousJobMetadata metadata = mock(SynchronousJobMetadata.class);
when(discoverResponse.isSuccess()).thenReturn(true);
when(discoverResponse.getOutput()).thenReturn(UUID.randomUUID());
final ActorCatalog actorCatalog = new ActorCatalog()
.withCatalog(Jsons.jsonNode(airbyteCatalog))
.withCatalogHash("")
.withId(UUID.randomUUID());
when(configRepository.getActorCatalogById(any())).thenReturn(actorCatalog);
when(discoverResponse.getMetadata()).thenReturn(metadata);
when(metadata.isSucceeded()).thenReturn(true);

final ConnectionRead connectionRead = new ConnectionRead();
connectionRead.syncCatalog(new io.airbyte.api.model.generated.AirbyteCatalog())
.connectionId(UUID.randomUUID())
.notifySchemaChanges(false);
final ConnectionReadList connectionReadList = new ConnectionReadList().connections(List.of(connectionRead));
when(connectionsHandler.listConnectionsForSource(source.getSourceId(), false)).thenReturn(connectionReadList);
final CatalogDiff catalogDiff = mock(CatalogDiff.class);
List<StreamTransform> transforms = List.of(
new StreamTransform());
when(catalogDiff.getTransforms()).thenReturn(transforms);
when(connectionsHandler.getDiff(any(), any(), any())).thenReturn(catalogDiff);
final StandardSourceDefinition sourceDefinition = new StandardSourceDefinition()
.withProtocolVersion(SOURCE_PROTOCOL_VERSION)
.withSourceDefinitionId(source.getSourceDefinitionId());
when(configRepository.getStandardSourceDefinition(source.getSourceDefinitionId()))
.thenReturn(sourceDefinition);
when(actorDefinitionVersionHelper.getSourceVersion(sourceDefinition, source.getWorkspaceId(), source.getSourceId()))
.thenReturn(new ActorDefinitionVersion()
.withDockerRepository(SOURCE_DOCKER_REPO)
.withDockerImageTag(SOURCE_DOCKER_TAG));
when(configRepository.getSourceConnection(source.getSourceId())).thenReturn(source);
when(configRepository.getActorCatalog(any(), any(), any())).thenReturn(Optional.empty());
when(synchronousSchedulerClient.createDiscoverSchemaJob(source, SOURCE_DOCKER_IMAGE, SOURCE_DOCKER_TAG, new Version(SOURCE_PROTOCOL_VERSION),
false))
.thenReturn(discoverResponse);
when(envVariableFeatureFlags.autoDisablesFailingConnections()).thenReturn(false);
doNothing().when(schedulerHandler).autoPropagateSchemaChange(any(), any(), any(), any(), any(), any());

schedulerHandler.discoverSchemaForSourceFromSourceId(request);

verify(schedulerHandler).autoPropagateSchemaChange(any(), any(), any(), any(), any(), any());
}

}
Loading

0 comments on commit 69aac2a

Please sign in to comment.