Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Break inheritance dependency on java.lang.Process #20235

Merged
merged 1 commit into from
Dec 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,12 @@
import io.fabric8.kubernetes.api.model.VolumeMountBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.micronaut.core.util.StringUtils;
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.nio.file.Path;
import java.util.AbstractMap;
import java.util.ArrayList;
Expand Down Expand Up @@ -186,7 +191,6 @@ public boolean hasExited() {
}
}

@Override
public boolean waitFor(final long timeout, final TimeUnit unit) throws InterruptedException {
// implementation copied from Process.java since this isn't a real Process
long remainingNanos = unit.toNanos(timeout);
Expand Down Expand Up @@ -229,6 +233,56 @@ public KubePodInfo getInfo() {
return kubePodInfo;
}

@Override
public Process toProcess() {
return new Process() {

@Override
public OutputStream getOutputStream() {
try {
final String output = AsyncOrchestratorPodProcess.this.getOutput().orElse("");
final OutputStream os = new BufferedOutputStream(new ByteArrayOutputStream());
os.write(output.getBytes(Charset.defaultCharset()));
return os;
} catch (final Exception e) {
log.warn("Unable to write output to stream.", e);
return OutputStream.nullOutputStream();
}
}

@Override
public InputStream getInputStream() {
return InputStream.nullInputStream();
}

@Override
public InputStream getErrorStream() {
return InputStream.nullInputStream();
}

@Override
public int waitFor() throws InterruptedException {
return AsyncOrchestratorPodProcess.this.waitFor();
}

@Override
public int exitValue() {
return AsyncOrchestratorPodProcess.this.exitValue();
}

@Override
public void destroy() {
AsyncOrchestratorPodProcess.this.destroy();
}

@Override
public boolean waitFor(final long timeout, final TimeUnit unit) throws InterruptedException {
return AsyncOrchestratorPodProcess.this.waitFor(timeout, unit);
}

};
}

private Optional<String> getDocument(final String key) {
return documentStoreClient.read(getInfo().namespace() + "/" + getInfo().name() + "/" + key);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,16 @@

package io.airbyte.workers.process;

import java.util.concurrent.TimeUnit;

public interface KubePod {

int exitValue();

void destroy();

boolean waitFor(final long timeout, final TimeUnit unit) throws InterruptedException;

int waitFor() throws InterruptedException;

KubePodInfo getInfo();

Process toProcess();

}
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@
// it is required for the connectors
@SuppressWarnings("PMD.AvoidPrintStackTrace")
// TODO(Davin): Better test for this. See https://github.com/airbytehq/airbyte/issues/3700.
public class KubePodProcess extends Process implements KubePod {
public class KubePodProcess implements KubePod {

private static final Configs configs = new EnvConfigs();

Expand Down Expand Up @@ -623,21 +623,6 @@ private void setupStdOutAndStdErrListeners() {
});
}

@Override
public OutputStream getOutputStream() {
return this.stdin;
}

@Override
public InputStream getInputStream() {
return this.stdout;
}

@Override
public InputStream getErrorStream() {
return this.stderr;
}

/**
* Waits for the Kube Pod backing this process and returns the exit value after closing resources.
*/
Expand All @@ -652,15 +637,6 @@ public int waitFor() throws InterruptedException {
return exitValue();
}

/**
* Waits for the Kube Pod backing this process and returns the exit value until a timeout. Closes
* resources if and only if the timeout is not reached.
*/
@Override
public boolean waitFor(final long timeout, final TimeUnit unit) throws InterruptedException {
davinchia marked this conversation as resolved.
Show resolved Hide resolved
return super.waitFor(timeout, unit);
}

/**
* Immediately terminates the Kube Pod backing this process and cleans up IO resources.
*/
Expand All @@ -679,11 +655,6 @@ public void destroy() {
}
}

@Override
public Info info() {
return new KubePodProcessInfo(podDefinition.getMetadata().getName());
}

private Container getMainContainerFromPodDefinition() {
final Optional<Container> containerOptional = podDefinition.getSpec()
.getContainers()
Expand Down Expand Up @@ -778,6 +749,48 @@ public int exitValue() {
return returnCode;
}

@Override
public Process toProcess() {
return new Process() {

@Override
public OutputStream getOutputStream() {
return KubePodProcess.this.stdin;
}

@Override
public InputStream getInputStream() {
return KubePodProcess.this.stdout;
}

@Override
public InputStream getErrorStream() {
return KubePodProcess.this.stderr;
}

@Override
public int waitFor() throws InterruptedException {
return KubePodProcess.this.waitFor();
}

@Override
public int exitValue() {
return KubePodProcess.this.exitValue();
}

@Override
public void destroy() {
KubePodProcess.this.destroy();
}

@Override
public Info info() {
return new KubePodProcessInfo(podDefinition.getMetadata().getName());
}

};
}

public static ResourceRequirementsBuilder getResourceRequirementsBuilder(final ResourceRequirements resourceRequirements) {
if (resourceRequirements != null) {
final Map<String, Quantity> requestMap = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public Process create(
workerConfigs.getJobCurlImage(),
MoreMaps.merge(jobMetadata, workerConfigs.getEnvMap()),
internalToExternalPorts,
args);
args).toProcess();
} catch (final Exception e) {
throw new WorkerException(e.getMessage(), e);
}
Expand Down
18 changes: 9 additions & 9 deletions airbyte-webapp/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 2 additions & 10 deletions docs/reference/api/generated-api-html/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -190,17 +190,9 @@ <h1>Airbyte Configuration API</h1>
<li>The naming convention for endpoints is: localhost:8000/{VERSION}/{METHOD_FAMILY}/{METHOD_NAME} e.g. <code>localhost:8000/v1/connections/create</code>.</li>
<li>For all <code>update</code> methods, the whole object must be passed in, even the fields that did not change.</li>
</ul>
<p>Change Management:</p>
<p>Authentication (OSS):</p>
<ul>
<li>The major version of the API endpoint can be determined / specified in the URL <code>localhost:8080/v1/connections/create</code></li>
<li>Minor version bumps will be invisible to the end user. The user cannot specify minor versions in requests.</li>
<li>All backwards incompatible changes will happen in major version bumps. We will not make backwards incompatible changes in minor version bumps. Examples of non-breaking changes (includes but not limited to...):
<ul>
<li>Adding fields to request or response bodies.</li>
<li>Adding new HTTP endpoints.</li>
</ul>
</li>
<li>All <code>web_backend</code> APIs are not considered public APIs and are not guaranteeing backwards compatibility.</li>
<li>When authenticating to the Configuration API, you must use Basic Authentication by setting the Authentication Header to Basic and base64 encoding the username and password (which are <code>airbyte</code> and <code>password</code> by default - so base64 encoding <code>airbyte:password</code> results in <code>YWlyYnl0ZTpwYXNzd29yZA==</code>). So the full header reads <code>'Authorization': &quot;Basic YWlyYnl0ZTpwYXNzd29yZA==&quot;</code></li>
</ul>
</div>
<div class="app-desc">More information: <a href="https://openapi-generator.tech">https://openapi-generator.tech</a></div>
Expand Down