Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Script: Fix value of ctx._now to be current epoch time in milliseconds #23175

Merged
merged 3 commits into from
Feb 22, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ private UpdateResultHolder executeUpdateRequest(UpdateRequest updateRequest, Ind
final UpdateHelper.Result translate;
// translate update request
try {
translate = updateHelper.prepare(updateRequest, primary, threadPool::estimatedTimeInMillis);
translate = updateHelper.prepare(updateRequest, primary, threadPool::absoluteTimeInMillis);
} catch (Exception failure) {
// we may fail translating a update to index or delete operation
// we use index result to communicate failure while translating update request
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ protected void shardOperation(final UpdateRequest request, final ActionListener<
final ShardId shardId = request.getShardId();
final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
final IndexShard indexShard = indexService.getShard(shardId.getId());
final UpdateHelper.Result result = updateHelper.prepare(request, indexShard, threadPool::estimatedTimeInMillis);
final UpdateHelper.Result result = updateHelper.prepare(request, indexShard, threadPool::absoluteTimeInMillis);
switch (result.getResponseResult()) {
case CREATED:
IndexRequest upsertRequest = result.action();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ public Codec getCodec() {

/**
* Returns a thread-pool mainly used to get estimated time stamps from
* {@link org.elasticsearch.threadpool.ThreadPool#estimatedTimeInMillis()} and to schedule
* {@link org.elasticsearch.threadpool.ThreadPool#relativeTimeInMillis()} and to schedule
* async force merge calls on the {@link org.elasticsearch.threadpool.ThreadPool.Names#FORCE_MERGE} thread-pool
*/
public ThreadPool getThreadPool() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException {
EngineMergeScheduler scheduler = null;
boolean success = false;
try {
this.lastDeleteVersionPruneTimeMSec = engineConfig.getThreadPool().estimatedTimeInMillis();
this.lastDeleteVersionPruneTimeMSec = engineConfig.getThreadPool().relativeTimeInMillis();

mergeScheduler = scheduler = new EngineMergeScheduler(engineConfig.getShardId(), engineConfig.getIndexSettings());
throttle = new IndexThrottle();
Expand Down Expand Up @@ -446,7 +446,7 @@ private boolean checkVersionConflict(final Operation op, final long currentVersi

private long checkDeletedAndGCed(VersionValue versionValue) {
long currentVersion;
if (engineConfig.isEnableGcDeletes() && versionValue.delete() && (engineConfig.getThreadPool().estimatedTimeInMillis() - versionValue.time()) > getGcDeletesInMillis()) {
if (engineConfig.isEnableGcDeletes() && versionValue.delete() && (engineConfig.getThreadPool().relativeTimeInMillis() - versionValue.time()) > getGcDeletesInMillis()) {
currentVersion = Versions.NOT_FOUND; // deleted, and GC
} else {
currentVersion = versionValue.version();
Expand Down Expand Up @@ -726,7 +726,7 @@ public DeleteResult delete(Delete delete) throws IOException {
private void maybePruneDeletedTombstones() {
// It's expensive to prune because we walk the deletes map acquiring dirtyLock for each uid so we only do it
// every 1/4 of gcDeletesInMillis:
if (engineConfig.isEnableGcDeletes() && engineConfig.getThreadPool().estimatedTimeInMillis() - lastDeleteVersionPruneTimeMSec > getGcDeletesInMillis() * 0.25) {
if (engineConfig.isEnableGcDeletes() && engineConfig.getThreadPool().relativeTimeInMillis() - lastDeleteVersionPruneTimeMSec > getGcDeletesInMillis() * 0.25) {
pruneDeletedTombstones();
}
}
Expand Down Expand Up @@ -772,7 +772,7 @@ private DeleteResult innerDelete(Delete delete) throws IOException {
deleteResult = new DeleteResult(updatedVersion, seqNo, found);

versionMap.putUnderLock(delete.uid().bytes(),
new DeleteVersionValue(updatedVersion, engineConfig.getThreadPool().estimatedTimeInMillis()));
new DeleteVersionValue(updatedVersion, engineConfig.getThreadPool().relativeTimeInMillis()));
}
if (!deleteResult.hasFailure()) {
location = delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY
Expand Down Expand Up @@ -1047,7 +1047,7 @@ public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineExcepti
}

private void pruneDeletedTombstones() {
long timeMSec = engineConfig.getThreadPool().estimatedTimeInMillis();
long timeMSec = engineConfig.getThreadPool().relativeTimeInMillis();

// TODO: not good that we reach into LiveVersionMap here; can we move this inside VersionMap instead? problem is the dirtyLock...

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,7 @@ private void contextProcessing(SearchContext context) {
}

private void contextProcessedSuccessfully(SearchContext context) {
context.accessed(threadPool.estimatedTimeInMillis());
context.accessed(threadPool.relativeTimeInMillis());
}

private void cleanContext(SearchContext context) {
Expand Down Expand Up @@ -794,7 +794,7 @@ public int getActiveContexts() {
class Reaper implements Runnable {
@Override
public void run() {
final long time = threadPool.estimatedTimeInMillis();
final long time = threadPool.relativeTimeInMillis();
for (SearchContext context : activeContexts.values()) {
// Use the same value for both checks since lastAccessTime can
// be modified by another thread between checks!
Expand Down
76 changes: 57 additions & 19 deletions core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public static ThreadPoolType fromType(String type) {

private final ScheduledThreadPoolExecutor scheduler;

private final EstimatedTimeThread estimatedTimeThread;
private final CachedTimeThread cachedTimeThread;

static final ExecutorService DIRECT_EXECUTOR = EsExecutors.newDirectExecutorService();

Expand Down Expand Up @@ -213,16 +213,33 @@ public ThreadPool(final Settings settings, final ExecutorBuilder<?>... customBui
this.scheduler.setRemoveOnCancelPolicy(true);

TimeValue estimatedTimeInterval = ESTIMATED_TIME_INTERVAL_SETTING.get(settings);
this.estimatedTimeThread = new EstimatedTimeThread(EsExecutors.threadName(settings, "[timer]"), estimatedTimeInterval.millis());
this.estimatedTimeThread.start();
this.cachedTimeThread = new CachedTimeThread(EsExecutors.threadName(settings, "[timer]"), estimatedTimeInterval.millis());
this.cachedTimeThread.start();
}

public long estimatedTimeInMillis() {
return estimatedTimeThread.estimatedTimeInMillis();
/**
* Returns a value of milliseconds that may be used for relative time calculations.
*
* This method should only be used for calculating time deltas. For an epoch based
* timestamp, see {@link #absoluteTimeInMillis()}.
*/
public long relativeTimeInMillis() {
return cachedTimeThread.relativeTimeInMillis();
}

/**
* Returns the value of milliseconds since UNIX epoch.
*
* This method should only be used for exact date/time formatting. For calculating
* time deltas that should not suffer from negative deltas, which are possible with
* this method, see {@link #relativeTimeInMillis()}.
*/
public long absoluteTimeInMillis() {
return cachedTimeThread.absoluteTimeInMillis();
}

public Counter estimatedTimeInMillisCounter() {
return estimatedTimeThread.counter;
return cachedTimeThread.counter;
}

public ThreadPoolInfo info() {
Expand Down Expand Up @@ -342,8 +359,8 @@ public ScheduledFuture<?> schedule(TimeValue delay, String executor, Runnable co
}

public void shutdown() {
estimatedTimeThread.running = false;
estimatedTimeThread.interrupt();
cachedTimeThread.running = false;
cachedTimeThread.interrupt();
scheduler.shutdown();
for (ExecutorHolder executor : executors.values()) {
if (executor.executor() instanceof ThreadPoolExecutor) {
Expand All @@ -353,8 +370,8 @@ public void shutdown() {
}

public void shutdownNow() {
estimatedTimeThread.running = false;
estimatedTimeThread.interrupt();
cachedTimeThread.running = false;
cachedTimeThread.interrupt();
scheduler.shutdownNow();
for (ExecutorHolder executor : executors.values()) {
if (executor.executor() instanceof ThreadPoolExecutor) {
Expand All @@ -371,7 +388,7 @@ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedE
}
}

estimatedTimeThread.join(unit.toMillis(timeout));
cachedTimeThread.join(unit.toMillis(timeout));
return result;
}

Expand Down Expand Up @@ -471,29 +488,50 @@ public String toString() {
}
}

static class EstimatedTimeThread extends Thread {
/**
* A thread to cache millisecond time values from
* {@link System#nanoTime()} and {@link System#currentTimeMillis()}.
*
* The values are updated at a specified interval.
*/
static class CachedTimeThread extends Thread {

final long interval;
final TimeCounter counter;
volatile boolean running = true;
volatile long estimatedTimeInMillis;
volatile long relativeMillis;
volatile long absoluteMillis;

EstimatedTimeThread(String name, long interval) {
CachedTimeThread(String name, long interval) {
super(name);
this.interval = interval;
this.estimatedTimeInMillis = TimeValue.nsecToMSec(System.nanoTime());
this.relativeMillis = TimeValue.nsecToMSec(System.nanoTime());
this.absoluteMillis = System.currentTimeMillis();
this.counter = new TimeCounter();
setDaemon(true);
}

public long estimatedTimeInMillis() {
return this.estimatedTimeInMillis;
/**
* Return the current time used for relative calculations. This is
* {@link System#nanoTime()} truncated to milliseconds.
*/
long relativeTimeInMillis() {
return relativeMillis;
}

/**
* Return the current epoch time, used to find absolute time. This is
* a cached version of {@link System#currentTimeMillis()}.
*/
long absoluteTimeInMillis() {
return absoluteMillis;
}

@Override
public void run() {
while (running) {
estimatedTimeInMillis = TimeValue.nsecToMSec(System.nanoTime());
relativeMillis = TimeValue.nsecToMSec(System.nanoTime());
absoluteMillis = System.currentTimeMillis();
try {
Thread.sleep(interval);
} catch (InterruptedException e) {
Expand All @@ -512,7 +550,7 @@ public long addAndGet(long delta) {

@Override
public long get() {
return estimatedTimeInMillis;
return relativeMillis;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,17 @@ public void testBoundedByBetweenMinAndMax() {
assertThat(ThreadPool.boundedBy(value, min, max), equalTo(value));
}

public void testAbsoluteTime() throws Exception {
TestThreadPool threadPool = new TestThreadPool("test");
try {
long currentTime = System.currentTimeMillis();
long gotTime = threadPool.absoluteTimeInMillis();
long delta = Math.abs(gotTime - currentTime);
assertTrue("thread pool cached absolute time " + gotTime + " is too far from real current time " + currentTime,
delta < 10000); // the delta can be large, we just care it is the same order of magnitude
} finally {
threadPool.shutdown();
threadPool.close();
}
}
}