Skip to content

Commit

Permalink
DevOps commands error responses fixes
Browse files Browse the repository at this point in the history
Signed-off-by: Andrey Balarev <andrey.balarev@bosch.io>
  • Loading branch information
abalarev committed Apr 26, 2022
1 parent 2967d5e commit 8f5e9d7
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ public final class AggregatedDevOpsCommandResponse
private final String responsesType;

private AggregatedDevOpsCommandResponse(final JsonObject aggregatedResponses,
final String responsesType,
final HttpStatus httpStatus,
final DittoHeaders dittoHeaders) {
final String responsesType,
final HttpStatus httpStatus,
final DittoHeaders dittoHeaders) {

super(TYPE, null, null, httpStatus, dittoHeaders);
this.aggregatedResponses = aggregatedResponses;
Expand All @@ -80,16 +80,16 @@ private AggregatedDevOpsCommandResponse(final JsonObject aggregatedResponses,
* Returns a new instance of {@code AggregatedDevOpsCommandResponse}.
*
* @param commandResponses the aggregated {@link DevOpsCommandResponse}s.
* @param responsesType the responses type of the responses to expect.
* @param httpStatus the HTTP status to send back as response status.
* @param dittoHeaders the headers of the request.
* @param responsesType the responses type of the responses to expect.
* @param httpStatus the HTTP status to send back as response status.
* @param dittoHeaders the headers of the request.
* @return the new RetrieveLoggerConfigResponse response.
* @since 2.0.0
*/
public static AggregatedDevOpsCommandResponse of(final List<CommandResponse<?>> commandResponses,
final String responsesType,
final HttpStatus httpStatus,
final DittoHeaders dittoHeaders) {
final String responsesType,
final HttpStatus httpStatus,
final DittoHeaders dittoHeaders) {

final var jsonRepresentation = buildJsonRepresentation(commandResponses, dittoHeaders);
return new AggregatedDevOpsCommandResponse(jsonRepresentation, responsesType, httpStatus, dittoHeaders);
Expand All @@ -99,30 +99,30 @@ public static AggregatedDevOpsCommandResponse of(final List<CommandResponse<?>>
* Returns a new instance of {@code AggregatedDevOpsCommandResponse}.
*
* @param aggregatedResponses the aggregated {@link DevOpsCommandResponse}s as a JsonObject.
* @param responsesType the responses type of the responses to expect.
* @param httpStatus the HTTP status to send back as response status.
* @param dittoHeaders the headers of the request.
* @param responsesType the responses type of the responses to expect.
* @param httpStatus the HTTP status to send back as response status.
* @param dittoHeaders the headers of the request.
* @return the new RetrieveLoggerConfigResponse response.
* @since 2.0.0
*/
public static AggregatedDevOpsCommandResponse of(final JsonObject aggregatedResponses,
final String responsesType,
final HttpStatus httpStatus,
final DittoHeaders dittoHeaders) {
final String responsesType,
final HttpStatus httpStatus,
final DittoHeaders dittoHeaders) {

return new AggregatedDevOpsCommandResponse(aggregatedResponses, responsesType, httpStatus, dittoHeaders);
}

/**
* Creates a response to a {@code AggregatedDevOpsCommandResponse} command from a JSON string.
*
* @param jsonString contains the data of the AggregatedDevOpsCommandResponse command.
* @param jsonString contains the data of the AggregatedDevOpsCommandResponse command.
* @param dittoHeaders the headers of the request.
* @return the AggregatedDevOpsCommandResponse command which is based on the dta of {@code jsonString}.
* @throws NullPointerException if {@code jsonString} is {@code null}.
* @throws IllegalArgumentException if {@code jsonString} is empty.
* @throws NullPointerException if {@code jsonString} is {@code null}.
* @throws IllegalArgumentException if {@code jsonString} is empty.
* @throws org.eclipse.ditto.json.JsonParseException if the passed in {@code jsonString} was not in the expected
* format.
* format.
*/
public static AggregatedDevOpsCommandResponse fromJson(final String jsonString, final DittoHeaders dittoHeaders) {
return fromJson(JsonFactory.newObject(jsonString), dittoHeaders);
Expand All @@ -131,15 +131,15 @@ public static AggregatedDevOpsCommandResponse fromJson(final String jsonString,
/**
* Creates a response to a {@code AggregatedDevOpsCommandResponse} command from a JSON object.
*
* @param jsonObject the JSON object of which the response is to be created.
* @param jsonObject the JSON object of which the response is to be created.
* @param dittoHeaders the headers of the preceding command.
* @return the response.
* @throws NullPointerException if {@code jsonObject} is {@code null}.
* @throws NullPointerException if {@code jsonObject} is {@code null}.
* @throws org.eclipse.ditto.json.JsonParseException if the passed in {@code jsonObject} was not in the expected
* format.
* format.
*/
public static AggregatedDevOpsCommandResponse fromJson(final JsonObject jsonObject,
final DittoHeaders dittoHeaders) {
final DittoHeaders dittoHeaders) {

return JSON_DESERIALIZER.deserialize(jsonObject, dittoHeaders);
}
Expand Down Expand Up @@ -168,8 +168,8 @@ public JsonValue getEntity(final JsonSchemaVersion schemaVersion) {

@Override
protected void appendPayload(final JsonObjectBuilder jsonObjectBuilder,
final JsonSchemaVersion schemaVersion,
final Predicate<JsonField> thePredicate) {
final JsonSchemaVersion schemaVersion,
final Predicate<JsonField> thePredicate) {

super.appendPayload(jsonObjectBuilder, schemaVersion, thePredicate);

Expand All @@ -179,7 +179,7 @@ protected void appendPayload(final JsonObjectBuilder jsonObjectBuilder,
}

private static JsonObject buildJsonRepresentation(final List<CommandResponse<?>> commandResponses,
final DittoHeaders dittoHeaders) {
final DittoHeaders dittoHeaders) {

final var schemaVersion = dittoHeaders.getSchemaVersion().orElse(JsonSchemaVersion.LATEST);
final var builder = JsonObject.newBuilder();
Expand All @@ -189,8 +189,8 @@ private static JsonObject buildJsonRepresentation(final List<CommandResponse<?>>
final var key = String.format("/%s/%s", calculateServiceName(cmdR), calculateInstance(cmdR, i++));
// include both regular and special fields for devops command responses
final JsonValue responseJson;
if (cmdR instanceof ExecutePiggybackCommandResponse) {
responseJson = ((ExecutePiggybackCommandResponse) cmdR).getResponse();
if (cmdR instanceof ExecutePiggybackCommandResponse response) {
responseJson = response.getResponse();
} else {
responseJson = cmdR.toJson(schemaVersion, FieldType.regularOrSpecial());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.eclipse.ditto.base.api.devops.signals.commands.RetrieveLoggerConfig;
import org.eclipse.ditto.base.api.devops.signals.commands.RetrieveLoggerConfigResponse;
import org.eclipse.ditto.base.model.common.HttpStatus;
import org.eclipse.ditto.base.model.exceptions.AskException;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
Expand Down Expand Up @@ -121,8 +122,8 @@ private DevOpsCommandsActor(final LoggingFacade loggingFacade, final String serv
* Creates Akka configuration object Props for this Actor.
*
* @param loggingFacade a facade providing logging functionality.
* @param serviceName name of the microservice.
* @param instance instance number of the microservice instance.
* @param serviceName name of the microservice.
* @param instance instance number of the microservice instance.
* @return the Akka configuration Props object.
*/
public static Props props(final LoggingFacade loggingFacade, final String serviceName, final String instance) {
Expand Down Expand Up @@ -163,8 +164,7 @@ public Config getConfig() {
* @param command the initial DevOpsCommand to handle
*/
private void handleInitialDevOpsCommand(final DevOpsCommand<?> command) {
final var responseCorrelationActorSupplier =
getResponseCorrelationActorSupplier(command);
final var responseCorrelationActorSupplier = getResponseCorrelationActorSupplier(command);
if (isExecutePiggybackCommandToPubSubMediator(command)) {
executeAsPiggybackCommandToPubSubMediator(command, responseCorrelationActorSupplier);
} else {
Expand Down Expand Up @@ -216,7 +216,7 @@ private boolean isExecutePiggybackCommandToPubSubMediator(final DevOpsCommand<?>
}

private void executeAsPiggybackCommandToPubSubMediator(final DevOpsCommand<?> command,
final Supplier<ActorRef> responseCorrelationActor) {
final Supplier<ActorRef> responseCorrelationActor) {

tryInterpretAsDirectPublication(command, publish -> {
logger.withCorrelationId(command)
Expand All @@ -233,7 +233,7 @@ private void executeAsPiggybackCommandToPubSubMediator(final DevOpsCommand<?> co
}

private void executeAsIndirectPiggybackCommandToPubSubMediator(final DevOpsCommand<?> command,
final Supplier<ActorRef> responseCorrelationActor) {
final Supplier<ActorRef> responseCorrelationActor) {

final String topic;
final Optional<String> commandServiceNameOpt = command.getServiceName();
Expand Down Expand Up @@ -262,11 +262,10 @@ private void executeAsIndirectPiggybackCommandToPubSubMediator(final DevOpsComma
}

private void tryInterpretAsDirectPublication(final DevOpsCommand<?> command,
final Consumer<DistributedPubSubMediator.Publish> onSuccess,
final Consumer<DevOpsErrorResponse> onError) {
final Consumer<DistributedPubSubMediator.Publish> onSuccess,
final Consumer<DevOpsErrorResponse> onError) {

if (command instanceof ExecutePiggybackCommand) {
final ExecutePiggybackCommand executePiggyback = (ExecutePiggybackCommand) command;
if (command instanceof ExecutePiggybackCommand executePiggyback) {
final DittoHeaders dittoHeaders = executePiggyback.getDittoHeaders();
deserializePiggybackCommand(executePiggyback,
jsonifiable -> {
Expand Down Expand Up @@ -301,12 +300,12 @@ private static boolean isGroupTopic(final DittoHeaders dittoHeaders) {

private void handleDevOpsCommandViaPubSub(final DevOpsCommandViaPubSub devOpsCommandViaPubSub) {
final DevOpsCommand<?> wrappedCommand = devOpsCommandViaPubSub.wrappedCommand;
if (wrappedCommand instanceof ChangeLogLevel) {
handleChangeLogLevel((ChangeLogLevel) wrappedCommand);
} else if (wrappedCommand instanceof RetrieveLoggerConfig) {
handleRetrieveLoggerConfig((RetrieveLoggerConfig) wrappedCommand);
} else if (wrappedCommand instanceof ExecutePiggybackCommand) {
handleExecutePiggyBack((ExecutePiggybackCommand) wrappedCommand);
if (wrappedCommand instanceof ChangeLogLevel changeLogLevel) {
handleChangeLogLevel(changeLogLevel);
} else if (wrappedCommand instanceof RetrieveLoggerConfig retrieveLoggerConfig) {
handleRetrieveLoggerConfig(retrieveLoggerConfig);
} else if (wrappedCommand instanceof ExecutePiggybackCommand executePiggybackCommand) {
handleExecutePiggyBack(executePiggybackCommand);
}
}

Expand Down Expand Up @@ -346,35 +345,38 @@ private void handleExecutePiggyBack(final ExecutePiggybackCommand command) {
// to be streamed to sender
sender.tell(result, getSelf());
return;
} else if (result instanceof CommandResponse<?>) {
final CommandResponse<?> response = (CommandResponse<?>) result;
} else if (result instanceof CommandResponse<?> response) {
executePiggybackCommandResponse =
ExecutePiggybackCommandResponse.of(serviceName, instance, response.getHttpStatus(),
response.toJson(), response.getDittoHeaders());
} else if (result instanceof DittoRuntimeException) {
final var exception = (DittoRuntimeException) result;
} else if (result instanceof DittoRuntimeException exception) {
executePiggybackCommandResponse =
ExecutePiggybackCommandResponse.of(serviceName, instance, exception.getHttpStatus(),
exception.toJson(), exception.getDittoHeaders());
} else if (result instanceof Jsonifiable<?>) {
final Jsonifiable<?> response = (Jsonifiable<?>) result;
} else if (result instanceof Jsonifiable<?> response) {
executePiggybackCommandResponse =
ExecutePiggybackCommandResponse.of(serviceName, instance, HttpStatus.CONFLICT,
response.toJson(), DittoHeaders.empty());
} else {
final Object toReport = result != null ? result : error;
} else if (result != null) {
executePiggybackCommandResponse =
ExecutePiggybackCommandResponse.of(serviceName, instance, HttpStatus.CONFLICT,
JsonValue.of(Objects.toString(toReport)), DittoHeaders.empty());
JsonValue.of(Objects.toString(result)), DittoHeaders.empty());
} else {
executePiggybackCommandResponse = ExecutePiggybackCommandResponse.of(serviceName, instance, HttpStatus.CONFLICT,
AskException.fromMessage(error.getMessage(), DittoHeaders.empty()).toJson(), DittoHeaders.empty());
}
sender.tell(executePiggybackCommandResponse, getSelf());
});
},
dittoRuntimeException -> sender.tell(dittoRuntimeException, getSelf()));
dittoRuntimeException -> {
final ExecutePiggybackCommandResponse devOpsErrorResponse = ExecutePiggybackCommandResponse.of(serviceName, instance,
dittoRuntimeException.getHttpStatus(), dittoRuntimeException.toJson(), dittoRuntimeException.getDittoHeaders());
sender.tell(devOpsErrorResponse, getSelf());
});
}

private void deserializePiggybackCommand(final ExecutePiggybackCommand command,
final Consumer<Jsonifiable<?>> onSuccess, final Consumer<DittoRuntimeException> onError) {
final Consumer<Jsonifiable<?>> onSuccess, final Consumer<DittoRuntimeException> onError) {

final JsonObject piggybackCommandJson = command.getPiggybackCommand();
@Nullable final String piggybackCommandType = piggybackCommandJson.getValue(Command.JsonFields.TYPE)
Expand Down Expand Up @@ -424,7 +426,7 @@ private static final class PubSubSubscriberActor extends AbstractActor {

@SuppressWarnings("unused")
private PubSubSubscriberActor(final ActorRef pubSubMediator, final String serviceName, final String instance,
final String... pubSubTopicsToSubscribeTo) {
final String... pubSubTopicsToSubscribeTo) {

Arrays.stream(pubSubTopicsToSubscribeTo).forEach(topic ->
subscribeToDevOpsTopic(pubSubMediator, topic, serviceName, instance));
Expand All @@ -434,18 +436,18 @@ private PubSubSubscriberActor(final ActorRef pubSubMediator, final String servic
* @return the Akka configuration Props object.
*/
static Props props(final ActorRef pubSubMediator,
final String serviceName,
final String instance,
final String... pubSubTopicsToSubscribeTo) {
final String serviceName,
final String instance,
final String... pubSubTopicsToSubscribeTo) {

return Props.create(PubSubSubscriberActor.class, pubSubMediator, serviceName, instance,
pubSubTopicsToSubscribeTo);
}

private void subscribeToDevOpsTopic(final ActorRef pubSubMediator,
final String topic,
final String serviceName,
final String instance) {
final String topic,
final String serviceName,
final String instance) {

pubSubMediator.tell(DistPubSubAccess.subscribe(topic, getSelf()), getSelf());
pubSubMediator.tell(DistPubSubAccess.subscribe(String.join(":", topic, serviceName), getSelf()), getSelf());
Expand Down Expand Up @@ -502,8 +504,8 @@ private static final class DevOpsCommandResponseCorrelationActor extends Abstrac

@SuppressWarnings("unused")
private DevOpsCommandResponseCorrelationActor(final ActorRef devOpsCommandSender,
final DevOpsCommand<?> devOpsCommand,
final int expectedResponses) {
final DevOpsCommand<?> devOpsCommand,
final int expectedResponses) {

this.devOpsCommandSender = devOpsCommandSender;
this.devOpsCommand = devOpsCommand;
Expand All @@ -528,7 +530,7 @@ private static boolean isAggregateResults(final DittoHeaders dittoHeaders) {
* @return the Akka configuration Props object.
*/
static Props props(final ActorRef devOpsCommandSender, final DevOpsCommand<?> devOpsCommand,
final int expectedResponses) {
final int expectedResponses) {
return Props.create(DevOpsCommandResponseCorrelationActor.class, devOpsCommandSender, devOpsCommand,
expectedResponses);
}
Expand Down

0 comments on commit 8f5e9d7

Please sign in to comment.