From 27b4032b2031feab917b58a66ae645308cacdf87 Mon Sep 17 00:00:00 2001 From: Matthew de Detrich Date: Sun, 22 Oct 2023 19:27:32 +0200 Subject: [PATCH] Handle mixed akka/pekko protocol names --- .../pekko/remote/RemoteFeaturesSpec.scala | 3 +- remote/src/main/resources/reference.conf | 14 ++++++++ .../remote/BoundAddressesExtension.scala | 4 ++- .../pekko/remote/RemoteActorRefProvider.scala | 36 +++++++++++++------ .../apache/pekko/remote/RemoteSettings.scala | 6 ++++ .../pekko/remote/artery/ArteryTransport.scala | 6 ++-- .../transport/PekkoProtocolTransport.scala | 16 +++++---- .../remote/classic/RemoteDeathWatchSpec.scala | 3 +- 8 files changed, 64 insertions(+), 24 deletions(-) diff --git a/remote-tests/src/multi-jvm/scala/org/apache/pekko/remote/RemoteFeaturesSpec.scala b/remote-tests/src/multi-jvm/scala/org/apache/pekko/remote/RemoteFeaturesSpec.scala index 58e97b16811..8b7c1b9262e 100644 --- a/remote-tests/src/multi-jvm/scala/org/apache/pekko/remote/RemoteFeaturesSpec.scala +++ b/remote-tests/src/multi-jvm/scala/org/apache/pekko/remote/RemoteFeaturesSpec.scala @@ -260,7 +260,8 @@ abstract class RemotingFeaturesSpec(val multiNodeConfig: RemotingFeaturesConfig) remotePath, Nobody, None, - None) + None, + Set("pekko", "akka")) rar.start() rar diff --git a/remote/src/main/resources/reference.conf b/remote/src/main/resources/reference.conf index 2efc5c85e77..70fe09dda4d 100644 --- a/remote/src/main/resources/reference.conf +++ b/remote/src/main/resources/reference.conf @@ -184,6 +184,20 @@ pekko { # is 'off'. Set this to 'off' to suppress these. warn-unsafe-watch-outside-cluster = on + # When receiving requests from other remote actors, what are the valid + # prefix's to check against. Useful for when dealing with rolling cluster + # migrations with compatible systems such as Lightbend's Akka. + accept-protocol-names = ["pekko", "akka"] + + # The protocol name to use when sending requests to other remote actors. + # Useful when dealing with rolling migration, i.e. temporarily change + # the protocol name to match another compatible actor implementation + # such as Lightbend's "akka" (whilst making sure accept-protocol-names + # contains "akka") so that you can gracefully migrate all nodes to Apache + # Pekko and then change the protocol-name back to "pekko" once all + # nodes have been are running on Apache Pekko + protocol-name = "pekko" + # Settings for the Phi accrual failure detector (http://www.jaist.ac.jp/~defago/files/pdf/IS_RR_2004_010.pdf # [Hayashibara et al]) used for remote death watch. # The default PhiAccrualFailureDetector will trigger if there are no heartbeats within diff --git a/remote/src/main/scala/org/apache/pekko/remote/BoundAddressesExtension.scala b/remote/src/main/scala/org/apache/pekko/remote/BoundAddressesExtension.scala index 4139ada218e..5c5d8ac0741 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/BoundAddressesExtension.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/BoundAddressesExtension.scala @@ -38,11 +38,13 @@ object BoundAddressesExtension extends ExtensionId[BoundAddressesExtension] with class BoundAddressesExtension(val system: ExtendedActorSystem) extends Extension { + private val remoteSettings: RemoteSettings = new RemoteSettings(system.settings.config) + /** * Returns a mapping from a protocol to a set of bound addresses. */ def boundAddresses: Map[String, Set[Address]] = system.provider.asInstanceOf[RemoteActorRefProvider].transport match { - case artery: ArteryTransport => Map(ArteryTransport.ProtocolName -> Set(artery.bindAddress.address)) + case artery: ArteryTransport => Map(remoteSettings.ProtocolName -> Set(artery.bindAddress.address)) case remoting: Remoting => remoting.boundAddresses case other => throw new IllegalStateException(s"Unexpected transport type: ${other.getClass}") } diff --git a/remote/src/main/scala/org/apache/pekko/remote/RemoteActorRefProvider.scala b/remote/src/main/scala/org/apache/pekko/remote/RemoteActorRefProvider.scala index e162abb1bcf..1d2a9b7c015 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/RemoteActorRefProvider.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/RemoteActorRefProvider.scala @@ -464,7 +464,8 @@ private[pekko] class RemoteActorRefProvider( val rpath = (RootActorPath(address) / "remote" / localAddress.protocol / localAddress.hostPort / path.elements) .withUid(path.uid) - new RemoteActorRef(transport, localAddress, rpath, supervisor, Some(props), Some(d)) + new RemoteActorRef(transport, localAddress, rpath, supervisor, Some(props), Some(d), + remoteSettings.AcceptProtocolNames) } else { warnIfNotRemoteActorRef(path) local.actorOf(system, props, supervisor, path, systemService, deployment.headOption, false, async) @@ -488,7 +489,8 @@ private[pekko] class RemoteActorRefProvider( RootActorPath(address), Nobody, props = None, - deploy = None) + deploy = None, + acceptProtocolNames = remoteSettings.AcceptProtocolNames) } catch { case NonFatal(e) => log.error(e, "No root guardian at [{}]", address) @@ -513,7 +515,8 @@ private[pekko] class RemoteActorRefProvider( RootActorPath(address) / elems, Nobody, props = None, - deploy = None) + deploy = None, + acceptProtocolNames = remoteSettings.AcceptProtocolNames) } catch { case NonFatal(e) => log.warning("Error while resolving ActorRef [{}] due to [{}]", path, e.getMessage) @@ -555,7 +558,8 @@ private[pekko] class RemoteActorRefProvider( rootPath, Nobody, props = None, - deploy = None) + deploy = None, + acceptProtocolNames = remoteSettings.AcceptProtocolNames) } catch { case NonFatal(e) => log.warning("Error while resolving ActorRef [{}] due to [{}]", path, e.getMessage) @@ -578,7 +582,8 @@ private[pekko] class RemoteActorRefProvider( path, Nobody, props = None, - deploy = None) + deploy = None, + acceptProtocolNames = remoteSettings.AcceptProtocolNames) } catch { case NonFatal(e) => log.warning("Error while resolving ActorRef [{}] due to [{}]", path, e.getMessage) @@ -672,7 +677,8 @@ private[pekko] class RemoteActorRef private[pekko] ( val path: ActorPath, val getParent: InternalActorRef, props: Option[Props], - deploy: Option[Deploy]) + deploy: Option[Deploy], + val acceptProtocolNames: Set[String]) extends InternalActorRef with RemoteRef { @@ -680,10 +686,17 @@ private[pekko] class RemoteActorRef private[pekko] ( throw new IllegalArgumentException(s"Unexpected local address in RemoteActorRef [$this]") remote match { - case t: ArteryTransport => - // detect mistakes such as using "pekko.tcp" with Artery - if (path.address.protocol != t.localAddress.address.protocol) - throw new IllegalArgumentException(s"Wrong protocol of [$path], expected [${t.localAddress.address.protocol}]") + case _: ArteryTransport => + // detect mistakes such as using "pekko.tcp" with Artery, also handles pekko.remote.accept-protocol-names + if (!acceptProtocolNames.contains(path.address.protocol)) { + val expectedString = if (acceptProtocolNames.size == 1) + "expected" + else + "expected one of" + + throw new IllegalArgumentException( + s"Wrong protocol of [$path], $expectedString [${acceptProtocolNames.mkString}]") + } case _ => } @volatile private[remote] var cachedAssociation: artery.Association = null @@ -697,7 +710,8 @@ private[pekko] class RemoteActorRef private[pekko] ( s.headOption match { case None => this case Some("..") => getParent.getChild(name) - case _ => new RemoteActorRef(remote, localAddressToUse, path / s, Nobody, props = None, deploy = None) + case _ => new RemoteActorRef(remote, localAddressToUse, path / s, Nobody, props = None, deploy = None, + acceptProtocolNames = acceptProtocolNames) } } diff --git a/remote/src/main/scala/org/apache/pekko/remote/RemoteSettings.scala b/remote/src/main/scala/org/apache/pekko/remote/RemoteSettings.scala index dd2fe5b8fdd..83472de3ed4 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/RemoteSettings.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/RemoteSettings.scala @@ -199,6 +199,12 @@ final class RemoteSettings(val config: Config) { @deprecated("Classic remoting is deprecated, use Artery", "Akka 2.6.0") val Adapters: Map[String, String] = configToMap(getConfig("pekko.remote.classic.adapters")) + val ProtocolName: String = getString("pekko.remote.protocol-name") + + val AcceptProtocolNames: Set[String] = + immutableSeq(getStringList("pekko.remote.accept-protocol-names")).toSet.requiring(_.nonEmpty, + "accept-protocol-names must be non empty") + private def transportNames: immutable.Seq[String] = immutableSeq(getStringList("pekko.remote.classic.enabled-transports")) diff --git a/remote/src/main/scala/org/apache/pekko/remote/artery/ArteryTransport.scala b/remote/src/main/scala/org/apache/pekko/remote/artery/ArteryTransport.scala index fb66fba6a47..44103d42eda 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/artery/ArteryTransport.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/artery/ArteryTransport.scala @@ -393,12 +393,12 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr val (port, boundPort) = bindInboundStreams() _localAddress = UniqueAddress( - Address(ArteryTransport.ProtocolName, system.name, settings.Canonical.Hostname, port), + Address(provider.remoteSettings.ProtocolName, system.name, settings.Canonical.Hostname, port), AddressUidExtension(system).longAddressUid) _addresses = Set(_localAddress.address) _bindAddress = UniqueAddress( - Address(ArteryTransport.ProtocolName, system.name, settings.Bind.Hostname, boundPort), + Address(provider.remoteSettings.ProtocolName, system.name, settings.Bind.Hostname, boundPort), AddressUidExtension(system).longAddressUid) flightRecorder.transportUniqueAddressSet(_localAddress) @@ -954,8 +954,6 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr */ private[remote] object ArteryTransport { - val ProtocolName = "pekko" - // Note that the used version of the header format for outbound messages is defined in // `ArterySettings.Version` because that may depend on configuration settings. // This is the highest supported version on receiving (decoding) side. diff --git a/remote/src/main/scala/org/apache/pekko/remote/transport/PekkoProtocolTransport.scala b/remote/src/main/scala/org/apache/pekko/remote/transport/PekkoProtocolTransport.scala index 5a8a47e2ba9..8a33f54cc1b 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/transport/PekkoProtocolTransport.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/transport/PekkoProtocolTransport.scala @@ -69,11 +69,12 @@ private[remote] class PekkoProtocolSettings(config: Config) { .getMillisDuration("pekko.remote.classic.handshake-timeout") .requiring(_ > Duration.Zero, "handshake-timeout must be > 0") } + + val PekkoScheme: String = new RemoteSettings(config).ProtocolName } @nowarn("msg=deprecated") private[remote] object PekkoProtocolTransport { // Couldn't these go into the Remoting Extension/ RemoteSettings instead? - val PekkoScheme: String = "pekko" val PekkoOverhead: Int = 0 // Don't know yet val UniqueId = new java.util.concurrent.atomic.AtomicInteger(0) @@ -122,7 +123,7 @@ private[remote] class PekkoProtocolTransport( private val codec: PekkoPduCodec) extends ActorTransportAdapter(wrappedTransport, system) { - override val addedSchemeIdentifier: String = PekkoScheme + override val addedSchemeIdentifier: String = new RemoteSettings(system.settings.config).ProtocolName override def managementCommand(cmd: Any): Future[Boolean] = wrappedTransport.managementCommand(cmd) @@ -229,8 +230,9 @@ private[remote] class PekkoProtocolHandle( _wrappedHandle: AssociationHandle, val handshakeInfo: HandshakeInfo, private val stateActor: ActorRef, - private val codec: PekkoPduCodec) - extends AbstractTransportAdapterHandle(_localAddress, _remoteAddress, _wrappedHandle, PekkoScheme) { + private val codec: PekkoPduCodec, + override val addedSchemeIdentifier: String) + extends AbstractTransportAdapterHandle(_localAddress, _remoteAddress, _wrappedHandle, addedSchemeIdentifier) { override def write(payload: ByteString): Boolean = wrappedHandle.write(codec.constructPayload(payload)) @@ -713,7 +715,8 @@ private[remote] class ProtocolStateActor( wrappedHandle, handshakeInfo, self, - codec)) + codec, + settings.PekkoScheme)) readHandlerPromise.future } @@ -733,7 +736,8 @@ private[remote] class ProtocolStateActor( wrappedHandle, handshakeInfo, self, - codec))) + codec, + settings.PekkoScheme))) readHandlerPromise.future } diff --git a/remote/src/test/scala/org/apache/pekko/remote/classic/RemoteDeathWatchSpec.scala b/remote/src/test/scala/org/apache/pekko/remote/classic/RemoteDeathWatchSpec.scala index c710966b905..c448719f3e6 100644 --- a/remote/src/test/scala/org/apache/pekko/remote/classic/RemoteDeathWatchSpec.scala +++ b/remote/src/test/scala/org/apache/pekko/remote/classic/RemoteDeathWatchSpec.scala @@ -134,7 +134,8 @@ pekko.actor.warn-about-java-serializer-usage = off extinctPath, Nobody, props = None, - deploy = None) + deploy = None, + acceptProtocolNames = Set("pekko", "akka")) val probe = TestProbe() probe.watch(extinctRef)