/
InboundEvent.java
424 lines (382 loc) · 14.4 KB
/
InboundEvent.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
/*
* Copyright (c) 2012, 2019 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* This Source Code may also be made available under the following Secondary
* Licenses when the conditions for such availability set forth in the
* Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
* version 2 with the GNU Classpath Exception, which is available at
* https://www.gnu.org/software/classpath/license.html.
*
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
*/
package org.glassfish.jersey.media.sse;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.lang.annotation.Annotation;
import java.util.Arrays;
import javax.ws.rs.ProcessingException;
import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.ext.MessageBodyReader;
import javax.ws.rs.sse.InboundSseEvent;
import org.glassfish.jersey.message.MessageBodyWorkers;
import org.glassfish.jersey.message.internal.MessageBodyProviderNotFoundException;
/**
 * Inbound event.
 * <p>
 * Holds the fields of a single received SSE event (name, id, comment, reconnect delay and raw data bytes)
 * together with the {@link MessageBodyWorkers}, annotations, media type and headers that are used when the
 * event data is de-serialized by one of the {@code readData(...)} methods. The constructor is private;
 * instances are created via {@link Builder#build()}.
 * </p>
 *
 * @author Pavel Bucek
 * @author Marek Potociar
 */
public class InboundEvent implements InboundSseEvent {

    private static final GenericType<String> STRING_AS_GENERIC_TYPE = new GenericType<>(String.class);

    private final String name;
    private final String id;
    private final String comment;
    private final byte[] data;
    private final long reconnectDelay;

    private final MessageBodyWorkers messageBodyWorkers;
    private final Annotation[] annotations;
    private final MediaType mediaType;
    private final MultivaluedMap<String, String> headers;

    /**
     * Inbound event builder. This implementation is not thread-safe.
     */
    static class Builder {

        private String name;
        private String id;
        private long reconnectDelay = SseFeature.RECONNECT_NOT_SET;

        private final ByteArrayOutputStream dataStream;
        private final MessageBodyWorkers workers;
        private final Annotation[] annotations;
        private final MediaType mediaType;
        private final MultivaluedMap<String, String> headers;
        private final StringBuilder commentBuilder;

        /**
         * Create new inbound event builder.
         *
         * @param workers     configured client-side {@link MessageBodyWorkers entity providers} used for
         *                    {@link javax.ws.rs.ext.MessageBodyReader} lookup.
         * @param annotations annotations attached to the Java type to be read. Used for
         *                    {@link javax.ws.rs.ext.MessageBodyReader} lookup.
         * @param mediaType   media type of the SSE event data. Used as the default media type when the event
         *                    data is read without an explicit media type.
         * @param headers     response headers. Handed over to the {@link javax.ws.rs.ext.MessageBodyReader}
         *                    when the event data is read.
         */
        public Builder(MessageBodyWorkers workers,
                       Annotation[] annotations,
                       MediaType mediaType,
                       MultivaluedMap<String, String> headers) {
            this.workers = workers;
            this.annotations = annotations;
            this.mediaType = mediaType;
            this.headers = headers;

            this.commentBuilder = new StringBuilder();
            this.dataStream = new ByteArrayOutputStream();
        }

        /**
         * Set inbound event name.
         * <p>
         * Value of the received SSE {@code "event"} field.
         * </p>
         *
         * @param name {@code "event"} field value.
         * @return updated builder instance.
         */
        public Builder name(String name) {
            this.name = name;
            return this;
        }

        /**
         * Set inbound event identifier.
         * <p>
         * Value of the received SSE {@code "id"} field.
         * </p>
         *
         * @param id {@code "id"} field value.
         * @return updated builder instance.
         */
        public Builder id(String id) {
            this.id = id;
            return this;
        }

        /**
         * Add a comment line to the event.
         * <p>
         * The comment line will be added to the received SSE event comment as a new line in the comment field.
         * If the comment line parameter is {@code null}, the call will be ignored.
         * </p>
         *
         * @param commentLine comment line to be added to the event comment.
         * @return updated builder instance.
         * @since 2.21
         */
        public Builder commentLine(final CharSequence commentLine) {
            if (commentLine != null) {
                // Every line is terminated with '\n'; build() drops the terminator of the last line.
                commentBuilder.append(commentLine).append('\n');
            }
            return this;
        }

        /**
         * Set reconnection delay (in milliseconds) that indicates how long the event receiver should wait
         * before attempting to reconnect in case a connection to SSE event source is lost.
         * <p>
         * Value of the received SSE {@code "retry"} field.
         * </p>
         *
         * @param milliseconds reconnection delay in milliseconds. Negative values un-set the reconnection delay.
         * @return updated builder instance.
         * @since 2.3
         */
        public Builder reconnectDelay(long milliseconds) {
            // Any negative input is normalized to the "not set" marker value.
            this.reconnectDelay = milliseconds < 0 ? SseFeature.RECONNECT_NOT_SET : milliseconds;
            return this;
        }

        /**
         * Add more inbound event data.
         * <p>
         * The bytes are appended to the data collected so far. A {@code null} or empty array is ignored.
         * </p>
         *
         * @param data byte array containing data stored in the incoming event.
         * @return updated builder instance.
         */
        public Builder write(byte[] data) {
            if (data == null || data.length == 0) {
                return this;
            }
            // The (byte[], int, int) overload of ByteArrayOutputStream declares no IOException,
            // so there is no impossible exception to catch and silently ignore.
            this.dataStream.write(data, 0, data.length);
            return this;
        }

        /**
         * Build a new inbound event instance using the supplied data.
         * <p>
         * The event comment is {@code null} when no comment line has been added; otherwise it consists of the
         * added comment lines separated by line breaks, without a trailing line break.
         * </p>
         *
         * @return new inbound event instance.
         */
        public InboundEvent build() {
            final int commentLength = commentBuilder.length();
            // Cut off the '\n' appended after the last comment line.
            final String comment = commentLength > 0 ? commentBuilder.substring(0, commentLength - 1) : null;

            return new InboundEvent(
                    name,
                    id,
                    comment,
                    reconnectDelay,
                    dataStream.toByteArray(),
                    workers,
                    annotations,
                    mediaType,
                    headers);
        }
    }

    /**
     * Create a new inbound event. A single trailing line break is removed from the supplied data;
     * all other arguments are stored as given.
     *
     * @param name               event name ({@code "event"} field), may be {@code null}.
     * @param id                 event identifier ({@code "id"} field), may be {@code null}.
     * @param comment            event comment, may be {@code null}.
     * @param reconnectDelay     reconnection delay in milliseconds or {@link SseFeature#RECONNECT_NOT_SET}.
     * @param data               raw event data bytes; must not be {@code null}.
     * @param messageBodyWorkers entity providers used to look up a reader for the event data.
     * @param annotations        annotations passed to the reader lookup and to the reader.
     * @param mediaType          default media type of the event data.
     * @param headers            headers passed to the reader when the event data is read.
     */
    private InboundEvent(final String name,
                         final String id,
                         final String comment,
                         final long reconnectDelay,
                         final byte[] data,
                         final MessageBodyWorkers messageBodyWorkers,
                         final Annotation[] annotations,
                         final MediaType mediaType,
                         final MultivaluedMap<String, String> headers) {
        this.name = name;
        this.id = id;
        this.comment = comment;
        this.reconnectDelay = reconnectDelay;
        this.data = stripLastLineBreak(data);
        this.messageBodyWorkers = messageBodyWorkers;
        this.annotations = annotations;
        this.mediaType = mediaType;
        this.headers = headers;
    }

    /**
     * Get event name.
     * <p>
     * Contains value of SSE {@code "event"} field. This field is optional. Method may return {@code null}, if the event
     * name is not specified.
     * </p>
     *
     * @return event name, or {@code null} if not set.
     */
    public String getName() {
        return name;
    }

    /**
     * Get event identifier.
     * <p>
     * Contains value of SSE {@code "id"} field. This field is optional. Method may return {@code null}, if the event
     * identifier is not specified.
     * </p>
     *
     * @return event id, or {@code null} if not set.
     * @since 2.3
     */
    public String getId() {
        return id;
    }

    /**
     * Get a comment string that accompanies the event.
     * <p>
     * Contains value of the comment associated with SSE event. This field is optional. Method may return {@code null},
     * if the event comment is not specified.
     * </p>
     *
     * @return comment associated with the event, or {@code null} if not set.
     * @since 2.21
     */
    public String getComment() {
        return comment;
    }

    /**
     * Get new connection retry time in milliseconds the event receiver should wait before attempting to
     * reconnect after a connection to the SSE event source is lost.
     * <p>
     * Contains value of SSE {@code "retry"} field. This field is optional. Method returns {@link SseFeature#RECONNECT_NOT_SET}
     * if no value has been set.
     * </p>
     *
     * @return reconnection delay in milliseconds or {@link SseFeature#RECONNECT_NOT_SET} if no value has been set.
     * @since 2.3
     */
    public long getReconnectDelay() {
        return reconnectDelay;
    }

    /**
     * Check if the connection retry time has been set in the event.
     *
     * @return {@code true} if new reconnection delay has been set in the event, {@code false} otherwise.
     * @since 2.3
     */
    public boolean isReconnectDelaySet() {
        return reconnectDelay > SseFeature.RECONNECT_NOT_SET;
    }

    /**
     * Check if the event is empty (i.e. does not contain any data).
     *
     * @return {@code true} if current instance does not contain any data, {@code false} otherwise.
     */
    public boolean isEmpty() {
        return data.length == 0;
    }

    /**
     * Get the original event data as a {@link String}.
     *
     * @return event data de-serialized into a string.
     * @throws javax.ws.rs.ProcessingException when provided type can't be read. The thrown exception wraps the original cause.
     * @since 2.3
     */
    public String readData() {
        return readData(STRING_AS_GENERIC_TYPE, null);
    }

    /**
     * Read event data as a given Java type.
     *
     * @param type Java type to be used for event data de-serialization.
     * @return event data de-serialized as an instance of a given type.
     * @throws javax.ws.rs.ProcessingException when provided type can't be read. The thrown exception wraps the original cause.
     * @since 2.3
     */
    public <T> T readData(Class<T> type) {
        return readData(new GenericType<T>(type), null);
    }

    /**
     * Read event data as a given generic type.
     *
     * @param type generic type to be used for event data de-serialization.
     * @return event data de-serialized as an instance of a given type.
     * @throws javax.ws.rs.ProcessingException when provided type can't be read. The thrown exception wraps the original cause.
     * @since 2.3
     */
    @SuppressWarnings("unused")
    public <T> T readData(GenericType<T> type) {
        return readData(type, null);
    }

    /**
     * Read event data as a given Java type.
     *
     * @param messageType Java type to be used for event data de-serialization.
     * @param mediaType   {@link MediaType media type} to be used for event data de-serialization.
     * @return event data de-serialized as an instance of a given type.
     * @throws javax.ws.rs.ProcessingException when provided type can't be read. The thrown exception wraps the original cause.
     * @since 2.3
     */
    @SuppressWarnings("unused")
    public <T> T readData(Class<T> messageType, MediaType mediaType) {
        return readData(new GenericType<T>(messageType), mediaType);
    }

    /**
     * Read event data as a given generic type.
     * <p>
     * When {@code mediaType} is {@code null}, the media type the event was created with is passed to the
     * reader instead.
     * </p>
     *
     * @param type      generic type to be used for event data de-serialization.
     * @param mediaType {@link MediaType media type} to be used for event data de-serialization.
     * @return event data de-serialized as an instance of a given type.
     * @throws MessageBodyProviderNotFoundException when no message body reader is found for the requested type.
     * @throws javax.ws.rs.ProcessingException when provided type can't be read. The thrown exception wraps the original cause.
     * @since 2.3
     */
    public <T> T readData(GenericType<T> type, MediaType mediaType) {
        final MediaType effectiveMediaType = mediaType == null ? this.mediaType : mediaType;
        // NOTE(review): the reader lookup uses the caller-supplied media type (possibly null), whereas the
        // effective media type is only handed to the reader itself. Presumably intentional, so that a missing
        // media type does not narrow the lookup to the event's own media type - confirm before changing.
        final MessageBodyReader reader =
                messageBodyWorkers.getMessageBodyReader(type.getRawType(), type.getType(), annotations, mediaType);
        if (reader == null) {
            throw new MessageBodyProviderNotFoundException(LocalizationMessages.EVENT_DATA_READER_NOT_FOUND());
        }
        return readAndCast(type, effectiveMediaType, reader);
    }

    /**
     * Run the supplied reader over the stored event data and cast the result to the requested type.
     *
     * @param type               generic type the data is read as.
     * @param effectiveMediaType media type passed to the reader.
     * @param reader             message body reader performing the de-serialization.
     * @return de-serialized event data.
     * @throws ProcessingException wrapping the {@link IOException} thrown by the reader.
     */
    @SuppressWarnings("unchecked")
    private <T> T readAndCast(GenericType<T> type, MediaType effectiveMediaType, MessageBodyReader reader) {
        try {
            // A fresh stream is created per call, so the data can be read any number of times.
            return (T) reader.readFrom(
                    type.getRawType(),
                    type.getType(),
                    annotations,
                    effectiveMediaType,
                    headers,
                    new ByteArrayInputStream(data));
        } catch (IOException ex) {
            throw new ProcessingException(ex);
        }
    }

    /**
     * Get the raw event data bytes.
     *
     * @return raw event data bytes. The returned byte array may be empty if the event does not
     * contain any data. A non-empty result is a copy of the internally stored data.
     */
    @SuppressWarnings("unused")
    public byte[] getRawData() {
        if (isEmpty()) {
            // A zero-length array has no content that could be modified, so no copy is made.
            return data;
        }
        return Arrays.copyOf(data, data.length);
    }

    /**
     * Get a string representation of the event consisting of its name, id, comment and data read as a string.
     * If reading the data fails with a {@link ProcessingException}, a placeholder text is printed instead.
     *
     * @return string representation of the event.
     */
    @Override
    public String toString() {
        String s;
        try {
            s = readData();
        } catch (ProcessingException e) {
            s = "<Error reading data into a string>";
        }

        return "InboundEvent{"
                + "name='" + name + '\''
                + ", id='" + id + '\''
                + ", comment=" + (comment == null ? "[no comments]" : '\'' + comment + '\'')
                + ", data=" + s
                + '}';
    }

    /**
     * Strip last line break from data. (Last line-break should not be considered as part of received data).
     * <p>
     * Only a single trailing {@code '\n'} byte is removed; data without a trailing line break is returned as is.
     * </p>
     *
     * @param data data; must not be {@code null}.
     * @return updated byte array.
     */
    private static byte[] stripLastLineBreak(final byte[] data) {
        if (data.length > 0 && data[data.length - 1] == '\n') {
            return Arrays.copyOf(data, data.length - 1);
        }
        return data;
    }
}