Skip to content

Commit

Permalink
split sync worlflow (#7690)
Browse files Browse the repository at this point in the history
Split the sync workflow into multiple files
  • Loading branch information
benmoriceau committed Nov 8, 2021
1 parent 0a43fb7 commit 291dccb
Show file tree
Hide file tree
Showing 15 changed files with 682 additions and 543 deletions.
16 changes: 10 additions & 6 deletions airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import io.airbyte.workers.process.KubeProcessFactory;
import io.airbyte.workers.process.ProcessFactory;
import io.airbyte.workers.process.WorkerHeartbeatServer;
import io.airbyte.workers.temporal.SyncWorkflow;
import io.airbyte.workers.temporal.TemporalJobType;
import io.airbyte.workers.temporal.TemporalUtils;
import io.airbyte.workers.temporal.check.connection.CheckConnectionActivityImpl;
Expand All @@ -30,6 +29,11 @@
import io.airbyte.workers.temporal.discover.catalog.DiscoverCatalogWorkflowImpl;
import io.airbyte.workers.temporal.spec.SpecActivityImpl;
import io.airbyte.workers.temporal.spec.SpecWorkflowImpl;
import io.airbyte.workers.temporal.sync.DbtTransformationActivityImpl;
import io.airbyte.workers.temporal.sync.NormalizationActivityImpl;
import io.airbyte.workers.temporal.sync.PersistStateActivityImpl;
import io.airbyte.workers.temporal.sync.ReplicationActivityImpl;
import io.airbyte.workers.temporal.sync.SyncWorkflowImpl;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.kubernetes.client.openapi.ApiClient;
Expand Down Expand Up @@ -130,15 +134,15 @@ public void start() {
databasePassword, databaseUrl, airbyteVersion));

final Worker syncWorker = factory.newWorker(TemporalJobType.SYNC.name(), getWorkerOptions(maxWorkers.getMaxSyncWorkers()));
syncWorker.registerWorkflowImplementationTypes(SyncWorkflow.WorkflowImpl.class);
syncWorker.registerWorkflowImplementationTypes(SyncWorkflowImpl.class);
syncWorker.registerActivitiesImplementations(
new SyncWorkflow.ReplicationActivityImpl(processFactory, secretsHydrator, workspaceRoot, workerEnvironment, logConfigs, databaseUser,
new ReplicationActivityImpl(processFactory, secretsHydrator, workspaceRoot, workerEnvironment, logConfigs, databaseUser,
databasePassword, databaseUrl, airbyteVersion),
new SyncWorkflow.NormalizationActivityImpl(processFactory, secretsHydrator, workspaceRoot, workerEnvironment, logConfigs, databaseUser,
new NormalizationActivityImpl(processFactory, secretsHydrator, workspaceRoot, workerEnvironment, logConfigs, databaseUser,
databasePassword, databaseUrl, airbyteVersion),
new SyncWorkflow.DbtTransformationActivityImpl(processFactory, secretsHydrator, workspaceRoot, workerEnvironment, logConfigs, databaseUser,
new DbtTransformationActivityImpl(processFactory, secretsHydrator, workspaceRoot, workerEnvironment, logConfigs, databaseUser,
databasePassword, databaseUrl, airbyteVersion),
new SyncWorkflow.PersistStateActivityImpl(workspaceRoot, configRepository));
new PersistStateActivityImpl(workspaceRoot, configRepository));
factory.start();
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.airbyte.workers.temporal.check.connection.CheckConnectionWorkflow;
import io.airbyte.workers.temporal.discover.catalog.DiscoverCatalogWorkflow;
import io.airbyte.workers.temporal.spec.SpecWorkflow;
import io.airbyte.workers.temporal.sync.SyncWorkflow;
import io.temporal.client.WorkflowClient;
import java.nio.file.Path;
import java.util.UUID;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal.sync;

import io.airbyte.config.OperatorDbtInput;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.scheduler.models.IntegrationLauncherConfig;
import io.airbyte.scheduler.models.JobRunConfig;
import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;

@ActivityInterface
public interface DbtTransformationActivity {

@ActivityMethod
Void run(JobRunConfig jobRunConfig,
IntegrationLauncherConfig destinationLauncherConfig,
ResourceRequirements resourceRequirements,
OperatorDbtInput input);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal.sync;

import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.functional.CheckedSupplier;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.AirbyteConfigValidator;
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.Configs.WorkerEnvironment;
import io.airbyte.config.OperatorDbtInput;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.config.helpers.LogConfigs;
import io.airbyte.config.persistence.split_secrets.SecretsHydrator;
import io.airbyte.scheduler.models.IntegrationLauncherConfig;
import io.airbyte.scheduler.models.JobRunConfig;
import io.airbyte.workers.DbtTransformationRunner;
import io.airbyte.workers.DbtTransformationWorker;
import io.airbyte.workers.Worker;
import io.airbyte.workers.normalization.NormalizationRunnerFactory;
import io.airbyte.workers.process.ProcessFactory;
import io.airbyte.workers.temporal.CancellationHandler;
import io.airbyte.workers.temporal.TemporalAttemptExecution;
import java.nio.file.Path;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DbtTransformationActivityImpl implements DbtTransformationActivity {

private static final Logger LOGGER = LoggerFactory.getLogger(DbtTransformationActivityImpl.class);

private final ProcessFactory processFactory;
private final SecretsHydrator secretsHydrator;
private final Path workspaceRoot;
private final AirbyteConfigValidator validator;
private final WorkerEnvironment workerEnvironment;
private final LogConfigs logConfigs;
private final String databaseUser;
private final String databasePassword;
private final String databaseUrl;
private final String airbyteVersion;

public DbtTransformationActivityImpl(final ProcessFactory processFactory,
final SecretsHydrator secretsHydrator,
final Path workspaceRoot,
final WorkerEnvironment workerEnvironment,
final LogConfigs logConfigs,
final String databaseUser,
final String databasePassword,
final String databaseUrl,
final String airbyteVersion) {
this(processFactory, secretsHydrator, workspaceRoot, new AirbyteConfigValidator(), workerEnvironment, logConfigs, databaseUser,
databasePassword, databaseUrl, airbyteVersion);
}

@VisibleForTesting
DbtTransformationActivityImpl(final ProcessFactory processFactory,
final SecretsHydrator secretsHydrator,
final Path workspaceRoot,
final AirbyteConfigValidator validator,
final WorkerEnvironment workerEnvironment,
final LogConfigs logConfigs,
final String databaseUser,
final String databasePassword,
final String databaseUrl,
final String airbyteVersion) {
this.processFactory = processFactory;
this.secretsHydrator = secretsHydrator;
this.workspaceRoot = workspaceRoot;
this.validator = validator;
this.workerEnvironment = workerEnvironment;
this.logConfigs = logConfigs;
this.databaseUser = databaseUser;
this.databasePassword = databasePassword;
this.databaseUrl = databaseUrl;
this.airbyteVersion = airbyteVersion;
}

@Override
public Void run(final JobRunConfig jobRunConfig,
final IntegrationLauncherConfig destinationLauncherConfig,
final ResourceRequirements resourceRequirements,
final OperatorDbtInput input) {

final var fullDestinationConfig = secretsHydrator.hydrate(input.getDestinationConfiguration());
final var fullInput = Jsons.clone(input).withDestinationConfiguration(fullDestinationConfig);

final Supplier<OperatorDbtInput> inputSupplier = () -> {
validator.ensureAsRuntime(ConfigSchema.OPERATOR_DBT_INPUT, Jsons.jsonNode(fullInput));
return fullInput;
};

final TemporalAttemptExecution<OperatorDbtInput, Void> temporalAttemptExecution = new TemporalAttemptExecution<>(
workspaceRoot, workerEnvironment, logConfigs,
jobRunConfig,
getWorkerFactory(destinationLauncherConfig, jobRunConfig, resourceRequirements),
inputSupplier,
new CancellationHandler.TemporalCancellationHandler(), databaseUser, databasePassword, databaseUrl, airbyteVersion);

return temporalAttemptExecution.get();
}

private CheckedSupplier<Worker<OperatorDbtInput, Void>, Exception> getWorkerFactory(final IntegrationLauncherConfig destinationLauncherConfig,
final JobRunConfig jobRunConfig,
final ResourceRequirements resourceRequirements) {
return () -> new DbtTransformationWorker(
jobRunConfig.getJobId(),
Math.toIntExact(jobRunConfig.getAttemptId()),
resourceRequirements,
new DbtTransformationRunner(
processFactory, NormalizationRunnerFactory.create(
destinationLauncherConfig.getDockerImage(),
processFactory)));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal.sync;

import io.airbyte.config.NormalizationInput;
import io.airbyte.scheduler.models.IntegrationLauncherConfig;
import io.airbyte.scheduler.models.JobRunConfig;
import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;

@ActivityInterface
public interface NormalizationActivity {

@ActivityMethod
Void normalize(JobRunConfig jobRunConfig,
IntegrationLauncherConfig destinationLauncherConfig,
NormalizationInput input);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal.sync;

import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.functional.CheckedSupplier;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.AirbyteConfigValidator;
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.Configs.WorkerEnvironment;
import io.airbyte.config.NormalizationInput;
import io.airbyte.config.helpers.LogConfigs;
import io.airbyte.config.persistence.split_secrets.SecretsHydrator;
import io.airbyte.scheduler.models.IntegrationLauncherConfig;
import io.airbyte.scheduler.models.JobRunConfig;
import io.airbyte.workers.DefaultNormalizationWorker;
import io.airbyte.workers.Worker;
import io.airbyte.workers.normalization.NormalizationRunnerFactory;
import io.airbyte.workers.process.ProcessFactory;
import io.airbyte.workers.temporal.CancellationHandler;
import io.airbyte.workers.temporal.TemporalAttemptExecution;
import java.nio.file.Path;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NormalizationActivityImpl implements NormalizationActivity {

private static final Logger LOGGER = LoggerFactory.getLogger(NormalizationActivityImpl.class);

private final ProcessFactory processFactory;
private final SecretsHydrator secretsHydrator;
private final Path workspaceRoot;
private final AirbyteConfigValidator validator;
private final WorkerEnvironment workerEnvironment;
private final LogConfigs logConfigs;
private final String databaseUser;
private final String databasePassword;
private final String databaseUrl;
private final String airbyteVersion;

public NormalizationActivityImpl(final ProcessFactory processFactory,
final SecretsHydrator secretsHydrator,
final Path workspaceRoot,
final WorkerEnvironment workerEnvironment,
final LogConfigs logConfig,
final String databaseUser,
final String databasePassword,
final String databaseUrl,
final String airbyteVersion) {
this(processFactory, secretsHydrator, workspaceRoot, new AirbyteConfigValidator(), workerEnvironment, logConfig, databaseUser, databasePassword,
databaseUrl, airbyteVersion);
}

@VisibleForTesting
NormalizationActivityImpl(final ProcessFactory processFactory,
final SecretsHydrator secretsHydrator,
final Path workspaceRoot,
final AirbyteConfigValidator validator,
final WorkerEnvironment workerEnvironment,
final LogConfigs logConfigs,
final String databaseUser,
final String databasePassword,
final String databaseUrl,
final String airbyteVersion) {
this.processFactory = processFactory;
this.secretsHydrator = secretsHydrator;
this.workspaceRoot = workspaceRoot;
this.validator = validator;
this.workerEnvironment = workerEnvironment;
this.logConfigs = logConfigs;
this.databaseUser = databaseUser;
this.databasePassword = databasePassword;
this.databaseUrl = databaseUrl;
this.airbyteVersion = airbyteVersion;
}

@Override
public Void normalize(final JobRunConfig jobRunConfig,
final IntegrationLauncherConfig destinationLauncherConfig,
final NormalizationInput input) {

final var fullDestinationConfig = secretsHydrator.hydrate(input.getDestinationConfiguration());
final var fullInput = Jsons.clone(input).withDestinationConfiguration(fullDestinationConfig);

final Supplier<NormalizationInput> inputSupplier = () -> {
validator.ensureAsRuntime(ConfigSchema.NORMALIZATION_INPUT, Jsons.jsonNode(fullInput));
return fullInput;
};

final TemporalAttemptExecution<NormalizationInput, Void> temporalAttemptExecution = new TemporalAttemptExecution<>(
workspaceRoot, workerEnvironment, logConfigs,
jobRunConfig,
getWorkerFactory(destinationLauncherConfig, jobRunConfig),
inputSupplier,
new CancellationHandler.TemporalCancellationHandler(), databaseUser, databasePassword, databaseUrl, airbyteVersion);

return temporalAttemptExecution.get();
}

private CheckedSupplier<Worker<NormalizationInput, Void>, Exception> getWorkerFactory(final IntegrationLauncherConfig destinationLauncherConfig,
final JobRunConfig jobRunConfig) {
return () -> new DefaultNormalizationWorker(
jobRunConfig.getJobId(),
Math.toIntExact(jobRunConfig.getAttemptId()),
NormalizationRunnerFactory.create(
destinationLauncherConfig.getDockerImage(),
processFactory),
workerEnvironment);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal.sync;

import io.airbyte.config.StandardSyncOutput;
import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;
import java.util.UUID;

@ActivityInterface
public interface PersistStateActivity {

@ActivityMethod
boolean persist(final UUID connectionId, final StandardSyncOutput syncOutput);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal.sync;

import io.airbyte.config.StandardSyncOutput;
import io.airbyte.config.State;
import io.airbyte.config.persistence.ConfigRepository;
import java.io.IOException;
import java.nio.file.Path;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PersistStateActivityImpl implements PersistStateActivity {

private static final Logger LOGGER = LoggerFactory.getLogger(PersistStateActivityImpl.class);
private final Path workspaceRoot;
private final ConfigRepository configRepository;

public PersistStateActivityImpl(final Path workspaceRoot, final ConfigRepository configRepository) {
this.workspaceRoot = workspaceRoot;
this.configRepository = configRepository;
}

@Override
public boolean persist(final UUID connectionId, final StandardSyncOutput syncOutput) {
final State state = syncOutput.getState();
if (state != null) {
try {
configRepository.updateConnectionState(connectionId, state);
} catch (final IOException e) {
throw new RuntimeException(e);
}
return true;
} else {
return false;
}
}

}

0 comments on commit 291dccb

Please sign in to comment.