Skip to content

Commit

Permalink
fix gke file copy errors (#4214)
Browse files Browse the repository at this point in the history
* fix gke file copy errors

* remove one

* remove unthrown exception
  • Loading branch information
jrhizor committed Jun 18, 2021
1 parent 88d7953 commit f3330a9
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 44 deletions.
Expand Up @@ -52,10 +52,11 @@
import io.airbyte.workers.temporal.TemporalUtils;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.util.Config;
import io.temporal.serviceclient.WorkflowServiceStubs;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
Expand Down Expand Up @@ -167,13 +168,14 @@ private void cleanupZombies(JobPersistence jobPersistence, JobNotifier jobNotifi
}
}

private static ProcessFactory getProcessBuilderFactory(Configs configs) throws UnknownHostException {
private static ProcessFactory getProcessBuilderFactory(Configs configs) throws IOException {
if (configs.getWorkerEnvironment() == Configs.WorkerEnvironment.KUBERNETES) {
final KubernetesClient kubeClient = new DefaultKubernetesClient();
final ApiClient officialClient = Config.defaultClient();
final KubernetesClient fabricClient = new DefaultKubernetesClient();
final BlockingQueue<Integer> workerPorts = new LinkedBlockingDeque<>(configs.getTemporalWorkerPorts());
final String localIp = InetAddress.getLocalHost().getHostAddress();
final String kubeHeartbeatUrl = localIp + ":" + KUBE_HEARTBEAT_PORT;
return new KubeProcessFactory("default", kubeClient, kubeHeartbeatUrl, workerPorts);
return new KubeProcessFactory("default", officialClient, fabricClient, kubeHeartbeatUrl, workerPorts);
} else {
return new DockerProcessFactory(
configs.getWorkspaceRoot(),
Expand Down
Expand Up @@ -24,7 +24,6 @@

package io.airbyte.workers.process;

import io.airbyte.commons.io.IOs;
import io.airbyte.commons.lang.Exceptions;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.commons.string.Strings;
Expand All @@ -39,15 +38,19 @@
import io.fabric8.kubernetes.api.model.VolumeMountBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.internal.readiness.Readiness;
import io.kubernetes.client.Copy;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.ApiException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -110,7 +113,7 @@ public class KubePodProcess extends Process {
private static final int KILLED_EXIT_CODE = 143;
private static final int STDIN_REMOTE_PORT = 9001;

private final KubernetesClient client;
private final KubernetesClient fabricClient;
private final Pod podDefinition;
// Necessary since it is not possible to retrieve the pod's actual exit code upon termination. This
// is because the Kube API server does not keep
Expand Down Expand Up @@ -222,24 +225,25 @@ private static Container getMain(String image, boolean usesStdin, String entrypo
.build();
}

private static void copyFilesToKubeConfigVolume(KubernetesClient client, String podName, String namespace, Map<String, String> files) {
private static void copyFilesToKubeConfigVolume(ApiClient officialClient, String podName, String namespace, Map<String, String> files) {
List<Map.Entry<String, String>> fileEntries = new ArrayList<>(files.entrySet());

// copy this file last to indicate that the copy has completed
fileEntries.add(new AbstractMap.SimpleEntry<>(SUCCESS_FILE_NAME, ""));

for (Map.Entry<String, String> file : fileEntries) {
Path tmpFile = null;
try {
tmpFile = Path.of(IOs.writeFileToRandomTmpDir(file.getKey(), file.getValue()));

LOGGER.info("Uploading file: " + file.getKey());
var contents = file.getValue().getBytes(StandardCharsets.UTF_8);
var containerPath = Path.of(CONFIG_DIR + "/" + file.getKey());

client.pods().inNamespace(namespace).withName(podName).inContainer(INIT_CONTAINER_NAME)
.file(CONFIG_DIR + "/" + file.getKey())
.upload(tmpFile);
// fabric8 kube client upload doesn't work on gke:
// https://github.com/fabric8io/kubernetes-client/issues/2217
Copy copy = new Copy(officialClient);
copy.copyFileToPod(namespace, podName, INIT_CONTAINER_NAME, contents, containerPath);

} finally {
if (tmpFile != null) {
tmpFile.toFile().delete();
}
} catch (IOException | ApiException e) {
throw new RuntimeException(e);
}
}
}
Expand All @@ -260,7 +264,8 @@ private static void waitForInitPodToRun(KubernetesClient client, Pod podDefiniti
LOGGER.info("Init container ready..");
}

public KubePodProcess(KubernetesClient client,
public KubePodProcess(ApiClient officialClient,
KubernetesClient fabricClient,
Consumer<Integer> portReleaser,
String podName,
String namespace,
Expand All @@ -273,7 +278,7 @@ public KubePodProcess(KubernetesClient client,
final String entrypointOverride,
final String... args)
throws IOException, InterruptedException {
this.client = client;
this.fabricClient = fabricClient;
this.portReleaser = portReleaser;
this.stdoutLocalPort = stdoutLocalPort;
this.stderrLocalPort = stderrLocalPort;
Expand All @@ -283,7 +288,7 @@ public KubePodProcess(KubernetesClient client,
executorService = Executors.newFixedThreadPool(2);
setupStdOutAndStdErrListeners();

String entrypoint = entrypointOverride == null ? getCommandFromImage(client, image, namespace) : entrypointOverride;
String entrypoint = entrypointOverride == null ? getCommandFromImage(fabricClient, image, namespace) : entrypointOverride;
LOGGER.info("Found entrypoint: {}", entrypoint);

Volume pipeVolume = new VolumeBuilder()
Expand Down Expand Up @@ -376,17 +381,12 @@ public KubePodProcess(KubernetesClient client,
.build();

LOGGER.info("Creating pod...");
this.podDefinition = client.pods().inNamespace(namespace).createOrReplace(pod);
this.podDefinition = fabricClient.pods().inNamespace(namespace).createOrReplace(pod);

waitForInitPodToRun(client, podDefinition);
waitForInitPodToRun(fabricClient, podDefinition);

LOGGER.info("Copying files...");
Map<String, String> filesWithSuccess = new HashMap<>(files);

// We always copy the empty success file to ensure our waiting step can detect the init container in
// RUNNING. Otherwise, the container can complete and exit before we are able to detect it.
filesWithSuccess.put(SUCCESS_FILE_NAME, "");
copyFilesToKubeConfigVolume(client, podName, namespace, filesWithSuccess);
copyFilesToKubeConfigVolume(officialClient, podName, namespace, files);

LOGGER.info("Waiting until pod is ready...");
// If a pod gets into a non-terminal error state it should be automatically killed by our
Expand All @@ -396,14 +396,14 @@ public KubePodProcess(KubernetesClient client,
// This doesn't handle pods that are blocked from running for some cluster reason, or an init
// container that got stuck somehow.
client.resource(podDefinition).waitUntilCondition(p -> {
fabricClient.resource(podDefinition).waitUntilCondition(p -> {
boolean isReady = Objects.nonNull(p) && Readiness.getInstance().isReady(p);
return isReady || isTerminal(p);
}, 10, TimeUnit.DAYS);

// allow writing stdin to pod
LOGGER.info("Reading pod IP...");
var podIp = getPodIP(client, podName, namespace);
var podIp = getPodIP(fabricClient, podName, namespace);
LOGGER.info("Pod IP: {}", podIp);

if (usesStdin) {
Expand Down Expand Up @@ -460,8 +460,9 @@ public InputStream getErrorStream() {
@Override
public int waitFor() throws InterruptedException {
try {
Pod refreshedPod = client.pods().inNamespace(podDefinition.getMetadata().getNamespace()).withName(podDefinition.getMetadata().getName()).get();
client.resource(refreshedPod).waitUntilCondition(this::isTerminal, 10, TimeUnit.DAYS);
Pod refreshedPod =
fabricClient.pods().inNamespace(podDefinition.getMetadata().getNamespace()).withName(podDefinition.getMetadata().getName()).get();
fabricClient.resource(refreshedPod).waitUntilCondition(this::isTerminal, 10, TimeUnit.DAYS);
wasKilled.set(true);
return exitValue();
} finally {
Expand Down Expand Up @@ -489,7 +490,7 @@ public boolean waitFor(long timeout, TimeUnit unit) throws InterruptedException
public void destroy() {
LOGGER.info("Destroying Kube process: {}", podDefinition.getMetadata().getName());
try {
client.resource(podDefinition).withPropagationPolicy(DeletionPropagation.FOREGROUND).delete();
fabricClient.resource(podDefinition).withPropagationPolicy(DeletionPropagation.FOREGROUND).delete();
wasKilled.set(true);
} finally {
close();
Expand Down Expand Up @@ -524,7 +525,7 @@ private boolean isTerminal(Pod pod) {

private int getReturnCode(Pod pod) {
var name = pod.getMetadata().getName();
Pod refreshedPod = client.pods().inNamespace(pod.getMetadata().getNamespace()).withName(name).get();
Pod refreshedPod = fabricClient.pods().inNamespace(pod.getMetadata().getNamespace()).withName(name).get();
if (refreshedPod == null) {
if (wasKilled.get()) {
LOGGER.info("Unable to find pod {} to retrieve exit value. Defaulting to value {}. This is expected if the job was cancelled.", name,
Expand Down
Expand Up @@ -26,6 +26,7 @@

import io.airbyte.workers.WorkerException;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.kubernetes.client.openapi.ApiClient;
import java.nio.file.Path;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
Expand All @@ -39,20 +40,27 @@ public class KubeProcessFactory implements ProcessFactory {
private static final Logger LOGGER = LoggerFactory.getLogger(KubeProcessFactory.class);

private final String namespace;
private final KubernetesClient kubeClient;
private final ApiClient officialClient;
private final KubernetesClient fabricClient;
private final String kubeHeartbeatUrl;
private final BlockingQueue<Integer> workerPorts;

/**
* @param namespace kubernetes namespace where spawned pods will live
* @param kubeClient kubernetes client
* @param officialClient official kubernetes client
* @param fabricClient fabric8 kubernetes client
* @param kubeHeartbeatUrl a url whose response must be 200; otherwise the spawned process will
*        fail itself
* @param workerPorts a set of ports that can be used for IO socket servers
*/
public KubeProcessFactory(String namespace, KubernetesClient kubeClient, String kubeHeartbeatUrl, BlockingQueue<Integer> workerPorts) {
public KubeProcessFactory(String namespace,
ApiClient officialClient,
KubernetesClient fabricClient,
String kubeHeartbeatUrl,
BlockingQueue<Integer> workerPorts) {
this.namespace = namespace;
this.kubeClient = kubeClient;
this.officialClient = officialClient;
this.fabricClient = fabricClient;
this.kubeHeartbeatUrl = kubeHeartbeatUrl;
this.workerPorts = workerPorts;
}
Expand Down Expand Up @@ -88,7 +96,8 @@ public Process create(String jobId,
};

return new KubePodProcess(
kubeClient,
officialClient,
fabricClient,
portReleaser,
podName,
namespace,
Expand Down
Expand Up @@ -33,6 +33,8 @@
import io.airbyte.workers.WorkerException;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.util.Config;
import java.io.IOException;
import java.net.Inet4Address;
import java.net.ServerSocket;
Expand All @@ -45,6 +47,7 @@
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import org.apache.commons.lang.RandomStringUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -57,7 +60,8 @@ public class KubePodProcessIntegrationTest {
private List<Integer> openWorkerPorts;
private int heartbeatPort;
private String heartbeatUrl;
private KubernetesClient kubeClient;
private ApiClient officialClient;
private KubernetesClient fabricClient;
private BlockingQueue<Integer> workerPorts;
private KubeProcessFactory processFactory;

Expand All @@ -70,9 +74,10 @@ public void setup() throws Exception {
heartbeatPort = openPorts.get(0);
heartbeatUrl = getHost() + ":" + heartbeatPort;

kubeClient = new DefaultKubernetesClient();
officialClient = Config.defaultClient();
fabricClient = new DefaultKubernetesClient();
workerPorts = new LinkedBlockingDeque<>(openWorkerPorts);
processFactory = new KubeProcessFactory("default", kubeClient, heartbeatUrl, workerPorts);
processFactory = new KubeProcessFactory("default", officialClient, fabricClient, heartbeatUrl, workerPorts);

server = new WorkerHeartbeatServer(heartbeatPort);
server.startBackground();
Expand Down Expand Up @@ -143,14 +148,30 @@ public void testKillingWithoutHeartbeat() throws Exception {
assertNotEquals(0, process.exitValue());
}

/**
 * Builds throwaway file content for upload tests.
 *
 * @param lines number of rows to generate; zero or a negative value yields an empty string
 * @return {@code lines} rows, each a random alphabetic string requested with length 100 and
 *         terminated by a newline
 */
private static String getRandomFile(int lines) {
  final StringBuilder content = new StringBuilder();
  int remaining = lines;
  while (remaining-- > 0) {
    content.append(RandomStringUtils.randomAlphabetic(100)).append("\n");
  }
  return content.toString();
}

private Process getProcess(String entrypoint) throws WorkerException {
// these files aren't used for anything, it's just to check for exceptions when uploading
var files = ImmutableMap.of(
"file0", "fixed str",
"file1", getRandomFile(1),
"file2", getRandomFile(100),
"file3", getRandomFile(1000));

return processFactory.create(
"some-id",
0,
Path.of("/tmp/job-root"),
"busybox:latest",
false,
ImmutableMap.of(),
files,
entrypoint);
}

Expand Down

0 comments on commit f3330a9

Please sign in to comment.