-
Notifications
You must be signed in to change notification settings - Fork 18
/
VirtualThreadsManagedThreadFactory.java
119 lines (101 loc) · 4.16 KB
/
VirtualThreadsManagedThreadFactory.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
/*
* Copyright (c) 2023 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* This Source Code may also be made available under the following Secondary
* Licenses when the conditions for such availability set forth in the
* Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
* version 2 with the GNU Classpath Exception, which is available at
* https://www.gnu.org/software/classpath/license.html.
*
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
*/
package org.glassfish.enterprise.concurrent.virtualthreads;
import static java.lang.System.Logger.Level.ERROR;
import static java.lang.System.Logger.Level.INFO;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.glassfish.enterprise.concurrent.ContextServiceImpl;
import org.glassfish.enterprise.concurrent.ManagedThreadFactoryImpl;
import org.glassfish.enterprise.concurrent.internal.ManagedFutureTask;
import org.glassfish.enterprise.concurrent.internal.ThreadExpiredException;
import org.glassfish.enterprise.concurrent.spi.ContextHandle;
/**
*
* @author Ondro Mihalyi
*/
public class VirtualThreadsManagedThreadFactory extends ManagedThreadFactoryImpl {
// map from thread to the time of task start
// TODO replace ManagedThreadFactoryImpl.threads with startTimes.getKeySet?
final Map<Thread, Long> startTimes = new ConcurrentHashMap<>();
public VirtualThreadsManagedThreadFactory(String name) {
super(name);
}
public VirtualThreadsManagedThreadFactory(String name, ContextServiceImpl contextService) {
super(name, contextService);
}
@Override
protected Thread createThread(Runnable taskToRun, ContextHandle contextHandleForSetup) {
RunnableWithContext taskToRunWithContext = new RunnableWithContext(taskToRun, contextHandleForSetup);
return Thread.ofVirtual().unstarted(taskToRunWithContext);
}
@Override
protected void shutdown(Thread t) {
if (t != null) {
/* TODO - interrup not guaranteed to work on all JDKs if the thread hasn't started yet.
Look how to improve this later. */
t.interrupt();
}
}
@Override
public void taskStarting(Thread t, ManagedFutureTask task) {
if (t != null) {
startTimes.put(t, System.currentTimeMillis()); // TODO: use nanoTime instead?
}
}
@Override
public void taskDone(Thread t) {
if (t != null) {
startTimes.remove(t);
}
}
public boolean isTaskHung(Thread thread, long now) {
Long startTime = startTimes.get(thread);
if (startTime == null) {
return false;
}
return now - startTime > getHungTaskThreshold();
}
static private final System.Logger loggerForRunnableWithContext = System.getLogger(RunnableWithContext.class.getName());
private final class RunnableWithContext implements Runnable {
private final Runnable nestedRunnable;
private final ContextHandle contextHandleForSetup;
private final System.Logger logger = loggerForRunnableWithContext;
public RunnableWithContext(Runnable nestedRunnable, ContextHandle contextHandleForSetup) {
this.nestedRunnable = nestedRunnable;
this.contextHandleForSetup = contextHandleForSetup;
}
@Override
public void run() {
ContextHandle handle = null;
try {
if (contextHandleForSetup != null) {
handle = getContextSetupProvider().setup(contextHandleForSetup);
}
nestedRunnable.run();
} catch (ThreadExpiredException ex) {
logger.log(INFO, ex.toString());
} catch (Throwable t) {
logger.log(ERROR, getName(), t);
} finally {
if (handle != null) {
getContextSetupProvider().reset(handle);
}
removeThread(Thread.currentThread());
}
}
}
}