/
ChannelBuffer.java
327 lines (296 loc) · 8.74 KB
/
ChannelBuffer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
/*
* Copyright (C) 2020 ActiveJ LLC.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.activej.csp.queue;
import io.activej.common.Checks;
import io.activej.common.recycle.Recyclers;
import io.activej.promise.Promise;
import io.activej.promise.SettablePromise;
import io.activej.reactor.ImplicitlyReactive;
import org.jetbrains.annotations.Nullable;
import static io.activej.common.Checks.checkState;
import static io.activej.reactor.Reactive.checkInReactorThread;
import static java.lang.Integer.numberOfLeadingZeros;
import static java.lang.Math.max;
/**
* Represents a queue of elements which you can {@code put} and {@code take}.
* In order to mark if an object is pending put or take to/from the queue,
* there are corresponding {@code put} and {@code take} {@link SettablePromise}s.
*
* @param <T> the type of values that are stored in the buffer
*/
public final class ChannelBuffer<T> extends ImplicitlyReactive implements ChannelQueue<T> {
private static final boolean CHECKS = Checks.isEnabled(ChannelBuffer.class);
private Exception exception;
private Object[] elements;
private int tail;
private int head;
private final int bufferMinSize;
private final int bufferMaxSize;
private @Nullable SettablePromise<Void> put;
private @Nullable SettablePromise<T> take;
/**
* @see #ChannelBuffer(int, int)
*/
public ChannelBuffer(int bufferSize) {
this(0, bufferSize);
}
/**
* Creates a ChannelBuffer with the buffer size of the next highest
* power of 2 (for example, if {@code bufferMinxSize = 113}, a buffer
* of 128 elements will be created).
*
* @param bufferMinSize a minimal amount of elements in the buffer
* @param bufferMaxSize a max amount of elements in the buffer
*/
public ChannelBuffer(int bufferMinSize, int bufferMaxSize) {
this.bufferMinSize = bufferMinSize + 1;
this.bufferMaxSize = bufferMaxSize;
this.elements = new Object[1 << (32 - numberOfLeadingZeros(max(16, this.bufferMinSize) - 1))];
}
/**
* Checks if this buffer is saturated by comparing
* its current size with {@code bufferMaxSize}.
*
* @return {@code true} if this buffer size is greater
* than {@code bufferMaxSize}, otherwise returns {@code false}
*/
@Override
public boolean isSaturated() {
return size() > bufferMaxSize;
}
/**
* Checks if this buffer will be saturated if at
* least one more element will be added, by comparing
* its current size with {@code bufferMaxSize}.
*
* @return {@code true} if this buffer size is
* bigger or equal to the {@code bufferMaxSize},
* otherwise returns {@code false}
*/
public boolean willBeSaturated() {
return size() >= bufferMaxSize;
}
/**
* Checks if this buffer has fewer
* elements than {@code bufferMinSize}.
*
* @return {@code true} if this buffer size is
* smaller than {@code bufferMinSize},
* otherwise returns {@code false}
*/
@Override
public boolean isExhausted() {
return size() < bufferMinSize;
}
/**
* Checks if this buffer will have fewer elements
* than {@code bufferMinSize}, if at least one
* more element will be taken, by comparing its
* current size with {@code bufferMinSize}.
*
* @return {@code true} if this buffer size is
* smaller or equal to the {@code bufferMinSize},
* otherwise returns {@code false}
*/
public boolean willBeExhausted() {
return size() <= bufferMinSize;
}
/**
* Checks if this buffer contains elements.
*
* @return {@code true} if {@code tail}
* and {@code head} values are equal,
* otherwise {@code false}
*/
public boolean isEmpty() {
return tail == head;
}
/**
* Returns amount of elements in this buffer.
*/
public int size() {
return tail - head;
}
/**
* Adds provided item to the buffer and resets current {@code take}.
*/
public void add(@Nullable T item) throws Exception {
if (CHECKS) checkInReactorThread(this);
if (exception == null) {
if (take != null) {
assert isEmpty();
SettablePromise<T> take = this.take;
this.take = null;
take.set(item);
if (exception != null) throw exception;
return;
}
doAdd(item);
} else {
Recyclers.recycle(item);
throw exception;
}
}
private void doAdd(@Nullable T value) {
elements[(tail++) & (elements.length - 1)] = value;
}
/**
* Returns the head of the buffer if it is not empty,
* otherwise returns {@code null}. Increases the value of {@code head}.
* <p>
* If the buffer will have fewer elements than {@code bufferMinSize}
* after this poll and {@code put} promise is not {@code null},
* {@code put} will be set {@code null} after the poll.
* <p>
* If current {@code exception} is not {@code null},
* it will be thrown.
*
* @return head element of this buffer
* index if the buffer is not empty, otherwise {@code null}
* @throws Exception if current {@code exception}
* is not {@code null}
*/
public @Nullable T poll() throws Exception {
if (CHECKS) checkInReactorThread(this);
if (exception != null) throw exception;
if (put != null && willBeExhausted()) {
T item = doPoll();
SettablePromise<Void> put = this.put;
this.put = null;
put.set(null);
return item;
}
return !isEmpty() ? doPoll() : null;
}
private T doPoll() {
assert head != tail;
int pos = (head++) & (elements.length - 1);
@SuppressWarnings("unchecked")
T result = (T) elements[pos];
elements[pos] = null; // Must null out slot
return result;
}
/**
* Puts {@code value} in this buffer and increases {@code tail} value.
* <p>
* Current {@code put} must be {@code null}. If
* current {@code exception} is not {@code null},
* a promise of this exception will be returned and
* the {@code value} will be recycled.
* <p>
* If this {@code take} is not {@code null}, the value
* will be set directly to the {@code set}, without
* adding to the buffer.
*
* @param item a value passed to the buffer
* @return promise of {@code null} or {@code exception}
* as a marker of completion
*/
@Override
public Promise<Void> put(@Nullable T item) {
if (CHECKS) {
checkInReactorThread(this);
checkState(put == null, "Previous put() has not finished yet");
}
if (exception == null) {
if (take != null) {
assert isEmpty();
SettablePromise<T> take = this.take;
this.take = null;
take.set(item);
return Promise.complete();
}
doAdd(item);
if (isSaturated()) {
put = new SettablePromise<>();
return put;
} else {
return Promise.complete();
}
} else {
Recyclers.recycle(item);
return Promise.ofException(exception);
}
}
/**
* Returns a promise of the head of
* the {@code buffer} if it is not empty.
* <p>
* If this buffer will be exhausted after this
* take and {@code put} promise is not {@code null},
* {@code put} will be set {@code null} after the poll.
* <p>
* Current {@code take} must be {@code null}. If
* current {@code exception} is not {@code null},
* a promise of this exception will be returned.
*
* @return promise of element taken from the buffer
*/
@Override
public Promise<T> take() {
if (CHECKS) {
checkInReactorThread(this);
checkState(take == null, "Previous take() has not finished yet");
}
if (exception == null) {
if (put != null && willBeExhausted()) {
assert !isEmpty();
T item = doPoll();
SettablePromise<Void> put = this.put;
this.put = null;
put.set(null);
return Promise.of(item);
}
if (!isEmpty()) {
return Promise.of(doPoll());
}
take = new SettablePromise<>();
return take;
} else {
return Promise.ofException(exception);
}
}
/**
* Closes the buffer if this {@code exception} is not
* {@code null}. Recycles all elements of the buffer and
* sets {@code elements}, {@code put} and {@code take} to
* {@code null}.
*
* @param e exception that is used to close buffer with
*/
@Override
public void closeEx(Exception e) {
checkInReactorThread(this);
if (exception != null) return;
exception = e;
if (put != null) {
put.setException(e);
put = null;
}
if (take != null) {
take.setException(e);
take = null;
}
for (int i = head; i != tail; i = (i + 1) & (elements.length - 1)) {
Recyclers.recycle(elements[i]);
}
//noinspection AssignmentToNull - resource release
elements = null;
}
public @Nullable Exception getException() {
return exception;
}
}