Skip to content

Commit

Permalink
fix: Handle uid collision in a more graceful way (#32161)
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw committed Oct 10, 2023
1 parent 7cb2ff7 commit 85236f8
Show file tree
Hide file tree
Showing 5 changed files with 206 additions and 15 deletions.
10 changes: 8 additions & 2 deletions akka-actor/src/main/scala/akka/actor/ActorSystem.scala
Original file line number Diff line number Diff line change
Expand Up @@ -807,8 +807,6 @@ private[akka] class ActorSystemImpl(
setup: ActorSystemSetup)
extends ExtendedActorSystem {

val uid: Long = ThreadLocalRandom.current.nextLong()

if (!name.matches("""^[a-zA-Z0-9][a-zA-Z0-9-_]*$"""))
throw new IllegalArgumentException(
"invalid ActorSystem name [" + name +
Expand All @@ -834,6 +832,14 @@ private[akka] class ActorSystemImpl(
new Settings(classLoader, config, name, setup)
}

val uid: Long = {
// to be able to test uid collisions
if (settings.config.hasPath("akka.test-only-uid"))
settings.config.getLong("akka.test-only-uid")
else
ThreadLocalRandom.current.nextLong()
}

protected def uncaughtExceptionHandler: Thread.UncaughtExceptionHandler =
new Thread.UncaughtExceptionHandler() {
def uncaughtException(thread: Thread, cause: Throwable): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import akka.remote.RemoteActorRef
import akka.remote.RemoteActorRefProvider
import akka.remote.RemoteTransport
import akka.remote.UniqueAddress
import akka.remote.artery.Association.UidCollisionException
import akka.remote.artery.Decoder.InboundCompressionAccess
import akka.remote.artery.Encoder.OutboundCompressionAccess
import akka.remote.artery.InboundControlJunction.ControlMessageObserver
Expand Down Expand Up @@ -729,6 +730,11 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr
associationRegistry.setUID(peer).completeHandshake(peer)
} catch {
case ShuttingDown => Future.successful(Done) // silence it
case exc: UidCollisionException =>
log.warning(exc.getMessage)
// quarantine will not do much since handshake not completed, but for good measures
quarantine(peer.address, Some(peer.uid), exc.getMessage, harmless = false)
Future.failed(exc)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import akka.remote.RemoteLogMarker
import akka.remote.UniqueAddress
import akka.remote.artery.ArteryTransport.AeronTerminated
import akka.remote.artery.ArteryTransport.ShuttingDown
import akka.remote.artery.Association.UidCollisionException
import akka.remote.artery.Encoder.OutboundCompressionAccess
import akka.remote.artery.InboundControlJunction.ControlMessageSubject
import akka.remote.artery.OutboundControlJunction.OutboundControlIngress
Expand Down Expand Up @@ -122,6 +123,8 @@ private[remote] object Association {
streamKillSwitch: OptionVal[SharedKillSwitch],
completed: Future[Done],
stopping: OptionVal[StopSignal])

final class UidCollisionException(message: String) extends IllegalArgumentException(message) with NoStackTrace
}

/**
Expand Down Expand Up @@ -1160,7 +1163,7 @@ private[remote] class AssociationRegistry(createAssociation: Address => Associat
a
else
// make sure we don't overwrite same UID with different association
throw new IllegalArgumentException(s"UID collision old [$previous] new [$a]")
throw new UidCollisionException(s"UID collision old [$previous] new [$a]")
case _ =>
// update associationsByUid Map with the uid -> association
val newMap = currentMap.updated(peer.uid, a)
Expand Down
16 changes: 9 additions & 7 deletions akka-remote/src/main/scala/akka/remote/artery/Handshake.scala
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ private[remote] class InboundHandshake(inboundContext: InboundContext, inControl
// that the other system is alive.
inboundContext.association(from.address).associationState.lastUsedTimestamp.set(System.nanoTime())

after(inboundContext.completeHandshake(from)) { () =>
after(inboundContext.completeHandshake(from)) { _ =>
pull(in)
}
case _ =>
Expand All @@ -270,8 +270,9 @@ private[remote] class InboundHandshake(inboundContext: InboundContext, inControl

private def onHandshakeReq(from: UniqueAddress, to: Address): Unit = {
if (to == inboundContext.localAddress.address) {
after(inboundContext.completeHandshake(from)) { () =>
inboundContext.sendControl(from.address, HandshakeRsp(inboundContext.localAddress))
after(inboundContext.completeHandshake(from)) { success =>
if (success)
inboundContext.sendControl(from.address, HandshakeRsp(inboundContext.localAddress))
pull(in)
}
} else {
Expand All @@ -289,15 +290,16 @@ private[remote] class InboundHandshake(inboundContext: InboundContext, inControl
}
}

private def after(first: Future[Done])(thenInside: () => Unit): Unit = {
private def after(first: Future[Done])(thenInside: Boolean => Unit): Unit = {
first.value match {
case Some(_) =>
case Some(result) =>
// This in the normal case (all but the first). The future will be completed
// because handshake was already completed. Note that we send those HandshakeReq
// periodically.
thenInside()
thenInside(result.isSuccess)
case None =>
first.onComplete(_ => runInStage.invoke(thenInside))(ExecutionContexts.parasitic)
first.onComplete(result => runInStage.invoke(() => thenInside(result.isSuccess)))(
ExecutionContexts.parasitic)
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ package akka.remote.artery
import scala.concurrent.duration._

import akka.actor.ActorSystem
import akka.actor.ExtendedActorSystem
import akka.remote.RARP
import akka.remote.artery.Association.UidCollisionException
import akka.remote.artery.AssociationState.UidKnown
import akka.remote.artery.AssociationState.UidUnknown
import akka.testkit.{ EventFilter, ImplicitSender, TestActors, TestEvent, TestProbe }

class RemoteConnectionSpec extends ArteryMultiNodeSpec with ImplicitSender {
Expand All @@ -21,14 +26,181 @@ class RemoteConnectionSpec extends ArteryMultiNodeSpec with ImplicitSender {

"Remoting between systems" should {

"handle uid collision when connection TO two systems with same uid" in {
val localProbe = new TestProbe(localSystem)
val localPort = RARP(localSystem).provider.getDefaultAddress.getPort().get

val conflictUid = localSystem.asInstanceOf[ExtendedActorSystem].uid + 1
val echoName = "echoA"

val remotePort1 = freePort()
val remoteSystem1 =
newRemoteSystem(extraConfig = Some(s"""
akka.test-only-uid = $conflictUid
akka.remote.artery.canonical.port=$remotePort1
"""))
val remote1Probe = new TestProbe(remoteSystem1)

val remotePort2 = freePort()
val remoteSystem2 =
newRemoteSystem(extraConfig = Some(s"""
// same uid as remoteSystem1
akka.test-only-uid = $conflictUid
akka.remote.artery.canonical.port=$remotePort2
"""))
val remote2Probe = new TestProbe(remoteSystem2)

localProbe.expectNoMessage(2.seconds)
localSystem.actorOf(TestActors.echoActorProps, echoName)
remoteSystem1.actorOf(TestActors.echoActorProps, echoName)
remoteSystem2.actorOf(TestActors.echoActorProps, echoName)

val selectionFromLocalToRemote1 =
localSystem.actorSelection(s"akka://${remoteSystem1.name}@localhost:$remotePort1/user/$echoName")
val selectionFromLocalToRemote2 =
localSystem.actorSelection(s"akka://${remoteSystem2.name}@localhost:$remotePort2/user/$echoName")
val selectionFromRemote1ToLocal =
remoteSystem1.actorSelection(s"akka://${localSystem.name}@localhost:$localPort/user/$echoName")
val selectionFromRemote2ToLocal =
remoteSystem2.actorSelection(s"akka://${localSystem.name}@localhost:$localPort/user/$echoName")

selectionFromLocalToRemote1.tell("ping1a", localProbe.ref)
localProbe.expectMsg(500.millis, "ping1a")

selectionFromRemote1ToLocal.tell("ping1b", remote1Probe.ref)
remote1Probe.expectMsg(500.millis, "ping1b")

EventFilter[UidCollisionException]().intercept {
selectionFromLocalToRemote2.tell("ping2a", localProbe.ref)
// doesn't get through
localProbe.expectNoMessage()
}(localSystem)
RARP(localSystem).provider.transport
.asInstanceOf[ArteryTransport]
.association(RARP(remoteSystem2).provider.getDefaultAddress)
.associationState
.uniqueRemoteAddressState() shouldBe UidUnknown // handshake not completed

EventFilter[UidCollisionException]().intercept {
selectionFromRemote2ToLocal.tell("ping2b", remote2Probe.ref)
// doesn't get through in other direction
remote2Probe.expectNoMessage()
}(localSystem)

RARP(remoteSystem2).provider.transport
.asInstanceOf[ArteryTransport]
.association(RARP(localSystem).provider.getDefaultAddress)
.associationState
.uniqueRemoteAddressState() shouldBe UidKnown // handshake was completed in other direction

RARP(localSystem).provider.transport
.asInstanceOf[ArteryTransport]
.association(RARP(remoteSystem1).provider.getDefaultAddress)
.associationState
.uniqueRemoteAddressState() shouldBe UidKnown // still intact

// still works
selectionFromLocalToRemote1.tell("ping1a again", localProbe.ref)
localProbe.expectMsg(500.millis, "ping1a again")

// still works in other direction
selectionFromRemote1ToLocal.tell("ping1b again", remote1Probe.ref)
remote1Probe.expectMsg(500.millis, "ping1b again")
}

"handle uid collision when connection FROM two systems with same uid" in {
// same kind of test as above, but connection is first established from the other remote systems
val localProbe = new TestProbe(localSystem)
val localPort = RARP(localSystem).provider.getDefaultAddress.getPort().get

val conflictUid = localSystem.asInstanceOf[ExtendedActorSystem].uid + 2
val echoName = "echoB"

val remotePort1 = freePort()
val remoteSystem1 =
newRemoteSystem(extraConfig = Some(s"""
akka.test-only-uid = $conflictUid
akka.remote.artery.canonical.port=$remotePort1
"""))
val remote1Probe = new TestProbe(remoteSystem1)

val remotePort2 = freePort()
val remoteSystem2 =
newRemoteSystem(extraConfig = Some(s"""
// same uid as remoteSystem1
akka.test-only-uid = $conflictUid
akka.remote.artery.canonical.port=$remotePort2
"""))
val remote2Probe = new TestProbe(remoteSystem2)

localProbe.expectNoMessage(2.seconds)
localSystem.actorOf(TestActors.echoActorProps, echoName)
remoteSystem1.actorOf(TestActors.echoActorProps, echoName)
remoteSystem2.actorOf(TestActors.echoActorProps, echoName)

val selectionFromLocalToRemote1 =
localSystem.actorSelection(s"akka://${remoteSystem1.name}@localhost:$remotePort1/user/$echoName")
val selectionFromLocalToRemote2 =
localSystem.actorSelection(s"akka://${remoteSystem2.name}@localhost:$remotePort2/user/$echoName")
val selectionFromRemote1ToLocal =
remoteSystem1.actorSelection(s"akka://${localSystem.name}@localhost:$localPort/user/$echoName")
val selectionFromRemote2ToLocal =
remoteSystem2.actorSelection(s"akka://${localSystem.name}@localhost:$localPort/user/$echoName")

selectionFromRemote1ToLocal.tell("ping1b", remote1Probe.ref)
remote1Probe.expectMsg(500.millis, "ping1b")

selectionFromLocalToRemote1.tell("ping1a", localProbe.ref)
localProbe.expectMsg(500.millis, "ping1a")

EventFilter[UidCollisionException]().intercept {
selectionFromRemote2ToLocal.tell("ping2b", remote2Probe.ref)
// doesn't get through
localProbe.expectNoMessage()
}(localSystem)
RARP(localSystem).provider.transport
.asInstanceOf[ArteryTransport]
.association(RARP(remoteSystem2).provider.getDefaultAddress)
.associationState
.uniqueRemoteAddressState() shouldBe UidUnknown // handshake not completed

EventFilter[UidCollisionException]().intercept {
selectionFromLocalToRemote2.tell("ping2a", localProbe.ref)
// doesn't get through in other direction
remote2Probe.expectNoMessage()
}(localSystem)

RARP(remoteSystem2).provider.transport
.asInstanceOf[ArteryTransport]
.association(RARP(localSystem).provider.getDefaultAddress)
.associationState
.uniqueRemoteAddressState() shouldBe UidKnown // handshake was completed in other direction

RARP(localSystem).provider.transport
.asInstanceOf[ArteryTransport]
.association(RARP(remoteSystem1).provider.getDefaultAddress)
.associationState
.uniqueRemoteAddressState() shouldBe UidKnown // still intact

// still works
selectionFromRemote1ToLocal.tell("ping1b again", remote1Probe.ref)
remote1Probe.expectMsg(500.millis, "ping1b again")

// still works in other direction
selectionFromLocalToRemote1.tell("ping1a again", localProbe.ref)
localProbe.expectMsg(500.millis, "ping1a again")
}

"be able to connect to system even if it's not there at first" in {
muteSystem(localSystem)
val localProbe = new TestProbe(localSystem)
val echoName = "echoC"

val remotePort = freePort()

// try to talk to it before it is up
val selection = localSystem.actorSelection(s"akka://$nextGeneratedSystemName@localhost:$remotePort/user/echo")
val selection =
localSystem.actorSelection(s"akka://$nextGeneratedSystemName@localhost:$remotePort/user/$echoName")
selection.tell("ping", localProbe.ref)
localProbe.expectNoMessage(1.seconds)

Expand All @@ -37,7 +209,7 @@ class RemoteConnectionSpec extends ArteryMultiNodeSpec with ImplicitSender {

muteSystem(remoteSystem)
localProbe.expectNoMessage(2.seconds)
remoteSystem.actorOf(TestActors.echoActorProps, "echo")
remoteSystem.actorOf(TestActors.echoActorProps, echoName)

within(5.seconds) {
awaitAssert {
Expand All @@ -49,17 +221,19 @@ class RemoteConnectionSpec extends ArteryMultiNodeSpec with ImplicitSender {

"allow other system to connect even if it's not there at first" in {
val localSystem = newRemoteSystem()
val echoName = "echoD"

val localPort = port(localSystem)
muteSystem(localSystem)

val localProbe = new TestProbe(localSystem)
localSystem.actorOf(TestActors.echoActorProps, "echo")
localSystem.actorOf(TestActors.echoActorProps, echoName)

val remotePort = freePort()

// try to talk to remote before it is up
val selection = localSystem.actorSelection(s"akka://$nextGeneratedSystemName@localhost:$remotePort/user/echo")
val selection =
localSystem.actorSelection(s"akka://$nextGeneratedSystemName@localhost:$remotePort/user/$echoName")
selection.tell("ping", localProbe.ref)
localProbe.expectNoMessage(1.seconds)

Expand All @@ -70,7 +244,7 @@ class RemoteConnectionSpec extends ArteryMultiNodeSpec with ImplicitSender {
localProbe.expectNoMessage(2.seconds)
val otherProbe = new TestProbe(remoteSystem)
val otherSender = otherProbe.ref
val thisSelection = remoteSystem.actorSelection(s"akka://${localSystem.name}@localhost:$localPort/user/echo")
val thisSelection = remoteSystem.actorSelection(s"akka://${localSystem.name}@localhost:$localPort/user/$echoName")
within(5.seconds) {
awaitAssert {
thisSelection.tell("ping", otherSender)
Expand Down

0 comments on commit 85236f8

Please sign in to comment.