/
Acknowledgements.java
571 lines (509 loc) · 22.5 KB
/
Acknowledgements.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
/*
* Copyright (c) 2020 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.base.model.signals.acks;
import static org.eclipse.ditto.base.model.common.ConditionChecker.argumentNotEmpty;
import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;
import org.eclipse.ditto.base.model.acks.AcknowledgementLabel;
import org.eclipse.ditto.base.model.common.HttpStatus;
import org.eclipse.ditto.base.model.common.ResponseType;
import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.entity.type.EntityType;
import org.eclipse.ditto.base.model.entity.type.WithEntityType;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.DittoHeadersBuilder;
import org.eclipse.ditto.base.model.headers.contenttype.ContentType;
import org.eclipse.ditto.base.model.json.FieldType;
import org.eclipse.ditto.base.model.json.JsonParsableCommandResponse;
import org.eclipse.ditto.base.model.json.JsonSchemaVersion;
import org.eclipse.ditto.base.model.signals.SignalWithEntityId;
import org.eclipse.ditto.base.model.signals.WithOptionalEntity;
import org.eclipse.ditto.base.model.signals.commands.CommandResponse;
import org.eclipse.ditto.json.JsonArray;
import org.eclipse.ditto.json.JsonArrayBuilder;
import org.eclipse.ditto.json.JsonCollectors;
import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.json.JsonField;
import org.eclipse.ditto.json.JsonFieldDefinition;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonObjectBuilder;
import org.eclipse.ditto.json.JsonPointer;
import org.eclipse.ditto.json.JsonValue;
/**
* Acknowledgements aggregate several {@link Acknowledgement}s and contain an aggregated overall
* {@link #getHttpStatus() HTTP status} describing the aggregated status of all contained Acknowledgements as well as
* a {@link #getEntity(org.eclipse.ditto.base.model.json.JsonSchemaVersion)} returning the contained Json entity.
*
* @since 1.1.0
*/
@JsonParsableCommandResponse(type = Acknowledgements.TYPE)
@Immutable
public final class Acknowledgements
implements Iterable<Acknowledgement>, CommandResponse<Acknowledgements>, WithOptionalEntity<Acknowledgements>,
WithEntityType, SignalWithEntityId<Acknowledgements> {
static final String TYPE = "acknowledgements";
private final EntityId entityId;
private final List<Acknowledgement> acknowledgements;
private final HttpStatus httpStatus;
private final DittoHeaders dittoHeaders;
private Acknowledgements(final EntityId entityId,
final Collection<Acknowledgement> acknowledgements,
final HttpStatus httpStatus,
final DittoHeaders dittoHeaders) {
this.entityId = entityId;
this.acknowledgements = Collections.unmodifiableList(new ArrayList<>(acknowledgements));
this.httpStatus = checkNotNull(httpStatus, "httpStatus");
final DittoHeadersBuilder<?, ?> dittoHeadersBuilder =
checkNotNull(dittoHeaders, "dittoHeaders").isResponseRequired() ? dittoHeaders
.toBuilder()
.responseRequired(false) : dittoHeaders.toBuilder();
this.dittoHeaders = dittoHeadersBuilder
.removeHeader(DittoHeaderDefinition.REQUESTED_ACKS.getKey())
.build();
}
/**
* Returns a new instance of {@code Acknowledgements} with the given acknowledgements.
*
* @param acknowledgements the acknowledgements of the result.
* @param dittoHeaders the headers of the returned Acknowledgements instance.
* @return the instance.
* @throws NullPointerException if any argument is {@code null}.
* @throws IllegalArgumentException if the given {@code acknowledgements} are empty or if the entity IDs or entity
* types of the given acknowledgements are not equal.
*/
public static Acknowledgements of(final Collection<Acknowledgement> acknowledgements,
final DittoHeaders dittoHeaders) {
argumentNotEmpty(acknowledgements, "acknowledgements");
return of(getEntityId(acknowledgements), acknowledgements, getCombinedHttpStatus(acknowledgements),
dittoHeaders);
}
/**
* Returns a new instance of {@code Acknowledgements} with the given parameters.
*
* @param entityId the ID of the affected entity being acknowledged.
* @param acknowledgements the map of acknowledgements to be included in the result.
* @param httpStatus the HTTP status of the combined Acknowledgements.
* @param dittoHeaders the headers of the returned Acknowledgements instance.
* @return the Acknowledgements.
* @throws NullPointerException if any argument is {@code null}.
* @throws IllegalArgumentException if the given {@code acknowledgements} are empty or if the entity IDs or entity
* types of the given acknowledgements are not equal.
* @since 2.0.0
*/
public static Acknowledgements of(final EntityId entityId,
final Collection<Acknowledgement> acknowledgements,
final HttpStatus httpStatus,
final DittoHeaders dittoHeaders) {
checkNotNull(entityId, "entityId");
argumentNotEmpty(acknowledgements, "acknowledgements");
checkNotNull(httpStatus, "httpStatus");
checkNotNull(dittoHeaders, "dittoHeaders");
return new Acknowledgements(entityId, acknowledgements, httpStatus, dittoHeaders);
}
private static EntityId getEntityId(final Iterable<Acknowledgement> acknowledgements) {
final Iterator<Acknowledgement> acknowledgementIterator = acknowledgements.iterator();
Acknowledgement acknowledgement = acknowledgementIterator.next();
final EntityId entityId = acknowledgement.getEntityId();
while (acknowledgementIterator.hasNext()) {
acknowledgement = acknowledgementIterator.next();
final EntityId acknowledgementEntityId = acknowledgement.getEntityId();
if (!entityId.equals(acknowledgement.getEntityId())) {
final String pattern = "The entity ID <{0}> is not compatible with <{1}>!";
throw new IllegalArgumentException(MessageFormat.format(pattern, acknowledgementEntityId, entityId));
}
}
return entityId;
}
private static HttpStatus getCombinedHttpStatus(final Collection<Acknowledgement> acknowledgements) {
final HttpStatus result;
if (1 == acknowledgements.size()) {
result = acknowledgements.stream()
.findFirst()
.map(Acknowledgement::getHttpStatus)
.orElse(HttpStatus.INTERNAL_SERVER_ERROR);
} else {
final Stream<Acknowledgement> acknowledgementStream = acknowledgements.stream();
final boolean allAcknowledgementsSuccessful = acknowledgementStream.allMatch(Acknowledgement::isSuccess);
if (allAcknowledgementsSuccessful) {
result = HttpStatus.OK;
} else {
result = HttpStatus.FAILED_DEPENDENCY;
}
}
return result;
}
/**
* Returns an empty instance of {@code Acknowledgements}.
*
* @param entityId the entity ID for which no acknowledgements were received at all.
* @param dittoHeaders the headers of the returned Acknowledgements instance.
* @return the instance.
* @throws NullPointerException if any argument is {@code null}.
*/
public static Acknowledgements empty(final EntityId entityId, final DittoHeaders dittoHeaders) {
final List<Acknowledgement> acknowledgements = Collections.emptyList();
return new Acknowledgements(checkNotNull(entityId, "entityId"),
acknowledgements,
getCombinedHttpStatus(acknowledgements),
dittoHeaders);
}
/**
* Returns the HTTP status of the Acknowledgements:
* <ul>
* <li>If only one acknowledgement is included, its status is returned.</li>
* <li>
* If several acknowledgements are included:
* <ul>
* <li>
* If all contained acknowledgements are successful, the overall HTTP status is
* {@link org.eclipse.ditto.base.model.common.HttpStatus#OK}.
* </li>
* <li>
* If at least one acknowledgement failed, the overall HTTP status is
* {@link org.eclipse.ditto.base.model.common.HttpStatus#FAILED_DEPENDENCY}.
* </li>
* </ul>
* </li>
* </ul>
*
* @return the HTTP status.
* @since 2.0.0
*/
@Override
public HttpStatus getHttpStatus() {
return httpStatus;
}
/**
* Returns a set containing the the AcknowledgementLabels.
*
* @return the unanswered acknowledgement labels.
* Changes on the returned set are not reflected back to this AcknowledgementsPerRequest instance.
*/
public Set<AcknowledgementLabel> getMissingAcknowledgementLabels() {
return stream()
.filter(Acknowledgement::isTimeout)
.map(Acknowledgement::getLabel)
.collect(Collectors.toCollection(LinkedHashSet::new));
}
/**
* Returns a set containing the the successful acknowledgements.
*
* @return the successful acknowledgements.
* The returned set maintains the order in which the acknowledgement were received.
* Changes on the returned set are not reflected back to this AcknowledgementsPerRequest instance.
*/
public Set<Acknowledgement> getSuccessfulAcknowledgements() {
return stream()
.filter(Acknowledgement::isSuccess)
.collect(Collectors.toCollection(LinkedHashSet::new));
}
/**
* Returns a set containing the the failed acknowledgements.
*
* @return the failed acknowledgements.
* The returned set maintains the order in which the acknowledgement were received.
* Changes on the returned set are not reflected back to this AcknowledgementsPerRequest instance.
*/
public Set<Acknowledgement> getFailedAcknowledgements() {
return stream()
.filter(acknowledgement -> !acknowledgement.isSuccess())
.collect(Collectors.toCollection(LinkedHashSet::new));
}
/**
* Returns the in this Acknowledgements contained acknowledgement identified by the passed
* {@code acknowledgementLabel}, if it was present. {@link java.util.Optional#empty()} otherwise.
*
* @param acknowledgementLabel the acknowledgement label to return.
* @return the found acknowledgement if the {@code acknowledgementLabel} was part of this Acknowledgements, empty
* Optional otherwise.
*/
public Optional<Acknowledgement> getAcknowledgement(final AcknowledgementLabel acknowledgementLabel) {
return stream()
.filter(ack -> acknowledgementLabel.equals(ack.getLabel()))
.findAny();
}
@Override
public Iterator<Acknowledgement> iterator() {
return acknowledgements.iterator();
}
/**
* Returns the size of the Acknowledgements, i. e. the number of contained values.
*
* @return the size.
*/
public int getSize() {
return acknowledgements.size();
}
/**
* Indicates whether this Acknowledgements is empty.
*
* @return {@code true} if this Acknowledgements does not contain any values, {@code false} else.
*/
public boolean isEmpty() {
return acknowledgements.isEmpty();
}
/**
* Returns a sequential {@code Stream} with the values of this Acknowledgements as its source.
*
* @return a sequential stream of the Acknowledgements of this container.
*/
public Stream<Acknowledgement> stream() {
return acknowledgements.stream();
}
@Override
public DittoHeaders getDittoHeaders() {
if (acknowledgements.size() == 1) {
return acknowledgements.get(0).getDittoHeaders().toBuilder().putHeaders(dittoHeaders).build();
}
return dittoHeaders;
}
@Override
public Acknowledgements setDittoHeaders(final DittoHeaders dittoHeaders) {
return new Acknowledgements(entityId, acknowledgements, httpStatus, dittoHeaders);
}
@Override
public EntityId getEntityId() {
return entityId;
}
@Override
public EntityType getEntityType() {
return entityId.getEntityType();
}
@Override
public String getType() {
return TYPE;
}
/**
* Returns the JSON representation of this Acknowledgements' entity.
* <ul>
* <li>
* If only one acknowledgement is included, the {@link Acknowledgement#getEntity(org.eclipse.ditto.base.model.json.JsonSchemaVersion)} of this
* Ack is returned.
* </li>
* <li>
* If several acknowledgements are included, the {@link Acknowledgement#getEntity(org.eclipse.ditto.base.model.json.JsonSchemaVersion)} of
* each Ack is returned in a JsonObject with the AcknowledgementLabel as key of each Ack entry.
* </li>
* </ul>
*
* @param schemaVersion the JsonSchemaVersion in which to return the JSON.
* @return the entity's JSON representation.
*/
@Override
public Optional<JsonValue> getEntity(final JsonSchemaVersion schemaVersion) {
final int acknowledgementsSize = acknowledgements.size();
final Optional<JsonValue> result;
if (0 == acknowledgementsSize) {
result = Optional.empty();
} else if (1 == acknowledgementsSize) {
final Acknowledgement soleAcknowledgement = acknowledgements.get(0);
result = soleAcknowledgement.getEntity(schemaVersion);
} else {
result = Optional.of(acknowledgementsEntitiesToJson(schemaVersion));
}
return result;
}
@Override
public Acknowledgements setEntity(final JsonValue entity) {
return this;
}
private JsonObject acknowledgementsEntitiesToJson(final JsonSchemaVersion schemaVersion) {
return acknowledgementsToJsonWithDisambiguation(schemaVersion, FieldType.all(), (ack, version, predicate) -> {
final JsonObjectBuilder jsonObjectBuilder = JsonObject.newBuilder()
.set(Acknowledgement.JsonFields.STATUS_CODE, ack.getHttpStatus().getCode());
final Optional<JsonValue> ackEntity = ack.getEntity(version);
ackEntity.ifPresent(ae -> jsonObjectBuilder.set(Acknowledgement.JsonFields.PAYLOAD, ae));
final DittoHeaders ackHeaders = ack.getDittoHeaders();
jsonObjectBuilder.set(Acknowledgement.JsonFields.DITTO_HEADERS, buildHeadersJson(ackHeaders));
return jsonObjectBuilder.build();
});
}
private static JsonObject buildHeadersJson(final DittoHeaders dittoHeaders) {
final boolean containsDittoContentType = dittoHeaders.getDittoContentType()
.filter(ContentType::isDittoProtocol)
.isPresent();
if (containsDittoContentType) {
return dittoHeaders.toBuilder()
.removeHeader(DittoHeaderDefinition.CONTENT_TYPE.getKey())
.build()
.toJson();
} else {
return dittoHeaders.toJson();
}
}
/**
* Create a JSON object of acknowledgements with labels as key such that acks of the same label are grouped into
* an array.
*
* @param schemaVersion the schema version.
* @param predicate the JSON field predicate.
* @param acknowledgementToJson the function to turn each ack into a JSON object.
* @return the disambiguated JSON object.
*/
private JsonObject acknowledgementsToJsonWithDisambiguation(final JsonSchemaVersion schemaVersion,
final Predicate<JsonField> predicate,
final Acknowledgements.AcknowledgementToJson acknowledgementToJson) {
// use a linked hash map to preserve the order of the first appearance of ack labels
final Map<CharSequence, JsonArrayBuilder> disambiguationMap = new LinkedHashMap<>();
for (final Acknowledgement ack : acknowledgements) {
disambiguationMap.compute(ack.getLabel(), (label, previousBuilder) -> {
final JsonArrayBuilder builder = previousBuilder == null ? JsonArray.newBuilder() : previousBuilder;
return builder.add(acknowledgementToJson.toJson(ack, schemaVersion, predicate));
});
}
return disambiguationMap.entrySet()
.stream()
.map(entry -> {
final JsonArray array = entry.getValue().build();
final JsonValue value = array.getSize() == 1 ? array.get(0).orElse(array) : array;
return JsonField.newInstance(entry.getKey(), value);
})
.collect(JsonCollectors.fieldsToObject());
}
/**
* Returns a new {@code Acknowledgements} parsed from the given JSON object.
*
* @param jsonObject the JSON object to be parsed.
* @param dittoHeaders the ditto headers of the acknowledgements
* @return the Acknowledgements.
* @throws NullPointerException if {@code jsonObject} is {@code null}.
* @throws org.eclipse.ditto.json.JsonMissingFieldException if {@code jsonObject} misses a required field.
* @throws org.eclipse.ditto.json.JsonParseException if {@code jsonObject} contained an unexpected value type.
*/
public static Acknowledgements fromJson(final JsonObject jsonObject, final DittoHeaders dittoHeaders) {
return AcknowledgementsJsonParser.getInstance(new AcknowledgementJsonParser()).apply(jsonObject, dittoHeaders);
}
@Override
public JsonObject toJson(final JsonSchemaVersion schemaVersion, final Predicate<JsonField> thePredicate) {
final Predicate<JsonField> predicate = schemaVersion.and(thePredicate);
final JsonObject acksJsonObject =
acknowledgementsToJsonWithDisambiguation(schemaVersion, thePredicate, Acknowledgement::toJson);
return JsonObject.newBuilder()
.set(CommandResponse.JsonFields.TYPE, getType(), predicate)
.set(JsonFields.ENTITY_ID, entityId.toString(), predicate)
.set(JsonFields.ENTITY_TYPE, getEntityType().toString(), predicate)
.set(JsonFields.STATUS_CODE, httpStatus.getCode(), predicate)
.set(JsonFields.ACKNOWLEDGEMENTS, acksJsonObject, predicate)
.build();
}
@Override
public boolean equals(@Nullable final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final Acknowledgements that = (Acknowledgements) o;
return entityId.equals(that.entityId) &&
acknowledgements.equals(that.acknowledgements) &&
httpStatus.equals(that.httpStatus) &&
dittoHeaders.equals(that.dittoHeaders);
}
@Override
public int hashCode() {
return Objects.hash(entityId, acknowledgements, httpStatus, dittoHeaders);
}
@Override
public String toString() {
return getClass().getSimpleName() + " [" +
"entityId=" + entityId +
", acknowledgements=" + acknowledgements +
", httpStatus=" + httpStatus +
", dittoHeaders=" + dittoHeaders +
"]";
}
@Override
public ResponseType getResponseType() {
if (stream().allMatch(Acknowledgement::isSuccess)) {
return ResponseType.RESPONSE;
} else {
return ResponseType.NACK;
}
}
/**
* Returns all non-hidden marked fields of this Acknowledgement.
*
* @return a JSON object representation of this Acknowledgement including only non-hidden marked fields.
*/
@Override
public JsonObject toJson() {
return toJson(FieldType.notHidden());
}
@Override
public String getManifest() {
return getType();
}
@Override
public JsonPointer getResourcePath() {
return JsonPointer.empty();
}
@Override
public String getResourceType() {
return getType();
}
/**
* Definition of fields of the JSON representation of an {@link Acknowledgements}.
*/
@Immutable
public static final class JsonFields {
private JsonFields() {
throw new AssertionError();
}
/**
* Definition of the JSON field for the Acknowledgements' entity ID.
*/
static final JsonFieldDefinition<String> ENTITY_ID =
JsonFactory.newStringFieldDefinition("entityId", FieldType.REGULAR,
JsonSchemaVersion.V_2);
/**
* Definition of the JSON field for the Acknowledgements' entity type.
*/
static final JsonFieldDefinition<String> ENTITY_TYPE =
JsonFactory.newStringFieldDefinition("entityType", FieldType.REGULAR,
JsonSchemaVersion.V_2);
/**
* Definition of the JSON field for the Acknowledgements' statusCode.
*/
static final JsonFieldDefinition<Integer> STATUS_CODE =
JsonFactory.newIntFieldDefinition("statusCode", FieldType.REGULAR,
JsonSchemaVersion.V_2);
/**
* Definition of the JSON field for the Acknowledgements' acknowledgements.
*/
static final JsonFieldDefinition<JsonObject> ACKNOWLEDGEMENTS =
JsonFactory.newJsonObjectFieldDefinition("acknowledgements", FieldType.REGULAR,
JsonSchemaVersion.V_2);
}
@FunctionalInterface
private interface AcknowledgementToJson {
JsonObject toJson(Acknowledgement ack, JsonSchemaVersion schemaVersion, Predicate<JsonField> predicate);
}
}