-
-
Notifications
You must be signed in to change notification settings - Fork 22
/
ParallelPipe.java
112 lines (90 loc) · 3.42 KB
/
ParallelPipe.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package nu.marginalia.util;
import lombok.SneakyThrows;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/** Generalization of the workflow <br>
 * -- single provider thread reading sequentially from disk <br>
 * -&gt; multiple independent CPU-bound processing tasks <br>
 * -&gt; single consumer thread writing to network/disk <br>
 * <p>
 * Items are handed to the pipe with {@link #accept(Object)}, transformed on one of the
 * worker threads by {@link #onProcess(Object)}, and the non-null results are delivered
 * one at a time to {@link #onReceive(Object)} on a single receiver thread. Both hand-over
 * queues are bounded, so a slow stage blocks the stage feeding it.
 * <p>
 * All threads are started by the constructor. Call {@link #join()} once every item has
 * been submitted to let the pipe drain and shut down.
 *
 * @param <INPUT> type of the items submitted via {@link #accept(Object)}
 * @param <INTERMEDIATE> type produced by {@link #onProcess(Object)} and consumed by
 *                       {@link #onReceive(Object)}
 */
public abstract class ParallelPipe<INPUT,INTERMEDIATE> {
    /** How long an idle worker waits for input before re-checking whether it should stop. */
    private static final long INPUT_POLL_SECONDS = 10;
    /** How long the idle receiver waits for a result before re-checking whether it should stop. */
    private static final long INTERMEDIATE_POLL_MILLIS = 997;

    private final LinkedBlockingQueue<INPUT> inputs;
    private final LinkedBlockingQueue<INTERMEDIATE> intermediates;

    private final Logger logger = LoggerFactory.getLogger(getClass());

    private final List<Thread> processThreads = new ArrayList<>();
    private final Thread receiverThread;

    /** Cleared by {@link #join()} to tell the workers that no further input will arrive. */
    private volatile boolean expectingInput = true;
    /** Cleared by {@link #join()}, after all workers have terminated, to stop the receiver. */
    private volatile boolean expectingOutput = true;

    /**
     * Creates the pipe and immediately starts its worker and receiver threads.
     *
     * @param name prefix used for the thread names ({@code name-process[i]}, {@code name-receiver})
     * @param numberOfThreads number of worker threads running {@link #onProcess(Object)}
     * @param inputQueueSize capacity of the queue between {@link #accept(Object)} and the workers
     * @param intermediateQueueSize capacity of the queue between the workers and the receiver
     */
    public ParallelPipe(String name, int numberOfThreads, int inputQueueSize, int intermediateQueueSize) {
        inputs = new LinkedBlockingQueue<>(inputQueueSize);
        intermediates = new LinkedBlockingQueue<>(intermediateQueueSize);

        for (int i = 0; i < numberOfThreads; i++) {
            processThreads.add(new Thread(this::runProcessThread, name + "-process["+i+"]"));
        }
        receiverThread = new Thread(this::runReceiverThread, name + "-receiver");

        processThreads.forEach(Thread::start);
        receiverThread.start();
    }

    /** Discards everything currently waiting in the input and intermediate queues. */
    public void clearQueues() {
        inputs.clear();
        intermediates.clear();
    }

    /**
     * Worker loop: takes items off the input queue, runs {@link #onProcess(Object)} on them
     * and forwards non-null results to the receiver. Runs until {@link #join()} has been
     * called and the input queue is empty.
     */
    @SneakyThrows
    private void runProcessThread() {
        while (expectingInput || !inputs.isEmpty()) {
            var in = inputs.poll(INPUT_POLL_SECONDS, TimeUnit.SECONDS);

            if (in != null) {
                try {
                    var ret = onProcess(in);
                    if (ret != null) {
                        // Blocks while the intermediate queue is full
                        intermediates.put(ret);
                    }
                }
                catch (InterruptedException ex) {
                    // Interruption is not a per-item failure; let it terminate this worker
                    throw ex;
                }
                catch (Exception ex) {
                    // A failing item is logged and skipped so the worker keeps running
                    logger.error("Exception", ex);
                }
            }
        }

        logger.info("Terminating {}", Thread.currentThread().getName());
    }

    /**
     * Receiver loop: hands each intermediate result to {@link #onReceive(Object)}. Runs
     * until {@link #join()} has seen all workers terminate and the intermediate queue
     * is empty.
     */
    @SneakyThrows
    private void runReceiverThread() {
        // The input queue is deliberately not part of this condition. expectingOutput is
        // only cleared once every worker has terminated, so anything still sitting in
        // 'inputs' at that point (a worker died early, or accept() was called too late)
        // can never turn into an intermediate. Waiting for it would hang join() forever.
        while (expectingOutput || !intermediates.isEmpty()) {
            var intermediate = intermediates.poll(INTERMEDIATE_POLL_MILLIS, TimeUnit.MILLISECONDS);

            if (intermediate != null) {
                try {
                    onReceive(intermediate);
                }
                catch (Exception ex) {
                    // A failing item is logged and skipped so the receiver keeps running
                    logger.error("Exception", ex);
                }
            }
        }

        logger.info("Terminating {}", Thread.currentThread().getName());
    }

    /**
     * Begin processing an item. Blocks while the input queue is full.
     * <p>
     * If the calling thread is interrupted while waiting, the resulting
     * {@link InterruptedException} is rethrown undeclared.
     *
     * @param input the item to enqueue for {@link #onProcess(Object)}
     */
    @SneakyThrows
    public void accept(INPUT input) {
        inputs.put(input);
    }

    /**
     * The meat of the processor thread runtime. Called on one of the worker threads,
     * possibly concurrently with other invocations on the other workers.
     *
     * @param input an item previously passed to {@link #accept(Object)}
     * @return the result to hand to {@link #onReceive(Object)}, or {@code null} to drop the item
     * @throws Exception any exception other than {@link InterruptedException} is logged
     *                   and the item is skipped
     */
    protected abstract INTERMEDIATE onProcess(INPUT input) throws Exception;

    /**
     * The meat of the consumer thread runtime. Always called on the single receiver thread.
     *
     * @param intermediate a non-null result of {@link #onProcess(Object)}
     * @throws Exception any exception is logged and the item is skipped
     */
    protected abstract void onReceive(INTERMEDIATE intermediate) throws Exception;

    /**
     * Signals that no more input will be submitted, then waits for the workers to drain
     * the input queue and for the receiver to drain the intermediate queue.
     * <p>
     * Idle threads only notice the shutdown request when their queue poll times out, so
     * this call may take on the order of ten seconds even when the queues are empty.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void join() throws InterruptedException {
        expectingInput = false;

        for (var thread : processThreads) {
            thread.join();
        }

        // Only now is it certain that no further intermediates will be produced
        expectingOutput = false;
        receiverThread.join();
    }
}