From b6103932790ea93551b0fc425030e7fcc2e18bf6 Mon Sep 17 00:00:00 2001 From: Benoit Moriceau Date: Tue, 1 Mar 2022 15:09:49 -0800 Subject: [PATCH] Extract event from the temporal worker run factory (#10739) Extract of different events that can happen to a sync into a non temporal related interface. --- .../airbyte/scheduler/client/EventRunner.java | 29 +++++++++++ .../scheduler/client/TemporalEventRunner.java | 50 +++++++++++++++++++ .../server/ConfigurationApiFactory.java | 10 ++-- .../java/io/airbyte/server/ServerApp.java | 16 +++--- .../java/io/airbyte/server/ServerFactory.java | 8 +-- .../airbyte/server/apis/ConfigurationApi.java | 10 ++-- .../server/handlers/ConnectionsHandler.java | 18 +++---- .../server/handlers/SchedulerHandler.java | 18 +++---- .../WebBackendConnectionsHandler.java | 8 +-- .../server/apis/ConfigurationApiTest.java | 4 +- .../handlers/ConnectionsHandlerTest.java | 10 ++-- .../server/handlers/SchedulerHandlerTest.java | 12 ++--- .../WebBackendConnectionsHandlerTest.java | 14 +++--- .../workers/temporal/TemporalClient.java | 8 +-- .../worker_run/TemporalWorkerRunFactory.java | 38 -------------- 15 files changed, 144 insertions(+), 109 deletions(-) create mode 100644 airbyte-scheduler/client/src/main/java/io/airbyte/scheduler/client/EventRunner.java create mode 100644 airbyte-scheduler/client/src/main/java/io/airbyte/scheduler/client/TemporalEventRunner.java diff --git a/airbyte-scheduler/client/src/main/java/io/airbyte/scheduler/client/EventRunner.java b/airbyte-scheduler/client/src/main/java/io/airbyte/scheduler/client/EventRunner.java new file mode 100644 index 0000000000000..f1784a8baff40 --- /dev/null +++ b/airbyte-scheduler/client/src/main/java/io/airbyte/scheduler/client/EventRunner.java @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.scheduler.client; + +import io.airbyte.workers.temporal.TemporalClient.ManualSyncSubmissionResult; +import java.util.Set; +import java.util.UUID; + +public interface EventRunner { + + void createNewSchedulerWorkflow(final UUID connectionId); + + ManualSyncSubmissionResult startNewManualSync(final UUID connectionId); + + ManualSyncSubmissionResult startNewCancelation(final UUID connectionId); + + ManualSyncSubmissionResult resetConnection(final UUID connectionId); + + ManualSyncSubmissionResult synchronousResetConnection(final UUID connectionId); + + void deleteConnection(final UUID connectionId); + + void migrateSyncIfNeeded(final Set connectionIds); + + void update(final UUID connectionId); + +} diff --git a/airbyte-scheduler/client/src/main/java/io/airbyte/scheduler/client/TemporalEventRunner.java b/airbyte-scheduler/client/src/main/java/io/airbyte/scheduler/client/TemporalEventRunner.java new file mode 100644 index 0000000000000..9eb7df68198f7 --- /dev/null +++ b/airbyte-scheduler/client/src/main/java/io/airbyte/scheduler/client/TemporalEventRunner.java @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.scheduler.client; + +import io.airbyte.workers.temporal.TemporalClient; +import io.airbyte.workers.temporal.TemporalClient.ManualSyncSubmissionResult; +import java.util.Set; +import java.util.UUID; +import lombok.AllArgsConstructor; + +@AllArgsConstructor +public class TemporalEventRunner implements EventRunner { + + private final TemporalClient temporalClient; + + public void createNewSchedulerWorkflow(final UUID connectionId) { + temporalClient.submitConnectionUpdaterAsync(connectionId); + } + + public ManualSyncSubmissionResult startNewManualSync(final UUID connectionId) { + return temporalClient.startNewManualSync(connectionId); + } + + public ManualSyncSubmissionResult startNewCancelation(final UUID connectionId) { + return temporalClient.startNewCancelation(connectionId); + } + + public ManualSyncSubmissionResult resetConnection(final UUID connectionId) { + return temporalClient.resetConnection(connectionId); + } + + public ManualSyncSubmissionResult synchronousResetConnection(final UUID connectionId) { + return temporalClient.synchronousResetConnection(connectionId); + } + + public void deleteConnection(final UUID connectionId) { + temporalClient.deleteConnection(connectionId); + } + + public void migrateSyncIfNeeded(final Set connectionIds) { + temporalClient.migrateSyncIfNeeded(connectionIds); + } + + public void update(final UUID connectionId) { + temporalClient.update(connectionId); + } + +} diff --git a/airbyte-server/src/main/java/io/airbyte/server/ConfigurationApiFactory.java b/airbyte-server/src/main/java/io/airbyte/server/ConfigurationApiFactory.java index 294e944343a76..9139c8274b8e3 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/ConfigurationApiFactory.java +++ b/airbyte-server/src/main/java/io/airbyte/server/ConfigurationApiFactory.java @@ -13,12 +13,12 @@ import io.airbyte.config.persistence.ConfigPersistence; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.db.Database; +import io.airbyte.scheduler.client.EventRunner; import io.airbyte.scheduler.client.SchedulerJobClient; import io.airbyte.scheduler.client.SynchronousSchedulerClient; import io.airbyte.scheduler.persistence.JobPersistence; import io.airbyte.server.apis.ConfigurationApi; import io.airbyte.workers.WorkerConfigs; -import io.airbyte.workers.worker_run.TemporalWorkerRunFactory; import io.temporal.serviceclient.WorkflowServiceStubs; import java.net.http.HttpClient; import java.nio.file.Path; @@ -47,7 +47,7 @@ public class ConfigurationApiFactory implements Factory { private static AirbyteVersion airbyteVersion; private static HttpClient httpClient; private static FeatureFlags featureFlags; - private static TemporalWorkerRunFactory temporalWorkerRunFactory; + private static EventRunner eventRunner; public static void setValues( final WorkflowServiceStubs temporalService, @@ -69,7 +69,7 @@ public static void setValues( final Path workspaceRoot, final HttpClient httpClient, final FeatureFlags featureFlags, - final TemporalWorkerRunFactory temporalWorkerRunFactory) { + final EventRunner eventRunner) { ConfigurationApiFactory.configRepository = configRepository; ConfigurationApiFactory.jobPersistence = jobPersistence; ConfigurationApiFactory.seed = seed; @@ -89,7 +89,7 @@ public static void setValues( ConfigurationApiFactory.airbyteVersion = airbyteVersion; ConfigurationApiFactory.httpClient = httpClient; ConfigurationApiFactory.featureFlags = featureFlags; - ConfigurationApiFactory.temporalWorkerRunFactory = temporalWorkerRunFactory; + ConfigurationApiFactory.eventRunner = eventRunner; } @Override @@ -115,7 +115,7 @@ public ConfigurationApi provide() { ConfigurationApiFactory.workspaceRoot, ConfigurationApiFactory.httpClient, ConfigurationApiFactory.featureFlags, - ConfigurationApiFactory.temporalWorkerRunFactory); + ConfigurationApiFactory.eventRunner); } @Override diff --git a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java index 5298633a13d1d..dd1972100ebcf 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java +++ b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java @@ -31,7 +31,9 @@ import io.airbyte.db.instance.jobs.JobsDatabaseMigrator; import io.airbyte.scheduler.client.DefaultSchedulerJobClient; import io.airbyte.scheduler.client.DefaultSynchronousSchedulerClient; +import io.airbyte.scheduler.client.EventRunner; import io.airbyte.scheduler.client.SchedulerJobClient; +import io.airbyte.scheduler.client.TemporalEventRunner; import io.airbyte.scheduler.persistence.DefaultJobCreator; import io.airbyte.scheduler.persistence.DefaultJobPersistence; import io.airbyte.scheduler.persistence.JobPersistence; @@ -47,7 +49,6 @@ import io.airbyte.workers.WorkerConfigs; import io.airbyte.workers.temporal.TemporalClient; import io.airbyte.workers.temporal.TemporalUtils; -import io.airbyte.workers.worker_run.TemporalWorkerRunFactory; import io.temporal.serviceclient.WorkflowServiceStubs; import java.io.IOException; import java.net.http.HttpClient; @@ -191,11 +192,8 @@ public static ServerRunnable getServer(final ServerFactory apiFactory, final Con new DefaultSynchronousSchedulerClient(temporalClient, jobTracker, oAuthConfigSupplier); final HttpClient httpClient = HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1).build(); final FeatureFlags featureFlags = new EnvVariableFeatureFlags(); - final TemporalWorkerRunFactory temporalWorkerRunFactory = new TemporalWorkerRunFactory( - TemporalClient.production(configs.getTemporalHost(), configs.getWorkspaceRoot(), configs), - configs.getWorkspaceRoot(), - configs.getAirbyteVersionOrWarning(), - featureFlags); + final EventRunner eventRunner = new TemporalEventRunner( + TemporalClient.production(configs.getTemporalHost(), configs.getWorkspaceRoot(), configs)); LOGGER.info("Starting server..."); @@ -217,17 +215,17 @@ public static ServerRunnable getServer(final ServerFactory apiFactory, final Con configs.getWorkspaceRoot(), httpClient, featureFlags, - temporalWorkerRunFactory); + eventRunner); } - private static void migrateExistingConnection(final ConfigRepository configRepository, final TemporalWorkerRunFactory temporalWorkerRunFactory) + private static void migrateExistingConnection(final ConfigRepository configRepository, final EventRunner eventRunner) throws JsonValidationException, ConfigNotFoundException, IOException { LOGGER.info("Start migration to the new scheduler..."); final Set connectionIds = configRepository.listStandardSyncs().stream() .filter(standardSync -> standardSync.getStatus() == Status.ACTIVE || standardSync.getStatus() == Status.INACTIVE) .map(standardSync -> standardSync.getConnectionId()).collect(Collectors.toSet()); - temporalWorkerRunFactory.migrateSyncIfNeeded(connectionIds); + eventRunner.migrateSyncIfNeeded(connectionIds); LOGGER.info("Done migrating to the new scheduler..."); } diff --git a/airbyte-server/src/main/java/io/airbyte/server/ServerFactory.java b/airbyte-server/src/main/java/io/airbyte/server/ServerFactory.java index 4b1cdf7eb6e3d..1990d3cc9e051 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/ServerFactory.java +++ b/airbyte-server/src/main/java/io/airbyte/server/ServerFactory.java @@ -13,12 +13,12 @@ import io.airbyte.config.persistence.ConfigPersistence; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.db.Database; +import io.airbyte.scheduler.client.EventRunner; import io.airbyte.scheduler.client.SchedulerJobClient; import io.airbyte.scheduler.client.SynchronousSchedulerClient; import io.airbyte.scheduler.persistence.JobPersistence; import io.airbyte.server.apis.ConfigurationApi; import io.airbyte.workers.WorkerConfigs; -import io.airbyte.workers.worker_run.TemporalWorkerRunFactory; import io.temporal.serviceclient.WorkflowServiceStubs; import java.net.http.HttpClient; import java.nio.file.Path; @@ -45,7 +45,7 @@ ServerRunnable create(SchedulerJobClient schedulerJobClient, Path workspaceRoot, HttpClient httpClient, FeatureFlags featureFlags, - TemporalWorkerRunFactory temporalWorkerRunFactory); + EventRunner eventRunner); class Api implements ServerFactory { @@ -67,7 +67,7 @@ public ServerRunnable create(final SchedulerJobClient schedulerJobClient, final Path workspaceRoot, final HttpClient httpClient, final FeatureFlags featureFlags, - final TemporalWorkerRunFactory temporalWorkerRunFactory) { + final EventRunner eventRunner) { // set static values for factory ConfigurationApiFactory.setValues( temporalService, @@ -89,7 +89,7 @@ public ServerRunnable create(final SchedulerJobClient schedulerJobClient, workspaceRoot, httpClient, featureFlags, - temporalWorkerRunFactory); + eventRunner); // server configurations final Set> componentClasses = Set.of(ConfigurationApi.class); diff --git a/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java b/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java index 3f0b19a7fded5..fceb234a81e4c 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java +++ b/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java @@ -92,6 +92,7 @@ import io.airbyte.config.persistence.ConfigPersistence; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.db.Database; +import io.airbyte.scheduler.client.EventRunner; import io.airbyte.scheduler.client.SchedulerJobClient; import io.airbyte.scheduler.client.SynchronousSchedulerClient; import io.airbyte.scheduler.persistence.JobNotifier; @@ -119,7 +120,6 @@ import io.airbyte.validation.json.JsonSchemaValidator; import io.airbyte.validation.json.JsonValidationException; import io.airbyte.workers.WorkerConfigs; -import io.airbyte.workers.worker_run.TemporalWorkerRunFactory; import io.temporal.serviceclient.WorkflowServiceStubs; import java.io.File; import java.io.IOException; @@ -168,7 +168,7 @@ public ConfigurationApi(final ConfigRepository configRepository, final Path workspaceRoot, final HttpClient httpClient, final FeatureFlags featureFlags, - final TemporalWorkerRunFactory temporalWorkerRunFactory) { + final EventRunner eventRunner) { this.workerEnvironment = workerEnvironment; this.logConfigs = logConfigs; this.workspaceRoot = workspaceRoot; @@ -189,13 +189,13 @@ public ConfigurationApi(final ConfigRepository configRepository, jobPersistence, jobNotifier, temporalService, - new OAuthConfigSupplier(configRepository, trackingClient), workerEnvironment, logConfigs, temporalWorkerRunFactory, featureFlags); + new OAuthConfigSupplier(configRepository, trackingClient), workerEnvironment, logConfigs, eventRunner, featureFlags); connectionsHandler = new ConnectionsHandler( configRepository, workspaceHelper, trackingClient, - temporalWorkerRunFactory, + eventRunner, featureFlags, workerConfigs); sourceHandler = new SourceHandler(configRepository, schemaValidator, connectionsHandler); @@ -215,7 +215,7 @@ public ConfigurationApi(final ConfigRepository configRepository, schedulerHandler, operationsHandler, featureFlags, - temporalWorkerRunFactory); + eventRunner); healthCheckHandler = new HealthCheckHandler(); archiveHandler = new ArchiveHandler( airbyteVersion, diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java index 8bac37471b502..ac30522cf149e 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java @@ -33,6 +33,7 @@ import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.scheduler.client.EventRunner; import io.airbyte.scheduler.persistence.WorkspaceHelper; import io.airbyte.server.converters.ApiPojoConverters; import io.airbyte.server.handlers.helpers.ConnectionMatcher; @@ -42,7 +43,6 @@ import io.airbyte.workers.WorkerConfigs; import io.airbyte.workers.helper.CatalogConverter; import io.airbyte.workers.helper.ConnectionHelper; -import io.airbyte.workers.worker_run.TemporalWorkerRunFactory; import java.io.IOException; import java.util.Collections; import java.util.HashSet; @@ -61,7 +61,7 @@ public class ConnectionsHandler { private final Supplier uuidGenerator; private final WorkspaceHelper workspaceHelper; private final TrackingClient trackingClient; - private final TemporalWorkerRunFactory temporalWorkerRunFactory; + private final EventRunner eventRunner; private final FeatureFlags featureFlags; private final WorkerConfigs workerConfigs; @@ -70,14 +70,14 @@ public class ConnectionsHandler { final Supplier uuidGenerator, final WorkspaceHelper workspaceHelper, final TrackingClient trackingClient, - final TemporalWorkerRunFactory temporalWorkerRunFactory, + final EventRunner eventRunner, final FeatureFlags featureFlags, final WorkerConfigs workerConfigs) { this.configRepository = configRepository; this.uuidGenerator = uuidGenerator; this.workspaceHelper = workspaceHelper; this.trackingClient = trackingClient; - this.temporalWorkerRunFactory = temporalWorkerRunFactory; + this.eventRunner = eventRunner; this.featureFlags = featureFlags; this.workerConfigs = workerConfigs; } @@ -85,14 +85,14 @@ public class ConnectionsHandler { public ConnectionsHandler(final ConfigRepository configRepository, final WorkspaceHelper workspaceHelper, final TrackingClient trackingClient, - final TemporalWorkerRunFactory temporalWorkerRunFactory, + final EventRunner eventRunner, final FeatureFlags featureFlags, final WorkerConfigs workerConfigs) { this(configRepository, UUID::randomUUID, workspaceHelper, trackingClient, - temporalWorkerRunFactory, + eventRunner, featureFlags, workerConfigs); @@ -150,7 +150,7 @@ public ConnectionRead createConnection(final ConnectionCreate connectionCreate) if (featureFlags.usesNewScheduler()) { try { LOGGER.info("Starting a connection using the new scheduler"); - temporalWorkerRunFactory.createNewSchedulerWorkflow(connectionId); + eventRunner.createNewSchedulerWorkflow(connectionId); } catch (final Exception e) { LOGGER.error("Start of the temporal connection manager workflow failed", e); configRepository.deleteStandardSyncDefinition(standardSync.getConnectionId()); @@ -214,7 +214,7 @@ public ConnectionRead updateConnection(final ConnectionUpdate connectionUpdate) configRepository.writeStandardSync(newConnection); if (featureFlags.usesNewScheduler()) { - temporalWorkerRunFactory.update(connectionUpdate); + eventRunner.update(connectionUpdate.getConnectionId()); } return buildConnectionRead(connectionUpdate.getConnectionId()); @@ -323,7 +323,7 @@ public void deleteConnection(final UUID connectionId) throws ConfigNotFoundException, IOException, JsonValidationException { if (featureFlags.usesNewScheduler()) { // todo (cgardens) - need an interface over this. - temporalWorkerRunFactory.deleteConnection(connectionId); + eventRunner.deleteConnection(connectionId); } else { final ConnectionRead connectionRead = getConnection(connectionId); deleteConnection(connectionRead); diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java index 9012914c575bf..835bc26d590d1 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java @@ -45,6 +45,7 @@ import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.protocol.models.ConnectorSpecification; +import io.airbyte.scheduler.client.EventRunner; import io.airbyte.scheduler.client.SchedulerJobClient; import io.airbyte.scheduler.client.SynchronousJobMetadata; import io.airbyte.scheduler.client.SynchronousResponse; @@ -61,7 +62,6 @@ import io.airbyte.workers.helper.CatalogConverter; import io.airbyte.workers.temporal.TemporalClient.ManualSyncSubmissionResult; import io.airbyte.workers.temporal.TemporalUtils; -import io.airbyte.workers.worker_run.TemporalWorkerRunFactory; import io.temporal.api.common.v1.WorkflowExecution; import io.temporal.api.workflowservice.v1.RequestCancelWorkflowExecutionRequest; import io.temporal.serviceclient.WorkflowServiceStubs; @@ -88,7 +88,7 @@ public class SchedulerHandler { private final JobConverter jobConverter; private final WorkerEnvironment workerEnvironment; private final LogConfigs logConfigs; - private final TemporalWorkerRunFactory temporalWorkerRunFactory; + private final EventRunner eventRunner; private final FeatureFlags featureFlags; public SchedulerHandler(final ConfigRepository configRepository, @@ -100,7 +100,7 @@ public SchedulerHandler(final ConfigRepository configRepository, final OAuthConfigSupplier oAuthConfigSupplier, final WorkerEnvironment workerEnvironment, final LogConfigs logConfigs, - final TemporalWorkerRunFactory temporalWorkerRunFactory, + final EventRunner eventRunner, final FeatureFlags featureFlags) { this( configRepository, @@ -114,7 +114,7 @@ public SchedulerHandler(final ConfigRepository configRepository, oAuthConfigSupplier, workerEnvironment, logConfigs, - temporalWorkerRunFactory, + eventRunner, featureFlags, new JobConverter(workerEnvironment, logConfigs)); } @@ -131,7 +131,7 @@ public SchedulerHandler(final ConfigRepository configRepository, final OAuthConfigSupplier oAuthConfigSupplier, final WorkerEnvironment workerEnvironment, final LogConfigs logConfigs, - final TemporalWorkerRunFactory temporalWorkerRunFactory, + final EventRunner eventRunner, final FeatureFlags featureFlags, final JobConverter jobConverter) { this.configRepository = configRepository; @@ -145,7 +145,7 @@ public SchedulerHandler(final ConfigRepository configRepository, this.oAuthConfigSupplier = oAuthConfigSupplier; this.workerEnvironment = workerEnvironment; this.logConfigs = logConfigs; - this.temporalWorkerRunFactory = temporalWorkerRunFactory; + this.eventRunner = eventRunner; this.featureFlags = featureFlags; this.jobConverter = jobConverter; } @@ -464,7 +464,7 @@ private ConnectorSpecification getSpecFromDestinationDefinitionId(final UUID des private JobInfoRead createNewSchedulerCancellation(final Long id) throws IOException { final Job job = jobPersistence.getJob(id); - final ManualSyncSubmissionResult cancellationSubmissionResult = temporalWorkerRunFactory.startNewCancelation(UUID.fromString(job.getScope())); + final ManualSyncSubmissionResult cancellationSubmissionResult = eventRunner.startNewCancelation(UUID.fromString(job.getScope())); if (cancellationSubmissionResult.getFailingReason().isPresent()) { throw new IllegalStateException(cancellationSubmissionResult.getFailingReason().get()); @@ -475,7 +475,7 @@ private JobInfoRead createNewSchedulerCancellation(final Long id) throws IOExcep } private JobInfoRead createManualRun(final UUID connectionId) throws IOException { - final ManualSyncSubmissionResult manualSyncSubmissionResult = temporalWorkerRunFactory.startNewManualSync(connectionId); + final ManualSyncSubmissionResult manualSyncSubmissionResult = eventRunner.startNewManualSync(connectionId); if (manualSyncSubmissionResult.getFailingReason().isPresent()) { throw new IllegalStateException(manualSyncSubmissionResult.getFailingReason().get()); @@ -487,7 +487,7 @@ private JobInfoRead createManualRun(final UUID connectionId) throws IOException } private JobInfoRead resetConnectionWithNewScheduler(final UUID connectionId) throws IOException { - final ManualSyncSubmissionResult manualSyncSubmissionResult = temporalWorkerRunFactory.resetConnection(connectionId); + final ManualSyncSubmissionResult manualSyncSubmissionResult = eventRunner.resetConnection(connectionId); if (manualSyncSubmissionResult.getFailingReason().isPresent()) { throw new IllegalStateException(manualSyncSubmissionResult.getFailingReason().get()); diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java index 836ebfd378131..82221c347bcbd 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java @@ -44,8 +44,8 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.commons.lang.MoreBooleans; import io.airbyte.config.persistence.ConfigNotFoundException; +import io.airbyte.scheduler.client.EventRunner; import io.airbyte.validation.json.JsonValidationException; -import io.airbyte.workers.worker_run.TemporalWorkerRunFactory; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -70,7 +70,7 @@ public class WebBackendConnectionsHandler { private final SchedulerHandler schedulerHandler; private final OperationsHandler operationsHandler; private final FeatureFlags featureFlags; - private final TemporalWorkerRunFactory temporalWorkerRunFactory; + private final EventRunner eventRunner; public WebBackendConnectionReadList webBackendListConnectionsForWorkspace(final WorkspaceIdRequestBody workspaceIdRequestBody) throws ConfigNotFoundException, IOException, JsonValidationException { @@ -274,10 +274,10 @@ public WebBackendConnectionRead webBackendUpdateConnection(final WebBackendConne if (needReset) { // todo (cgardens) - temporalWorkerRunFactory CANNOT be here. - temporalWorkerRunFactory.synchronousResetConnection(webBackendConnectionUpdate.getConnectionId()); + eventRunner.synchronousResetConnection(webBackendConnectionUpdate.getConnectionId()); // todo (cgardens) - temporalWorkerRunFactory CANNOT be here. - temporalWorkerRunFactory.startNewManualSync(webBackendConnectionUpdate.getConnectionId()); + eventRunner.startNewManualSync(webBackendConnectionUpdate.getConnectionId()); connectionRead = connectionsHandler.getConnection(connectionUpdate.getConnectionId()); } } diff --git a/airbyte-server/src/test/java/io/airbyte/server/apis/ConfigurationApiTest.java b/airbyte-server/src/test/java/io/airbyte/server/apis/ConfigurationApiTest.java index d7ac77d1523c6..6875d87f38d09 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/apis/ConfigurationApiTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/apis/ConfigurationApiTest.java @@ -18,11 +18,11 @@ import io.airbyte.config.persistence.ConfigPersistence; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.db.Database; +import io.airbyte.scheduler.client.EventRunner; import io.airbyte.scheduler.client.SchedulerJobClient; import io.airbyte.scheduler.client.SynchronousSchedulerClient; import io.airbyte.scheduler.persistence.JobPersistence; import io.airbyte.workers.WorkerConfigs; -import io.airbyte.workers.worker_run.TemporalWorkerRunFactory; import io.temporal.serviceclient.WorkflowServiceStubs; import java.net.http.HttpClient; import java.nio.file.Path; @@ -55,7 +55,7 @@ void testImportDefinitions() { Path.of(""), mock(HttpClient.class), mock(FeatureFlags.class), - mock(TemporalWorkerRunFactory.class)); + mock(EventRunner.class)); assertTrue(configurationApi.canImportDefinitons()); } diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/ConnectionsHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/ConnectionsHandlerTest.java index 2ada8688995f5..89793ab48eb45 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/ConnectionsHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/ConnectionsHandlerTest.java @@ -47,6 +47,7 @@ import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.scheduler.client.EventRunner; import io.airbyte.scheduler.persistence.JobPersistence; import io.airbyte.scheduler.persistence.WorkspaceHelper; import io.airbyte.scheduler.persistence.job_factory.SyncJobFactory; @@ -54,7 +55,6 @@ import io.airbyte.validation.json.JsonValidationException; import io.airbyte.workers.WorkerConfigs; import io.airbyte.workers.helper.CatalogConverter; -import io.airbyte.workers.worker_run.TemporalWorkerRunFactory; import java.io.IOException; import java.util.Collections; import java.util.List; @@ -90,7 +90,7 @@ class ConnectionsHandlerTest { private StandardSyncOperation standardSyncOperation; private WorkspaceHelper workspaceHelper; private TrackingClient trackingClient; - private TemporalWorkerRunFactory temporalWorkflowHandler; + private EventRunner eventRunner; private SyncJobFactory jobFactory; private JobPersistence jobPersistence; private LogConfigs logConfigs; @@ -153,7 +153,7 @@ void setUp() throws IOException, JsonValidationException, ConfigNotFoundExceptio workspaceHelper = mock(WorkspaceHelper.class); trackingClient = mock(TrackingClient.class); featureFlags = mock(FeatureFlags.class); - temporalWorkflowHandler = mock(TemporalWorkerRunFactory.class); + eventRunner = mock(EventRunner.class); when(workspaceHelper.getWorkspaceForSourceIdIgnoreExceptions(sourceId)).thenReturn(workspaceId); when(workspaceHelper.getWorkspaceForSourceIdIgnoreExceptions(deletedSourceId)).thenReturn(workspaceId); @@ -173,7 +173,7 @@ void setUp() { uuidGenerator, workspaceHelper, trackingClient, - temporalWorkflowHandler, + eventRunner, featureFlags, workerConfigs); } @@ -357,7 +357,7 @@ void testUpdateConnection(boolean useNewScheduler) throws JsonValidationExceptio verify(configRepository).writeStandardSync(updatedStandardSync); if (useNewScheduler) { - verify(temporalWorkflowHandler).update(connectionUpdate); + verify(eventRunner).update(connectionUpdate.getConnectionId()); } } diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java index be52d1ac072b9..4e63cf4f6c5b6 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java @@ -63,6 +63,7 @@ import io.airbyte.protocol.models.ConnectorSpecification; import io.airbyte.protocol.models.Field; import io.airbyte.protocol.models.JsonSchemaType; +import io.airbyte.scheduler.client.EventRunner; import io.airbyte.scheduler.client.SchedulerJobClient; import io.airbyte.scheduler.client.SynchronousJobMetadata; import io.airbyte.scheduler.client.SynchronousResponse; @@ -80,7 +81,6 @@ import io.airbyte.validation.json.JsonSchemaValidator; import io.airbyte.validation.json.JsonValidationException; import io.airbyte.workers.temporal.TemporalClient.ManualSyncSubmissionResult; -import io.airbyte.workers.worker_run.TemporalWorkerRunFactory; import io.temporal.serviceclient.WorkflowServiceStubs; import java.io.IOException; import java.net.URI; @@ -134,7 +134,7 @@ class SchedulerHandlerTest { private ConfigurationUpdate configurationUpdate; private JsonSchemaValidator jsonSchemaValidator; private JobPersistence jobPersistence; - private TemporalWorkerRunFactory temporalWorkerRunFactory; + private EventRunner eventRunner; private FeatureFlags featureFlags; private JobConverter jobConverter; @@ -153,7 +153,7 @@ void setup() { configRepository = mock(ConfigRepository.class); jobPersistence = mock(JobPersistence.class); final JobNotifier jobNotifier = mock(JobNotifier.class); - temporalWorkerRunFactory = mock(TemporalWorkerRunFactory.class); + eventRunner = mock(EventRunner.class); featureFlags = mock(FeatureFlags.class); when(featureFlags.usesNewScheduler()).thenReturn(false); @@ -172,7 +172,7 @@ void setup() { mock(OAuthConfigSupplier.class), WorkerEnvironment.DOCKER, LogConfigs.EMPTY, - temporalWorkerRunFactory, + eventRunner, featureFlags, jobConverter); } @@ -631,7 +631,7 @@ void testNewSchedulerSync() throws JsonValidationException, ConfigNotFoundExcept .jobId(Optional.of(jobId)) .build(); - when(temporalWorkerRunFactory.startNewManualSync(connectionId)) + when(eventRunner.startNewManualSync(connectionId)) .thenReturn(manualSyncSubmissionResult); doReturn(new JobInfoRead()) @@ -639,7 +639,7 @@ void testNewSchedulerSync() throws JsonValidationException, ConfigNotFoundExcept schedulerHandler.syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); - verify(temporalWorkerRunFactory).startNewManualSync(connectionId); + verify(eventRunner).startNewManualSync(connectionId); } private static List getOperations(final StandardSync standardSync) { diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java index 095c16778f99a..c50241637c930 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java @@ -65,6 +65,7 @@ import io.airbyte.protocol.models.CatalogHelpers; import io.airbyte.protocol.models.Field; import io.airbyte.protocol.models.JsonSchemaType; +import io.airbyte.scheduler.client.EventRunner; import io.airbyte.server.helpers.ConnectionHelpers; import io.airbyte.server.helpers.DestinationDefinitionHelpers; import io.airbyte.server.helpers.DestinationHelpers; @@ -72,7 +73,6 @@ import io.airbyte.server.helpers.SourceHelpers; import io.airbyte.validation.json.JsonValidationException; import io.airbyte.workers.helper.ConnectionHelper; -import io.airbyte.workers.worker_run.TemporalWorkerRunFactory; import java.io.IOException; import java.lang.reflect.Method; import java.time.Instant; @@ -99,7 +99,7 @@ class WebBackendConnectionsHandlerTest { private WebBackendConnectionRead expected; private WebBackendConnectionRead expectedWithNewSchema; private FeatureFlags featureFlags; - private TemporalWorkerRunFactory temporalWorkerRunFactory; + private EventRunner eventRunner; private ConnectionHelper connectionHelper; @BeforeEach @@ -111,7 +111,7 @@ public void setup() throws IOException, JsonValidationException, ConfigNotFoundE final JobHistoryHandler jobHistoryHandler = mock(JobHistoryHandler.class); schedulerHandler = mock(SchedulerHandler.class); featureFlags = mock(FeatureFlags.class); - temporalWorkerRunFactory = mock(TemporalWorkerRunFactory.class); + eventRunner = mock(EventRunner.class); connectionHelper = mock(ConnectionHelper.class); wbHandler = new WebBackendConnectionsHandler(connectionsHandler, sourceHandler, @@ -120,7 +120,7 @@ public void setup() throws IOException, JsonValidationException, ConfigNotFoundE schedulerHandler, operationsHandler, featureFlags, - temporalWorkerRunFactory); + eventRunner); final StandardSourceDefinition standardSourceDefinition = SourceDefinitionHelpers.generateSourceDefinition(); final SourceConnection source = SourceHelpers.generateSource(UUID.randomUUID()); @@ -590,9 +590,9 @@ void testUpdateConnectionWithUpdatedSchemaNewScheduler() throws JsonValidationEx verify(schedulerHandler, times(0)).resetConnection(connectionId); verify(schedulerHandler, times(0)).syncConnection(connectionId); verify(connectionsHandler, times(1)).updateConnection(any()); - final InOrder orderVerifier = inOrder(temporalWorkerRunFactory); - orderVerifier.verify(temporalWorkerRunFactory, times(1)).synchronousResetConnection(connectionId.getConnectionId()); - orderVerifier.verify(temporalWorkerRunFactory, times(1)).startNewManualSync(connectionId.getConnectionId()); + final InOrder orderVerifier = inOrder(eventRunner); + orderVerifier.verify(eventRunner, times(1)).synchronousResetConnection(connectionId.getConnectionId()); + orderVerifier.verify(eventRunner, times(1)).startNewManualSync(connectionId.getConnectionId()); } @Test diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java index b9c82a441abc6..91cff4612cae4 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java @@ -6,7 +6,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.ByteString; -import io.airbyte.api.model.ConnectionUpdate; import io.airbyte.config.Configs; import io.airbyte.config.JobCheckConnectionConfig; import io.airbyte.config.JobDiscoverCatalogConfig; @@ -17,12 +16,10 @@ import io.airbyte.config.StandardDiscoverCatalogInput; import io.airbyte.config.StandardSyncInput; import io.airbyte.config.StandardSyncOutput; -import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.protocol.models.ConnectorSpecification; import io.airbyte.scheduler.models.IntegrationLauncherConfig; import io.airbyte.scheduler.models.JobRunConfig; -import io.airbyte.validation.json.JsonValidationException; import io.airbyte.workers.WorkerUtils; import io.airbyte.workers.temporal.check.connection.CheckConnectionWorkflow; import io.airbyte.workers.temporal.discover.catalog.DiscoverCatalogWorkflow; @@ -36,7 +33,6 @@ import io.temporal.client.BatchRequest; import io.temporal.client.WorkflowClient; import io.temporal.serviceclient.WorkflowServiceStubs; -import java.io.IOException; import java.nio.file.Path; import java.util.HashSet; import java.util.Optional; @@ -251,8 +247,8 @@ public void deleteConnection(final UUID connectionId) { connectionManagerWorkflow.deleteConnection(); } - public void update(final ConnectionUpdate connectionUpdate) throws JsonValidationException, ConfigNotFoundException, IOException { - final ConnectionManagerWorkflow connectionManagerWorkflow = getConnectionUpdateWorkflow(connectionUpdate.getConnectionId()); + public void update(final UUID connectionId) { + final ConnectionManagerWorkflow connectionManagerWorkflow = getConnectionUpdateWorkflow(connectionId); connectionManagerWorkflow.connectionUpdated(); } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/worker_run/TemporalWorkerRunFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/worker_run/TemporalWorkerRunFactory.java index 063a8286dd2d3..015f2013caad8 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/worker_run/TemporalWorkerRunFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/worker_run/TemporalWorkerRunFactory.java @@ -4,7 +4,6 @@ package io.airbyte.workers.worker_run; -import io.airbyte.api.model.ConnectionUpdate; import io.airbyte.commons.features.FeatureFlags; import io.airbyte.commons.functional.CheckedSupplier; import io.airbyte.commons.json.Jsons; @@ -14,19 +13,14 @@ import io.airbyte.config.JobSyncConfig; import io.airbyte.config.StandardSyncOutput; import io.airbyte.config.StandardSyncSummary.ReplicationStatus; -import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.scheduler.models.Job; -import io.airbyte.validation.json.JsonValidationException; import io.airbyte.workers.JobStatus; import io.airbyte.workers.OutputAndStatus; import io.airbyte.workers.WorkerConstants; import io.airbyte.workers.temporal.TemporalClient; -import io.airbyte.workers.temporal.TemporalClient.ManualSyncSubmissionResult; import io.airbyte.workers.temporal.TemporalJobType; import io.airbyte.workers.temporal.TemporalResponse; -import java.io.IOException; import java.nio.file.Path; -import java.util.Set; import java.util.UUID; import lombok.AllArgsConstructor; import org.slf4j.Logger; @@ -47,34 +41,6 @@ public WorkerRun create(final Job job) { return WorkerRun.create(workspaceRoot, job.getId(), attemptId, createSupplier(job, attemptId), airbyteVersionOrWarnings); } - public void createNewSchedulerWorkflow(final UUID connectionId) { - temporalClient.submitConnectionUpdaterAsync(connectionId); - } - - public ManualSyncSubmissionResult startNewManualSync(final UUID connectionId) { - return temporalClient.startNewManualSync(connectionId); - } - - public ManualSyncSubmissionResult startNewCancelation(final UUID connectionId) { - return temporalClient.startNewCancelation(connectionId); - } - - public ManualSyncSubmissionResult resetConnection(final UUID connectionId) { - return temporalClient.resetConnection(connectionId); - } - - public ManualSyncSubmissionResult synchronousResetConnection(final UUID connectionId) { - return temporalClient.synchronousResetConnection(connectionId); - } - - public void deleteConnection(final UUID connectionId) { - temporalClient.deleteConnection(connectionId); - } - - public void migrateSyncIfNeeded(final Set connectionIds) { - temporalClient.migrateSyncIfNeeded(connectionIds); - } - public CheckedSupplier, Exception> createSupplier(final Job job, final int attemptId) { final TemporalJobType temporalJobType = toTemporalJobType(job.getConfigType()); final UUID connectionId = UUID.fromString(job.getScope()); @@ -138,8 +104,4 @@ private OutputAndStatus toOutputAndStatusConnector() { return new OutputAndStatus<>(status, new JobOutput().withSync(null)); } - public void update(final ConnectionUpdate connectionUpdate) throws JsonValidationException, ConfigNotFoundException, IOException { - temporalClient.update(connectionUpdate); - } - }