Skip to content

Commit

Permalink
inject envvariable feature flags
Browse files Browse the repository at this point in the history
  • Loading branch information
alovew committed Nov 30, 2022
1 parent 2805300 commit 527d2fc
Show file tree
Hide file tree
Showing 17 changed files with 69 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.commons.temporal;

import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.temporal.exception.DeletedWorkflowException;
import io.airbyte.commons.temporal.exception.UnreachableWorkflowException;
import io.airbyte.commons.temporal.scheduling.ConnectionManagerWorkflow;
Expand Down Expand Up @@ -62,9 +63,10 @@ public void deleteWorkflowIfItExist(final WorkflowClient client,
*/
public ConnectionManagerWorkflow signalWorkflowAndRepairIfNecessary(final WorkflowClient client,
final UUID connectionId,
final Function<ConnectionManagerWorkflow, Proc> signalMethod)
final Function<ConnectionManagerWorkflow, Proc> signalMethod,
final EnvVariableFeatureFlags envVariableFeatureFlags)
throws DeletedWorkflowException {
return signalWorkflowAndRepairIfNecessary(client, connectionId, signalMethod, Optional.empty());
return signalWorkflowAndRepairIfNecessary(client, connectionId, signalMethod, Optional.empty(), envVariableFeatureFlags);
}

/**
Expand All @@ -85,9 +87,10 @@ public ConnectionManagerWorkflow signalWorkflowAndRepairIfNecessary(final Workfl
public <T> ConnectionManagerWorkflow signalWorkflowAndRepairIfNecessary(final WorkflowClient client,
final UUID connectionId,
final Function<ConnectionManagerWorkflow, Proc1<T>> signalMethod,
final T signalArgument)
final T signalArgument,
final EnvVariableFeatureFlags envVariableFeatureFlags)
throws DeletedWorkflowException {
return signalWorkflowAndRepairIfNecessary(client, connectionId, signalMethod, Optional.of(signalArgument));
return signalWorkflowAndRepairIfNecessary(client, connectionId, signalMethod, Optional.of(signalArgument), envVariableFeatureFlags);
}

// This method unifies the logic of the above two, by using the optional signalArgument parameter to
Expand All @@ -98,7 +101,8 @@ public <T> ConnectionManagerWorkflow signalWorkflowAndRepairIfNecessary(final Wo
private <T> ConnectionManagerWorkflow signalWorkflowAndRepairIfNecessary(final WorkflowClient client,
final UUID connectionId,
final Function<ConnectionManagerWorkflow, ? extends TemporalFunctionalInterfaceMarker> signalMethod,
final Optional<T> signalArgument)
final Optional<T> signalArgument,
final EnvVariableFeatureFlags envVariableFeatureFlags)
throws DeletedWorkflowException {
try {
final ConnectionManagerWorkflow connectionManagerWorkflow = getConnectionManagerWorkflow(client, connectionId);
Expand Down Expand Up @@ -127,7 +131,7 @@ private <T> ConnectionManagerWorkflow signalWorkflowAndRepairIfNecessary(final W
final ConnectionUpdaterInput startWorkflowInput = TemporalWorkflowUtils.buildStartWorkflowInput(connectionId);

final BatchRequest batchRequest = client.newSignalWithStartRequest();
batchRequest.add(connectionManagerWorkflow::run, startWorkflowInput);
batchRequest.add(connectionManagerWorkflow::run, startWorkflowInput, envVariableFeatureFlags);

// retrieve the signal from the lambda
final TemporalFunctionalInterfaceMarker signal = signalMethod.apply(connectionManagerWorkflow);
Expand Down Expand Up @@ -161,10 +165,12 @@ public void safeTerminateWorkflow(final WorkflowClient client, final UUID connec
safeTerminateWorkflow(client, getConnectionManagerName(connectionId), reason);
}

public ConnectionManagerWorkflow startConnectionManagerNoSignal(final WorkflowClient client, final UUID connectionId) {
public ConnectionManagerWorkflow startConnectionManagerNoSignal(final WorkflowClient client,
final UUID connectionId,
final EnvVariableFeatureFlags envVariableFeatureFlags) {
final ConnectionManagerWorkflow connectionManagerWorkflow = newConnectionManagerWorkflowStub(client, connectionId);
final ConnectionUpdaterInput input = TemporalWorkflowUtils.buildStartWorkflowInput(connectionId);
WorkflowClient.start(connectionManagerWorkflow::run, input);
WorkflowClient.start(connectionManagerWorkflow::run, input, envVariableFeatureFlags);

return connectionManagerWorkflow;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public void restartClosedWorkflowByStatus(final WorkflowExecutionStatus executio
nonRunningWorkflow.forEach(connectionId -> {
connectionManagerUtils.safeTerminateWorkflow(client, connectionId, "Terminating workflow in "
+ "unreachable state before starting a new workflow for this connection");
connectionManagerUtils.startConnectionManagerNoSignal(client, connectionId);
connectionManagerUtils.startConnectionManagerNoSignal(client, connectionId, envVariableFeatureFlags);
});
}

Expand Down Expand Up @@ -214,7 +214,8 @@ public ManualOperationResult startNewManualSync(final UUID connectionId) {
}

try {
connectionManagerUtils.signalWorkflowAndRepairIfNecessary(client, connectionId, workflow -> workflow::submitManualSync);
connectionManagerUtils.signalWorkflowAndRepairIfNecessary(client, connectionId, workflow -> workflow::submitManualSync,
envVariableFeatureFlags);
} catch (final DeletedWorkflowException e) {
log.error("Can't sync a deleted connection.", e);
return new ManualOperationResult(
Expand Down Expand Up @@ -247,7 +248,7 @@ public ManualOperationResult startNewCancellation(final UUID connectionId) {
final long jobId = connectionManagerUtils.getCurrentJobId(client, connectionId);

try {
connectionManagerUtils.signalWorkflowAndRepairIfNecessary(client, connectionId, workflow -> workflow::cancelJob);
connectionManagerUtils.signalWorkflowAndRepairIfNecessary(client, connectionId, workflow -> workflow::cancelJob, envVariableFeatureFlags);
} catch (final DeletedWorkflowException e) {
log.error("Can't cancel a deleted workflow", e);
return new ManualOperationResult(
Expand Down Expand Up @@ -293,9 +294,11 @@ public ManualOperationResult resetConnection(final UUID connectionId,

try {
if (syncImmediatelyAfter) {
connectionManagerUtils.signalWorkflowAndRepairIfNecessary(client, connectionId, workflow -> workflow::resetConnectionAndSkipNextScheduling);
connectionManagerUtils.signalWorkflowAndRepairIfNecessary(client, connectionId, workflow -> workflow::resetConnectionAndSkipNextScheduling,
envVariableFeatureFlags);
} else {
connectionManagerUtils.signalWorkflowAndRepairIfNecessary(client, connectionId, workflow -> workflow::resetConnection);
connectionManagerUtils.signalWorkflowAndRepairIfNecessary(client, connectionId, workflow -> workflow::resetConnection,
envVariableFeatureFlags);
}
} catch (final DeletedWorkflowException e) {
log.error("Can't reset a deleted workflow", e);
Expand Down Expand Up @@ -475,7 +478,7 @@ private <T> T getWorkflowStub(final Class<T> workflowClass, final TemporalJobTyp
public ConnectionManagerWorkflow submitConnectionUpdaterAsync(final UUID connectionId) {
log.info("Starting the scheduler temporal wf");
final ConnectionManagerWorkflow connectionManagerWorkflow =
connectionManagerUtils.startConnectionManagerNoSignal(client, connectionId);
connectionManagerUtils.startConnectionManagerNoSignal(client, connectionId, envVariableFeatureFlags);
try {
CompletableFuture.supplyAsync(() -> {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.commons.temporal.scheduling;

import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.temporal.scheduling.state.WorkflowState;
import io.temporal.workflow.QueryMethod;
import io.temporal.workflow.SignalMethod;
Expand All @@ -25,7 +26,7 @@ public interface ConnectionManagerWorkflow {
* for scheduling syncs. This workflow will run and then continue running until deleted.
*/
@WorkflowMethod
void run(ConnectionUpdaterInput connectionUpdaterInput);
void run(ConnectionUpdaterInput connectionUpdaterInput, EnvVariableFeatureFlags envVariableFeatureFlags);

/**
* Send a signal that will bypass the waiting time and run a sync. Nothing will happen if a sync is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ void testRestartFailed() {
temporalClient.restartClosedWorkflowByStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED);
verify(mConnectionManagerUtils).safeTerminateWorkflow(eq(workflowClient), eq(connectionId),
anyString());
verify(mConnectionManagerUtils).startConnectionManagerNoSignal(eq(workflowClient), eq(connectionId));
verify(mConnectionManagerUtils).startConnectionManagerNoSignal(eq(workflowClient), eq(connectionId), any(EnvVariableFeatureFlags.class));
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ public CheckConnectionRead checkDestinationConnectionFromDestinationIdForUpdate(

public SourceDiscoverSchemaRead discoverSchemaForSourceFromSourceId(final SourceDiscoverSchemaRequestBody discoverSchemaRequestBody)
throws ConfigNotFoundException, IOException, JsonValidationException {
log.info("discover schema request: " + discoverSchemaRequestBody);
final SourceConnection source = configRepository.getSourceConnection(discoverSchemaRequestBody.getSourceId());
final StandardSourceDefinition sourceDef = configRepository.getStandardSourceDefinition(source.getSourceDefinitionId());
final String imageName = DockerUtils.getTaggedImageName(sourceDef.getDockerRepository(), sourceDef.getDockerImageTag());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import datadog.trace.api.Trace;
import io.airbyte.api.client.invoker.generated.ApiException;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.temporal.TemporalJobType;
import io.airbyte.commons.temporal.TemporalWorkflowUtils;
import io.airbyte.commons.temporal.exception.RetryableException;
Expand Down Expand Up @@ -159,7 +160,8 @@ public class ConnectionManagerWorkflowImpl implements ConnectionManagerWorkflow

@Trace(operationName = WORKFLOW_TRACE_OPERATION_NAME)
@Override
public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws RetryableException {
public void run(final ConnectionUpdaterInput connectionUpdaterInput, final EnvVariableFeatureFlags envVariableFeatureFlags)
throws RetryableException {
try {
ApmTraceUtils.addTagsToTrace(Map.of(CONNECTION_ID_KEY, connectionUpdaterInput.getConnectionId()));

Expand All @@ -170,7 +172,7 @@ public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws Retr
recordMetric(new RecordMetricInput(connectionUpdaterInput, Optional.empty(), OssMetricsRegistry.TEMPORAL_WORKFLOW_ATTEMPT, null));

try {
cancellableSyncWorkflow = generateSyncWorkflowRunnable(connectionUpdaterInput);
cancellableSyncWorkflow = generateSyncWorkflowRunnable(connectionUpdaterInput, envVariableFeatureFlags);
cancellableSyncWorkflow.run();
} catch (final CanceledFailure cf) {
// When a scope is cancelled temporal will throw a CanceledFailure as you can see here:
Expand Down Expand Up @@ -208,7 +210,8 @@ public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws Retr
}

@SuppressWarnings("PMD.EmptyIfStmt")
private CancellationScope generateSyncWorkflowRunnable(final ConnectionUpdaterInput connectionUpdaterInput) {
private CancellationScope generateSyncWorkflowRunnable(final ConnectionUpdaterInput connectionUpdaterInput,
EnvVariableFeatureFlags envVariableFeatureFlags) {
return Workflow.newCancellationScope(() -> {
connectionId = connectionUpdaterInput.getConnectionId();

Expand Down Expand Up @@ -259,7 +262,7 @@ private CancellationScope generateSyncWorkflowRunnable(final ConnectionUpdaterIn
reportFailure(connectionUpdaterInput, checkFailureOutput, FailureCause.CONNECTION);
} else {
try {
standardSyncOutput = runChildWorkflow(jobInputs);
standardSyncOutput = runChildWorkflow(jobInputs, envVariableFeatureFlags);
} catch (JsonValidationException | ConfigNotFoundException | IOException | ApiException e) {
throw new RuntimeException(e);
}
Expand Down Expand Up @@ -857,7 +860,7 @@ private void reportJobStarting(final UUID connectionId) {
* since the latter is a long running workflow, in the future, using a different Node pool would
* make sense.
*/
private StandardSyncOutput runChildWorkflow(final GeneratedJobInput jobInputs)
private StandardSyncOutput runChildWorkflow(final GeneratedJobInput jobInputs, EnvVariableFeatureFlags envVariableFeatureFlags)
throws JsonValidationException, ConfigNotFoundException, IOException, ApiException {
final String taskQueue = getSyncTaskQueue();

Expand All @@ -874,7 +877,7 @@ private StandardSyncOutput runChildWorkflow(final GeneratedJobInput jobInputs)
jobInputs.getSourceLauncherConfig(),
jobInputs.getDestinationLauncherConfig(),
jobInputs.getSyncInput(),
connectionId);
connectionId, envVariableFeatureFlags);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.temporal.TemporalJobType;
import io.airbyte.commons.temporal.scheduling.ConnectionManagerWorkflow;
import io.airbyte.commons.temporal.scheduling.ConnectionUpdaterInput;
Expand Down Expand Up @@ -127,6 +128,7 @@ class ConnectionManagerWorkflowTest {
mock(WorkflowConfigActivity.class, Mockito.withSettings().withoutAnnotations());
private static final RouteToSyncTaskQueueActivity mRouteToSyncTaskQueueActivity =
mock(RouteToSyncTaskQueueActivity.class, Mockito.withSettings().withoutAnnotations());
private static final EnvVariableFeatureFlags mEnvVariableFeatureFlags = mock(EnvVariableFeatureFlags.class);
private static final String EVENT = "event = ";

private TestWorkflowEnvironment testEnv;
Expand Down Expand Up @@ -1558,7 +1560,7 @@ public boolean matches(final JobCancelledInputWithAttemptNumber arg) {

private static void startWorkflowAndWaitUntilReady(final ConnectionManagerWorkflow workflow, final ConnectionUpdaterInput input)
throws InterruptedException {
WorkflowClient.start(workflow::run, input);
WorkflowClient.start(workflow::run, input, mEnvVariableFeatureFlags);

boolean isReady = false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.workers.temporal.scheduling.testsyncworkflow;

import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.temporal.scheduling.SyncWorkflow;
import io.airbyte.config.StandardSyncInput;
import io.airbyte.config.StandardSyncOutput;
Expand All @@ -25,7 +26,8 @@ public StandardSyncOutput run(final JobRunConfig jobRunConfig,
final IntegrationLauncherConfig sourceLauncherConfig,
final IntegrationLauncherConfig destinationLauncherConfig,
final StandardSyncInput syncInput,
final UUID connectionId) {
final UUID connectionId,
final EnvVariableFeatureFlags envVariableFeatureFlags) {

throw new ActivityFailure(1L, 1L, ACTIVITY_TYPE_DBT_RUN, "someId", RetryState.RETRY_STATE_UNSPECIFIED, "someIdentity", CAUSE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.workers.temporal.scheduling.testsyncworkflow;

import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.temporal.scheduling.SyncWorkflow;
import io.airbyte.config.StandardSyncInput;
import io.airbyte.config.StandardSyncOutput;
Expand All @@ -18,7 +19,8 @@ public StandardSyncOutput run(final JobRunConfig jobRunConfig,
final IntegrationLauncherConfig sourceLauncherConfig,
final IntegrationLauncherConfig destinationLauncherConfig,
final StandardSyncInput syncInput,
final UUID connectionId) {
final UUID connectionId,
final EnvVariableFeatureFlags envVariableFeatureFlags) {

return new StandardSyncOutput();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.workers.temporal.scheduling.testsyncworkflow;

import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.temporal.scheduling.SyncWorkflow;
import io.airbyte.config.StandardSyncInput;
import io.airbyte.config.StandardSyncOutput;
Expand All @@ -25,7 +26,8 @@ public StandardSyncOutput run(final JobRunConfig jobRunConfig,
final IntegrationLauncherConfig sourceLauncherConfig,
final IntegrationLauncherConfig destinationLauncherConfig,
final StandardSyncInput syncInput,
final UUID connectionId) {
final UUID connectionId,
final EnvVariableFeatureFlags envVariableFeatureFlags) {

throw new ActivityFailure(1L, 1L, ACTIVITY_TYPE_NORMALIZE, "someId", RetryState.RETRY_STATE_UNSPECIFIED, "someIdentity", CAUSE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.workers.temporal.scheduling.testsyncworkflow;

import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.temporal.scheduling.SyncWorkflow;
import io.airbyte.config.FailureReason;
import io.airbyte.config.FailureReason.FailureOrigin;
Expand All @@ -30,7 +31,8 @@ public StandardSyncOutput run(final JobRunConfig jobRunConfig,
final IntegrationLauncherConfig sourceLauncherConfig,
final IntegrationLauncherConfig destinationLauncherConfig,
final StandardSyncInput syncInput,
final UUID connectionId) {
final UUID connectionId,
final EnvVariableFeatureFlags envVariableFeatureFlags) {

return new StandardSyncOutput()
.withNormalizationSummary(new NormalizationSummary()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.workers.temporal.scheduling.testsyncworkflow;

import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.temporal.scheduling.SyncWorkflow;
import io.airbyte.config.StandardSyncInput;
import io.airbyte.config.StandardSyncOutput;
Expand All @@ -25,7 +26,8 @@ public StandardSyncOutput run(final JobRunConfig jobRunConfig,
final IntegrationLauncherConfig sourceLauncherConfig,
final IntegrationLauncherConfig destinationLauncherConfig,
final StandardSyncInput syncInput,
final UUID connectionId) {
final UUID connectionId,
final EnvVariableFeatureFlags envVariableFeatureFlags) {

throw new ActivityFailure(1L, 1L, ACTIVITY_TYPE_PERSIST, "someId", RetryState.RETRY_STATE_UNSPECIFIED, "someIdentity", CAUSE);
}
Expand Down
Loading

0 comments on commit 527d2fc

Please sign in to comment.