Skip to content

Commit

Permalink
removed unused method in SignalTypeFormatException;
Browse files Browse the repository at this point in the history
small code format changes;

Signed-off-by: Stefan Maute <stefan.maute@bosch.io>
  • Loading branch information
Stefan Maute committed Nov 18, 2021
1 parent db9e5e1 commit 9c07dd0
Show file tree
Hide file tree
Showing 10 changed files with 83 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,8 @@ private Sink<Object, NotUsed> dispatchMapped() {

private void dispatchMapped(final InboundMappingOutcomes outcomes) {
final var sender = outcomes.getSender();
final var dispatchResponsesAndSearchCommands = dispatchResponsesAndSearchCommands(sender, outcomes);
final var dispatchResponsesAndSearchCommands =
dispatchResponsesAndSearchCommands(sender, outcomes);
final var actualOutcomes = outcomes.getOutcomes();
final var ackRequestingSignalCount = actualOutcomes.stream()
.map(this::eval)
Expand Down Expand Up @@ -433,6 +434,7 @@ private int dispatchIncomingSignal(final IncomingSignal incomingSignal) {
} catch (final DittoRuntimeException e) {
handleErrorDuringStartingOfAckregator(e, signal.getDittoHeaders(), sender);
}

return 1;
} else {
if (sender != null && isLive(signal)) {
Expand All @@ -454,13 +456,15 @@ private int dispatchIncomingSignal(final IncomingSignal incomingSignal) {
ConnectivityInternalErrorException.newBuilder()
.cause(new ClassCastException(errorMessage))
.build();

return ConnectivityErrorResponse.of(dre, originalHeaders);
}
})
.thenAccept(response -> sender.tell(response, ActorRef.noSender()));
} else {
proxyActor.tell(signal, sender);
}

return 0;
}
}
Expand Down Expand Up @@ -491,6 +495,7 @@ private void startAckregatorAndForwardSignal(final EntityId entityId,
},
ackregator -> {
proxyActor.tell(signal, ackregator);

return null;
});
}
Expand Down Expand Up @@ -532,11 +537,13 @@ private void handleErrorDuringStartingOfAckregator(final DittoRuntimeException e
private <T> Stream<T> forwardToClientActor(final Signal<?> signal, @Nullable final ActorRef sender) {
// wrap response or search command for dispatching by entity ID
clientActor.tell(InboundSignal.of(signal), sender);

return Stream.empty();
}

private <T> Stream<T> forwardToConnectionActor(final CreateSubscription command, @Nullable final ActorRef sender) {
connectionActor.tell(command, sender);

return Stream.empty();
}

Expand All @@ -554,6 +561,7 @@ private <T> Stream<T> forwardAcknowledgement(final Acknowledgement ack,
outcomes.getExternalMessage());
result = Stream.empty();
}

return result;
}

Expand All @@ -576,6 +584,7 @@ private <T> Stream<T> forwardAcknowledgements(final Acknowledgements acks,
} else {
result = forwardToClientActor(acks, outboundMessageMappingProcessorActor);
}

return result;
}

Expand All @@ -596,6 +605,7 @@ private static TopicPathBuilder newTopicPathBuilder(final WithEntityId withEntit
} else {
builder.twin();
}

return builder;
}

Expand All @@ -615,6 +625,7 @@ private ActorRef getReturnAddress(final ActorRef sender, final boolean isAcksReq
result = ActorRef.noSender();
}
}

return result;
}

Expand Down Expand Up @@ -645,6 +656,7 @@ private Signal<?> appendConnectionAcknowledgementsToSignal(final ExternalMessage
.toBuilder()
.acknowledgementRequests(combinedRequestedAcks)
.build());

return RequestedAcksFilter.filterAcknowledgements(signalWithCombinedAckRequests,
message,
filter,
Expand Down Expand Up @@ -734,6 +746,7 @@ private DittoHeaders applyInboundHeaderMapping(final Signal<?> signal,
if (appendRandomCorrelationId && extraInternalHeaders.getCorrelationId().isEmpty()) {
builder.randomCorrelationId();
}

return builder;
}

Expand All @@ -749,13 +762,15 @@ private <T extends CommandResponse<T>> T appendConnectionIdToAcknowledgementOrRe
.toBuilder()
.putHeader(DittoHeaderDefinition.CONNECTION_ID.getKey(), connection.getId().toString())
.build();

return commandResponse.setDittoHeaders(newHeaders);
}

private Acknowledgements appendConnectionIdToAcknowledgements(final Acknowledgements acknowledgements) {
final List<Acknowledgement> acksList = acknowledgements.stream()
.map(this::appendConnectionIdToAcknowledgementOrResponse)
.collect(Collectors.toList());

// Uses EntityId and StatusCode from input acknowledges expecting these were set when Acknowledgements was created
return Acknowledgements.of(acknowledgements.getEntityId(), acksList, acknowledgements.getHttpStatus(),
acknowledgements.getDittoHeaders());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ private static AddConnectionLogEntry getAddConnectionLogEntry(final ConnectionId
final var logEntry = LogEntryFactory.getLogEntryForFailedCommandResponseRoundTrip(failure.getCommand(),
failure.getCommandResponse(),
failure.getDetailMessage());

return AddConnectionLogEntry.newInstance(connectionId, logEntry);
}

Expand Down Expand Up @@ -235,6 +236,7 @@ private void handleCommand(final Command<?> command) {
private void setDefaultTimeoutExceptionSupplier(final WithDittoHeaders command) {
timeoutExceptionSupplier = () -> {
final var actorContext = getContext();

return GatewayCommandTimeoutException.newBuilder(actorContext.getReceiveTimeout())
.dittoHeaders(command.getDittoHeaders())
.build();
Expand All @@ -243,12 +245,14 @@ private void setDefaultTimeoutExceptionSupplier(final WithDittoHeaders command)

private Void onAggregatedResponseOrError(final Object responseOrError) {
getSelf().tell(responseOrError, ActorRef.noSender());

return null;
}

private Void handleCommandWithAckregator(final Signal<?> command, final ActorRef aggregator) {
logger.debug("Got <{}>. Telling the target actor about it.", command);
proxyActor.tell(command, aggregator);

return null;
}

Expand All @@ -259,6 +263,7 @@ private Void handleCommandWithoutAckregator(final Signal<?> command) {
} else {
handleCommandAndAcceptImmediately(command);
}

return null;
}

Expand Down Expand Up @@ -510,6 +515,7 @@ private static HttpResponse buildResponseWithoutHeadersFromDittoRuntimeException
result = responseWithoutHeaders.withEntity(CONTENT_TYPE_JSON,
ByteString.fromString(exception.toJsonString()));
}

return result;
}

Expand Down Expand Up @@ -593,7 +599,9 @@ private HttpResponse createCommandResponse(final DittoHeaders dittoHeaders, fina
response -> enhanceResponseWithExternalDittoHeaders(response, dittoHeaders);
final UnaryOperator<HttpResponse> modifyResponseOperator = this::modifyResponse;
final var addHeaders = addExternalDittoHeaders.andThen(modifyResponseOperator);
final var addBodyIfEntityExists = createBodyAddingResponseMapper(dittoHeaders, withOptionalEntity);
final var addBodyIfEntityExists =
createBodyAddingResponseMapper(dittoHeaders, withOptionalEntity);

return addBodyIfEntityExists.apply(addHeaders.apply(createHttpResponse(httpStatus)));
}

Expand All @@ -606,6 +614,7 @@ private static UnaryOperator<HttpResponse> createBodyAddingResponseMapper(final
}
final var schemaVersion = dittoHeaders.getSchemaVersion()
.orElse(dittoHeaders.getImplementedSchemaVersion());

return withOptionalEntity.getEntity(schemaVersion)
.map(entity -> addEntityAccordingToContentType(response, entity.toString(),
getContentType(dittoHeaders)))
Expand All @@ -632,8 +641,8 @@ protected HttpResponse modifyResponse(final HttpResponse response) {
protected Uri getUriForLocationHeader(final HttpRequest request,
final EntityId entityId,
final JsonPointer resourcePath) {

final var supplier = new UriForLocationHeaderSupplier(request, entityId, resourcePath);

return supplier.get();
}

Expand All @@ -660,6 +669,7 @@ private Acknowledgements mapAcknowledgementsForHttp(final Acknowledgements acks)
acks.getDittoHeaders()
);
}

return result;
}

Expand All @@ -671,6 +681,7 @@ private boolean isResponseRequired() {
} else {
result = true;
}

return result;
}

Expand All @@ -685,11 +696,13 @@ private Acknowledgement setResponseLocationForAcknowledgement(final Acknowledgem
.build());
}
}

return acknowledgement;
}

private static boolean shallAcceptImmediately(final WithDittoHeaders withDittoHeaders) {
final DittoHeaders dittoHeaders = withDittoHeaders.getDittoHeaders();

return !dittoHeaders.isResponseRequired() && dittoHeaders.getAcknowledgementRequests().isEmpty();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ private Function<Acknowledgements, ThingErrorResponse> getDefaultGetAsTimeoutErr
final var gatewayCommandTimeoutException = GatewayCommandTimeoutException.newBuilder(timeout)
.dittoHeaders(aggregatedAcknowledgements.getDittoHeaders())
.build();

return ThingErrorResponse.of(thingId, gatewayCommandTimeoutException);
};
}
Expand Down Expand Up @@ -218,12 +219,12 @@ private MatchingValidationResult validateResponse(final CommandResponse<?> comma
// Non-live responses are supposed to be valid as they are generated by Ditto itself.
result = MatchingValidationResult.success();
}

return result;
}

private MatchingValidationResult tryToValidateLiveResponse(final Command<?> command,
final CommandResponse<?> commandResponse) {

try {
return validateLiveResponse(command, commandResponse);
} catch (final ConnectionIdInvalidException e) {
Expand All @@ -238,14 +239,16 @@ private MatchingValidationResult tryToValidateLiveResponse(final Command<?> comm
final var dittoHeadersWithoutConnectionId = DittoHeaders.newBuilder(commandResponse.getDittoHeaders())
.removeHeader(DittoHeaderDefinition.CONNECTION_ID.getKey())
.build();

return validateLiveResponse(command, commandResponse.setDittoHeaders(dittoHeadersWithoutConnectionId));
}
}

private static MatchingValidationResult validateLiveResponse(final Command<?> command,
final CommandResponse<?> commandResponse) {
final var responseMatchingValidator =
CommandAndCommandResponseMatchingValidator.getInstance();

final var responseMatchingValidator = CommandAndCommandResponseMatchingValidator.getInstance();
return responseMatchingValidator.apply(command, commandResponse);
}

Expand All @@ -267,6 +270,7 @@ private static Optional<JsonValue> getPayload(final ThingCommandResponse<?> thin
} else {
result = Optional.empty();
}

return result;
}

Expand Down Expand Up @@ -321,6 +325,7 @@ private static Acknowledgement getAcknowledgement(final MessageCommandResponse<?
final var liveResponseAckHeaders = responseDittoHeaders.toBuilder()
.putHeaders(message.getHeaders())
.build();

return ThingAcknowledgementFactory.newAcknowledgement(LIVE_RESPONSE,
messageCommandResponse.getEntityId(),
messageCommandResponse.getHttpStatus(),
Expand All @@ -333,6 +338,7 @@ private static Optional<JsonValue> getPayload(final MessageCommandResponse<?, ?>
final var jsonMessagePayloadPointer = MessageCommandResponse.JsonFields.JSON_MESSAGE_PAYLOAD.getPointer();
final var messagePayloadPointer = jsonMessagePointer.append(jsonMessagePayloadPointer);
final var messageCommandResponseJsonObject = messageCommandResponse.toJson();

return messageCommandResponseJsonObject.getValue(messagePayloadPointer);
}

Expand Down Expand Up @@ -402,6 +408,7 @@ public static DittoHeadersSettable<?> restoreCommandConnectivityHeaders(final Di
}
requestCommandHeaders.getInboundPayloadMapper().ifPresent(enhancedHeadersBuilder::inboundPayloadMapper);
requestCommandHeaders.getReplyTarget().ifPresent(enhancedHeadersBuilder::replyTarget);

return signal.setDittoHeaders(enhancedHeadersBuilder.build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,13 +202,15 @@ private ActorRef startAckAggregatorActor(final EntityId entityId,
headerTranslator,
responseSignalConsumer,
matchingValidationFailureConsumer);

return actorRefFactory.actorOf(props, getNextActorName(signal.getDittoHeaders()));
}

private String getNextActorName(final DittoHeaders dittoHeaders) {
final var correlationId = dittoHeaders.getCorrelationId()
.map(cid -> URLEncoder.encode(cid, StandardCharsets.UTF_8))
.orElse("_");

return String.format("ackr%x-%s", childCounter++, correlationId);
}

Expand All @@ -223,6 +225,7 @@ private static PartialFunction<Signal<?>, Signal<?>> buildAckRequestSetter(
ackRequestSetter::isApplicable,
s -> (Signal<?>) ackRequestSetter.apply(s));
}

return pfBuilder.matchAny(x -> x).build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,17 +74,6 @@ public static boolean isMessageCommand(@Nullable final Signal<?> signal) {
return hasTypePrefix(signal, MessageCommand.TYPE_PREFIX);
}

private static boolean hasTypePrefix(@Nullable final WithType signal, final String typePrefix) {
final boolean result;
if (null != signal) {
final var signalType = signal.getType();
result = signalType.startsWith(typePrefix);
} else {
result = false;
}
return result;
}

/**
* Indicates whether the specified signal argument is a {@link ThingCommand}.
*
Expand Down Expand Up @@ -151,6 +140,7 @@ public static boolean isChannelLive(@Nullable final WithDittoHeaders signal) {
} else {
result = false;
}

return result;
}

Expand All @@ -168,6 +158,7 @@ public static boolean isWithEntityId(@Nullable final Signal<?> signal) {
} else {
result = false;
}

return result;
}

Expand All @@ -185,6 +176,7 @@ public static Optional<EntityId> getEntityId(@Nullable final Signal<?> signal) {
} else {
result = Optional.empty();
}

return result;
}

Expand All @@ -202,6 +194,19 @@ public static Optional<String> getCorrelationId(@Nullable final Signal<?> signal
} else {
result = Optional.empty();
}

return result;
}

private static boolean hasTypePrefix(@Nullable final WithType signal, final String typePrefix) {
final boolean result;
if (null != signal) {
final var signalType = signal.getType();
result = signalType.startsWith(typePrefix);
} else {
result = false;
}

return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public MatchingValidationResult apply(final Command<?> sentCommand, final Comman
if (result.isSuccess()) {
result = validateEntityIdsMatch(sentCommand, commandResponse);
}

return result;
}

Expand Down Expand Up @@ -181,7 +182,6 @@ private static boolean isErrorResponseType(final CommandResponse<?> commandRespo

private static boolean isSameSignalDomain(final SemanticSignalType semanticCommandType,
final SemanticSignalType semanticCommandResponseType) {

return Objects.equals(semanticCommandResponseType.getSignalDomain(), semanticCommandType.getSignalDomain());
}

Expand Down

0 comments on commit 9c07dd0

Please sign in to comment.