Skip to content

Commit

Permalink
add JmsOperationTimedOutException to user-indicated-errors-base;
Browse files Browse the repository at this point in the history
add catch block for JmsOperationTimedOutException in AmqpClientActor and log a warning in this special case;
fix sonar issues;

Signed-off-by: Stefan Maute <stefan.maute@bosch.io>
  • Loading branch information
Stefan Maute committed May 5, 2022
1 parent d7a1672 commit 5396893
Show file tree
Hide file tree
Showing 94 changed files with 239 additions and 285 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import org.atteo.classindex.ClassIndex;
Expand Down Expand Up @@ -106,7 +105,7 @@ private static Class<? extends ConnectionConfigProvider> findProviderClass(
final List<Class<? extends ConnectionConfigProvider>> candidates =
StreamSupport.stream(subclasses.spliterator(), false)
.filter(classPredicate)
.collect(Collectors.toList());
.toList();

if (candidates.size() == 1) {
return candidates.get(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,12 +239,12 @@ private static List<MessageMapperExtension> loadMessageMapperExtensions(final Dy
final ClassTag<MessageMapperExtension> tag =
scala.reflect.ClassTag$.MODULE$.apply(MessageMapperExtension.class);
return dynamicAccess.createInstanceFor(clazz, List$.MODULE$.empty(), tag).get();
}).collect(Collectors.toList());
}).toList();
}

private static List<Class<? extends MessageMapperExtension>> loadMessageMapperExtensionClasses() {
return StreamSupport.stream(ClassIndex.getSubclasses(MessageMapperExtension.class).spliterator(), false)
.collect(Collectors.toList());
.toList();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -67,7 +66,7 @@ public List<MessageMapper> getMappers(final PayloadMapping payloadMapping) {
return payloadMapping.getMappings().stream()
.map(this::resolveMessageMapper)
.map(resolvedMapper -> null == resolvedMapper ? defaultMapper : resolvedMapper)
.collect(Collectors.toList());
.toList();
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import java.util.Map;
import java.util.Objects;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.connectivity.api.ExternalMessage;
Expand Down Expand Up @@ -119,7 +118,7 @@ public List<ExternalMessage> map(final Adaptable adaptable) {

return externalMessages.stream()
.map(markAsResponse)
.collect(Collectors.toList());
.toList();
}

private <T> List<T> checkMaxMappedMessagesLimit(final List<T> mappingResult, final int maxMappedMessages,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1384,7 +1384,7 @@ private FSM.State<BaseClientState, BaseClientData> retrieveConnectionStatus(fina
final List<ActorRef> childrenToAsk = StreamSupport.stream(getContext().getChildren().spliterator(), false)
.filter(child -> noAddressReportingChildNamePatterns.stream()
.noneMatch(p -> p.matcher(child.path().name()).matches()))
.collect(Collectors.toList());
.toList();
final ConnectivityStatus clientConnectionStatus = data.getConnectionStatus();
if (childrenToAsk.size() != expectedNumberOfChildren) {
if (clientConnectionStatus.isFailure() || clientConnectionStatus == ConnectivityStatus.UNKNOWN) {
Expand Down Expand Up @@ -1730,7 +1730,7 @@ private Pair<ActorRef, ActorRef> startOutboundActors(final ProtocolAdapter proto
proxyActorSelection, protocolAdapter, logger);
outboundMappingProcessors = IntStream.range(0, processorPoolSize)
.mapToObj(i -> OutboundMappingProcessor.of(settings))
.collect(Collectors.toList());
.toList();
} catch (final DittoRuntimeException dre) {
connectionLogger.failure("Failed to start message mapping processor due to: {0}", dre.getMessage());
logger.info("Got DittoRuntimeException during initialization of MessageMappingProcessor: {} {} - desc: {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,11 +150,11 @@ private void prepareResponseHandler(final AcknowledgeableMessage acknowledgeable
final Duration askTimeout = acknowledgementConfig.getCollectorFallbackAskTimeout();
// Ask response collector actor to get the collected responses in a future
Patterns.ask(responseCollector, ResponseCollectorActor.query(), askTimeout).thenCompose(output -> {
if (output instanceof ResponseCollectorActor.Output) {
return CompletableFuture.completedFuture((ResponseCollectorActor.Output) output);
} else if (output instanceof Throwable) {
if (output instanceof ResponseCollectorActor.Output responseCollectorActorOutput) {
return CompletableFuture.completedFuture(responseCollectorActorOutput);
} else if (output instanceof Throwable throwable) {
log().debug("Patterns.ask failed. ResponseCollector=<{}>", responseCollector);
return CompletableFuture.failedFuture((Throwable) output);
return CompletableFuture.failedFuture(throwable);
} else {
log().error("Expect ResponseCollectorActor.Output, got: <{}>. ResponseCollector=<{}>", output,
responseCollector);
Expand Down Expand Up @@ -288,8 +288,8 @@ private static boolean someFailedResponseRequiresRedelivery(final Collection<Com
}

private static Stream<? extends CommandResponse<?>> extractAggregatedResponses(final CommandResponse<?> response) {
if (response instanceof Acknowledgements) {
return ((Acknowledgements) response).stream();
if (response instanceof Acknowledgements acknowledgements) {
return acknowledgements.stream();
} else {
return Stream.of(response);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ protected BasePublisherActor(final Connection connection,
responseDroppedMonitor = connectionMonitorRegistry.forResponseDropped(connection);
responsePublishedMonitor = connectionMonitorRegistry.forResponsePublished(connection);
responseAcknowledgedMonitor = connectionMonitorRegistry.forResponseAcknowledged(connection);
replyTargets = connection.getSources().stream().map(Source::getReplyTarget).collect(Collectors.toList());
replyTargets = connection.getSources().stream().map(Source::getReplyTarget).toList();
acknowledgementSizeBudget = connectionConfig.getAcknowledgementConfig().getIssuedMaxBytes();
logger = DittoLoggerFactory.getThreadSafeDittoLoggingAdapter(this)
.withMdcEntry(ConnectivityMdcEntryKey.CONNECTION_ID, connection.getId());
Expand Down Expand Up @@ -189,8 +189,8 @@ private void sendMultiMappedOutboundSignal(final OutboundSignal.MultiMapped mult
final Collection<Acknowledgement> acknowledgements = new ArrayList<>();
final Collection<CommandResponse<?>> nonAcknowledgements = new ArrayList<>();
responsesList.forEach(response -> {
if (response instanceof Acknowledgement) {
acknowledgements.add((Acknowledgement) response);
if (response instanceof Acknowledgement acknowledgement) {
acknowledgements.add(acknowledgement);
} else {
nonAcknowledgements.add(response);
}
Expand Down Expand Up @@ -258,7 +258,7 @@ private static <T> CompletionStage<List<T>> aggregateNonNullFutures(final Comple
.thenApply(aVoid -> Arrays.stream(futures)
.map(CompletableFuture::join)
.filter(Objects::nonNull)
.collect(Collectors.toList()));
.toList());
}

private Acknowledgements appendConnectionId(final Acknowledgements acknowledgements) {
Expand All @@ -269,7 +269,7 @@ private static Acknowledgements appendConnectionIdToAcknowledgements(final Ackno
final ConnectionId connectionId) {
final List<Acknowledgement> acksList = acknowledgements.stream()
.map(ack -> appendConnectionIdToAcknowledgementOrResponse(ack, connectionId))
.collect(Collectors.toList());
.toList();
// Uses EntityId and StatusCode from input acknowledges expecting these were set when Acknowledgements was created
return Acknowledgements.of(acknowledgements.getEntityId(), acksList, acknowledgements.getHttpStatus(),
acknowledgements.getDittoHeaders());
Expand Down Expand Up @@ -323,7 +323,7 @@ private Stream<SendingOrDropped> sendMappedOutboundSignal(final OutboundSignal.M
outboundTargets);
return getSendingContextForTarget(outbound, target);
})
.collect(Collectors.toList()));
.toList());


if (sendingContexts.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

import javax.annotation.concurrent.NotThreadSafe;

Expand Down Expand Up @@ -89,7 +88,7 @@ public void clear() {
public List<ActorRef> getOtherActors(final ActorRef clientActor) {
return sortedRefs.stream()
.filter(actorRef -> !actorRef.equals(clientActor))
.collect(Collectors.toList());
.toList();
}

/**
Expand Down Expand Up @@ -130,7 +129,7 @@ public List<ActorRef> getSortedRefs() {
}

private static List<ActorRef> sort(final Map<ActorPath, ActorRef> refsByPath) {
return refsByPath.values().stream().sorted(ActorRef::compareTo).collect(Collectors.toList());
return refsByPath.values().stream().sorted(ActorRef::compareTo).toList();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@
import java.text.MessageFormat;
import java.util.function.Predicate;

import org.eclipse.ditto.json.JsonField;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonPointer;
import org.eclipse.ditto.base.model.acks.AcknowledgementLabel;
import org.eclipse.ditto.base.model.common.HttpStatus;
import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgement;
import org.eclipse.ditto.json.JsonField;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonPointer;

/**
* Customizable converter from publisher exceptions to {@link Acknowledgement}s.
Expand Down Expand Up @@ -68,8 +68,8 @@ public final Acknowledgement convertException(final Exception exception,
checkNotNull(dittoHeaders, "dittoHeaders");

final Acknowledgement result;
if (exception instanceof DittoRuntimeException) {
result = convertDittoRuntimeException((DittoRuntimeException) exception, label, entityId, dittoHeaders);
if (exception instanceof DittoRuntimeException dittoRuntimeException) {
result = convertDittoRuntimeException(dittoRuntimeException, label, entityId, dittoHeaders);
} else {
result = convertGenericException(exception, label, entityId, dittoHeaders);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,8 @@ private Sink<Object, NotUsed> getSink() {
}

private static boolean isInboundMappingOutcomesWithError(final Object streamingElement) {
return streamingElement instanceof InboundMappingOutcomes &&
((InboundMappingOutcomes) streamingElement).hasError();
return streamingElement instanceof InboundMappingOutcomes inboundMappingOutcomes &&
inboundMappingOutcomes.hasError();
}

private Sink<Object, NotUsed> onDittoRuntimeException() {
Expand Down Expand Up @@ -337,8 +337,7 @@ public Optional<Signal<?>> onError(final String mapperId,
final Optional<Signal<?>> result;
logger.debug("OnError mapperId=<{}> exception=<{}> topicPath=<{}> message=<{}>",
mapperId, e, topicPath, message);
if (e instanceof DittoRuntimeException) {
final var dittoRuntimeException = (DittoRuntimeException) e;
if (e instanceof DittoRuntimeException dittoRuntimeException) {
if (isIllegalAdaptableException(dittoRuntimeException)) {
final var illegalAdaptableException = (IllegalAdaptableException) dittoRuntimeException;
if (isAboutInvalidLiveResponse(illegalAdaptableException)) {
Expand Down Expand Up @@ -871,7 +870,7 @@ private <T extends CommandResponse<T>> T appendConnectionIdToAcknowledgementOrRe
private Acknowledgements appendConnectionIdToAcknowledgements(final Acknowledgements acknowledgements) {
final List<Acknowledgement> acksList = acknowledgements.stream()
.map(this::appendConnectionIdToAcknowledgementOrResponse)
.collect(Collectors.toList());
.toList();

// Uses EntityId and StatusCode from input acknowledges expecting these were set when Acknowledgements was created
return Acknowledgements.of(acknowledgements.getEntityId(), acksList, acknowledgements.getHttpStatus(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,7 @@ public int hashCode() {

@Override
public boolean equals(final Object o) {
if (o instanceof InboundMappingOutcomes) {
final InboundMappingOutcomes that = (InboundMappingOutcomes) o;
if (o instanceof InboundMappingOutcomes that) {
return Objects.equals(outcomes, that.outcomes) &&
Objects.equals(externalMessage, that.externalMessage) &&
Objects.equals(error, that.error) &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import java.util.Collection;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -138,7 +137,7 @@ List<MappingOutcome<MappedInboundExternalMessage>> process(final ExternalMessage
(msg, header) -> msg.withHeader(header.getKey(), header.getValue()));
return mappingTimer.overall(() -> mappers.stream()
.flatMap(mapper -> runMapper(mapper, externalMessageWithTraceContext, mappingTimer))
.collect(Collectors.toList())
.toList()
);
}

Expand Down Expand Up @@ -227,8 +226,8 @@ private static DittoRuntimeException toDittoRuntimeException(final Throwable err
e)
);

if (error instanceof WithDittoHeaders) {
final DittoHeaders existingHeaders = ((WithDittoHeaders) error).getDittoHeaders();
if (error instanceof WithDittoHeaders withDittoHeaders) {
final DittoHeaders existingHeaders = withDittoHeaders.getDittoHeaders();
final DittoHeaders mergedHeaders = bestEffortHeaders.toBuilder().putHeaders(existingHeaders)
.build();
return dittoRuntimeException.setDittoHeaders(mergedHeaders);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.connectivity.api.ExternalMessage;
Expand Down Expand Up @@ -111,7 +110,7 @@ List<ExternalMessage> outboundPayload(final String mapper, final Supplier<List<E
return timed(startedTimer, () -> {
final List<ExternalMessage> externalMessages = supplier.get();
return externalMessages.stream().map(em -> DittoTracing.propagateContext(context, em,
(msg, entry) -> msg.withHeader(entry.getKey(), entry.getValue()))).collect(Collectors.toList());
(msg, entry) -> msg.withHeader(entry.getKey(), entry.getValue()))).toList();
});
}

Expand All @@ -130,7 +129,7 @@ List<Adaptable> inboundPayload(final String mapper, final Supplier<List<Adaptabl
final List<Adaptable> adaptables = supplier.get();
return adaptables.stream()
.map(a -> DittoTracing.propagateContext(context, a))
.collect(Collectors.toList());
.toList();
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import org.eclipse.ditto.base.model.acks.AcknowledgementLabel;
import org.eclipse.ditto.base.model.acks.AcknowledgementLabelNotDeclaredException;
Expand Down Expand Up @@ -164,7 +163,7 @@ private Signal<?> adjustSignalAndStartAckForwarder(final Signal<?> signal, final
return signal.setDittoHeaders(signal.getDittoHeaders().toBuilder()
.acknowledgementRequests(ackRequests.stream()
.filter(request -> targetIssuedAcks.contains(request.getLabel()))
.collect(Collectors.toList()))
.toList())
.build());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ private static Optional<JsonFieldSelector> getExtraFields(final ExpressionResolv
.map(expressionResolver::resolve)
.flatMap(PipelineElement::toStream)
.map(JsonPointer::of)
.collect(Collectors.toList()))
.toList())
.filter(jsonPointers -> !jsonPointers.isEmpty())
.map(JsonFactory::newFieldSelector)
.map(ThingFieldSelector::fromJsonFieldSelector);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -69,7 +68,7 @@ private Resolvers() {
private static final List<Placeholder<?>> PLACEHOLDERS = Collections.unmodifiableList(
RESOLVER_CREATORS.stream()
.map(ResolverCreator::getPlaceholder)
.collect(Collectors.toList())
.toList()
);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

import org.eclipse.ditto.base.model.common.HttpStatus;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
Expand Down Expand Up @@ -160,7 +159,7 @@ public boolean allExpectedResponsesArrived() {
public List<CommandResponse<?>> getFailedResponses() {
return commandResponses.stream()
.filter(Output::isFailedResponse)
.collect(Collectors.toList());
.toList();
}

@Override
Expand Down Expand Up @@ -219,7 +218,7 @@ public int hashCode() {

@Override
public boolean equals(final Object other) {
return other instanceof SetCount && ((SetCount) other).count == count;
return other instanceof SetCount setCount && setCount.count == count;
}
}
}
Loading

0 comments on commit 5396893

Please sign in to comment.