Skip to content

Commit

Permalink
use launchdarkly flag to control dataplane rollout (#6326)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaohansong committed May 2, 2023
1 parent aac645e commit 3cf97ee
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@

package io.airbyte.commons.temporal.scheduling;

import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.temporal.TemporalJobType;
import io.airbyte.config.Geography;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.featureflag.FeatureFlagClient;
import io.airbyte.featureflag.ShouldRunOnGkeDataplane;
import io.airbyte.featureflag.Workspace;
import jakarta.inject.Singleton;
import java.io.IOException;
import java.util.Set;
Expand All @@ -25,15 +27,15 @@ public class RouterService {
private final ConfigRepository configRepository;
private final TaskQueueMapper taskQueueMapper;

private final FeatureFlags featureFlags;
private final FeatureFlagClient featureFlagClient;

private static final Set<TemporalJobType> WORKSPACE_ROUTING_JOB_TYPE_SET =
Set.of(TemporalJobType.DISCOVER_SCHEMA, TemporalJobType.CHECK_CONNECTION);

public RouterService(final ConfigRepository configRepository, final TaskQueueMapper taskQueueMapper, final FeatureFlags featureFlags) {
public RouterService(final ConfigRepository configRepository, final TaskQueueMapper taskQueueMapper, final FeatureFlagClient featureFlagClient) {
this.configRepository = configRepository;
this.taskQueueMapper = taskQueueMapper;
this.featureFlags = featureFlags;
this.featureFlagClient = featureFlagClient;
}

/**
Expand All @@ -43,7 +45,7 @@ public RouterService(final ConfigRepository configRepository, final TaskQueueMap
public String getTaskQueue(final UUID connectionId, final TemporalJobType jobType) throws IOException {
final Geography geography = configRepository.getGeographyForConnection(connectionId);
final UUID workspaceId = configRepository.getStandardWorkspaceFromConnection(connectionId, false).getWorkspaceId();
if (featureFlags.processInGcpDataPlane(workspaceId.toString())) {
if (featureFlagClient.boolVariation(ShouldRunOnGkeDataplane.INSTANCE, new Workspace(workspaceId))) {
return taskQueueMapper.getTaskQueueFlagged(geography, jobType);
} else {
return taskQueueMapper.getTaskQueue(geography, jobType);
Expand All @@ -65,7 +67,7 @@ public String getTaskQueueForWorkspace(final UUID workspaceId, final TemporalJob
}

final Geography geography = configRepository.getGeographyForWorkspace(workspaceId);
if (featureFlags.processInGcpDataPlane(workspaceId.toString())) {
if (featureFlagClient.boolVariation(ShouldRunOnGkeDataplane.INSTANCE, new Workspace(workspaceId))) {
return taskQueueMapper.getTaskQueueFlagged(geography, jobType);
} else {
return taskQueueMapper.getTaskQueue(geography, jobType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@

package io.airbyte.commons.features;

import java.util.Arrays;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -73,16 +70,6 @@ public String fieldSelectionWorkspaces() {
return getEnvOrDefault(FIELD_SELECTION_WORKSPACES, "", (arg) -> arg);
}

@Override
public boolean processInGcpDataPlane(final String workspaceId) {
boolean isFlagEnabledForAll = getEnvOrDefault(PROCESS_IN_GCP_DATA_PLANE, false, Boolean::parseBoolean);
if (isFlagEnabledForAll) {
return true;
}
Set<String> allowlistedWorkspaceIds = getEnvOrDefault(PROCESS_IN_GCP_DATA_PLANE_WORKSPACE_IDS, Set.of(), this::parseStringToSet);
return allowlistedWorkspaceIds.contains(workspaceId);
}

/**
* Get env variable.
*
Expand All @@ -103,8 +90,4 @@ public <T> T getEnvOrDefault(final String key, final T defaultValue, final Funct
}
}

private Set<String> parseStringToSet(String commaSeparatedArg) {
return Arrays.stream(commaSeparatedArg.split(",")).collect(Collectors.toSet());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,4 @@ public interface FeatureFlags {
*/
String fieldSelectionWorkspaces();

boolean processInGcpDataPlane(final String workspaceId);

}
2 changes: 2 additions & 0 deletions airbyte-featureflag/src/main/kotlin/FlagDefinitions.kt
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ object AutoPropagateSchema : Temporary<Boolean>(key = "autopropagation.enabled",

object CheckConnectionUseApiEnabled : Temporary<Boolean>(key = "check-connection-use-api", default = false)

object ShouldRunOnGkeDataplane : Temporary<Boolean>(key="should-run-on-gke-dataplane", default = false)
/**
* The default value is 3 hours, it is larger than what is configured by default in the airbyte self owned instance.
* The goal is to allow more room for OSS deployment that airbyte can not monitor.
Expand All @@ -58,6 +59,7 @@ object HeartbeatMaxSecondsBetweenMessages : Permanent<String>(key = "heartbeat-m

object ShouldFailSyncIfHeartbeatFailure : Permanent<Boolean>(key = "heartbeat.failSync", default = true)


// NOTE: this is deprecated in favor of FieldSelectionEnabled and will be removed once that flag is fully deployed.
object FieldSelectionWorkspaces : EnvVar(envVar = "FIELD_SELECTION_WORKSPACES") {
override fun enabled(ctx: Context): Boolean {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mock.Strictness.LENIENT;

import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.temporal.TemporalJobType;
import io.airbyte.config.Geography;
import io.airbyte.config.StandardWorkspace;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.featureflag.FeatureFlagClient;
import io.airbyte.featureflag.ShouldRunOnGkeDataplane;
import io.airbyte.featureflag.TestClient;
import io.airbyte.featureflag.Workspace;
import java.io.IOException;
import java.util.UUID;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -44,15 +47,15 @@ class RouterServiceTest {
@Mock(strictness = LENIENT)
private TaskQueueMapper mTaskQueueMapper;

@Mock
private FeatureFlags mockFeatureFlag;
private FeatureFlagClient mockFeatureFlagClient;

private RouterService routerService;

@BeforeEach
void init() {
mockFeatureFlagClient = Mockito.mock(TestClient.class);
routerService = new RouterService(mConfigRepository, mTaskQueueMapper,
mockFeatureFlag);
mockFeatureFlagClient);

Mockito.when(mConfigRepository.getStandardWorkspaceFromConnection(CONNECTION_ID, false))
.thenReturn(new StandardWorkspace().withWorkspaceId(WORKSPACE_ID));
Expand All @@ -68,8 +71,8 @@ void init() {

@Test
void testGetTaskQueue() throws IOException {
Mockito.when(mockFeatureFlag.processInGcpDataPlane(WORKSPACE_ID.toString())).thenReturn(false);
Mockito.when(mConfigRepository.getGeographyForConnection(CONNECTION_ID)).thenReturn(Geography.AUTO);
Mockito.when(mockFeatureFlagClient.boolVariation(ShouldRunOnGkeDataplane.INSTANCE, new Workspace(WORKSPACE_ID))).thenReturn(false);
assertEquals(US_TASK_QUEUE, routerService.getTaskQueue(CONNECTION_ID, TemporalJobType.SYNC));

Mockito.when(mConfigRepository.getGeographyForConnection(CONNECTION_ID)).thenReturn(Geography.US);
Expand All @@ -81,7 +84,7 @@ void testGetTaskQueue() throws IOException {

@Test
void testGetTaskQueueBehindFlag() throws IOException {
Mockito.when(mockFeatureFlag.processInGcpDataPlane(WORKSPACE_ID.toString())).thenReturn(true);
Mockito.when(mockFeatureFlagClient.boolVariation(ShouldRunOnGkeDataplane.INSTANCE, new Workspace(WORKSPACE_ID))).thenReturn(true);

Mockito.when(mConfigRepository.getGeographyForConnection(CONNECTION_ID)).thenReturn(Geography.AUTO);
assertEquals(US_FLAGGED_TASK_QUEUE, routerService.getTaskQueue(CONNECTION_ID, TemporalJobType.SYNC));
Expand All @@ -95,7 +98,7 @@ void testGetTaskQueueBehindFlag() throws IOException {

@Test
void testGetWorkspaceTaskQueue() throws IOException {
Mockito.when(mockFeatureFlag.processInGcpDataPlane(WORKSPACE_ID.toString())).thenReturn(false);
Mockito.when(mockFeatureFlagClient.boolVariation(ShouldRunOnGkeDataplane.INSTANCE, new Workspace(WORKSPACE_ID))).thenReturn(false);

Mockito.when(mConfigRepository.getGeographyForWorkspace(WORKSPACE_ID)).thenReturn(Geography.AUTO);
assertEquals(US_TASK_QUEUE, routerService.getTaskQueueForWorkspace(WORKSPACE_ID, TemporalJobType.CHECK_CONNECTION));
Expand All @@ -109,7 +112,7 @@ void testGetWorkspaceTaskQueue() throws IOException {

@Test
void testGetWorkspaceTaskQueueBehindFlag() throws IOException {
Mockito.when(mockFeatureFlag.processInGcpDataPlane(WORKSPACE_ID.toString())).thenReturn(true);
Mockito.when(mockFeatureFlagClient.boolVariation(ShouldRunOnGkeDataplane.INSTANCE, new Workspace(WORKSPACE_ID))).thenReturn(true);

Mockito.when(mConfigRepository.getGeographyForWorkspace(WORKSPACE_ID)).thenReturn(Geography.AUTO);
assertEquals(US_FLAGGED_TASK_QUEUE, routerService.getTaskQueueForWorkspace(WORKSPACE_ID, TemporalJobType.CHECK_CONNECTION));
Expand Down

0 comments on commit 3cf97ee

Please sign in to comment.