Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract event from the temporal worker run factory #10739

Merged
merged 4 commits into from
Mar 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.scheduler.client;

import io.airbyte.workers.temporal.TemporalClient.ManualSyncSubmissionResult;
import java.util.Set;
import java.util.UUID;

public interface EventRunner {

void createNewSchedulerWorkflow(final UUID connectionId);

ManualSyncSubmissionResult startNewManualSync(final UUID connectionId);

ManualSyncSubmissionResult startNewCancelation(final UUID connectionId);

ManualSyncSubmissionResult resetConnection(final UUID connectionId);

ManualSyncSubmissionResult synchronousResetConnection(final UUID connectionId);

void deleteConnection(final UUID connectionId);

void migrateSyncIfNeeded(final Set<UUID> connectionIds);

void update(final UUID connectionId);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.scheduler.client;

import io.airbyte.workers.temporal.TemporalClient;
import io.airbyte.workers.temporal.TemporalClient.ManualSyncSubmissionResult;
import java.util.Set;
import java.util.UUID;
import lombok.AllArgsConstructor;

@AllArgsConstructor
public class TemporalEventRunner implements EventRunner {

private final TemporalClient temporalClient;

public void createNewSchedulerWorkflow(final UUID connectionId) {
temporalClient.submitConnectionUpdaterAsync(connectionId);
}

public ManualSyncSubmissionResult startNewManualSync(final UUID connectionId) {
return temporalClient.startNewManualSync(connectionId);
}

public ManualSyncSubmissionResult startNewCancelation(final UUID connectionId) {
return temporalClient.startNewCancelation(connectionId);
}

public ManualSyncSubmissionResult resetConnection(final UUID connectionId) {
return temporalClient.resetConnection(connectionId);
}

public ManualSyncSubmissionResult synchronousResetConnection(final UUID connectionId) {
return temporalClient.synchronousResetConnection(connectionId);
}

public void deleteConnection(final UUID connectionId) {
temporalClient.deleteConnection(connectionId);
}

public void migrateSyncIfNeeded(final Set<UUID> connectionIds) {
temporalClient.migrateSyncIfNeeded(connectionIds);
}

public void update(final UUID connectionId) {
temporalClient.update(connectionId);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@
import io.airbyte.config.persistence.ConfigPersistence;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.db.Database;
import io.airbyte.scheduler.client.EventRunner;
import io.airbyte.scheduler.client.SchedulerJobClient;
import io.airbyte.scheduler.client.SynchronousSchedulerClient;
import io.airbyte.scheduler.persistence.JobPersistence;
import io.airbyte.server.apis.ConfigurationApi;
import io.airbyte.workers.WorkerConfigs;
import io.airbyte.workers.worker_run.TemporalWorkerRunFactory;
import io.temporal.serviceclient.WorkflowServiceStubs;
import java.net.http.HttpClient;
import java.nio.file.Path;
Expand Down Expand Up @@ -47,7 +47,7 @@ public class ConfigurationApiFactory implements Factory<ConfigurationApi> {
private static AirbyteVersion airbyteVersion;
private static HttpClient httpClient;
private static FeatureFlags featureFlags;
private static TemporalWorkerRunFactory temporalWorkerRunFactory;
private static EventRunner eventRunner;

public static void setValues(
final WorkflowServiceStubs temporalService,
Expand All @@ -69,7 +69,7 @@ public static void setValues(
final Path workspaceRoot,
final HttpClient httpClient,
final FeatureFlags featureFlags,
final TemporalWorkerRunFactory temporalWorkerRunFactory) {
final EventRunner eventRunner) {
ConfigurationApiFactory.configRepository = configRepository;
ConfigurationApiFactory.jobPersistence = jobPersistence;
ConfigurationApiFactory.seed = seed;
Expand All @@ -89,7 +89,7 @@ public static void setValues(
ConfigurationApiFactory.airbyteVersion = airbyteVersion;
ConfigurationApiFactory.httpClient = httpClient;
ConfigurationApiFactory.featureFlags = featureFlags;
ConfigurationApiFactory.temporalWorkerRunFactory = temporalWorkerRunFactory;
ConfigurationApiFactory.eventRunner = eventRunner;
}

@Override
Expand All @@ -115,7 +115,7 @@ public ConfigurationApi provide() {
ConfigurationApiFactory.workspaceRoot,
ConfigurationApiFactory.httpClient,
ConfigurationApiFactory.featureFlags,
ConfigurationApiFactory.temporalWorkerRunFactory);
ConfigurationApiFactory.eventRunner);
}

@Override
Expand Down
16 changes: 7 additions & 9 deletions airbyte-server/src/main/java/io/airbyte/server/ServerApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@
import io.airbyte.db.instance.jobs.JobsDatabaseMigrator;
import io.airbyte.scheduler.client.DefaultSchedulerJobClient;
import io.airbyte.scheduler.client.DefaultSynchronousSchedulerClient;
import io.airbyte.scheduler.client.EventRunner;
import io.airbyte.scheduler.client.SchedulerJobClient;
import io.airbyte.scheduler.client.TemporalEventRunner;
import io.airbyte.scheduler.persistence.DefaultJobCreator;
import io.airbyte.scheduler.persistence.DefaultJobPersistence;
import io.airbyte.scheduler.persistence.JobPersistence;
Expand All @@ -47,7 +49,6 @@
import io.airbyte.workers.WorkerConfigs;
import io.airbyte.workers.temporal.TemporalClient;
import io.airbyte.workers.temporal.TemporalUtils;
import io.airbyte.workers.worker_run.TemporalWorkerRunFactory;
import io.temporal.serviceclient.WorkflowServiceStubs;
import java.io.IOException;
import java.net.http.HttpClient;
Expand Down Expand Up @@ -191,11 +192,8 @@ public static ServerRunnable getServer(final ServerFactory apiFactory, final Con
new DefaultSynchronousSchedulerClient(temporalClient, jobTracker, oAuthConfigSupplier);
final HttpClient httpClient = HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1).build();
final FeatureFlags featureFlags = new EnvVariableFeatureFlags();
final TemporalWorkerRunFactory temporalWorkerRunFactory = new TemporalWorkerRunFactory(
TemporalClient.production(configs.getTemporalHost(), configs.getWorkspaceRoot(), configs),
configs.getWorkspaceRoot(),
configs.getAirbyteVersionOrWarning(),
featureFlags);
final EventRunner eventRunner = new TemporalEventRunner(
TemporalClient.production(configs.getTemporalHost(), configs.getWorkspaceRoot(), configs));

LOGGER.info("Starting server...");

Expand All @@ -217,17 +215,17 @@ public static ServerRunnable getServer(final ServerFactory apiFactory, final Con
configs.getWorkspaceRoot(),
httpClient,
featureFlags,
temporalWorkerRunFactory);
eventRunner);
}

private static void migrateExistingConnection(final ConfigRepository configRepository, final TemporalWorkerRunFactory temporalWorkerRunFactory)
private static void migrateExistingConnection(final ConfigRepository configRepository, final EventRunner eventRunner)
throws JsonValidationException, ConfigNotFoundException, IOException {
LOGGER.info("Start migration to the new scheduler...");
final Set<UUID> connectionIds =
configRepository.listStandardSyncs().stream()
.filter(standardSync -> standardSync.getStatus() == Status.ACTIVE || standardSync.getStatus() == Status.INACTIVE)
.map(standardSync -> standardSync.getConnectionId()).collect(Collectors.toSet());
temporalWorkerRunFactory.migrateSyncIfNeeded(connectionIds);
eventRunner.migrateSyncIfNeeded(connectionIds);
LOGGER.info("Done migrating to the new scheduler...");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@
import io.airbyte.config.persistence.ConfigPersistence;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.db.Database;
import io.airbyte.scheduler.client.EventRunner;
import io.airbyte.scheduler.client.SchedulerJobClient;
import io.airbyte.scheduler.client.SynchronousSchedulerClient;
import io.airbyte.scheduler.persistence.JobPersistence;
import io.airbyte.server.apis.ConfigurationApi;
import io.airbyte.workers.WorkerConfigs;
import io.airbyte.workers.worker_run.TemporalWorkerRunFactory;
import io.temporal.serviceclient.WorkflowServiceStubs;
import java.net.http.HttpClient;
import java.nio.file.Path;
Expand All @@ -45,7 +45,7 @@ ServerRunnable create(SchedulerJobClient schedulerJobClient,
Path workspaceRoot,
HttpClient httpClient,
FeatureFlags featureFlags,
TemporalWorkerRunFactory temporalWorkerRunFactory);
EventRunner eventRunner);

class Api implements ServerFactory {

Expand All @@ -67,7 +67,7 @@ public ServerRunnable create(final SchedulerJobClient schedulerJobClient,
final Path workspaceRoot,
final HttpClient httpClient,
final FeatureFlags featureFlags,
final TemporalWorkerRunFactory temporalWorkerRunFactory) {
final EventRunner eventRunner) {
// set static values for factory
ConfigurationApiFactory.setValues(
temporalService,
Expand All @@ -89,7 +89,7 @@ public ServerRunnable create(final SchedulerJobClient schedulerJobClient,
workspaceRoot,
httpClient,
featureFlags,
temporalWorkerRunFactory);
eventRunner);

// server configurations
final Set<Class<?>> componentClasses = Set.of(ConfigurationApi.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
import io.airbyte.config.persistence.ConfigPersistence;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.db.Database;
import io.airbyte.scheduler.client.EventRunner;
import io.airbyte.scheduler.client.SchedulerJobClient;
import io.airbyte.scheduler.client.SynchronousSchedulerClient;
import io.airbyte.scheduler.persistence.JobNotifier;
Expand Down Expand Up @@ -119,7 +120,6 @@
import io.airbyte.validation.json.JsonSchemaValidator;
import io.airbyte.validation.json.JsonValidationException;
import io.airbyte.workers.WorkerConfigs;
import io.airbyte.workers.worker_run.TemporalWorkerRunFactory;
import io.temporal.serviceclient.WorkflowServiceStubs;
import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -168,7 +168,7 @@ public ConfigurationApi(final ConfigRepository configRepository,
final Path workspaceRoot,
final HttpClient httpClient,
final FeatureFlags featureFlags,
final TemporalWorkerRunFactory temporalWorkerRunFactory) {
final EventRunner eventRunner) {
this.workerEnvironment = workerEnvironment;
this.logConfigs = logConfigs;
this.workspaceRoot = workspaceRoot;
Expand All @@ -189,13 +189,13 @@ public ConfigurationApi(final ConfigRepository configRepository,
jobPersistence,
jobNotifier,
temporalService,
new OAuthConfigSupplier(configRepository, trackingClient), workerEnvironment, logConfigs, temporalWorkerRunFactory, featureFlags);
new OAuthConfigSupplier(configRepository, trackingClient), workerEnvironment, logConfigs, eventRunner, featureFlags);

connectionsHandler = new ConnectionsHandler(
configRepository,
workspaceHelper,
trackingClient,
temporalWorkerRunFactory,
eventRunner,
featureFlags,
workerConfigs);
sourceHandler = new SourceHandler(configRepository, schemaValidator, connectionsHandler);
Expand All @@ -215,7 +215,7 @@ public ConfigurationApi(final ConfigRepository configRepository,
schedulerHandler,
operationsHandler,
featureFlags,
temporalWorkerRunFactory);
eventRunner);
healthCheckHandler = new HealthCheckHandler();
archiveHandler = new ArchiveHandler(
airbyteVersion,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.scheduler.client.EventRunner;
import io.airbyte.scheduler.persistence.WorkspaceHelper;
import io.airbyte.server.converters.ApiPojoConverters;
import io.airbyte.server.handlers.helpers.ConnectionMatcher;
Expand All @@ -42,7 +43,6 @@
import io.airbyte.workers.WorkerConfigs;
import io.airbyte.workers.helper.CatalogConverter;
import io.airbyte.workers.helper.ConnectionHelper;
import io.airbyte.workers.worker_run.TemporalWorkerRunFactory;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
Expand All @@ -61,7 +61,7 @@ public class ConnectionsHandler {
private final Supplier<UUID> uuidGenerator;
private final WorkspaceHelper workspaceHelper;
private final TrackingClient trackingClient;
private final TemporalWorkerRunFactory temporalWorkerRunFactory;
private final EventRunner eventRunner;
private final FeatureFlags featureFlags;
private final WorkerConfigs workerConfigs;

Expand All @@ -70,29 +70,29 @@ public class ConnectionsHandler {
final Supplier<UUID> uuidGenerator,
final WorkspaceHelper workspaceHelper,
final TrackingClient trackingClient,
final TemporalWorkerRunFactory temporalWorkerRunFactory,
final EventRunner eventRunner,
final FeatureFlags featureFlags,
final WorkerConfigs workerConfigs) {
this.configRepository = configRepository;
this.uuidGenerator = uuidGenerator;
this.workspaceHelper = workspaceHelper;
this.trackingClient = trackingClient;
this.temporalWorkerRunFactory = temporalWorkerRunFactory;
this.eventRunner = eventRunner;
this.featureFlags = featureFlags;
this.workerConfigs = workerConfigs;
}

public ConnectionsHandler(final ConfigRepository configRepository,
final WorkspaceHelper workspaceHelper,
final TrackingClient trackingClient,
final TemporalWorkerRunFactory temporalWorkerRunFactory,
final EventRunner eventRunner,
final FeatureFlags featureFlags,
final WorkerConfigs workerConfigs) {
this(configRepository,
UUID::randomUUID,
workspaceHelper,
trackingClient,
temporalWorkerRunFactory,
eventRunner,
featureFlags,
workerConfigs);

Expand Down Expand Up @@ -150,7 +150,7 @@ public ConnectionRead createConnection(final ConnectionCreate connectionCreate)
if (featureFlags.usesNewScheduler()) {
try {
LOGGER.info("Starting a connection using the new scheduler");
temporalWorkerRunFactory.createNewSchedulerWorkflow(connectionId);
eventRunner.createNewSchedulerWorkflow(connectionId);
} catch (final Exception e) {
LOGGER.error("Start of the temporal connection manager workflow failed", e);
configRepository.deleteStandardSyncDefinition(standardSync.getConnectionId());
Expand Down Expand Up @@ -214,7 +214,7 @@ public ConnectionRead updateConnection(final ConnectionUpdate connectionUpdate)
configRepository.writeStandardSync(newConnection);

if (featureFlags.usesNewScheduler()) {
temporalWorkerRunFactory.update(connectionUpdate);
eventRunner.update(connectionUpdate.getConnectionId());
}

return buildConnectionRead(connectionUpdate.getConnectionId());
Expand Down Expand Up @@ -323,7 +323,7 @@ public void deleteConnection(final UUID connectionId)
throws ConfigNotFoundException, IOException, JsonValidationException {
if (featureFlags.usesNewScheduler()) {
// todo (cgardens) - need an interface over this.
temporalWorkerRunFactory.deleteConnection(connectionId);
eventRunner.deleteConnection(connectionId);
} else {
final ConnectionRead connectionRead = getConnection(connectionId);
deleteConnection(connectionRead);
Expand Down