Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

Wip nat friendly remote support #396

Closed
wants to merge 7 commits into from

3 participants

@ticktock

updates based on Review of pull request #394

@viktorklang
Owner

Alright! So all that's missing now is tests. Create a couple of multi-jvm tests to make sure it works?

@ticktock

Cool, thanks Viktor, will create some tests.

@ticktock

Hey Viktor,

First pass at testing...thoughts?

ticktock@0d28ad3

@viktorklang
Owner

Hey Scott,

I thought about this some more, and I think I might have gotten carried away here.

Wouldn't we be able to solve the problem by just defining a global address and a local address? So you bind to the local address and you receive things going both to the local and the global address?

@viktorklang
Owner

Or is there a case where you'd need multiple public addresses? And what about the promiscuous mode, does that make sense in a real-world setting? (accepting any inbound message)

@viktorklang
Owner

Clsoing this pull request since Scott & I agreed this could be solved by having the notion of a global and a local address, but it needs to be carefully implemented so that it covers all bases (Addresses are tricky beasts to say the least)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
This page is out of date. Refresh to see the latest.
View
7 akka-remote/src/main/resources/reference.conf
@@ -69,6 +69,13 @@ akka {
# If this is "on", Akka will log all outbound messages at DEBUG level, if off then they are not logged
log-sent-messages = off
+ # public-addresses will allow incoming messages where the destination host and port in the message does not
+ # match the host and port of this node, but is present in public-addresses.
+ #
+ # Useful in cases where the actor system is for example, behind an ELB on EC2.
+
+ public-addresses = []
+
# Each property is annotated with (I) or (O) or (I&O), where I stands for “inbound” and O for “outbound” connections.
# The NettyRemoteTransport always starts the server role to allow inbound connections, and it starts
# active client connections whenever sending to a destination which is not yet connected; if configured
View
10 akka-remote/src/main/scala/akka/remote/RemoteSettings.scala
@@ -6,17 +6,17 @@ package akka.remote
import com.typesafe.config.Config
import akka.util.Duration
import java.util.concurrent.TimeUnit.MILLISECONDS
-import java.net.InetAddress
-import akka.config.ConfigurationException
-import scala.collection.JavaConverters._
-import akka.actor.Address
-import akka.actor.AddressFromURIString
+import collection.JavaConverters._
class RemoteSettings(val config: Config, val systemName: String) {
+
import config._
+
val RemoteTransport = getString("akka.remote.transport")
val LogReceive = getBoolean("akka.remote.log-received-messages")
val LogSend = getBoolean("akka.remote.log-sent-messages")
val RemoteSystemDaemonAckTimeout = Duration(getMilliseconds("akka.remote.remote-daemon-ack-timeout"), MILLISECONDS)
val UntrustedMode = getBoolean("akka.remote.untrusted-mode")
+ val PublicAddresses = getStringList("akka.remote.public-addresses").asScala.toSet
+
}
View
27 akka-remote/src/main/scala/akka/remote/RemoteTransport.scala
@@ -5,13 +5,12 @@
package akka.remote
import scala.reflect.BeanProperty
-import akka.actor.{ Terminated, LocalRef, InternalActorRef, AutoReceivedMessage, AddressFromURIString, Address, ActorSystemImpl, ActorSystem, ActorRef }
import akka.dispatch.SystemMessage
import akka.event.{ LoggingAdapter, Logging }
import akka.AkkaException
import akka.serialization.Serialization
import akka.remote.RemoteProtocol._
-import akka.dispatch.ChildTerminated
+import akka.actor._
/**
* Remote life-cycle events.
@@ -32,6 +31,7 @@ case class RemoteClientError(
@transient @BeanProperty remote: RemoteTransport,
@BeanProperty remoteAddress: Address) extends RemoteClientLifeCycleEvent {
override def logLevel = Logging.ErrorLevel
+
override def toString =
"RemoteClientError@" + remoteAddress + ": Error[" + cause + "]"
}
@@ -40,6 +40,7 @@ case class RemoteClientDisconnected(
@transient @BeanProperty remote: RemoteTransport,
@BeanProperty remoteAddress: Address) extends RemoteClientLifeCycleEvent {
override def logLevel = Logging.DebugLevel
+
override def toString =
"RemoteClientDisconnected@" + remoteAddress
}
@@ -48,6 +49,7 @@ case class RemoteClientConnected(
@transient @BeanProperty remote: RemoteTransport,
@BeanProperty remoteAddress: Address) extends RemoteClientLifeCycleEvent {
override def logLevel = Logging.DebugLevel
+
override def toString =
"RemoteClientConnected@" + remoteAddress
}
@@ -56,6 +58,7 @@ case class RemoteClientStarted(
@transient @BeanProperty remote: RemoteTransport,
@BeanProperty remoteAddress: Address) extends RemoteClientLifeCycleEvent {
override def logLevel = Logging.InfoLevel
+
override def toString =
"RemoteClientStarted@" + remoteAddress
}
@@ -64,6 +67,7 @@ case class RemoteClientShutdown(
@transient @BeanProperty remote: RemoteTransport,
@BeanProperty remoteAddress: Address) extends RemoteClientLifeCycleEvent {
override def logLevel = Logging.InfoLevel
+
override def toString =
"RemoteClientShutdown@" + remoteAddress
}
@@ -74,6 +78,7 @@ case class RemoteClientWriteFailed(
@transient @BeanProperty remote: RemoteTransport,
@BeanProperty remoteAddress: Address) extends RemoteClientLifeCycleEvent {
override def logLevel = Logging.WarningLevel
+
override def toString =
"RemoteClientWriteFailed@" + remoteAddress +
": MessageClass[" + (if (request ne null) request.getClass.getName else "no message") +
@@ -88,6 +93,7 @@ trait RemoteServerLifeCycleEvent extends RemoteLifeCycleEvent
case class RemoteServerStarted(
@transient @BeanProperty remote: RemoteTransport) extends RemoteServerLifeCycleEvent {
override def logLevel = Logging.InfoLevel
+
override def toString =
"RemoteServerStarted@" + remote
}
@@ -95,6 +101,7 @@ case class RemoteServerStarted(
case class RemoteServerShutdown(
@transient @BeanProperty remote: RemoteTransport) extends RemoteServerLifeCycleEvent {
override def logLevel = Logging.InfoLevel
+
override def toString =
"RemoteServerShutdown@" + remote
}
@@ -103,6 +110,7 @@ case class RemoteServerError(
@BeanProperty val cause: Throwable,
@transient @BeanProperty remote: RemoteTransport) extends RemoteServerLifeCycleEvent {
override def logLevel = Logging.ErrorLevel
+
override def toString =
"RemoteServerError@" + remote + "] Error[" + cause + "]"
}
@@ -111,6 +119,7 @@ case class RemoteServerClientConnected(
@transient @BeanProperty remote: RemoteTransport,
@BeanProperty val clientAddress: Option[Address]) extends RemoteServerLifeCycleEvent {
override def logLevel = Logging.DebugLevel
+
override def toString =
"RemoteServerClientConnected@" + remote +
": Client[" + clientAddress.getOrElse("no address") + "]"
@@ -120,6 +129,7 @@ case class RemoteServerClientDisconnected(
@transient @BeanProperty remote: RemoteTransport,
@BeanProperty val clientAddress: Option[Address]) extends RemoteServerLifeCycleEvent {
override def logLevel = Logging.DebugLevel
+
override def toString =
"RemoteServerClientDisconnected@" + remote +
": Client[" + clientAddress.getOrElse("no address") + "]"
@@ -129,6 +139,7 @@ case class RemoteServerClientClosed(
@transient @BeanProperty remote: RemoteTransport,
@BeanProperty val clientAddress: Option[Address]) extends RemoteServerLifeCycleEvent {
override def logLevel = Logging.DebugLevel
+
override def toString =
"RemoteServerClientClosed@" + remote +
": Client[" + clientAddress.getOrElse("no address") + "]"
@@ -183,7 +194,7 @@ abstract class RemoteTransport {
*/
def restartClientConnection(address: Address): Boolean
- /** Methods that needs to be implemented by a transport **/
+ /**Methods that needs to be implemented by a transport **/
protected[akka] def send(message: Any,
senderOption: Option[ActorRef],
@@ -288,9 +299,19 @@ trait RemoteMarshallingOps {
case AddressFromURIString(address) if address == provider.transport.address
// if it was originally addressed to us but is in fact remote from our point of view (i.e. remote-deployed)
r.!(remoteMessage.payload)(remoteMessage.sender)
+ case ActorPathExtractor(natAddress, elements) if natAddress.system == system.name
+ if (allow(natAddress)) system.actorFor(elements).tell(remoteMessage.payload, remoteMessage.sender)
+ else log.error("Firewall: dropping message {} for non-local recipient {} at {} local is {}", remoteMessage.payload, r, address, provider.transport.address)
case r log.error("dropping message {} for non-local recipient {} at {} local is {}", remoteMessage.payload, r, address, provider.transport.address)
}
case r log.error("dropping message {} for non-local recipient {} of type {}", remoteMessage.payload, r, if (r ne null) r.getClass else "null")
}
}
+
+ def allow(natAddress: Address): Boolean = {
+ val settings = provider.remoteSettings //have to do this to do the import or else err "stable identifier required"
+ import settings.PublicAddresses
+ if (natAddress.host.isEmpty || natAddress.port.isEmpty) false //Partial addresses are never OK
+ else PublicAddresses.nonEmpty && PublicAddresses.contains(natAddress.host.get + ":" + natAddress.port.get)
+ }
}
View
111 akka-remote/src/multi-jvm/scala/akka/remote/NATFirewallRemoteActorMultiJvmSpec.scala
@@ -0,0 +1,111 @@
+package akka.remote
+
+import akka.testkit._
+import akka.dispatch.Await
+
+import NATFirewallRemoteActorMultiJvmSpec._
+import com.typesafe.config.ConfigFactory
+import akka.actor.{ActorSystem, Actor, Props}
+import akka.pattern.ask
+import akka.util.duration._
+import java.util.concurrent.TimeoutException
+
+
+object NATFirewallRemoteActorMultiJvmSpec {
+
+ def NrOfNodes = 3
+
+ class SomeActor extends Actor with Serializable {
+ def receive = {
+ case "hi" sender ! "hello"
+ }
+ }
+
+ def setup(addresses: String, host: String, port: Int): ActorSystem = {
+ val config = ConfigFactory.parseString("""
+ akka{
+ actor {
+ provider = "akka.remote.RemoteActorRefProvider"
+ }
+ remote{
+ transport = "akka.remote.netty.NettyRemoteTransport"
+ public-addresses = [%s]
+ netty {
+ hostname = "%s"
+ port = %d
+ }
+ }
+ }
+ """.format(addresses, host, port))
+ val system = ActorSystem("nat", config)
+ system.actorOf(Props[SomeActor], "service-hello")
+ system
+
+ }
+
+
+}
+
+//empty public-addresses
+class NATFirewallRemoteActorMultiJvmNode1 extends AkkaSpec(setup("", "0.0.0.0", 2552)) with MultiJvmSync {
+
+ val nodes = NrOfNodes
+
+ "___" must {
+ "___" in {
+ barrier("start")
+
+ barrier("done")
+ }
+ }
+}
+
+//public-addresses specified
+class NATFirewallRemoteActorMultiJvmNode2 extends AkkaSpec(setup(""""127.0.0.1:3663"""", "0.0.0.0", 3663)) with MultiJvmSync {
+
+ val nodes = NrOfNodes
+
+ "___" must {
+ "___" in {
+ barrier("start")
+
+ barrier("done")
+ }
+ }
+}
+
+class NATFirewallRemoteActorMultiJvmNode3 extends AkkaSpec(setup("", "127.0.0.1", 6996)) with MultiJvmSync with DefaultTimeout {
+
+
+ val nodes = NrOfNodes
+
+ "NAT Firewall" must {
+ "allow or dissalow messages properly in" in {
+ barrier("start")
+ val actor1 = system.actorFor("akka://nat@127.0.0.1:2552/user/service-hello")
+ val actor2 = system.actorFor("akka://nat@127.0.0.1:3663/user/service-hello")
+
+ evaluating {
+ Await.result(actor1 ? "hi", 250 millis).asInstanceOf[String]
+ } must produce[TimeoutException]
+
+ Await.result(actor2 ? "hi", 250 millis).asInstanceOf[String] must be("hello")
+
+
+ val actor5 = system.actorFor("akka://notnat@127.0.0.1:2552/user/service-hello")
+ val actor6 = system.actorFor("akka://notnat@127.0.0.1:3663/user/service-hello")
+
+
+ evaluating {
+ Await.result(actor5 ? "hi", 250 millis).asInstanceOf[String]
+ } must produce[TimeoutException]
+
+ evaluating {
+ Await.result(actor6 ? "hi", 250 millis).asInstanceOf[String]
+ } must produce[TimeoutException]
+
+ barrier("done")
+ }
+ }
+}
+
View
1  project/plugins.sbt
@@ -12,3 +12,4 @@ resolvers ++= Seq(
"coda" at "http://repo.codahale.com")
addSbtPlugin("me.lessis" % "ls-sbt" % "0.1.1")
+
Something went wrong with that request. Please try again.