Skip to content

Commit

Permalink
comparing upstreams for each service
Browse files Browse the repository at this point in the history
typo
  • Loading branch information
sjeropkipruto committed Dec 11, 2018
1 parent 5d2f676 commit d70ea9b
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 73 deletions.
Expand Up @@ -4,6 +4,7 @@
import java.util.List;
import java.util.Set;

import com.google.common.base.Optional;
import com.hubspot.baragon.models.UpstreamInfo;
import com.hubspot.singularity.LoadBalancerRequestType.LoadBalancerRequestId;
import com.hubspot.singularity.SingularityDeploy;
Expand All @@ -21,5 +22,7 @@ public interface LoadBalancerClient {

SingularityLoadBalancerUpdate delete(LoadBalancerRequestId loadBalancerRequestId, String requestId, Set<String> loadBalancerGroups, String serviceBasePath);

Collection<UpstreamInfo> getUpstreams();
Collection<UpstreamInfo> getBaragonUpstreamsForRequest (String requestId);

List<UpstreamInfo> tasksToUpstreams(List<SingularityTask> tasks, String requestId, Optional<String> loadBalancerUpstreamGroup);
}
Expand Up @@ -72,32 +72,16 @@ public LoadBalancerClientImpl(SingularityConfiguration configuration, ObjectMapp
this.mesosProtosUtils = mesosProtosUtils;
}

public String getLoadBalancerUri() {
return loadBalancerUri;
}

public Optional<Map<String, String>> getLoadBalancerQueryParams() {
return loadBalancerQueryParams;
}

public long getLoadBalancerTimeoutMillis() {
return loadBalancerTimeoutMillis;
}

public Optional<String> getTaskLabelForLoadBalancerUpstreamGroup() {
return taskLabelForLoadBalancerUpstreamGroup;
}

public MesosProtosUtils getMesosProtosUtils() {
return mesosProtosUtils;
private String getStateUriFromRequestUri (){
return loadBalancerUri.replace("request", "state");
}

private String getLoadBalancerStateUri (){
return loadBalancerUri.replace("request", "state");
private String getLoadBalancerStateUri (String requestId){
return String.format(OPERATION_URI, getStateUriFromRequestUri(), requestId);
}

private Collection<BaragonServiceState> getBaragonServiceStates () {
final String loadBalancerStateUri = getLoadBalancerStateUri();
private Collection<BaragonServiceState> getBaragonServiceStateForLoadBalancerRequest (String requestId) {
final String loadBalancerStateUri = getLoadBalancerStateUri(requestId);
final BoundRequestBuilder requestBuilder = httpClient.prepareGet(loadBalancerStateUri);
final Request request = requestBuilder.build();
try {
Expand All @@ -113,8 +97,8 @@ private Collection<BaragonServiceState> getBaragonServiceStates () {
}
}

public Collection<UpstreamInfo> getUpstreams () {
Collection<BaragonServiceState> baragonServiceStates = getBaragonServiceStates();
public Collection<UpstreamInfo> getBaragonUpstreamsForRequest (String requestId) {
Collection<BaragonServiceState> baragonServiceStates = getBaragonServiceStateForLoadBalancerRequest(requestId);
return baragonServiceStates.stream()
.flatMap(bbs -> bbs.getUpstreams().stream())
.collect(Collectors.toList());
Expand Down Expand Up @@ -238,7 +222,7 @@ public SingularityLoadBalancerUpdate enqueue(LoadBalancerRequestId loadBalancerR
return sendBaragonRequest(loadBalancerRequestId, loadBalancerRequest, LoadBalancerMethod.ENQUEUE);
}

private List<UpstreamInfo> tasksToUpstreams(List<SingularityTask> tasks, String requestId, Optional<String> loadBalancerUpstreamGroup) {
public List<UpstreamInfo> tasksToUpstreams(List<SingularityTask> tasks, String requestId, Optional<String> loadBalancerUpstreamGroup) {
final List<UpstreamInfo> upstreams = Lists.newArrayListWithCapacity(tasks.size());

for (SingularityTask task : tasks) {
Expand Down
@@ -1,80 +1,65 @@
package com.hubspot.singularity.scheduler;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import javax.inject.Inject;
import javax.inject.Singleton;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.hubspot.baragon.models.UpstreamInfo;
import com.hubspot.mesos.protos.MesosParameter;
import com.hubspot.singularity.LoadBalancerRequestType;
import com.hubspot.singularity.LoadBalancerRequestType.LoadBalancerRequestId;
import com.hubspot.singularity.SingularityRequest;
import com.hubspot.singularity.SingularityTask;
import com.hubspot.singularity.SingularityTaskId;
import com.hubspot.singularity.data.RequestManager;
import com.hubspot.singularity.data.TaskManager;
import com.hubspot.singularity.helpers.MesosUtils;
import com.hubspot.singularity.hooks.LoadBalancerClient;
import com.hubspot.singularity.hooks.LoadBalancerClientImpl;

@Singleton
public class SingularityUpstreamChecker {

private static final Logger LOG = LoggerFactory.getLogger(LoadBalancerClient.class);

private final LoadBalancerClientImpl lbClient;
private final TaskManager taskManager;
private final RequestManager requestManager;

@Inject
public SingularityUpstreamChecker(LoadBalancerClientImpl lbClient, TaskManager taskManager) {
public SingularityUpstreamChecker(LoadBalancerClientImpl lbClient, TaskManager taskManager, RequestManager requestManager) {
this.lbClient = lbClient;
this.taskManager = taskManager;
this.requestManager = requestManager;
}

private UpstreamInfo taskToUpstreamInfo (SingularityTask task, Optional<String> loadBalancerUpstreamGroup) {
UpstreamInfo upstreamInfo = null; //TODO: what is a better way to do this? We cannot initialize an empty upstream
final Optional<Long> maybeLoadBalancerPort = MesosUtils.getPortByIndex(lbClient.getMesosProtosUtils().toResourceList(task.getMesosTask().getResources()), task.getTaskRequest().getDeploy().getLoadBalancerPortIndex().or(0));
if (maybeLoadBalancerPort.isPresent()) {
final String host = task.getHostname();
final long port = maybeLoadBalancerPort.get();
String upstream = String.format("%s:%d", host, port);
final Optional<String> requestId = Optional.of(task.getTaskRequest().getRequest().getId());
Optional<String> group = loadBalancerUpstreamGroup;
if (lbClient.getTaskLabelForLoadBalancerUpstreamGroup().isPresent()) {
for (MesosParameter label : task.getMesosTask().getLabels().getLabels()) {
if (label.hasKey() && label.getKey().equals(lbClient.getTaskLabelForLoadBalancerUpstreamGroup().get()) && label.hasValue()) {
group = Optional.of(label.getValue());
break;
}
}
private List<SingularityTask> getActiveSingularityTasksForRequest (String requestId) {
List<SingularityTask> activeSingularityTasksForRequest = new ArrayList<>();
for (SingularityTaskId taskId: taskManager.getActiveTaskIdsForRequest(requestId)){
Optional<SingularityTask> maybeTask = taskManager.getTask(taskId);
if (maybeTask.isPresent()) {
activeSingularityTasksForRequest.add(maybeTask.get());
}
upstreamInfo = new UpstreamInfo(upstream, requestId, task.getRackId(), Optional.<String>absent(), group);
} else {
LOG.warn("Task {} is missing port, not passed to upstreams({})", task.getTaskId(), task);
}
return upstreamInfo;
return activeSingularityTasksForRequest;
}

public Collection<UpstreamInfo> getUpstreamsFromActiveTasks() {
final List<UpstreamInfo> upstreams = Lists.newArrayListWithCapacity(taskManager.getNumActiveTasks());
for (SingularityTask task: taskManager.getActiveTasks()){
final Optional<String> loadBalancerUpstreamGroup = task.getTaskRequest().getDeploy().getLoadBalancerUpstreamGroup();
final UpstreamInfo upstream = taskToUpstreamInfo(task, loadBalancerUpstreamGroup);
if (upstream.equals(null)){
upstreams.add(upstream);
}
}
return upstreams;
private Collection<UpstreamInfo> getUpstreamsFromActiveTasks(String requestId, Optional<String> loadBalancerUpstreamGroup) {
return lbClient.tasksToUpstreams(getActiveSingularityTasksForRequest(requestId), requestId, loadBalancerUpstreamGroup);
}

private void syncUpstreams( ) {
Collection<UpstreamInfo> upstreamsInBaragon = lbClient.getUpstreams();
Collection<UpstreamInfo> upstreamsInSingularity = getUpstreamsFromActiveTasks();
upstreamsInBaragon.removeAll(upstreamsInSingularity);
for (UpstreamInfo upstream: upstreamsInBaragon){
private void syncUpstreamsForService(SingularityRequest request, Optional<String> loadBalancerUpstreamGroup) {
String requestId = request.getId();
Collection<UpstreamInfo> upstreamsInBaragonForRequest = lbClient.getBaragonUpstreamsForRequest(requestId);
Collection<UpstreamInfo> upstreamsInSingularityForRequest = getUpstreamsFromActiveTasks(requestId, loadBalancerUpstreamGroup);
upstreamsInBaragonForRequest.removeAll(upstreamsInSingularityForRequest);
LoadBalancerRequestId loadBalancerRequestId = new LoadBalancerRequestId(requestId, LoadBalancerRequestType.REMOVE, Optional.absent());
for (UpstreamInfo upstream: upstreamsInBaragonForRequest){
//TODO: remove the upstream from Baragon
}
}

public void syncUpstreams() {
// TODO: check through the active requests and run the method above

}
}
Expand Up @@ -27,6 +27,6 @@ public class SingularityUpstreamPoller extends SingularityLeaderOnlyPoller {
@Override
public void runActionOnPoll() {
LOG.info("Checking upstreams");
upstreamChecker.checkUpstreams();
upstreamChecker.syncUpstreams();
}
}
Expand Up @@ -53,7 +53,12 @@ public SingularityLoadBalancerUpdate delete(LoadBalancerRequestId loadBalancerRe
}

@Override
public Collection<UpstreamInfo> getUpstreams() {
public Collection<UpstreamInfo> getBaragonUpstreamsForRequest(String requestId) {
return Collections.emptyList();
}

@Override
public List<UpstreamInfo> tasksToUpstreams(List<SingularityTask> tasks, String requestId, Optional<String> loadBalancerUpstreamGroup) {
return Collections.emptyList();
}

Expand Down

0 comments on commit d70ea9b

Please sign in to comment.