Skip to content

Commit

Permalink
Move Temporal Client to .env (#2675)
Browse files Browse the repository at this point in the history
  • Loading branch information
cgardens committed Apr 8, 2021
1 parent 6ba62ca commit b5bf9df
Show file tree
Hide file tree
Showing 13 changed files with 121 additions and 71 deletions.
1 change: 1 addition & 0 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ TRACKING_STRATEGY=segment
# already exist on the host filesystem and MUST be parents of *_ROOT.
# Issue: https://github.com/airbytehq/airbyte/issues/577
HACK_LOCAL_ROOT_PARENT=/tmp
TEMPORAL_HOST=airbyte-temporal:7233
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ public interface Configs {

WorkspaceRetentionConfig getWorkspaceRetentionConfig();

String getTemporalHost();

enum TrackingStrategy {
SEGMENT,
LOGGING
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public class EnvConfigs implements Configs {
private static final String MINIMUM_WORKSPACE_RETENTION_DAYS = "MINIMUM_WORKSPACE_RETENTION_DAYS";
private static final String MAXIMUM_WORKSPACE_RETENTION_DAYS = "MAXIMUM_WORKSPACE_RETENTION_DAYS";
private static final String MAXIMUM_WORKSPACE_SIZE_MB = "MAXIMUM_WORKSPACE_SIZE_MB";
private static final String TEMPORAL_HOST = "TEMPORAL_HOST";

private static final long DEFAULT_MINIMUM_WORKSPACE_RETENTION_DAYS = 1;
private static final long DEFAULT_MAXIMUM_WORKSPACE_RETENTION_DAYS = 60;
Expand Down Expand Up @@ -115,62 +116,34 @@ public String getDatabaseUrl() {

@Override
public String getWorkspaceDockerMount() {
final String mount = getEnv.apply(WORKSPACE_DOCKER_MOUNT);
if (mount != null) {
return mount;
}

LOGGER.info(WORKSPACE_DOCKER_MOUNT + " not found, defaulting to " + WORKSPACE_ROOT);
return getWorkspaceRoot().toString();
return getEnvOrDefault(WORKSPACE_DOCKER_MOUNT, WORKSPACE_ROOT);
}

@Override
public String getLocalDockerMount() {
final String mount = getEnv.apply(LOCAL_DOCKER_MOUNT);
if (mount != null) {
return mount;
}

LOGGER.info(LOCAL_DOCKER_MOUNT + " not found, defaulting to " + LOCAL_ROOT);
return getLocalRoot().toString();
return getEnvOrDefault(LOCAL_DOCKER_MOUNT, LOCAL_ROOT);
}

@Override
public String getDockerNetwork() {
final String network = getEnv.apply(DOCKER_NETWORK);
if (network != null) {
return network;
}

LOGGER.info(DOCKER_NETWORK + " not found, defaulting to " + DEFAULT_NETWORK);
return DEFAULT_NETWORK;
return getEnvOrDefault(DOCKER_NETWORK, DEFAULT_NETWORK);
}

@Override
public TrackingStrategy getTrackingStrategy() {
final String trackingStrategy = getEnv.apply(TRACKING_STRATEGY);
if (trackingStrategy == null) {
LOGGER.info("TRACKING_STRATEGY not set, defaulting to " + TrackingStrategy.LOGGING);
return TrackingStrategy.LOGGING;
}

try {
return TrackingStrategy.valueOf(trackingStrategy.toUpperCase());
} catch (IllegalArgumentException e) {
LOGGER.info(trackingStrategy + " not recognized, defaulting to " + TrackingStrategy.LOGGING);
return TrackingStrategy.LOGGING;
}
return getEnvOrDefault(TRACKING_STRATEGY, TrackingStrategy.LOGGING, s -> {
try {
return TrackingStrategy.valueOf(s.toUpperCase());
} catch (IllegalArgumentException e) {
LOGGER.info(s + " not recognized, defaulting to " + TrackingStrategy.LOGGING);
return TrackingStrategy.LOGGING;
}
});
}

@Override
public WorkerEnvironment getWorkerEnvironment() {
final String workerEnvironment = getEnv.apply(WORKER_ENVIRONMENT);
if (workerEnvironment != null) {
return WorkerEnvironment.valueOf(workerEnvironment.toUpperCase());
}

LOGGER.info(WORKER_ENVIRONMENT + " not found, defaulting to " + WorkerEnvironment.DOCKER);
return WorkerEnvironment.DOCKER;
return getEnvOrDefault(WORKER_ENVIRONMENT, WorkerEnvironment.DOCKER, s -> WorkerEnvironment.valueOf(s.toUpperCase()));
}

@Override
Expand All @@ -182,10 +155,23 @@ public WorkspaceRetentionConfig getWorkspaceRetentionConfig() {
return new WorkspaceRetentionConfig(minDays, maxDays, maxSizeMb);
}

public long getEnvOrDefault(String key, long defaultValue) {
@Override
public String getTemporalHost() {
return getEnvOrDefault(TEMPORAL_HOST, "airbyte-temporal:7233");
}

private String getEnvOrDefault(String key, String defaultValue) {
return getEnvOrDefault(key, defaultValue, Function.identity());
}

private long getEnvOrDefault(String key, long defaultValue) {
return getEnvOrDefault(key, defaultValue, Long::parseLong);
}

private <T> T getEnvOrDefault(String key, T defaultValue, Function<String, T> parser) {
final String value = getEnv.apply(key);
if (value != null) {
return Long.parseLong(value);
return parser.apply(value);
} else {
LOGGER.info(key + " not found, defaulting to " + defaultValue);
return defaultValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
import io.airbyte.workers.process.ProcessBuilderFactory;
import io.airbyte.workers.temporal.TemporalClient;
import io.airbyte.workers.temporal.TemporalPool;
import io.airbyte.workers.temporal.TemporalUtils;
import io.temporal.serviceclient.WorkflowServiceStubs;
import java.io.IOException;
import java.nio.file.Path;
import java.time.Duration;
Expand Down Expand Up @@ -83,26 +85,33 @@ public class SchedulerApp {
private final JobPersistence jobPersistence;
private final ConfigRepository configRepository;
private final JobCleaner jobCleaner;
private final TemporalClient temporalClient;
private final WorkflowServiceStubs temporalService;

public SchedulerApp(Path workspaceRoot,
ProcessBuilderFactory pbf,
JobPersistence jobPersistence,
ConfigRepository configRepository,
JobCleaner jobCleaner) {
JobCleaner jobCleaner,
TemporalClient temporalClient,
WorkflowServiceStubs temporalService) {
this.workspaceRoot = workspaceRoot;
this.pbf = pbf;
this.jobPersistence = jobPersistence;
this.configRepository = configRepository;
this.jobCleaner = jobCleaner;
this.temporalClient = temporalClient;
this.temporalService = temporalService;

}

public void start() throws IOException {
final TemporalPool temporalPool = new TemporalPool(workspaceRoot, pbf);
final TemporalPool temporalPool = new TemporalPool(temporalService, workspaceRoot, pbf);
temporalPool.run();

final ExecutorService workerThreadPool = Executors.newFixedThreadPool(MAX_WORKERS, THREAD_FACTORY);
final ScheduledExecutorService scheduledPool = Executors.newSingleThreadScheduledExecutor();
final TemporalWorkerRunFactory temporalWorkerRunFactory = new TemporalWorkerRunFactory(TemporalClient.production(workspaceRoot), workspaceRoot);
final TemporalWorkerRunFactory temporalWorkerRunFactory = new TemporalWorkerRunFactory(temporalClient, workspaceRoot);
final JobRetrier jobRetrier = new JobRetrier(jobPersistence, Instant::now);
final JobScheduler jobScheduler = new JobScheduler(jobPersistence, configRepository);
final JobSubmitter jobSubmitter = new JobSubmitter(
Expand Down Expand Up @@ -170,6 +179,9 @@ public static void main(String[] args) throws IOException, InterruptedException
final Path workspaceRoot = configs.getWorkspaceRoot();
LOGGER.info("workspaceRoot = " + workspaceRoot);

final String temporalHost = configs.getTemporalHost();
LOGGER.info("temporalHost = " + temporalHost);

LOGGER.info("Creating DB connection pool...");
final Database database = Databases.createPostgresDatabase(
configs.getDatabaseUser(),
Expand Down Expand Up @@ -206,8 +218,11 @@ public static void main(String[] args) throws IOException, InterruptedException
throw new IllegalStateException("Unable to retrieve Airbyte Version, aborting...");
}

final WorkflowServiceStubs temporalService = TemporalUtils.createTemporalService(temporalHost);
final TemporalClient temporalClient = TemporalClient.production(temporalHost, workspaceRoot);

LOGGER.info("Launching scheduler...");
new SchedulerApp(workspaceRoot, pbf, jobPersistence, configRepository, jobCleaner).start();
new SchedulerApp(workspaceRoot, pbf, jobPersistence, configRepository, jobCleaner, temporalClient, temporalService).start();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,14 @@
import io.airbyte.scheduler.client.SchedulerJobClient;
import io.airbyte.scheduler.persistence.JobPersistence;
import io.airbyte.server.apis.ConfigurationApi;
import io.temporal.serviceclient.WorkflowServiceStubs;
import java.util.Map;
import org.glassfish.hk2.api.Factory;
import org.slf4j.MDC;

public class ConfigurationApiFactory implements Factory<ConfigurationApi> {

private static WorkflowServiceStubs temporalService;
private static ConfigRepository configRepository;
private static JobPersistence jobPersistence;
private static SchedulerJobClient schedulerJobClient;
Expand Down Expand Up @@ -73,6 +75,10 @@ public static void setMdc(Map<String, String> mdc) {
ConfigurationApiFactory.mdc = mdc;
}

public static void setTemporalService(final WorkflowServiceStubs temporalService) {
ConfigurationApiFactory.temporalService = temporalService;
}

@Override
public ConfigurationApi provide() {
MDC.setContextMap(mdc);
Expand All @@ -83,7 +89,8 @@ public ConfigurationApi provide() {
ConfigurationApiFactory.schedulerJobClient,
ConfigurationApiFactory.synchronousSchedulerClient,
ConfigurationApiFactory.configs,
ConfigurationApiFactory.archiveTtlManager);
ConfigurationApiFactory.archiveTtlManager,
ConfigurationApiFactory.temporalService);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@
import io.airbyte.server.errors.UncaughtExceptionMapper;
import io.airbyte.validation.json.JsonValidationException;
import io.airbyte.workers.temporal.TemporalClient;
import io.airbyte.workers.temporal.TemporalUtils;
import io.temporal.serviceclient.WorkflowServiceStubs;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Map;
Expand Down Expand Up @@ -99,10 +101,12 @@ public void start() throws Exception {

ConfigurationApiFactory.setSchedulerJobClient(new DefaultSchedulerJobClient(jobPersistence, new DefaultJobCreator(jobPersistence)));
final JobTracker jobTracker = new JobTracker(configRepository, jobPersistence);
final TemporalClient temporalClient = TemporalClient.production(configs.getWorkspaceRoot());
final WorkflowServiceStubs temporalService = TemporalUtils.createTemporalService(configs.getTemporalHost());
final TemporalClient temporalClient = TemporalClient.production(configs.getTemporalHost(), configs.getWorkspaceRoot());

ConfigurationApiFactory
.setSynchronousSchedulerClient(new SpecCachingSynchronousSchedulerClient(new DefaultSynchronousSchedulerClient(temporalClient, jobTracker)));
ConfigurationApiFactory.setTemporalService(temporalService);
ConfigurationApiFactory.setConfigRepository(configRepository);
ConfigurationApiFactory.setJobPersistence(jobPersistence);
ConfigurationApiFactory.setConfigs(configs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@
import io.airbyte.server.validators.DockerImageValidator;
import io.airbyte.validation.json.JsonSchemaValidator;
import io.airbyte.validation.json.JsonValidationException;
import io.temporal.serviceclient.WorkflowServiceStubs;
import java.io.File;
import java.io.IOException;
import javax.validation.Valid;
Expand All @@ -124,17 +125,25 @@ public class ConfigurationApi implements io.airbyte.api.V1Api {
private final LogsHandler logsHandler;
private final OpenApiConfigHandler openApiConfigHandler;
private final Configs configs;
private final WorkflowServiceStubs temporalService;

public ConfigurationApi(final ConfigRepository configRepository,
final JobPersistence jobPersistence,
final SchedulerJobClient schedulerJobClient,
final CachingSynchronousSchedulerClient synchronousSchedulerClient,
final Configs configs,
final FileTtlManager archiveTtlManager) {
final FileTtlManager archiveTtlManager,
final WorkflowServiceStubs temporalService) {
this.temporalService = temporalService;
final SpecFetcher specFetcher = new SpecFetcher(synchronousSchedulerClient);
final JsonSchemaValidator schemaValidator = new JsonSchemaValidator();
schedulerHandler =
new SchedulerHandler(configRepository, schedulerJobClient, synchronousSchedulerClient, jobPersistence, configs.getWorkspaceRoot());
schedulerHandler = new SchedulerHandler(
configRepository,
schedulerJobClient,
synchronousSchedulerClient,
jobPersistence,
configs.getWorkspaceRoot(),
temporalService);
final DockerImageValidator dockerImageValidator = new DockerImageValidator(synchronousSchedulerClient);
sourceDefinitionsHandler = new SourceDefinitionsHandler(configRepository, dockerImageValidator, synchronousSchedulerClient);
connectionsHandler = new ConnectionsHandler(configRepository);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import io.airbyte.workers.temporal.TemporalUtils;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.workflowservice.v1.RequestCancelWorkflowExecutionRequest;
import io.temporal.serviceclient.WorkflowServiceStubs;
import java.io.IOException;
import java.nio.file.Path;
import java.util.UUID;
Expand All @@ -85,12 +86,14 @@ public class SchedulerHandler {
private final JsonSchemaValidator jsonSchemaValidator;
private final JobPersistence jobPersistence;
private final Path workspaceRoot;
private final WorkflowServiceStubs temporalService;

public SchedulerHandler(ConfigRepository configRepository,
SchedulerJobClient schedulerJobClient,
SynchronousSchedulerClient synchronousSchedulerClient,
JobPersistence jobPersistence,
Path workspaceRoot) {
Path workspaceRoot,
WorkflowServiceStubs temporalService) {
this(
configRepository,
schedulerJobClient,
Expand All @@ -99,7 +102,8 @@ public SchedulerHandler(ConfigRepository configRepository,
new JsonSchemaValidator(),
new SpecFetcher(synchronousSchedulerClient),
jobPersistence,
workspaceRoot);
workspaceRoot,
temporalService);
}

@VisibleForTesting
Expand All @@ -110,7 +114,8 @@ public SchedulerHandler(ConfigRepository configRepository,
JsonSchemaValidator jsonSchemaValidator,
SpecFetcher specFetcher,
JobPersistence jobPersistence,
Path workspaceRoot) {
Path workspaceRoot,
WorkflowServiceStubs temporalService) {
this.configRepository = configRepository;
this.schedulerJobClient = schedulerJobClient;
this.synchronousSchedulerClient = synchronousSchedulerClient;
Expand All @@ -119,6 +124,7 @@ public SchedulerHandler(ConfigRepository configRepository,
this.specFetcher = specFetcher;
this.jobPersistence = jobPersistence;
this.workspaceRoot = workspaceRoot;
this.temporalService = temporalService;
}

public CheckConnectionRead checkSourceConnectionFromSourceId(SourceIdRequestBody sourceIdRequestBody)
Expand Down Expand Up @@ -316,7 +322,7 @@ public JobInfoRead cancelJob(JobIdRequestBody jobIdRequestBody) throws IOExcepti
.setNamespace(TemporalUtils.DEFAULT_NAMESPACE)
.build();

TemporalUtils.TEMPORAL_SERVICE.blockingStub().requestCancelWorkflowExecution(cancelRequest);
temporalService.blockingStub().requestCancelWorkflowExecution(cancelRequest);

return JobConverter.getJobInfoRead(jobPersistence.getJob(jobId));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import io.airbyte.server.helpers.SourceHelpers;
import io.airbyte.validation.json.JsonSchemaValidator;
import io.airbyte.validation.json.JsonValidationException;
import io.temporal.serviceclient.WorkflowServiceStubs;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Path;
Expand Down Expand Up @@ -148,8 +149,16 @@ void setup() {
configRepository = mock(ConfigRepository.class);
jobPersistence = mock(JobPersistence.class);

schedulerHandler = new SchedulerHandler(configRepository, schedulerJobClient, synchronousSchedulerClient, configurationUpdate,
jsonSchemaValidator, specFetcher, jobPersistence, mock(Path.class));
schedulerHandler = new SchedulerHandler(
configRepository,
schedulerJobClient,
synchronousSchedulerClient,
configurationUpdate,
jsonSchemaValidator,
specFetcher,
jobPersistence,
mock(Path.class),
mock(WorkflowServiceStubs.class));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ public class TemporalClient {
private final Path workspaceRoot;
private final WorkflowClient client;

public static TemporalClient production(Path workspaceRoot) {
return new TemporalClient(TemporalUtils.TEMPORAL_CLIENT, workspaceRoot);
public static TemporalClient production(String temporalHost, Path workspaceRoot) {
return new TemporalClient(TemporalUtils.createTemporalClient(temporalHost), workspaceRoot);
}

// todo (cgardens) - there are two sources of truth on workspace root. we need to get this down to
Expand Down
Loading

0 comments on commit b5bf9df

Please sign in to comment.