Skip to content

Commit

Permalink
Issue #611: fixed AcknowledgementsAdapter for single acks
Browse files Browse the repository at this point in the history
* fixed StreamingSessionActor so that he sends out commandResponses again (was broken)

Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch.io>
  • Loading branch information
thjaeckle committed Apr 3, 2020
1 parent 12624d8 commit 40d9567
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,12 @@
import static org.eclipse.ditto.model.base.common.ConditionChecker.checkNotNull;

import java.text.MessageFormat;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -85,32 +84,43 @@ private static ThingId getThingId(final Adaptable adaptable) {
private static List<Acknowledgement> gatherContainedAcknowledgements(final Adaptable adaptable,
final ThingId thingId) {

return Optional.ofNullable(getPayloadValueOrNull(adaptable))
.filter(JsonValue::isObject)
.map(value -> value.asObject().stream()
.map(field -> {
if (filterForAcknowledgementJsonObject(field)) {
return ThingAcknowledgementFactory.fromJson(
field.getValue().asObject().toBuilder()
.set(Acknowledgement.JsonFields.LABEL, field.getKey().toString())
.set(Acknowledgement.JsonFields.ENTITY_ID, thingId.toString())
.set(Acknowledgement.JsonFields.ENTITY_TYPE,
ThingConstants.ENTITY_TYPE.toString())
.build()
);
} else {
return ThingAcknowledgementFactory.newAcknowledgement(
getLabelInCaseOfSingleAcknowledgement(adaptable),
getThingId(adaptable),
getStatusCodeOrThrow(adaptable),
adaptable.getDittoHeaders(),
JsonObject.newBuilder().set(field).build()
);
}
})
)
.orElse(Stream.empty())
.collect(Collectors.toList());
final JsonValue adaptablePayloadValue = adaptable.getPayload().getValue().orElse(JsonValue.nullLiteral());
return buildAcknowledgements(adaptable, thingId, adaptablePayloadValue);
}

private static List<Acknowledgement> buildAcknowledgements(final Adaptable adaptable, final ThingId thingId,
final JsonValue value) {

if (value.isNull()) {
return Collections.singletonList(buildSingleAcknowledgement(adaptable, null));
} else if (value.isObject()) {
return value.asObject().stream().map(field -> {
if (filterForAcknowledgementJsonObject(field)) {
return ThingAcknowledgementFactory.fromJson(
field.getValue().asObject().toBuilder()
.set(Acknowledgement.JsonFields.LABEL, field.getKey().toString())
.set(Acknowledgement.JsonFields.ENTITY_ID, thingId.toString())
.set(Acknowledgement.JsonFields.ENTITY_TYPE, ThingConstants.ENTITY_TYPE.toString())
.build()
);
} else {
return buildSingleAcknowledgement(adaptable, field);
}
}).collect(Collectors.toList());
} else {
return Collections.emptyList();
}
}

private static Acknowledgement buildSingleAcknowledgement(final Adaptable adaptable,
@Nullable final JsonField field) {
return ThingAcknowledgementFactory.newAcknowledgement(
getLabelInCaseOfSingleAcknowledgement(adaptable),
getThingId(adaptable),
getStatusCodeOrThrow(adaptable),
adaptable.getDittoHeaders(),
null != field ? JsonObject.newBuilder().set(field).build() : null
);
}

private static AcknowledgementLabel getLabelInCaseOfSingleAcknowledgement(final Adaptable adaptable) {
Expand All @@ -133,12 +143,6 @@ private static HttpStatusCode getStatusCodeOrThrow(final Adaptable adaptable) {
.orElseThrow(() -> new JsonMissingFieldException(Payload.JsonFields.STATUS));
}

@Nullable
private static JsonValue getPayloadValueOrNull(final Adaptable adaptable) {
final Payload payload = adaptable.getPayload();
return payload.getValue().orElse(null);
}

@Override
public Adaptable toAdaptable(final Acknowledgements acknowledgements, final TopicPath.Channel channel) {
return Adaptable.newBuilder(getTopicPath(acknowledgements, channel))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ private void handleSignalsToStartAckForwarderFor(final Signal<?> signal, final E
private void handleSignal(final Signal<?> signal) {
logger.setCorrelationId(signal);
final DittoHeaders dittoHeaders = signal.getDittoHeaders();
if (connectionCorrelationId.equals(dittoHeaders.getOrigin().orElse(null))) {
if (!(signal instanceof CommandResponse) && connectionCorrelationId.equals(dittoHeaders.getOrigin().orElse(null))) {
logger.debug("Got Signal <{}> in <{}> session, but this was issued by this connection itself, not telling" +
" EventAndResponsePublisher about it", signal.getType(), type);
} else if (signal instanceof Acknowledgements){
Expand Down

0 comments on commit 40d9567

Please sign in to comment.