Skip to content

Commit

Permalink
[Fix] Replace cached time with system clock in MasterService debug lo…
Browse files Browse the repository at this point in the history
  • Loading branch information
andrross committed Jul 6, 2023
1 parent 68a4d36 commit 58f4ce6
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 20 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- With only GlobalAggregation in request causes unnecessary wrapping with MultiCollector ([#8125](https://github.com/opensearch-project/OpenSearch/pull/8125))
- Fix mapping char_filter when mapping a hashtag ([#7591](https://github.com/opensearch-project/OpenSearch/pull/7591))
- Fix NPE in multiterms aggregations involving empty buckets ([#7318](https://github.com/opensearch-project/OpenSearch/pull/7318))
- Precise system clock time in MasterService debug logs ([#7902](https://github.com/opensearch-project/OpenSearch/pull/7902))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,14 +291,14 @@ private void runTasks(TaskInputs taskInputs) {
return;
}

final long computationStartTime = threadPool.relativeTimeInMillis();
final long computationStartTime = threadPool.preciseRelativeTimeInNanos();
final TaskOutputs taskOutputs = calculateTaskOutputs(taskInputs, previousClusterState);
taskOutputs.notifyFailedTasks();
final TimeValue computationTime = getTimeSince(computationStartTime);
logExecutionTime(computationTime, "compute cluster state update", summary);

if (taskOutputs.clusterStateUnchanged()) {
final long notificationStartTime = threadPool.relativeTimeInMillis();
final long notificationStartTime = threadPool.preciseRelativeTimeInNanos();
taskOutputs.notifySuccessfulTasksOnUnchangedClusterState();
final TimeValue executionTime = getTimeSince(notificationStartTime);
logExecutionTime(executionTime, "notify listeners on unchanged cluster state", summary);
Expand All @@ -309,7 +309,7 @@ private void runTasks(TaskInputs taskInputs) {
} else {
logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), summary);
}
final long publicationStartTime = threadPool.relativeTimeInMillis();
final long publicationStartTime = threadPool.preciseRelativeTimeInNanos();
try {
ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(summary, newClusterState, previousClusterState);
// new cluster state, notify all listeners
Expand All @@ -335,8 +335,8 @@ private void runTasks(TaskInputs taskInputs) {
}
}

private TimeValue getTimeSince(long startTimeMillis) {
return TimeValue.timeValueMillis(Math.max(0, threadPool.relativeTimeInMillis() - startTimeMillis));
private TimeValue getTimeSince(long startTimeNanos) {
return TimeValue.timeValueMillis(TimeValue.nsecToMSec(threadPool.preciseRelativeTimeInNanos() - startTimeNanos));
}

protected void publish(ClusterChangedEvent clusterChangedEvent, TaskOutputs taskOutputs, long startTimeMillis) {
Expand All @@ -358,7 +358,7 @@ protected boolean blockingAllowed() {
}

void onPublicationSuccess(ClusterChangedEvent clusterChangedEvent, TaskOutputs taskOutputs) {
final long notificationStartTime = threadPool.relativeTimeInMillis();
final long notificationStartTime = threadPool.preciseRelativeTimeInNanos();
taskOutputs.processedDifferentClusterState(clusterChangedEvent.previousState(), clusterChangedEvent.state());

try {
Expand Down
13 changes: 13 additions & 0 deletions server/src/main/java/org/opensearch/threadpool/ThreadPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,19 @@ public long relativeTimeInNanos() {
return cachedTimeThread.relativeTimeInNanos();
}

/**
* Returns a value of nanoseconds that may be used for relative time calculations
* that require the highest precision possible. Performance critical code must use
* either {@link #relativeTimeInNanos()} or {@link #relativeTimeInMillis()} which
* give better performance at the cost of lower precision.
*
* This method should only be used for calculating time deltas. For an epoch based
* timestamp, see {@link #absoluteTimeInMillis()}.
*/
public long preciseRelativeTimeInNanos() {
return System.nanoTime();
}

/**
* Returns the value of milliseconds since UNIX epoch.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,14 @@
public class MasterServiceTests extends OpenSearchTestCase {

private static ThreadPool threadPool;
private static long relativeTimeInMillis;
private static long timeDiffInMillis;

@BeforeClass
public static void createThreadPool() {
threadPool = new TestThreadPool(MasterServiceTests.class.getName()) {
@Override
public long relativeTimeInMillis() {
return relativeTimeInMillis;
public long preciseRelativeTimeInNanos() {
return timeDiffInMillis * TimeValue.NSEC_PER_MSEC;
}
};
}
Expand All @@ -119,7 +119,7 @@ public static void stopThreadPool() {

@Before
public void randomizeCurrentTime() {
relativeTimeInMillis = randomLongBetween(0L, 1L << 62);
timeDiffInMillis = randomLongBetween(0L, 1L << 50);
}

private ClusterManagerService createClusterManagerService(boolean makeClusterManager) {
Expand Down Expand Up @@ -426,7 +426,7 @@ public void testClusterStateUpdateLogging() throws Exception {
clusterManagerService.submitStateUpdateTask("test1", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
relativeTimeInMillis += TimeValue.timeValueSeconds(1).millis();
timeDiffInMillis += TimeValue.timeValueSeconds(1).millis();
return currentState;
}

Expand All @@ -441,7 +441,7 @@ public void onFailure(String source, Exception e) {
clusterManagerService.submitStateUpdateTask("test2", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
relativeTimeInMillis += TimeValue.timeValueSeconds(2).millis();
timeDiffInMillis += TimeValue.timeValueSeconds(2).millis();
throw new IllegalArgumentException("Testing handling of exceptions in the cluster state task");
}

Expand All @@ -456,13 +456,13 @@ public void onFailure(String source, Exception e) {}
clusterManagerService.submitStateUpdateTask("test3", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
relativeTimeInMillis += TimeValue.timeValueSeconds(3).millis();
timeDiffInMillis += TimeValue.timeValueSeconds(3).millis();
return ClusterState.builder(currentState).incrementVersion().build();
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
relativeTimeInMillis += TimeValue.timeValueSeconds(4).millis();
timeDiffInMillis += TimeValue.timeValueSeconds(4).millis();
}

@Override
Expand Down Expand Up @@ -1080,12 +1080,12 @@ public void testLongClusterStateUpdateLogging() throws Exception {
final AtomicReference<ClusterState> clusterStateRef = new AtomicReference<>(initialClusterState);
clusterManagerService.setClusterStatePublisher((event, publishListener, ackListener) -> {
if (event.source().contains("test5")) {
relativeTimeInMillis += ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(
timeDiffInMillis += ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(
Settings.EMPTY
).millis() + randomLongBetween(1, 1000000);
}
if (event.source().contains("test6")) {
relativeTimeInMillis += ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(
timeDiffInMillis += ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(
Settings.EMPTY
).millis() + randomLongBetween(1, 1000000);
throw new OpenSearchException("simulated error during slow publication which should trigger logging");
Expand All @@ -1101,7 +1101,7 @@ public void testLongClusterStateUpdateLogging() throws Exception {
clusterManagerService.submitStateUpdateTask("test1", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
relativeTimeInMillis += randomLongBetween(
timeDiffInMillis += randomLongBetween(
0L,
ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(Settings.EMPTY).millis()
);
Expand All @@ -1124,7 +1124,7 @@ public void onFailure(String source, Exception e) {
clusterManagerService.submitStateUpdateTask("test2", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
relativeTimeInMillis += ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(
timeDiffInMillis += ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(
Settings.EMPTY
).millis() + randomLongBetween(1, 1000000);
throw new IllegalArgumentException("Testing handling of exceptions in the cluster state task");
Expand All @@ -1143,7 +1143,7 @@ public void onFailure(String source, Exception e) {
clusterManagerService.submitStateUpdateTask("test3", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
relativeTimeInMillis += ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(
timeDiffInMillis += ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(
Settings.EMPTY
).millis() + randomLongBetween(1, 1000000);
return ClusterState.builder(currentState).incrementVersion().build();
Expand All @@ -1162,7 +1162,7 @@ public void onFailure(String source, Exception e) {
clusterManagerService.submitStateUpdateTask("test4", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
relativeTimeInMillis += ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(
timeDiffInMillis += ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(
Settings.EMPTY
).millis() + randomLongBetween(1, 1000000);
return currentState;
Expand Down

0 comments on commit 58f4ce6

Please sign in to comment.