diff --git a/airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/ConnectionsHandler.java b/airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/ConnectionsHandler.java index 09d98789f86..2cb622e6a22 100644 --- a/airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/ConnectionsHandler.java +++ b/airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/ConnectionsHandler.java @@ -191,7 +191,7 @@ public ConnectionRead createConnection(final ConnectionCreate connectionCreate) populateSyncFromLegacySchedule(standardSync, connectionCreate); } final UUID workspaceId = workspaceHelper.getWorkspaceForDestinationId(connectionCreate.getDestinationId()); - if (workspaceId != null && featureFlagClient.enabled(CheckWithCatalog.INSTANCE, new Workspace(workspaceId))) { + if (workspaceId != null && featureFlagClient.boolVariation(CheckWithCatalog.INSTANCE, new Workspace(workspaceId))) { // TODO this is the hook for future check with catalog work LOGGER.info("Entered into Dark Launch Code for Check with Catalog"); } diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/ReplicationWorkerFactory.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/ReplicationWorkerFactory.java index 81da6f36876..2cb0025e9f2 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/ReplicationWorkerFactory.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/ReplicationWorkerFactory.java @@ -186,7 +186,7 @@ private static ReplicationWorker createReplicationWorker(final AirbyteSource sou // The latter FeatureFlagHelper will be removed once the flag client is fully deployed. final UUID workspaceId = syncInput.getWorkspaceId(); final boolean fieldSelectionEnabled = workspaceId != null - && (featureFlagClient.enabled(FieldSelectionEnabled.INSTANCE, new Workspace(workspaceId)) + && (featureFlagClient.boolVariation(FieldSelectionEnabled.INSTANCE, new Workspace(workspaceId)) || FeatureFlagHelper.isFieldSelectionEnabledForWorkspace(featureFlags, workspaceId)); final boolean removeValidationLimit = workspaceId != null && featureFlagClient.boolVariation(RemoveValidationLimit.INSTANCE, new Workspace(workspaceId)); diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/HeartbeatTimeoutChaperone.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/HeartbeatTimeoutChaperone.java index 780d716ada9..af64df475df 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/HeartbeatTimeoutChaperone.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/HeartbeatTimeoutChaperone.java @@ -30,7 +30,7 @@ * that means that the heartbeat has stopped. If this occurs the chaperone cancels the runnable * thread and then throws an exception. If the runnable thread completes first, the chaperone * cancels the heartbeat and then returns. - * + *

* This allows us to run an arbitrary runnable that we can kill if a heartbeat stops. This is useful * in cases like the platform reading from the source. The thread that reads from the source is * allowed to run as long as the heartbeat from the sources is fresh. @@ -111,7 +111,7 @@ public void runWithHeartbeatThread(final CompletableFuture runnableFuture) LOGGER.info("thread status... heartbeat thread: {} , replication thread: {}", heartbeatFuture.isDone(), runnableFuture.isDone()); if (heartbeatFuture.isDone() && !runnableFuture.isDone()) { - if (featureFlagClient.enabled(ShouldFailSyncIfHeartbeatFailure.INSTANCE, new Workspace(workspaceId))) { + if (featureFlagClient.boolVariation(ShouldFailSyncIfHeartbeatFailure.INSTANCE, new Workspace(workspaceId))) { runnableFuture.cancel(true); throw new HeartbeatTimeoutException( String.format("Heartbeat has stopped. Heartbeat freshness threshold: %s secs Actual heartbeat age: %s secs", diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/HeartBeatTimeoutChaperoneTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/HeartBeatTimeoutChaperoneTest.java index d5244495645..a56178891ac 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/HeartBeatTimeoutChaperoneTest.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/HeartBeatTimeoutChaperoneTest.java @@ -39,7 +39,7 @@ class HeartBeatTimeoutChaperoneTest { @Test void testFailHeartbeat() { - when(featureFlagClient.enabled(eq(ShouldFailSyncIfHeartbeatFailure.INSTANCE), any())).thenReturn(true); + when(featureFlagClient.boolVariation(eq(ShouldFailSyncIfHeartbeatFailure.INSTANCE), any())).thenReturn(true); final HeartbeatTimeoutChaperone heartbeatTimeoutChaperone = new HeartbeatTimeoutChaperone( heartbeatMonitor, timeoutCheckDuration, @@ -81,7 +81,7 @@ void testNotFailingHeartbeat() { @Test void testNotFailingHeartbeatIfFalseFlag() { - when(featureFlagClient.enabled(eq(ShouldFailSyncIfHeartbeatFailure.INSTANCE), any())).thenReturn(false); + when(featureFlagClient.boolVariation(eq(ShouldFailSyncIfHeartbeatFailure.INSTANCE), any())).thenReturn(false); final HeartbeatTimeoutChaperone heartbeatTimeoutChaperone = new HeartbeatTimeoutChaperone( heartbeatMonitor, timeoutCheckDuration, @@ -108,7 +108,7 @@ void testMonitor() { workspaceId, connectionId, metricClient); - when(featureFlagClient.enabled(eq(ShouldFailSyncIfHeartbeatFailure.INSTANCE), any())).thenReturn(true); + when(featureFlagClient.boolVariation(eq(ShouldFailSyncIfHeartbeatFailure.INSTANCE), any())).thenReturn(true); when(heartbeatMonitor.isBeating()).thenReturn(Optional.of(false)); assertDoesNotThrow(() -> CompletableFuture.runAsync(() -> heartbeatTimeoutChaperone.monitor()).get(1000, TimeUnit.MILLISECONDS)); verify(metricClient, times(1)).count(OssMetricsRegistry.SOURCE_HEARTBEAT_FAILURE, 1, @@ -124,7 +124,7 @@ void testMonitorDontFailIfDontStopBeating() { workspaceId, connectionId, metricClient); - when(featureFlagClient.enabled(eq(ShouldFailSyncIfHeartbeatFailure.INSTANCE), any())).thenReturn(false); + when(featureFlagClient.boolVariation(eq(ShouldFailSyncIfHeartbeatFailure.INSTANCE), any())).thenReturn(false); when(heartbeatMonitor.isBeating()).thenReturn(Optional.of(true), Optional.of(false)); assertDoesNotThrow(() -> CompletableFuture.runAsync(() -> heartbeatTimeoutChaperone.monitor()).get(1000, TimeUnit.MILLISECONDS)); diff --git a/airbyte-featureflag/src/main/kotlin/Client.kt b/airbyte-featureflag/src/main/kotlin/Client.kt index c2cab7929ab..dd0371b7462 100644 --- a/airbyte-featureflag/src/main/kotlin/Client.kt +++ b/airbyte-featureflag/src/main/kotlin/Client.kt @@ -34,12 +34,6 @@ import kotlin.io.path.notExists * of a sealed interface. */ sealed interface FeatureFlagClient { - /** - * Returns true if the flag with the provided context should be enabled. Returns false otherwise. - */ - @Deprecated("use boolVariation instead") - fun enabled(flag: Flag, context: Context): Boolean - /** * Calculates the boolean value of the [flag] for the given [context]. * @@ -107,8 +101,6 @@ class ConfigFileClient(@Property(name = CONFIG_FF_PATH) config: Path?) : Feature } } - override fun enabled(flag: Flag, context: Context) = boolVariation(flag, context) - override fun boolVariation(flag: Flag, context: Context): Boolean { return when (flag) { is EnvVar -> flag.enabled(context) @@ -140,8 +132,6 @@ class ConfigFileClient(@Property(name = CONFIG_FF_PATH) config: Path?) : Feature @Singleton @Requires(property = CONFIG_FF_CLIENT, value = CONFIG_FF_CLIENT_VAL_LAUNCHDARKLY) class LaunchDarklyClient(private val client: LDClient) : FeatureFlagClient { - override fun enabled(flag: Flag, context: Context) = boolVariation(flag, context) - override fun boolVariation(flag: Flag, context: Context): Boolean { return when (flag) { is EnvVar -> flag.enabled(context) @@ -168,8 +158,6 @@ class LaunchDarklyClient(private val client: LDClient) : FeatureFlagClient { * @param [values] is a map of [Flag.key] to its status. */ class TestClient @JvmOverloads constructor(val values: Map = mapOf()) : FeatureFlagClient { - override fun enabled(flag: Flag, context: Context) = boolVariation(flag, context) - override fun boolVariation(flag: Flag, context: Context): Boolean { return when (flag) { is EnvVar -> { diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/FeatureFlagFetchActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/FeatureFlagFetchActivityImpl.java index f0e48dcb795..a2da572a5a2 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/FeatureFlagFetchActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/FeatureFlagFetchActivityImpl.java @@ -56,10 +56,10 @@ public FeatureFlagFetchOutput getFeatureFlags(final FeatureFlagFetchInput input) // No feature flags are currently in use. // To get value for a feature flag with the workspace context, add it to the workspaceFlags list. - final List workspaceFlags = List.of(CheckConnectionUseApiEnabled.INSTANCE); + final List> workspaceFlags = List.of(CheckConnectionUseApiEnabled.INSTANCE); final Map featureFlags = new HashMap<>(); - for (final Flag flag : workspaceFlags) { - featureFlags.put(flag.getKey(), featureFlagClient.enabled(flag, new Workspace(workspaceId))); + for (final Flag flag : workspaceFlags) { + featureFlags.put(flag.getKey(), featureFlagClient.boolVariation(flag, new Workspace(workspaceId))); } return new FeatureFlagFetchOutput(featureFlags);