Skip to content

Commit

Permalink
Add method safeRestart
Browse files Browse the repository at this point in the history
  • Loading branch information
kfaraz committed May 23, 2024
1 parent 24aa79c commit 040e35b
Showing 1 changed file with 13 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import org.apache.druid.concurrent.LifecycleLock;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.RetryUtils;
Expand Down Expand Up @@ -96,7 +95,6 @@ public class ChangeRequestHttpSyncer<T>
private final String logIdentity;
private int consecutiveFailedAttemptCount = 0;

// All stopwatches are guarded by the startStopLock
private final Stopwatch sinceSyncerStart = Stopwatch.createUnstarted();
private final Stopwatch sinceLastSyncRequest = Stopwatch.createUnstarted();
private final Stopwatch sinceLastSyncSuccess = Stopwatch.createUnstarted();
Expand Down Expand Up @@ -147,7 +145,7 @@ public void start()
startStopLock.exitStart();
}

sinceSyncerStart.restart();
safeRestart(sinceSyncerStart);
addNextSyncToWorkQueue();
}
}
Expand Down Expand Up @@ -237,11 +235,7 @@ private void sendSyncRequest()
return;
}

// Synchronized to tackle the remote possibility of two syncs trying to reset
// the stopwatch together
synchronized (startStopLock) {
sinceLastSyncRequest.restart();
}
safeRestart(sinceLastSyncRequest);

try {
final String req = getRequestString();
Expand Down Expand Up @@ -277,7 +271,7 @@ public void onSuccess(InputStream stream)
final int responseCode = responseHandler.getStatus();
if (responseCode == HttpServletResponse.SC_NO_CONTENT) {
log.debug("Received NO CONTENT from server[%s]", logIdentity);
sinceLastSyncSuccess.restart();
safeRestart(sinceLastSyncSuccess);
return;
} else if (responseCode != HttpServletResponse.SC_OK) {
handleFailure(new ISE("Received sync response [%d]", responseCode));
Expand Down Expand Up @@ -313,7 +307,7 @@ public void onSuccess(InputStream stream)
log.info("Server[%s] synced successfully.", logIdentity);
}

sinceLastSyncSuccess.restart();
safeRestart(sinceLastSyncSuccess);
}
catch (Exception ex) {
markServerUnstableAndAlert(ex, "Processing Response");
Expand Down Expand Up @@ -342,7 +336,6 @@ public void onFailure(Throwable t)
}
}

@GuardedBy("startStopLock")
private void handleFailure(Throwable t)
{
String logMsg = StringUtils.format(
Expand All @@ -358,9 +351,7 @@ private void handleFailure(Throwable t)
}
catch (Throwable th) {
try {
synchronized (startStopLock) {
markServerUnstableAndAlert(th, "Sending Request");
}
markServerUnstableAndAlert(th, "Sending Request");
}
finally {
addNextSyncToWorkQueue();
Expand Down Expand Up @@ -420,11 +411,17 @@ private void addNextSyncToWorkQueue()
}
}

@GuardedBy("startStopLock")
private void safeRestart(Stopwatch stopwatch)
{
synchronized (startStopLock) {
stopwatch.restart();
}
}

private void markServerUnstableAndAlert(Throwable throwable, String action)
{
if (consecutiveFailedAttemptCount++ == 0) {
sinceUnstable.restart();
safeRestart(sinceUnstable);
}

final long unstableSeconds = getUnstableTimeMillis() / 1000;
Expand Down

0 comments on commit 040e35b

Please sign in to comment.