From f3330a9c274bb6b2bdbfd721e8615556073b73b0 Mon Sep 17 00:00:00 2001 From: Jared Rhizor Date: Fri, 18 Jun 2021 16:23:24 -0700 Subject: [PATCH] fix gke file copy errors (#4214) * fix gke file copy errors * remove one * remove unthrown exception --- .../airbyte/scheduler/app/SchedulerApp.java | 10 +-- .../workers/process/KubePodProcess.java | 63 ++++++++++--------- .../workers/process/KubeProcessFactory.java | 19 ++++-- .../KubePodProcessIntegrationTest.java | 29 +++++++-- 4 files changed, 77 insertions(+), 44 deletions(-) diff --git a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java index 7fc8d1d362c48..7da21a37ac3b9 100644 --- a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java +++ b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java @@ -52,10 +52,11 @@ import io.airbyte.workers.temporal.TemporalUtils; import io.fabric8.kubernetes.client.DefaultKubernetesClient; import io.fabric8.kubernetes.client.KubernetesClient; +import io.kubernetes.client.openapi.ApiClient; +import io.kubernetes.client.util.Config; import io.temporal.serviceclient.WorkflowServiceStubs; import java.io.IOException; import java.net.InetAddress; -import java.net.UnknownHostException; import java.nio.file.Path; import java.time.Duration; import java.time.Instant; @@ -167,13 +168,14 @@ private void cleanupZombies(JobPersistence jobPersistence, JobNotifier jobNotifi } } - private static ProcessFactory getProcessBuilderFactory(Configs configs) throws UnknownHostException { + private static ProcessFactory getProcessBuilderFactory(Configs configs) throws IOException { if (configs.getWorkerEnvironment() == Configs.WorkerEnvironment.KUBERNETES) { - final KubernetesClient kubeClient = new DefaultKubernetesClient(); + final ApiClient officialClient = Config.defaultClient(); + final KubernetesClient fabricClient = new DefaultKubernetesClient(); final BlockingQueue workerPorts = new LinkedBlockingDeque<>(configs.getTemporalWorkerPorts()); final String localIp = InetAddress.getLocalHost().getHostAddress(); final String kubeHeartbeatUrl = localIp + ":" + KUBE_HEARTBEAT_PORT; - return new KubeProcessFactory("default", kubeClient, kubeHeartbeatUrl, workerPorts); + return new KubeProcessFactory("default", officialClient, fabricClient, kubeHeartbeatUrl, workerPorts); } else { return new DockerProcessFactory( configs.getWorkspaceRoot(), diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java index b794e30dabd19..25b46c658d732 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java @@ -24,7 +24,6 @@ package io.airbyte.workers.process; -import io.airbyte.commons.io.IOs; import io.airbyte.commons.lang.Exceptions; import io.airbyte.commons.resources.MoreResources; import io.airbyte.commons.string.Strings; @@ -39,15 +38,19 @@ import io.fabric8.kubernetes.api.model.VolumeMountBuilder; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.internal.readiness.Readiness; +import io.kubernetes.client.Copy; +import io.kubernetes.client.openapi.ApiClient; +import io.kubernetes.client.openapi.ApiException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.InetAddress; import java.net.ServerSocket; import java.net.Socket; +import java.nio.charset.StandardCharsets; import java.nio.file.Path; +import java.util.AbstractMap; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -110,7 +113,7 @@ public class KubePodProcess extends Process { private static final int KILLED_EXIT_CODE = 143; private static final int STDIN_REMOTE_PORT = 9001; - private final KubernetesClient client; + private final KubernetesClient fabricClient; private final Pod podDefinition; // Necessary since it is not possible to retrieve the pod's actual exit code upon termination. This // is because the Kube API server does not keep @@ -222,24 +225,25 @@ private static Container getMain(String image, boolean usesStdin, String entrypo .build(); } - private static void copyFilesToKubeConfigVolume(KubernetesClient client, String podName, String namespace, Map files) { + private static void copyFilesToKubeConfigVolume(ApiClient officialClient, String podName, String namespace, Map files) { List> fileEntries = new ArrayList<>(files.entrySet()); + // copy this file last to indicate that the copy has completed + fileEntries.add(new AbstractMap.SimpleEntry<>(SUCCESS_FILE_NAME, "")); + for (Map.Entry file : fileEntries) { - Path tmpFile = null; try { - tmpFile = Path.of(IOs.writeFileToRandomTmpDir(file.getKey(), file.getValue())); - LOGGER.info("Uploading file: " + file.getKey()); + var contents = file.getValue().getBytes(StandardCharsets.UTF_8); + var containerPath = Path.of(CONFIG_DIR + "/" + file.getKey()); - client.pods().inNamespace(namespace).withName(podName).inContainer(INIT_CONTAINER_NAME) - .file(CONFIG_DIR + "/" + file.getKey()) - .upload(tmpFile); + // fabric8 kube client upload doesn't work on gke: + // https://github.com/fabric8io/kubernetes-client/issues/2217 + Copy copy = new Copy(officialClient); + copy.copyFileToPod(namespace, podName, INIT_CONTAINER_NAME, contents, containerPath); - } finally { - if (tmpFile != null) { - tmpFile.toFile().delete(); - } + } catch (IOException | ApiException e) { + throw new RuntimeException(e); } } } @@ -260,7 +264,8 @@ private static void waitForInitPodToRun(KubernetesClient client, Pod podDefiniti LOGGER.info("Init container ready.."); } - public KubePodProcess(KubernetesClient client, + public KubePodProcess(ApiClient officialClient, + KubernetesClient fabricClient, Consumer portReleaser, String podName, String namespace, @@ -273,7 +278,7 @@ public KubePodProcess(KubernetesClient client, final String entrypointOverride, final String... args) throws IOException, InterruptedException { - this.client = client; + this.fabricClient = fabricClient; this.portReleaser = portReleaser; this.stdoutLocalPort = stdoutLocalPort; this.stderrLocalPort = stderrLocalPort; @@ -283,7 +288,7 @@ public KubePodProcess(KubernetesClient client, executorService = Executors.newFixedThreadPool(2); setupStdOutAndStdErrListeners(); - String entrypoint = entrypointOverride == null ? getCommandFromImage(client, image, namespace) : entrypointOverride; + String entrypoint = entrypointOverride == null ? getCommandFromImage(fabricClient, image, namespace) : entrypointOverride; LOGGER.info("Found entrypoint: {}", entrypoint); Volume pipeVolume = new VolumeBuilder() @@ -376,17 +381,12 @@ public KubePodProcess(KubernetesClient client, .build(); LOGGER.info("Creating pod..."); - this.podDefinition = client.pods().inNamespace(namespace).createOrReplace(pod); + this.podDefinition = fabricClient.pods().inNamespace(namespace).createOrReplace(pod); - waitForInitPodToRun(client, podDefinition); + waitForInitPodToRun(fabricClient, podDefinition); LOGGER.info("Copying files..."); - Map filesWithSuccess = new HashMap<>(files); - - // We always copy the empty success file to ensure our waiting step can detect the init container in - // RUNNING. Otherwise, the container can complete and exit before we are able to detect it. - filesWithSuccess.put(SUCCESS_FILE_NAME, ""); - copyFilesToKubeConfigVolume(client, podName, namespace, filesWithSuccess); + copyFilesToKubeConfigVolume(officialClient, podName, namespace, files); LOGGER.info("Waiting until pod is ready..."); // If a pod gets into a non-terminal error state it should be automatically killed by our @@ -396,14 +396,14 @@ public KubePodProcess(KubernetesClient client, // This doesn't manage things like pods that are blocked from running for some cluster reason or if // the init // container got stuck somehow. - client.resource(podDefinition).waitUntilCondition(p -> { + fabricClient.resource(podDefinition).waitUntilCondition(p -> { boolean isReady = Objects.nonNull(p) && Readiness.getInstance().isReady(p); return isReady || isTerminal(p); }, 10, TimeUnit.DAYS); // allow writing stdin to pod LOGGER.info("Reading pod IP..."); - var podIp = getPodIP(client, podName, namespace); + var podIp = getPodIP(fabricClient, podName, namespace); LOGGER.info("Pod IP: {}", podIp); if (usesStdin) { @@ -460,8 +460,9 @@ public InputStream getErrorStream() { @Override public int waitFor() throws InterruptedException { try { - Pod refreshedPod = client.pods().inNamespace(podDefinition.getMetadata().getNamespace()).withName(podDefinition.getMetadata().getName()).get(); - client.resource(refreshedPod).waitUntilCondition(this::isTerminal, 10, TimeUnit.DAYS); + Pod refreshedPod = + fabricClient.pods().inNamespace(podDefinition.getMetadata().getNamespace()).withName(podDefinition.getMetadata().getName()).get(); + fabricClient.resource(refreshedPod).waitUntilCondition(this::isTerminal, 10, TimeUnit.DAYS); wasKilled.set(true); return exitValue(); } finally { @@ -489,7 +490,7 @@ public boolean waitFor(long timeout, TimeUnit unit) throws InterruptedException public void destroy() { LOGGER.info("Destroying Kube process: {}", podDefinition.getMetadata().getName()); try { - client.resource(podDefinition).withPropagationPolicy(DeletionPropagation.FOREGROUND).delete(); + fabricClient.resource(podDefinition).withPropagationPolicy(DeletionPropagation.FOREGROUND).delete(); wasKilled.set(true); } finally { close(); @@ -524,7 +525,7 @@ private boolean isTerminal(Pod pod) { private int getReturnCode(Pod pod) { var name = pod.getMetadata().getName(); - Pod refreshedPod = client.pods().inNamespace(pod.getMetadata().getNamespace()).withName(name).get(); + Pod refreshedPod = fabricClient.pods().inNamespace(pod.getMetadata().getNamespace()).withName(name).get(); if (refreshedPod == null) { if (wasKilled.get()) { LOGGER.info("Unable to find pod {} to retrieve exit value. Defaulting to value {}. This is expected if the job was cancelled.", name, diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java index 5dde4ad8a5b7d..2d21965f68ba3 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java @@ -26,6 +26,7 @@ import io.airbyte.workers.WorkerException; import io.fabric8.kubernetes.client.KubernetesClient; +import io.kubernetes.client.openapi.ApiClient; import java.nio.file.Path; import java.util.Map; import java.util.concurrent.BlockingQueue; @@ -39,20 +40,27 @@ public class KubeProcessFactory implements ProcessFactory { private static final Logger LOGGER = LoggerFactory.getLogger(KubeProcessFactory.class); private final String namespace; - private final KubernetesClient kubeClient; + private final ApiClient officialClient; + private final KubernetesClient fabricClient; private final String kubeHeartbeatUrl; private final BlockingQueue workerPorts; /** * @param namespace kubernetes namespace where spawned pods will live - * @param kubeClient kubernetes client + * @param officialClient official kubernetes client + * @param fabricClient fabric8 kubernetes client * @param kubeHeartbeatUrl a url where if the response is not 200 the spawned process will fail * itself * @param workerPorts a set of ports that can be used for IO socket servers */ - public KubeProcessFactory(String namespace, KubernetesClient kubeClient, String kubeHeartbeatUrl, BlockingQueue workerPorts) { + public KubeProcessFactory(String namespace, + ApiClient officialClient, + KubernetesClient fabricClient, + String kubeHeartbeatUrl, + BlockingQueue workerPorts) { this.namespace = namespace; - this.kubeClient = kubeClient; + this.officialClient = officialClient; + this.fabricClient = fabricClient; this.kubeHeartbeatUrl = kubeHeartbeatUrl; this.workerPorts = workerPorts; } @@ -88,7 +96,8 @@ public Process create(String jobId, }; return new KubePodProcess( - kubeClient, + officialClient, + fabricClient, portReleaser, podName, namespace, diff --git a/airbyte-workers/src/test-integration/java/io/airbyte/workers/process/KubePodProcessIntegrationTest.java b/airbyte-workers/src/test-integration/java/io/airbyte/workers/process/KubePodProcessIntegrationTest.java index 03ce839db043e..191a325bb86c9 100644 --- a/airbyte-workers/src/test-integration/java/io/airbyte/workers/process/KubePodProcessIntegrationTest.java +++ b/airbyte-workers/src/test-integration/java/io/airbyte/workers/process/KubePodProcessIntegrationTest.java @@ -33,6 +33,8 @@ import io.airbyte.workers.WorkerException; import io.fabric8.kubernetes.client.DefaultKubernetesClient; import io.fabric8.kubernetes.client.KubernetesClient; +import io.kubernetes.client.openapi.ApiClient; +import io.kubernetes.client.util.Config; import java.io.IOException; import java.net.Inet4Address; import java.net.ServerSocket; @@ -45,6 +47,7 @@ import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque; +import org.apache.commons.lang.RandomStringUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -57,7 +60,8 @@ public class KubePodProcessIntegrationTest { private List openWorkerPorts; private int heartbeatPort; private String heartbeatUrl; - private KubernetesClient kubeClient; + private ApiClient officialClient; + private KubernetesClient fabricClient; private BlockingQueue workerPorts; private KubeProcessFactory processFactory; @@ -70,9 +74,10 @@ public void setup() throws Exception { heartbeatPort = openPorts.get(0); heartbeatUrl = getHost() + ":" + heartbeatPort; - kubeClient = new DefaultKubernetesClient(); + officialClient = Config.defaultClient(); + fabricClient = new DefaultKubernetesClient(); workerPorts = new LinkedBlockingDeque<>(openWorkerPorts); - processFactory = new KubeProcessFactory("default", kubeClient, heartbeatUrl, workerPorts); + processFactory = new KubeProcessFactory("default", officialClient, fabricClient, heartbeatUrl, workerPorts); server = new WorkerHeartbeatServer(heartbeatPort); server.startBackground(); @@ -143,14 +148,30 @@ public void testKillingWithoutHeartbeat() throws Exception { assertNotEquals(0, process.exitValue()); } + private static String getRandomFile(int lines) { + var sb = new StringBuilder(); + for (int i = 0; i < lines; i++) { + sb.append(RandomStringUtils.randomAlphabetic(100)); + sb.append("\n"); + } + return sb.toString(); + } + private Process getProcess(String entrypoint) throws WorkerException { + // these files aren't used for anything, it's just to check for exceptions when uploading + var files = ImmutableMap.of( + "file0", "fixed str", + "file1", getRandomFile(1), + "file2", getRandomFile(100), + "file3", getRandomFile(1000)); + return processFactory.create( "some-id", 0, Path.of("/tmp/job-root"), "busybox:latest", false, - ImmutableMap.of(), + files, entrypoint); }