Skip to content

Wip 2053 actorbased remote drewhk #652

Closed
wants to merge 54 commits into from

5 participants

@drewhk
Akka Project member
drewhk commented Aug 28, 2012

Do not merge! Just for review.

The important stuff is in ActorManagedRemoting. TransportConnector is the abstraction layer over the underlying transport medium. NettyConnector is a first-iteration implementation of the former, and DummyConnector is a Connector used for testing.

Beware: No docs yet, some tests are failing.

Endre Sándor... added some commits Aug 20, 2012
Endre Sándor Varga Removed blocking calls from RemoteTransport API. (Only Address is blo…
…cking now)
a7f8252
Endre Sándor Varga Added a separate trait for the helper methods that were inherited fro…
…m RemoteTransport before
df005f7
Endre Sándor Varga Added a separate trait for the helper methods that were inherited fro…
…m RemoteTransport before
4262dde
Endre Sándor Varga Added a prototype for actor managed remoting layer 2e025a4
Endre Sándor Varga Refactored TransportProvider
 - renamed to TransportConnector
 - expects an ActorRef instead of callbacks
 - transport medium now uses host and port instead of addresses
8a587ef
Endre Sándor Varga Reverted original test back 30c38d8
Endre Sándor Varga Removed copyright text 48c7972
Endre Sándor Varga Added base class for testing remotes using the old RemoteCommunicatio…
…nSpec test case
7eaab88
Endre Sándor Varga Added interoperability test with old NettySupport 6a4ae9d
Endre Sándor Varga Added first attempt of a Netty Connector f2dace4
Endre Sándor Varga Implemented connect and send in NettyConnector.scala 46d7a6c
Endre Sándor Varga TransportConnector now loads via configuration 8182a06
Endre Sándor Varga Added support for connector settings, implemented skeleton for endpoi…
…nt connect policy (Pass, Failed)
7ce7398
Endre Sándor Varga Changed tests to include connector configuration c50ea87
Endre Sándor Varga Shutted down some test -- will refactor it later 6c86cbc
Endre Sándor Varga Added logging and event notification. Some is missing from headActor 8bc7a0b
Endre Sándor Varga Added logging and event notification. Some is missing from headActor e133001
Endre Sándor Varga Encapsulated endpoint table management operations in a separate class 2ad932c
@rkuhn rkuhn commented on an outdated diff Aug 28, 2012
.../scala/akka/remote/actmote/ActorManagedRemoting.scala
+ headActor = null
+ transport = null
+ }
+ } catch {
+ case e: akka.pattern.AskTimeoutException // the actor wasn't stopped within 5 seconds
+ }
+ }
+ }
+
+ // Start assumes that it cannot be followed by another start() without having a shutdown() first
+ def start() {
+ if (headActor eq null) {
+ transport = loadTransport
+ headActor = system.asInstanceOf[ActorSystemImpl].systemActorOf(Props(new HeadActor(provider, transport, managedRemoteSettings)), HeadActorName)
+
+ val timeout = new Timeout(5 seconds)
@rkuhn
Akka Project member
rkuhn added a note Aug 28, 2012

these things tend to want to be configurable

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@rkuhn rkuhn and 2 others commented on an outdated diff Aug 28, 2012
.../scala/akka/remote/actmote/ActorManagedRemoting.scala
+ val endpoint = context.actorOf(Props(new EndpointActor(
+ provider,
+ address,
+ remote,
+ connector,
+ transport,
+ settings,
+ handleOption)))
+ context.watch(endpoint)
+ }
+
+ // TODO: implement this and make configurable
+ private def retryLatchOpen(timeOfFailure: Long) = false
+
+ override def postStop() {
+ // TODO: All the children actors are stopped already?
@rkuhn
Akka Project member
rkuhn added a note Aug 28, 2012

yes, postStop of parent is called after all postStop of children have run

@drewhk
Akka Project member
drewhk added a note Aug 28, 2012

After all the postStop of the children have been called, or after all the postStop finished?

@bantonsson
Akka Project member
bantonsson added a note Aug 28, 2012

The child tells the parent that it has terminated after it has run its postStop. And the parent proceeds to run its own postStop after all children has told it that they have terminated.

Does that answer your question?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@rkuhn rkuhn commented on an outdated diff Aug 28, 2012
.../scala/akka/remote/actmote/ActorManagedRemoting.scala
+ import akka.actor.SupervisorStrategy._
+ import actmote.TransportConnector.IncomingConnection
+ import HeadActor._
+
+ val extendedSystem = context.system.asInstanceOf[ExtendedActorSystem]
+ val log = Logging(context.system.eventStream, "HeadActor")
+
+ private var address: Address = _
+ // Mapping between addresses and endpoint actors. If passive connections are turned off, incoming connections
+ // will be not part of this map!
+ val endpoints = new EndpointRegistry()
+
+ private var transport: RemoteTransport = _
+
+ override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
+ case _: EndpointException Stop
@rkuhn
Akka Project member
rkuhn added a note Aug 28, 2012

This means that all other exceptions are escalated, leading to a restart of the headActor, which will due to default preRestart terminate all connections. I know you are thinking about definitions of restart semantics for the connection actors, and once that is done it needs to be hooked in here.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@rkuhn rkuhn and 1 other commented on an outdated diff Aug 28, 2012
...ala/akka/remote/actmote/DummyTransportConnector.scala
+
+ @volatile var activityLog = List[Activity]()
+
+ @volatile var droppingSet = Set[Address]()
+ @volatile var rejectSet = Set[Address]()
+ @volatile var crashSet = Set[Address]()
+
+ def addressToHostAndPort(address: Address) = (address.host, address.port) match {
+ case (Some(host), Some(port)) HostAndPort(address.host.get, address.port.get)
+ case _ throw new IllegalArgumentException("DummyConnector only supports addresses with hostname and port specified")
+ }
+
+ def logicalLinkToNetworkLink(link: (Address, Address)): (HostAndPort, HostAndPort) = addressToHostAndPort(link._1) -> addressToHostAndPort(link._2)
+
+ def silentDrop(source: Address) {
+ droppingSet += source
@rkuhn
Akka Project member
rkuhn added a note Aug 28, 2012

adding @volatile does not make it race-free without using CAS loops; I'd recommend a normal ConcurrentHashMap or similar

@drewhk
Akka Project member
drewhk added a note Aug 28, 2012

Totally true, I realized that it was wrong, but I forgot about concurrent collections.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@rkuhn rkuhn commented on an outdated diff Aug 28, 2012
...main/scala/akka/remote/actmote/MessageEncodings.scala
@@ -0,0 +1,50 @@
+package akka.remote.actmote
+
+import akka.remote.RemoteProtocol._
+import akka.actor._
+import akka.serialization.Serialization
+import akka.remote.MessageSerializer
+
+/**
+ * Created with IntelliJ IDEA.
+ * User: Varga Endre Sándor
+ * Date: 2012.08.20.
+ * Time: 14:58
+ * To change this template use File | Settings | File Templates.
@rkuhn
Akka Project member
rkuhn added a note Aug 28, 2012

probably the easiest way of not forgetting to remove these is to switch them off in IDEA ;-)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@rkuhn rkuhn commented on an outdated diff Aug 28, 2012
...in/scala/akka/remote/actmote/TransportConnector.scala
+
+object TransportConnector {
+
+ import akka.remote.RemoteMessage
+
+ sealed trait ConnectorEvent
+ case class MessageArrived(msg: RemoteMessage) extends ConnectorEvent
+ case class IncomingConnection(handle: TransportConnectorHandle)
+ case class ConnectionInitialized(handle: TransportConnectorHandle) extends ConnectorEvent
+ case class ConnectionFailed(reason: Throwable) extends ConnectorEvent
+ case class Disconnected(handle: TransportConnectorHandle) extends ConnectorEvent
+}
+
+abstract class TransportConnector(val system: ExtendedActorSystem, val provider: RemoteActorRefProvider) {
+
+ import akka.actor.Address
@rkuhn
Akka Project member
rkuhn added a note Aug 28, 2012

we usually put such imports at the top-level, unless lexical scope really matters (for implicits) or they are "unusual"

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@rkuhn rkuhn commented on an outdated diff Aug 28, 2012
...in/scala/akka/remote/actmote/TransportConnector.scala
+ import akka.remote.RemoteMessage
+
+ sealed trait ConnectorEvent
+ case class MessageArrived(msg: RemoteMessage) extends ConnectorEvent
+ case class IncomingConnection(handle: TransportConnectorHandle)
+ case class ConnectionInitialized(handle: TransportConnectorHandle) extends ConnectorEvent
+ case class ConnectionFailed(reason: Throwable) extends ConnectorEvent
+ case class Disconnected(handle: TransportConnectorHandle) extends ConnectorEvent
+}
+
+abstract class TransportConnector(val system: ExtendedActorSystem, val provider: RemoteActorRefProvider) {
+
+ import akka.actor.Address
+
+ def responsibleActor: ActorRef
+ def responsibleActor_=(actor: ActorRef): Unit
@rkuhn
Akka Project member
rkuhn added a note Aug 28, 2012

keep an eye out for Java API: we generally do not use (partly) symbolic method names on interfaces which can reasonably be implemented by user code

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@rkuhn rkuhn commented on an outdated diff Aug 28, 2012
...in/scala/akka/remote/actmote/TransportConnector.scala
+
+ def responsibleActor: ActorRef
+ def responsibleActor_=(actor: ActorRef): Unit
+ def address: Address
+ // TODO: Document that all retry logic MUST be in connect
+ def connect(remote: Address, responsibleActorForConnection: ActorRef = responsibleActor): Unit
+ def shutdown(): Unit
+}
+
+trait TransportConnectorHandle {
+
+ import akka.actor.{ Address, ActorRef }
+ import akka.remote.RemoteActorRef
+
+ def responsibleActor: ActorRef
+ def responsibleActor_=(actor: ActorRef): Unit
@rkuhn
Akka Project member
rkuhn added a note Aug 28, 2012

didn't look to closely at the usage: why does the handle need to be mutable?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@rkuhn rkuhn commented on the diff Aug 28, 2012
...la/akka/remote/actmote/ActorManagedRemotingSpec.scala
+ remoteTransport.shutdown
+ DummyTransportMedium.clear
+ }
+ }
+
+ import DummyTransportMedium.SendAttempt
+ import DummyTransportMedium.ConnectionAttempt
+
+ "Actor based remoting" must {
+
+ "return the local address" in withCleanTransport {
+ assert(transportUnderTest.address.toString === "akka://ActorManagedRemotingSpec@localhost:12345")
+ }
+
+ "connect to remote address at the first send" in withCleanTransport {
+ remoteReference ! "discard"
@rkuhn
Akka Project member
rkuhn added a note Aug 28, 2012

maybe assert that no connection was there before the send

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Endre Sándor... added some commits Aug 28, 2012
Endre Sándor Varga Added documentation to the TransportConnector SPI. 73aed33
Endre Sándor Varga small fix to make it compile b1711ca
Endre Sándor Varga Updated documentation
 - added documentation for TransportConnectorHandle
 - prettified formatting
dab8779
Endre Sándor Varga Updated documenation
 - corrected typos
 - clarified disconnect semantics
 - unified some terminology
0c60a61
Endre Sándor Varga Removed remaining async API 4aed575
Endre Sándor Varga Removed setters and getters for responsible actors -- they are accept…
…ed as parameters for listen() and open() respectively
2d1a72a
Endre Sándor Varga Added settings for startup and shutdown timeouts ceb32a7
Endre Sándor Varga Added logging 05b5637
Endre Sándor Varga Implemented retry latch using Deadline and Duration c83d133
Endre Sándor Varga Moved message dispatch (receive) logic from endpoint actors to connec…
…tors
220ce6f
Endre Sándor Varga Reformat by Scalariform e393a08
Endre Sándor Varga DummyConnector and NettyConnector is now updated to the new Transport…
…Connector interface and compiles
428fc09
Endre Sándor Varga - Changed to use the Stash
- Recovery strategy is either
 - Latched: drop everything until latch opens again
 - Restart: keep queue, and restart
b63e620
Endre Sándor Varga - Endpoint is no longer an FSM, but uses become()/unbecome()
 - postRestart cleans the injected handle (that is already invalid)
0abe84a
Endre Sándor Varga Reduced coupling between EndpointActor and its collaborators by refac…
…toring LifeCycleNotificationHelper
dab52fc
Endre Sándor Varga - Removed RemoteTransport as a dependency from EndpointActor and Hea…
…dActor

 - Fixed bug when comparing Duration directly to 0
a8faee2
Endre Sándor Varga Settings now use absolute path 87352f0
Endre Sándor Varga Fixed minor bugs:
 - unnecessary protected method in LifeCycleNotifier
 - Send has a proper toString method
 - endpoint is now initialized after restart
 - ConnectionFailes messages are now handled
d3c0867
Endre Sándor Varga Updated documentation of TransportConnector 8ffc966
Endre Sándor Varga EndpointActors now use default-stash-dispatcher d4d9a72
Endre Sándor Varga EndpointActor no longer retries messages on a failed write 83c28ce
Endre Sándor Varga Unhandled messages now considered fatal errors 64ff62a
Endre Sándor Varga Fixed restart problem -- preStart was called unintentionally through …
…super.postRestart
60e8381
Endre Sándor Varga Added test cases for endpoint actor bce6c1a
Endre Sándor Varga Refactoring to enable parallel tests d84d8c1
Endre Sándor Varga Updated ActorManagedRemotingSpec
 - Refactored DummyTransportConnector to be able to run in parallel with other tests
 - Updated ActorManagedRemotingSpec to conform with new TransportConnector semantics
 - Fixed several bugs in ActorManagedRemoting itself
e3b36c0
Endre Sándor Varga Updated TransportConnector API to signal backoff 378d6d1
Endre Sándor Varga Implemented pruning of EndpointRegistry a2efdac
Endre Sándor Varga Added simple backoff mechanism for flow control events 6bbb50a
Endre Sándor Varga Establishing outbound connections are now non-blocking 772f93c
Endre Sándor Varga More asynchronous handling of restart (one blocking call is still rem…
…aining)

Proper handling of errors in NettyConnectorHandle
2d05296
Endre Sándor Varga Handle now filled with correct remote address 7c0d61a
Endre Sándor Varga Fixed problem with too early blocking of server pipeline (with setRea…
…dable(false)). Now it accepts the first Akka protocol messages and blocks it _after_ they were received
57a7c7a
Endre Sándor Varga Updated test cases to use dynamic port. Currently does not work with …
…remote deployment tests (those fail)
4ca04d0
Endre Sándor Varga Fixed handle initialization ordering for incoming connections 6b97ffe
@viktorklang viktorklang commented on the diff Sep 4, 2012
.../scala/akka/remote/actmote/ActorManagedRemoting.scala
+import akka.event._
+import akka.pattern.ask
+import akka.pattern.gracefulStop
+import akka.remote._
+import akka.util.Timeout
+import scala.concurrent.Await
+import scala.concurrent.Future
+import scala.concurrent.util.duration._
+import util.control.NonFatal
+import concurrent.util.{ Duration, Deadline }
+import scala.Some
+import actmote.TransportConnector._
+
+class ActorManagedRemoting(_system: ExtendedActorSystem, _provider: RemoteActorRefProvider) extends RemoteTransport(_system, _provider) {
+
+ val HeadActorName = "remoteTransportHeadActor"
@viktorklang
Akka Project member
viktorklang added a note Sep 4, 2012

Always place constants in objects and not in classes, it's good style to do so since it doesn't add a field for every instance. (In this case the number of instances will be few/one but it's how we do it)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@viktorklang viktorklang commented on the diff Sep 4, 2012
.../scala/akka/remote/actmote/ActorManagedRemoting.scala
+import scala.concurrent.Await
+import scala.concurrent.Future
+import scala.concurrent.util.duration._
+import util.control.NonFatal
+import concurrent.util.{ Duration, Deadline }
+import scala.Some
+import actmote.TransportConnector._
+
+class ActorManagedRemoting(_system: ExtendedActorSystem, _provider: RemoteActorRefProvider) extends RemoteTransport(_system, _provider) {
+
+ val HeadActorName = "remoteTransportHeadActor"
+ val managedRemoteSettings = new ActorManagedRemotingSettings(provider.remoteSettings.config)
+
+ @volatile var headActor: ActorRef = _
+ @volatile var address: Address = _
+ @volatile var connector: TransportConnector = _
@viktorklang
Akka Project member
viktorklang added a note Sep 4, 2012

Are these intended to be public?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@viktorklang viktorklang commented on the diff Sep 4, 2012
.../scala/akka/remote/actmote/ActorManagedRemoting.scala
+import scala.concurrent.util.duration._
+import util.control.NonFatal
+import concurrent.util.{ Duration, Deadline }
+import scala.Some
+import actmote.TransportConnector._
+
+class ActorManagedRemoting(_system: ExtendedActorSystem, _provider: RemoteActorRefProvider) extends RemoteTransport(_system, _provider) {
+
+ val HeadActorName = "remoteTransportHeadActor"
+ val managedRemoteSettings = new ActorManagedRemotingSettings(provider.remoteSettings.config)
+
+ @volatile var headActor: ActorRef = _
+ @volatile var address: Address = _
+ @volatile var connector: TransportConnector = _
+
+ def loadConnector = {
@viktorklang
Akka Project member
viktorklang added a note Sep 4, 2012

Is this intended to be public?
For reasons of readability and quick breakage if inference changes, always use explicit return types on methods.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@viktorklang viktorklang commented on the diff Sep 4, 2012
.../scala/akka/remote/actmote/ActorManagedRemoting.scala
+
+ val HeadActorName = "remoteTransportHeadActor"
+ val managedRemoteSettings = new ActorManagedRemotingSettings(provider.remoteSettings.config)
+
+ @volatile var headActor: ActorRef = _
+ @volatile var address: Address = _
+ @volatile var connector: TransportConnector = _
+
+ def loadConnector = {
+ val fqn = managedRemoteSettings.Connector
+ val args = Seq(
+ classOf[ExtendedActorSystem] -> system,
+ classOf[RemoteActorRefProvider] -> provider)
+
+ system.dynamicAccess.createInstanceFor[TransportConnector](fqn, args) match {
+ case Left(problem) {
@viktorklang
Akka Project member
viktorklang added a note Sep 4, 2012

No need for the braces here.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@viktorklang viktorklang commented on the diff Sep 4, 2012
.../scala/akka/remote/actmote/ActorManagedRemoting.scala
+
+ def loadConnector = {
+ val fqn = managedRemoteSettings.Connector
+ val args = Seq(
+ classOf[ExtendedActorSystem] -> system,
+ classOf[RemoteActorRefProvider] -> provider)
+
+ system.dynamicAccess.createInstanceFor[TransportConnector](fqn, args) match {
+ case Left(problem) {
+ throw new RemoteTransportException("Could not load transport connector " + fqn, problem)
+ }
+ case Right(connector) connector
+ }
+ }
+
+ lazy val log: LoggingAdapter = Logging(system.eventStream, "ActorManagedRemoting(" + address + ")")
@viktorklang
Akka Project member
viktorklang added a note Sep 4, 2012

Why is this public and lazy?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@viktorklang viktorklang commented on the diff Sep 4, 2012
.../scala/akka/remote/actmote/ActorManagedRemoting.scala
+ throw new RemoteTransportException("Could not load transport connector " + fqn, problem)
+ }
+ case Right(connector) connector
+ }
+ }
+
+ lazy val log: LoggingAdapter = Logging(system.eventStream, "ActorManagedRemoting(" + address + ")")
+
+ def shutdown() {
+ if (headActor != null) {
+ try {
+ val stopped: Future[Boolean] = gracefulStop(headActor, 5 seconds)(system)
+ // the actor has been stopped
+ if (Await.result(stopped, managedRemoteSettings.ShutdownTimeout)) {
+ headActor = null
+ connector = null
@viktorklang
Akka Project member
viktorklang added a note Sep 4, 2012

We most likely want to do the nulling-out in a finally block

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@viktorklang viktorklang commented on the diff Sep 4, 2012
.../scala/akka/remote/actmote/ActorManagedRemoting.scala
+
+ lazy val log: LoggingAdapter = Logging(system.eventStream, "ActorManagedRemoting(" + address + ")")
+
+ def shutdown() {
+ if (headActor != null) {
+ try {
+ val stopped: Future[Boolean] = gracefulStop(headActor, 5 seconds)(system)
+ // the actor has been stopped
+ if (Await.result(stopped, managedRemoteSettings.ShutdownTimeout)) {
+ headActor = null
+ connector = null
+ log.info("Remoting stopped successfully")
+ }
+
+ } catch {
+ case e: akka.pattern.AskTimeoutException log.warning("Shutdown timed out")
@viktorklang
Akka Project member
viktorklang added a note Sep 4, 2012

Use j.u.c.TimeoutException (supertype of AskTimeoutException)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@viktorklang viktorklang commented on the diff Sep 4, 2012
.../scala/akka/remote/actmote/ActorManagedRemoting.scala
+ if (Await.result(stopped, managedRemoteSettings.ShutdownTimeout)) {
+ headActor = null
+ connector = null
+ log.info("Remoting stopped successfully")
+ }
+
+ } catch {
+ case e: akka.pattern.AskTimeoutException log.warning("Shutdown timed out")
+ case NonFatal(e) log.error(e, "Shutdown failed")
+ }
+ }
+ }
+
+ // Start assumes that it cannot be followed by another start() without having a shutdown() first
+ def start() {
+ log.info("Starting remoting")
@viktorklang
Akka Project member
viktorklang added a note Sep 4, 2012

This should move in after the if-check

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@viktorklang viktorklang commented on the diff Sep 4, 2012
.../scala/akka/remote/actmote/ActorManagedRemoting.scala
+ case NonFatal(e) log.error(e, "Shutdown failed")
+ }
+ }
+ }
+
+ // Start assumes that it cannot be followed by another start() without having a shutdown() first
+ def start() {
+ log.info("Starting remoting")
+ if (headActor eq null) {
+ connector = loadConnector
+ // NOTE: Notifier will use the logger of this class
+ val notifier = new DefaultLifeCycleNotifier(provider.remoteSettings, this, system.eventStream, log)
+ headActor = system.asInstanceOf[ActorSystemImpl].systemActorOf(Props(new HeadActor(provider.remoteSettings, connector, managedRemoteSettings, notifier)), HeadActorName)
+
+ val timeout = new Timeout(managedRemoteSettings.StartupTimeout)
+ val addressFuture = headActor.ask(Listen(this))(timeout).mapTo[Address]
@viktorklang
Akka Project member
viktorklang added a note Sep 4, 2012

Not needed to have the addressFuture out here, just inline it into the Await.result(...) call

@patriknw
Akka Project member
patriknw added a note Sep 6, 2012

implicit val timeout instead of param

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@viktorklang viktorklang commented on the diff Sep 4, 2012
.../scala/akka/remote/actmote/ActorManagedRemoting.scala
+ // Start assumes that it cannot be followed by another start() without having a shutdown() first
+ def start() {
+ log.info("Starting remoting")
+ if (headActor eq null) {
+ connector = loadConnector
+ // NOTE: Notifier will use the logger of this class
+ val notifier = new DefaultLifeCycleNotifier(provider.remoteSettings, this, system.eventStream, log)
+ headActor = system.asInstanceOf[ActorSystemImpl].systemActorOf(Props(new HeadActor(provider.remoteSettings, connector, managedRemoteSettings, notifier)), HeadActorName)
+
+ val timeout = new Timeout(managedRemoteSettings.StartupTimeout)
+ val addressFuture = headActor.ask(Listen(this))(timeout).mapTo[Address]
+
+ try {
+ this.address = Await.result(addressFuture, timeout.duration)
+ } catch {
+ case e: akka.pattern.AskTimeoutException throw new RemoteTransportException("Startup timed out", e)
@viktorklang
Akka Project member
viktorklang added a note Sep 4, 2012

j.u.c.TimeoutException

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@viktorklang viktorklang commented on the diff Sep 4, 2012
.../scala/akka/remote/actmote/ActorManagedRemoting.scala
+}
+
+trait LifeCycleNotifier {
+ def remoteClientError(reason: Throwable, remoteAddress: Address): Unit
+ def remoteClientDisconnected(remoteAddress: Address): Unit
+ def remoteClientConnected(remoteAddress: Address): Unit
+ def remoteClientStarted(remoteAddress: Address): Unit
+ def remoteClientShutdown(remoteAddress: Address): Unit
+
+ def remoteServerStarted(): Unit
+ def remoteServerShutdown(): Unit
+ def remoteServerError(reason: Throwable): Unit
+ def remoteServerClientConnected(remoteAddress: Address)
+ def remoteServerClientDisconnected(remoteAddress: Address)
+ def remoteServerClientClosed(remoteAddress: Address)
+}
@viktorklang
Akka Project member
viktorklang added a note Sep 4, 2012

Looks like a good contract, needs some ScalaDoc that describes what it does and what it means. (I'd suggest the name "reporter" instead of "notifier")

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@viktorklang viktorklang commented on the diff Sep 4, 2012
.../scala/akka/remote/actmote/ActorManagedRemoting.scala
+ def remoteServerError(reason: Throwable): Unit
+ def remoteServerClientConnected(remoteAddress: Address)
+ def remoteServerClientDisconnected(remoteAddress: Address)
+ def remoteServerClientClosed(remoteAddress: Address)
+}
+
+class DefaultLifeCycleNotifier(remoteSettings: RemoteSettings, remoteTransport: RemoteTransport, eventStream: EventStream, log: LoggingAdapter) extends LifeCycleNotifier {
+ //private def useUntrustedMode = remoteSettings.UntrustedMode
+ private def logRemoteLifeCycleEvents = remoteSettings.LogRemoteLifeCycleEvents
+
+ protected def notifyListeners(message: RemoteLifeCycleEvent): Unit = {
+ eventStream.publish(message)
+ if (logRemoteLifeCycleEvents) log.log(message.logLevel, "{}", message)
+ }
+
+ def remoteClientError(reason: Throwable, remoteAddress: Address) { notifyListeners(RemoteClientError(reason, remoteTransport, remoteAddress)) }
@viktorklang
Akka Project member
viktorklang added a note Sep 4, 2012

Always use "override" so that you get compilation errors when you remove methods on the interface.
For 1-line methods the recommended format is: def ... = : Type = ...

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@viktorklang viktorklang commented on the diff Sep 4, 2012
.../scala/akka/remote/actmote/ActorManagedRemoting.scala
+ def remoteClientDisconnected(remoteAddress: Address): Unit
+ def remoteClientConnected(remoteAddress: Address): Unit
+ def remoteClientStarted(remoteAddress: Address): Unit
+ def remoteClientShutdown(remoteAddress: Address): Unit
+
+ def remoteServerStarted(): Unit
+ def remoteServerShutdown(): Unit
+ def remoteServerError(reason: Throwable): Unit
+ def remoteServerClientConnected(remoteAddress: Address)
+ def remoteServerClientDisconnected(remoteAddress: Address)
+ def remoteServerClientClosed(remoteAddress: Address)
+}
+
+class DefaultLifeCycleNotifier(remoteSettings: RemoteSettings, remoteTransport: RemoteTransport, eventStream: EventStream, log: LoggingAdapter) extends LifeCycleNotifier {
+ //private def useUntrustedMode = remoteSettings.UntrustedMode
+ private def logRemoteLifeCycleEvents = remoteSettings.LogRemoteLifeCycleEvents
@viktorklang
Akka Project member
viktorklang added a note Sep 4, 2012

I'd remove this method and use remoteSettings.LogRemoveLifeCycleEvents in the notifyListeners-method directly.

@patriknw
Akka Project member
patriknw added a note Sep 6, 2012

yes, or import remoteSettings.LogRemoteLifeCycleEvents

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@viktorklang viktorklang commented on the diff Sep 4, 2012
.../scala/akka/remote/actmote/ActorManagedRemoting.scala
+
+ def remoteClientError(reason: Throwable, remoteAddress: Address) { notifyListeners(RemoteClientError(reason, remoteTransport, remoteAddress)) }
+ def remoteClientDisconnected(remoteAddress: Address) { notifyListeners(RemoteClientDisconnected(remoteTransport, remoteAddress)) }
+ def remoteClientConnected(remoteAddress: Address) { notifyListeners(RemoteClientConnected(remoteTransport, remoteAddress)) }
+ def remoteClientStarted(remoteAddress: Address) { notifyListeners(RemoteClientStarted(remoteTransport, remoteAddress)) }
+ def remoteClientShutdown(remoteAddress: Address) { notifyListeners(RemoteClientShutdown(remoteTransport, remoteAddress)) }
+
+ def remoteServerStarted() { notifyListeners(RemoteServerStarted(remoteTransport)) }
+ def remoteServerShutdown() { notifyListeners(RemoteServerShutdown(remoteTransport)) }
+ def remoteServerError(reason: Throwable) { notifyListeners(RemoteServerError(reason, remoteTransport)) }
+ def remoteServerClientConnected(remoteAddress: Address) { notifyListeners(RemoteServerClientConnected(remoteTransport, Some(remoteAddress))) }
+ def remoteServerClientDisconnected(remoteAddress: Address) { notifyListeners(RemoteServerClientDisconnected(remoteTransport, Some(remoteAddress))) }
+ def remoteServerClientClosed(remoteAddress: Address) { notifyListeners(RemoteServerClientClosed(remoteTransport, Some(remoteAddress))) }
+}
+
+private[actmote] sealed trait RemotingCommand
@viktorklang
Akka Project member
viktorklang added a note Sep 4, 2012

Why is this placed here and not in an object?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@viktorklang viktorklang commented on the diff Sep 4, 2012
.../scala/akka/remote/actmote/ActorManagedRemoting.scala
+ def remoteClientError(reason: Throwable, remoteAddress: Address) { notifyListeners(RemoteClientError(reason, remoteTransport, remoteAddress)) }
+ def remoteClientDisconnected(remoteAddress: Address) { notifyListeners(RemoteClientDisconnected(remoteTransport, remoteAddress)) }
+ def remoteClientConnected(remoteAddress: Address) { notifyListeners(RemoteClientConnected(remoteTransport, remoteAddress)) }
+ def remoteClientStarted(remoteAddress: Address) { notifyListeners(RemoteClientStarted(remoteTransport, remoteAddress)) }
+ def remoteClientShutdown(remoteAddress: Address) { notifyListeners(RemoteClientShutdown(remoteTransport, remoteAddress)) }
+
+ def remoteServerStarted() { notifyListeners(RemoteServerStarted(remoteTransport)) }
+ def remoteServerShutdown() { notifyListeners(RemoteServerShutdown(remoteTransport)) }
+ def remoteServerError(reason: Throwable) { notifyListeners(RemoteServerError(reason, remoteTransport)) }
+ def remoteServerClientConnected(remoteAddress: Address) { notifyListeners(RemoteServerClientConnected(remoteTransport, Some(remoteAddress))) }
+ def remoteServerClientDisconnected(remoteAddress: Address) { notifyListeners(RemoteServerClientDisconnected(remoteTransport, Some(remoteAddress))) }
+ def remoteServerClientClosed(remoteAddress: Address) { notifyListeners(RemoteServerClientClosed(remoteTransport, Some(remoteAddress))) }
+}
+
+private[actmote] sealed trait RemotingCommand
+private[actmote] case class Listen(transport: RemoteTransport)
@viktorklang
Akka Project member
viktorklang added a note Sep 4, 2012

Listen does not extend RemotingCommand (which I assume it should?)

Also, I prefer to pass the transport in the constructor instead of in a message (in Props).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@viktorklang viktorklang commented on the diff Sep 4, 2012
.../scala/akka/remote/actmote/ActorManagedRemoting.scala
+ def remoteClientConnected(remoteAddress: Address) { notifyListeners(RemoteClientConnected(remoteTransport, remoteAddress)) }
+ def remoteClientStarted(remoteAddress: Address) { notifyListeners(RemoteClientStarted(remoteTransport, remoteAddress)) }
+ def remoteClientShutdown(remoteAddress: Address) { notifyListeners(RemoteClientShutdown(remoteTransport, remoteAddress)) }
+
+ def remoteServerStarted() { notifyListeners(RemoteServerStarted(remoteTransport)) }
+ def remoteServerShutdown() { notifyListeners(RemoteServerShutdown(remoteTransport)) }
+ def remoteServerError(reason: Throwable) { notifyListeners(RemoteServerError(reason, remoteTransport)) }
+ def remoteServerClientConnected(remoteAddress: Address) { notifyListeners(RemoteServerClientConnected(remoteTransport, Some(remoteAddress))) }
+ def remoteServerClientDisconnected(remoteAddress: Address) { notifyListeners(RemoteServerClientDisconnected(remoteTransport, Some(remoteAddress))) }
+ def remoteServerClientClosed(remoteAddress: Address) { notifyListeners(RemoteServerClientClosed(remoteTransport, Some(remoteAddress))) }
+}
+
+private[actmote] sealed trait RemotingCommand
+private[actmote] case class Listen(transport: RemoteTransport)
+private[actmote] case class Send(message: Any, senderOption: Option[ActorRef], recipient: RemoteActorRef) extends RemotingCommand {
+ override def toString = "Remote message " + senderOption.getOrElse("UNKNOWN") + " -> " + recipient
@viktorklang
Akka Project member
viktorklang added a note Sep 4, 2012

Instead of the getOrElse you could simply do + senderOption and it will be Some(path) or None which is just as informative.

@patriknw
Akka Project member
patriknw added a note Sep 6, 2012

2.10 string interpolation?

@drewhk
Akka Project member
drewhk added a note Sep 6, 2012
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@viktorklang viktorklang commented on the diff Sep 4, 2012
.../scala/akka/remote/actmote/ActorManagedRemoting.scala
+
+ def remoteServerStarted() { notifyListeners(RemoteServerStarted(remoteTransport)) }
+ def remoteServerShutdown() { notifyListeners(RemoteServerShutdown(remoteTransport)) }
+ def remoteServerError(reason: Throwable) { notifyListeners(RemoteServerError(reason, remoteTransport)) }
+ def remoteServerClientConnected(remoteAddress: Address) { notifyListeners(RemoteServerClientConnected(remoteTransport, Some(remoteAddress))) }
+ def remoteServerClientDisconnected(remoteAddress: Address) { notifyListeners(RemoteServerClientDisconnected(remoteTransport, Some(remoteAddress))) }
+ def remoteServerClientClosed(remoteAddress: Address) { notifyListeners(RemoteServerClientClosed(remoteTransport, Some(remoteAddress))) }
+}
+
+private[actmote] sealed trait RemotingCommand
+private[actmote] case class Listen(transport: RemoteTransport)
+private[actmote] case class Send(message: Any, senderOption: Option[ActorRef], recipient: RemoteActorRef) extends RemotingCommand {
+ override def toString = "Remote message " + senderOption.getOrElse("UNKNOWN") + " -> " + recipient
+}
+
+object HeadActor {
@viktorklang
Akka Project member
viktorklang added a note Sep 4, 2012

We might want to brainstorm about a new name :-)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@viktorklang viktorklang commented on the diff Sep 4, 2012
.../scala/akka/remote/actmote/ActorManagedRemoting.scala
+private[actmote] case class Listen(transport: RemoteTransport)
+private[actmote] case class Send(message: Any, senderOption: Option[ActorRef], recipient: RemoteActorRef) extends RemotingCommand {
+ override def toString = "Remote message " + senderOption.getOrElse("UNKNOWN") + " -> " + recipient
+}
+
+object HeadActor {
+ sealed trait EndpointPolicy
+ case class Pass(endpoint: ActorRef) extends EndpointPolicy
+ case class Latched(timeOfFailure: Deadline) extends EndpointPolicy
+ //case class PassPassive(endpointOption: Option[ActorRef])
+
+ case object Prune
+
+ class EndpointRegistry {
+ private val addressToEndpointAndPolicy = scala.collection.mutable.Map[Address, EndpointPolicy]()
+ private val endpointToAddress = scala.collection.mutable.Map[ActorRef, Address]()
@viktorklang
Akka Project member
viktorklang added a note Sep 4, 2012

Always use vars-to-immutable-datastructure unless you're using a Concurrent datastructure: why?

  • You can freely share the data structure if it is immutable, and changing the value locally keeps mutability to the smallest possible scope.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@viktorklang viktorklang commented on the diff Sep 4, 2012
.../scala/akka/remote/actmote/ActorManagedRemoting.scala
+ sealed trait EndpointPolicy
+ case class Pass(endpoint: ActorRef) extends EndpointPolicy
+ case class Latched(timeOfFailure: Deadline) extends EndpointPolicy
+ //case class PassPassive(endpointOption: Option[ActorRef])
+
+ case object Prune
+
+ class EndpointRegistry {
+ private val addressToEndpointAndPolicy = scala.collection.mutable.Map[Address, EndpointPolicy]()
+ private val endpointToAddress = scala.collection.mutable.Map[ActorRef, Address]()
+
+ def getEndpointWithPolicy(address: Address) = addressToEndpointAndPolicy.get(address)
+
+ def prune(pruneAge: Duration) {
+ addressToEndpointAndPolicy.retain {
+ case (address, Pass(_)) true
@viktorklang
Akka Project member
viktorklang added a note Sep 4, 2012

no need to match "address", use _

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@viktorklang viktorklang commented on the diff Sep 5, 2012
.../scala/akka/remote/actmote/ActorManagedRemoting.scala
+ case class Pass(endpoint: ActorRef) extends EndpointPolicy
+ case class Latched(timeOfFailure: Deadline) extends EndpointPolicy
+ //case class PassPassive(endpointOption: Option[ActorRef])
+
+ case object Prune
+
+ class EndpointRegistry {
+ private val addressToEndpointAndPolicy = scala.collection.mutable.Map[Address, EndpointPolicy]()
+ private val endpointToAddress = scala.collection.mutable.Map[ActorRef, Address]()
+
+ def getEndpointWithPolicy(address: Address) = addressToEndpointAndPolicy.get(address)
+
+ def prune(pruneAge: Duration) {
+ addressToEndpointAndPolicy.retain {
+ case (address, Pass(_)) true
+ case (address, Latched(timeOfFaiure)) (Deadline.now + pruneAge).isOverdue()
@viktorklang
Akka Project member
viktorklang added a note Sep 5, 2012

I don't understand this line

@drewhk
Akka Project member
drewhk added a note Sep 5, 2012
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@viktorklang viktorklang commented on the diff Sep 5, 2012
.../scala/akka/remote/actmote/ActorManagedRemoting.scala
+ def getEndpointWithPolicy(address: Address) = addressToEndpointAndPolicy.get(address)
+
+ def prune(pruneAge: Duration) {
+ addressToEndpointAndPolicy.retain {
+ case (address, Pass(_)) true
+ case (address, Latched(timeOfFaiure)) (Deadline.now + pruneAge).isOverdue()
+ }
+ }
+
+ def registerEndpoint(address: Address, endpoint: ActorRef) {
+ addressToEndpointAndPolicy += address -> Pass(endpoint)
+ endpointToAddress += endpoint -> address
+ }
+
+ def markFailed(endpoint: ActorRef, timeOfFailure: Deadline) {
+ val address = endpointToAddress(endpoint)
@viktorklang
Akka Project member
viktorklang added a note Sep 5, 2012

what if endpoint doesn't exist in the map, should throw exception here?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@viktorklang viktorklang commented on the diff Sep 5, 2012
.../scala/akka/remote/actmote/ActorManagedRemoting.scala
+ }
+
+ def markFailed(endpoint: ActorRef, timeOfFailure: Deadline) {
+ val address = endpointToAddress(endpoint)
+ endpointToAddress.remove(endpoint)
+ addressToEndpointAndPolicy(address) = Latched(timeOfFailure)
+ }
+
+ def markPass(endpoint: ActorRef) {
+ val address = endpointToAddress(endpoint)
+ addressToEndpointAndPolicy(address) = Pass(endpoint)
+ }
+
+ def removeIfNotLatched(endpoint: ActorRef) {
+ if (endpointToAddress.get(endpoint).isDefined) {
+ val address = endpointToAddress(endpoint)
@viktorklang
Akka Project member
viktorklang added a note Sep 5, 2012

Double work, on the line above the address is returned in an option, use a pattern-match on the returned value.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@viktorklang viktorklang commented on the diff Sep 5, 2012
.../scala/akka/remote/actmote/ActorManagedRemoting.scala
+ }
+
+ def markPass(endpoint: ActorRef) {
+ val address = endpointToAddress(endpoint)
+ addressToEndpointAndPolicy(address) = Pass(endpoint)
+ }
+
+ def removeIfNotLatched(endpoint: ActorRef) {
+ if (endpointToAddress.get(endpoint).isDefined) {
+ val address = endpointToAddress(endpoint)
+ addressToEndpointAndPolicy.get(address) match {
+ case Some(Latched(_)) //Leave it be. It contains only the last failure time, but not the endpoint ref
+ case _ addressToEndpointAndPolicy.remove(address)
+ }
+ // The endpoint is already stopped, always remove it
+ endpointToAddress.remove(endpoint)
@viktorklang
Akka Project member
viktorklang added a note Sep 5, 2012

Since you always do remove you might as well start the method with:

endpointToAddress.remove(endpoint) match {
case Some(address) => addressToEnd....
case None => ...
}

@drewhk
Akka Project member
drewhk added a note Sep 5, 2012

True! Nice!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@viktorklang viktorklang commented on the diff Sep 5, 2012
.../scala/akka/remote/actmote/ActorManagedRemoting.scala
+ }
+ // The endpoint is already stopped, always remove it
+ endpointToAddress.remove(endpoint)
+ }
+ }
+ }
+}
+
+class HeadActor(
+ val remoteSettings: RemoteSettings,
+ val connector: TransportConnector,
+ val settings: ActorManagedRemotingSettings,
+ val notifier: LifeCycleNotifier) extends Actor {
+
+ import akka.actor.SupervisorStrategy._
+ import actmote.TransportConnector.IncomingConnection
@viktorklang
Akka Project member
viktorklang added a note Sep 5, 2012

These two imports should go at the top of the file

@patriknw
Akka Project member
patriknw added a note Sep 6, 2012

and (then) it should not be a relative import

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@viktorklang viktorklang commented on the diff Sep 5, 2012
.../scala/akka/remote/actmote/ActorManagedRemoting.scala
+ }
+ }
+}
+
+class HeadActor(
+ val remoteSettings: RemoteSettings,
+ val connector: TransportConnector,
+ val settings: ActorManagedRemotingSettings,
+ val notifier: LifeCycleNotifier) extends Actor {
+
+ import akka.actor.SupervisorStrategy._
+ import actmote.TransportConnector.IncomingConnection
+ import HeadActor._
+
+ val extendedSystem = context.system.asInstanceOf[ExtendedActorSystem]
+ val eventStream = context.system.eventStream
@viktorklang
Akka Project member
viktorklang added a note Sep 5, 2012

why put it in a val here?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@viktorklang viktorklang commented on the diff Sep 5, 2012
.../scala/akka/remote/actmote/ActorManagedRemoting.scala
+ }
+ }
+ }
+}
+
+class HeadActor(
+ val remoteSettings: RemoteSettings,
+ val connector: TransportConnector,
+ val settings: ActorManagedRemotingSettings,
+ val notifier: LifeCycleNotifier) extends Actor {
+
+ import akka.actor.SupervisorStrategy._
+ import actmote.TransportConnector.IncomingConnection
+ import HeadActor._
+
+ val extendedSystem = context.system.asInstanceOf[ExtendedActorSystem]
@viktorklang
Akka Project member
viktorklang added a note Sep 5, 2012

why val here?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@viktorklang viktorklang commented on the diff Sep 5, 2012
.../scala/akka/remote/actmote/ActorManagedRemoting.scala
+ val endpoints = new EndpointRegistry()
+
+ private var transport: RemoteTransport = _
+ private var startupFuture: ActorRef = _
+
+ private val retryLatchEnabled = settings.RetryLatchClosedFor.length > 0
+ val pruneInterval: Duration = if (retryLatchEnabled) settings.RetryLatchClosedFor * 2 else Duration.Zero
+
+ private def failureStrategy = if (!retryLatchEnabled) {
+ // This strategy keeps all the messages in the stash of the endpoint so restart will transfer the queue
+ // to the restarted endpoint -- thus no messages are lost
+ Restart
+ } else {
+ // This strategy throws away all the messages enqueued in the endpoint (in its stash), registers the time of failure,
+ // keeps throwing away messages until the retry latch becomes open (RetryLatchClosedFor)
+ endpoints.markFailed(sender, Deadline.now)
@viktorklang
Akka Project member
viktorklang added a note Sep 5, 2012

why deadline.now?

@drewhk
Akka Project member
drewhk added a note Sep 5, 2012
@patriknw
Akka Project member
patriknw added a note Sep 6, 2012

is it a timestamp?
System.currentTimeMillis
or if it's only for later measure a duration System.nanoTime

@viktorklang
Akka Project member
viktorklang added a note Sep 6, 2012

Or if it really is a Deadline, it should be in the future, then you just need to check isOverdue and then evict.

@drewhk
Akka Project member
drewhk added a note Sep 6, 2012
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@viktorklang viktorklang commented on the diff Sep 5, 2012
.../scala/akka/remote/actmote/ActorManagedRemoting.scala
+ val pruneInterval: Duration = if (retryLatchEnabled) settings.RetryLatchClosedFor * 2 else Duration.Zero
+
+ private def failureStrategy = if (!retryLatchEnabled) {
+ // This strategy keeps all the messages in the stash of the endpoint so restart will transfer the queue
+ // to the restarted endpoint -- thus no messages are lost
+ Restart
+ } else {
+ // This strategy throws away all the messages enqueued in the endpoint (in its stash), registers the time of failure,
+ // keeps throwing away messages until the retry latch becomes open (RetryLatchClosedFor)
+ endpoints.markFailed(sender, Deadline.now)
+ Stop
+ }
+
+ override val supervisorStrategy = OneForOneStrategy() {
+ case _: EndpointException failureStrategy
+ case NonFatal(_) failureStrategy
@viktorklang
Akka Project member
viktorklang added a note Sep 5, 2012

I think EndpointException is not a fatal exception so you don't need the "_: EndpointException"

@drewhk
Akka Project member
drewhk added a note Sep 5, 2012
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@viktorklang viktorklang commented on the diff Sep 5, 2012
.../scala/akka/remote/actmote/ActorManagedRemoting.scala
+ } else {
+ // This strategy throws away all the messages enqueued in the endpoint (in its stash), registers the time of failure,
+ // keeps throwing away messages until the retry latch becomes open (RetryLatchClosedFor)
+ endpoints.markFailed(sender, Deadline.now)
+ Stop
+ }
+
+ override val supervisorStrategy = OneForOneStrategy() {
+ case _: EndpointException failureStrategy
+ case NonFatal(_) failureStrategy
+ }
+
+ override def preStart() {
+ // Prune old latch entries (if latching is enabled) to avoid memleaks
+ if (retryLatchEnabled) {
+ context.system.scheduler.schedule(pruneInterval, pruneInterval, self, Prune)(context.system.dispatcher)
@viktorklang
Akka Project member
viktorklang added a note Sep 5, 2012

Why use the default dispatcher and not the actors' dispatcher?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@viktorklang viktorklang commented on the diff Sep 5, 2012
.../scala/akka/remote/actmote/ActorManagedRemoting.scala
+ }
+
+ override val supervisorStrategy = OneForOneStrategy() {
+ case _: EndpointException failureStrategy
+ case NonFatal(_) failureStrategy
+ }
+
+ override def preStart() {
+ // Prune old latch entries (if latching is enabled) to avoid memleaks
+ if (retryLatchEnabled) {
+ context.system.scheduler.schedule(pruneInterval, pruneInterval, self, Prune)(context.system.dispatcher)
+ }
+ }
+
+ def receive = {
+ case Listen(transport) {
@viktorklang
Akka Project member
viktorklang added a note Sep 5, 2012

no need for braces

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@viktorklang viktorklang commented on the diff Sep 5, 2012
.../scala/akka/remote/actmote/ActorManagedRemoting.scala
+
+ override def preStart() {
+ // Prune old latch entries (if latching is enabled) to avoid memleaks
+ if (retryLatchEnabled) {
+ context.system.scheduler.schedule(pruneInterval, pruneInterval, self, Prune)(context.system.dispatcher)
+ }
+ }
+
+ def receive = {
+ case Listen(transport) {
+ this.transport = transport
+ startupFuture = sender
+ connector.listen(self)
+ }
+
+ case ConnectorInitialized(address) {
@viktorklang
Akka Project member
viktorklang added a note Sep 5, 2012

no need for braces

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@viktorklang viktorklang commented on the diff Sep 5, 2012
.../scala/akka/remote/actmote/ActorManagedRemoting.scala
+
+ import akka.actor.SupervisorStrategy._
+ import actmote.TransportConnector.IncomingConnection
+ import HeadActor._
+
+ val extendedSystem = context.system.asInstanceOf[ExtendedActorSystem]
+ val eventStream = context.system.eventStream
+ val log = Logging(eventStream, "HeadActor")
+
+ private var address: Address = _
+ // Mapping between addresses and endpoint actors. If passive connections are turned off, incoming connections
+ // will be not part of this map!
+ val endpoints = new EndpointRegistry()
+
+ private var transport: RemoteTransport = _
+ private var startupFuture: ActorRef = _
@viktorklang
Akka Project member
viktorklang added a note Sep 5, 2012

Name seems wrong here

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@viktorklang viktorklang commented on the diff Sep 5, 2012
.../scala/akka/remote/actmote/ActorManagedRemoting.scala
+
+ def receive = {
+ case Listen(transport) {
+ this.transport = transport
+ startupFuture = sender
+ connector.listen(self)
+ }
+
+ case ConnectorInitialized(address) {
+ this.address = address
+ startupFuture ! address
+ notifier.remoteServerStarted()
+ }
+
+ case ConnectorFailed(reason) {
+ notifier.remoteServerError(reason)
@viktorklang
Akka Project member
viktorklang added a note Sep 5, 2012

Order of notifications and other code in these cases seem inconsistent

@drewhk
Akka Project member
drewhk added a note Sep 5, 2012
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@viktorklang viktorklang commented on the diff Sep 5, 2012
.../scala/akka/remote/actmote/ActorManagedRemoting.scala
+ val endpoint = createEndpoint(recipientAddress, None)
+ endpoints.markPass(endpoint)
+ endpoint ! s
+ } else {
+ log.warning("Endpoint failed earlier and retry latch is not open yet; dropping message: {}", message)
+ extendedSystem.deadLetters ! message
+ }
+ case None {
+ val endpoint = createEndpoint(recipientAddress, None)
+ endpoints.registerEndpoint(recipientAddress, endpoint)
+ endpoint ! s
+ }
+ }
+ }
+
+ case IncomingConnection(handle) {
@viktorklang
Akka Project member
viktorklang added a note Sep 5, 2012

Historical nomenclature has been "inbound" & "outbound"

@drewhk
Akka Project member
drewhk added a note Sep 5, 2012
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@viktorklang viktorklang commented on the diff Sep 5, 2012
.../scala/akka/remote/actmote/ActorManagedRemoting.scala
+ }
+
+ case Prune {
+ endpoints.prune(settings.RetryLatchClosedFor)
+ }
+ }
+
+ private def createEndpoint(remoteAddress: Address, handleOption: Option[TransportConnectorHandle]) = {
+ val endpoint = context.actorOf(Props(new EndpointActor(
+ notifier,
+ connector,
+ remoteSettings,
+ settings,
+ address,
+ remoteAddress,
+ handleOption)).withDispatcher("akka.actor.default-stash-dispatcher"))
@viktorklang
Akka Project member
viktorklang added a note Sep 5, 2012

Add a TODO to switch to a different dispatcher

@drewhk
Akka Project member
drewhk added a note Sep 5, 2012
@rkuhn
Akka Project member
rkuhn added a note Sep 5, 2012

one more thing: I think it would make messages more easy to understand if these actors had names corresponding to the address they are serving

@drewhk
Akka Project member
drewhk added a note Sep 5, 2012
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@viktorklang viktorklang commented on the diff Sep 5, 2012
.../scala/akka/remote/actmote/ActorManagedRemoting.scala
+ notifier.remoteServerShutdown()
+ } catch {
+ case NonFatal(e) {
+ notifier.remoteServerError(e)
+ log.error(e, "Unable to shut down the underlying TransportConnector")
+ }
+ }
+ }
+
+}
+
+object EndpointActor {
+ case object AttemptConnect
+}
+
+class EndpointException(remoteAddress: Address, msg: String, cause: Throwable) extends Exception(msg + "; remoteAddress = " + remoteAddress, cause)
@viktorklang
Akka Project member
viktorklang added a note Sep 5, 2012

A RuntimeException is probably more appropriate here

@rkuhn
Akka Project member
rkuhn added a note Sep 5, 2012

or an AkkaException even

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@viktorklang viktorklang commented on the diff Sep 5, 2012
.../scala/akka/remote/actmote/ActorManagedRemoting.scala
+ }
+ }
+ }
+
+}
+
+object EndpointActor {
+ case object AttemptConnect
+}
+
+class EndpointException(remoteAddress: Address, msg: String, cause: Throwable) extends Exception(msg + "; remoteAddress = " + remoteAddress, cause)
+class EndpointWriteException(remoteAddress: Address, msg: String, cause: Throwable) extends EndpointException(remoteAddress, msg, cause)
+class EndpointCloseException(remoteAddress: Address, msg: String, cause: Throwable) extends EndpointException(remoteAddress, msg, cause)
+class EndpointOpenException(remoteAddress: Address, msg: String, cause: Throwable) extends EndpointException(remoteAddress, msg, cause)
+class EndpointConnectionFailedException(remoteAddress: Address, msg: String, cause: Throwable) extends EndpointException(remoteAddress, msg, cause)
+class EndpointConnectorProtocolViolated(remoteAddress: Address, msg: String) extends EndpointException(remoteAddress, msg, null)
@viktorklang
Akka Project member
viktorklang added a note Sep 5, 2012

Why create all these types?

@drewhk
Akka Project member
drewhk added a note Sep 5, 2012
@rkuhn
Akka Project member
rkuhn added a note Sep 5, 2012

having different types for different failures (which are then actually distinguished by the actor) is definitely good style, because then we can fully use supervisorStrategy to decide what to do

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@viktorklang viktorklang commented on the diff Sep 5, 2012
.../scala/akka/remote/actmote/ActorManagedRemoting.scala
+ val remoteSettings: RemoteSettings,
+ val settings: ActorManagedRemotingSettings,
+ val address: Address,
+ val remoteAddress: Address,
+ private var handleOption: Option[TransportConnectorHandle]) extends Actor with Stash {
+
+ import EndpointActor._
+
+ val extendedSystem = context.system.asInstanceOf[ExtendedActorSystem]
+ val eventStream = extendedSystem.eventStream
+
+ val log = Logging(eventStream, "EndpointActor(remote = " + remoteAddress + ")")
+ val isServer = handleOption.isDefined
+
+ // TODO: Propagate it to the stash size
+ val queueLimit: Int = settings.PreConnectBufferSize
@viktorklang
Akka Project member
viktorklang added a note Sep 5, 2012

Why the local vla?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@viktorklang viktorklang commented on the diff Sep 5, 2012
.../scala/akka/remote/actmote/ActorManagedRemoting.scala
+ handle.open(self)
+ notifier.remoteServerClientConnected(remoteAddress)
+ context.become(connected)
+ }
+ case None {
+ notifier.remoteClientStarted(remoteAddress)
+ // We retry passive connections as well
+ self ! AttemptConnect
+ }
+ }
+ }
+
+ // Unconnected state
+ def receive = {
+ case AttemptConnect attemptConnect()
+ case s @ Send(msg, _, _) stash()
@viktorklang
Akka Project member
viktorklang added a note Sep 5, 2012

this match is unrelated to the action

@drewhk
Akka Project member
drewhk added a note Sep 5, 2012
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@viktorklang viktorklang commented on the diff Sep 5, 2012
.../scala/akka/remote/actmote/ActorManagedRemoting.scala
+
+ } catch {
+ case NonFatal(reason) {
+ notifyError(reason)
+ log.error(reason, "failure while shutting down endpoint for [{}]", remoteAddress)
+ }
+ }
+ }
+
+ override def unhandled(message: Any) {
+ throw new EndpointConnectorProtocolViolated(remoteAddress, "Endpoint <-> Connector protocol violated; unexpected message: " + message)
+ }
+
+ private def backoff() {
+ backingOff = true
+ context.system.scheduler.scheduleOnce(settings.FlowControlBackoff) { backingOff = false }(context.dispatcher)
@viktorklang
Akka Project member
viktorklang added a note Sep 5, 2012

This breaks actor encapsulation by writing to an internal field from outside of the receive

@patriknw
Akka Project member
patriknw added a note Sep 6, 2012

could you send yourself a message instead?

@drewhk
Akka Project member
drewhk added a note Sep 6, 2012
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@viktorklang viktorklang commented on the diff Sep 5, 2012
...kka/remote/actmote/ActorManagedRemotingSettings.scala
@@ -0,0 +1,17 @@
+package akka.remote.actmote
+
+import com.typesafe.config.Config
+import concurrent.util.Duration
+
+private[akka] class ActorManagedRemotingSettings(config: Config) {
+
+ import config._
+ val Connector: String = getString("akka.remote.managed.connector")
+ val UsePassiveConnections: Boolean = getBoolean("akka.remote.managed.use-passive-connections")
+ val StartupTimeout: Duration = Duration.fromNanos(getNanoseconds("akka.remote.managed.startup-timeout"))
+ val ShutdownTimeout: Duration = Duration.fromNanos(getNanoseconds("akka.remote.managed.shutdown-timeout"))
+ val PreConnectBufferSize: Int = getInt("akka.remote.managed.preconnect-buffer-size") //TODO: rename
@viktorklang
Akka Project member
viktorklang added a note Sep 5, 2012

Use getBytes

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@viktorklang viktorklang commented on the diff Sep 5, 2012
...ala/akka/remote/actmote/DummyTransportConnector.scala
@@ -0,0 +1,267 @@
+package akka.remote.actmote
+
+import akka.remote._
+import actmote.DummyHandle.HandleState
+import akka.remote.netty.NettySettings
+import akka.actor._
+import akka.remote.RemoteProtocol._
+import akka.serialization.Serialization
+import actmote.TransportConnector._
+
@viktorklang
Akka Project member
viktorklang added a note Sep 5, 2012

Pausing the review here, it's 3am..

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@rkuhn rkuhn commented on the diff Sep 5, 2012
.../scala/akka/remote/actmote/ActorManagedRemoting.scala
+ }
+
+ // This is not supported as these will be removed from API
+ //case ShutdownEndpoint(address) ⇒
+ //case RestartEndpoint(address) ⇒
+ case s @ Send(message, senderOption, recipientRef) {
+ val recipientAddress = recipientRef.path.address
+
+ endpoints.getEndpointWithPolicy(recipientAddress) match {
+ case Some(Pass(endpoint)) endpoint ! s
+ case Some(Latched(timeOfFailure)) if (retryLatchOpen(timeOfFailure)) {
+ val endpoint = createEndpoint(recipientAddress, None)
+ endpoints.markPass(endpoint)
+ endpoint ! s
+ } else {
+ log.warning("Endpoint failed earlier and retry latch is not open yet; dropping message: {}", message)
@rkuhn
Akka Project member
rkuhn added a note Sep 5, 2012

I think we talked about it, but just to make sure that it does not get lost: don't log messages which go to deadLetters

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@rkuhn rkuhn commented on the diff Sep 5, 2012
.../scala/akka/remote/actmote/ActorManagedRemoting.scala
+ case ConnectorFailed(reason) {
+ notifier.remoteServerError(reason)
+ startupFuture ! Status.Failure(reason)
+ }
+
+ // This is not supported as these will be removed from API
+ //case ShutdownEndpoint(address) ⇒
+ //case RestartEndpoint(address) ⇒
+ case s @ Send(message, senderOption, recipientRef) {
+ val recipientAddress = recipientRef.path.address
+
+ endpoints.getEndpointWithPolicy(recipientAddress) match {
+ case Some(Pass(endpoint)) endpoint ! s
+ case Some(Latched(timeOfFailure)) if (retryLatchOpen(timeOfFailure)) {
+ val endpoint = createEndpoint(recipientAddress, None)
+ endpoints.markPass(endpoint)
@rkuhn
Akka Project member
rkuhn added a note Sep 5, 2012

this will not work: the newly created endpoint is not yet known to the registry

@drewhk
Akka Project member
drewhk added a note Sep 5, 2012
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@viktorklang viktorklang commented on the diff Sep 5, 2012
...ala/akka/remote/actmote/DummyTransportConnector.scala
+
+ @volatile var responsibleActor: ActorRef = _
+
+ private var _isTerminated = false
+ private var handles = List[DummyHandle]() //TODO: unsynchronized
+
+ def isTerminated = lock synchronized {
+ _isTerminated && handles.forall(_.state == DummyHandle.Closed)
+ }
+
+ val dummyMedium = DummyTransportMedium()
+
+ // Access should be synchronized?
+
+ override def listen(responsibleActor: ActorRef) {
+ this.responsibleActor = responsibleActor
@viktorklang
Akka Project member
viktorklang added a note Sep 5, 2012

this should be idempotent if it only can be called once

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@viktorklang viktorklang commented on the diff Sep 5, 2012
...ala/akka/remote/actmote/DummyTransportConnector.scala
+ private var handles = List[DummyHandle]() //TODO: unsynchronized
+
+ def isTerminated = lock synchronized {
+ _isTerminated && handles.forall(_.state == DummyHandle.Closed)
+ }
+
+ val dummyMedium = DummyTransportMedium()
+
+ // Access should be synchronized?
+
+ override def listen(responsibleActor: ActorRef) {
+ this.responsibleActor = responsibleActor
+ responsibleActor ! ConnectorInitialized(address)
+ }
+
+ val address = Address("akka", provider.remoteSettings.systemName, settings.Hostname, settings.PortSelector) // Does not handle dynamic ports, used only for testing
@viktorklang
Akka Project member
viktorklang added a note Sep 5, 2012

Constants like these are always nice to group at the top of the class

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@viktorklang viktorklang commented on the diff Sep 5, 2012
...ala/akka/remote/actmote/DummyTransportConnector.scala
+ // Access should be synchronized?
+
+ override def listen(responsibleActor: ActorRef) {
+ this.responsibleActor = responsibleActor
+ responsibleActor ! ConnectorInitialized(address)
+ }
+
+ val address = Address("akka", provider.remoteSettings.systemName, settings.Hostname, settings.PortSelector) // Does not handle dynamic ports, used only for testing
+
+ dummyMedium.registerTransport(address, this)
+
+ override def shutdown() {
+ // Remove locks if possible
+ lock.synchronized {
+ if (_isTerminated) throw new IllegalStateException("Cannot shutdown: already terminated")
+ //handles foreach { _.close }
@viktorklang
Akka Project member
viktorklang added a note Sep 5, 2012

Why commented out?

@drewhk
Akka Project member
drewhk added a note Sep 5, 2012
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@viktorklang viktorklang commented on the diff Sep 5, 2012
...ala/akka/remote/actmote/DummyTransportConnector.scala
+
+ dummyMedium.registerTransport(address, this)
+
+ override def shutdown() {
+ // Remove locks if possible
+ lock.synchronized {
+ if (_isTerminated) throw new IllegalStateException("Cannot shutdown: already terminated")
+ //handles foreach { _.close }
+ _isTerminated = true
+ }
+ }
+
+ override def connect(remote: Address, responsibleActorForConnection: ActorRef) {
+ if (_isTerminated) throw new IllegalStateException("Cannot connect: already terminated")
+
+ dummyMedium.logConnect(address -> remote)
@viktorklang
Akka Project member
viktorklang added a note Sep 5, 2012

why Tuple2 instead of 2 params?

@drewhk
Akka Project member
drewhk added a note Sep 5, 2012
@viktorklang
Akka Project member
viktorklang added a note Sep 5, 2012

You can use named parameters instead:

def connect(from: Address, to: Address) = …

connect(from = ..., to = ...)
@drewhk
Akka Project member
drewhk added a note Sep 5, 2012
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@viktorklang viktorklang commented on the diff Sep 5, 2012
...ala/akka/remote/actmote/DummyTransportConnector.scala
+
+ val address = Address("akka", provider.remoteSettings.systemName, settings.Hostname, settings.PortSelector) // Does not handle dynamic ports, used only for testing
+
+ dummyMedium.registerTransport(address, this)
+
+ override def shutdown() {
+ // Remove locks if possible
+ lock.synchronized {
+ if (_isTerminated) throw new IllegalStateException("Cannot shutdown: already terminated")
+ //handles foreach { _.close }
+ _isTerminated = true
+ }
+ }
+
+ override def connect(remote: Address, responsibleActorForConnection: ActorRef) {
+ if (_isTerminated) throw new IllegalStateException("Cannot connect: already terminated")
@viktorklang
Akka Project member
viktorklang added a note Sep 5, 2012

_isTerminated accessed outside of lock and isn't volatile. Also, name is confusing, perhaps something like _shutdownRequested

@drewhk
Akka Project member
drewhk added a note Sep 5, 2012
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@viktorklang viktorklang commented on the diff Sep 5, 2012
...ala/akka/remote/actmote/DummyTransportConnector.scala
+ // Remove locks if possible
+ lock.synchronized {
+ if (_isTerminated) throw new IllegalStateException("Cannot shutdown: already terminated")
+ //handles foreach { _.close }
+ _isTerminated = true
+ }
+ }
+
+ override def connect(remote: Address, responsibleActorForConnection: ActorRef) {
+ if (_isTerminated) throw new IllegalStateException("Cannot connect: already terminated")
+
+ dummyMedium.logConnect(address -> remote)
+
+ // Instead of using different sets, use a map of options and patter matching instead of ifs
+ if (dummyMedium.shouldCrash(address)) {
+ throw new NullPointerException
@viktorklang
Akka Project member
viktorklang added a note Sep 5, 2012

?

@drewhk
Akka Project member
drewhk added a note Sep 5, 2012
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@viktorklang viktorklang commented on the diff Sep 5, 2012
...ala/akka/remote/actmote/DummyTransportConnector.scala
+ }
+ }
+
+ override def connect(remote: Address, responsibleActorForConnection: ActorRef) {
+ if (_isTerminated) throw new IllegalStateException("Cannot connect: already terminated")
+
+ dummyMedium.logConnect(address -> remote)
+
+ // Instead of using different sets, use a map of options and patter matching instead of ifs
+ if (dummyMedium.shouldCrash(address)) {
+ throw new NullPointerException
+ } else if (dummyMedium.shouldReject(address)) {
+ responsibleActorForConnection ! ConnectionFailed(new IllegalStateException("Rejected"))
+ } else if (!dummyMedium.shouldDrop(address)) {
+ dummyMedium.registerConnection(address -> remote, this, responsibleActorForConnection) match {
+ case Some((localHandle, remoteHandle)) {
@viktorklang
Akka Project member
viktorklang added a note Sep 5, 2012

braces not needed

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@viktorklang viktorklang commented on the diff Sep 5, 2012
...ala/akka/remote/actmote/DummyTransportConnector.scala
+ }
+
+ override def connect(remote: Address, responsibleActorForConnection: ActorRef) {
+ if (_isTerminated) throw new IllegalStateException("Cannot connect: already terminated")
+
+ dummyMedium.logConnect(address -> remote)
+
+ // Instead of using different sets, use a map of options and patter matching instead of ifs
+ if (dummyMedium.shouldCrash(address)) {
+ throw new NullPointerException
+ } else if (dummyMedium.shouldReject(address)) {
+ responsibleActorForConnection ! ConnectionFailed(new IllegalStateException("Rejected"))
+ } else if (!dummyMedium.shouldDrop(address)) {
+ dummyMedium.registerConnection(address -> remote, this, responsibleActorForConnection) match {
+ case Some((localHandle, remoteHandle)) {
+ handles ::= localHandle
@viktorklang
Akka Project member
viktorklang added a note Sep 5, 2012

A Sequence might not be a proper data structure for this, a Set perhaps?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@viktorklang viktorklang commented on the diff Sep 5, 2012
...ala/akka/remote/actmote/DummyTransportConnector.scala
+ if (_isTerminated) throw new IllegalStateException("Cannot connect: already terminated")
+
+ dummyMedium.logConnect(address -> remote)
+
+ // Instead of using different sets, use a map of options and patter matching instead of ifs
+ if (dummyMedium.shouldCrash(address)) {
+ throw new NullPointerException
+ } else if (dummyMedium.shouldReject(address)) {
+ responsibleActorForConnection ! ConnectionFailed(new IllegalStateException("Rejected"))
+ } else if (!dummyMedium.shouldDrop(address)) {
+ dummyMedium.registerConnection(address -> remote, this, responsibleActorForConnection) match {
+ case Some((localHandle, remoteHandle)) {
+ handles ::= localHandle
+ responsibleActorForConnection ! ConnectionInitialized(localHandle)
+ }
+ case None responsibleActorForConnection ! ConnectionFailed(new IllegalArgumentException("Remote address does not reachable"))
@viktorklang
Akka Project member
viktorklang added a note Sep 5, 2012

I'm skipping the rest of the Dummy* as I assume this code will be dropped later in the process.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@viktorklang viktorklang commented on the diff Sep 5, 2012
...main/scala/akka/remote/actmote/MessageEncodings.scala
@@ -0,0 +1,43 @@
+package akka.remote.actmote
+
+import akka.remote.RemoteProtocol._
+import akka.actor._
+import akka.serialization.Serialization
+import akka.remote.MessageSerializer
+
+trait MessageEncodings {
@viktorklang
Akka Project member
viktorklang added a note Sep 5, 2012

I propose "RemoteMessageEncoder"

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@viktorklang viktorklang commented on the diff Sep 5, 2012
...c/main/scala/akka/remote/actmote/NettyConnector.scala
@@ -0,0 +1,525 @@
+package akka.remote.actmote
+
+import akka.actor._
+import akka.remote._
+import actmote.TransportConnector._
+import actmote.TransportConnector.ConnectionFailed
+import actmote.TransportConnector.ConnectionInitialized
+import actmote.TransportConnector.ConnectorFailed
+import actmote.TransportConnector.ConnectorInitialized
+import actmote.TransportConnector.IncomingConnection
@viktorklang
Akka Project member
viktorklang added a note Sep 5, 2012

All of these imports are superfluous (after ._)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@viktorklang viktorklang commented on the diff Sep 5, 2012
...c/main/scala/akka/remote/actmote/NettyConnector.scala
@@ -0,0 +1,525 @@
+package akka.remote.actmote
+
+import akka.actor._
+import akka.remote._
+import actmote.TransportConnector._
+import actmote.TransportConnector.ConnectionFailed
+import actmote.TransportConnector.ConnectionInitialized
+import actmote.TransportConnector.ConnectorFailed
+import actmote.TransportConnector.ConnectorInitialized
+import actmote.TransportConnector.IncomingConnection
+import akka.remote.netty._
+import org.jboss.netty.util._
+import org.jboss.netty.channel._
+import akka.event.Logging
+import group._
@viktorklang
Akka Project member
viktorklang added a note Sep 5, 2012

We never use relative imports, they are confusing and order-dependent

@drewhk
Akka Project member
drewhk added a note Sep 5, 2012
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@viktorklang viktorklang commented on the diff Sep 5, 2012
...c/main/scala/akka/remote/actmote/NettyConnector.scala
+import group._
+import org.jboss.netty.handler.timeout._
+import org.jboss.netty.handler.codec.frame.{ LengthFieldPrepender, LengthFieldBasedFrameDecoder }
+import org.jboss.netty.handler.execution.{ OrderedMemoryAwareThreadPoolExecutor, ExecutionHandler }
+import com.google.protobuf.MessageLite
+import org.jboss.netty.handler.codec.protobuf.ProtobufEncoder
+import java.net.{ InetAddress, InetSocketAddress }
+import org.jboss.netty.bootstrap.{ ClientBootstrap, ServerBootstrap }
+import akka.remote.RemoteProtocol._
+import socket.nio.{ NioClientSocketChannelFactory, NioServerSocketChannelFactory }
+import java.util.concurrent.{ TimeUnit, Executors }
+import util.control.NonFatal
+import scala.Some
+import akka.actor.DeadLetter
+import org.jboss.netty.handler.ssl.SslHandler
+import scala.Some
@viktorklang
Akka Project member
viktorklang added a note Sep 5, 2012

scala._ is always imported by default, and here scala.Some is imported explicitly, twice

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@viktorklang viktorklang commented on the diff Sep 5, 2012
...c/main/scala/akka/remote/actmote/NettyConnector.scala
+class ConnectionCancelledException(msg: String) extends Exception(msg)
+class CleanShutdownFailedException extends Exception("Connector was unable to cleanly shut down")
+
+class NettyConnector(_system: ExtendedActorSystem, _provider: RemoteActorRefProvider) extends TransportConnector(_system, _provider) with MessageEncodings {
+ @volatile var responsibleActor: ActorRef = _
+ @volatile var address: Address = _
+
+ private[akka] val settings = new NettySettings(provider.remoteSettings.config.getConfig("akka.remote.netty"), provider.remoteSettings.systemName)
+
+ // TODO replace by system.scheduler
+ private val timer: HashedWheelTimer = new HashedWheelTimer(system.threadFactory)
+ lazy val log = Logging(system.eventStream, "NettyConnector")
+ /**
+ * Backing scaffolding for the default implementation of NettyRemoteSupport.createPipeline.
+ */
+ object PipelineFactory {
@viktorklang
Akka Project member
viktorklang added a note Sep 5, 2012

object-in-class why?

@drewhk
Akka Project member
drewhk added a note Sep 6, 2012
@viktorklang
Akka Project member
viktorklang added a note Sep 6, 2012

It's not the fact that it is an object that is a problem, but why is it an object within a class?

@drewhk
Akka Project member
drewhk added a note Sep 6, 2012
@viktorklang
Akka Project member
viktorklang added a note Sep 6, 2012

If it was like that already then just leave it be, I just got curious :-)

@drewhk
Akka Project member
drewhk added a note Sep 6, 2012
@viktorklang
Akka Project member
viktorklang added a note Sep 6, 2012

Best solution is probably to pull it out and use the same version from both the NettyConnector and the NettyRemoteTransport

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@viktorklang viktorklang commented on the diff Sep 6, 2012
...c/main/scala/akka/remote/actmote/NettyConnector.scala
+import com.google.protobuf.MessageLite
+import org.jboss.netty.handler.codec.protobuf.ProtobufEncoder
+import java.net.{ InetAddress, InetSocketAddress }
+import org.jboss.netty.bootstrap.{ ClientBootstrap, ServerBootstrap }
+import akka.remote.RemoteProtocol._
+import socket.nio.{ NioClientSocketChannelFactory, NioServerSocketChannelFactory }
+import java.util.concurrent.{ TimeUnit, Executors }
+import util.control.NonFatal
+import scala.Some
+import akka.actor.DeadLetter
+import org.jboss.netty.handler.ssl.SslHandler
+import scala.Some
+import akka.actor.DeadLetter
+
+private[akka] object ChannelHandle extends ChannelLocal[NettyConnectorHandle] {
+ override def initialValue(channel: Channel) = null
@viktorklang
Akka Project member
viktorklang added a note Sep 6, 2012

Isn't the initial value always null?

@drewhk
Akka Project member
drewhk added a note Sep 6, 2012
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@patriknw patriknw commented on the diff Sep 6, 2012
.../scala/akka/remote/actmote/ActorManagedRemoting.scala
@@ -0,0 +1,487 @@
+package akka.remote.actmote
@patriknw
Akka Project member
patriknw added a note Sep 6, 2012

I guess actmote is just a temporary code name?

@drewhk
Akka Project member
drewhk added a note Sep 6, 2012
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@patriknw patriknw commented on the diff Sep 6, 2012
.../scala/akka/remote/actmote/ActorManagedRemoting.scala
+ classOf[ExtendedActorSystem] -> system,
+ classOf[RemoteActorRefProvider] -> provider)
+
+ system.dynamicAccess.createInstanceFor[TransportConnector](fqn, args) match {
+ case Left(problem) {
+ throw new RemoteTransportException("Could not load transport connector " + fqn, problem)
+ }
+ case Right(connector) connector
+ }
+ }
+
+ lazy val log: LoggingAdapter = Logging(system.eventStream, "ActorManagedRemoting(" + address + ")")
+
+ def shutdown() {
+ if (headActor != null) {
+ try {
@patriknw
Akka Project member
patriknw added a note Sep 6, 2012
if (headActor != null) try {
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@patriknw patriknw commented on the diff Sep 6, 2012
.../scala/akka/remote/actmote/ActorManagedRemoting.scala
+ classOf[RemoteActorRefProvider] -> provider)
+
+ system.dynamicAccess.createInstanceFor[TransportConnector](fqn, args) match {
+ case Left(problem) {
+ throw new RemoteTransportException("Could not load transport connector " + fqn, problem)
+ }
+ case Right(connector) connector
+ }
+ }
+
+ lazy val log: LoggingAdapter = Logging(system.eventStream, "ActorManagedRemoting(" + address + ")")
+
+ def shutdown() {
+ if (headActor != null) {
+ try {
+ val stopped: Future[Boolean] = gracefulStop(headActor, 5 seconds)(system)
@patriknw
Akka Project member
patriknw added a note Sep 6, 2012

magic value: 5 seconds
same as managedRemoteSettings.ShutdownTimeout?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@patriknw patriknw commented on the diff Sep 6, 2012
.../scala/akka/remote/actmote/ActorManagedRemoting.scala
+
+ } catch {
+ case e: akka.pattern.AskTimeoutException log.warning("Shutdown timed out")
+ case NonFatal(e) log.error(e, "Shutdown failed")
+ }
+ }
+ }
+
+ // Start assumes that it cannot be followed by another start() without having a shutdown() first
+ def start() {
+ log.info("Starting remoting")
+ if (headActor eq null) {
+ connector = loadConnector
+ // NOTE: Notifier will use the logger of this class
+ val notifier = new DefaultLifeCycleNotifier(provider.remoteSettings, this, system.eventStream, log)
+ headActor = system.asInstanceOf[ActorSystemImpl].systemActorOf(Props(new HeadActor(provider.remoteSettings, connector, managedRemoteSettings, notifier)), HeadActorName)
@patriknw
Akka Project member
patriknw added a note Sep 6, 2012

what if multiple threads do start at the same time?
do we need to support that?

@drewhk
Akka Project member
drewhk added a note Sep 6, 2012
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@patriknw patriknw commented on the diff Sep 6, 2012
.../scala/akka/remote/actmote/ActorManagedRemoting.scala
+ case e: akka.pattern.AskTimeoutException log.warning("Shutdown timed out")
+ case NonFatal(e) log.error(e, "Shutdown failed")
+ }
+ }
+ }
+
+ // Start assumes that it cannot be followed by another start() without having a shutdown() first
+ def start() {
+ log.info("Starting remoting")
+ if (headActor eq null) {
+ connector = loadConnector
+ // NOTE: Notifier will use the logger of this class
+ val notifier = new DefaultLifeCycleNotifier(provider.remoteSettings, this, system.eventStream, log)
+ headActor = system.asInstanceOf[ActorSystemImpl].systemActorOf(Props(new HeadActor(provider.remoteSettings, connector, managedRemoteSettings, notifier)), HeadActorName)
+
+ val timeout = new Timeout(managedRemoteSettings.StartupTimeout)
@patriknw
Akka Project member
patriknw added a note Sep 6, 2012

use apply of the case class Timeout instead of new

val timeout = Timeout(
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@patriknw patriknw commented on the diff Sep 6, 2012
.../scala/akka/remote/actmote/ActorManagedRemoting.scala
+ protected def notifyListeners(message: RemoteLifeCycleEvent): Unit = {
+ eventStream.publish(message)
+ if (logRemoteLifeCycleEvents) log.log(message.logLevel, "{}", message)
+ }
+
+ def remoteClientError(reason: Throwable, remoteAddress: Address) { notifyListeners(RemoteClientError(reason, remoteTransport, remoteAddress)) }
+ def remoteClientDisconnected(remoteAddress: Address) { notifyListeners(RemoteClientDisconnected(remoteTransport, remoteAddress)) }
+ def remoteClientConnected(remoteAddress: Address) { notifyListeners(RemoteClientConnected(remoteTransport, remoteAddress)) }
+ def remoteClientStarted(remoteAddress: Address) { notifyListeners(RemoteClientStarted(remoteTransport, remoteAddress)) }
+ def remoteClientShutdown(remoteAddress: Address) { notifyListeners(RemoteClientShutdown(remoteTransport, remoteAddress)) }
+
+ def remoteServerStarted() { notifyListeners(RemoteServerStarted(remoteTransport)) }
+ def remoteServerShutdown() { notifyListeners(RemoteServerShutdown(remoteTransport)) }
+ def remoteServerError(reason: Throwable) { notifyListeners(RemoteServerError(reason, remoteTransport)) }
+ def remoteServerClientConnected(remoteAddress: Address) { notifyListeners(RemoteServerClientConnected(remoteTransport, Some(remoteAddress))) }
+ def remoteServerClientDisconnected(remoteAddress: Address) { notifyListeners(RemoteServerClientDisconnected(remoteTransport, Some(remoteAddress))) }
@patriknw
Akka Project member
patriknw added a note Sep 6, 2012

I think we have a convention to use max 120 chars lines.
Not followed everywhere though.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@patriknw patriknw commented on the diff Sep 6, 2012
.../scala/akka/remote/actmote/ActorManagedRemoting.scala
+
+object HeadActor {
+ sealed trait EndpointPolicy
+ case class Pass(endpoint: ActorRef) extends EndpointPolicy
+ case class Latched(timeOfFailure: Deadline) extends EndpointPolicy
+ //case class PassPassive(endpointOption: Option[ActorRef])
+
+ case object Prune
+
+ class EndpointRegistry {
+ private val addressToEndpointAndPolicy = scala.collection.mutable.Map[Address, EndpointPolicy]()
+ private val endpointToAddress = scala.collection.mutable.Map[ActorRef, Address]()
+
+ def getEndpointWithPolicy(address: Address) = addressToEndpointAndPolicy.get(address)
+
+ def prune(pruneAge: Duration) {
@patriknw
Akka Project member
patriknw added a note Sep 6, 2012

: Unit = {

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@patriknw patriknw commented on the diff Sep 6, 2012
.../scala/akka/remote/actmote/ActorManagedRemoting.scala
+
+ // TODO: Propagate it to the stash size
+ val queueLimit: Int = settings.PreConnectBufferSize
+ private var backingOff = false
+
+ private def notifyError(reason: Throwable) {
+ if (isServer) {
+ notifier.remoteServerError(reason)
+ } else {
+ notifier.remoteClientError(reason, remoteAddress)
+ }
+ }
+
+ override def postRestart(reason: Throwable) {
+ // Clear handle to force reconnect
+ handleOption = None
@patriknw
Akka Project member
patriknw added a note Sep 6, 2012

so, after a restart you don't want to use the handleOption passed in to the constructor, but still use isServer derived from the constructor param?

@drewhk
Akka Project member
drewhk added a note Sep 6, 2012
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@patriknw patriknw commented on the diff Sep 6, 2012
.../scala/akka/remote/actmote/ActorManagedRemoting.scala
+
+ if (!isServer) {
+ notifier.remoteClientDisconnected(remoteAddress)
+ notifier.remoteClientShutdown(remoteAddress)
+ }
+
+ } catch {
+ case NonFatal(reason) {
+ notifyError(reason)
+ log.error(reason, "failure while shutting down endpoint for [{}]", remoteAddress)
+ }
+ }
+ }
+
+ override def unhandled(message: Any) {
+ throw new EndpointConnectorProtocolViolated(remoteAddress, "Endpoint <-> Connector protocol violated; unexpected message: " + message)
@patriknw
Akka Project member
patriknw added a note Sep 6, 2012

perhaps call super.unhandled first?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@patriknw patriknw commented on the diff Sep 6, 2012
...in/scala/akka/remote/actmote/TransportConnector.scala
+ * @param remoteMessage the incoming message to be dispatched to the correct actor
+ * @param log
+ */
+ // TODO: Think about the visibility of this method
+ final def dispatchMessage(remoteMessage: RemoteMessage, log: LoggingAdapter): Unit = {
+ val useUntrustedMode = provider.remoteSettings.UntrustedMode
+ val log = provider.log
+ val remoteDaemon = provider.remoteDaemon
+
+ remoteMessage.recipient match {
+ case `remoteDaemon`
+ if (provider.remoteSettings.LogReceive) log.debug("received daemon message {}", remoteMessage)
+ remoteMessage.payload match {
+ case m @ (_: DaemonMsg | _: Terminated)
+ try remoteDaemon ! m catch {
+ case e: Exception log.error(e, "exception while processing remote command {} from {}", m, remoteMessage.sender)
@patriknw
Akka Project member
patriknw added a note Sep 6, 2012

do we need catch around !
in that case it should perhaps be case NonFatal(e)

@viktorklang
Akka Project member
viktorklang added a note Sep 6, 2012

no, ! should not throw exceptions in 2.1

@drewhk
Akka Project member
drewhk added a note Sep 6, 2012
@viktorklang
Akka Project member
viktorklang added a note Sep 6, 2012

Sure thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@patriknw patriknw commented on the diff Sep 6, 2012
...in/scala/akka/remote/actmote/TransportConnector.scala
+ */
+ // TODO: Think about the visibility of this method
+ final def dispatchMessage(remoteMessage: RemoteMessage, log: LoggingAdapter): Unit = {
+ val useUntrustedMode = provider.remoteSettings.UntrustedMode
+ val log = provider.log
+ val remoteDaemon = provider.remoteDaemon
+
+ remoteMessage.recipient match {
+ case `remoteDaemon`
+ if (provider.remoteSettings.LogReceive) log.debug("received daemon message {}", remoteMessage)
+ remoteMessage.payload match {
+ case m @ (_: DaemonMsg | _: Terminated)
+ try remoteDaemon ! m catch {
+ case e: Exception log.error(e, "exception while processing remote command {} from {}", m, remoteMessage.sender)
+ }
+ case x log.warning("remoteDaemon received illegal message {} from {}", x, remoteMessage.sender)
@patriknw
Akka Project member
patriknw added a note Sep 6, 2012

rely on unhandled instead?

@patriknw
Akka Project member
patriknw added a note Sep 6, 2012

sorry, this was not an actor

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@patriknw patriknw commented on the diff Sep 6, 2012
...in/scala/akka/remote/actmote/TransportConnector.scala
+import akka.event.LoggingAdapter
+
+// TODO: Have a TestKit for Connectors
+
+/**
+ * Contains all the event classes that a [[akka.remote.actmote.TransportConnector]] or
+ * [[akka.remote.actmote.TransportConnectorHandle]] may send to their corresponding actors.
+ */
+object TransportConnector {
+
+ /**
+ * Base trait for all the connector events.
+ */
+ sealed trait ConnectorEvent
+