Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade the fabric client to support newer versions of k8s #13804

Closed
wants to merge 16 commits into from
Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/development/extensions-contrib/k8s-jobs.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ Additional Configuration
|`druid.indexer.runner.javaOptsArray`|`JsonArray`|java opts for the task.|`-Xmx1g`|No|
|`druid.indexer.runner.labels`|`JsonObject`|Additional labels you want to add to peon pod|`{}`|No|
|`druid.indexer.runner.annotations`|`JsonObject`|Additional annotations you want to add to peon pod|`{}`|No|
|`druid.indexer.runner.peonMonitors`|`JsonArray`|An override for the `druid.monitoring.monitors`, For the situation you have monitors setup, and do not want to inherit those from the overlord.|`[]`|No|
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: It would be useful to mention this property in the extension specific docs as well.

churromorales marked this conversation as resolved.
Show resolved Hide resolved
|`druid.indexer.runner.graceTerminationPeriodSeconds`|`Long`|Number of seconds you want to wait after a sigterm for container lifecycle hooks to complete. Keep at a smaller value if you want tasks to hold locks for shorter periods.|`PT30S` (K8s default)|No|

### Gotchas
Expand Down
14 changes: 10 additions & 4 deletions extensions-contrib/kubernetes-overlord-extensions/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@
<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-model-core</artifactId>
<version>5.12.2</version>
<version>6.4.1</version>
</dependency>
<dependency>
<groupId>javax.validation</groupId>
Expand All @@ -114,12 +114,18 @@
<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-model-batch</artifactId>
<version>5.12.2</version>
<version>6.4.1</version>
</dependency>
<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-client-api</artifactId>
<version>6.4.1</version>
</dependency>
<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-client</artifactId>
<version>5.12.2</version>
<version>6.4.1</version>
<scope>runtime</scope>
</dependency>

<!-- Tests -->
Expand All @@ -136,7 +142,7 @@
<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-server-mock</artifactId>
<version>5.12.2</version>
<version>6.4.1</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.joda.time.Period;

import javax.validation.constraints.NotNull;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -81,6 +82,13 @@ public class KubernetesTaskRunnerConfig
// how long to wait for the peon k8s job to launch
public Period k8sjobLaunchTimeout = new Period("PT1H");

@JsonProperty
// ForkingTaskRunner inherits the monitors from the MM, in k8s mode
// the peon inherits the monitors from the overlord, so if someone specifies
// a TaskCountStatsMonitor in the overlord for example, the peon process
// fails because it can not inject this monitor in the peon process.
public List<String> peonMonitors = new ArrayList<>();

@JsonProperty
@NotNull
public List<String> javaOptsArray;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.ConfigBuilder;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.indexing.common.TaskStorageDirTracker;
Expand Down Expand Up @@ -76,7 +77,7 @@ public KubernetesTaskRunner build()
{
DruidKubernetesClient client;
if (kubernetesTaskRunnerConfig.disableClientProxy) {
Config config = Config.autoConfigure(null);
Config config = new ConfigBuilder().build();
config.setHttpsProxy(null);
config.setHttpProxy(null);
client = new DruidKubernetesClient(config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@
package org.apache.druid.k8s.overlord.common;

import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.ConfigBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;

public class DruidKubernetesClient implements KubernetesClientApi
{
Expand All @@ -30,7 +31,7 @@ public class DruidKubernetesClient implements KubernetesClientApi

public DruidKubernetesClient()
{
this(Config.autoConfigure(null));
this(new ConfigBuilder().build());
}

public DruidKubernetesClient(Config config)
Expand All @@ -41,8 +42,14 @@ public DruidKubernetesClient(Config config)
@Override
public <T> T executeRequest(KubernetesExecutor<T> executor) throws KubernetesResourceNotFoundException
{
try (KubernetesClient client = new DefaultKubernetesClient(config)) {
try (KubernetesClient client = getClient()) {
return executor.executeRequest(client);
}
}

@Override
public KubernetesClient getClient()
{
return new KubernetesClientBuilder().withConfig(config).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,11 @@
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.client.KubernetesClient;
import org.apache.commons.io.input.ReaderInputStream;
import io.fabric8.kubernetes.client.dsl.LogWatch;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;

import java.io.InputStream;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -78,7 +76,7 @@ public Pod launchJobAndWaitForStart(Job job, long howLong, TimeUnit timeUnit)
long start = System.currentTimeMillis();
// launch job
return clientApi.executeRequest(client -> {
client.batch().v1().jobs().inNamespace(namespace).create(job);
client.batch().v1().jobs().inNamespace(namespace).resource(job).create();
K8sTaskId taskId = new K8sTaskId(job.getMetadata().getName());
log.info("Successfully submitted job: %s ... waiting for job to launch", taskId);
// wait until the pod is running or complete or failed, any of those is fine
Expand Down Expand Up @@ -121,12 +119,12 @@ public JobResponse waitForJobCompletion(K8sTaskId taskId, long howLong, TimeUnit
public boolean cleanUpJob(K8sTaskId taskId)
{
if (!debugJobs) {
Boolean result = clientApi.executeRequest(client -> client.batch()
.v1()
.jobs()
.inNamespace(namespace)
.withName(taskId.getK8sTaskId())
.delete());
Boolean result = clientApi.executeRequest(client -> !client.batch()
.v1()
.jobs()
.inNamespace(namespace)
.withName(taskId.getK8sTaskId())
.delete().isEmpty());
abhishekagarwal87 marked this conversation as resolved.
Show resolved Hide resolved
if (result) {
log.info("Cleaned up k8s task: %s", taskId);
} else {
Expand Down Expand Up @@ -160,23 +158,24 @@ public String getJobLogs(K8sTaskId taskId)
@Override
public Optional<InputStream> getPeonLogs(K8sTaskId taskId)
{
KubernetesClient k8sClient = clientApi.getClient();
try {
return clientApi.executeRequest(client -> {
Reader reader = client.batch()
.v1()
.jobs()
.inNamespace(namespace)
.withName(taskId.getK8sTaskId())
.inContainer("main")
.getLogReader();
if (reader == null) {
return Optional.absent();
}
return Optional.of(new ReaderInputStream(reader, StandardCharsets.UTF_8));
});
LogWatch logWatch = k8sClient.batch()
.v1()
.jobs()
.inNamespace(namespace)
.withName(taskId.getK8sTaskId())
.inContainer("main")
.watchLog();
if (logWatch == null) {
k8sClient.close();
return Optional.absent();
}
return Optional.of(new LogWatchInputStream(k8sClient, logWatch));
}
catch (Exception e) {
log.error("Error streaming logs from task: %s", taskId);
log.error(e, "Error streaming logs from task: %s", taskId);
k8sClient.close();
return Optional.absent();
}
}
Expand All @@ -197,17 +196,17 @@ public List<Job> listAllPeonJobs()
public List<Pod> listPeonPods(Set<PeonPhase> phases)
{
return listPeonPods().stream()
.filter(x -> phases.contains(PeonPhase.getPhaseFor(x)))
.collect(Collectors.toList());
.filter(x -> phases.contains(PeonPhase.getPhaseFor(x)))
.collect(Collectors.toList());
}

@Override
public List<Pod> listPeonPods()
{
PodList podList = clientApi.executeRequest(client -> client.pods().inNamespace(namespace))
.withLabel(DruidK8sConstants.LABEL_KEY)
.list();
return podList.getItems();
return clientApi.executeRequest(client -> client.pods().inNamespace(namespace)
.withLabel(DruidK8sConstants.LABEL_KEY)
.list().getItems());

}

@Override
Expand All @@ -217,7 +216,12 @@ public int cleanCompletedJobsOlderThan(long howFarBack, TimeUnit timeUnit)
return clientApi.executeRequest(client -> {
List<Job> jobs = getJobsToCleanup(listAllPeonJobs(), howFarBack, timeUnit);
jobs.forEach(x -> {
if (client.batch().v1().jobs().inNamespace(namespace).withName(x.getMetadata().getName()).delete()) {
if (!client.batch()
.v1()
.jobs()
.inNamespace(namespace)
.withName(x.getMetadata().getName())
.delete().isEmpty()) {
numDeleted.incrementAndGet();
}
});
Expand Down Expand Up @@ -271,5 +275,4 @@ Pod getMainJobPod(KubernetesClient client, K8sTaskId taskId)
throw new KubernetesResourceNotFoundException("K8s pod with label: job-name=" + k8sTaskId + " not found");
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.druid.k8s.overlord.common;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
Expand Down Expand Up @@ -96,6 +97,7 @@ public Job fromTask(Task task, PeonCommandContext context) throws IOException
@Override
public Task toTask(Pod from) throws IOException
{
// all i have to do here is grab the main container...done
PodSpec podSpec = from.getSpec();
massageSpec(podSpec, "main");
List<EnvVar> envVars = podSpec.getContainers().get(0).getEnv();
Expand Down Expand Up @@ -176,8 +178,19 @@ protected void setupPorts(Container mainContainer)
mainContainer.setPorts(Lists.newArrayList(httpsPort, tcpPort));
}

protected void addEnvironmentVariables(Container mainContainer, PeonCommandContext context, String taskContents)
@VisibleForTesting
void addEnvironmentVariables(Container mainContainer, PeonCommandContext context, String taskContents)
throws JsonProcessingException
{
// if the peon monitors are set, override the overlord's monitors (if set) with the peon monitors
if (!config.peonMonitors.isEmpty()) {
mainContainer.getEnv().removeIf(x -> "druid_monitoring_monitors".equals(x.getName()));
mainContainer.getEnv().add(new EnvVarBuilder()
.withName("druid_monitoring_monitors")
.withValue(mapper.writeValueAsString(config.peonMonitors))
.build());
}

mainContainer.getEnv().addAll(Lists.newArrayList(
new EnvVarBuilder()
.withName(DruidK8sConstants.TASK_DIR_ENV)
Expand Down Expand Up @@ -211,7 +224,7 @@ protected Container setupMainContainer(
PeonCommandContext context,
long containerSize,
String taskContents
)
) throws JsonProcessingException
{
// prepend the startup task.json extraction command
List<String> mainCommand = Lists.newArrayList("sh", "-c");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,15 @@

package org.apache.druid.k8s.overlord.common;

import io.fabric8.kubernetes.client.KubernetesClient;

// Wraps all kubernetes api calls, to ensure you open and close connections properly
public interface KubernetesClientApi
{
<T> T executeRequest(KubernetesExecutor<T> executor) throws KubernetesResourceNotFoundException;

// use only when handling streams of data, example if you want to pass around an input stream from a pod,
// then you would call this instead of executeRequest as you would want to keep the connection open until you
// are done with the stream. Callers responsibility to clean up when using this method
KubernetesClient getClient();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.k8s.overlord.common;

import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.LogWatch;

import java.io.IOException;
import java.io.InputStream;

/**
* This wraps the InputStream for k8s client
* When you call close on the stream, it will also close the open
* http connections and the client
*/
public class LogWatchInputStream extends InputStream
churromorales marked this conversation as resolved.
Show resolved Hide resolved
{

private final KubernetesClient client;
private final LogWatch logWatch;

public LogWatchInputStream(KubernetesClient client, LogWatch logWatch)
{
this.client = client;
this.logWatch = logWatch;
}

@Override
public int read() throws IOException
{
return logWatch.getOutput().read();
}

@Override
public void close()
{
logWatch.close();
client.close();
}
}