Skip to content

Commit

Permalink
More usagge collection in parallel, less webhooks
Browse files Browse the repository at this point in the history
  • Loading branch information
ssalinas committed Mar 27, 2018
1 parent 26e2aa1 commit 4f9a3a3
Show file tree
Hide file tree
Showing 4 changed files with 280 additions and 178 deletions.
Expand Up @@ -75,6 +75,8 @@ public class SingularityConfiguration extends Configuration {

private int usageIntervalSeconds = 5760; // 15 saved each 5760 seconds (96 min) apart is 1 day of usage

private int maxConcurrentUsageCollections = 15;

private boolean shuffleTasksForOverloadedSlaves = false; // recommended 'true' when oversubscribing cpu for larger clusters

private int maxTasksToShuffleTotal = 6; // Do not allow more than this many shuffle cleanups at once cluster-wide
Expand Down Expand Up @@ -326,6 +328,8 @@ public class SingularityConfiguration extends Configuration {
@Valid
private WebhookAuthConfiguration webhookAuthConfiguration = new WebhookAuthConfiguration();

private int maxConcurrentWebhooks = 100;

@JsonProperty("auth")
@NotNull
@Valid
Expand Down Expand Up @@ -1313,6 +1317,14 @@ public void setWebhookAuthConfiguration(WebhookAuthConfiguration webhookAuthConf
this.webhookAuthConfiguration = webhookAuthConfiguration;
}

public int getMaxConcurrentWebhooks() {
return maxConcurrentWebhooks;
}

public void setMaxConcurrentWebhooks(int maxConcurrentWebhooks) {
this.maxConcurrentWebhooks = maxConcurrentWebhooks;
}

public void setLdapConfiguration(LDAPConfiguration ldapConfiguration) {
this.ldapConfiguration = ldapConfiguration;
}
Expand Down Expand Up @@ -1486,6 +1498,14 @@ public void setUsageIntervalSeconds(int usageIntervalSeconds) {
this.usageIntervalSeconds = usageIntervalSeconds;
}

public int getMaxConcurrentUsageCollections() {
return maxConcurrentUsageCollections;
}

public void setMaxConcurrentUsageCollections(int maxConcurrentUsageCollections) {
this.maxConcurrentUsageCollections = maxConcurrentUsageCollections;
}

public int getMaxTasksToShufflePerHost() {
return maxTasksToShufflePerHost;
}
Expand Down
@@ -1,5 +1,7 @@
package com.hubspot.singularity.hooks;

import java.util.concurrent.CompletableFuture;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -17,6 +19,8 @@ public abstract class AbstractSingularityWebhookAsyncHandler<T> extends AsyncCom
private final long start;
private final boolean shouldDeleteUpdateOnFailure;

private CompletableFuture<Void> completableFuture;

public AbstractSingularityWebhookAsyncHandler(SingularityWebhook webhook, T update, boolean shouldDeleteUpdateOnFailure) {
this.webhook = webhook;
this.update = update;
Expand All @@ -32,6 +36,10 @@ public void onThrowable(Throwable t) {
if (shouldDeleteUpdateOnFailure) {
deleteWebhookUpdate();
}

if (completableFuture != null) {
completableFuture.completeExceptionally(t);
}
}

public boolean shouldDeleteUpdateDueToQueueAboveCapacity() {
Expand All @@ -50,9 +58,16 @@ public Response onCompleted(Response response) throws Exception {
deleteWebhookUpdate();
}

if (completableFuture != null) {
completableFuture.complete(null);
}

return response;
}

public abstract void deleteWebhookUpdate();

public void setCompletableFuture(CompletableFuture<Void> completableFuture) {
this.completableFuture = completableFuture;
}
}
@@ -1,7 +1,11 @@
package com.hubspot.singularity.hooks;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import javax.inject.Singleton;
Expand All @@ -14,6 +18,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import com.hubspot.mesos.JavaUtils;
import com.hubspot.singularity.SingularityDeployUpdate;
Expand All @@ -22,6 +27,8 @@
import com.hubspot.singularity.SingularityTaskHistoryUpdate;
import com.hubspot.singularity.SingularityTaskWebhook;
import com.hubspot.singularity.SingularityWebhook;
import com.hubspot.singularity.async.AsyncSemaphore;
import com.hubspot.singularity.async.CompletableFutures;
import com.hubspot.singularity.config.SingularityConfiguration;
import com.hubspot.singularity.data.WebhookManager;
import com.hubspot.singularity.data.history.TaskHistoryHelper;
Expand All @@ -39,13 +46,19 @@ public class SingularityWebhookSender {
private final TaskHistoryHelper taskHistoryHelper;
private final ObjectMapper objectMapper;

private final AsyncSemaphore<Void> webhookSemaphore;
private final ExecutorService webhookExecutorService;

@Inject
public SingularityWebhookSender(SingularityConfiguration configuration, AsyncHttpClient http, ObjectMapper objectMapper, TaskHistoryHelper taskHistoryHelper, WebhookManager webhookManager) {
this.configuration = configuration;
this.http = http;
this.webhookManager = webhookManager;
this.taskHistoryHelper = taskHistoryHelper;
this.objectMapper = objectMapper;

this.webhookSemaphore = AsyncSemaphore.newBuilder(configuration::getMaxConcurrentWebhooks).build();
this.webhookExecutorService = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("webhooks-%d").build());
}

public void checkWebhooks() {
Expand All @@ -60,23 +73,30 @@ public void checkWebhooks() {
int requestUpdates = 0;
int deployUpdates = 0;

for (SingularityWebhook webhook : webhooks) {
List<CompletableFuture<Void>> webhookFutures = new ArrayList<>();

for (SingularityWebhook webhook : webhooks) {
switch (webhook.getType()) {
case TASK:
taskUpdates += checkTaskUpdates(webhook);
taskUpdates += checkTaskUpdates(webhook, webhookFutures);
break;
case REQUEST:
requestUpdates += checkRequestUpdates(webhook);
requestUpdates += checkRequestUpdates(webhook, webhookFutures);
break;
case DEPLOY:
deployUpdates += checkDeployUpdates(webhook);
deployUpdates += checkDeployUpdates(webhook, webhookFutures);
break;
default:
break;
}
}

CompletableFutures.allOf(webhookFutures)
.exceptionally((t) -> {
LOG.error("Exception in webhook", t);
return null;
});

LOG.info("Sent {} task, {} request, and {} deploy updates for {} webhooks in {}", taskUpdates, requestUpdates, deployUpdates, webhooks.size(), JavaUtils.duration(start));
}

Expand All @@ -85,37 +105,45 @@ private boolean shouldDeleteUpdateOnFailure(int numUpdates, long updateTimestamp
return true;
}
final long updateAge = System.currentTimeMillis() - updateTimestamp;
if (configuration.getDeleteUndeliverableWebhooksAfterHours() > 0 && updateAge > TimeUnit.HOURS.toMillis(configuration.getDeleteUndeliverableWebhooksAfterHours())) {
return true;
}
return false;
return configuration.getDeleteUndeliverableWebhooksAfterHours() > 0
&& updateAge > TimeUnit.HOURS.toMillis(configuration.getDeleteUndeliverableWebhooksAfterHours());
}

private int checkRequestUpdates(SingularityWebhook webhook) {
private int checkRequestUpdates(SingularityWebhook webhook, List<CompletableFuture<Void>> webhookFutures) {
final List<SingularityRequestHistory> requestUpdates = webhookManager.getQueuedRequestHistoryForHook(webhook.getId());

int numRequestUpdates = 0;

for (SingularityRequestHistory requestUpdate : requestUpdates) {
executeWebhook(webhook, requestUpdate, new SingularityRequestWebhookAsyncHandler(webhookManager, webhook, requestUpdate, shouldDeleteUpdateOnFailure(numRequestUpdates, requestUpdate.getCreatedAt())));
webhookSemaphore.call(() ->
executeWebhookAsync(
webhook,
requestUpdate,
new SingularityRequestWebhookAsyncHandler(webhookManager, webhook, requestUpdate, shouldDeleteUpdateOnFailure(numRequestUpdates, requestUpdate.getCreatedAt())))
);
}

return requestUpdates.size();
}

private int checkDeployUpdates(SingularityWebhook webhook) {
private int checkDeployUpdates(SingularityWebhook webhook, List<CompletableFuture<Void>> webhookFuture) {
final List<SingularityDeployUpdate> deployUpdates = webhookManager.getQueuedDeployUpdatesForHook(webhook.getId());

int numDeployUpdates = 0;

for (SingularityDeployUpdate deployUpdate : deployUpdates) {
executeWebhook(webhook, deployUpdate, new SingularityDeployWebhookAsyncHandler(webhookManager, webhook, deployUpdate, shouldDeleteUpdateOnFailure(numDeployUpdates, deployUpdate.getDeployMarker().getTimestamp())));
webhookSemaphore.call(() ->
executeWebhookAsync(
webhook,
deployUpdate,
new SingularityDeployWebhookAsyncHandler(webhookManager, webhook, deployUpdate, shouldDeleteUpdateOnFailure(numDeployUpdates, deployUpdate.getDeployMarker().getTimestamp())))
);
}

return deployUpdates.size();
}

private int checkTaskUpdates(SingularityWebhook webhook) {
private int checkTaskUpdates(SingularityWebhook webhook, List<CompletableFuture<Void>> webhookFuture) {
final List<SingularityTaskHistoryUpdate> taskUpdates = webhookManager.getQueuedTaskUpdatesForHook(webhook.getId());

int numTaskUpdates = 0;
Expand All @@ -130,18 +158,21 @@ private int checkTaskUpdates(SingularityWebhook webhook) {
continue;
}

executeWebhook(webhook, new SingularityTaskWebhook(task.get(), taskUpdate), new SingularityTaskWebhookAsyncHandler(webhookManager, webhook, taskUpdate, shouldDeleteUpdateOnFailure(numTaskUpdates, taskUpdate.getTimestamp())));
webhookSemaphore.call(() ->
executeWebhookAsync(
webhook,
new SingularityTaskWebhook(task.get(), taskUpdate),
new SingularityTaskWebhookAsyncHandler(webhookManager, webhook, taskUpdate, shouldDeleteUpdateOnFailure(numTaskUpdates, taskUpdate.getTimestamp())))
);
}

return taskUpdates.size();
}

// TODO handle retries, errors.
private <T> void executeWebhook(SingularityWebhook webhook, Object payload, AbstractSingularityWebhookAsyncHandler<T> handler) {
private <T> CompletableFuture<Void> executeWebhookAsync(SingularityWebhook webhook, Object payload, AbstractSingularityWebhookAsyncHandler<T> handler) {
LOG.trace("Sending {} to {}", payload, webhook.getUri());

BoundRequestBuilder postRequest = http.preparePost(webhook.getUri());

postRequest.setHeader(HttpHeaders.CONTENT_TYPE, "application/json");

try {
Expand All @@ -150,15 +181,19 @@ private <T> void executeWebhook(SingularityWebhook webhook, Object payload, Abst
throw Throwables.propagate(e);
}

CompletableFuture<Void> webhookFuture = new CompletableFuture<>();
try {
handler.setCompletableFuture(webhookFuture);
postRequest.execute(handler);
} catch (IOException e) {
LOG.warn("Couldn't execute webhook to {}", webhook.getUri(), e);

if (handler.shouldDeleteUpdateDueToQueueAboveCapacity()) {
handler.deleteWebhookUpdate();
}
webhookFuture.completeExceptionally(e);
}
return webhookFuture;
}


Expand Down

0 comments on commit 4f9a3a3

Please sign in to comment.