/
InpSignal.java
70 lines (61 loc) · 1.88 KB
/
InpSignal.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package org.df4j.core.port;
import org.df4j.core.connector.AsyncSemaphore;
import org.df4j.core.actor.AsyncProc;
import org.df4j.protocol.SignalFlow;
import org.reactivestreams.Subscription;
/**
 * Asynchronous receiver of a permit flow from a {@link SignalFlow.Publisher}, e.g. {@link AsyncSemaphore}.
 * <p>
 * The port keeps a counter of permits. Every method that changes the counter is
 * {@code synchronized} on this port and calls {@code block()} or {@code unblock()}
 * when the counter crosses the zero boundary.
 */
public class InpSignal extends AsyncProc.Port implements SignalFlow.Subscriber {
    /** the subscription most recently passed to {@link #onSubscribe(Subscription)} */
    private Subscription subscription;
    /** the port is blocked if permits &le; 0 */
    protected long permits = 0;

    /**
     * Creates a port with zero permits.
     *
     * @param parent {@link AsyncProc} to which this port belongs
     */
    public InpSignal(AsyncProc parent) {
        this(parent, 0L);
    }

    /**
     * Creates a port with the given initial number of permits.
     * The port is passed to the superclass as ready only when {@code permits > 0}.
     *
     * @param parent {@link AsyncProc} to which this port belongs
     * @param permits initial number of permits; can be negative
     */
    public InpSignal(AsyncProc parent, long permits) {
        super(parent, permits > 0);
        this.permits = permits;
    }

    /**
     * Remembers the subscription handed over by the publisher.
     *
     * @param subscription the subscription to store
     */
    @Override
    public synchronized void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
    }

    /**
     * Subtracts {@code n} from the permit counter. No check is made that enough
     * permits are available. If the port was ready before the call and the counter
     * is no longer positive afterwards, {@code block()} is invoked.
     *
     * @param n the number of permits to take away
     */
    @Override
    public synchronized void acquire(long n) {
        // readiness must be sampled before the counter changes
        final boolean readyBefore = isReady();
        permits -= n;
        if (!readyBefore || permits > 0) {
            return;
        }
        block();
    }

    /**
     * Adds {@code n} to the permit counter. If the port was not ready before the
     * call and the counter is positive afterwards, {@code unblock()} is invoked.
     *
     * @param n the number of permits to add
     */
    @Override
    public synchronized void release(long n) {
        // readiness must be sampled before the counter changes
        final boolean readyBefore = isReady();
        permits += n;
        if (permits > 0 && !readyBefore) {
            unblock();
        }
    }

    /**
     * Takes exactly one permit away.
     * Analogue of {@link InpFlow#remove()} and {@link java.util.concurrent.Semaphore#acquire(int)}.
     * If the port was ready before the call and the last permit has been consumed,
     * {@code block()} is invoked.
     *
     * @throws IllegalStateException if the permit counter is not positive
     */
    public synchronized void remove() {
        if (permits <= 0) {
            throw new IllegalStateException("no avalable permits");
        }
        final boolean readyBefore = isReady();
        permits -= 1;
        if (permits == 0 && readyBefore) {
            block();
        }
    }
}