Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix gke file copy errors #4214

Merged
merged 3 commits into from
Jun 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,11 @@
import io.airbyte.workers.temporal.TemporalUtils;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.util.Config;
import io.temporal.serviceclient.WorkflowServiceStubs;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
Expand Down Expand Up @@ -167,13 +168,14 @@ private void cleanupZombies(JobPersistence jobPersistence, JobNotifier jobNotifi
}
}

private static ProcessFactory getProcessBuilderFactory(Configs configs) throws UnknownHostException {
private static ProcessFactory getProcessBuilderFactory(Configs configs) throws IOException {
if (configs.getWorkerEnvironment() == Configs.WorkerEnvironment.KUBERNETES) {
final KubernetesClient kubeClient = new DefaultKubernetesClient();
final ApiClient officialClient = Config.defaultClient();
final KubernetesClient fabricClient = new DefaultKubernetesClient();
final BlockingQueue<Integer> workerPorts = new LinkedBlockingDeque<>(configs.getTemporalWorkerPorts());
final String localIp = InetAddress.getLocalHost().getHostAddress();
final String kubeHeartbeatUrl = localIp + ":" + KUBE_HEARTBEAT_PORT;
return new KubeProcessFactory("default", kubeClient, kubeHeartbeatUrl, workerPorts);
return new KubeProcessFactory("default", officialClient, fabricClient, kubeHeartbeatUrl, workerPorts);
} else {
return new DockerProcessFactory(
configs.getWorkspaceRoot(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

package io.airbyte.workers.process;

import io.airbyte.commons.io.IOs;
import io.airbyte.commons.lang.Exceptions;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.commons.string.Strings;
Expand All @@ -39,15 +38,19 @@
import io.fabric8.kubernetes.api.model.VolumeMountBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.internal.readiness.Readiness;
import io.kubernetes.client.Copy;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.ApiException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -110,7 +113,7 @@ public class KubePodProcess extends Process {
private static final int KILLED_EXIT_CODE = 143;
private static final int STDIN_REMOTE_PORT = 9001;

private final KubernetesClient client;
private final KubernetesClient fabricClient;
private final Pod podDefinition;
// Necessary since it is not possible to retrieve the pod's actual exit code upon termination. This
// is because the Kube API server does not keep
Expand Down Expand Up @@ -222,24 +225,25 @@ private static Container getMain(String image, boolean usesStdin, String entrypo
.build();
}

private static void copyFilesToKubeConfigVolume(KubernetesClient client, String podName, String namespace, Map<String, String> files) {
private static void copyFilesToKubeConfigVolume(ApiClient officialClient, String podName, String namespace, Map<String, String> files) {
List<Map.Entry<String, String>> fileEntries = new ArrayList<>(files.entrySet());

// copy this file last to indicate that the copy has completed
fileEntries.add(new AbstractMap.SimpleEntry<>(SUCCESS_FILE_NAME, ""));

for (Map.Entry<String, String> file : fileEntries) {
Path tmpFile = null;
try {
tmpFile = Path.of(IOs.writeFileToRandomTmpDir(file.getKey(), file.getValue()));

LOGGER.info("Uploading file: " + file.getKey());
var contents = file.getValue().getBytes(StandardCharsets.UTF_8);
var containerPath = Path.of(CONFIG_DIR + "/" + file.getKey());

client.pods().inNamespace(namespace).withName(podName).inContainer(INIT_CONTAINER_NAME)
.file(CONFIG_DIR + "/" + file.getKey())
.upload(tmpFile);
// fabric8 kube client upload doesn't work on gke:
// https://github.com/fabric8io/kubernetes-client/issues/2217
Copy copy = new Copy(officialClient);
copy.copyFileToPod(namespace, podName, INIT_CONTAINER_NAME, contents, containerPath);

} finally {
if (tmpFile != null) {
tmpFile.toFile().delete();
}
} catch (IOException | ApiException e) {
throw new RuntimeException(e);
}
}
}
Expand All @@ -260,7 +264,8 @@ private static void waitForInitPodToRun(KubernetesClient client, Pod podDefiniti
LOGGER.info("Init container ready..");
}

public KubePodProcess(KubernetesClient client,
public KubePodProcess(ApiClient officialClient,
KubernetesClient fabricClient,
Consumer<Integer> portReleaser,
String podName,
String namespace,
Expand All @@ -273,7 +278,7 @@ public KubePodProcess(KubernetesClient client,
final String entrypointOverride,
final String... args)
throws IOException, InterruptedException {
this.client = client;
this.fabricClient = fabricClient;
this.portReleaser = portReleaser;
this.stdoutLocalPort = stdoutLocalPort;
this.stderrLocalPort = stderrLocalPort;
Expand All @@ -283,7 +288,7 @@ public KubePodProcess(KubernetesClient client,
executorService = Executors.newFixedThreadPool(2);
setupStdOutAndStdErrListeners();

String entrypoint = entrypointOverride == null ? getCommandFromImage(client, image, namespace) : entrypointOverride;
String entrypoint = entrypointOverride == null ? getCommandFromImage(fabricClient, image, namespace) : entrypointOverride;
LOGGER.info("Found entrypoint: {}", entrypoint);

Volume pipeVolume = new VolumeBuilder()
Expand Down Expand Up @@ -376,17 +381,12 @@ public KubePodProcess(KubernetesClient client,
.build();

LOGGER.info("Creating pod...");
this.podDefinition = client.pods().inNamespace(namespace).createOrReplace(pod);
this.podDefinition = fabricClient.pods().inNamespace(namespace).createOrReplace(pod);

waitForInitPodToRun(client, podDefinition);
waitForInitPodToRun(fabricClient, podDefinition);

LOGGER.info("Copying files...");
Map<String, String> filesWithSuccess = new HashMap<>(files);

// We always copy the empty success file to ensure our waiting step can detect the init container in
// RUNNING. Otherwise, the container can complete and exit before we are able to detect it.
filesWithSuccess.put(SUCCESS_FILE_NAME, "");
copyFilesToKubeConfigVolume(client, podName, namespace, filesWithSuccess);
copyFilesToKubeConfigVolume(officialClient, podName, namespace, files);

LOGGER.info("Waiting until pod is ready...");
// If a pod gets into a non-terminal error state it should be automatically killed by our
Expand All @@ -396,14 +396,14 @@ public KubePodProcess(KubernetesClient client,
// This doesn't manage things like pods that are blocked from running for some cluster reason or if
// the init
// container got stuck somehow.
client.resource(podDefinition).waitUntilCondition(p -> {
fabricClient.resource(podDefinition).waitUntilCondition(p -> {
boolean isReady = Objects.nonNull(p) && Readiness.getInstance().isReady(p);
return isReady || isTerminal(p);
}, 10, TimeUnit.DAYS);

// allow writing stdin to pod
LOGGER.info("Reading pod IP...");
var podIp = getPodIP(client, podName, namespace);
var podIp = getPodIP(fabricClient, podName, namespace);
LOGGER.info("Pod IP: {}", podIp);

if (usesStdin) {
Expand Down Expand Up @@ -460,8 +460,9 @@ public InputStream getErrorStream() {
@Override
public int waitFor() throws InterruptedException {
try {
Pod refreshedPod = client.pods().inNamespace(podDefinition.getMetadata().getNamespace()).withName(podDefinition.getMetadata().getName()).get();
client.resource(refreshedPod).waitUntilCondition(this::isTerminal, 10, TimeUnit.DAYS);
Pod refreshedPod =
fabricClient.pods().inNamespace(podDefinition.getMetadata().getNamespace()).withName(podDefinition.getMetadata().getName()).get();
fabricClient.resource(refreshedPod).waitUntilCondition(this::isTerminal, 10, TimeUnit.DAYS);
wasKilled.set(true);
return exitValue();
} finally {
Expand Down Expand Up @@ -489,7 +490,7 @@ public boolean waitFor(long timeout, TimeUnit unit) throws InterruptedException
public void destroy() {
LOGGER.info("Destroying Kube process: {}", podDefinition.getMetadata().getName());
try {
client.resource(podDefinition).withPropagationPolicy(DeletionPropagation.FOREGROUND).delete();
fabricClient.resource(podDefinition).withPropagationPolicy(DeletionPropagation.FOREGROUND).delete();
wasKilled.set(true);
} finally {
close();
Expand Down Expand Up @@ -524,7 +525,7 @@ private boolean isTerminal(Pod pod) {

private int getReturnCode(Pod pod) {
var name = pod.getMetadata().getName();
Pod refreshedPod = client.pods().inNamespace(pod.getMetadata().getNamespace()).withName(name).get();
Pod refreshedPod = fabricClient.pods().inNamespace(pod.getMetadata().getNamespace()).withName(name).get();
if (refreshedPod == null) {
if (wasKilled.get()) {
LOGGER.info("Unable to find pod {} to retrieve exit value. Defaulting to value {}. This is expected if the job was cancelled.", name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import io.airbyte.workers.WorkerException;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.kubernetes.client.openapi.ApiClient;
import java.nio.file.Path;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
Expand All @@ -39,20 +40,27 @@ public class KubeProcessFactory implements ProcessFactory {
private static final Logger LOGGER = LoggerFactory.getLogger(KubeProcessFactory.class);

private final String namespace;
private final KubernetesClient kubeClient;
private final ApiClient officialClient;
private final KubernetesClient fabricClient;
private final String kubeHeartbeatUrl;
private final BlockingQueue<Integer> workerPorts;

/**
* @param namespace kubernetes namespace where spawned pods will live
* @param kubeClient kubernetes client
* @param officialClient official kubernetes client
* @param fabricClient fabric8 kubernetes client
* @param kubeHeartbeatUrl a url where if the response is not 200 the spawned process will fail
* itself
* @param workerPorts a set of ports that can be used for IO socket servers
*/
public KubeProcessFactory(String namespace, KubernetesClient kubeClient, String kubeHeartbeatUrl, BlockingQueue<Integer> workerPorts) {
public KubeProcessFactory(String namespace,
ApiClient officialClient,
KubernetesClient fabricClient,
String kubeHeartbeatUrl,
BlockingQueue<Integer> workerPorts) {
this.namespace = namespace;
this.kubeClient = kubeClient;
this.officialClient = officialClient;
this.fabricClient = fabricClient;
this.kubeHeartbeatUrl = kubeHeartbeatUrl;
this.workerPorts = workerPorts;
}
Expand Down Expand Up @@ -88,7 +96,8 @@ public Process create(String jobId,
};

return new KubePodProcess(
kubeClient,
officialClient,
fabricClient,
portReleaser,
podName,
namespace,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import io.airbyte.workers.WorkerException;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.util.Config;
import java.io.IOException;
import java.net.Inet4Address;
import java.net.ServerSocket;
Expand All @@ -45,6 +47,7 @@
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import org.apache.commons.lang.RandomStringUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -57,7 +60,8 @@ public class KubePodProcessIntegrationTest {
private List<Integer> openWorkerPorts;
private int heartbeatPort;
private String heartbeatUrl;
private KubernetesClient kubeClient;
private ApiClient officialClient;
private KubernetesClient fabricClient;
private BlockingQueue<Integer> workerPorts;
private KubeProcessFactory processFactory;

Expand All @@ -70,9 +74,10 @@ public void setup() throws Exception {
heartbeatPort = openPorts.get(0);
heartbeatUrl = getHost() + ":" + heartbeatPort;

kubeClient = new DefaultKubernetesClient();
officialClient = Config.defaultClient();
fabricClient = new DefaultKubernetesClient();
workerPorts = new LinkedBlockingDeque<>(openWorkerPorts);
processFactory = new KubeProcessFactory("default", kubeClient, heartbeatUrl, workerPorts);
processFactory = new KubeProcessFactory("default", officialClient, fabricClient, heartbeatUrl, workerPorts);

server = new WorkerHeartbeatServer(heartbeatPort);
server.startBackground();
Expand Down Expand Up @@ -143,14 +148,30 @@ public void testKillingWithoutHeartbeat() throws Exception {
assertNotEquals(0, process.exitValue());
}

private static String getRandomFile(int lines) {
var sb = new StringBuilder();
for (int i = 0; i < lines; i++) {
sb.append(RandomStringUtils.randomAlphabetic(100));
sb.append("\n");
}
return sb.toString();
}

private Process getProcess(String entrypoint) throws WorkerException {
// these files aren't used for anything, it's just to check for exceptions when uploading
var files = ImmutableMap.of(
"file0", "fixed str",
"file1", getRandomFile(1),
"file2", getRandomFile(100),
"file3", getRandomFile(1000));

return processFactory.create(
"some-id",
0,
Path.of("/tmp/job-root"),
"busybox:latest",
false,
ImmutableMap.of(),
files,
entrypoint);
}

Expand Down