Skip to content

Commit

Permalink
made RetrieveConnectionIdsByTag a ConnectivitySudoCommand
Browse files Browse the repository at this point in the history
* made it possible to overwrite "messages during startup" processing in AbstractPersistenceSupervisor
* adjusted ConnectionSupervisorActor to not load config overwrites in preStart(), but in the handleMessagesDuringStartup() instead where the DittoHeaders of the initial command are available
* inject DittoHeaders in ConnectionConfigProvider in order to lookup connection config overwrites
* enhanced MongoReadJournal by a functionality to retrieve all "tags" of the latest journal entry of a given "pid"

Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch.io>
  • Loading branch information
thjaeckle committed Jul 1, 2022
1 parent abd48f7 commit 971edbc
Show file tree
Hide file tree
Showing 15 changed files with 279 additions and 83 deletions.
@@ -0,0 +1,59 @@
/*
* Copyright (c) 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.api.commands.sudo;

import org.eclipse.ditto.base.api.commands.sudo.SudoCommand;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.signals.commands.Command;

/**
* Aggregates all connectivity sudo commands.
*
* @param <T> the type of the implementing class.
*/
public interface ConnectivitySudoCommand<T extends ConnectivitySudoCommand<T>> extends SudoCommand<T> {

/**
* Type Prefix of Sudo commands.
*/
String TYPE_PREFIX = "connectivity." + SUDO_TYPE_QUALIFIER;

/**
* Thing sudo resource type.
*/
String RESOURCE_TYPE = "connectivity-sudo";

@Override
default String getResourceType() {
return RESOURCE_TYPE;
}

@Override
default String getTypePrefix() {
return TYPE_PREFIX;
}

@Override
T setDittoHeaders(DittoHeaders dittoHeaders);

/**
* An enumeration of the known {@link org.eclipse.ditto.json.JsonField}s of a Connectivity sudo command.
*/
class JsonFields extends Command.JsonFields {

private JsonFields() {
throw new AssertionError();
}
}

}
@@ -0,0 +1,54 @@
/*
* Copyright (c) 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.api.commands.sudo;

import org.eclipse.ditto.base.api.commands.sudo.SudoQueryCommandResponse;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.signals.commands.CommandResponse;
import org.eclipse.ditto.json.JsonValue;

/**
* Aggregates all ConnectivitySudoCommand Responses.
*
* @param <T> the type of the implementing class.
*/
public interface ConnectivitySudoQueryCommandResponse<T extends ConnectivitySudoQueryCommandResponse<T>> extends
SudoQueryCommandResponse<T> {

/**
* Type Prefix of thing sudo command responses.
*/
String TYPE_PREFIX = "connectivity." + SUDO_TYPE_QUALIFIER;

@Override
default String getResourceType() {
return ConnectivitySudoCommand.RESOURCE_TYPE;
}

@Override
T setEntity(JsonValue entity);

@Override
T setDittoHeaders(DittoHeaders dittoHeaders);

/**
* An enumeration of the known {@link org.eclipse.ditto.json.JsonField}s of a Sudo Thing command response.
*/
class JsonFields extends CommandResponse.JsonFields {

private JsonFields() {
throw new AssertionError();
}
}

}
Expand Up @@ -10,7 +10,7 @@
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.model.signals.commands.query;
package org.eclipse.ditto.connectivity.api.commands.sudo;

import java.util.Objects;
import java.util.function.Predicate;
Expand All @@ -24,7 +24,6 @@
import org.eclipse.ditto.base.model.json.JsonSchemaVersion;
import org.eclipse.ditto.base.model.signals.commands.AbstractCommand;
import org.eclipse.ditto.base.model.signals.commands.CommandJsonDeserializer;
import org.eclipse.ditto.connectivity.model.signals.commands.ConnectivityCommand;
import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.json.JsonField;
import org.eclipse.ditto.json.JsonFieldDefinition;
Expand All @@ -38,39 +37,38 @@
* @since 3.0.0
*/
@Immutable
@JsonParsableCommand(typePrefix = ConnectivityCommand.TYPE_PREFIX, name = RetrieveConnectionIdsByTag.NAME)
public final class RetrieveConnectionIdsByTag extends AbstractCommand<RetrieveConnectionIdsByTag>
implements ConnectivityQueryCommand<RetrieveConnectionIdsByTag> {
@JsonParsableCommand(typePrefix = ConnectivitySudoCommand.TYPE_PREFIX, name = SudoRetrieveConnectionIdsByTag.NAME)
public final class SudoRetrieveConnectionIdsByTag extends AbstractCommand<SudoRetrieveConnectionIdsByTag>
implements ConnectivitySudoCommand<SudoRetrieveConnectionIdsByTag> {

private static final JsonFieldDefinition<String> JSON_TAG =
JsonFieldDefinition.ofString("tag", FieldType.REGULAR, JsonSchemaVersion.V_2);
public static final String NAME = "sudoRetrieveConnectionIdsByTag";

public static final String NAME = "retrieveConnectionIdsByTag";
public static final String TYPE = TYPE_PREFIX + NAME;

public static final String TYPE = ConnectivityCommand.TYPE_PREFIX + NAME;
private static final JsonFieldDefinition<String> JSON_TAG =
JsonFieldDefinition.ofString("tag", FieldType.REGULAR, JsonSchemaVersion.V_2);

private final String tag;


private RetrieveConnectionIdsByTag(final String tag, final DittoHeaders dittoHeaders) {
private SudoRetrieveConnectionIdsByTag(final String tag, final DittoHeaders dittoHeaders) {
super(TYPE, dittoHeaders);
this.tag = tag;
}

/**
* Returns a new instance of {@code RetrieveConnectionIdsByTag}.
* Returns a new instance of {@code SudoRetrieveConnectionIdsByTag}.
*
* @param tag the tag for which the filtering should be applied.
* @param dittoHeaders the headers of the request.
* @return a new RetrieveConnectionIdsByTag command.
* @return a new SudoRetrieveConnectionIdsByTag command.
* @throws NullPointerException if any argument is {@code null}.
*/
public static RetrieveConnectionIdsByTag of(final String tag, final DittoHeaders dittoHeaders) {
return new RetrieveConnectionIdsByTag(tag, dittoHeaders);
public static SudoRetrieveConnectionIdsByTag of(final String tag, final DittoHeaders dittoHeaders) {
return new SudoRetrieveConnectionIdsByTag(tag, dittoHeaders);
}

/**
* Creates a new {@code RetrieveConnectionIdsByTag} from a JSON string.
* Creates a new {@code SudoRetrieveConnectionIdsByTag} from a JSON string.
*
* @param jsonString the JSON string of which the command is to be retrieved.
* @param dittoHeaders the headers of the command.
Expand All @@ -80,12 +78,12 @@ public static RetrieveConnectionIdsByTag of(final String tag, final DittoHeaders
* @throws org.eclipse.ditto.json.JsonParseException if the passed in {@code jsonString} was not in the expected
* format.
*/
public static RetrieveConnectionIdsByTag fromJson(final String jsonString, final DittoHeaders dittoHeaders) {
public static SudoRetrieveConnectionIdsByTag fromJson(final String jsonString, final DittoHeaders dittoHeaders) {
return fromJson(JsonFactory.newObject(jsonString), dittoHeaders);
}

/**
* Creates a new {@code RetrieveConnectionIdsByTag} from a JSON object.
* Creates a new {@code SudoRetrieveConnectionIdsByTag} from a JSON object.
*
* @param jsonObject the JSON object of which the command is to be created.
* @param dittoHeaders the headers of the command.
Expand All @@ -94,8 +92,8 @@ public static RetrieveConnectionIdsByTag fromJson(final String jsonString, final
* @throws org.eclipse.ditto.json.JsonParseException if the passed in {@code jsonObject} was not in the expected
* format.
*/
public static RetrieveConnectionIdsByTag fromJson(final JsonObject jsonObject, final DittoHeaders dittoHeaders) {
return new CommandJsonDeserializer<RetrieveConnectionIdsByTag>(TYPE, jsonObject).deserialize(
public static SudoRetrieveConnectionIdsByTag fromJson(final JsonObject jsonObject, final DittoHeaders dittoHeaders) {
return new CommandJsonDeserializer<SudoRetrieveConnectionIdsByTag>(TYPE, jsonObject).deserialize(
() -> {
final String tag = jsonObject.getValueOrThrow(JSON_TAG);
return of(tag, dittoHeaders);
Expand All @@ -114,7 +112,7 @@ public Category getCategory() {
}

@Override
public RetrieveConnectionIdsByTag setDittoHeaders(final DittoHeaders dittoHeaders) {
public SudoRetrieveConnectionIdsByTag setDittoHeaders(final DittoHeaders dittoHeaders) {
return of(tag, dittoHeaders);
}

Expand All @@ -124,7 +122,7 @@ public String getTag() {

@Override
protected boolean canEqual(@Nullable final Object other) {
return other instanceof RetrieveConnectionIdsByTag;
return other instanceof SudoRetrieveConnectionIdsByTag;
}

@Override
Expand All @@ -137,7 +135,7 @@ public boolean equals(@Nullable final Object o) {
if (!super.equals(o)) {
return false;
}
final RetrieveConnectionIdsByTag that = (RetrieveConnectionIdsByTag) o;
final SudoRetrieveConnectionIdsByTag that = (SudoRetrieveConnectionIdsByTag) o;
return Objects.equals(tag, that.tag);
}

Expand Down
Expand Up @@ -10,7 +10,7 @@
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.model.signals.commands.query;
package org.eclipse.ditto.connectivity.api.commands.sudo;

import java.util.Collections;
import java.util.LinkedHashSet;
Expand Down Expand Up @@ -39,66 +39,66 @@
import org.eclipse.ditto.json.JsonValue;

/**
* Response to a {@link RetrieveConnectionIdsByTag} command.
* Response to a {@link SudoRetrieveConnectionIdsByTag} command.
*
* @since 3.0.0
*/
@Immutable
@JsonParsableCommandResponse(type = RetrieveConnectionIdsByTagResponse.TYPE)
public final class RetrieveConnectionIdsByTagResponse
extends AbstractCommandResponse<RetrieveConnectionIdsByTagResponse>
implements ConnectivityQueryCommandResponse<RetrieveConnectionIdsByTagResponse> {
@JsonParsableCommandResponse(type = SudoRetrieveConnectionIdsByTagResponse.TYPE)
public final class SudoRetrieveConnectionIdsByTagResponse
extends AbstractCommandResponse<SudoRetrieveConnectionIdsByTagResponse>
implements ConnectivitySudoQueryCommandResponse<SudoRetrieveConnectionIdsByTagResponse> {

/**
* Type of this response.
*/
public static final String TYPE = TYPE_PREFIX + RetrieveConnectionIdsByTag.NAME;
public static final String TYPE = TYPE_PREFIX + SudoRetrieveConnectionIdsByTag.NAME;

static final JsonFieldDefinition<JsonArray> CONNECTION_IDS =
JsonFieldDefinition.ofJsonArray("connectionIds", FieldType.REGULAR, JsonSchemaVersion.V_2);

private static final HttpStatus HTTP_STATUS = HttpStatus.OK;

private static final CommandResponseJsonDeserializer<RetrieveConnectionIdsByTagResponse> JSON_DESERIALIZER =
private static final CommandResponseJsonDeserializer<SudoRetrieveConnectionIdsByTagResponse> JSON_DESERIALIZER =
CommandResponseJsonDeserializer.newInstance(TYPE,
context -> {
final JsonObject jsonObject = context.getJsonObject();
return new RetrieveConnectionIdsByTagResponse(
return new SudoRetrieveConnectionIdsByTagResponse(
fromArray(jsonObject.getValueOrThrow(CONNECTION_IDS)),
context.getDeserializedHttpStatus(),
context.getDittoHeaders());
});

private final Set<ConnectionId> connectionIds;

private RetrieveConnectionIdsByTagResponse(final Set<ConnectionId> connectionIds,
private SudoRetrieveConnectionIdsByTagResponse(final Set<ConnectionId> connectionIds,
final HttpStatus httpStatus,
final DittoHeaders dittoHeaders) {

super(TYPE,
CommandResponseHttpStatusValidator.validateHttpStatus(httpStatus,
Collections.singleton(HTTP_STATUS),
RetrieveConnectionIdsByTagResponse.class),
SudoRetrieveConnectionIdsByTagResponse.class),
dittoHeaders);
this.connectionIds = Collections.unmodifiableSet(new LinkedHashSet<>(connectionIds));
}

/**
* Returns a new instance of {@code RetrieveConnectionIdsByTagResponse}.
* Returns a new instance of {@code SudoRetrieveConnectionIdsByTagResponse}.
*
* @param dittoHeaders the headers of the request.
* @param connectionIds the connection ids.
* @return a new RetrieveAllConnectionIdsResponse response.
* @throws NullPointerException if any argument is {@code null}.
*/
public static RetrieveConnectionIdsByTagResponse of(final Set<ConnectionId> connectionIds,
public static SudoRetrieveConnectionIdsByTagResponse of(final Set<ConnectionId> connectionIds,
final DittoHeaders dittoHeaders) {

return new RetrieveConnectionIdsByTagResponse(connectionIds, HTTP_STATUS, dittoHeaders);
return new SudoRetrieveConnectionIdsByTagResponse(connectionIds, HTTP_STATUS, dittoHeaders);
}

/**
* Creates a new {@code RetrieveConnectionIdsByTagResponse} from a JSON string.
* Creates a new {@code SudoRetrieveConnectionIdsByTagResponse} from a JSON string.
*
* @param jsonString the JSON string of which the response is to be retrieved.
* @param dittoHeaders the headers of the response.
Expand All @@ -108,13 +108,13 @@ public static RetrieveConnectionIdsByTagResponse of(final Set<ConnectionId> conn
* @throws org.eclipse.ditto.json.JsonParseException if the passed in {@code jsonString} was not in the expected
* format.
*/
public static RetrieveConnectionIdsByTagResponse fromJson(final String jsonString,
public static SudoRetrieveConnectionIdsByTagResponse fromJson(final String jsonString,
final DittoHeaders dittoHeaders) {
return fromJson(JsonObject.of(jsonString), dittoHeaders);
}

/**
* Creates a new {@code RetrieveConnectionIdsByTagResponse} from a JSON object.
* Creates a new {@code SudoRetrieveConnectionIdsByTagResponse} from a JSON object.
*
* @param jsonObject the JSON object of which the response is to be created.
* @param dittoHeaders the headers of the response.
Expand All @@ -123,7 +123,7 @@ public static RetrieveConnectionIdsByTagResponse fromJson(final String jsonStrin
* @throws org.eclipse.ditto.json.JsonParseException if the passed in {@code jsonObject} was not in the expected
* format.
*/
public static RetrieveConnectionIdsByTagResponse fromJson(final JsonObject jsonObject,
public static SudoRetrieveConnectionIdsByTagResponse fromJson(final JsonObject jsonObject,
final DittoHeaders dittoHeaders) {

return JSON_DESERIALIZER.deserialize(jsonObject, dittoHeaders);
Expand All @@ -143,7 +143,7 @@ public Set<ConnectionId> getConnectionIds() {
}

@Override
public RetrieveConnectionIdsByTagResponse setEntity(final JsonValue entity) {
public SudoRetrieveConnectionIdsByTagResponse setEntity(final JsonValue entity) {
return of(fromArray(entity.asArray()), getDittoHeaders());
}

Expand All @@ -161,13 +161,13 @@ public JsonValue getEntity(final JsonSchemaVersion schemaVersion) {
}

@Override
public RetrieveConnectionIdsByTagResponse setDittoHeaders(final DittoHeaders dittoHeaders) {
public SudoRetrieveConnectionIdsByTagResponse setDittoHeaders(final DittoHeaders dittoHeaders) {
return of(connectionIds, dittoHeaders);
}

@Override
protected boolean canEqual(@Nullable final Object other) {
return other instanceof RetrieveConnectionIdsByTagResponse;
return other instanceof SudoRetrieveConnectionIdsByTagResponse;
}

@Override
Expand All @@ -181,7 +181,7 @@ public boolean equals(@Nullable final Object o) {
if (!super.equals(o)) {
return false;
}
final RetrieveConnectionIdsByTagResponse that = (RetrieveConnectionIdsByTagResponse) o;
final SudoRetrieveConnectionIdsByTagResponse that = (SudoRetrieveConnectionIdsByTagResponse) o;
return Objects.equals(connectionIds, that.connectionIds);
}

Expand Down

0 comments on commit 971edbc

Please sign in to comment.