Skip to content
Permalink
Browse files
Updates to Kubernetes scheduler to fix dockerenv issue (#3550)
* Updates to Kubernetes scheduler to create Headless service
* Use FQDN instead of hostname when running in Kubernetes environment
  • Loading branch information
nicknezis committed Jul 14, 2020
1 parent eb5090b commit e1dda9acfded22ac1de86d73981118488d7a3b19
Showing 9 changed files with 2,856 additions and 2,847 deletions.
@@ -54,7 +54,7 @@ jetty_version = "9.4.6.v20170531"

jersey_version = "2.25.1"

kubernetes_client_version = "7.0.0"
kubernetes_client_version = "8.0.0"

load("@rules_jvm_external//:defs.bzl", "maven_install")
load("@rules_jvm_external//:specs.bzl", "maven")
@@ -73,15 +73,14 @@ maven_install(
"org.apache.mesos:mesos:0.22.0",
"com.hashicorp.nomad:nomad-sdk:0.7.0",
"org.apache.hadoop:hadoop-core:0.20.2",
"org.apache.pulsar:pulsar-client:1.19.0-incubating",
"org.apache.pulsar:pulsar-client:jar:shaded:1.19.0-incubating",
"org.apache.kafka:kafka-clients:2.2.0",
"com.google.apis:google-api-services-storage:v1-rev108-" + google_client_version,
"org.apache.reef:reef-runtime-yarn:" + reef_version,
"org.apache.reef:reef-runtime-local:" + reef_version,
"org.apache.httpcomponents:httpclient:" + http_client_version,
"org.apache.httpcomponents:httpmime:" + http_client_version,
"com.google.apis:google-api-services-storage:v1-rev108-1.22.0",
"io.kubernetes:client-java:7.0.0",
"com.microsoft.dhalion:dhalion:0.2.3",
"org.objenesis:objenesis:2.1",
"org.ow2.asm:asm-all:5.1",
@@ -29,7 +29,7 @@ java_binary(
"//heron/common/src/java:basics-java",
"//heron/simulator/src/java:simulator-java",
"//third_party/java:kryo",
"@maven//:org_apache_pulsar_pulsar_client",
"@maven//:org_apache_pulsar_pulsar_client_shaded",
],
)

@@ -140,6 +140,9 @@ def log_pid_for_process(process_name, pid):
def is_docker_environment():
  """Report whether this process appears to be running inside a Docker container.

  :return: True when the marker file '/.dockerenv' exists as a file, False otherwise
  """
  docker_marker = '/.dockerenv'
  return os.path.isfile(docker_marker)

def is_kubernetes_environment():
  """Report whether this process appears to be running inside a Kubernetes pod.

  The check is purely environment based: it looks for the 'POD_NAME' variable.
  NOTE(review): presumably the pod spec injects POD_NAME into the container - confirm.

  :return: True when 'POD_NAME' is present in the environment (even if empty), False otherwise
  """
  # os.environ only stores strings, so a missing key is the only way to get None back
  return os.environ.get('POD_NAME') is not None

def stdout_log_fn(cmd):
"""Simple function callback that is used to log the streaming output of a subprocess command
:param cmd: the name of the command which will be added to the log line
@@ -235,7 +238,9 @@ def init_from_parsed_args(self, parsed_args):
# Needed for Docker environments since the hostname of a docker container is the container's
# id within docker, rather than the host's hostname. NOTE: this 'HOST' env variable is not
# guaranteed to be set in all Docker executor environments (outside of Marathon)
if is_docker_environment():
if is_kubernetes_environment():
self.master_host = socket.getfqdn()
elif is_docker_environment():
self.master_host = os.environ.get('HOST') if 'HOST' in os.environ else socket.gethostname()
else:
self.master_host = socket.gethostname()

This file was deleted.

@@ -50,7 +50,7 @@ public class KubernetesScheduler implements IScheduler, IScalable {
private UpdateTopologyManager updateTopologyManager;

/**
 * Creates the controller this scheduler uses to manage the topology's Kubernetes resources.
 *
 * @return a new {@code V1Controller} built from the scheduler's configuration and runtime
 *     configuration
 */
protected KubernetesController getController() {
  // A single return only: a second, unreachable return statement does not compile, and the
  // controller class is named V1Controller.
  return new V1Controller(configuration, runtimeConfiguration);
}

@Override
@@ -48,6 +48,7 @@
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.Configuration;
import io.kubernetes.client.openapi.apis.AppsV1Api;
import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.openapi.models.V1Container;
import io.kubernetes.client.openapi.models.V1ContainerPort;
import io.kubernetes.client.openapi.models.V1EnvVar;
@@ -58,6 +59,8 @@
import io.kubernetes.client.openapi.models.V1PodSpec;
import io.kubernetes.client.openapi.models.V1PodTemplateSpec;
import io.kubernetes.client.openapi.models.V1ResourceRequirements;
import io.kubernetes.client.openapi.models.V1Service;
import io.kubernetes.client.openapi.models.V1ServiceSpec;
import io.kubernetes.client.openapi.models.V1StatefulSet;
import io.kubernetes.client.openapi.models.V1StatefulSetSpec;
import io.kubernetes.client.openapi.models.V1Toleration;
@@ -66,21 +69,23 @@

import okhttp3.Response;

public class AppsV1Controller extends KubernetesController {
public class V1Controller extends KubernetesController {

private static final Logger LOG =
Logger.getLogger(AppsV1Controller.class.getName());
Logger.getLogger(V1Controller.class.getName());

private static final String ENV_SHARD_ID = "SHARD_ID";

private final AppsV1Api appsClient;
private final CoreV1Api coreClient;

AppsV1Controller(Config configuration, Config runtimeConfiguration) {
V1Controller(Config configuration, Config runtimeConfiguration) {
super(configuration, runtimeConfiguration);
try {
final ApiClient apiClient = io.kubernetes.client.util.Config.defaultClient();
Configuration.setDefaultApiClient(apiClient);
appsClient = new AppsV1Api(apiClient);
coreClient = new CoreV1Api(apiClient);
} catch (IOException e) {
LOG.log(Level.SEVERE, "Failed to setup Kubernetes client" + e);
throw new RuntimeException(e);
@@ -96,6 +101,16 @@ boolean submit(PackingPlan packingPlan) {

final Resource containerResource = getContainerResource(packingPlan);

final V1Service topologyService = createTopologyyService();
try {
final V1Service response =
coreClient.createNamespacedService(getNamespace(), topologyService, null,
null, null);
} catch (ApiException e) {
KubernetesUtils.logExceptionWithDetails(LOG, "Error creating topology service", e);
throw new TopologySubmissionException(e.getMessage());
}

// find the max number of instances in a container so we can open
// enough ports if remote debugging is enabled.
int numberOfInstances = 0;
@@ -118,11 +133,9 @@ boolean submit(PackingPlan packingPlan) {

/**
 * Removes the Kubernetes resources that back this topology: its StatefulSet and then its
 * Service.
 *
 * <p>An exception thrown by either deletion propagates to the caller unchanged.
 *
 * @return {@code true} only when both deletions report success, {@code false} when either of
 *     them returns {@code false}
 */
@Override
boolean killTopology() {
  // Evaluate both deletions before combining the results, so a StatefulSet deletion that
  // returns false does not short-circuit away the Service cleanup.
  final boolean statefulSetDeleted = deleteStatefulSet();
  final boolean serviceDeleted = deleteService();
  return statefulSetDeleted && serviceDeleted;
}

@Override
@@ -199,6 +212,31 @@ V1StatefulSet getStatefulSet() throws ApiException {
null, null, null);
}

/**
 * Deletes the Service named after the topology from the topology's namespace.
 *
 * @return {@code true} when the API server answers the delete call successfully;
 *     {@code false} when building or executing the call fails with an {@code ApiException}
 *     or {@code IOException} (the failure is logged)
 * @throws TopologyRuntimeManagementException when the API server returns a non-successful
 *     response
 */
boolean deleteService() {
  // The okhttp Response keeps its body open until closed, so scope it with
  // try-with-resources; it is released on every exit path, including the throw below.
  try (Response response = coreClient.deleteNamespacedServiceCall(getTopologyName(),
      getNamespace(), null, null, 0, null,
      KubernetesConstants.DELETE_OPTIONS_PROPAGATION_POLICY, null, null).execute()) {

    if (!response.isSuccessful()) {
      LOG.log(Level.SEVERE, "Error when deleting the Service of the job ["
          + getTopologyName() + "] in namespace [" + getNamespace() + "]");
      LOG.log(Level.SEVERE, "Error killing topology message: " + response.message());
      KubernetesUtils.logResponseBodyIfPresent(LOG, response);

      throw new TopologyRuntimeManagementException(
          KubernetesUtils.errorMessageFromResponse(response));
    }

    LOG.log(Level.INFO, "Headless Service for the Job [" + getTopologyName()
        + "] in namespace [" + getNamespace() + "] is deleted.");
    return true;
  } catch (IOException | ApiException e) {
    KubernetesUtils.logExceptionWithDetails(LOG, "Error deleting topology service", e);
    return false;
  }
}

boolean deleteStatefulSet() {
try {
final Response response = appsClient.deleteNamespacedStatefulSetCall(getTopologyName(),
@@ -211,7 +249,7 @@ boolean deleteStatefulSet() {
return true;
} else {
LOG.log(Level.SEVERE, "Error when deleting the StatefulSet of the job ["
+ getTopologyName() + "]: in namespace [" + getNamespace() + "]");
+ getTopologyName() + "] in namespace [" + getNamespace() + "]");
LOG.log(Level.SEVERE, "Error killing topology message: " + response.message());
KubernetesUtils.logResponseBodyIfPresent(LOG, response);

@@ -224,18 +262,6 @@ boolean deleteStatefulSet() {
}
}

/**
 * Checks whether the topology is deployed as a StatefulSet by reading the StatefulSet named
 * after the topology from the topology's namespace.
 *
 * @return {@code true} when the read succeeds and the returned object's kind is
 *     {@code "StatefulSet"}; {@code false} when the kind is missing or different, or when the
 *     read fails with an {@code ApiException} (logged as a warning)
 */
boolean isStatefulSet() {
  try {
    final V1StatefulSet statefulSet =
        appsClient.readNamespacedStatefulSet(getTopologyName(), getNamespace(),
            null, null, null);
    // Compare from the constant side: the kind may be unset on the returned object, and
    // calling equals() on it would throw a NullPointerException.
    return statefulSet != null && "StatefulSet".equals(statefulSet.getKind());
  } catch (ApiException e) {
    LOG.warning("isStatefulSet check " + e.getMessage());
    return false;
  }
}

protected List<String> getExecutorCommand(String containerId) {
final Map<ExecutorPort, String> ports =
KubernetesConstants.EXECUTOR_PORTS.entrySet()
@@ -262,6 +288,26 @@ private static String setShardIdEnvironmentVariableCommand() {
return String.format("%s=${POD_NAME##*-} && echo shardId=${%s}", ENV_SHARD_ID, ENV_SHARD_ID);
}

/**
 * Builds the headless Service definition for the topology.
 *
 * <p>The Service is named after the topology, has its cluster IP set to {@code "None"}, and
 * selects pods using the topology's match labels. Nothing is sent to the API server here;
 * the caller is responsible for creating the Service.
 *
 * @return the populated {@code V1Service} object
 */
private V1Service createTopologyyService() {
  final String topologyName = getTopologyName();

  // metadata: the Service shares the topology's name
  final V1ObjectMeta objectMeta = new V1ObjectMeta();
  objectMeta.name(topologyName);

  // spec: clusterIP "None" is what makes the Service headless
  final V1ServiceSpec serviceSpec = new V1ServiceSpec();
  serviceSpec.clusterIP("None");
  serviceSpec.setSelector(getMatchLabels(topologyName));

  final V1Service service = new V1Service();
  service.setMetadata(objectMeta);
  service.setSpec(serviceSpec);

  return service;
}

private V1StatefulSet createStatefulSet(Resource containerResource, int numberOfInstances) {
final String topologyName = getTopologyName();
@@ -45,10 +45,10 @@ CkptMgrClient::CkptMgrClient(std::shared_ptr<EventLoop> eventloop, const Network
ckptmgr_id_(_ckptmgr_id),
stmgr_id_(_stmgr_id),
quit_(false),
pplan_(nullptr),
ckpt_saved_watcher_(_ckpt_saved_watcher),
ckpt_get_watcher_(_ckpt_get_watcher),
register_watcher_(_register_watcher) {
register_watcher_(_register_watcher),
pplan_(nullptr) {

// TODO(nlu): take the value from config
reconnect_cpktmgr_interval_sec_ = 10;
@@ -346,7 +346,7 @@ TEST(StatefulRestorer, deadinstances) {
// Send notification that some tasks have recovered
EXPECT_GT(local_tasks.size(), 1);
bool first = true;
int32_t troublesome_task;
int32_t troublesome_task = 0;
for (auto task : local_tasks) {
if (first) {
first = false;
@@ -426,7 +426,7 @@ TEST(StatefulRestorer, deadckptmgr) {
// Send notification that some tasks have recovered
EXPECT_GT(local_tasks.size(), 1);
bool first = true;
int32_t troublesome_task;
int32_t troublesome_task = 0;
// ckpt delivers some checkpoints
for (auto task : local_tasks) {
if (first) {

0 comments on commit e1dda9a

Please sign in to comment.