From 711bf53effad0be4be3f8087d76c0dc2a2e7a783 Mon Sep 17 00:00:00 2001 From: Jonathan Pearlin Date: Wed, 14 Dec 2022 12:34:30 -0500 Subject: [PATCH] Break inheritance dependency on java.lang.Process (#20235) --- .../process/AsyncOrchestratorPodProcess.java | 56 +++++++++++++- .../io/airbyte/workers/process/KubePod.java | 6 +- .../workers/process/KubePodProcess.java | 73 +++++++++++-------- .../workers/process/KubeProcessFactory.java | 2 +- 4 files changed, 101 insertions(+), 36 deletions(-) diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AsyncOrchestratorPodProcess.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AsyncOrchestratorPodProcess.java index 72bffe59ba38b2..269a5735fa485a 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AsyncOrchestratorPodProcess.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AsyncOrchestratorPodProcess.java @@ -22,7 +22,12 @@ import io.fabric8.kubernetes.api.model.VolumeMountBuilder; import io.fabric8.kubernetes.client.KubernetesClient; import io.micronaut.core.util.StringUtils; +import java.io.BufferedOutputStream; +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.charset.Charset; import java.nio.file.Path; import java.util.AbstractMap; import java.util.ArrayList; @@ -186,7 +191,6 @@ public boolean hasExited() { } } - @Override public boolean waitFor(final long timeout, final TimeUnit unit) throws InterruptedException { // implementation copied from Process.java since this isn't a real Process long remainingNanos = unit.toNanos(timeout); @@ -229,6 +233,56 @@ public KubePodInfo getInfo() { return kubePodInfo; } + @Override + public Process toProcess() { + return new Process() { + + @Override + public OutputStream getOutputStream() { + try { + final String output = AsyncOrchestratorPodProcess.this.getOutput().orElse(""); + final OutputStream os = new BufferedOutputStream(new ByteArrayOutputStream()); + os.write(output.getBytes(Charset.defaultCharset())); + return os; + } catch (final Exception e) { + log.warn("Unable to write output to stream.", e); + return OutputStream.nullOutputStream(); + } + } + + @Override + public InputStream getInputStream() { + return InputStream.nullInputStream(); + } + + @Override + public InputStream getErrorStream() { + return InputStream.nullInputStream(); + } + + @Override + public int waitFor() throws InterruptedException { + return AsyncOrchestratorPodProcess.this.waitFor(); + } + + @Override + public int exitValue() { + return AsyncOrchestratorPodProcess.this.exitValue(); + } + + @Override + public void destroy() { + AsyncOrchestratorPodProcess.this.destroy(); + } + + @Override + public boolean waitFor(final long timeout, final TimeUnit unit) throws InterruptedException { + return AsyncOrchestratorPodProcess.this.waitFor(timeout, unit); + } + + }; + } + private Optional getDocument(final String key) { return documentStoreClient.read(getInfo().namespace() + "/" + getInfo().name() + "/" + key); } diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/KubePod.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/KubePod.java index c2fbcc2e6367a7..1d795b2f97b32b 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/KubePod.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/KubePod.java @@ -4,18 +4,16 @@ package io.airbyte.workers.process; -import java.util.concurrent.TimeUnit; - public interface KubePod { int exitValue(); void destroy(); - boolean waitFor(final long timeout, final TimeUnit unit) throws InterruptedException; - int waitFor() throws InterruptedException; KubePodInfo getInfo(); + Process toProcess(); + } diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/KubePodProcess.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/KubePodProcess.java index fb0a7a7a8fb363..769f8eaf159b27 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/KubePodProcess.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/KubePodProcess.java @@ -101,7 +101,7 @@ // it is required for the connectors @SuppressWarnings("PMD.AvoidPrintStackTrace") // TODO(Davin): Better test for this. See https://github.com/airbytehq/airbyte/issues/3700. -public class KubePodProcess extends Process implements KubePod { +public class KubePodProcess implements KubePod { private static final Configs configs = new EnvConfigs(); @@ -628,21 +628,6 @@ private void setupStdOutAndStdErrListeners() { }); } - @Override - public OutputStream getOutputStream() { - return this.stdin; - } - - @Override - public InputStream getInputStream() { - return this.stdout; - } - - @Override - public InputStream getErrorStream() { - return this.stderr; - } - /** * Waits for the Kube Pod backing this process and returns the exit value after closing resources. */ @@ -657,15 +642,6 @@ public int waitFor() throws InterruptedException { return exitValue(); } - /** - * Waits for the Kube Pod backing this process and returns the exit value until a timeout. Closes - * resources if and only if the timeout is not reached. - */ - @Override - public boolean waitFor(final long timeout, final TimeUnit unit) throws InterruptedException { - return super.waitFor(timeout, unit); - } - /** * Immediately terminates the Kube Pod backing this process and cleans up IO resources. */ @@ -684,11 +660,6 @@ public void destroy() { } } - @Override - public Info info() { - return new KubePodProcessInfo(podDefinition.getMetadata().getName()); - } - private Container getMainContainerFromPodDefinition() { final Optional containerOptional = podDefinition.getSpec() .getContainers() @@ -783,6 +754,48 @@ public int exitValue() { return returnCode; } + @Override + public Process toProcess() { + return new Process() { + + @Override + public OutputStream getOutputStream() { + return KubePodProcess.this.stdin; + } + + @Override + public InputStream getInputStream() { + return KubePodProcess.this.stdout; + } + + @Override + public InputStream getErrorStream() { + return KubePodProcess.this.stderr; + } + + @Override + public int waitFor() throws InterruptedException { + return KubePodProcess.this.waitFor(); + } + + @Override + public int exitValue() { + return KubePodProcess.this.exitValue(); + } + + @Override + public void destroy() { + KubePodProcess.this.destroy(); + } + + @Override + public Info info() { + return new KubePodProcessInfo(podDefinition.getMetadata().getName()); + } + + }; + } + public static ResourceRequirementsBuilder getResourceRequirementsBuilder(final ResourceRequirements resourceRequirements) { if (resourceRequirements != null) { final Map requestMap = new HashMap<>(); diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java index c0ee04a461e5e5..4f3d83f9890d4c 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java @@ -140,7 +140,7 @@ public Process create( workerConfigs.getJobCurlImage(), MoreMaps.merge(jobMetadata, workerConfigs.getEnvMap()), internalToExternalPorts, - args); + args).toProcess(); } catch (final Exception e) { throw new WorkerException(e.getMessage(), e); }