Skip to content

Commit

Permalink
make SudoStreamThings no longer a sudo command
Browse files Browse the repository at this point in the history
* Since it's built internally but with information provided by the user
  via CreateSubscription this command should be handled as external command
  and also enforced

Signed-off-by: Yannic Klem <yannic.klem@bosch.io>
  • Loading branch information
Yannic92 committed Jul 19, 2022
1 parent e70314b commit 14ca298
Show file tree
Hide file tree
Showing 12 changed files with 94 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.things.model.signals.commands.query.RetrieveThing;
import org.eclipse.ditto.things.model.signals.commands.query.RetrieveThingResponse;
import org.eclipse.ditto.thingsearch.api.commands.sudo.SudoStreamThings;
import org.eclipse.ditto.thingsearch.api.commands.sudo.StreamThings;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
Expand Down Expand Up @@ -161,7 +161,7 @@ public void searchWithoutQueryParameters() {
routeResult.assertMediaType(MediaTypes.TEXT_EVENT_STREAM);
routeResult.assertStatusCode(StatusCodes.OK);
});
proxyActor.expectMsgClass(SudoStreamThings.class);
proxyActor.expectMsgClass(StreamThings.class);
replySourceRef(proxyActor, Source.lazily(Source::empty));
assertions.join();
}
Expand All @@ -178,13 +178,13 @@ public void searchWithQueryParameters() {
routeResult.assertMediaType(MediaTypes.TEXT_EVENT_STREAM);
routeResult.assertStatusCode(StatusCodes.OK);
});
final SudoStreamThings sudoStreamThings = proxyActor.expectMsgClass(SudoStreamThings.class);
final StreamThings streamThings = proxyActor.expectMsgClass(StreamThings.class);
replySourceRef(proxyActor, Source.lazily(Source::empty));
assertions.join();
assertThat(sudoStreamThings.getFilter()).contains(filter);
assertThat(sudoStreamThings.getNamespaces()).contains(Set.of("a", "b", "c"));
assertThat(streamThings.getFilter()).contains(filter);
assertThat(streamThings.getNamespaces()).contains(Set.of("a", "b", "c"));
// sort options are parsed and appended with +/thingId
assertThat(sudoStreamThings.getSort()).contains("sort(-/policyId,+/thingId)");
assertThat(streamThings.getSort()).contains("sort(-/policyId,+/thingId)");
}

@Test
Expand All @@ -209,10 +209,10 @@ public void searchWithResumption() {
null,
retrieveThing.getDittoHeaders()
));
final SudoStreamThings sudoStreamThings = proxyActor.expectMsgClass(SudoStreamThings.class);
final StreamThings streamThings = proxyActor.expectMsgClass(StreamThings.class);
replySourceRef(proxyActor, Source.lazily(Source::empty));
assertions.join();
assertThat(sudoStreamThings.getSortValues()).contains(JsonArray.of(JsonValue.of(lastEventId)));
assertThat(streamThings.getSortValues()).contains(JsonArray.of(JsonValue.of(lastEventId)));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import org.eclipse.ditto.things.model.signals.commands.exceptions.ThingNotAccessibleException;
import org.eclipse.ditto.things.model.signals.commands.query.RetrieveThing;
import org.eclipse.ditto.things.model.signals.commands.query.RetrieveThingResponse;
import org.eclipse.ditto.thingsearch.api.commands.sudo.SudoStreamThings;
import org.eclipse.ditto.thingsearch.api.commands.sudo.StreamThings;
import org.eclipse.ditto.thingsearch.api.events.ThingsOutOfSync;

import akka.NotUsed;
Expand Down Expand Up @@ -68,7 +68,7 @@ public final class SearchSource {
private final Duration searchAskTimeout;
@Nullable private final JsonFieldSelector fields;
private final JsonFieldSelector sortFields;
private final SudoStreamThings sudoStreamThings;
private final StreamThings streamThings;
private final boolean thingIdOnly;
private final String lastThingId;

Expand All @@ -78,15 +78,15 @@ public final class SearchSource {
final Duration searchAskTimeout,
@Nullable final JsonFieldSelector fields,
final JsonFieldSelector sortFields,
final SudoStreamThings sudoStreamThings,
final StreamThings streamThings,
final String lastThingId) {
this.pubSubMediator = pubSubMediator;
this.commandForwarder = commandForwarder;
this.thingsAskTimeout = thingsAskTimeout;
this.searchAskTimeout = searchAskTimeout;
this.fields = fields;
this.sortFields = sortFields;
this.sudoStreamThings = sudoStreamThings;
this.streamThings = streamThings;
this.thingIdOnly = fields != null && fields.getSize() == 1 &&
fields.getPointers().contains(Thing.JsonFields.ID.getPointer());
this.lastThingId = lastThingId;
Expand Down Expand Up @@ -142,11 +142,11 @@ public Source<Pair<String, JsonObject>, NotUsed> startAsPair(final Consumer<Resu
*/
private Optional<Throwable> mapError(final Throwable error) {
if (error instanceof RemoteStreamRefActorTerminatedException) {
LOGGER.withCorrelationId(sudoStreamThings).info("Resuming from: {}", error.toString());
LOGGER.withCorrelationId(streamThings).info("Resuming from: {}", error.toString());
return Optional.empty();
} else {
return Optional.of(DittoRuntimeException.asDittoRuntimeException(error, e -> {
LOGGER.withCorrelationId(sudoStreamThings).error("Unexpected error", e);
LOGGER.withCorrelationId(streamThings).error("Unexpected error", e);
return DittoInternalErrorException.newBuilder().build();
}));
}
Expand All @@ -166,11 +166,11 @@ private String nextSeed(final List<Pair<String, JsonObject>> finalElements) {
: finalElements.get(finalElements.size() - 1).first();
}

private Source<SudoStreamThings, NotUsed> streamThingsFrom(final String lastThingId) {
private Source<StreamThings, NotUsed> streamThingsFrom(final String lastThingId) {
if (lastThingId.isEmpty()) {
return Source.single(sudoStreamThings);
return Source.single(streamThings);
} else {
return retrieveSortValues(lastThingId).map(sudoStreamThings::setSortValues);
return retrieveSortValues(lastThingId).map(streamThings::setSortValues);
}
}

Expand Down Expand Up @@ -223,7 +223,7 @@ private Source<JsonObject, NotUsed> retrieveThing(final String thingId,
}

private DittoHeaders getDittoHeaders() {
return sudoStreamThings.getDittoHeaders();
return streamThings.getDittoHeaders();
}

private <T> Flow<Object, T, NotUsed> expectMsgClass(final Class<T> clazz) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import org.eclipse.ditto.rql.query.expression.ThingsFieldExpressionFactory;
import org.eclipse.ditto.rql.query.things.ModelBasedThingsFieldExpressionFactory;
import org.eclipse.ditto.things.model.Thing;
import org.eclipse.ditto.thingsearch.api.commands.sudo.SudoStreamThings;
import org.eclipse.ditto.thingsearch.api.commands.sudo.StreamThings;
import org.eclipse.ditto.thingsearch.model.Option;
import org.eclipse.ditto.thingsearch.model.SizeOption;
import org.eclipse.ditto.thingsearch.model.SortOption;
Expand Down Expand Up @@ -78,15 +78,15 @@ public final class SearchSourceBuilder {
* invalid.
*/
public SearchSource build() {
final SudoStreamThings sudoStreamThings = constructStreamThings();
final StreamThings streamThings = constructStreamThings();
return new SearchSource(
checkNotNull(pubSubMediator, "pubSubMediator"),
checkNotNull(commandForwarder, "commandForwarder"),
thingsAskTimeout,
searchAskTimeout,
fields,
sortFields,
sudoStreamThings,
streamThings,
lastThingId);
}

Expand Down Expand Up @@ -333,8 +333,8 @@ private SortOption findUniqueSortOption(final String optionString) {
}
}

private SudoStreamThings constructStreamThings() {
return SudoStreamThings.of(filter, namespaces, sort, sortValues, checkNotNull(dittoHeaders, "dittoHeaders"));
private StreamThings constructStreamThings() {
return StreamThings.of(filter, namespaces, sort, sortValues, checkNotNull(dittoHeaders, "dittoHeaders"));
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public final class SubscriptionManager extends AbstractActor {
*
* @param idleTimeout lifetime of an idle SubscriptionActor.
* @param pubSubMediator pub-sub mediator for reporting of out-of-sync things.
* @param proxyActor recipient of thing and SudoStreamThings commands.
* @param proxyActor recipient of thing and StreamThings commands.
* @param materializer materializer for the search streams.
* @return Props of the actor.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.eclipse.ditto.things.model.signals.commands.exceptions.ThingNotAccessibleException;
import org.eclipse.ditto.things.model.signals.commands.query.RetrieveThing;
import org.eclipse.ditto.things.model.signals.commands.query.RetrieveThingResponse;
import org.eclipse.ditto.thingsearch.api.commands.sudo.SudoStreamThings;
import org.eclipse.ditto.thingsearch.api.commands.sudo.StreamThings;
import org.eclipse.ditto.thingsearch.api.events.ThingsOutOfSync;
import org.junit.After;
import org.junit.Before;
Expand Down Expand Up @@ -205,8 +205,8 @@ private SourceRef<Object> materializeSourceProbe() {
return materializedValues.second();
}

private SudoStreamThings streamThings(@Nullable final JsonArray sortValues) {
return SudoStreamThings.of(null, null, SORT, sortValues, dittoHeaders);
private StreamThings streamThings(@Nullable final JsonArray sortValues) {
return StreamThings.of(null, null, SORT, sortValues, dittoHeaders);
}

private void startTestSearchSource(@Nullable final JsonFieldSelector fields,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.eclipse.ditto.json.JsonFieldSelector;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.things.model.Thing;
import org.eclipse.ditto.thingsearch.api.commands.sudo.SudoStreamThings;
import org.eclipse.ditto.thingsearch.api.commands.sudo.StreamThings;
import org.eclipse.ditto.thingsearch.model.signals.commands.subscription.CancelSubscription;
import org.eclipse.ditto.thingsearch.model.signals.commands.subscription.CreateSubscription;
import org.eclipse.ditto.thingsearch.model.signals.commands.subscription.RequestFromSubscription;
Expand Down Expand Up @@ -149,9 +149,9 @@ public void parallelSessions() {

// there should be no upstream request until downstream requests.
for (int i = 0; i < 4; ++i) {
final SudoStreamThings sudoStreamThings = edgeCommandForwarderProbe.expectMsgClass(SudoStreamThings.class);
final StreamThings streamThings = edgeCommandForwarderProbe.expectMsgClass(StreamThings.class);
final ActorRef sender = edgeCommandForwarderProbe.sender();
final Object source = sources.get(getTag(sudoStreamThings) - 1);
final Object source = sources.get(getTag(streamThings) - 1);
if (source instanceof Source) {
final SourceRef<?> sourceRef = ((Source<?, ?>) source).runWith(StreamRefs.sourceRef(), materializer);
sender.tell(sourceRef, ActorRef.noSender());
Expand Down Expand Up @@ -194,8 +194,8 @@ private static RequestFromSubscription request(final String subscriptionId) {
return RequestFromSubscription.of(subscriptionId, 1L, DittoHeaders.empty());
}

private int getTag(final SudoStreamThings sudoStreamThings) {
return sudoStreamThings.getFilter()
private int getTag(final StreamThings streamThings) {
return streamThings.getFilter()
.map(filter -> {
int i = "exists(attributes/tag".length();
return filter.substring(i, i + 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,22 @@
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonObjectBuilder;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.thingsearch.model.signals.commands.query.ThingSearchQueryCommand;

/**
* Ditto-internal command to start or resume a search request for a stream of thing IDs.
*
* @since 1.1.0
*/
@Immutable
@JsonParsableCommand(typePrefix = ThingSearchSudoCommand.TYPE_PREFIX, name = SudoStreamThings.NAME)
public final class SudoStreamThings
extends AbstractCommand<SudoStreamThings> implements ThingSearchSudoCommand<SudoStreamThings> {
@JsonParsableCommand(typePrefix = ThingSearchSudoCommand.TYPE_PREFIX, name = StreamThings.NAME)
public final class StreamThings
extends AbstractCommand<StreamThings> implements ThingSearchQueryCommand<StreamThings> {

/**
* Name of the command.
*/
public static final String NAME = "sudoStreamThings";
public static final String NAME = "streamThings";

/**
* Type of this command.
Expand All @@ -62,7 +63,7 @@ public final class SudoStreamThings
@Nullable private final String sort;
@Nullable private final JsonArray sortValues;

private SudoStreamThings(@Nullable final String filter,
private StreamThings(@Nullable final String filter,
@Nullable final JsonArray namespaces,
@Nullable final String sort,
@Nullable final JsonArray sortValues,
Expand All @@ -76,7 +77,7 @@ private SudoStreamThings(@Nullable final String filter,
}

/**
* Returns a new instance of {@code SudoStreamThings}.
* Returns a new instance of {@code StreamThings}.
*
* @param filter the optional query filter string.
* @param namespaces namespaces to search, or null to search all namespaces.
Expand All @@ -85,16 +86,16 @@ private SudoStreamThings(@Nullable final String filter,
* @param dittoHeaders the headers of the command.
* @return a new command for streaming search results.
*/
public static SudoStreamThings of(@Nullable final String filter,
public static StreamThings of(@Nullable final String filter,
@Nullable final JsonArray namespaces,
@Nullable final String sort,
@Nullable final JsonArray sortValues,
final DittoHeaders dittoHeaders) {
return new SudoStreamThings(filter, namespaces, sort, sortValues, dittoHeaders);
return new StreamThings(filter, namespaces, sort, sortValues, dittoHeaders);
}

/**
* Creates a new {@code SudoStreamThings} from a JSON object.
* Creates a new {@code StreamThings} from a JSON object.
*
* @param jsonObject the JSON object of which the command is to be created.
* @param dittoHeaders the headers of the command.
Expand All @@ -103,13 +104,13 @@ public static SudoStreamThings of(@Nullable final String filter,
* @throws org.eclipse.ditto.json.JsonParseException if the passed in {@code jsonObject} was not in the expected
* format.
*/
public static SudoStreamThings fromJson(final JsonObject jsonObject, final DittoHeaders dittoHeaders) {
return new CommandJsonDeserializer<SudoStreamThings>(TYPE, jsonObject).deserialize(() -> {
public static StreamThings fromJson(final JsonObject jsonObject, final DittoHeaders dittoHeaders) {
return new CommandJsonDeserializer<StreamThings>(TYPE, jsonObject).deserialize(() -> {
final String filter = jsonObject.getValue(JsonFields.FILTER).orElse(null);
final JsonArray namespaces = jsonObject.getValue(JsonFields.NAMESPACES).orElse(null);
final String sort = jsonObject.getValue(JsonFields.SORT).orElse(null);
final JsonArray sortValues = jsonObject.getValue(JsonFields.SORT_VALUES).orElse(null);
return new SudoStreamThings(filter, namespaces, sort, sortValues, dittoHeaders);
return new StreamThings(filter, namespaces, sort, sortValues, dittoHeaders);
});
}

Expand Down Expand Up @@ -165,14 +166,14 @@ public Optional<JsonArray> getSortValues() {
* @param namespaces the namespaces.
* @return the created command.
*/
public SudoStreamThings setNamespaces(@Nullable final Collection<String> namespaces) {
public StreamThings setNamespaces(@Nullable final Collection<String> namespaces) {
if (namespaces == null) {
return new SudoStreamThings(filter, JsonArray.empty(), sort, sortValues, getDittoHeaders());
return new StreamThings(filter, JsonArray.empty(), sort, sortValues, getDittoHeaders());
} else {
final JsonArray namespacesJson = namespaces.stream()
.map(JsonValue::of)
.collect(JsonCollectors.valuesToArray());
return new SudoStreamThings(filter, namespacesJson, sort, sortValues, getDittoHeaders());
return new StreamThings(filter, namespacesJson, sort, sortValues, getDittoHeaders());
}
}

Expand All @@ -199,8 +200,8 @@ protected void appendPayload(final JsonObjectBuilder jsonObjectBuilder, final Js
* @param sortValues the new sort values.
* @return the new command.
*/
public SudoStreamThings setSortValues(final JsonArray sortValues) {
return new SudoStreamThings(filter, namespaces, sort, sortValues, getDittoHeaders());
public StreamThings setSortValues(final JsonArray sortValues) {
return new StreamThings(filter, namespaces, sort, sortValues, getDittoHeaders());
}

@Override
Expand All @@ -209,19 +210,19 @@ public Category getCategory() {
}

@Override
public SudoStreamThings setDittoHeaders(final DittoHeaders dittoHeaders) {
return new SudoStreamThings(filter, namespaces, sort, sortValues, dittoHeaders);
public StreamThings setDittoHeaders(final DittoHeaders dittoHeaders) {
return new StreamThings(filter, namespaces, sort, sortValues, dittoHeaders);
}

@Override
public boolean equals(@Nullable final Object o) {
if (this == o)
return true;
if (!(o instanceof SudoStreamThings))
if (!(o instanceof StreamThings))
return false;
if (!super.equals(o))
return false;
final SudoStreamThings that = (SudoStreamThings) o;
final StreamThings that = (StreamThings) o;
return Objects.equals(filter, that.filter) &&
Objects.equals(namespaces, that.namespaces) &&
Objects.equals(sort, that.sort) &&
Expand Down
Loading

0 comments on commit 14ca298

Please sign in to comment.