Upgrade the fabric client to support newer versions of k8s #13804

Closed. Wants to merge 16 commits; changes shown from 12 commits.

1 change: 1 addition & 0 deletions docs/development/extensions-contrib/k8s-jobs.md
@@ -66,6 +66,7 @@ Additional Configuration
|`druid.indexer.runner.javaOptsArray`|`JsonArray`|java opts for the task.|`-Xmx1g`|No|
|`druid.indexer.runner.labels`|`JsonObject`|Additional labels you want to add to peon pod|`{}`|No|
|`druid.indexer.runner.annotations`|`JsonObject`|Additional annotations you want to add to peon pod|`{}`|No|
|`druid.indexer.runner.peonMonitors`|`JsonArray`|Overrides `druid.monitoring.monitors`. Use this property if you don't want to inherit monitors from the Overlord.|`[]`|No|
|`druid.indexer.runner.graceTerminationPeriodSeconds`|`Long`|Number of seconds you want to wait after a sigterm for container lifecycle hooks to complete. Keep at a smaller value if you want tasks to hold locks for shorter periods.|`PT30S` (K8s default)|No|
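
For example (an illustrative setting, not part of this PR's docs change), the Overlord runtime properties could override the inherited monitors with a single JVM monitor:

druid.indexer.runner.peonMonitors=["org.apache.druid.java.util.metrics.JvmMonitor"]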

### Gotchas
14 changes: 10 additions & 4 deletions extensions-contrib/kubernetes-overlord-extensions/pom.xml
@@ -98,7 +98,7 @@
<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-model-core</artifactId>
<version>5.12.2</version>
<version>6.4.1</version>
</dependency>
<dependency>
<groupId>javax.validation</groupId>
@@ -108,12 +108,18 @@
<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-model-batch</artifactId>
<version>5.12.2</version>
<version>6.4.1</version>
</dependency>
<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-client-api</artifactId>
<version>6.4.1</version>
</dependency>
<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-client</artifactId>
<version>5.12.2</version>
<version>6.4.1</version>
<scope>runtime</scope>
</dependency>

<!-- Tests -->
@@ -130,7 +136,7 @@
<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-server-mock</artifactId>
<version>5.12.2</version>
<version>6.4.1</version>
<scope>test</scope>
</dependency>
<dependency>
@@ -197,7 +197,7 @@ public ListenableFuture<TaskStatus> run(Task task)
} else {
status = TaskStatus.failure(
task.getId(),
"Task failed %s: " + k8sTaskId
"Task failed: " + k8sTaskId
);
}
if (completedPhase.getJobDuration().isPresent()) {
@@ -254,6 +254,7 @@ JobResponse monitorJob(Pod peonPod, K8sTaskId k8sTaskId)
@Override
public void updateStatus(Task task, TaskStatus status)
{
log.info("Updating task: %s with status %s", task.getId(), status);
TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status);
}

@@ -24,6 +24,7 @@
import org.joda.time.Period;

import javax.validation.constraints.NotNull;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -81,6 +82,13 @@ public class KubernetesTaskRunnerConfig
// how long to wait for the peon k8s job to launch
public Period k8sjobLaunchTimeout = new Period("PT1H");

@JsonProperty
// The ForkingTaskRunner inherits its monitors from the MiddleManager; in k8s mode
// the peon inherits the monitors from the Overlord. If someone specifies, for example,
// a TaskCountStatsMonitor on the Overlord, the peon process fails because it cannot
// inject that monitor. Setting peonMonitors overrides the inherited list.
public List<String> peonMonitors = new ArrayList<>();

@JsonProperty
@NotNull
public List<String> javaOptsArray;
@@ -23,6 +23,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.ConfigBuilder;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.indexing.common.config.TaskConfig;
@@ -76,7 +77,7 @@ public KubernetesTaskRunner build()
{
DruidKubernetesClient client;
if (kubernetesTaskRunnerConfig.disableClientProxy) {
Config config = Config.autoConfigure(null);
Config config = new ConfigBuilder().build();
config.setHttpsProxy(null);
config.setHttpProxy(null);
client = new DruidKubernetesClient(config);
@@ -20,8 +20,9 @@
package org.apache.druid.k8s.overlord.common;

import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.ConfigBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;

public class DruidKubernetesClient implements KubernetesClientApi
{
@@ -30,7 +31,7 @@ public class DruidKubernetesClient

public DruidKubernetesClient()
{
this(Config.autoConfigure(null));
this(new ConfigBuilder().build());
}

public DruidKubernetesClient(Config config)
@@ -41,8 +42,14 @@ public DruidKubernetesClient(Config config)
@Override
public <T> T executeRequest(KubernetesExecutor<T> executor) throws KubernetesResourceNotFoundException
{
try (KubernetesClient client = new DefaultKubernetesClient(config)) {
try (KubernetesClient client = getClient()) {
return executor.executeRequest(client);
}
}

@Override
public KubernetesClient getClient()
{
return new KubernetesClientBuilder().withConfig(config).build();
}
}
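
With the fabric8 6.x builders in place, callers interact with the wrapper the same way as before. A minimal usage sketch of the one-shot path (the "druid" namespace is illustrative; PodList is the fabric8 io.fabric8.kubernetes.api.model.PodList type):

PodList listPeonPods(DruidKubernetesClient api) throws KubernetesResourceNotFoundException
{
  // executeRequest opens a client, runs the lambda, and closes the client afterwards
  return api.executeRequest(
      client -> client.pods().inNamespace("druid").withLabel(DruidK8sConstants.LABEL_KEY).list()
  );
}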
@@ -25,13 +25,11 @@
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.client.KubernetesClient;
import org.apache.commons.io.input.ReaderInputStream;
import io.fabric8.kubernetes.client.dsl.LogWatch;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;

import java.io.InputStream;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
@@ -78,7 +76,7 @@ public Pod launchJobAndWaitForStart(Job job, long howLong, TimeUnit timeUnit)
long start = System.currentTimeMillis();
// launch job
return clientApi.executeRequest(client -> {
client.batch().v1().jobs().inNamespace(namespace).create(job);
client.batch().v1().jobs().inNamespace(namespace).resource(job).create();
K8sTaskId taskId = new K8sTaskId(job.getMetadata().getName());
log.info("Successfully submitted job: %s ... waiting for job to launch", taskId);
// wait until the pod is running or complete or failed, any of those is fine
@@ -106,13 +104,15 @@ public JobResponse waitForJobCompletion(K8sTaskId taskId, long howLong, TimeUnit
.inNamespace(namespace)
.withName(taskId.getK8sTaskId())
.waitUntilCondition(
x -> x != null && x.getStatus() != null && x.getStatus().getActive() == null,
x -> x != null && x.getStatus() != null && x.getStatus().getActive() == null
&& (x.getStatus().getFailed() != null || x.getStatus().getSucceeded() != null),

Contributor:
Why is this change needed?

Contributor Author:
This was a tricky one. For some background:

the overlord launches a task, waits for it to complete, and then returns the status. In k8s < 1.25 that just meant: once the pod is not active, go grab the status.
Now the contract has changed a bit, because 1.25 introduced finalizers:
kubernetes/kubernetes#110948

And we noticed that when we ran tasks on a Druid cluster on k8s 1.25, they would complete fine but be marked as failures.
We printed the job status and noticed that the job was not active, but neither success nor failure was set; instead there was this field:

uncountedTerminatedPods=UncountedTerminatedPods(failed=[], succeeded=[e916cbf9-467a-45f3-86a7-3767145d6384], additionalProperties={})

which from the docs:

UncountedTerminatedPods holds the UIDs of Pods that have terminated but the job controller hasn't yet accounted for in the status counters. The job controller creates pods with a finalizer. When a pod terminates (succeeded or failed), the controller does three steps to account for it in the job status: (1) Add the pod UID to the arrays in this field. (2) Remove the pod finalizer. (3) Remove the pod UID from the arrays while increasing the corresponding counter. This field is beta-level. The job controller only makes use of this field when the feature gate JobTrackingWithFinalizers is enabled (enabled by default). Old jobs might not be tracked using this field, in which case the field remains null.

So now the job goes from a state where it is not active, to having uncountedTerminatedPods, to then having a status with success or failure. I will push up a one-line fix to make this work, but if you are working with the 1.25 version of k8s, I'm sure you will be affected as well.

Basically, add another check to wait on. Right now we wait for this:

// block until
job.getStatus() != null && job.getStatus().getActive() == null
// then return
return job.getStatus().getSucceeded() != null

So the change has to become

// block until
job.getStatus() != null && job.getStatus().getActive() == null && (job.getStatus().getFailed() != null || job.getStatus().getSucceeded() != null)
// then return
return job.getStatus().getSucceeded() != null

This should keep things backwards compatible and working in all versions of k8s


Contributor:
So for older k8s versions, we can expect that when job.getActive() is null, one of the success or failure fields must be set. Question: do we need to check the getActive() result at all? Can we just block until the job has either succeeded or failed?

Contributor Author:
That could work, want me to change it? The version above definitely works; I don't have access to many different k8s environments, and someone in the community helped me test it on 1.25. I could definitely change this, but I unfortunately have no way of testing it out myself.

Contributor:
Please make this change; we can test it out on a v1.25 cluster.

Contributor Author:
@nlippis I believe you can test by taking this commit, making that small change, and applying it onto the 25 branch. If it works for you, I'll resubmit this with the change.

howLong,
unit
);
if (job.getStatus().getSucceeded() != null) {
return new JobResponse(job, PeonPhase.SUCCEEDED);
}
log.warn("Task %s failed with status %s", taskId, job.getStatus());
return new JobResponse(job, PeonPhase.FAILED);
});
}
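
The thread above boils down to the predicate handed to waitUntilCondition. A minimal sketch of the two variants discussed, assuming the batch/v1 Job model used elsewhere in this file (Predicate is java.util.function.Predicate):

// pre-1.25 behaviour: once the job is no longer active, succeeded/failed is already populated
Predicate<Job> legacyCondition =
    x -> x != null && x.getStatus() != null && x.getStatus().getActive() == null;

// 1.25+ with JobTrackingWithFinalizers: the job can be inactive while pods are still in
// uncountedTerminatedPods, so additionally wait for a terminal counter to be set
Predicate<Job> finalizerAwareCondition =
    x -> x != null && x.getStatus() != null
        && x.getStatus().getActive() == null
        && (x.getStatus().getFailed() != null || x.getStatus().getSucceeded() != null);

The simplification suggested by the reviewer would drop the getActive() check and keep only the succeeded/failed clause; either form can be passed straight to waitUntilCondition(condition, howLong, unit).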
@@ -121,12 +121,12 @@ public JobResponse waitForJobCompletion(K8sTaskId taskId, long howLong, TimeUnit
public boolean cleanUpJob(K8sTaskId taskId)
{
if (!debugJobs) {
Boolean result = clientApi.executeRequest(client -> client.batch()
.v1()
.jobs()
.inNamespace(namespace)
.withName(taskId.getK8sTaskId())
.delete());
Boolean result = clientApi.executeRequest(client -> !client.batch()
.v1()
.jobs()
.inNamespace(namespace)
.withName(taskId.getK8sTaskId())
.delete().isEmpty());
if (result) {
log.info("Cleaned up k8s task: %s", taskId);
} else {
@@ -143,23 +143,24 @@ public boolean cleanUpJob(K8sTaskId taskId)
@Override
public Optional<InputStream> getPeonLogs(K8sTaskId taskId)
{
KubernetesClient k8sClient = clientApi.getClient();
try {
return clientApi.executeRequest(client -> {
Reader reader = client.batch()
.v1()
.jobs()
.inNamespace(namespace)
.withName(taskId.getK8sTaskId())
.inContainer("main")
.getLogReader();
if (reader == null) {
return Optional.absent();
}
return Optional.of(new ReaderInputStream(reader, StandardCharsets.UTF_8));
});
LogWatch logWatch = k8sClient.batch()
.v1()
.jobs()
.inNamespace(namespace)
.withName(taskId.getK8sTaskId())
.inContainer("main")
.watchLog();
if (logWatch == null) {
k8sClient.close();
return Optional.absent();
}
return Optional.of(new LogWatchInputStream(k8sClient, logWatch));
}
catch (Exception e) {
log.error(e, "Error streaming logs from task: %s", taskId);
k8sClient.close();
return Optional.absent();
}
}
Expand All @@ -180,17 +181,17 @@ public List<Job> listAllPeonJobs()
public List<Pod> listPeonPods(Set<PeonPhase> phases)
{
return listPeonPods().stream()
.filter(x -> phases.contains(PeonPhase.getPhaseFor(x)))
.collect(Collectors.toList());
.filter(x -> phases.contains(PeonPhase.getPhaseFor(x)))
.collect(Collectors.toList());
}

@Override
public List<Pod> listPeonPods()
{
PodList podList = clientApi.executeRequest(client -> client.pods().inNamespace(namespace))
.withLabel(DruidK8sConstants.LABEL_KEY)
.list();
return podList.getItems();
return clientApi.executeRequest(client -> client.pods().inNamespace(namespace)
.withLabel(DruidK8sConstants.LABEL_KEY)
.list().getItems());

}

@Override
@@ -200,7 +201,12 @@ public int cleanCompletedJobsOlderThan(long howFarBack, TimeUnit timeUnit)
return clientApi.executeRequest(client -> {
List<Job> jobs = getJobsToCleanup(listAllPeonJobs(), howFarBack, timeUnit);
jobs.forEach(x -> {
if (client.batch().v1().jobs().inNamespace(namespace).withName(x.getMetadata().getName()).delete()) {
if (!client.batch()
.v1()
.jobs()
.inNamespace(namespace)
.withName(x.getMetadata().getName())
.delete().isEmpty()) {
numDeleted.incrementAndGet();
}
});
@@ -254,5 +260,4 @@ Pod getMainJobPod(KubernetesClient client, K8sTaskId taskId)
throw new KubernetesResourceNotFoundException("K8s pod with label: job-name=" + k8sTaskId + " not found");
}
}

}
@@ -19,6 +19,7 @@

package org.apache.druid.k8s.overlord.common;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
@@ -96,6 +97,7 @@ public Job fromTask(Task task, PeonCommandContext context) throws IOException
@Override
public Task toTask(Pod from) throws IOException
{
// all we have to do here is grab the main container from the pod spec
PodSpec podSpec = from.getSpec();
massageSpec(podSpec, "main");
List<EnvVar> envVars = podSpec.getContainers().get(0).getEnv();
@@ -176,8 +178,19 @@ protected void setupPorts(Container mainContainer)
mainContainer.setPorts(Lists.newArrayList(httpsPort, tcpPort));
}

protected void addEnvironmentVariables(Container mainContainer, PeonCommandContext context, String taskContents)
@VisibleForTesting
void addEnvironmentVariables(Container mainContainer, PeonCommandContext context, String taskContents)
throws JsonProcessingException
{
// if the peon monitors are set, override the overlord's monitors (if set) with the peon monitors
if (!config.peonMonitors.isEmpty()) {
mainContainer.getEnv().removeIf(x -> "druid_monitoring_monitors".equals(x.getName()));
mainContainer.getEnv().add(new EnvVarBuilder()
.withName("druid_monitoring_monitors")
.withValue(mapper.writeValueAsString(config.peonMonitors))
.build());
}

mainContainer.getEnv().addAll(Lists.newArrayList(
new EnvVarBuilder()
.withName(DruidK8sConstants.TASK_DIR_ENV)
@@ -211,7 +224,7 @@ protected Container setupMainContainer(
PeonCommandContext context,
long containerSize,
String taskContents
)
) throws JsonProcessingException
{
// prepend the startup task.json extraction command
List<String> mainCommand = Lists.newArrayList("sh", "-c");
@@ -19,8 +19,15 @@

package org.apache.druid.k8s.overlord.common;

import io.fabric8.kubernetes.client.KubernetesClient;

// Wraps all Kubernetes API calls to ensure connections are opened and closed properly
public interface KubernetesClientApi
{
<T> T executeRequest(KubernetesExecutor<T> executor) throws KubernetesResourceNotFoundException;

// Use only when handling streams of data, for example if you want to pass around an input stream from a pod:
// call this instead of executeRequest, since you want to keep the connection open until you are done with the
// stream. It is the caller's responsibility to close the client when using this method.
KubernetesClient getClient();
}
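
The getPeonLogs() change above hands both the client and the LogWatch to a LogWatchInputStream so that closing the stream releases the connection. That wrapper is not shown in this excerpt; a rough sketch of what such a class could look like (the class actually added in the PR may differ):

import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.LogWatch;

import java.io.IOException;
import java.io.InputStream;

// Delegates reads to the LogWatch output and closes both the watch and the
// client once the caller is done with the stream. A real implementation would
// likely also override the bulk read methods for efficiency.
public class LogWatchInputStream extends InputStream
{
  private final KubernetesClient client;
  private final LogWatch logWatch;

  public LogWatchInputStream(KubernetesClient client, LogWatch logWatch)
  {
    this.client = client;
    this.logWatch = logWatch;
  }

  @Override
  public int read() throws IOException
  {
    return logWatch.getOutput().read();
  }

  @Override
  public void close() throws IOException
  {
    logWatch.close();
    client.close();
  }
}

This is what makes getClient() safe to expose: the caller (or a wrapper like this one) owns the client's lifecycle, whereas executeRequest() closes the client as soon as the lambda returns.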