diff --git a/SingularityBase/src/main/java/com/hubspot/singularity/SingularityDeploy.java b/SingularityBase/src/main/java/com/hubspot/singularity/SingularityDeploy.java index 29cb518fce..1f9ab12b40 100644 --- a/SingularityBase/src/main/java/com/hubspot/singularity/SingularityDeploy.java +++ b/SingularityBase/src/main/java/com/hubspot/singularity/SingularityDeploy.java @@ -46,6 +46,7 @@ public class SingularityDeploy { private final Optional healthcheckUri; private final Optional healthcheckIntervalSeconds; private final Optional healthcheckTimeoutSeconds; + private final Optional healthcheckPortIndex; private final Optional skipHealthchecksOnDeploy; private final Optional healthcheckProtocol; @@ -58,6 +59,7 @@ public class SingularityDeploy { private final Optional serviceBasePath; private final Optional> loadBalancerGroups; + private final Optional loadBalancerPortIndex; private final Optional> loadBalancerOptions; public static SingularityDeployBuilder newBuilder(String requestId, String id) { @@ -87,10 +89,12 @@ public SingularityDeploy(@JsonProperty("requestId") String requestId, @JsonProperty("healthcheckUri") Optional healthcheckUri, @JsonProperty("healthcheckIntervalSeconds") Optional healthcheckIntervalSeconds, @JsonProperty("healthcheckTimeoutSeconds") Optional healthcheckTimeoutSeconds, + @JsonProperty("healthcheckPortIndex") Optional healthcheckPortIndex, @JsonProperty("healthcheckMaxRetries") Optional healthcheckMaxRetries, @JsonProperty("healthcheckMaxTotalTimeoutSeconds") Optional healthcheckMaxTotalTimeoutSeconds, @JsonProperty("serviceBasePath") Optional serviceBasePath, @JsonProperty("loadBalancerGroups") Optional> loadBalancerGroups, + @JsonProperty("loadBalancerPortIndex") Optional loadBalancerPortIndex, @JsonProperty("considerHealthyAfterRunningForSeconds") Optional considerHealthyAfterRunningForSeconds, @JsonProperty("loadBalancerOptions") Optional> loadBalancerOptions, @JsonProperty("skipHealthchecksOnDeploy") Optional skipHealthchecksOnDeploy, @@ -121,6 +125,7 @@ public SingularityDeploy(@JsonProperty("requestId") String requestId, this.healthcheckUri = healthcheckUri; this.healthcheckIntervalSeconds = healthcheckIntervalSeconds; this.healthcheckTimeoutSeconds = healthcheckTimeoutSeconds; + this.healthcheckPortIndex = healthcheckPortIndex; this.skipHealthchecksOnDeploy = skipHealthchecksOnDeploy; this.healthcheckProtocol = healthcheckProtocol; @@ -133,6 +138,7 @@ public SingularityDeploy(@JsonProperty("requestId") String requestId, this.serviceBasePath = serviceBasePath; this.loadBalancerGroups = loadBalancerGroups; + this.loadBalancerPortIndex = loadBalancerPortIndex; this.loadBalancerOptions = loadBalancerOptions; } @@ -151,6 +157,7 @@ public SingularityDeployBuilder toBuilder() { .setHealthcheckUri(healthcheckUri) .setHealthcheckIntervalSeconds(healthcheckIntervalSeconds) .setHealthcheckTimeoutSeconds(healthcheckTimeoutSeconds) + .setHealthcheckPortIndex(healthcheckPortIndex) .setSkipHealthchecksOnDeploy(skipHealthchecksOnDeploy) .setHealthcheckProtocol(healthcheckProtocol) @@ -161,6 +168,7 @@ public SingularityDeployBuilder toBuilder() { .setDeployHealthTimeoutSeconds(deployHealthTimeoutSeconds) .setServiceBasePath(serviceBasePath) .setLoadBalancerGroups(copyOfSet(loadBalancerGroups)) + .setLoadBalancerPortIndex(loadBalancerPortIndex) .setLoadBalancerOptions(copyOfMap(loadBalancerOptions)) .setMetadata(copyOfMap(metadata)) @@ -281,6 +289,11 @@ public Optional getHealthcheckTimeoutSeconds() { return healthcheckTimeoutSeconds; } + @ApiModelProperty(required=false, value="Perform healthcheck on this dynamically allocated port (e.g. 0 for first port), defaults to first port") + public Optional getHealthcheckPortIndex() { + return healthcheckPortIndex; + } + @ApiModelProperty(required=false, value="The base path for the API exposed by the deploy. Used in conjunction with the Load balancer API.") public Optional getServiceBasePath() { return serviceBasePath; @@ -296,6 +309,11 @@ public Optional> getLoadBalancerGroups() { return loadBalancerGroups; } + @ApiModelProperty(required=false, value="Send this port to the load balancer api (e.g. 0 for first port), defaults to first port") + public Optional getLoadBalancerPortIndex() { + return loadBalancerPortIndex; + } + @ApiModelProperty(required=false, value="Map (Key/Value) of options for the load balancer.") public Optional> getLoadBalancerOptions() { return loadBalancerOptions; @@ -344,6 +362,7 @@ public String toString() { ", healthcheckUri=" + healthcheckUri + ", healthcheckIntervalSeconds=" + healthcheckIntervalSeconds + ", healthcheckTimeoutSeconds=" + healthcheckTimeoutSeconds + + ", healthcheckPortIndex=" + healthcheckPortIndex + ", skipHealthchecksOnDeploy=" + skipHealthchecksOnDeploy + ", healthcheckProtocol=" + healthcheckProtocol + ", healthcheckMaxRetries=" + healthcheckMaxRetries + @@ -352,6 +371,7 @@ public String toString() { ", considerHealthyAfterRunningForSeconds=" + considerHealthyAfterRunningForSeconds + ", serviceBasePath=" + serviceBasePath + ", loadBalancerGroups=" + loadBalancerGroups + + ", loadBalancerPortIndex=" + loadBalancerPortIndex + ", loadBalancerOptions=" + loadBalancerOptions + ", labels=" + labels + '}'; diff --git a/SingularityBase/src/main/java/com/hubspot/singularity/SingularityDeployBuilder.java b/SingularityBase/src/main/java/com/hubspot/singularity/SingularityDeployBuilder.java index 8dbd3ff5bb..e7b57959e0 100644 --- a/SingularityBase/src/main/java/com/hubspot/singularity/SingularityDeployBuilder.java +++ b/SingularityBase/src/main/java/com/hubspot/singularity/SingularityDeployBuilder.java @@ -39,6 +39,7 @@ public class SingularityDeployBuilder { private Optional healthcheckUri; private Optional healthcheckIntervalSeconds; private Optional healthcheckTimeoutSeconds; + private Optional healthcheckPortIndex; private Optional skipHealthchecksOnDeploy; private Optional healthcheckProtocol; @@ -51,6 +52,7 @@ public class SingularityDeployBuilder { private Optional serviceBasePath; private Optional> loadBalancerGroups; + private Optional loadBalancerPortIndex; private Optional> loadBalancerOptions; public SingularityDeployBuilder(String requestId, String id) { @@ -75,6 +77,7 @@ public SingularityDeployBuilder(String requestId, String id) { this.healthcheckUri = Optional.absent(); this.healthcheckIntervalSeconds = Optional.absent(); this.healthcheckTimeoutSeconds = Optional.absent(); + this.healthcheckPortIndex = Optional.absent(); this.skipHealthchecksOnDeploy = Optional.absent(); this.deployHealthTimeoutSeconds = Optional.absent(); this.healthcheckProtocol = Optional.absent(); @@ -83,13 +86,14 @@ public SingularityDeployBuilder(String requestId, String id) { this.considerHealthyAfterRunningForSeconds = Optional.absent(); this.serviceBasePath = Optional.absent(); this.loadBalancerGroups = Optional.absent(); + this.loadBalancerPortIndex = Optional.absent(); this.loadBalancerOptions = Optional.absent(); } public SingularityDeploy build() { return new SingularityDeploy(requestId, id, command, arguments, containerInfo, customExecutorCmd, customExecutorId, customExecutorSource, customExecutorResources, customExecutorUser, resources, env, - uris, metadata, executorData, version, timestamp, labels, deployHealthTimeoutSeconds, healthcheckUri, healthcheckIntervalSeconds, healthcheckTimeoutSeconds, healthcheckMaxRetries, - healthcheckMaxTotalTimeoutSeconds, serviceBasePath, loadBalancerGroups, considerHealthyAfterRunningForSeconds, loadBalancerOptions, skipHealthchecksOnDeploy, healthcheckProtocol); + uris, metadata, executorData, version, timestamp, labels, deployHealthTimeoutSeconds, healthcheckUri, healthcheckIntervalSeconds, healthcheckTimeoutSeconds, healthcheckPortIndex, healthcheckMaxRetries, + healthcheckMaxTotalTimeoutSeconds, serviceBasePath, loadBalancerGroups, loadBalancerPortIndex, considerHealthyAfterRunningForSeconds, loadBalancerOptions, skipHealthchecksOnDeploy, healthcheckProtocol); } public String getRequestId() { @@ -285,6 +289,15 @@ public SingularityDeployBuilder setHealthcheckTimeoutSeconds(Optional heal return this; } + public Optional getHealthcheckPortIndex() { + return healthcheckPortIndex; + } + + public SingularityDeployBuilder setHealthcheckPortIndex(Optional healthcheckPortIndex) { + this.healthcheckPortIndex = healthcheckPortIndex; + return this; + } + public Optional getServiceBasePath() { return serviceBasePath; } @@ -303,6 +316,15 @@ public SingularityDeployBuilder setLoadBalancerGroups(Optional> load return this; } + public Optional getLoadBalancerPortIndex() { + return loadBalancerPortIndex; + } + + public SingularityDeployBuilder setLoadBalancerPortIndex(Optional loadBalancerPortIndex) { + this.loadBalancerPortIndex = loadBalancerPortIndex; + return this; + } + public Optional> getLoadBalancerOptions() { return loadBalancerOptions; } @@ -381,6 +403,7 @@ public String toString() { ", healthcheckUri=" + healthcheckUri + ", healthcheckIntervalSeconds=" + healthcheckIntervalSeconds + ", healthcheckTimeoutSeconds=" + healthcheckTimeoutSeconds + + ", healthcheckPortIndex=" + healthcheckPortIndex + ", skipHealthchecksOnDeploy=" + skipHealthchecksOnDeploy + ", healthcheckProtocol=" + healthcheckProtocol + ", healthcheckMaxRetries=" + healthcheckMaxRetries + @@ -389,6 +412,7 @@ public String toString() { ", considerHealthyAfterRunningForSeconds=" + considerHealthyAfterRunningForSeconds + ", serviceBasePath=" + serviceBasePath + ", loadBalancerGroups=" + loadBalancerGroups + + ", loadBalancerPortIndex=" + loadBalancerPortIndex + ", loadBalancerOptions=" + loadBalancerOptions + '}'; } diff --git a/SingularityBase/src/main/java/com/hubspot/singularity/SingularityTask.java b/SingularityBase/src/main/java/com/hubspot/singularity/SingularityTask.java index 43c6af2a98..5855018891 100644 --- a/SingularityBase/src/main/java/com/hubspot/singularity/SingularityTask.java +++ b/SingularityBase/src/main/java/com/hubspot/singularity/SingularityTask.java @@ -1,5 +1,7 @@ package com.hubspot.singularity; +import java.util.List; + import org.apache.mesos.Protos.Offer; import org.apache.mesos.Protos.Resource; import org.apache.mesos.Protos.TaskInfo; @@ -45,16 +47,13 @@ public Optional getRackId() { } @JsonIgnore - public Optional getFirstPort() { - for (Resource resource : mesosTask.getResourcesList()) { - if (resource.getName().equals(MesosUtils.PORTS)) { - for (Range range : resource.getRanges().getRangeList()) { - return Optional.of(range.getBegin()); - } - } + public Optional getPortByIndex(int index) { + List ports = MesosUtils.getAllPorts(mesosTask.getResourcesList()); + if (index >= ports.size() || index < 0) { + return Optional.absent(); + } else { + return Optional.of(ports.get(index)); } - - return Optional.absent(); } @Override diff --git a/SingularityService/src/main/java/com/hubspot/singularity/data/SingularityValidator.java b/SingularityService/src/main/java/com/hubspot/singularity/data/SingularityValidator.java index 42bff5a4ee..d1488c50ed 100644 --- a/SingularityService/src/main/java/com/hubspot/singularity/data/SingularityValidator.java +++ b/SingularityService/src/main/java/com/hubspot/singularity/data/SingularityValidator.java @@ -191,6 +191,21 @@ public SingularityDeploy checkDeploy(SingularityRequest request, SingularityDepl checkForIllegalResources(request, deploy); + if (deploy.getResources().isPresent()) { + if (deploy.getHealthcheckPortIndex().isPresent()) { + checkBadRequest(deploy.getHealthcheckPortIndex().get() >= 0, "healthcheckPortIndex must be greater than 0"); + checkBadRequest(deploy.getResources().get().getNumPorts() > deploy.getHealthcheckPortIndex().get(), String + .format("Must request %s ports for healthcheckPortIndex %s, only requested %s", deploy.getHealthcheckPortIndex().get() + 1, deploy.getHealthcheckPortIndex().get(), + deploy.getResources().get().getNumPorts())); + } + if (deploy.getLoadBalancerPortIndex().isPresent()) { + checkBadRequest(deploy.getLoadBalancerPortIndex().get() >= 0, "loadBalancerPortIndex must be greater than 0"); + checkBadRequest(deploy.getResources().get().getNumPorts() > deploy.getLoadBalancerPortIndex().get(), String + .format("Must request %s ports for loadBalancerPortIndex %s, only requested %s", deploy.getLoadBalancerPortIndex().get() + 1, deploy.getLoadBalancerPortIndex().get(), + deploy.getResources().get().getNumPorts())); + } + } + checkBadRequest(deploy.getCommand().isPresent() && !deploy.getExecutorData().isPresent() || deploy.getExecutorData().isPresent() && deploy.getCustomExecutorCmd().isPresent() && !deploy.getCommand().isPresent() || deploy.getContainerInfo().isPresent(), diff --git a/SingularityService/src/main/java/com/hubspot/singularity/hooks/LoadBalancerClientImpl.java b/SingularityService/src/main/java/com/hubspot/singularity/hooks/LoadBalancerClientImpl.java index 41e01c912a..39561a9f86 100644 --- a/SingularityService/src/main/java/com/hubspot/singularity/hooks/LoadBalancerClientImpl.java +++ b/SingularityService/src/main/java/com/hubspot/singularity/hooks/LoadBalancerClientImpl.java @@ -181,10 +181,10 @@ private List tasksToUpstreams(List tasks, String final List upstreams = Lists.newArrayListWithCapacity(tasks.size()); for (SingularityTask task : tasks) { - final Optional maybeFirstPort = task.getFirstPort(); + final Optional maybeLoadBalancerPort = task.getPortByIndex(task.getTaskRequest().getDeploy().getLoadBalancerPortIndex().or(0)); - if (maybeFirstPort.isPresent()) { - String upstream = String.format("%s:%d", task.getOffer().getHostname(), maybeFirstPort.get()); + if (maybeLoadBalancerPort.isPresent()) { + String upstream = String.format("%s:%d", task.getOffer().getHostname(), maybeLoadBalancerPort.get()); upstreams.add(new UpstreamInfo(upstream, Optional.of(requestId), task.getRackId())); } else { LOG.warn("Task {} is missing port but is being passed to LB ({})", task.getTaskId(), task); diff --git a/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityHealthchecker.java b/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityHealthchecker.java index f4d5ddf133..fec74287b2 100644 --- a/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityHealthchecker.java +++ b/SingularityService/src/main/java/com/hubspot/singularity/scheduler/SingularityHealthchecker.java @@ -1,6 +1,7 @@ package com.hubspot.singularity.scheduler; import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -19,6 +20,7 @@ import com.google.common.collect.Maps; import com.google.inject.Inject; import com.google.inject.name.Named; +import com.hubspot.mesos.MesosUtils; import com.hubspot.singularity.HealthcheckProtocol; import com.hubspot.singularity.SingularityAbort; import com.hubspot.singularity.SingularityMainModule; @@ -167,9 +169,9 @@ private Optional getHealthcheckUri(SingularityTask task) { final String hostname = task.getOffer().getHostname(); - Optional firstPort = task.getFirstPort(); + Optional healthcheckPort = task.getPortByIndex(task.getTaskRequest().getDeploy().getHealthcheckPortIndex().or(0)); - if (!firstPort.isPresent() || firstPort.get() < 1L) { + if (!healthcheckPort.isPresent() || healthcheckPort.get() < 1L) { LOG.warn("Couldn't find a port for health check for task {}", task); return Optional.absent(); } @@ -182,7 +184,7 @@ private Optional getHealthcheckUri(SingularityTask task) { HealthcheckProtocol protocol = task.getTaskRequest().getDeploy().getHealthcheckProtocol().or(DEFAULT_HEALTH_CHECK_SCHEME); - return Optional.of(String.format("%s://%s:%d/%s", protocol.getProtocol(), hostname, firstPort.get(), uri)); + return Optional.of(String.format("%s://%s:%d/%s", protocol.getProtocol(), hostname, healthcheckPort.get(), uri)); } private void saveFailure(SingularityHealthcheckAsyncHandler handler, String message) { diff --git a/SingularityService/src/test/java/com/hubspot/singularity/scheduler/SingularitySchedulerTest.java b/SingularityService/src/test/java/com/hubspot/singularity/scheduler/SingularitySchedulerTest.java index 422e8c578c..7014c4cc68 100644 --- a/SingularityService/src/test/java/com/hubspot/singularity/scheduler/SingularitySchedulerTest.java +++ b/SingularityService/src/test/java/com/hubspot/singularity/scheduler/SingularitySchedulerTest.java @@ -2328,4 +2328,39 @@ private SingularityDeployBuilder dockerDeployWithPorts(int numPorts) { return deployBuilder; } + @Test + public void testPortIndices() { + configuration.setNewTaskCheckerBaseDelaySeconds(0); + configuration.setHealthcheckIntervalSeconds(0); + configuration.setDeployHealthyBySeconds(0); + configuration.setKillAfterTasksDoNotRunDefaultSeconds(1); + configuration.setHealthcheckMaxRetries(Optional.of(0)); + + initRequest(); + firstDeploy = initAndFinishDeploy(request, new SingularityDeployBuilder(request.getId(), firstDeployId) + .setCommand(Optional.of("sleep 100")) + .setHealthcheckUri(Optional.of("http://uri")) + .setResources(Optional.of(new Resources(1, 64, 3))) + .setHealthcheckPortIndex(Optional.of(1))); + + requestResource.postRequest(request.toBuilder().setInstances(Optional.of(2)).build()); + scheduler.drainPendingQueue(stateCacheProvider.get()); + + String[] portRange = {"80:82"}; + sms.resourceOffers(driver, Arrays.asList(createOffer(20, 20000, "slave1", "host1", Optional. absent(), Collections.emptyMap(), portRange))); + + SingularityTaskId firstTaskId = taskManager.getActiveTaskIdsForRequest(requestId).get(0); + + SingularityTask firstTask = taskManager.getTask(firstTaskId).get(); + statusUpdate(firstTask, TaskState.TASK_RUNNING); + + newTaskChecker.enqueueNewTaskCheck(firstTask, requestManager.getRequest(requestId), healthchecker); + + finishNewTaskChecks(); + finishHealthchecks(); + finishNewTaskChecksAndCleanup(); + + Assert.assertTrue(taskManager.getLastHealthcheck(firstTask.getTaskId()).get().toString().contains("host1:81")); + } + }