/
ThreadExpiringThreadPool.java
146 lines (129 loc) · 6.08 KB
/
ThreadExpiringThreadPool.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sling.commons.threads.impl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * An extension of {@link ThreadPoolExecutor} that keeps track of the age
 * of its worker threads and expires them once they get older than a
 * specified max-age.
 * <p>
 * To be precise, a thread is expired when it finishes processing
 * a task and its max-age has been exceeded at that time. I.e. if a
 * thread is idle past its expiry, it may still process a single
 * task before it is expired.
 */
public class ThreadExpiringThreadPool extends ThreadPoolExecutor {

    private static final Logger LOG = LoggerFactory.getLogger(ThreadExpiringThreadPool.class);

    /**
     * Map from thread-id to the time (in milliseconds) when a thread was first used to
     * process a task. This is used to determine when a thread is to be expired.
     * <p>
     * Entries are removed when a thread is expired, when a task fails on the thread,
     * and when the pool terminates. NOTE(review): a worker that is retired by the
     * executor's keep-alive timeout triggers none of these hooks, so its entry stays
     * until the pool terminates.
     */
    private final ConcurrentHashMap<Long, Long> threadStartTimes;

    /**
     * Thread max-age in milliseconds. Very large values saturate at
     * {@code Long.MAX_VALUE} when converted from a coarser unit.
     */
    private final long maxThreadAge;

    /**
     * Convenience flag indicating whether threads expire or not.
     * Expiry is enabled when the max-age passed to the constructor is {@code >= 0}.
     */
    private final boolean enableThreadExpiry;

    /**
     * Marker exception object thrown to terminate threads that have
     * reached or exceeded their max-age. This exception is intentionally
     * used for (minimal) control flow, i.e. the {@code ThreadPoolExecutor}
     * will dispose of any thread that threw an exception and create a new
     * one in its stead. This exception should never show up in any logs,
     * otherwise it is a bug.
     */
    private final RuntimeException expiredThreadException;

    /**
     * Creates a pool whose worker threads are replaced once they exceed the given max-age.
     *
     * @param corePoolSize      passed through to {@link ThreadPoolExecutor}
     * @param maximumPoolSize   passed through to {@link ThreadPoolExecutor}; also used as the
     *                          initial capacity of the start-time map
     * @param maxThreadAge      maximum age of a worker thread; a negative value disables expiry
     * @param maxThreadAgeUnit  unit of {@code maxThreadAge}
     * @param keepAliveTime     passed through to {@link ThreadPoolExecutor}
     * @param keepAliveTimeUnit passed through to {@link ThreadPoolExecutor}
     * @param workQueue         passed through to {@link ThreadPoolExecutor}
     * @param threadFactory     passed through to {@link ThreadPoolExecutor}
     * @param handler           passed through to {@link ThreadPoolExecutor}
     */
    public ThreadExpiringThreadPool(
            final int corePoolSize,
            final int maximumPoolSize,
            final long maxThreadAge,
            final TimeUnit maxThreadAgeUnit,
            final long keepAliveTime,
            final TimeUnit keepAliveTimeUnit,
            final BlockingQueue<Runnable> workQueue,
            final ThreadFactory threadFactory,
            final RejectedExecutionHandler handler
    ) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, keepAliveTimeUnit, workQueue, threadFactory, handler);
        this.threadStartTimes = new ConcurrentHashMap<Long, Long>(maximumPoolSize);
        this.maxThreadAge = TimeUnit.MILLISECONDS.convert(maxThreadAge, maxThreadAgeUnit);
        // deliberately derived from the raw argument: a negative max-age means "never expire"
        this.enableThreadExpiry = maxThreadAge >= 0;
        this.expiredThreadException = new RuntimeException("Kill old thread");
    }

    /**
     * Records the first-use time of the worker thread (if expiry is enabled)
     * before delegating to the super implementation.
     */
    @Override
    protected void beforeExecute(final Thread thread, final Runnable runnable) {
        if (enableThreadExpiry) {
            recordStartTime(thread);
        }
        super.beforeExecute(thread, runnable);
    }

    /**
     * Stores the current time as start time of the given thread, unless one is
     * already recorded. On first use it also installs an uncaught exception
     * handler that hides the marker exception used to expire the thread.
     *
     * @param thread the worker thread about to run a task
     */
    private void recordStartTime(final Thread thread) {
        final long threadId = thread.getId();
        if (threadStartTimes.putIfAbsent(threadId, System.currentTimeMillis()) == null) {
            LOG.debug("{} used for the first time.", thread);

            // The uncaught exception handler makes sure that the exception
            // signalling the death of a thread is swallowed. All other
            // Throwables are handed to the originalHandler.
            final Thread.UncaughtExceptionHandler originalHandler = thread.getUncaughtExceptionHandler();
            thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
                @Override
                public void uncaughtException(final Thread thread, final Throwable throwable) {
                    // first reset the original uncaught exception handler - just as a precaution
                    thread.setUncaughtExceptionHandler(originalHandler);

                    // ignore expected exception thrown to terminate the thread
                    if (throwable == expiredThreadException) {
                        return;
                    }

                    // delegate any other exceptions to the original uncaught exception handler
                    if (originalHandler != null) {
                        originalHandler.uncaughtException(thread, throwable);
                    }
                }
            });
        }
    }

    /**
     * After a successful task, expires the current worker thread if it exceeded
     * its max-age. After a failed task, only the bookkeeping for the current
     * thread is dropped.
     */
    @Override
    protected void afterExecute(final Runnable runnable, final Throwable throwable) {
        super.afterExecute(runnable, throwable);
        if (!enableThreadExpiry) {
            return;
        }
        if (throwable != null) {
            // A non-null throwable is the exception that terminates this worker, so
            // its start time is no longer needed. Should the thread be used again,
            // beforeExecute simply records a fresh start time.
            threadStartTimes.remove(Thread.currentThread().getId());
            return;
        }
        checkMaxThreadAge(Thread.currentThread());
    }

    /**
     * Drops all recorded start times once the pool has terminated.
     */
    @Override
    protected void terminated() {
        threadStartTimes.clear();
        super.terminated();
    }

    /**
     * Throws the marker exception if the given thread has reached its max-age,
     * which makes the executor discard the thread.
     *
     * @param thread the worker thread that just finished a task
     */
    private void checkMaxThreadAge(final Thread thread) {
        final long threadId = thread.getId();
        final Long started = threadStartTimes.get(threadId);
        if (started == null) {
            return;
        }
        // Compare the elapsed time instead of computing "started + maxThreadAge":
        // the sum overflows for very large max-age values and would expire the
        // thread immediately.
        final long age = System.currentTimeMillis() - started.longValue();
        if (age >= maxThreadAge) {
            final long delta = age - maxThreadAge;
            LOG.debug("{} exceeded its max age by {}ms and will be replaced.", thread, delta);
            threadStartTimes.remove(threadId);

            // throw marker exception to kill this thread and thus trigger creation of a new one
            throw expiredThreadException;
        }
    }
}