Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wip nat friendly remote support #396

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 7 additions & 0 deletions akka-remote/src/main/resources/reference.conf
Expand Up @@ -69,6 +69,13 @@ akka {
# If this is "on", Akka will log all outbound messages at DEBUG level, if off then they are not logged # If this is "on", Akka will log all outbound messages at DEBUG level, if off then they are not logged
log-sent-messages = off log-sent-messages = off


# public-addresses will allow incoming messages where the destination host and port in the message does not
# match the host and port of this node, but is present in public-addresses.
#
# Useful in cases where the actor system is for example, behind an ELB on EC2.

public-addresses = []

# Each property is annotated with (I) or (O) or (I&O), where I stands for “inbound” and O for “outbound” connections. # Each property is annotated with (I) or (O) or (I&O), where I stands for “inbound” and O for “outbound” connections.
# The NettyRemoteTransport always starts the server role to allow inbound connections, and it starts # The NettyRemoteTransport always starts the server role to allow inbound connections, and it starts
# active client connections whenever sending to a destination which is not yet connected; if configured # active client connections whenever sending to a destination which is not yet connected; if configured
Expand Down
10 changes: 5 additions & 5 deletions akka-remote/src/main/scala/akka/remote/RemoteSettings.scala
Expand Up @@ -6,17 +6,17 @@ package akka.remote
import com.typesafe.config.Config import com.typesafe.config.Config
import akka.util.Duration import akka.util.Duration
import java.util.concurrent.TimeUnit.MILLISECONDS import java.util.concurrent.TimeUnit.MILLISECONDS
import java.net.InetAddress import collection.JavaConverters._
import akka.config.ConfigurationException
import scala.collection.JavaConverters._
import akka.actor.Address
import akka.actor.AddressFromURIString


class RemoteSettings(val config: Config, val systemName: String) { class RemoteSettings(val config: Config, val systemName: String) {

import config._ import config._

val RemoteTransport = getString("akka.remote.transport") val RemoteTransport = getString("akka.remote.transport")
val LogReceive = getBoolean("akka.remote.log-received-messages") val LogReceive = getBoolean("akka.remote.log-received-messages")
val LogSend = getBoolean("akka.remote.log-sent-messages") val LogSend = getBoolean("akka.remote.log-sent-messages")
val RemoteSystemDaemonAckTimeout = Duration(getMilliseconds("akka.remote.remote-daemon-ack-timeout"), MILLISECONDS) val RemoteSystemDaemonAckTimeout = Duration(getMilliseconds("akka.remote.remote-daemon-ack-timeout"), MILLISECONDS)
val UntrustedMode = getBoolean("akka.remote.untrusted-mode") val UntrustedMode = getBoolean("akka.remote.untrusted-mode")
val PublicAddresses = getStringList("akka.remote.public-addresses").asScala.toSet

} }
27 changes: 24 additions & 3 deletions akka-remote/src/main/scala/akka/remote/RemoteTransport.scala
Expand Up @@ -5,13 +5,12 @@
package akka.remote package akka.remote


import scala.reflect.BeanProperty import scala.reflect.BeanProperty
import akka.actor.{ Terminated, LocalRef, InternalActorRef, AutoReceivedMessage, AddressFromURIString, Address, ActorSystemImpl, ActorSystem, ActorRef }
import akka.dispatch.SystemMessage import akka.dispatch.SystemMessage
import akka.event.{ LoggingAdapter, Logging } import akka.event.{ LoggingAdapter, Logging }
import akka.AkkaException import akka.AkkaException
import akka.serialization.Serialization import akka.serialization.Serialization
import akka.remote.RemoteProtocol._ import akka.remote.RemoteProtocol._
import akka.dispatch.ChildTerminated import akka.actor._


/** /**
* Remote life-cycle events. * Remote life-cycle events.
Expand All @@ -32,6 +31,7 @@ case class RemoteClientError(
@transient @BeanProperty remote: RemoteTransport, @transient @BeanProperty remote: RemoteTransport,
@BeanProperty remoteAddress: Address) extends RemoteClientLifeCycleEvent { @BeanProperty remoteAddress: Address) extends RemoteClientLifeCycleEvent {
override def logLevel = Logging.ErrorLevel override def logLevel = Logging.ErrorLevel

override def toString = override def toString =
"RemoteClientError@" + remoteAddress + ": Error[" + cause + "]" "RemoteClientError@" + remoteAddress + ": Error[" + cause + "]"
} }
Expand All @@ -40,6 +40,7 @@ case class RemoteClientDisconnected(
@transient @BeanProperty remote: RemoteTransport, @transient @BeanProperty remote: RemoteTransport,
@BeanProperty remoteAddress: Address) extends RemoteClientLifeCycleEvent { @BeanProperty remoteAddress: Address) extends RemoteClientLifeCycleEvent {
override def logLevel = Logging.DebugLevel override def logLevel = Logging.DebugLevel

override def toString = override def toString =
"RemoteClientDisconnected@" + remoteAddress "RemoteClientDisconnected@" + remoteAddress
} }
Expand All @@ -48,6 +49,7 @@ case class RemoteClientConnected(
@transient @BeanProperty remote: RemoteTransport, @transient @BeanProperty remote: RemoteTransport,
@BeanProperty remoteAddress: Address) extends RemoteClientLifeCycleEvent { @BeanProperty remoteAddress: Address) extends RemoteClientLifeCycleEvent {
override def logLevel = Logging.DebugLevel override def logLevel = Logging.DebugLevel

override def toString = override def toString =
"RemoteClientConnected@" + remoteAddress "RemoteClientConnected@" + remoteAddress
} }
Expand All @@ -56,6 +58,7 @@ case class RemoteClientStarted(
@transient @BeanProperty remote: RemoteTransport, @transient @BeanProperty remote: RemoteTransport,
@BeanProperty remoteAddress: Address) extends RemoteClientLifeCycleEvent { @BeanProperty remoteAddress: Address) extends RemoteClientLifeCycleEvent {
override def logLevel = Logging.InfoLevel override def logLevel = Logging.InfoLevel

override def toString = override def toString =
"RemoteClientStarted@" + remoteAddress "RemoteClientStarted@" + remoteAddress
} }
Expand All @@ -64,6 +67,7 @@ case class RemoteClientShutdown(
@transient @BeanProperty remote: RemoteTransport, @transient @BeanProperty remote: RemoteTransport,
@BeanProperty remoteAddress: Address) extends RemoteClientLifeCycleEvent { @BeanProperty remoteAddress: Address) extends RemoteClientLifeCycleEvent {
override def logLevel = Logging.InfoLevel override def logLevel = Logging.InfoLevel

override def toString = override def toString =
"RemoteClientShutdown@" + remoteAddress "RemoteClientShutdown@" + remoteAddress
} }
Expand All @@ -74,6 +78,7 @@ case class RemoteClientWriteFailed(
@transient @BeanProperty remote: RemoteTransport, @transient @BeanProperty remote: RemoteTransport,
@BeanProperty remoteAddress: Address) extends RemoteClientLifeCycleEvent { @BeanProperty remoteAddress: Address) extends RemoteClientLifeCycleEvent {
override def logLevel = Logging.WarningLevel override def logLevel = Logging.WarningLevel

override def toString = override def toString =
"RemoteClientWriteFailed@" + remoteAddress + "RemoteClientWriteFailed@" + remoteAddress +
": MessageClass[" + (if (request ne null) request.getClass.getName else "no message") + ": MessageClass[" + (if (request ne null) request.getClass.getName else "no message") +
Expand All @@ -88,13 +93,15 @@ trait RemoteServerLifeCycleEvent extends RemoteLifeCycleEvent
case class RemoteServerStarted( case class RemoteServerStarted(
@transient @BeanProperty remote: RemoteTransport) extends RemoteServerLifeCycleEvent { @transient @BeanProperty remote: RemoteTransport) extends RemoteServerLifeCycleEvent {
override def logLevel = Logging.InfoLevel override def logLevel = Logging.InfoLevel

override def toString = override def toString =
"RemoteServerStarted@" + remote "RemoteServerStarted@" + remote
} }


case class RemoteServerShutdown( case class RemoteServerShutdown(
@transient @BeanProperty remote: RemoteTransport) extends RemoteServerLifeCycleEvent { @transient @BeanProperty remote: RemoteTransport) extends RemoteServerLifeCycleEvent {
override def logLevel = Logging.InfoLevel override def logLevel = Logging.InfoLevel

override def toString = override def toString =
"RemoteServerShutdown@" + remote "RemoteServerShutdown@" + remote
} }
Expand All @@ -103,6 +110,7 @@ case class RemoteServerError(
@BeanProperty val cause: Throwable, @BeanProperty val cause: Throwable,
@transient @BeanProperty remote: RemoteTransport) extends RemoteServerLifeCycleEvent { @transient @BeanProperty remote: RemoteTransport) extends RemoteServerLifeCycleEvent {
override def logLevel = Logging.ErrorLevel override def logLevel = Logging.ErrorLevel

override def toString = override def toString =
"RemoteServerError@" + remote + "] Error[" + cause + "]" "RemoteServerError@" + remote + "] Error[" + cause + "]"
} }
Expand All @@ -111,6 +119,7 @@ case class RemoteServerClientConnected(
@transient @BeanProperty remote: RemoteTransport, @transient @BeanProperty remote: RemoteTransport,
@BeanProperty val clientAddress: Option[Address]) extends RemoteServerLifeCycleEvent { @BeanProperty val clientAddress: Option[Address]) extends RemoteServerLifeCycleEvent {
override def logLevel = Logging.DebugLevel override def logLevel = Logging.DebugLevel

override def toString = override def toString =
"RemoteServerClientConnected@" + remote + "RemoteServerClientConnected@" + remote +
": Client[" + clientAddress.getOrElse("no address") + "]" ": Client[" + clientAddress.getOrElse("no address") + "]"
Expand All @@ -120,6 +129,7 @@ case class RemoteServerClientDisconnected(
@transient @BeanProperty remote: RemoteTransport, @transient @BeanProperty remote: RemoteTransport,
@BeanProperty val clientAddress: Option[Address]) extends RemoteServerLifeCycleEvent { @BeanProperty val clientAddress: Option[Address]) extends RemoteServerLifeCycleEvent {
override def logLevel = Logging.DebugLevel override def logLevel = Logging.DebugLevel

override def toString = override def toString =
"RemoteServerClientDisconnected@" + remote + "RemoteServerClientDisconnected@" + remote +
": Client[" + clientAddress.getOrElse("no address") + "]" ": Client[" + clientAddress.getOrElse("no address") + "]"
Expand All @@ -129,6 +139,7 @@ case class RemoteServerClientClosed(
@transient @BeanProperty remote: RemoteTransport, @transient @BeanProperty remote: RemoteTransport,
@BeanProperty val clientAddress: Option[Address]) extends RemoteServerLifeCycleEvent { @BeanProperty val clientAddress: Option[Address]) extends RemoteServerLifeCycleEvent {
override def logLevel = Logging.DebugLevel override def logLevel = Logging.DebugLevel

override def toString = override def toString =
"RemoteServerClientClosed@" + remote + "RemoteServerClientClosed@" + remote +
": Client[" + clientAddress.getOrElse("no address") + "]" ": Client[" + clientAddress.getOrElse("no address") + "]"
Expand Down Expand Up @@ -183,7 +194,7 @@ abstract class RemoteTransport {
*/ */
def restartClientConnection(address: Address): Boolean def restartClientConnection(address: Address): Boolean


/** Methods that needs to be implemented by a transport **/ /**Methods that needs to be implemented by a transport **/


protected[akka] def send(message: Any, protected[akka] def send(message: Any,
senderOption: Option[ActorRef], senderOption: Option[ActorRef],
Expand Down Expand Up @@ -288,9 +299,19 @@ trait RemoteMarshallingOps {
case AddressFromURIString(address) if address == provider.transport.address ⇒ case AddressFromURIString(address) if address == provider.transport.address ⇒
// if it was originally addressed to us but is in fact remote from our point of view (i.e. remote-deployed) // if it was originally addressed to us but is in fact remote from our point of view (i.e. remote-deployed)
r.!(remoteMessage.payload)(remoteMessage.sender) r.!(remoteMessage.payload)(remoteMessage.sender)
case ActorPathExtractor(natAddress, elements) if natAddress.system == system.name ⇒
if (allow(natAddress)) system.actorFor(elements).tell(remoteMessage.payload, remoteMessage.sender)
else log.error("Firewall: dropping message {} for non-local recipient {} at {} local is {}", remoteMessage.payload, r, address, provider.transport.address)
case r ⇒ log.error("dropping message {} for non-local recipient {} at {} local is {}", remoteMessage.payload, r, address, provider.transport.address) case r ⇒ log.error("dropping message {} for non-local recipient {} at {} local is {}", remoteMessage.payload, r, address, provider.transport.address)
} }
case r ⇒ log.error("dropping message {} for non-local recipient {} of type {}", remoteMessage.payload, r, if (r ne null) r.getClass else "null") case r ⇒ log.error("dropping message {} for non-local recipient {} of type {}", remoteMessage.payload, r, if (r ne null) r.getClass else "null")
} }
} }

def allow(natAddress: Address): Boolean = {
val settings = provider.remoteSettings //have to do this to do the import or else err "stable identifier required"
import settings.PublicAddresses
if (natAddress.host.isEmpty || natAddress.port.isEmpty) false //Partial addresses are never OK
else PublicAddresses.nonEmpty && PublicAddresses.contains(natAddress.host.get + ":" + natAddress.port.get)
}
} }
@@ -0,0 +1,111 @@
package akka.remote

import akka.testkit._
import akka.dispatch.Await

import NATFirewallRemoteActorMultiJvmSpec._
import com.typesafe.config.ConfigFactory
import akka.actor.{ActorSystem, Actor, Props}
import akka.pattern.ask
import akka.util.duration._
import java.util.concurrent.TimeoutException


object NATFirewallRemoteActorMultiJvmSpec {

def NrOfNodes = 3

class SomeActor extends Actor with Serializable {
def receive = {
case "hi" ⇒ sender ! "hello"
}
}

def setup(addresses: String, host: String, port: Int): ActorSystem = {
val config = ConfigFactory.parseString("""
akka{
actor {
provider = "akka.remote.RemoteActorRefProvider"
}
remote{
transport = "akka.remote.netty.NettyRemoteTransport"
public-addresses = [%s]
netty {
hostname = "%s"
port = %d
}
}
}
""".format(addresses, host, port))
val system = ActorSystem("nat", config)
system.actorOf(Props[SomeActor], "service-hello")
system

}


}

//empty public-addresses
class NATFirewallRemoteActorMultiJvmNode1 extends AkkaSpec(setup("", "0.0.0.0", 2552)) with MultiJvmSync {

val nodes = NrOfNodes

"___" must {
"___" in {
barrier("start")

barrier("done")
}
}
}

//public-addresses specified
class NATFirewallRemoteActorMultiJvmNode2 extends AkkaSpec(setup(""""127.0.0.1:3663"""", "0.0.0.0", 3663)) with MultiJvmSync {

val nodes = NrOfNodes

"___" must {
"___" in {
barrier("start")

barrier("done")
}
}
}

class NATFirewallRemoteActorMultiJvmNode3 extends AkkaSpec(setup("", "127.0.0.1", 6996)) with MultiJvmSync with DefaultTimeout {


val nodes = NrOfNodes

"NAT Firewall" must {
"allow or dissalow messages properly in" in {
barrier("start")
val actor1 = system.actorFor("akka://nat@127.0.0.1:2552/user/service-hello")
val actor2 = system.actorFor("akka://nat@127.0.0.1:3663/user/service-hello")

evaluating {
Await.result(actor1 ? "hi", 250 millis).asInstanceOf[String]
} must produce[TimeoutException]

Await.result(actor2 ? "hi", 250 millis).asInstanceOf[String] must be("hello")


val actor5 = system.actorFor("akka://notnat@127.0.0.1:2552/user/service-hello")
val actor6 = system.actorFor("akka://notnat@127.0.0.1:3663/user/service-hello")


evaluating {
Await.result(actor5 ? "hi", 250 millis).asInstanceOf[String]
} must produce[TimeoutException]

evaluating {
Await.result(actor6 ? "hi", 250 millis).asInstanceOf[String]
} must produce[TimeoutException]

barrier("done")
}
}
}

1 change: 1 addition & 0 deletions project/plugins.sbt
Expand Up @@ -12,3 +12,4 @@ resolvers ++= Seq(
"coda" at "http://repo.codahale.com") "coda" at "http://repo.codahale.com")


addSbtPlugin("me.lessis" % "ls-sbt" % "0.1.1") addSbtPlugin("me.lessis" % "ls-sbt" % "0.1.1")