Skip to content

Commit

Permalink
update to Scala 2.13.12 again and fix root cause, using `CompletableF…
Browse files Browse the repository at this point in the history
…uture.completedStage` instead of `CompletableFuture.completedFuture`

Signed-off-by: Thomas Jäckle <thomas.jaeckle@beyonnex.io>
  • Loading branch information
thjaeckle committed Sep 19, 2023
1 parent 2468b79 commit 43fc871
Show file tree
Hide file tree
Showing 71 changed files with 551 additions and 599 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,17 @@
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;

import org.apache.pekko.actor.ActorSystem;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.internal.utils.pekko.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.pekko.logging.ThreadSafeDittoLogger;
import org.eclipse.ditto.internal.utils.extension.DittoExtensionIds;
import org.eclipse.ditto.internal.utils.extension.DittoExtensionPoint;
import org.eclipse.ditto.internal.utils.extension.DittoExtensionPoint.ExtensionId.ExtensionIdConfig;
import org.eclipse.ditto.internal.utils.pekko.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.pekko.logging.ThreadSafeDittoLogger;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigValue;

import org.apache.pekko.actor.ActorSystem;

public final class SignalTransformers implements DittoExtensionPoint, SignalTransformer {

private static final ThreadSafeDittoLogger LOGGER = DittoLoggerFactory.getThreadSafeLogger(SignalTransformers.class);
Expand All @@ -57,7 +56,7 @@ private SignalTransformers(final ActorSystem actorSystem, final Config config) {

@Override
public CompletionStage<Signal<?>> apply(final Signal<?> signal) {
CompletionStage<Signal<?>> prior = CompletableFuture.completedStage(signal);
CompletionStage<Signal<?>> prior = CompletableFuture.completedFuture(signal);
for (final SignalTransformer signalTransformer : transformers) {
prior = prior.thenCompose(signalTransformer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public CompletionStage<Signal<?>> apply(final Signal<?> signal) {
} else {
result = signal;
}
return CompletableFuture.completedStage(result);
return CompletableFuture.completedFuture(result);
}

/**
Expand Down
2 changes: 1 addition & 1 deletion bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

<properties>
<scala.version>2.13</scala.version> <!-- for scala libraries the scala version is used in their artifactId -->
<scala.full.version>2.13.10</scala.full.version>
<scala.full.version>2.13.12</scala.full.version>
<scala-parser-combinators.version>1.1.2</scala-parser-combinators.version>
<scala-java8-compat.version>1.0.2</scala-java8-compat.version>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,15 @@

import javax.annotation.Nullable;

import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.signals.events.Event;
import org.eclipse.ditto.connectivity.model.ConnectionId;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;

/**
* Default implementation of {@link ConnectionConfigProvider} which simply builds and returns a
* {@link ConnectivityConfig}.
Expand All @@ -48,7 +47,7 @@ public CompletionStage<Void> registerForConnectivityConfigChanges(final Connecti
@Nullable final DittoHeaders dittoHeaders, final ActorRef subscriber) {

// nothing to do, config changes are not supported by the default implementation
return CompletableFuture.completedStage(null);
return CompletableFuture.completedFuture(null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,14 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import org.apache.pekko.actor.ActorSystem;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.service.signaltransformer.SignalTransformer;
import org.eclipse.ditto.connectivity.model.signals.commands.modify.CreateConnection;
import org.eclipse.ditto.connectivity.model.signals.commands.modify.ModifyConnection;

import com.typesafe.config.Config;

import org.apache.pekko.actor.ActorSystem;

/**
* Transforms a ModifyConnection into a CreateConnection if the connection does not exist already.
*/
Expand Down Expand Up @@ -55,7 +54,7 @@ public CompletionStage<Signal<?>> apply(final Signal<?> signal) {
}
});
} else {
return CompletableFuture.completedStage(signal);
return CompletableFuture.completedFuture(signal);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,26 @@

import javax.annotation.Nullable;

import org.apache.pekko.Done;
import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.AbstractFSMWithStash;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSelection;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.actor.Cancellable;
import org.apache.pekko.actor.CoordinatedShutdown;
import org.apache.pekko.actor.FSM;
import org.apache.pekko.actor.OneForOneStrategy;
import org.apache.pekko.actor.Props;
import org.apache.pekko.actor.Status;
import org.apache.pekko.actor.SupervisorStrategy;
import org.apache.pekko.cluster.pubsub.DistributedPubSub;
import org.apache.pekko.japi.Pair;
import org.apache.pekko.japi.pf.DeciderBuilder;
import org.apache.pekko.japi.pf.FSMStateFunctionBuilder;
import org.apache.pekko.pattern.Patterns;
import org.apache.pekko.stream.Materializer;
import org.apache.pekko.stream.javadsl.Sink;
import org.eclipse.ditto.base.model.acks.AcknowledgementLabel;
import org.eclipse.ditto.base.model.acks.FatalPubSubException;
import org.eclipse.ditto.base.model.acks.PubSubTerminatedException;
Expand Down Expand Up @@ -118,12 +138,12 @@
import org.eclipse.ditto.edge.service.headers.DittoHeadersValidator;
import org.eclipse.ditto.edge.service.streaming.StreamingSubscriptionManager;
import org.eclipse.ditto.internal.models.signal.correlation.MatchingValidationResult;
import org.eclipse.ditto.internal.utils.pekko.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.pekko.logging.ThreadSafeDittoLoggingAdapter;
import org.eclipse.ditto.internal.utils.config.InstanceIdentifierSupplier;
import org.eclipse.ditto.internal.utils.config.ScopedConfig;
import org.eclipse.ditto.internal.utils.metrics.DittoMetrics;
import org.eclipse.ditto.internal.utils.metrics.instruments.gauge.Gauge;
import org.eclipse.ditto.internal.utils.pekko.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.pekko.logging.ThreadSafeDittoLoggingAdapter;
import org.eclipse.ditto.internal.utils.protocol.ProtocolAdapterProvider;
import org.eclipse.ditto.internal.utils.pubsub.StreamingType;
import org.eclipse.ditto.internal.utils.pubsubthings.DittoProtocolSub;
Expand All @@ -134,27 +154,6 @@

import com.typesafe.config.Config;

import org.apache.pekko.Done;
import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.AbstractFSMWithStash;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSelection;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.actor.Cancellable;
import org.apache.pekko.actor.CoordinatedShutdown;
import org.apache.pekko.actor.FSM;
import org.apache.pekko.actor.OneForOneStrategy;
import org.apache.pekko.actor.Props;
import org.apache.pekko.actor.Status;
import org.apache.pekko.actor.SupervisorStrategy;
import org.apache.pekko.cluster.pubsub.DistributedPubSub;
import org.apache.pekko.japi.Pair;
import org.apache.pekko.japi.pf.DeciderBuilder;
import org.apache.pekko.japi.pf.FSMStateFunctionBuilder;
import org.apache.pekko.pattern.Patterns;
import org.apache.pekko.stream.Materializer;
import org.apache.pekko.stream.javadsl.Sink;

/**
* Base class for ClientActors which implement the connection handling for various connectivity protocols.
* <p>
Expand Down Expand Up @@ -2075,7 +2074,7 @@ private CompletionStage<Void> subscribeAndDeclareAcknowledgementLabels(final boo
// only resub for connection; the initial subscription happened in preStart independent of publisher actors
final CompletionStage<Void> connectionSub = resubscribe
? connectionPubSub.subscribe(connection.getId(), getSelf(), true).thenApply(unused -> null)
: CompletableFuture.completedStage(null);
: CompletableFuture.completedFuture(null);
return subscribe.thenCompose(unused -> declare).thenCompose(unused -> connectionSub);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,19 @@

import javax.annotation.Nullable;

import org.apache.pekko.Done;
import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSelection;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.actor.Props;
import org.apache.pekko.actor.Status;
import org.apache.pekko.japi.Pair;
import org.apache.pekko.japi.pf.PFBuilder;
import org.apache.pekko.stream.QueueOfferResult;
import org.apache.pekko.stream.javadsl.Flow;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;
import org.eclipse.ditto.base.model.acks.AcknowledgementLabel;
import org.eclipse.ditto.base.model.acks.AcknowledgementRequest;
import org.eclipse.ditto.base.model.acks.DittoAcknowledgementLabel;
Expand Down Expand Up @@ -74,9 +87,9 @@
import org.eclipse.ditto.edge.service.headers.DittoHeadersValidator;
import org.eclipse.ditto.edge.service.placeholders.ThingJsonPlaceholder;
import org.eclipse.ditto.internal.models.signalenrichment.SignalEnrichmentFacade;
import org.eclipse.ditto.internal.utils.config.ScopedConfig;
import org.eclipse.ditto.internal.utils.pekko.controlflow.AbstractGraphActor;
import org.eclipse.ditto.internal.utils.pekko.logging.ThreadSafeDittoLoggingAdapter;
import org.eclipse.ditto.internal.utils.config.ScopedConfig;
import org.eclipse.ditto.internal.utils.pubsub.StreamingType;
import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.json.JsonField;
Expand All @@ -103,19 +116,6 @@
import org.eclipse.ditto.things.model.signals.commands.exceptions.ThingNotAccessibleException;
import org.eclipse.ditto.things.model.signals.events.ThingEventToThingConverter;

import org.apache.pekko.Done;
import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSelection;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.actor.Props;
import org.apache.pekko.actor.Status;
import org.apache.pekko.japi.Pair;
import org.apache.pekko.japi.pf.PFBuilder;
import org.apache.pekko.stream.QueueOfferResult;
import org.apache.pekko.stream.javadsl.Flow;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;
import scala.PartialFunction;
import scala.runtime.BoxedUnit;

Expand Down Expand Up @@ -473,7 +473,7 @@ private CompletionStage<Collection<OutboundSignalWithSender>> enrichAndFilterSig
outboundSignal.getSource())
)
.map(partialThingCompletionStage -> partialThingCompletionStage.thenApply(outboundSignal::setExtra))
.orElse(CompletableFuture.completedStage(outboundSignal))
.orElse(CompletableFuture.completedFuture(outboundSignal))
.thenApply(outboundSignalWithExtra -> applyFilter(outboundSignalWithExtra, filteredTopic))
.exceptionally(error -> {
logger.withCorrelationId(outboundSignal.getSource())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,16 @@
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.pekko.Done;
import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.actor.FSM;
import org.apache.pekko.actor.Props;
import org.apache.pekko.actor.Status;
import org.apache.pekko.japi.pf.FSMStateFunctionBuilder;
import org.apache.pekko.pattern.Patterns;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionListener;
import org.apache.qpid.jms.JmsSession;
Expand Down Expand Up @@ -79,17 +89,6 @@
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

import org.apache.pekko.Done;
import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.actor.FSM;
import org.apache.pekko.actor.Props;
import org.apache.pekko.actor.Status;
import org.apache.pekko.japi.pf.FSMStateFunctionBuilder;
import org.apache.pekko.pattern.Patterns;
import org.apache.pekko.stream.javadsl.Sink;

/**
* Actor which manages a connection to an AMQP 1.0 server using the Qpid JMS client.
* This actor delegates interaction with the JMS client to a child actor because the JMS client blocks in most cases
Expand Down Expand Up @@ -432,7 +431,7 @@ private CompletionStage<Done> startCommandConsumers(final List<ConsumerData> con
return completionStage.thenApply(object -> Done.getInstance()).exceptionally(t -> Done.getInstance());
} else {
logger.debug("Not starting consumers, no sources were configured");
return CompletableFuture.completedStage(Done.getInstance());
return CompletableFuture.completedFuture(Done.getInstance());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,15 @@
import javax.net.ssl.SSLSocket;
import javax.net.ssl.SSLSocketFactory;

import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.Props;
import org.apache.pekko.actor.Status;
import org.apache.pekko.http.javadsl.model.HttpRequest;
import org.apache.pekko.http.javadsl.model.HttpResponse;
import org.apache.pekko.http.javadsl.model.Uri;
import org.apache.pekko.japi.Pair;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.service.config.http.HttpProxyConfig;
import org.eclipse.ditto.connectivity.model.Connection;
Expand All @@ -44,15 +53,6 @@

import com.typesafe.config.Config;

import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.Props;
import org.apache.pekko.actor.Status;
import org.apache.pekko.http.javadsl.model.HttpRequest;
import org.apache.pekko.http.javadsl.model.HttpResponse;
import org.apache.pekko.http.javadsl.model.Uri;
import org.apache.pekko.japi.Pair;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;
import scala.util.Try;

/**
Expand Down Expand Up @@ -129,7 +129,7 @@ protected CompletionStage<Status.Status> doTestConnection(final TestConnection t
@Override
protected CompletionStage<Void> stopConsuming() {
// nothing to do: HTTP connections do not consume.
return CompletableFuture.completedStage(null);
return CompletableFuture.completedFuture(null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,16 @@

import javax.annotation.Nullable;

import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.PoisonPill;
import org.apache.pekko.actor.Props;
import org.apache.pekko.actor.Status;
import org.apache.pekko.japi.pf.FSMStateFunctionBuilder;
import org.apache.pekko.pattern.Patterns;
import org.apache.pekko.stream.javadsl.Keep;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;
import org.eclipse.ditto.base.model.common.ConditionChecker;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.connectivity.api.BaseClientState;
Expand Down Expand Up @@ -60,16 +70,6 @@
import com.hivemq.client.mqtt.lifecycle.MqttDisconnectSource;
import com.typesafe.config.Config;

import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.PoisonPill;
import org.apache.pekko.actor.Props;
import org.apache.pekko.actor.Status;
import org.apache.pekko.japi.pf.FSMStateFunctionBuilder;
import org.apache.pekko.pattern.Patterns;
import org.apache.pekko.stream.javadsl.Keep;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;
import scala.concurrent.ExecutionContextExecutor;

/**
Expand Down Expand Up @@ -218,7 +218,7 @@ protected CompletionStage<Status.Status> doTestConnection(final TestConnection t
@Override
protected CompletionStage<Void> stopConsuming() {
if (genericMqttClient == null) {
return CompletableFuture.completedStage(null);
return CompletableFuture.completedFuture(null);
} else {
final var mqttTopicFilters =
getSourceAddresses().map(MqttTopicFilter::of).toArray(MqttTopicFilter[]::new);
Expand Down

0 comments on commit 43fc871

Please sign in to comment.