Skip to content

Commit

Permalink
Merge pull request #1207 from mattrjacobs/fix-concurrent-execution-count
Browse files Browse the repository at this point in the history
Different fix for concurrency count command metric
  • Loading branch information
mattrjacobs committed May 16, 2016
2 parents 9f64774 + 7b484eb commit b63fdf6
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 7 deletions.
Expand Up @@ -94,6 +94,7 @@ protected static enum TimedOutStatus {
protected final AtomicReference<Reference<TimerListener>> timeoutTimer = new AtomicReference<Reference<TimerListener>>();

protected final AtomicBoolean commandStarted = new AtomicBoolean();
protected volatile boolean executionStarted = false;
protected volatile boolean isExecutionComplete = false;

/*
Expand Down Expand Up @@ -445,8 +446,6 @@ public Observable<R> call() {
.doOnUnsubscribe(unsubscribeCommandCleanup); // perform cleanup once
}



private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
// mark that we're starting execution on the ExecutionHook
// if this hook throws an exception, then a fast-fail occurs with no fallback. No state is left inconsistent
Expand Down Expand Up @@ -584,6 +583,7 @@ private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand
@Override
public Observable<R> call() {
executionResult = executionResult.setExecutionOccurred();
executionStarted = true;
metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);

if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
Expand Down Expand Up @@ -622,6 +622,7 @@ public Boolean call() {
@Override
public Observable<R> call() {
executionResult = executionResult.setExecutionOccurred();
executionStarted = true;
metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE);
// semaphore isolated
// store the command that is being run
Expand Down Expand Up @@ -843,7 +844,7 @@ private void cleanUpAfterResponseFromCache() {
.setNotExecutedInThread();
ExecutionResult cacheOnlyForMetrics = ExecutionResult.from(HystrixEventType.RESPONSE_FROM_CACHE)
.markUserThreadCompletion(latency);
metrics.markCommandDone(cacheOnlyForMetrics, commandKey, threadPoolKey);
metrics.markCommandDone(cacheOnlyForMetrics, commandKey, threadPoolKey, executionStarted);
eventNotifier.markEvent(HystrixEventType.RESPONSE_FROM_CACHE, commandKey);
}

Expand All @@ -857,9 +858,9 @@ private void handleCommandEnd() {
executionResult = executionResult.markUserThreadCompletion((int) userThreadLatency);
ExecutionResult cancelled = executionResultAtTimeOfCancellation;
if (cancelled == null) {
metrics.markCommandDone(executionResult, commandKey, threadPoolKey);
metrics.markCommandDone(executionResult, commandKey, threadPoolKey, executionStarted);
} else {
metrics.markCommandDone(cancelled, commandKey, threadPoolKey);
metrics.markCommandDone(cancelled, commandKey, threadPoolKey, executionStarted);
}
}

Expand Down
Expand Up @@ -343,9 +343,9 @@ public int getCurrentConcurrentExecutionCount() {
HystrixThreadEventStream.getInstance().commandExecutionStarted(commandKey, threadPoolKey, isolationStrategy, currentCount);
}

/* package-private */ void markCommandDone(ExecutionResult executionResult, HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey) {
/* package-private */ void markCommandDone(ExecutionResult executionResult, HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey, boolean executionStarted) {
HystrixThreadEventStream.getInstance().executionDone(executionResult, commandKey, threadPoolKey);
if (executionResult.executionOccurred()) {
if (executionStarted) {
concurrentExecutionCount.decrementAndGet();
}
}
Expand Down

0 comments on commit b63fdf6

Please sign in to comment.