Skip to content

Commit

Permalink
sometimes send output to debug
Browse files Browse the repository at this point in the history
  • Loading branch information
wsorenson committed Sep 29, 2015
1 parent d3a0444 commit 430b10f
Show file tree
Hide file tree
Showing 12 changed files with 70 additions and 39 deletions.
Expand Up @@ -82,6 +82,10 @@ public class SingularityConfiguration extends Configuration {

private long deployHealthyBySeconds = 120;

private long debugCuratorCallOverBytes = 5000;

private long debugCuratorCallOverMillis = 1000;

private boolean enableCorsFilter = false;

private long healthcheckIntervalSeconds = 5;
Expand Down Expand Up @@ -246,6 +250,22 @@ public int getCooldownAfterFailures() {
return cooldownAfterFailures;
}

public long getDebugCuratorCallOverBytes() {
return debugCuratorCallOverBytes;
}

public void setDebugCuratorCallOverBytes(long debugCuratorCallOverBytes) {
this.debugCuratorCallOverBytes = debugCuratorCallOverBytes;
}

public long getDebugCuratorCallOverMillis() {
return debugCuratorCallOverMillis;
}

public void setDebugCuratorCallOverMillis(long debugCuratorCallOverMillis) {
this.debugCuratorCallOverMillis = debugCuratorCallOverMillis;
}

public double getCooldownAfterPctOfInstancesFail() {
return cooldownAfterPctOfInstancesFail;
}
Expand Down
Expand Up @@ -16,6 +16,7 @@
import com.hubspot.singularity.SingularityDeleteResult;
import com.hubspot.singularity.SingularityMachineAbstraction;
import com.hubspot.singularity.SingularityMachineStateHistoryUpdate;
import com.hubspot.singularity.config.SingularityConfiguration;
import com.hubspot.singularity.data.transcoders.Transcoder;

public abstract class AbstractMachineManager<T extends SingularityMachineAbstraction<T>> extends CuratorAsyncManager {
Expand All @@ -27,8 +28,8 @@ public abstract class AbstractMachineManager<T extends SingularityMachineAbstrac
private final Transcoder<T> transcoder;
private final Transcoder<SingularityMachineStateHistoryUpdate> historyTranscoder;

public AbstractMachineManager(CuratorFramework curator, long zkAsyncTimeout, Transcoder<T> transcoder, Transcoder<SingularityMachineStateHistoryUpdate> historyTranscoder) {
super(curator, zkAsyncTimeout);
public AbstractMachineManager(CuratorFramework curator, SingularityConfiguration configuration, Transcoder<T> transcoder, Transcoder<SingularityMachineStateHistoryUpdate> historyTranscoder) {
super(curator, configuration);

this.transcoder = transcoder;
this.historyTranscoder = historyTranscoder;
Expand Down
Expand Up @@ -6,7 +6,6 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.BackgroundCallback;
Expand All @@ -15,10 +14,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.hubspot.mesos.JavaUtils;
import com.hubspot.singularity.SingularityId;
import com.hubspot.singularity.config.SingularityConfiguration;
import com.hubspot.singularity.data.transcoders.IdTranscoder;
import com.hubspot.singularity.data.transcoders.Transcoder;
import com.hubspot.singularity.data.transcoders.Transcoders;
Expand All @@ -27,12 +27,8 @@ public abstract class CuratorAsyncManager extends CuratorManager {

private static final Logger LOG = LoggerFactory.getLogger(CuratorAsyncManager.class);

private final long zkAsyncTimeout;

public CuratorAsyncManager(CuratorFramework curator, long zkAsyncTimeout) {
super(curator);

this.zkAsyncTimeout = zkAsyncTimeout;
public CuratorAsyncManager(CuratorFramework curator, SingularityConfiguration configuration) {
super(curator, configuration);
}

private <T> List<T> getAsyncChildrenThrows(final String parent, final Transcoder<T> transcoder) throws Exception {
Expand All @@ -55,7 +51,7 @@ private <T> List<T> getAsyncThrows(final String pathNameForLogs, final Collectio

final CountDownLatch latch = new CountDownLatch(paths.size());
final AtomicInteger missing = new AtomicInteger();
final AtomicLong bytes = new AtomicLong();
final AtomicInteger bytes = new AtomicInteger();

final BackgroundCallback callback = new BackgroundCallback() {

Expand Down Expand Up @@ -86,14 +82,14 @@ public void processResult(CuratorFramework client, CuratorEvent event) throws Ex

checkLatch(latch, pathNameForLogs);

LOG.trace("Fetched {} objects ({} bytes) in {} ({} - missing {})", objects.size(), bytes.get(), JavaUtils.duration(start), pathNameForLogs, missing.intValue());
log("Fetched", Optional.<Integer> of(objects.size()), Optional.<Integer> of(bytes.get()), start, pathNameForLogs);

return objects;
}

private void checkLatch(CountDownLatch latch, String path) throws InterruptedException {
if (!latch.await(zkAsyncTimeout, TimeUnit.MILLISECONDS)) {
throw new IllegalStateException(String.format("Timed out waiting response for objects from %s, waited %s millis", path, zkAsyncTimeout));
if (!latch.await(configuration.getZookeeperAsyncTimeout(), TimeUnit.MILLISECONDS)) {
throw new IllegalStateException(String.format("Timed out waiting response for objects from %s, waited %s millis", path, configuration.getZookeeperAsyncTimeout()));
}
}

Expand Down Expand Up @@ -134,7 +130,7 @@ public void processResult(CuratorFramework client, CuratorEvent event) throws Ex

checkLatch(latch, pathNameforLogs);

LOG.trace("Fetched {} objects in {} ({} - missing {})", objects.size(), JavaUtils.duration(start), pathNameforLogs, missing.intValue());
log("Fetched", Optional.<Integer> of(objects.size()), Optional.<Integer> absent(), start, pathNameforLogs);

return objects;
}
Expand Down Expand Up @@ -184,7 +180,7 @@ public void processResult(CuratorFramework client, CuratorEvent event) throws Ex

checkLatch(latch, pathNameforLogs);

LOG.trace("Found {} objects in {} (out of {} from {})", objects.size(), JavaUtils.duration(start), paths.size(), pathNameforLogs);
log("Fetched", Optional.<Integer> of(objects.size()), Optional.<Integer> absent(), start, pathNameforLogs);

return objects;
}
Expand Down
Expand Up @@ -15,9 +15,9 @@

import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.hubspot.mesos.JavaUtils;
import com.hubspot.singularity.SingularityCreateResult;
import com.hubspot.singularity.SingularityDeleteResult;
import com.hubspot.singularity.config.SingularityConfiguration;
import com.hubspot.singularity.data.transcoders.StringTranscoder;
import com.hubspot.singularity.data.transcoders.Transcoder;

Expand All @@ -26,12 +26,25 @@ public abstract class CuratorManager {
private static final Logger LOG = LoggerFactory.getLogger(CuratorManager.class);
private static final byte[] EMPTY_BYTES = new byte[0];

protected final SingularityConfiguration configuration;
protected final CuratorFramework curator;

public CuratorManager(CuratorFramework curator) {
public CuratorManager(CuratorFramework curator, SingularityConfiguration configuration) {
this.configuration = configuration;
this.curator = curator;
}

protected void log(String type, Optional<Integer> numItems, Optional<Integer> bytes, long start, String path) {
final String message = String.format("%s (items: %s) (bytes: %s) in %s", type, numItems.or(1), bytes.or(0), path);

if (bytes.isPresent() && bytes.get() > configuration.getDebugCuratorCallOverBytes()
|| System.currentTimeMillis() - start > configuration.getDebugCuratorCallOverMillis()) {
LOG.debug(message);
} else {
LOG.trace(message);
}
}

protected int getNumChildren(String path) {
try {
Stat s = curator.checkExists().forPath(path);
Expand Down Expand Up @@ -77,7 +90,7 @@ protected List<String> getChildren(String root) {
} catch (Throwable t) {
throw Throwables.propagate(t);
} finally {
LOG.trace("Got {} children in {} ({})", numChildren, JavaUtils.duration(start), root);
log("Fetched children", Optional.of(numChildren), Optional.<Integer> absent(), start, root);
}
}

Expand All @@ -93,7 +106,7 @@ protected SingularityDeleteResult delete(String path) {
} catch (Throwable t) {
throw Throwables.propagate(t);
} finally {
LOG.trace("Deleted in {} ({})", JavaUtils.duration(start), path);
log("Deleted", Optional.<Integer> absent(), Optional.<Integer> absent(), start, path);
}
}

Expand All @@ -117,7 +130,7 @@ protected SingularityCreateResult create(String path, Optional<byte[]> data) {
} catch (Throwable t) {
throw Throwables.propagate(t);
} finally {
LOG.trace("Created {} bytes in {} ({})", data.or(EMPTY_BYTES).length, JavaUtils.duration(start), path);
log("Created", Optional.<Integer> absent(), Optional.<Integer> of(data.or(EMPTY_BYTES).length), start, path);
}
}

Expand Down Expand Up @@ -148,7 +161,7 @@ protected SingularityCreateResult save(String path, Optional<byte[]> data) {
} catch (Throwable t) {
throw Throwables.propagate(t);
} finally {
LOG.trace("Saved {} bytes in {} ({})", data.or(EMPTY_BYTES).length, JavaUtils.duration(start), path);
log("Saved", Optional.<Integer> absent(), Optional.<Integer> of(data.or(EMPTY_BYTES).length), start, path);
}
}

Expand All @@ -170,13 +183,13 @@ protected SingularityCreateResult set(String path, Optional<byte[]> data) {
} catch (Throwable t) {
throw Throwables.propagate(t);
} finally {
LOG.trace("Set {} bytes in {} ({})", data.or(EMPTY_BYTES).length, JavaUtils.duration(start), path);
log("Set", Optional.<Integer> absent(), Optional.<Integer> of(data.or(EMPTY_BYTES).length), start, path);
}
}

protected <T> Optional<T> getData(String path, Optional<Stat> stat, Transcoder<T> transcoder) {
final long start = System.currentTimeMillis();
long bytes = 0;
int bytes = 0;

try {
GetDataBuilder bldr = curator.getData();
Expand All @@ -200,7 +213,7 @@ protected <T> Optional<T> getData(String path, Optional<Stat> stat, Transcoder<T
} catch (Throwable t) {
throw Throwables.propagate(t);
} finally {
LOG.trace("Got {} bytes in {} ({})", bytes, JavaUtils.duration(start), path);
log("Fetched", Optional.<Integer> absent(), Optional.<Integer> of(bytes), start, path);
}
}

Expand Down
Expand Up @@ -65,10 +65,10 @@ public class DeployManager extends CuratorAsyncManager {
private static final String DEPLOY_RESULT_KEY = "RESULT_STATE";

@Inject
public DeployManager(SingularityConfiguration configuration, CuratorFramework curator, SingularityEventListener singularityEventListener, Transcoder<SingularityDeploy> deployTranscoder,
public DeployManager(CuratorFramework curator, SingularityConfiguration configuration, SingularityEventListener singularityEventListener, Transcoder<SingularityDeploy> deployTranscoder,
Transcoder<SingularityRequestDeployState> requestDeployStateTranscoder, Transcoder<SingularityPendingDeploy> pendingDeployTranscoder, Transcoder<SingularityDeployMarker> deployMarkerTranscoder,
Transcoder<SingularityDeployStatistics> deployStatisticsTranscoder, Transcoder<SingularityDeployResult> deployStateTranscoder, IdTranscoder<SingularityDeployKey> deployKeyTranscoder) {
super(curator, configuration.getZookeeperAsyncTimeout());
super(curator, configuration);

this.singularityEventListener = singularityEventListener;
this.pendingDeployTranscoder = pendingDeployTranscoder;
Expand Down
Expand Up @@ -12,6 +12,7 @@
import com.google.inject.Singleton;
import com.hubspot.singularity.SingularityRequest;
import com.hubspot.singularity.config.EmailConfigurationEnums.EmailType;
import com.hubspot.singularity.config.SingularityConfiguration;
import com.hubspot.singularity.data.transcoders.StringTranscoder;

@Singleton
Expand All @@ -24,8 +25,8 @@ public class MetadataManager extends CuratorManager {
private static final String MAIL_IN_COOLDOWN_MARKER_KEY = "COOLDOWN_ACTIVE";

@Inject
public MetadataManager(CuratorFramework curator) {
super(curator);
public MetadataManager(CuratorFramework curator, SingularityConfiguration configuration) {
super(curator, configuration);
}

private String getMailRecordPathForRequest(String requestId) {
Expand Down
Expand Up @@ -16,7 +16,7 @@ public class RackManager extends AbstractMachineManager<SingularityRack> {

@Inject
public RackManager(CuratorFramework curator, SingularityConfiguration configuration, Transcoder<SingularityRack> rackTranscoder, Transcoder<SingularityMachineStateHistoryUpdate> stateHistoryTranscoder) {
super(curator, configuration.getZookeeperAsyncTimeout(), rackTranscoder, stateHistoryTranscoder);
super(curator, configuration, rackTranscoder, stateHistoryTranscoder);
}

@Override
Expand Down
Expand Up @@ -50,9 +50,9 @@ public class RequestManager extends CuratorAsyncManager {
private static final String HISTORY_PATH_ROOT = REQUEST_ROOT + "/history";

@Inject
public RequestManager(SingularityConfiguration configuration, CuratorFramework curator, SingularityEventListener singularityEventListener, Transcoder<SingularityRequestCleanup> requestCleanupTranscoder,
public RequestManager(CuratorFramework curator, SingularityConfiguration configuration, SingularityEventListener singularityEventListener, Transcoder<SingularityRequestCleanup> requestCleanupTranscoder,
Transcoder<SingularityRequestWithState> requestTranscoder, Transcoder<SingularityPendingRequest> pendingRequestTranscoder, Transcoder<SingularityRequestHistory> requestHistoryTranscoder) {
super(curator, configuration.getZookeeperAsyncTimeout());
super(curator, configuration);

this.requestTranscoder = requestTranscoder;
this.requestCleanupTranscoder = requestCleanupTranscoder;
Expand Down
Expand Up @@ -16,7 +16,7 @@ public class SlaveManager extends AbstractMachineManager<SingularitySlave> {

@Inject
public SlaveManager(CuratorFramework curator, SingularityConfiguration configuration, Transcoder<SingularitySlave> slaveTranscoder, Transcoder<SingularityMachineStateHistoryUpdate> stateHistoryTranscoder) {
super(curator, configuration.getZookeeperAsyncTimeout(), slaveTranscoder, stateHistoryTranscoder);
super(curator, configuration, slaveTranscoder, stateHistoryTranscoder);
}

@Override
Expand Down
Expand Up @@ -53,9 +53,9 @@ public class StateManager extends CuratorManager {
private final SingularityAuthDatastore authDatastore;

@Inject
public StateManager(CuratorFramework curatorFramework, RequestManager requestManager, TaskManager taskManager, DeployManager deployManager, SlaveManager slaveManager, RackManager rackManager,
public StateManager(CuratorFramework curatorFramework, SingularityConfiguration configuration, RequestManager requestManager, TaskManager taskManager, DeployManager deployManager, SlaveManager slaveManager, RackManager rackManager,
Transcoder<SingularityState> stateTranscoder, Transcoder<SingularityHostState> hostStateTranscoder, SingularityConfiguration singularityConfiguration, SingularityAuthDatastore authDatastore) {
super(curatorFramework);
super(curatorFramework, configuration);

this.requestManager = requestManager;
this.taskManager = taskManager;
Expand Down
Expand Up @@ -95,12 +95,12 @@ public class TaskManager extends CuratorAsyncManager {
private final String serverId;

@Inject
public TaskManager(SingularityConfiguration configuration, CuratorFramework curator, SingularityEventListener singularityEventListener, IdTranscoder<SingularityPendingTaskId> pendingTaskIdTranscoder,
public TaskManager(CuratorFramework curator, SingularityConfiguration configuration, SingularityEventListener singularityEventListener, IdTranscoder<SingularityPendingTaskId> pendingTaskIdTranscoder,
IdTranscoder<SingularityTaskId> taskIdTranscoder, Transcoder<SingularityLoadBalancerUpdate> taskLoadBalancerHistoryUpdateTranscoder,
Transcoder<SingularityTaskStatusHolder> taskStatusTranscoder, Transcoder<SingularityTaskHealthcheckResult> healthcheckResultTranscoder, Transcoder<SingularityTask> taskTranscoder,
Transcoder<SingularityTaskCleanup> taskCleanupTranscoder, Transcoder<SingularityTaskHistoryUpdate> taskHistoryUpdateTranscoder, Transcoder<SingularityPendingTask> pendingTaskTranscoder,
Transcoder<SingularityKilledTaskIdRecord> killedTaskIdRecordTranscoder, @Named(SingularityMainModule.SERVER_ID_PROPERTY) String serverId) {
super(curator, configuration.getZookeeperAsyncTimeout());
super(curator, configuration);

this.healthcheckResultTranscoder = healthcheckResultTranscoder;
this.taskTranscoder = taskTranscoder;
Expand Down
Expand Up @@ -34,9 +34,9 @@ public class WebhookManager extends CuratorAsyncManager implements SingularityEv
private final Transcoder<SingularityDeployUpdate> deployWebhookTranscoder;

@Inject
public WebhookManager(SingularityConfiguration configuration, CuratorFramework curator, Transcoder<SingularityWebhook> webhookTranscoder,
public WebhookManager(CuratorFramework curator, SingularityConfiguration configuration, Transcoder<SingularityWebhook> webhookTranscoder,
Transcoder<SingularityRequestHistory> requestHistoryTranscoder, Transcoder<SingularityTaskHistoryUpdate> taskHistoryUpdateTranscoder, Transcoder<SingularityDeployUpdate> deployWebhookTranscoder) {
super(curator, configuration.getZookeeperAsyncTimeout());
super(curator, configuration);
this.webhookTranscoder = webhookTranscoder;
this.taskHistoryUpdateTranscoder = taskHistoryUpdateTranscoder;
this.requestHistoryTranscoder = requestHistoryTranscoder;
Expand Down

0 comments on commit 430b10f

Please sign in to comment.