Skip to content

Commit

Permalink
Use System.nanoTime when measuring elapsed time
Browse files Browse the repository at this point in the history
System.currentTimeMillis is more vulnerable when the clock shifts.

Closes #11058
  • Loading branch information
mikemccand committed May 11, 2015
1 parent cc3f02c commit 08f7caa
Show file tree
Hide file tree
Showing 20 changed files with 100 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public Set<OnGoingMerge> onGoingMerges() {
protected void doMerge(IndexWriter writer, MergePolicy.OneMerge merge) throws IOException {
int totalNumDocs = merge.totalNumDocs();
long totalSizeInBytes = merge.totalBytesSize();
long time = System.currentTimeMillis();
long timeNS = System.nanoTime();
currentMerges.inc();
currentMergesNumDocs.inc(totalNumDocs);
currentMergesSizeInBytes.inc(totalSizeInBytes);
Expand All @@ -116,7 +116,7 @@ protected void doMerge(IndexWriter writer, MergePolicy.OneMerge merge) throws IO
beforeMerge(onGoingMerge);
super.doMerge(writer, merge);
} finally {
long took = System.currentTimeMillis() - time;
long tookMS = TimeValue.nsecToMSec(System.nanoTime() - timeNS);

onGoingMerges.remove(onGoingMerge);
afterMerge(onGoingMerge);
Expand All @@ -127,26 +127,26 @@ protected void doMerge(IndexWriter writer, MergePolicy.OneMerge merge) throws IO

totalMergesNumDocs.inc(totalNumDocs);
totalMergesSizeInBytes.inc(totalSizeInBytes);
totalMerges.inc(took);
totalMerges.inc(tookMS);

long stoppedMS = merge.rateLimiter.getTotalStoppedNS()/1000000;
long throttledMS = merge.rateLimiter.getTotalPausedNS()/1000000;
long stoppedMS = TimeValue.nsecToMSec(merge.rateLimiter.getTotalStoppedNS());
long throttledMS = TimeValue.nsecToMSec(merge.rateLimiter.getTotalPausedNS());

totalMergeStoppedTime.inc(stoppedMS);
totalMergeThrottledTime.inc(throttledMS);

String message = String.format(Locale.ROOT,
"merge segment [%s] done: took [%s], [%,.1f MB], [%,d docs], [%s stopped], [%s throttled], [%,.1f MB written], [%,.1f MB/sec throttle]",
merge.info == null ? "_na_" : merge.info.info.name,
TimeValue.timeValueMillis(took),
TimeValue.timeValueMillis(tookMS),
totalSizeInBytes/1024f/1024f,
totalNumDocs,
TimeValue.timeValueMillis(stoppedMS),
TimeValue.timeValueMillis(throttledMS),
merge.rateLimiter.getTotalBytesWritten()/1024f/1024f,
merge.rateLimiter.getMBPerSec());

if (took > 20000) { // if more than 20 seconds, DEBUG log it
if (tookMS > 20000) { // if more than 20 seconds, DEBUG log it
logger.debug(message);
} else if (logger.isTraceEnabled()) {
logger.trace(message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ protected ClusterHealthResponse newResponse() {
@Override
protected void masterOperation(final ClusterHealthRequest request, final ClusterState unusedState, final ActionListener<ClusterHealthResponse> listener) {
if (request.waitForEvents() != null) {
final long endTime = System.currentTimeMillis() + request.timeout().millis();
final long endTimeMS = TimeValue.nsecToMSec(System.nanoTime()) + request.timeout().millis();
clusterService.submitStateUpdateTask("cluster_health (wait_for_events [" + request.waitForEvents() + "])", request.waitForEvents(), new ProcessedClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
Expand All @@ -76,7 +76,7 @@ public ClusterState execute(ClusterState currentState) {

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
final long timeoutInMillis = Math.max(0, endTime - System.currentTimeMillis());
final long timeoutInMillis = Math.max(0, endTimeMS - TimeValue.nsecToMSec(System.nanoTime()));
final TimeValue newTimeout = TimeValue.timeValueMillis(timeoutInMillis);
request.timeout(newTimeout);
executeHealth(request, listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public UpdateHelper(Settings settings, ScriptService scriptService) {
*/
@SuppressWarnings("unchecked")
public Result prepare(UpdateRequest request, IndexShard indexShard) {
long getDate = System.currentTimeMillis();
long getDateNS = System.nanoTime();
final GetResult getResult = indexShard.getService().get(request.type(), request.id(),
new String[]{RoutingFieldMapper.NAME, ParentFieldMapper.NAME, TTLFieldMapper.NAME, TimestampFieldMapper.NAME},
true, request.version(), request.versionType(), FetchSourceContext.FETCH_SOURCE, false);
Expand Down Expand Up @@ -222,7 +222,7 @@ public Result prepare(UpdateRequest request, IndexShard indexShard) {
if (ttl == null) {
ttl = getResult.getFields().containsKey(TTLFieldMapper.NAME) ? (Long) getResult.field(TTLFieldMapper.NAME).getValue() : null;
if (ttl != null) {
ttl = ttl - (System.currentTimeMillis() - getDate); // It is an approximation of exact TTL value, could be improved
ttl = ttl - TimeValue.nsecToMSec(System.nanoTime() - getDateNS); // It is an approximation of exact TTL value, could be improved
}
}

Expand Down
26 changes: 13 additions & 13 deletions src/main/java/org/elasticsearch/cluster/ClusterStateObserver.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public boolean apply(ClusterChangedEvent changedEvent) {
final TimeoutClusterStateListener clusterStateListener = new ObserverClusterStateListener();
// observingContext is not null when waiting on cluster state changes
final AtomicReference<ObservingContext> observingContext = new AtomicReference<ObservingContext>(null);
volatile Long startTime;
volatile Long startTimeNS;
volatile boolean timedOut;


Expand All @@ -69,7 +69,7 @@ public ClusterStateObserver(ClusterService clusterService, TimeValue timeout, ES
this.lastObservedState = new AtomicReference<>(new ObservedState(clusterService.state()));
this.timeOutValue = timeout;
if (timeOutValue != null) {
this.startTime = System.currentTimeMillis();
this.startTimeNS = System.nanoTime();
}
this.logger = logger;
}
Expand Down Expand Up @@ -111,28 +111,28 @@ public void waitForNextChange(Listener listener, ChangePredicate changePredicate
throw new ElasticsearchException("already waiting for a cluster state change");
}

Long timeoutTimeLeft;
Long timeoutTimeLeftMS;
if (timeOutValue == null) {
timeOutValue = this.timeOutValue;
if (timeOutValue != null) {
long timeSinceStart = System.currentTimeMillis() - startTime;
timeoutTimeLeft = timeOutValue.millis() - timeSinceStart;
if (timeoutTimeLeft <= 0l) {
long timeSinceStartMS = TimeValue.nsecToMSec(System.nanoTime() - startTimeNS);
timeoutTimeLeftMS = timeOutValue.millis() - timeSinceStartMS;
if (timeoutTimeLeftMS <= 0l) {
// things have timeout while we were busy -> notify
logger.debug("observer timed out. notifying listener. timeout setting [{}], time since start [{}]", timeOutValue, new TimeValue(timeSinceStart));
logger.debug("observer timed out. notifying listener. timeout setting [{}], time since start [{}]", timeOutValue, new TimeValue(timeSinceStartMS));
// update to latest, in case people want to retry
timedOut = true;
lastObservedState.set(new ObservedState(clusterService.state()));
listener.onTimeout(timeOutValue);
return;
}
} else {
timeoutTimeLeft = null;
timeoutTimeLeftMS = null;
}
} else {
this.startTime = System.currentTimeMillis();
this.startTimeNS = System.nanoTime();
this.timeOutValue = timeOutValue;
timeoutTimeLeft = timeOutValue.millis();
timeoutTimeLeftMS = timeOutValue.millis();
timedOut = false;
}

Expand All @@ -150,7 +150,7 @@ public void waitForNextChange(Listener listener, ChangePredicate changePredicate
if (!observingContext.compareAndSet(null, context)) {
throw new ElasticsearchException("already waiting for a cluster state change");
}
clusterService.add(timeoutTimeLeft == null ? null : new TimeValue(timeoutTimeLeft), clusterStateListener);
clusterService.add(timeoutTimeLeftMS == null ? null : new TimeValue(timeoutTimeLeftMS), clusterStateListener);
}
}

Expand Down Expand Up @@ -230,8 +230,8 @@ public void onTimeout(TimeValue timeout) {
ObservingContext context = observingContext.getAndSet(null);
if (context != null) {
clusterService.remove(this);
long timeSinceStart = System.currentTimeMillis() - startTime;
logger.debug("observer: timeout notification from cluster service. timeout setting [{}], time since start [{}]", timeOutValue, new TimeValue(timeSinceStart));
long timeSinceStartMS = TimeValue.nsecToMSec(System.nanoTime() - startTimeNS);
logger.debug("observer: timeout notification from cluster service. timeout setting [{}], time since start [{}]", timeOutValue, new TimeValue(timeSinceStartMS));
// update to latest, in case people want to retry
lastObservedState.set(new ObservedState(clusterService.state()));
timedOut = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public void onRefreshSettings(Settings settings) {
*/
class DiskListener implements ClusterInfoService.Listener {
private final Client client;
private long lastRun;
private long lastRunNS;

DiskListener(Client client) {
this.client = client;
Expand Down Expand Up @@ -168,8 +168,8 @@ public void onNewInfo(ClusterInfo info) {
warnAboutDiskIfNeeded(entry);
if (entry.getFreeBytes() < DiskThresholdDecider.this.freeBytesThresholdHigh.bytes() ||
entry.getFreeDiskAsPercentage() < DiskThresholdDecider.this.freeDiskThresholdHigh) {
if ((System.currentTimeMillis() - lastRun) > DiskThresholdDecider.this.rerouteInterval.millis()) {
lastRun = System.currentTimeMillis();
if ((System.nanoTime() - lastRunNS) > DiskThresholdDecider.this.rerouteInterval.nanos()) {
lastRunNS = System.nanoTime();
reroute = true;
} else {
logger.debug("high disk watermark exceeded on {} but an automatic reroute has occurred in the last [{}], skipping reroute",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,19 +334,19 @@ public int numberOfPendingTasks() {


static abstract class TimedPrioritizedRunnable extends PrioritizedRunnable {
private final long creationTime;
private final long creationTimeNS;
protected final String source;

protected TimedPrioritizedRunnable(Priority priority, String source) {
super(priority);
this.source = source;
this.creationTime = System.currentTimeMillis();
this.creationTimeNS = System.nanoTime();
}

public long timeSinceCreatedInMillis() {
// max with 0 to make sure we always return a non negative number
// even if time shifts.
return Math.max(0, System.currentTimeMillis() - creationTime);
return Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - creationTimeNS));
}

public String source() {
Expand Down Expand Up @@ -378,11 +378,11 @@ public void run() {
return;
}
ClusterState newClusterState;
long startTime = System.currentTimeMillis();
long startTimeNS = System.nanoTime();
try {
newClusterState = updateTask.execute(previousClusterState);
} catch (Throwable e) {
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, System.currentTimeMillis() - startTime));
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startTimeNS)));
if (logger.isTraceEnabled()) {
StringBuilder sb = new StringBuilder("failed to execute cluster state update in ").append(executionTime).append(", state:\nversion [").append(previousClusterState.version()).append("], source [").append(source).append("]\n");
sb.append(previousClusterState.nodes().prettyPrint());
Expand All @@ -403,7 +403,7 @@ public void run() {
if (updateTask instanceof ProcessedClusterStateUpdateTask) {
((ProcessedClusterStateUpdateTask) updateTask).clusterStateProcessed(source, previousClusterState, newClusterState);
}
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, System.currentTimeMillis() - startTime));
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startTimeNS)));
logger.debug("processing [{}]: took {} no change in cluster_state", source, executionTime);
warnAboutSlowTaskIfNeeded(executionTime, source);
return;
Expand Down Expand Up @@ -523,11 +523,11 @@ public void run() {
((ProcessedClusterStateUpdateTask) updateTask).clusterStateProcessed(source, previousClusterState, newClusterState);
}

TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, System.currentTimeMillis() - startTime));
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startTimeNS)));
logger.debug("processing [{}]: took {} done applying updated cluster_state (version: {}, uuid: {})", source, executionTime, newClusterState.version(), newClusterState.uuid());
warnAboutSlowTaskIfNeeded(executionTime, source);
} catch (Throwable t) {
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, System.currentTimeMillis() - startTime));
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startTimeNS)));
StringBuilder sb = new StringBuilder("failed to apply updated cluster state in ").append(executionTime).append(":\nversion [").append(newClusterState.version()).append("], uuid [").append(newClusterState.uuid()).append("], source [").append(source).append("]\n");
sb.append(newClusterState.nodes().prettyPrint());
sb.append(newClusterState.routingTable().prettyPrint());
Expand Down
16 changes: 8 additions & 8 deletions src/main/java/org/elasticsearch/common/StopWatch.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* Simple stop watch, allowing for timing of a number of tasks,
* exposing total running time and running time for each named task.
* <p/>
* <p>Conceals use of <code>System.currentTimeMillis()</code>, improving the
* <p>Conceals use of <code>System.nanoTime()</code>, improving the
* readability of application code and reducing the likelihood of calculation errors.
* <p/>
* <p>Note that this object is not designed to be thread-safe and does not
Expand All @@ -58,7 +58,7 @@ public class StopWatch {
/**
* Start time of the current task
*/
private long startTimeMillis;
private long startTimeNS;

/**
* Is the stop watch currently running?
Expand All @@ -77,7 +77,7 @@ public class StopWatch {
/**
* Total running time
*/
private long totalTimeMillis;
private long totalTimeNS;

/**
* Construct a new stop watch. Does not start any task.
Expand Down Expand Up @@ -129,7 +129,7 @@ public StopWatch start(String taskName) throws IllegalStateException {
if (this.running) {
throw new IllegalStateException("Can't start StopWatch: it's already running");
}
this.startTimeMillis = System.currentTimeMillis();
this.startTimeNS = System.nanoTime();
this.running = true;
this.currentTaskName = taskName;
return this;
Expand All @@ -146,9 +146,9 @@ public StopWatch stop() throws IllegalStateException {
if (!this.running) {
throw new IllegalStateException("Can't stop StopWatch: it's not running");
}
long lastTime = System.currentTimeMillis() - this.startTimeMillis;
this.totalTimeMillis += lastTime;
this.lastTaskInfo = new TaskInfo(this.currentTaskName, lastTime);
long lastTimeNS = System.nanoTime() - this.startTimeNS;
this.totalTimeNS += lastTimeNS;
this.lastTaskInfo = new TaskInfo(this.currentTaskName, TimeValue.nsecToMSec(lastTimeNS));
if (this.keepTaskList) {
this.taskList.add(lastTaskInfo);
}
Expand Down Expand Up @@ -189,7 +189,7 @@ public String lastTaskName() throws IllegalStateException {
* Return the total time for all tasks.
*/
public TimeValue totalTime() {
return new TimeValue(totalTimeMillis, TimeUnit.MILLISECONDS);
return new TimeValue(totalTimeNS, TimeUnit.NANOSECONDS);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package org.elasticsearch.common.inject.internal;

import org.elasticsearch.common.unit.TimeValue;

import java.util.logging.Logger;

/**
Expand All @@ -26,17 +28,17 @@
public class Stopwatch {
private static final Logger logger = Logger.getLogger(Stopwatch.class.getName());

private long start = System.currentTimeMillis();
private long startNS = System.nanoTime();

/**
* Resets and returns elapsed time in milliseconds.
*/
public long reset() {
long now = System.currentTimeMillis();
long nowNS = System.nanoTime();
try {
return now - start;
return TimeValue.nsecToMSec(nowNS - startNS);
} finally {
start = now;
startNS = nowNS;
}
}

Expand Down
7 changes: 7 additions & 0 deletions src/main/java/org/elasticsearch/common/unit/TimeValue.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@
*/
public class TimeValue implements Serializable, Streamable {

/** How many nano-seconds in one milli-second */
public static final long NSEC_PER_MSEC = 1000000;

public static TimeValue timeValueNanos(long nanos) {
return new TimeValue(nanos, TimeUnit.NANOSECONDS);
}
Expand Down Expand Up @@ -296,4 +299,8 @@ public int hashCode() {
long normalized = timeUnit.toNanos(duration);
return (int) (normalized ^ (normalized >>> 32));
}

public static long nsecToMSec(long ns) {
return ns / NSEC_PER_MSEC;
}
}
7 changes: 4 additions & 3 deletions src/main/java/org/elasticsearch/env/NodeEnvironment.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.common.io.PathUtils;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
Expand Down Expand Up @@ -384,11 +385,11 @@ public List<ShardLock> lockAllForIndex(Index index, @IndexSettings Settings sett
logger.trace("locking all shards for index {} - [{}]", index, numShards);
List<ShardLock> allLocks = new ArrayList<>(numShards);
boolean success = false;
long startTime = System.currentTimeMillis();
long startTimeNS = System.nanoTime();
try {
for (int i = 0; i < numShards; i++) {
long timeoutLeft = Math.max(0, lockTimeoutMS - (System.currentTimeMillis() - startTime));
allLocks.add(shardLock(new ShardId(index, i), timeoutLeft));
long timeoutLeftMS = Math.max(0, lockTimeoutMS - TimeValue.nsecToMSec((System.nanoTime() - startTimeNS)));
allLocks.add(shardLock(new ShardId(index, i), timeoutLeftMS));
}
success = true;
} finally {
Expand Down
Loading

0 comments on commit 08f7caa

Please sign in to comment.