Skip to content

Commit

Permalink
Configure kube pod process per job type (#10200)
Browse files Browse the repository at this point in the history
* split workerConfigs and processFactory by job type, env var for check job node selectors

* move status check interval to WorkerConfigs and customize for check worker

* add scaffolding for spec, discover, and sync configs

* optional orElse instead of orElseGet

* add replicationWorkerConfigs with custom resource requirements
  • Loading branch information
pmossman committed Feb 15, 2022
1 parent 838ce14 commit b742a45
Show file tree
Hide file tree
Showing 11 changed files with 449 additions and 87 deletions.
Expand Up @@ -10,6 +10,7 @@
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/**
Expand Down Expand Up @@ -245,7 +246,22 @@ public interface Configs {
/**
* Define one or more Job pod node selectors. Each kv-pair is separated by a `,`.
*/
Map<String, String> getJobKubeNodeSelectors();
Optional<Map<String, String>> getJobKubeNodeSelectors();

/**
* Define node selectors for Spec job pods specifically. Each kv-pair is separated by a `,`.
*/
Optional<Map<String, String>> getSpecJobKubeNodeSelectors();

/**
* Define node selectors for Check job pods specifically. Each kv-pair is separated by a `,`.
*/
Optional<Map<String, String>> getCheckJobKubeNodeSelectors();

/**
* Define node selectors for Discover job pods specifically. Each kv-pair is separated by a `,`.
*/
Optional<Map<String, String>> getDiscoverJobKubeNodeSelectors();

/**
* Define the Job pod connector image pull policy.
Expand Down
Expand Up @@ -107,6 +107,16 @@ public class EnvConfigs implements Configs {
public static final String ACTIVITY_MAX_ATTEMPT = "ACTIVITY_MAX_ATTEMPT";
public static final String ACTIVITY_DELAY_IN_SECOND_BETWEEN_ATTEMPTS = "ACTIVITY_DELAY_IN_SECOND_BETWEEN_ATTEMPTS";

// job-type-specific overrides
public static final String SPEC_JOB_KUBE_NODE_SELECTORS = "SPEC_JOB_KUBE_NODE_SELECTORS";
public static final String CHECK_JOB_KUBE_NODE_SELECTORS = "CHECK_JOB_KUBE_NODE_SELECTORS";
public static final String DISCOVER_JOB_KUBE_NODE_SELECTORS = "DISCOVER_JOB_KUBE_NODE_SELECTORS";

private static final String REPLICATION_ORCHESTRATOR_CPU_REQUEST = "REPLICATION_ORCHESTRATOR_CPU_REQUEST";
private static final String REPLICATION_ORCHESTRATOR_CPU_LIMIT = "REPLICATION_ORCHESTRATOR_CPU_LIMIT";
private static final String REPLICATION_ORCHESTRATOR_MEMORY_REQUEST = "REPLICATION_ORCHESTRATOR_MEMORY_REQUEST";
private static final String REPLICATION_ORCHESTRATOR_MEMORY_LIMIT = "REPLICATION_ORCHESTRATOR_MEMORY_LIMIT";

// defaults
private static final String DEFAULT_SPEC_CACHE_BUCKET = "io-airbyte-cloud-spec-cache";
public static final String DEFAULT_JOB_KUBE_NAMESPACE = "default";
Expand All @@ -129,10 +139,6 @@ public class EnvConfigs implements Configs {
public static final long DEFAULT_MAX_SYNC_WORKERS = 5;

public static final String DEFAULT_NETWORK = "host";
private static final String REPLICATION_ORCHESTRATOR_CPU_REQUEST = "REPLICATION_ORCHESTRATOR_CPU_REQUEST";
private static final String REPLICATION_ORCHESTRATOR_CPU_LIMIT = "REPLICATION_ORCHESTRATOR_CPU_LIMIT";
private static final String REPLICATION_ORCHESTRATOR_MEMORY_REQUEST = "REPLICATION_ORCHESTRATOR_MEMORY_REQUEST";
private static final String REPLICATION_ORCHESTRATOR_MEMORY_LIMIT = "REPLICATION_ORCHESTRATOR_MEMORY_LIMIT";

private final Function<String, String> getEnv;
private final Supplier<Set<String>> getAllEnvKeys;
Expand Down Expand Up @@ -431,22 +437,64 @@ private TolerationPOJO parseToleration(final String tolerationStr) {
}

/**
* Returns a map of node selectors from its own environment variable. The value of the env is a
* string that represents one or more node selector labels. Each kv-pair is separated by a `,`
* Returns a map of node selectors for any job type. Used as a default if a particular job type does
* not define its own node selector environment variable.
*
* @return map containing kv pairs of node selectors, or empty optional if none present.
*/
@Override
public Optional<Map<String, String>> getJobKubeNodeSelectors() {
return getNodeSelectorsFromEnvString(getEnvOrDefault(JOB_KUBE_NODE_SELECTORS, ""));
}

/**
* Returns a map of node selectors for Spec job pods specifically.
*
* @return map containing kv pairs of node selectors, or empty optional if none present.
*/
@Override
public Optional<Map<String, String>> getSpecJobKubeNodeSelectors() {
return getNodeSelectorsFromEnvString(getEnvOrDefault(SPEC_JOB_KUBE_NODE_SELECTORS, ""));
}

/**
* Returns a map of node selectors for Check job pods specifically.
*
* @return map containing kv pairs of node selectors, or empty optional if none present.
*/
@Override
public Optional<Map<String, String>> getCheckJobKubeNodeSelectors() {
return getNodeSelectorsFromEnvString(getEnvOrDefault(CHECK_JOB_KUBE_NODE_SELECTORS, ""));
}

/**
* Returns a map of node selectors for Discover job pods specifically.
*
* @return map containing kv pairs of node selectors, or empty optional if none present.
*/
@Override
public Optional<Map<String, String>> getDiscoverJobKubeNodeSelectors() {
return getNodeSelectorsFromEnvString(getEnvOrDefault(DISCOVER_JOB_KUBE_NODE_SELECTORS, ""));
}

/**
* Parse string containing node selectors into a map. Each kv-pair is separated by a `,`
* <p>
* For example:- The following represents two node selectors
* <p>
* airbyte=server,type=preemptive
*
* @return map containing kv pairs of node selectors
* @param envString string that represents one or more node selector labels.
* @return map containing kv pairs of node selectors, or empty optional if none present.
*/
@Override
public Map<String, String> getJobKubeNodeSelectors() {
return Splitter.on(",")
.splitToStream(getEnvOrDefault(JOB_KUBE_NODE_SELECTORS, ""))
private Optional<Map<String, String>> getNodeSelectorsFromEnvString(final String envString) {
final Map<String, String> selectors = Splitter.on(",")
.splitToStream(envString)
.filter(s -> !Strings.isNullOrEmpty(s) && s.contains("="))
.map(s -> s.split("="))
.collect(Collectors.toMap(s -> s[0], s -> s[1]));

return selectors.isEmpty() ? Optional.empty() : Optional.of(selectors);
}

@Override
Expand Down
Expand Up @@ -183,21 +183,75 @@ void testworkerKubeTolerations() {
}

@Test
void testworkerKubeNodeSelectors() {
void testJobKubeNodeSelectors() {
envMap.put(EnvConfigs.JOB_KUBE_NODE_SELECTORS, null);
Assertions.assertEquals(config.getJobKubeNodeSelectors(), Map.of());
Assertions.assertFalse(config.getJobKubeNodeSelectors().isPresent());

envMap.put(EnvConfigs.JOB_KUBE_NODE_SELECTORS, ",,,");
Assertions.assertEquals(config.getJobKubeNodeSelectors(), Map.of());
Assertions.assertFalse(config.getJobKubeNodeSelectors().isPresent());

envMap.put(EnvConfigs.JOB_KUBE_NODE_SELECTORS, "key=k,,;$%&^#");
Assertions.assertEquals(config.getJobKubeNodeSelectors(), Map.of("key", "k"));
Assertions.assertEquals(config.getJobKubeNodeSelectors().get(), Map.of("key", "k"));

envMap.put(EnvConfigs.JOB_KUBE_NODE_SELECTORS, "one=two");
Assertions.assertEquals(config.getJobKubeNodeSelectors(), Map.of("one", "two"));
Assertions.assertEquals(config.getJobKubeNodeSelectors().get(), Map.of("one", "two"));

envMap.put(EnvConfigs.JOB_KUBE_NODE_SELECTORS, "airbyte=server,something=nothing");
Assertions.assertEquals(config.getJobKubeNodeSelectors(), Map.of("airbyte", "server", "something", "nothing"));
Assertions.assertEquals(config.getJobKubeNodeSelectors().get(), Map.of("airbyte", "server", "something", "nothing"));
}

@Test
void testSpecKubeNodeSelectors() {
envMap.put(EnvConfigs.SPEC_JOB_KUBE_NODE_SELECTORS, null);
Assertions.assertFalse(config.getSpecJobKubeNodeSelectors().isPresent());

envMap.put(EnvConfigs.SPEC_JOB_KUBE_NODE_SELECTORS, ",,,");
Assertions.assertFalse(config.getSpecJobKubeNodeSelectors().isPresent());

envMap.put(EnvConfigs.SPEC_JOB_KUBE_NODE_SELECTORS, "key=k,,;$%&^#");
Assertions.assertEquals(config.getSpecJobKubeNodeSelectors().get(), Map.of("key", "k"));

envMap.put(EnvConfigs.SPEC_JOB_KUBE_NODE_SELECTORS, "one=two");
Assertions.assertEquals(config.getSpecJobKubeNodeSelectors().get(), Map.of("one", "two"));

envMap.put(EnvConfigs.SPEC_JOB_KUBE_NODE_SELECTORS, "airbyte=server,something=nothing");
Assertions.assertEquals(config.getSpecJobKubeNodeSelectors().get(), Map.of("airbyte", "server", "something", "nothing"));
}

@Test
void testCheckKubeNodeSelectors() {
envMap.put(EnvConfigs.CHECK_JOB_KUBE_NODE_SELECTORS, null);
Assertions.assertFalse(config.getCheckJobKubeNodeSelectors().isPresent());

envMap.put(EnvConfigs.CHECK_JOB_KUBE_NODE_SELECTORS, ",,,");
Assertions.assertFalse(config.getCheckJobKubeNodeSelectors().isPresent());

envMap.put(EnvConfigs.CHECK_JOB_KUBE_NODE_SELECTORS, "key=k,,;$%&^#");
Assertions.assertEquals(config.getCheckJobKubeNodeSelectors().get(), Map.of("key", "k"));

envMap.put(EnvConfigs.CHECK_JOB_KUBE_NODE_SELECTORS, "one=two");
Assertions.assertEquals(config.getCheckJobKubeNodeSelectors().get(), Map.of("one", "two"));

envMap.put(EnvConfigs.CHECK_JOB_KUBE_NODE_SELECTORS, "airbyte=server,something=nothing");
Assertions.assertEquals(config.getCheckJobKubeNodeSelectors().get(), Map.of("airbyte", "server", "something", "nothing"));
}

@Test
void testDiscoverKubeNodeSelectors() {
envMap.put(EnvConfigs.DISCOVER_JOB_KUBE_NODE_SELECTORS, null);
Assertions.assertFalse(config.getDiscoverJobKubeNodeSelectors().isPresent());

envMap.put(EnvConfigs.DISCOVER_JOB_KUBE_NODE_SELECTORS, ",,,");
Assertions.assertFalse(config.getDiscoverJobKubeNodeSelectors().isPresent());

envMap.put(EnvConfigs.DISCOVER_JOB_KUBE_NODE_SELECTORS, "key=k,,;$%&^#");
Assertions.assertEquals(config.getDiscoverJobKubeNodeSelectors().get(), Map.of("key", "k"));

envMap.put(EnvConfigs.DISCOVER_JOB_KUBE_NODE_SELECTORS, "one=two");
Assertions.assertEquals(config.getDiscoverJobKubeNodeSelectors().get(), Map.of("one", "two"));

envMap.put(EnvConfigs.DISCOVER_JOB_KUBE_NODE_SELECTORS, "airbyte=server,something=nothing");
Assertions.assertEquals(config.getDiscoverJobKubeNodeSelectors().get(), Map.of("airbyte", "server", "something", "nothing"));
}

@Test
Expand Down
68 changes: 46 additions & 22 deletions airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java
Expand Up @@ -91,14 +91,22 @@ public class WorkerApp {
public static final Path STATE_STORAGE_PREFIX = Path.of("/state");

private final Path workspaceRoot;
private final ProcessFactory jobProcessFactory;
private final ProcessFactory defaultProcessFactory;
private final ProcessFactory specProcessFactory;
private final ProcessFactory checkProcessFactory;
private final ProcessFactory discoverProcessFactory;
private final ProcessFactory replicationProcessFactory;
private final SecretsHydrator secretsHydrator;
private final WorkflowServiceStubs temporalService;
private final ConfigRepository configRepository;
private final MaxWorkersConfig maxWorkers;
private final WorkerEnvironment workerEnvironment;
private final LogConfigs logConfigs;
private final WorkerConfigs workerConfigs;
private final WorkerConfigs defaultWorkerConfigs;
private final WorkerConfigs specWorkerConfigs;
private final WorkerConfigs checkWorkerConfigs;
private final WorkerConfigs discoverWorkerConfigs;
private final WorkerConfigs replicationWorkerConfigs;
private final String airbyteVersion;
private final SyncJobFactory jobFactory;
private final JobPersistence jobPersistence;
Expand Down Expand Up @@ -126,29 +134,30 @@ public void start() {
final Worker specWorker = factory.newWorker(TemporalJobType.GET_SPEC.name(), getWorkerOptions(maxWorkers.getMaxSpecWorkers()));
specWorker.registerWorkflowImplementationTypes(SpecWorkflowImpl.class);
specWorker.registerActivitiesImplementations(
new SpecActivityImpl(workerConfigs, jobProcessFactory, workspaceRoot, workerEnvironment, logConfigs, jobPersistence,
new SpecActivityImpl(specWorkerConfigs, specProcessFactory, workspaceRoot, workerEnvironment, logConfigs, jobPersistence,
airbyteVersion));

final Worker checkConnectionWorker =
factory.newWorker(TemporalJobType.CHECK_CONNECTION.name(), getWorkerOptions(maxWorkers.getMaxCheckWorkers()));
checkConnectionWorker.registerWorkflowImplementationTypes(CheckConnectionWorkflowImpl.class);
checkConnectionWorker
.registerActivitiesImplementations(
new CheckConnectionActivityImpl(workerConfigs, jobProcessFactory, secretsHydrator, workspaceRoot, workerEnvironment, logConfigs,
new CheckConnectionActivityImpl(checkWorkerConfigs, checkProcessFactory, secretsHydrator, workspaceRoot, workerEnvironment, logConfigs,
jobPersistence, airbyteVersion));

final Worker discoverWorker = factory.newWorker(TemporalJobType.DISCOVER_SCHEMA.name(), getWorkerOptions(maxWorkers.getMaxDiscoverWorkers()));
discoverWorker.registerWorkflowImplementationTypes(DiscoverCatalogWorkflowImpl.class);
discoverWorker
.registerActivitiesImplementations(
new DiscoverCatalogActivityImpl(workerConfigs, jobProcessFactory, secretsHydrator, workspaceRoot, workerEnvironment, logConfigs,
new DiscoverCatalogActivityImpl(discoverWorkerConfigs, discoverProcessFactory, secretsHydrator, workspaceRoot, workerEnvironment,
logConfigs,
jobPersistence, airbyteVersion));

final NormalizationActivityImpl normalizationActivity =
new NormalizationActivityImpl(
containerOrchestratorConfig,
workerConfigs,
jobProcessFactory,
defaultWorkerConfigs,
defaultProcessFactory,
secretsHydrator,
workspaceRoot,
workerEnvironment,
Expand All @@ -158,8 +167,8 @@ public void start() {
final DbtTransformationActivityImpl dbtTransformationActivity =
new DbtTransformationActivityImpl(
containerOrchestratorConfig,
workerConfigs,
jobProcessFactory,
defaultWorkerConfigs,
defaultProcessFactory,
secretsHydrator,
workspaceRoot,
workerEnvironment,
Expand All @@ -171,8 +180,8 @@ public void start() {
final Worker syncWorker = factory.newWorker(TemporalJobType.SYNC.name(), getWorkerOptions(maxWorkers.getMaxSyncWorkers()));
final ReplicationActivityImpl replicationActivity = getReplicationActivityImpl(
containerOrchestratorConfig,
workerConfigs,
jobProcessFactory,
replicationWorkerConfigs,
replicationProcessFactory,
secretsHydrator,
workspaceRoot,
workerEnvironment,
Expand All @@ -183,7 +192,7 @@ public void start() {

syncWorker.registerActivitiesImplementations(replicationActivity, normalizationActivity, dbtTransformationActivity, persistStateActivity);

final JobCreator jobCreator = new DefaultJobCreator(jobPersistence, configRepository, workerConfigs.getResourceRequirements());
final JobCreator jobCreator = new DefaultJobCreator(jobPersistence, configRepository, defaultWorkerConfigs.getResourceRequirements());

final Worker connectionUpdaterWorker =
factory.newWorker(TemporalJobType.CONNECTION_UPDATER.toString(), getWorkerOptions(maxWorkers.getMaxSyncWorkers()));
Expand Down Expand Up @@ -238,9 +247,7 @@ private ReplicationActivityImpl getReplicationActivityImpl(
airbyteVersion);
}

private static ProcessFactory getJobProcessFactory(final Configs configs) throws IOException {
final WorkerConfigs workerConfigs = new WorkerConfigs(configs);

private static ProcessFactory getJobProcessFactory(final Configs configs, final WorkerConfigs workerConfigs) throws IOException {
if (configs.getWorkerEnvironment() == Configs.WorkerEnvironment.KUBERNETES) {
final KubernetesClient fabricClient = new DefaultKubernetesClient();
final String localIp = InetAddress.getLocalHost().getHostAddress();
Expand Down Expand Up @@ -295,7 +302,18 @@ static Optional<ContainerOrchestratorConfig> getContainerOrchestratorConfig(fina

private static void launchWorkerApp() throws IOException {
final Configs configs = new EnvConfigs();
final WorkerConfigs workerConfigs = new WorkerConfigs(configs);

final WorkerConfigs defaultWorkerConfigs = new WorkerConfigs(configs);
final WorkerConfigs specWorkerConfigs = WorkerConfigs.buildSpecWorkerConfigs(configs);
final WorkerConfigs checkWorkerConfigs = WorkerConfigs.buildCheckWorkerConfigs(configs);
final WorkerConfigs discoverWorkerConfigs = WorkerConfigs.buildDiscoverWorkerConfigs(configs);
final WorkerConfigs replicationWorkerConfigs = WorkerConfigs.buildReplicationWorkerConfigs(configs);

final ProcessFactory defaultProcessFactory = getJobProcessFactory(configs, defaultWorkerConfigs);
final ProcessFactory specProcessFactory = getJobProcessFactory(configs, specWorkerConfigs);
final ProcessFactory checkProcessFactory = getJobProcessFactory(configs, checkWorkerConfigs);
final ProcessFactory discoverProcessFactory = getJobProcessFactory(configs, discoverWorkerConfigs);
final ProcessFactory replicationProcessFactory = getJobProcessFactory(configs, replicationWorkerConfigs);

LogClientSingleton.getInstance().setWorkspaceMdc(configs.getWorkerEnvironment(), configs.getLogConfigs(),
LogClientSingleton.getInstance().getSchedulerLogsRoot(configs.getWorkspaceRoot()));
Expand All @@ -312,8 +330,6 @@ private static void launchWorkerApp() throws IOException {
KubePortManagerSingleton.init(configs.getTemporalWorkerPorts());
}

final ProcessFactory jobProcessFactory = getJobProcessFactory(configs);

final WorkflowServiceStubs temporalService = TemporalUtils.createTemporalService(temporalHost);

TemporalUtils.configureTemporalNamespace(temporalService);
Expand Down Expand Up @@ -343,7 +359,7 @@ private static void launchWorkerApp() throws IOException {
configRepository);
final TrackingClient trackingClient = TrackingClientSingleton.get();
final SyncJobFactory jobFactory = new DefaultSyncJobFactory(
new DefaultJobCreator(jobPersistence, configRepository, workerConfigs.getResourceRequirements()),
new DefaultJobCreator(jobPersistence, configRepository, defaultWorkerConfigs.getResourceRequirements()),
configRepository,
new OAuthConfigSupplier(configRepository, trackingClient));

Expand All @@ -364,7 +380,7 @@ private static void launchWorkerApp() throws IOException {
final ConnectionHelper connectionHelper = new ConnectionHelper(
configRepository,
workspaceHelper,
workerConfigs);
defaultWorkerConfigs);

final Optional<ContainerOrchestratorConfig> containerOrchestratorConfig = getContainerOrchestratorConfig(configs);

Expand All @@ -378,14 +394,22 @@ private static void launchWorkerApp() throws IOException {

new WorkerApp(
workspaceRoot,
jobProcessFactory,
defaultProcessFactory,
specProcessFactory,
checkProcessFactory,
discoverProcessFactory,
replicationProcessFactory,
secretsHydrator,
temporalService,
configRepository,
configs.getMaxWorkers(),
configs.getWorkerEnvironment(),
configs.getLogConfigs(),
workerConfigs,
defaultWorkerConfigs,
specWorkerConfigs,
checkWorkerConfigs,
discoverWorkerConfigs,
replicationWorkerConfigs,
configs.getAirbyteVersionOrWarning(),
jobFactory,
jobPersistence,
Expand Down

0 comments on commit b742a45

Please sign in to comment.