Skip to content

Commit

Permalink
Bmoric/remove dep server worker (#17894)
Browse files Browse the repository at this point in the history
* test [ci skip]

* Autogenerated files

* Add missing annotation

* Remove unused json2Schema block from worker

* Move tests

* Missing deps and format

* Fix test build

* TMP

* Add missing dependencies

* PR comments

* Tmp

* [ci skip] Tmp

* Fix acceptance test and add the seed dependency

* Fix build

* For diff

* tmp

* Build pass

* Make the worker be on the platform only

* fix setting.yaml

* Fix pmd

* Fix Cron

* Add chart

* Fix cron

* Fix server build.gradle

* Fix jar conflict

* PR comments

* Add cron micronaut environment
  • Loading branch information
benmoriceau committed Oct 17, 2022
1 parent 211b4be commit 7a71c55
Show file tree
Hide file tree
Showing 112 changed files with 1,622 additions and 1,820 deletions.
4 changes: 3 additions & 1 deletion .env
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ LOG_LEVEL=INFO
### APPLICATIONS ###
# Worker #
WORKERS_MICRONAUT_ENVIRONMENTS=control-plane
# Cron #
CRON_MICRONAUT_ENVIRONMENTS=control-plane
# Relevant to scaling.
MAX_SYNC_WORKERS=5
MAX_SPEC_WORKERS=5
Expand All @@ -102,4 +104,4 @@ METRIC_CLIENT=
# Useful only when metric client is set to be otel. Must start with http:// or https://.
OTEL_COLLECTOR_ENDPOINT="http://host.docker.internal:4317"

USE_STREAM_CAPABLE_STATE=true
USE_STREAM_CAPABLE_STATE=true
1 change: 1 addition & 0 deletions .env.dev
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ INTERNAL_API_HOST=airbyte-server:8001
SYNC_JOB_MAX_ATTEMPTS=3
SYNC_JOB_MAX_TIMEOUT_DAYS=3
WORKERS_MICRONAUT_ENVIRONMENTS=control-plane
CRON_MICRONAUT_ENVIRONMENTS=control-plane

# Sentry
SENTRY_DSN=""
Expand Down
22 changes: 4 additions & 18 deletions airbyte-commons-temporal/build.gradle
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
import org.jsonschema2pojo.SourceType

plugins {
id "java-library"
id 'com.github.eirnym.js2p' version '1.0'
}

dependencies {
Expand All @@ -18,27 +15,16 @@ dependencies {
testAnnotationProcessor platform(libs.micronaut.bom)
testAnnotationProcessor libs.bundles.micronaut.test.annotation.processor


implementation project(':airbyte-config:config-models')
implementation project(':airbyte-config:config-persistence')
implementation project(':airbyte-metrics:metrics-lib')
implementation project(':airbyte-persistence:job-persistence')
implementation project(':airbyte-protocol:protocol-models')
implementation project(':airbyte-worker-models')

testImplementation 'io.temporal:temporal-testing:1.8.1'
// Needed to be able to mock final class
testImplementation 'org.mockito:mockito-inline:4.7.0'
}

jsonSchema2Pojo {
sourceType = SourceType.YAMLSCHEMA
source = files("${sourceSets.main.output.resourcesDir}/workers_models")
targetDirectory = new File(project.buildDir, 'generated/src/gen/java/')
removeOldOutput = true

targetPackage = 'io.airbyte.persistence.job.models'

useLongIntegers = true
generateBuilders = true
includeConstructors = false
includeSetters = true
}

Task publishArtifactsTask = getPublishArtifactsTask("$rootProject.ext.version", project)
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,24 @@

package io.airbyte.commons.temporal;

import io.airbyte.commons.temporal.exception.DeletedWorkflowException;
import io.airbyte.commons.temporal.exception.UnreachableWorkflowException;
import io.airbyte.commons.temporal.scheduling.ConnectionManagerWorkflow;
import io.airbyte.commons.temporal.scheduling.ConnectionUpdaterInput;
import io.airbyte.commons.temporal.scheduling.state.WorkflowState;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.enums.v1.WorkflowExecutionStatus;
import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionRequest;
import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionResponse;
import io.temporal.client.BatchRequest;
import io.temporal.client.WorkflowClient;
import io.temporal.workflow.Functions.Proc;
import io.temporal.workflow.Functions.Proc1;
import io.temporal.workflow.Functions.TemporalFunctionalInterfaceMarker;
import jakarta.inject.Singleton;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Function;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -17,6 +30,105 @@
@Slf4j
public class ConnectionManagerUtils {

/**
 * Sends a signal that takes no argument to the ConnectionManagerWorkflow of the given connection.
 *
 * When the existing workflow cannot be reached, the workflow is restarted and the signal is
 * delivered together with the start in one batched request, so that no race can occur between
 * starting the workflow and signaling it.
 *
 * @param client the WorkflowClient for interacting with temporal
 * @param connectionId the connection ID to execute this operation for
 * @param signalMethod a function that, given a connection manager workflow, returns the
 *        no-argument signal method to execute on it
 * @return the healthy connection manager workflow that was signaled
 * @throws DeletedWorkflowException if the connection manager workflow was deleted
 */
public ConnectionManagerWorkflow signalWorkflowAndRepairIfNecessary(final WorkflowClient client,
                                                                    final UUID connectionId,
                                                                    final Function<ConnectionManagerWorkflow, Proc> signalMethod)
    throws DeletedWorkflowException {
  // An empty Optional tells the shared implementation that the signal takes no argument.
  final Optional<Object> noSignalArgument = Optional.empty();
  return signalWorkflowAndRepairIfNecessary(client, connectionId, signalMethod, noSignalArgument);
}

/**
 * Sends a signal that takes exactly one argument to the ConnectionManagerWorkflow of the given
 * connection.
 *
 * When the existing workflow cannot be reached, the workflow is restarted and the signal is
 * delivered together with the start in one batched request, so that no race can occur between
 * starting the workflow and signaling it.
 *
 * @param client the WorkflowClient for interacting with temporal
 * @param connectionId the connection ID to execute this operation for
 * @param signalMethod a function that, given a connection manager workflow, returns the
 *        single-argument signal method to execute on it
 * @param signalArgument the single argument to be input to the signal
 * @return the healthy connection manager workflow that was signaled
 * @throws DeletedWorkflowException if the connection manager workflow was deleted
 */
public <T> ConnectionManagerWorkflow signalWorkflowAndRepairIfNecessary(final WorkflowClient client,
                                                                        final UUID connectionId,
                                                                        final Function<ConnectionManagerWorkflow, Proc1<T>> signalMethod,
                                                                        final T signalArgument)
    throws DeletedWorkflowException {
  // A present Optional tells the shared implementation that the signal takes one argument.
  final Optional<T> wrappedSignalArgument = Optional.of(signalArgument);
  return signalWorkflowAndRepairIfNecessary(client, connectionId, signalMethod, wrappedSignalArgument);
}

// This method unifies the logic of the above two, by using the optional signalArgument parameter to
// indicate if an argument is being provided to the signal or not.
// Keeping this private and only exposing the above methods outside this class provides a strict
// type enforcement for external calls, and means this method can assume consistent type
// implementations for both cases.
//
// Flow:
// 1. Try to fetch the existing workflow and execute the signal on it directly.
// 2. If the workflow is unreachable, terminate whatever is left of it, build a fresh stub, and
// submit "start workflow" + "signal" as a single signal-with-start batch request.
//
// A DeletedWorkflowException raised while fetching the workflow is not caught here and propagates
// to the caller.
private <T> ConnectionManagerWorkflow signalWorkflowAndRepairIfNecessary(final WorkflowClient client,
final UUID connectionId,
final Function<ConnectionManagerWorkflow, ? extends TemporalFunctionalInterfaceMarker> signalMethod,
final Optional<T> signalArgument)
throws DeletedWorkflowException {
try {
final ConnectionManagerWorkflow connectionManagerWorkflow = getConnectionManagerWorkflow(client, connectionId);
log.info("Retrieved existing connection manager workflow for connection {}. Executing signal.", connectionId);
// retrieve the signal from the lambda
final TemporalFunctionalInterfaceMarker signal = signalMethod.apply(connectionManagerWorkflow);
// execute the signal; the casts are unchecked, but the public overloads only ever pair a present
// argument with a Proc1<T> signal and an absent argument with a Proc signal
if (signalArgument.isPresent()) {
((Proc1<T>) signal).apply(signalArgument.get());
} else {
((Proc) signal).apply();
}
return connectionManagerWorkflow;
} catch (final UnreachableWorkflowException e) {
// the existing workflow could not be used: log the cause and fall through to the repair path
log.error(
String.format(
"Failed to retrieve ConnectionManagerWorkflow for connection %s. Repairing state by creating new workflow and starting with the signal.",
connectionId),
e);

// in case there is an existing workflow in a bad state, attempt to terminate it first before
// starting a new workflow
safeTerminateWorkflow(client, connectionId, "Terminating workflow in unreachable state before starting a new workflow for this connection");

// build a new stub and the input used to start the replacement workflow
final ConnectionManagerWorkflow connectionManagerWorkflow = newConnectionManagerWorkflowStub(client, connectionId);
final ConnectionUpdaterInput startWorkflowInput = TemporalWorkflowUtils.buildStartWorkflowInput(connectionId);

// the batch request carries both the workflow start and the signal
final BatchRequest batchRequest = client.newSignalWithStartRequest();
batchRequest.add(connectionManagerWorkflow::run, startWorkflowInput);

// retrieve the signal from the lambda
final TemporalFunctionalInterfaceMarker signal = signalMethod.apply(connectionManagerWorkflow);
// add signal to batch request (same unchecked-cast reasoning as in the try block above)
if (signalArgument.isPresent()) {
batchRequest.add((Proc1<T>) signal, signalArgument.get());
} else {
batchRequest.add((Proc) signal);
}

// submit the start and the signal together
client.signalWithStart(batchRequest);
log.info("Connection manager workflow for connection {} has been started and signaled.", connectionId);

return connectionManagerWorkflow;
}
}

void safeTerminateWorkflow(final WorkflowClient client, final String workflowId, final String reason) {
log.info("Attempting to terminate existing workflow for workflowId {}.", workflowId);
try {
Expand All @@ -33,10 +145,6 @@ public void safeTerminateWorkflow(final WorkflowClient client, final UUID connec
safeTerminateWorkflow(client, getConnectionManagerName(connectionId), reason);
}

/**
 * Builds the name used for the connection manager workflow of a connection.
 *
 * @param connectionId the ID of the connection
 * @return the fixed prefix {@code connection_manager_} followed by the connection ID
 */
public String getConnectionManagerName(final UUID connectionId) {
  final String prefix = "connection_manager_";
  return prefix + connectionId;
}

public ConnectionManagerWorkflow startConnectionManagerNoSignal(final WorkflowClient client, final UUID connectionId) {
final ConnectionManagerWorkflow connectionManagerWorkflow = newConnectionManagerWorkflowStub(client, connectionId);
final ConnectionUpdaterInput input = TemporalWorkflowUtils.buildStartWorkflowInput(connectionId);
Expand All @@ -45,9 +153,89 @@ public ConnectionManagerWorkflow startConnectionManagerNoSignal(final WorkflowCl
return connectionManagerWorkflow;
}

/**
 * Attempts to retrieve the connection manager workflow for the provided connection.
 *
 * The workflow is considered healthy only when its stub can be created, its state and execution
 * status can be queried, it is not in a COMPLETED execution status, and its state is not
 * quarantined.
 *
 * @param client the WorkflowClient used to create the workflow stub and query its execution status
 * @param connectionId the ID of the connection whose workflow should be retrieved
 * @return the healthy ConnectionManagerWorkflow
 * @throws DeletedWorkflowException if the workflow was deleted, according to the workflow state
 * @throws UnreachableWorkflowException if the workflow is in an unreachable state
 */
public ConnectionManagerWorkflow getConnectionManagerWorkflow(final WorkflowClient client, final UUID connectionId)
    throws DeletedWorkflowException, UnreachableWorkflowException {

  final ConnectionManagerWorkflow connectionManagerWorkflow;
  final WorkflowState workflowState;
  final WorkflowExecutionStatus workflowExecutionStatus;
  try {
    connectionManagerWorkflow = client.newWorkflowStub(ConnectionManagerWorkflow.class, getConnectionManagerName(connectionId));
    workflowState = connectionManagerWorkflow.getState();
    workflowExecutionStatus = getConnectionManagerWorkflowStatus(client, connectionId);
  } catch (final Exception e) {
    // Any failure while building the stub or querying it means the workflow cannot be used as-is;
    // the original exception is kept as the cause.
    throw new UnreachableWorkflowException(
        String.format("Failed to retrieve ConnectionManagerWorkflow for connection %s due to the following error:", connectionId),
        e);
  }

  if (WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED.equals(workflowExecutionStatus)) {
    if (workflowState.isDeleted()) {
      throw new DeletedWorkflowException(String.format(
          "The connection manager workflow for connection %s is deleted, so no further operations can be performed on it.",
          connectionId));
    }

    // A non-deleted workflow being in a COMPLETED state is unexpected, and should be corrected
    throw new UnreachableWorkflowException(
        String.format("ConnectionManagerWorkflow for connection %s is unreachable due to having COMPLETED status.", connectionId));
  }

  if (workflowState.isQuarantined()) {
    throw new UnreachableWorkflowException(
        String.format("ConnectionManagerWorkflow for connection %s is unreachable due to being in a quarantined state.", connectionId));
  }

  return connectionManagerWorkflow;
}

/**
 * Reports whether the connection manager workflow of the given connection says it is running.
 *
 * @param client the WorkflowClient used to create the workflow stub
 * @param connectionId the ID of the connection whose workflow state is queried
 * @return the running flag of the workflow state, or {@code false} if the stub could not be
 *         created or the state could not be queried
 */
boolean isWorkflowStateRunning(final WorkflowClient client, final UUID connectionId) {
  try {
    final String workflowName = getConnectionManagerName(connectionId);
    final ConnectionManagerWorkflow workflow = client.newWorkflowStub(ConnectionManagerWorkflow.class, workflowName);
    final WorkflowState currentState = workflow.getState();
    return currentState.isRunning();
  } catch (final Exception e) {
    // Best effort: any failure is reported as "not running".
    return false;
  }
}

/**
 * Asks the workflow service for the execution status of a connection's manager workflow.
 *
 * @param workflowClient the WorkflowClient providing the namespace and the service stubs
 * @param connectionId the ID of the connection whose workflow status is requested
 * @return the status reported in the describe-workflow-execution response
 */
public WorkflowExecutionStatus getConnectionManagerWorkflowStatus(final WorkflowClient workflowClient, final UUID connectionId) {
  // Identify the execution by the connection manager workflow ID.
  final WorkflowExecution execution = WorkflowExecution.newBuilder()
      .setWorkflowId(getConnectionManagerName(connectionId))
      .build();

  final DescribeWorkflowExecutionRequest request = DescribeWorkflowExecutionRequest.newBuilder()
      .setExecution(execution)
      .setNamespace(workflowClient.getOptions().getNamespace())
      .build();

  final DescribeWorkflowExecutionResponse response = workflowClient.getWorkflowServiceStubs()
      .blockingStub()
      .describeWorkflowExecution(request);

  return response.getWorkflowExecutionInfo().getStatus();
}

/**
 * Looks up the ID of the job reported by the connection manager workflow of a connection.
 *
 * @param client the WorkflowClient for interacting with temporal
 * @param connectionId the ID of the connection whose job ID is requested
 * @return the job ID from the workflow's job information, or
 *         {@link ConnectionManagerWorkflow#NON_RUNNING_JOB_ID} if the workflow could not be
 *         retrieved or queried
 */
public long getCurrentJobId(final WorkflowClient client, final UUID connectionId) {
  try {
    return getConnectionManagerWorkflow(client, connectionId)
        .getJobInformation()
        .getJobId();
  } catch (final Exception e) {
    // Best effort: any failure is reported as "no running job".
    return ConnectionManagerWorkflow.NON_RUNNING_JOB_ID;
  }
}

/**
 * Creates a new ConnectionManagerWorkflow stub for the given connection, using workflow options
 * built for the CONNECTION_UPDATER job type and the connection manager workflow name.
 *
 * @param client the WorkflowClient used to create the stub
 * @param connectionId the ID of the connection the workflow belongs to
 * @return the newly created workflow stub
 */
public ConnectionManagerWorkflow newConnectionManagerWorkflowStub(final WorkflowClient client, final UUID connectionId) {
  final String workflowName = getConnectionManagerName(connectionId);
  return client.newWorkflowStub(
      ConnectionManagerWorkflow.class,
      TemporalWorkflowUtils.buildWorkflowOptions(TemporalJobType.CONNECTION_UPDATER, workflowName));
}

/**
 * Builds the name used for the connection manager workflow of a connection.
 *
 * @param connectionId the ID of the connection
 * @return the fixed prefix {@code connection_manager_} followed by the connection ID
 */
public String getConnectionManagerName(final UUID connectionId) {
  final String prefix = "connection_manager_";
  return prefix + connectionId;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal;
package io.airbyte.commons.temporal;

public enum ErrorCode {
UNKNOWN,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal;
package io.airbyte.commons.temporal;

import java.nio.file.Path;
import java.util.Objects;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal;
package io.airbyte.commons.temporal;

import io.airbyte.commons.temporal.exception.RetryableException;
import io.airbyte.config.JobConfig.ConfigType;
import io.airbyte.config.persistence.StreamResetPersistence;
import io.airbyte.persistence.job.JobPersistence;
import io.airbyte.persistence.job.models.Job;
import io.airbyte.protocol.models.StreamDescriptor;
import io.airbyte.workers.temporal.exception.RetryableException;
import jakarta.inject.Singleton;
import java.io.IOException;
import java.util.List;
Expand Down
Loading

0 comments on commit 7a71c55

Please sign in to comment.