Skip to content

Commit

Permalink
review: replaced Pair with PlainJson type, throw an exception when Re…
Browse files Browse the repository at this point in the history
…trieveThingsResponse JSON parsing fails instead of returning null for the json string

Signed-off-by: Johannes Schneider <johannes.schneider@bosch-si.com>
  • Loading branch information
Johannes Schneider committed Oct 18, 2018
1 parent de9b85c commit 292ef2d
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 66 deletions.
Expand Up @@ -10,12 +10,15 @@
*/
package org.eclipse.ditto.services.utils.aggregator;

import static org.eclipse.ditto.model.base.common.ConditionChecker.checkNotNull;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -45,7 +48,6 @@
import akka.cluster.pubsub.DistributedPubSubMediator;
import akka.event.DiagnosticLoggingAdapter;
import akka.japi.Creator;
import akka.japi.Pair;
import akka.japi.pf.PFBuilder;
import akka.japi.pf.ReceiveBuilder;
import akka.pattern.PatternsCS;
Expand Down Expand Up @@ -149,38 +151,28 @@ private void handleSudoRetrieveThings(final SudoRetrieveThings rt, final Object

private void handleSourceRef(final SourceRef sourceRef, final List<String> thingIds,
final Command<?> originatingCommand, final ActorRef originatingSender) {

final Comparator<Pair<String, String>> comparator = (t1, t2) -> {
final String thingId1 = t1.first();
final String thingId2 = t2.first();
return Integer.compare(thingIds.indexOf(thingId1), thingIds.indexOf(thingId2));
};

final Function<Jsonifiable<?>, Pair<String, String>> thingPairSupplier;
final Function<List<Pair<String, String>>, CommandResponse<?>> overallResponseSupplier;
final Function<Jsonifiable<?>, PlainJson> thingPlainJsonSupplier;
final Function<List<PlainJson>, CommandResponse<?>> overallResponseSupplier;
final Function<List<PlainJson>, List<PlainJson>> plainJsonSorter = supplyPlainJsonSorter(thingIds);

if (originatingCommand instanceof SudoRetrieveThings) {
thingPairSupplier = supplyPairFromSudoRetrieveThingResponse();
overallResponseSupplier = supplySudoRetrieveThingsResponse(
originatingCommand.getDittoHeaders()
);
thingPlainJsonSupplier = supplyPlainJsonFromSudoRetrieveThingResponse();
overallResponseSupplier = supplySudoRetrieveThingsResponse(originatingCommand.getDittoHeaders());
} else {
thingPairSupplier = supplyPairFromRetrieveThingResponse();
overallResponseSupplier = supplyRetrieveThingsResponse(
originatingCommand.getDittoHeaders(),
((RetrieveThings) originatingCommand).getNamespace().orElse(null)
);
thingPlainJsonSupplier = supplyPlainJsonFromRetrieveThingResponse();
final String namespace = ((RetrieveThings) originatingCommand).getNamespace().orElse(null);
overallResponseSupplier = supplyRetrieveThingsResponse(originatingCommand.getDittoHeaders(), namespace);
}

final StartedTimer timer = DittoMetrics.expiringTimer(TRACE_AGGREGATOR_RETRIEVE_THINGS)
.tag("size", Integer.toString(thingIds.size()))
.build();

final CompletionStage<List<Pair<String, String>>> o =
(CompletionStage<List<Pair<String, String>>>) sourceRef.getSource()
final CompletionStage<List<PlainJson>> o =
(CompletionStage<List<PlainJson>>) sourceRef.getSource()
.orElse(Source.single(ThingNotAccessibleException.newBuilder("").build()))
.filterNot(el -> el instanceof DittoRuntimeException)
.map(param -> thingPairSupplier.apply((Jsonifiable<?>) param))
.map(param -> thingPlainJsonSupplier.apply((Jsonifiable<?>) param))
.log("retrieve-thing-response", log)
.recover(new PFBuilder()
.match(NoSuchElementException.class,
Expand All @@ -190,73 +182,128 @@ private void handleSourceRef(final SourceRef sourceRef, final List<String> thing
.runWith(Sink.seq(), actorMaterializer);

final CompletionStage<? extends CommandResponse<?>> commandResponseCompletionStage = o
.thenApply(listOfPairs -> {
final List<Pair<String, String>> sortedList = new ArrayList<>(listOfPairs);
sortedList.sort(comparator);
return listOfPairs;
})
.thenApply(plainJsonSorter)
.thenApply(overallResponseSupplier::apply)
.thenApply(list -> {
stopTimer(timer);
return list;
});

PatternsCS.pipe(commandResponseCompletionStage, getContext().dispatcher())
.to(originatingSender);
PatternsCS.pipe(commandResponseCompletionStage, getContext().dispatcher()).to(originatingSender);
}

private Function<Jsonifiable<?>, Pair<String, String>> supplyPairFromRetrieveThingResponse() {
return response -> {
if (response instanceof RetrieveThingResponse) {
final RetrieveThingResponse retrieveThingResponse = (RetrieveThingResponse) response;
return Pair.apply(
retrieveThingResponse.getId(),
retrieveThingResponse.getEntityPlainString()
.orElseGet(() -> retrieveThingResponse.getEntity(
retrieveThingResponse.getImplementedSchemaVersion()).toString())
);
private Function<Jsonifiable<?>, PlainJson> supplyPlainJsonFromRetrieveThingResponse() {
return jsonifiable -> {
if (jsonifiable instanceof RetrieveThingResponse) {
final RetrieveThingResponse response = (RetrieveThingResponse) jsonifiable;
final String json = response.getEntityPlainString().orElseGet(() ->
response.getEntity(response.getImplementedSchemaVersion()).toString());
return PlainJson.of(response.getId(), json);
} else {
return Pair.apply(null, null);
return null;
}
};
}

private Function<Jsonifiable<?>, Pair<String, String>> supplyPairFromSudoRetrieveThingResponse() {
return response -> {
if (response instanceof SudoRetrieveThingResponse) {
final SudoRetrieveThingResponse retrieveThingResponse = (SudoRetrieveThingResponse) response;
return Pair.apply(
retrieveThingResponse.getId(),
retrieveThingResponse.getEntityPlainString()
.orElseGet(() -> retrieveThingResponse.getEntity(
retrieveThingResponse.getImplementedSchemaVersion()).toString())

);
private Function<Jsonifiable<?>, PlainJson> supplyPlainJsonFromSudoRetrieveThingResponse() {
return jsonifiable -> {
if (jsonifiable instanceof SudoRetrieveThingResponse) {
final SudoRetrieveThingResponse response = (SudoRetrieveThingResponse) jsonifiable;
final String json = response.getEntityPlainString().orElseGet(() ->
response.getEntity(response.getImplementedSchemaVersion()).toString());
return PlainJson.of(response.getId(), json);
} else {
return Pair.apply(null, null);
return null;
}
};
}

private Function<List<Pair<String, String>>, CommandResponse<?>> supplyRetrieveThingsResponse(
private Function<List<PlainJson>, List<PlainJson>> supplyPlainJsonSorter(final List<String> thingIds) {
return plainJsonThings -> {
final Comparator<PlainJson> comparator = (pj1, pj2) -> {
final String thingId1 = pj1.getId();
final String thingId2 = pj2.getId();
return Integer.compare(thingIds.indexOf(thingId1), thingIds.indexOf(thingId2));
};

final List<PlainJson> sortedList = new ArrayList<>(plainJsonThings);
sortedList.sort(comparator);
return sortedList;
};
}

private Function<List<PlainJson>, CommandResponse<?>> supplyRetrieveThingsResponse(
final DittoHeaders dittoHeaders,
@Nullable final String namespace) {
return plainJsonThings -> RetrieveThingsResponse.of(plainJsonThings.stream()
.map(Pair::second)
.collect(Collectors.toList()),
namespace,
dittoHeaders);
.map(PlainJson::getJson)
.collect(Collectors.toList()), namespace, dittoHeaders);
}

private Function<List<Pair<String, String>>, CommandResponse<?>> supplySudoRetrieveThingsResponse(
private Function<List<PlainJson>, CommandResponse<?>> supplySudoRetrieveThingsResponse(
final DittoHeaders dittoHeaders) {
return plainJsonThings -> SudoRetrieveThingsResponse.of(plainJsonThings.stream()
.map(Pair::second)
.collect(Collectors.toList()),
dittoHeaders);
.map(PlainJson::getJson)
.collect(Collectors.toList()), dittoHeaders);
}

private static void stopTimer(final StartedTimer timer) {
timer.stop(); // stop timer
}

/**
* Internal representation of an entity's JSON string.
*/
private static final class PlainJson {

private final String id;
private final String json;

private PlainJson(final String id, final String json) {
this.id = id;
this.json = json;
}

static PlainJson of(final String id, final String json) {
checkNotNull(id, "ID");
checkNotNull(json, "JSON");
return new PlainJson(id, json);
}

String getId() {
return id;
}

String getJson() {
return json;
}

@Override
public boolean equals(@Nullable final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final PlainJson plainJson = (PlainJson) o;
return Objects.equals(id, plainJson.id) &&
Objects.equals(json, plainJson.json);
}

@Override
public int hashCode() {
return Objects.hash(id, json);
}

@Override
public String toString() {
return getClass().getSimpleName() + " [" +
", id=" + id +
", json=" + json +
"]";
}

}

}
Expand Up @@ -22,6 +22,7 @@
import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.json.JsonField;
import org.eclipse.ditto.json.JsonFieldDefinition;
import org.eclipse.ditto.json.JsonMissingFieldException;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonObjectBuilder;
import org.eclipse.ditto.json.JsonPointer;
Expand Down Expand Up @@ -145,10 +146,17 @@ public static RetrieveThingResponse fromJson(final JsonObject jsonObject, final
jsonObject.getValueOrThrow(ThingQueryCommandResponse.JsonFields.JSON_THING_ID);
final JsonObject extractedThing = jsonObject.getValue(JSON_THING).orElse(null);
final String extractedThingPlainJson = jsonObject.getValue(JSON_THING_PLAIN_JSON)
.orElseGet(() -> extractedThing != null ? extractedThing.toString() : null);

return new RetrieveThingResponse(thingId, statusCode,
extractedThing, extractedThingPlainJson, dittoHeaders);
.orElseGet(() -> {
if (null == extractedThing) {
throw JsonMissingFieldException.newBuilder()
.fieldName(JSON_THING.getPointer())
.build();
}
return extractedThing.toString();
});

return new RetrieveThingResponse(thingId, statusCode, extractedThing, extractedThingPlainJson,
dittoHeaders);
});
}

Expand Down Expand Up @@ -209,9 +217,10 @@ protected void appendPayload(final JsonObjectBuilder jsonObjectBuilder, final Js

@Override
protected boolean canEqual(@Nullable final Object other) {
return (other instanceof RetrieveThingResponse);
return other instanceof RetrieveThingResponse;
}

@SuppressWarnings("OverlyComplexMethod")
@Override
public boolean equals(@Nullable final Object o) {
if (this == o) {
Expand Down

0 comments on commit 292ef2d

Please sign in to comment.