Skip to content

Commit

Permalink
Add Feature flag for autopropagation (#6109)
Browse files Browse the repository at this point in the history
  • Loading branch information
benmoriceau committed May 1, 2023
1 parent c9a0a6f commit 4ced9e2
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.config.persistence.SecretsRepositoryReader;
import io.airbyte.config.persistence.SecretsRepositoryWriter;
import io.airbyte.featureflag.AutoPropagateSchema;
import io.airbyte.featureflag.FeatureFlagClient;
import io.airbyte.featureflag.Workspace;
import io.airbyte.persistence.job.JobPersistence;
import io.airbyte.persistence.job.WebUrlHelper;
import io.airbyte.persistence.job.models.Job;
Expand All @@ -91,6 +94,7 @@
import java.util.stream.Collectors;
import javax.validation.constraints.NotNull;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.NotImplementedException;

/**
* ScheduleHandler. Javadocs suppressed because api docs should be used as source of truth.
Expand All @@ -117,6 +121,7 @@ public class SchedulerHandler {
private final FeatureFlags envVariableFeatureFlags;
private final WebUrlHelper webUrlHelper;
private final ActorDefinitionVersionHelper actorDefinitionVersionHelper;
private final FeatureFlagClient featureFlagClient;

// TODO: Convert to be fully using micronaut
public SchedulerHandler(final ConfigRepository configRepository,
Expand All @@ -130,7 +135,8 @@ public SchedulerHandler(final ConfigRepository configRepository,
final ConnectionsHandler connectionsHandler,
final FeatureFlags envVariableFeatureFlags,
final WebUrlHelper webUrlHelper,
final ActorDefinitionVersionHelper actorDefinitionVersionHelper) {
final ActorDefinitionVersionHelper actorDefinitionVersionHelper,
final FeatureFlagClient featureFlagClient) {
this(
configRepository,
secretsRepositoryWriter,
Expand All @@ -143,7 +149,8 @@ public SchedulerHandler(final ConfigRepository configRepository,
connectionsHandler,
envVariableFeatureFlags,
webUrlHelper,
actorDefinitionVersionHelper);
actorDefinitionVersionHelper,
featureFlagClient);
}

@VisibleForTesting
Expand All @@ -158,7 +165,8 @@ public SchedulerHandler(final ConfigRepository configRepository,
final ConnectionsHandler connectionsHandler,
final FeatureFlags envVariableFeatureFlags,
final WebUrlHelper webUrlHelper,
final ActorDefinitionVersionHelper actorDefinitionVersionHelper) {
final ActorDefinitionVersionHelper actorDefinitionVersionHelper,
final FeatureFlagClient featureFlagClient) {
this.configRepository = configRepository;
this.secretsRepositoryWriter = secretsRepositoryWriter;
this.synchronousSchedulerClient = synchronousSchedulerClient;
Expand All @@ -171,6 +179,7 @@ public SchedulerHandler(final ConfigRepository configRepository,
this.envVariableFeatureFlags = envVariableFeatureFlags;
this.webUrlHelper = webUrlHelper;
this.actorDefinitionVersionHelper = actorDefinitionVersionHelper;
this.featureFlagClient = featureFlagClient;
}

public CheckConnectionRead checkSourceConnectionFromSourceId(final SourceIdRequestBody sourceIdRequestBody)
Expand Down Expand Up @@ -507,6 +516,7 @@ private void generateCatalogDiffsAndDisableConnectionsIfNeeded(final SourceDisco
}
updateObject.status(connectionStatus);
connectionsHandler.updateConnection(updateObject);

if (shouldNotifySchemaChange(diff, connectionRead, discoverSchemaRequestBody)) {
final String url = webUrlHelper.getConnectionUrl(workspaceId, connectionRead.getConnectionId());
eventRunner.sendSchemaChangeNotification(connectionRead.getConnectionId(), url, containsBreakingChange);
Expand All @@ -517,6 +527,13 @@ private void generateCatalogDiffsAndDisableConnectionsIfNeeded(final SourceDisco
}
}

@VisibleForTesting
void autoPropagateSchemaChange(final UUID workspaceId) {
if (featureFlagClient.boolVariation(AutoPropagateSchema.INSTANCE, new Workspace(workspaceId))) {
throw new NotImplementedException();
}
}

private boolean shouldNotifySchemaChange(final CatalogDiff diff,
final ConnectionRead connectionRead,
final SourceDiscoverSchemaRequestBody requestBody) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.config.persistence.SecretsRepositoryWriter;
import io.airbyte.featureflag.AutoPropagateSchema;
import io.airbyte.featureflag.FeatureFlagClient;
import io.airbyte.featureflag.TestClient;
import io.airbyte.featureflag.Workspace;
import io.airbyte.persistence.job.JobPersistence;
import io.airbyte.persistence.job.WebUrlHelper;
import io.airbyte.persistence.job.models.Job;
Expand All @@ -112,6 +116,7 @@
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Stream;
import org.apache.commons.lang3.NotImplementedException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
Expand Down Expand Up @@ -194,6 +199,7 @@ class SchedulerHandlerTest {
private EnvVariableFeatureFlags envVariableFeatureFlags;
private WebUrlHelper webUrlHelper;
private ActorDefinitionVersionHelper actorDefinitionVersionHelper;
private FeatureFlagClient featureFlagClient;

@BeforeEach
void setup() throws JsonValidationException, ConfigNotFoundException, IOException {
Expand Down Expand Up @@ -223,6 +229,8 @@ void setup() throws JsonValidationException, ConfigNotFoundException, IOExceptio

jobConverter = spy(new JobConverter(WorkerEnvironment.DOCKER, LogConfigs.EMPTY));

featureFlagClient = mock(TestClient.class);

schedulerHandler = new SchedulerHandler(
configRepository,
secretsRepositoryWriter,
Expand All @@ -235,7 +243,8 @@ void setup() throws JsonValidationException, ConfigNotFoundException, IOExceptio
connectionsHandler,
envVariableFeatureFlags,
webUrlHelper,
actorDefinitionVersionHelper);
actorDefinitionVersionHelper,
featureFlagClient);
}

@Test
Expand Down Expand Up @@ -1598,4 +1607,15 @@ void testCancelJob() throws IOException {
verify(eventRunner).startNewCancellation(connectionId);
}

@Test
void testAutopropagateChange() {
final UUID workspaceId = UUID.randomUUID();
when(featureFlagClient.boolVariation(AutoPropagateSchema.INSTANCE, new Workspace(workspaceId))).thenReturn(true);
assertThrows(NotImplementedException.class, () -> schedulerHandler.autoPropagateSchemaChange(workspaceId));

when(featureFlagClient.boolVariation(AutoPropagateSchema.INSTANCE, new Workspace(workspaceId))).thenReturn(false);
// Test that it doesn't throw
schedulerHandler.autoPropagateSchemaChange(workspaceId);
}

}
2 changes: 1 addition & 1 deletion airbyte-featureflag/src/main/kotlin/FlagDefinitions.kt
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ object ContainerOrchestratorJavaOpts : Temporary<String>(key = "container-orches

object NewTrialPolicyEnabled : Temporary<Boolean>(key = "billing.newTrialPolicy", default = false)

object AutoPropagateSchema : Temporary<Boolean>(key = "autopropagation.enabled", default = false)
object CheckConnectionUseApiEnabled : Temporary<Boolean>(key = "check-connection-use-api", default = false)


/**
* The default value is 3 hours, it is larger than what is configured by default in the airbyte self owned instance.
* The goal is to allow more room for OSS deployment that airbyte can not monitor.
Expand Down
2 changes: 2 additions & 0 deletions flags.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,5 @@ flags:
serve: false
- name: validation.removeValidationLimit
serve: false
- name: autopropagation.enabled
serve: false

0 comments on commit 4ced9e2

Please sign in to comment.