Skip to content

Commit

Permalink
add container orchestrator for normalization (#9020)
Browse files Browse the repository at this point in the history
* initial commit

* improve labeling

* make more generic

* refactor constants

* move port constants

* remove flag

* some final config fixes

* clean up

* oops, didn't check this in

* add dbt orchestrator (#9114)

* respond to pr feedback
  • Loading branch information
jrhizor committed Jan 1, 2022
1 parent 99a2ae6 commit a36a860
Show file tree
Hide file tree
Showing 12 changed files with 757 additions and 221 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,30 +7,16 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.Configs;
import io.airbyte.config.EnvConfigs;
import io.airbyte.config.ReplicationOutput;
import io.airbyte.config.StandardSyncInput;
import io.airbyte.scheduler.models.IntegrationLauncherConfig;
import io.airbyte.scheduler.models.JobRunConfig;
import io.airbyte.workers.DefaultReplicationWorker;
import io.airbyte.workers.ReplicationWorker;
import io.airbyte.workers.WorkerApp;
import io.airbyte.workers.WorkerConfigs;
import io.airbyte.workers.WorkerConstants;
import io.airbyte.workers.WorkerException;
import io.airbyte.workers.WorkerUtils;
import io.airbyte.workers.process.AirbyteIntegrationLauncher;
import io.airbyte.workers.process.DockerProcessFactory;
import io.airbyte.workers.process.IntegrationLauncher;
import io.airbyte.workers.process.KubePortManagerSingleton;
import io.airbyte.workers.process.KubeProcessFactory;
import io.airbyte.workers.process.ProcessFactory;
import io.airbyte.workers.process.WorkerHeartbeatServer;
import io.airbyte.workers.protocols.airbyte.AirbyteMessageTracker;
import io.airbyte.workers.protocols.airbyte.AirbyteSource;
import io.airbyte.workers.protocols.airbyte.DefaultAirbyteDestination;
import io.airbyte.workers.protocols.airbyte.DefaultAirbyteSource;
import io.airbyte.workers.protocols.airbyte.EmptyAirbyteSource;
import io.airbyte.workers.protocols.airbyte.NamespacingMapper;
import io.airbyte.workers.temporal.sync.DbtLauncherWorker;
import io.airbyte.workers.temporal.sync.NormalizationLauncherWorker;
import io.airbyte.workers.temporal.sync.OrchestratorConstants;
import io.airbyte.workers.temporal.sync.ReplicationLauncherWorker;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
Expand All @@ -39,123 +25,72 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import lombok.extern.slf4j.Slf4j;

/**
* Entrypoint for the application responsible for launching containers and handling all message
* passing. Currently, this is only implemented for replication but in the future it will be
* available for normalization and dbt. Also, the current version relies on a heartbeat from a
* Temporal worker. This will also be removed in the future so this can run fully async.
* passing for replication, normalization, and dbt. Also, the current version relies on a heartbeat
* from a Temporal worker. This will also be removed in the future so this can run fully async.
*
* This application retrieves most of its configuration from copied files from the calling Temporal
* worker.
*
* This app uses default logging which is directly captured by the calling Temporal worker. In the
* future this will need to independently interact with cloud storage.
*/
@Slf4j
public class ContainerOrchestratorApp {

private static final Logger LOGGER = LoggerFactory.getLogger(ContainerOrchestratorApp.class);

private static void replicationRunner(final Configs configs) throws IOException, WorkerException {

LOGGER.info("Starting replication runner app...");

final WorkerConfigs workerConfigs = new WorkerConfigs(configs);
final ProcessFactory processFactory = getProcessBuilderFactory(configs, workerConfigs);

LOGGER.info("Attempting to retrieve config files...");

final JobRunConfig jobRunConfig =
Jsons.deserialize(Files.readString(Path.of(ReplicationLauncherWorker.INIT_FILE_JOB_RUN_CONFIG)), JobRunConfig.class);

final IntegrationLauncherConfig sourceLauncherConfig =
Jsons.deserialize(Files.readString(Path.of(ReplicationLauncherWorker.INIT_FILE_SOURCE_LAUNCHER_CONFIG)), IntegrationLauncherConfig.class);

final IntegrationLauncherConfig destinationLauncherConfig =
Jsons.deserialize(Files.readString(Path.of(ReplicationLauncherWorker.INIT_FILE_DESTINATION_LAUNCHER_CONFIG)),
IntegrationLauncherConfig.class);

final StandardSyncInput syncInput =
Jsons.deserialize(Files.readString(Path.of(ReplicationLauncherWorker.INIT_FILE_SYNC_INPUT)), StandardSyncInput.class);

LOGGER.info("Setting up source launcher...");
final IntegrationLauncher sourceLauncher = new AirbyteIntegrationLauncher(
sourceLauncherConfig.getJobId(),
Math.toIntExact(sourceLauncherConfig.getAttemptId()),
sourceLauncherConfig.getDockerImage(),
processFactory,
syncInput.getResourceRequirements());

LOGGER.info("Setting up destination launcher...");
final IntegrationLauncher destinationLauncher = new AirbyteIntegrationLauncher(
destinationLauncherConfig.getJobId(),
Math.toIntExact(destinationLauncherConfig.getAttemptId()),
destinationLauncherConfig.getDockerImage(),
processFactory,
syncInput.getResourceRequirements());

LOGGER.info("Setting up source...");
// reset jobs use an empty source to induce resetting all data in destination.
final AirbyteSource airbyteSource =
sourceLauncherConfig.getDockerImage().equals(WorkerConstants.RESET_JOB_SOURCE_DOCKER_IMAGE_STUB) ? new EmptyAirbyteSource()
: new DefaultAirbyteSource(workerConfigs, sourceLauncher);

LOGGER.info("Setting up replication worker...");
final ReplicationWorker replicationWorker = new DefaultReplicationWorker(
jobRunConfig.getJobId(),
Math.toIntExact(jobRunConfig.getAttemptId()),
airbyteSource,
new NamespacingMapper(syncInput.getNamespaceDefinition(), syncInput.getNamespaceFormat(), syncInput.getPrefix()),
new DefaultAirbyteDestination(workerConfigs, destinationLauncher),
new AirbyteMessageTracker(),
new AirbyteMessageTracker());

LOGGER.info("Running replication worker...");
final Path jobRoot = WorkerUtils.getJobRoot(configs.getWorkspaceRoot(), jobRunConfig.getJobId(), jobRunConfig.getAttemptId());
final ReplicationOutput replicationOutput = replicationWorker.run(syncInput, jobRoot);

LOGGER.info("Sending output...");
// this uses stdout directly because it shouldn't have the logging related prefix
// the replication output is read from the container that launched the runner
System.out.println(Jsons.serialize(replicationOutput));

LOGGER.info("Replication runner complete!");
}

public static void main(final String[] args) throws Exception {
WorkerHeartbeatServer heartbeatServer = null;

try {
// read files that contain all necessary configuration
final String application = Files.readString(Path.of(ReplicationLauncherWorker.INIT_FILE_APPLICATION));
final String application = Files.readString(Path.of(OrchestratorConstants.INIT_FILE_APPLICATION));
final Map<String, String> envMap =
(Map<String, String>) Jsons.deserialize(Files.readString(Path.of(ReplicationLauncherWorker.INIT_FILE_ENV_MAP)), Map.class);
(Map<String, String>) Jsons.deserialize(Files.readString(Path.of(OrchestratorConstants.INIT_FILE_ENV_MAP)), Map.class);

final Configs configs = new EnvConfigs(envMap::get);

heartbeatServer = new WorkerHeartbeatServer(WorkerApp.KUBE_HEARTBEAT_PORT);
heartbeatServer.startBackground();

if (application.equals(ReplicationLauncherWorker.REPLICATION)) {
replicationRunner(configs);
} else {
LOGGER.error("Runner failed", new IllegalStateException("Unexpected value: " + application));
System.exit(1);
}
final WorkerConfigs workerConfigs = new WorkerConfigs(configs);
final ProcessFactory processFactory = getProcessBuilderFactory(configs, workerConfigs);
final JobOrchestrator<?> jobOrchestrator = getJobOrchestrator(configs, workerConfigs, processFactory, application);

log.info("Starting {} orchestrator...", jobOrchestrator.getOrchestratorName());
jobOrchestrator.runJob();
log.info("{} orchestrator complete!", jobOrchestrator.getOrchestratorName());
} finally {
if (heartbeatServer != null) {
LOGGER.info("Shutting down heartbeat server...");
log.info("Shutting down heartbeat server...");
heartbeatServer.stop();
}
}

// required to kill kube client
LOGGER.info("Runner closing...");
log.info("Runner closing...");
System.exit(0);
}

private static JobOrchestrator<?> getJobOrchestrator(final Configs configs,
final WorkerConfigs workerConfigs,
final ProcessFactory processFactory,
final String application) {
if (application.equals(ReplicationLauncherWorker.REPLICATION)) {
return new ReplicationJobOrchestrator(configs, workerConfigs, processFactory);
} else if (application.equals(NormalizationLauncherWorker.NORMALIZATION)) {
return new NormalizationJobOrchestrator(configs, workerConfigs, processFactory);
} else if (application.equals(DbtLauncherWorker.DBT)) {
return new DbtJobOrchestrator(configs, workerConfigs, processFactory);
} else {
log.error("Runner failed", new IllegalStateException("Unexpected value: " + application));
System.exit(1);
throw new IllegalStateException(); // should never be reached, but necessary to compile
}
}

/**
* Creates a process builder factory that will be used to create connector containers/pods.
*/
Expand All @@ -164,11 +99,11 @@ private static ProcessFactory getProcessBuilderFactory(final Configs configs, fi
final KubernetesClient fabricClient = new DefaultKubernetesClient();
final String localIp = InetAddress.getLocalHost().getHostAddress();
final String kubeHeartbeatUrl = localIp + ":" + WorkerApp.KUBE_HEARTBEAT_PORT;
LOGGER.info("Using Kubernetes namespace: {}", configs.getJobKubeNamespace());
log.info("Using Kubernetes namespace: {}", configs.getJobKubeNamespace());

// this needs to have two ports for the source and two ports for the destination (all four must be
// exposed)
KubePortManagerSingleton.init(ReplicationLauncherWorker.PORTS);
KubePortManagerSingleton.init(OrchestratorConstants.PORTS);

return new KubeProcessFactory(workerConfigs, configs.getJobKubeNamespace(), fabricClient, kubeHeartbeatUrl, false);
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.container_orchestrator;

import io.airbyte.config.Configs;
import io.airbyte.config.OperatorDbtInput;
import io.airbyte.scheduler.models.IntegrationLauncherConfig;
import io.airbyte.scheduler.models.JobRunConfig;
import io.airbyte.workers.DbtTransformationRunner;
import io.airbyte.workers.DbtTransformationWorker;
import io.airbyte.workers.WorkerConfigs;
import io.airbyte.workers.WorkerUtils;
import io.airbyte.workers.normalization.NormalizationRunnerFactory;
import io.airbyte.workers.process.ProcessFactory;
import io.airbyte.workers.temporal.sync.ReplicationLauncherWorker;
import java.nio.file.Path;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class DbtJobOrchestrator implements JobOrchestrator<OperatorDbtInput> {

private final Configs configs;
private final WorkerConfigs workerConfigs;
private final ProcessFactory processFactory;

public DbtJobOrchestrator(final Configs configs, final WorkerConfigs workerConfigs, final ProcessFactory processFactory) {
this.configs = configs;
this.workerConfigs = workerConfigs;
this.processFactory = processFactory;
}

@Override
public String getOrchestratorName() {
return "DBT Transformation";
}

@Override
public Class<OperatorDbtInput> getInputClass() {
return OperatorDbtInput.class;
}

@Override
public void runJob() throws Exception {
final JobRunConfig jobRunConfig = readJobRunConfig();
final OperatorDbtInput dbtInput = readInput();

final IntegrationLauncherConfig destinationLauncherConfig = JobOrchestrator.readAndDeserializeFile(
ReplicationLauncherWorker.INIT_FILE_DESTINATION_LAUNCHER_CONFIG, IntegrationLauncherConfig.class);

log.info("Setting up dbt worker...");
final DbtTransformationWorker worker = new DbtTransformationWorker(
jobRunConfig.getJobId(),
Math.toIntExact(jobRunConfig.getAttemptId()),
workerConfigs.getResourceRequirements(),
new DbtTransformationRunner(
workerConfigs,
processFactory, NormalizationRunnerFactory.create(
workerConfigs,
destinationLauncherConfig.getDockerImage(),
processFactory)));;

log.info("Running dbt worker...");
final Path jobRoot = WorkerUtils.getJobRoot(configs.getWorkspaceRoot(), jobRunConfig.getJobId(), jobRunConfig.getAttemptId());
worker.run(dbtInput, jobRoot);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.container_orchestrator;

import io.airbyte.commons.json.Jsons;
import io.airbyte.scheduler.models.JobRunConfig;
import io.airbyte.workers.temporal.sync.OrchestratorConstants;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/**
* The job orchestrator helps abstract over container launcher application differences across
* replication, normalization, and custom dbt operators.
*
* @param <INPUT> job input type
*/
public interface JobOrchestrator<INPUT> {

// used for logging
String getOrchestratorName();

// used to serialize the loaded input
Class<INPUT> getInputClass();

// reads input from a file that was copied to the container launcher
default INPUT readInput() throws IOException {
return readAndDeserializeFile(OrchestratorConstants.INIT_FILE_INPUT, getInputClass());
}

// reads the job run config from a file that was copied to the container launcher
default JobRunConfig readJobRunConfig() throws IOException {
return readAndDeserializeFile(OrchestratorConstants.INIT_FILE_JOB_RUN_CONFIG, JobRunConfig.class);
}

// the unique logic that belongs to each type of job belongs here
void runJob() throws Exception;

static <T> T readAndDeserializeFile(String path, Class<T> type) throws IOException {
return Jsons.deserialize(Files.readString(Path.of(path)), type);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.container_orchestrator;

import io.airbyte.config.Configs;
import io.airbyte.config.NormalizationInput;
import io.airbyte.scheduler.models.IntegrationLauncherConfig;
import io.airbyte.scheduler.models.JobRunConfig;
import io.airbyte.workers.DefaultNormalizationWorker;
import io.airbyte.workers.NormalizationWorker;
import io.airbyte.workers.WorkerConfigs;
import io.airbyte.workers.WorkerUtils;
import io.airbyte.workers.normalization.NormalizationRunnerFactory;
import io.airbyte.workers.process.ProcessFactory;
import io.airbyte.workers.temporal.sync.ReplicationLauncherWorker;
import java.nio.file.Path;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class NormalizationJobOrchestrator implements JobOrchestrator<NormalizationInput> {

private final Configs configs;
private final WorkerConfigs workerConfigs;
private final ProcessFactory processFactory;

public NormalizationJobOrchestrator(final Configs configs, final WorkerConfigs workerConfigs, final ProcessFactory processFactory) {
this.configs = configs;
this.workerConfigs = workerConfigs;
this.processFactory = processFactory;
}

@Override
public String getOrchestratorName() {
return "Normalization";
}

@Override
public Class<NormalizationInput> getInputClass() {
return NormalizationInput.class;
}

@Override
public void runJob() throws Exception {
final JobRunConfig jobRunConfig = readJobRunConfig();
final NormalizationInput normalizationInput = readInput();

final IntegrationLauncherConfig destinationLauncherConfig = JobOrchestrator.readAndDeserializeFile(
ReplicationLauncherWorker.INIT_FILE_DESTINATION_LAUNCHER_CONFIG, IntegrationLauncherConfig.class);

log.info("Setting up normalization worker...");
final NormalizationWorker normalizationWorker = new DefaultNormalizationWorker(
jobRunConfig.getJobId(),
Math.toIntExact(jobRunConfig.getAttemptId()),
NormalizationRunnerFactory.create(
workerConfigs,
destinationLauncherConfig.getDockerImage(),
processFactory),
configs.getWorkerEnvironment());

log.info("Running normalization worker...");
final Path jobRoot = WorkerUtils.getJobRoot(configs.getWorkspaceRoot(), jobRunConfig.getJobId(), jobRunConfig.getAttemptId());
normalizationWorker.run(normalizationInput, jobRoot);

}

}

0 comments on commit a36a860

Please sign in to comment.