Skip to content

Commit

Permalink
Merge pull request #1067 from akka/wip-2940-sysmsg-serializable-rich
Browse files Browse the repository at this point in the history
Make it easer to override SystemMessage serialization. Fixes #2940
  • Loading branch information
rkuhn committed Jan 28, 2013
2 parents d486721 + 7b3ec79 commit feb413d
Show file tree
Hide file tree
Showing 4 changed files with 225 additions and 59 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Serialized objects, used for testinging serialization compatibility across
# different versions of Akka.

akka.dispatch.Create = "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e0001787073720014616b6b612e64697370617463682e437265617465bcdf9f7f2675038d0200014900037569647870000004d27671007e0003"

akka.dispatch.Recreate = "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e0001787073720016616b6b612e64697370617463682e52656372656174650987c65c8d378a800200014c000563617573657400154c6a6176612f6c616e672f5468726f7761626c653b787073720020616b6b612e73657269616c697a6174696f6e2e46616b655468726f7761626c6500000000000000010200014c00036d73677400124c6a6176612f6c616e672f537472696e673b787200136a6176612e6c616e672e5468726f7761626c65d5c635273977b8cb0300044c0005636175736571007e00044c000d64657461696c4d65737361676571007e00075b000a737461636b547261636574001e5b4c6a6176612f6c616e672f537461636b5472616365456c656d656e743b4c001473757070726573736564457863657074696f6e737400104c6a6176612f7574696c2f4c6973743b787071007e000b740001787572001e5b4c6a6176612e6c616e672e537461636b5472616365456c656d656e743b02462a3c3cfd2239020000787000000000737200266a6176612e7574696c2e436f6c6c656374696f6e7324556e6d6f6469666961626c654c697374fc0f2531b5ec8e100200014c00046c69737471007e000a7872002c6a6176612e7574696c2e436f6c6c656374696f6e7324556e6d6f6469666961626c65436f6c6c656374696f6e19420080cb5ef71e0200014c0001637400164c6a6176612f7574696c2f436f6c6c656374696f6e3b7870737200136a6176612e7574696c2e41727261794c6973747881d21d99c7619d03000149000473697a657870000000007704000000007871007e00147871007e000c7671007e0003"

akka.dispatch.Suspend = "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e0001787073720015616b6b612e64697370617463682e53757370656e6464e531d5d134b59902000078707671007e0003"

akka.dispatch.Resume = "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e0001787073720014616b6b612e64697370617463682e526573756d65dc5e646d445fcb010200014c000f63617573656442794661696c7572657400154c6a6176612f6c616e672f5468726f7761626c653b787073720020616b6b612e73657269616c697a6174696f6e2e46616b655468726f7761626c6500000000000000010200014c00036d73677400124c6a6176612f6c616e672f537472696e673b787200136a6176612e6c616e672e5468726f7761626c65d5c635273977b8cb0300044c0005636175736571007e00044c000d64657461696c4d65737361676571007e00075b000a737461636b547261636574001e5b4c6a6176612f6c616e672f537461636b5472616365456c656d656e743b4c001473757070726573736564457863657074696f6e737400104c6a6176612f7574696c2f4c6973743b787071007e000b740001787572001e5b4c6a6176612e6c616e672e537461636b5472616365456c656d656e743b02462a3c3cfd2239020000787000000000737200266a6176612e7574696c2e436f6c6c656374696f6e7324556e6d6f6469666961626c654c697374fc0f2531b5ec8e100200014c00046c69737471007e000a7872002c6a6176612e7574696c2e436f6c6c656374696f6e7324556e6d6f6469666961626c65436f6c6c656374696f6e19420080cb5ef71e0200014c0001637400164c6a6176612f7574696c2f436f6c6c656374696f6e3b7870737200136a6176612e7574696c2e41727261794c6973747881d21d99c7619d03000149000473697a657870000000007704000000007871007e00147871007e000c7671007e0003"

akka.dispatch.Terminate = "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e0001787073720017616b6b612e64697370617463682e5465726d696e61746509d66ca68318700f02000078707671007e0003"

akka.dispatch.Supervise = "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e0001787073720017616b6b612e64697370617463682e5375706572766973652d0b363f56ab5feb0200035a00056173796e634900037569644c00056368696c647400154c616b6b612f6163746f722f4163746f725265663b787001000009a47372001f616b6b612e73657269616c697a6174696f6e2e46616b654163746f7252656600000000000000010200014c00046e616d657400124c6a6176612f6c616e672f537472696e673b7872001b616b6b612e6163746f722e496e7465726e616c4163746f725265660d0aa2ca1e82097602000078720013616b6b612e6163746f722e4163746f72526566c3585dde655f469402000078707400056368696c647671007e0003"

akka.dispatch.ChildTerminated = "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e000178707372001d616b6b612e64697370617463682e4368696c645465726d696e617465644c84222437ed5db40200014c00056368696c647400154c616b6b612f6163746f722f4163746f725265663b78707372001f616b6b612e73657269616c697a6174696f6e2e46616b654163746f7252656600000000000000010200014c00046e616d657400124c6a6176612f6c616e672f537472696e673b7872001b616b6b612e6163746f722e496e7465726e616c4163746f725265660d0aa2ca1e82097602000078720013616b6b612e6163746f722e4163746f72526566c3585dde655f469402000078707400056368696c647671007e0003"

akka.dispatch.Watch = "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e0001787073720013616b6b612e64697370617463682e57617463682e1e65bc74394fc40200024c0007776174636865657400154c616b6b612f6163746f722f4163746f725265663b4c00077761746368657271007e000478707372001f616b6b612e73657269616c697a6174696f6e2e46616b654163746f7252656600000000000000010200014c00046e616d657400124c6a6176612f6c616e672f537472696e673b7872001b616b6b612e6163746f722e496e7465726e616c4163746f725265660d0aa2ca1e82097602000078720013616b6b612e6163746f722e4163746f72526566c3585dde655f46940200007870740007776174636865657371007e0006740007776174636865727671007e0003"

akka.dispatch.Unwatch = "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e0001787073720015616b6b612e64697370617463682e556e776174636858501f7ee63dc2100200024c0007776174636865657400154c616b6b612f6163746f722f4163746f725265663b4c00077761746368657271007e000478707372001f616b6b612e73657269616c697a6174696f6e2e46616b654163746f7252656600000000000000010200014c00046e616d657400124c6a6176612f6c616e672f537472696e673b7872001b616b6b612e6163746f722e496e7465726e616c4163746f725265660d0aa2ca1e82097602000078720013616b6b612e6163746f722e4163746f72526566c3585dde655f46940200007870740007776174636865657371007e0006740007776174636865727671007e0003"

akka.dispatch.NoMessage = "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e0001787073720018616b6b612e64697370617463682e4e6f4d65737361676524b401a3610ccb70dd02000078707671007e0003"
223 changes: 178 additions & 45 deletions akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,31 +8,34 @@ import language.postfixOps

import akka.testkit.{ AkkaSpec, EventFilter }
import akka.actor._
import akka.dispatch._
import java.io._
import scala.concurrent.Await
import akka.util.Timeout
import scala.concurrent.duration._
import scala.reflect.BeanInfo
import com.google.protobuf.Message
import com.typesafe.config._
import akka.pattern.ask
import org.apache.commons.codec.binary.Hex.{ encodeHex, decodeHex }

object SerializeSpec {
object SerializationTests {

val config = """
val serializeConf = """
akka {
actor {
serializers {
test = "akka.serialization.TestSerializer"
}
serialization-bindings {
"akka.serialization.SerializeSpec$Person" = java
"akka.serialization.SerializeSpec$Address" = java
"akka.serialization.TestSerializble" = test
"akka.serialization.SerializeSpec$PlainMessage" = test
"akka.serialization.SerializeSpec$A" = java
"akka.serialization.SerializeSpec$B" = test
"akka.serialization.SerializeSpec$D" = test
"akka.serialization.SerializationTests$Person" = java
"akka.serialization.SerializationTests$Address" = java
"akka.serialization.TestSerializable" = test
"akka.serialization.SerializationTests$PlainMessage" = test
"akka.serialization.SerializationTests$A" = java
"akka.serialization.SerializationTests$B" = test
"akka.serialization.SerializationTests$D" = test
}
}
}
Expand All @@ -45,11 +48,11 @@ object SerializeSpec {

case class Record(id: Int, person: Person)

class SimpleMessage(s: String) extends TestSerializble
class SimpleMessage(s: String) extends TestSerializable

class ExtendedSimpleMessage(s: String, i: Int) extends SimpleMessage(s)

trait AnotherInterface extends TestSerializble
trait AnotherInterface extends TestSerializable

class AnotherMessage extends AnotherInterface

Expand All @@ -67,11 +70,67 @@ object SerializeSpec {
class D extends A
class E extends D

val verifySerializabilityConf = """
akka {
actor {
serialize-messages = on
serialize-creators = on
}
}
"""

class FooActor extends Actor {
def receive = {
case s: String sender ! s
}
}

class FooUntypedActor extends UntypedActor {
def onReceive(message: Any) {}
}

class NonSerializableActor(system: ActorSystem) extends Actor {
def receive = {
case s: String sender ! s
}
}

def mostlyReferenceSystem: ActorSystem = {
val referenceConf = ConfigFactory.defaultReference()
val mostlyReferenceConf = AkkaSpec.testConf.withFallback(referenceConf)
ActorSystem("SerializationSystem", mostlyReferenceConf)
}

val systemMessageMultiSerializerConf = """
akka {
actor {
serializers {
test = "akka.serialization.TestSerializer"
}
serialization-bindings {
"akka.dispatch.SystemMessage" = test
}
}
}
"""

val systemMessageClasses = List[Class[_]](
classOf[Create],
classOf[Recreate],
classOf[Suspend],
classOf[Resume],
classOf[Terminate],
classOf[Supervise],
classOf[ChildTerminated],
classOf[Watch],
classOf[Unwatch],
NoMessage.getClass)
}

@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class SerializeSpec extends AkkaSpec(SerializeSpec.config) {
import SerializeSpec._
class SerializeSpec extends AkkaSpec(SerializationTests.serializeConf) {
import SerializationTests._

val ser = SerializationExtension(system)
import ser._
Expand Down Expand Up @@ -156,15 +215,15 @@ class SerializeSpec extends AkkaSpec(SerializeSpec.config) {

"give warning for message with several bindings" in {
EventFilter.warning(start = "Multiple serializers found", occurrences = 1) intercept {
ser.serializerFor(classOf[Both]).getClass must be(classOf[TestSerializer])
ser.serializerFor(classOf[Both]).getClass must (be(classOf[TestSerializer]) or be(classOf[JavaSerializer]))
}
}

"resolve serializer in the order of the bindings" in {
ser.serializerFor(classOf[A]).getClass must be(classOf[JavaSerializer])
ser.serializerFor(classOf[B]).getClass must be(classOf[TestSerializer])
EventFilter.warning(start = "Multiple serializers found", occurrences = 1) intercept {
ser.serializerFor(classOf[C]).getClass must be(classOf[JavaSerializer])
ser.serializerFor(classOf[C]).getClass must (be(classOf[TestSerializer]) or be(classOf[JavaSerializer]))
}
}

Expand Down Expand Up @@ -194,36 +253,9 @@ class SerializeSpec extends AkkaSpec(SerializeSpec.config) {
}
}

object VerifySerializabilitySpec {
val conf = """
akka {
actor {
serialize-messages = on
serialize-creators = on
}
}
"""

class FooActor extends Actor {
def receive = {
case s: String sender ! s
}
}

class FooUntypedActor extends UntypedActor {
def onReceive(message: Any) {}
}

class NonSerializableActor(system: ActorSystem) extends Actor {
def receive = {
case s: String sender ! s
}
}
}

@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class VerifySerializabilitySpec extends AkkaSpec(VerifySerializabilitySpec.conf) {
import VerifySerializabilitySpec._
class VerifySerializabilitySpec extends AkkaSpec(SerializationTests.verifySerializabilityConf) {
import SerializationTests._
implicit val timeout = Timeout(5 seconds)

"verify config" in {
Expand Down Expand Up @@ -260,7 +292,85 @@ class VerifySerializabilitySpec extends AkkaSpec(VerifySerializabilitySpec.conf)
}
}

trait TestSerializble
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ReferenceSerializationSpec extends AkkaSpec(SerializationTests.mostlyReferenceSystem) {
import SerializationTests._

val ser = SerializationExtension(system)
def serializerMustBe(toSerialize: Class[_], expectedSerializer: Class[_]) =
ser.serializerFor(toSerialize).getClass must be(expectedSerializer)

"Serialization settings from reference.conf" must {

"declare Serializable classes to be use JavaSerializer" in {
serializerMustBe(classOf[Serializable], classOf[JavaSerializer])
serializerMustBe(classOf[String], classOf[JavaSerializer])
for (smc systemMessageClasses) {
serializerMustBe(smc, classOf[JavaSerializer])
}
}

"declare Array[Byte] to use ByteArraySerializer" in {
serializerMustBe(classOf[Array[Byte]], classOf[ByteArraySerializer])
}

"not support serialization for other classes" in {
intercept[NotSerializableException] { ser.serializerFor(classOf[Object]) }
}

}
}

@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class SerializationCompatibilitySpec extends AkkaSpec(SerializationTests.mostlyReferenceSystem) {
import SerializationTests._

val ser = SerializationExtension(system)

"Cross-version serialization compatibility" must {

"be preserved for SystemMessages" in {
val objs = List[(String, Any)](
("akka.dispatch.Create", Create(1234)),
("akka.dispatch.Recreate", Recreate(FakeThrowable("x"))),
("akka.dispatch.Suspend", Suspend()),
("akka.dispatch.Resume", Resume(FakeThrowable("x"))),
("akka.dispatch.Terminate", Terminate()),
("akka.dispatch.Supervise", Supervise(FakeActorRef("child"), true, 2468)),
("akka.dispatch.ChildTerminated", ChildTerminated(FakeActorRef("child"))),
("akka.dispatch.Watch", Watch(FakeActorRef("watchee"), FakeActorRef("watcher"))),
("akka.dispatch.Unwatch", Unwatch(FakeActorRef("watchee"), FakeActorRef("watcher"))),
("akka.dispatch.NoMessage", NoMessage))
val expectedConf = ConfigFactory.load("akka/serialization/serialized.conf")
for ((key, obj) objs) {
val hex = new String(encodeHex(ser.serialize(obj, obj.getClass).get))
hex must be(expectedConf.getString(key))
}
}

}
}

@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class OverriddenSystemMessageSerializationSpec extends AkkaSpec(SerializationTests.systemMessageMultiSerializerConf) {
import SerializationTests._

val ser = SerializationExtension(system)

"Overridden SystemMessage serialization" must {

"resolve to a single serializer" in {
EventFilter.warning(start = "Multiple serializers found", occurrences = 0) intercept {
for (smc systemMessageClasses) {
ser.serializerFor(smc).getClass must be(classOf[TestSerializer])
}
}
}

}
}

trait TestSerializable

class TestSerializer extends Serializer {
def includeManifest: Boolean = false
Expand All @@ -273,3 +383,26 @@ class TestSerializer extends Serializer {

def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = null
}

@SerialVersionUID(1)
case class FakeThrowable(msg: String) extends Throwable(msg) with Serializable {
override def fillInStackTrace = null
}

@SerialVersionUID(1)
case class FakeActorRef(name: String) extends InternalActorRef with ActorRefScope {
override def path = RootActorPath(Address("proto", "SomeSystem"), name)
override def forward(message: Any)(implicit context: ActorContext) = ???
override def isTerminated = ???
override def start() = ???
override def resume(causedByFailure: Throwable) = ???
override def suspend() = ???
override def restart(cause: Throwable) = ???
override def stop() = ???
override def sendSystemMessage(message: SystemMessage) = ???
override def provider = ???
override def getParent = ???
override def getChild(name: Iterator[String]) = ???
override def isLocal = ???
override def !(message: Any)(implicit sender: ActorRef = Actor.noSender) = ???
}
12 changes: 11 additions & 1 deletion akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala
Original file line number Diff line number Diff line change
Expand Up @@ -72,50 +72,60 @@ private[akka] object SystemMessage {
*
* ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
*/
private[akka] sealed trait SystemMessage extends PossiblyHarmful {
private[akka] sealed trait SystemMessage extends PossiblyHarmful with Serializable {
@transient
var next: SystemMessage = _
}

/**
* INTERNAL API
*/
@SerialVersionUID(-4836972106317757555L)
private[akka] case class Create(uid: Int) extends SystemMessage // send to self from Dispatcher.register
/**
* INTERNAL API
*/
@SerialVersionUID(686735569005808256L)
private[akka] case class Recreate(cause: Throwable) extends SystemMessage // sent to self from ActorCell.restart
/**
* INTERNAL API
*/
@SerialVersionUID(7270271967867221401L)
private[akka] case class Suspend() extends SystemMessage // sent to self from ActorCell.suspend
/**
* INTERNAL API
*/
@SerialVersionUID(-2567504317093262591L)
private[akka] case class Resume(causedByFailure: Throwable) extends SystemMessage // sent to self from ActorCell.resume
/**
* INTERNAL API
*/
@SerialVersionUID(708873453777219599L)
private[akka] case class Terminate() extends SystemMessage // sent to self from ActorCell.stop
/**
* INTERNAL API
*/
@SerialVersionUID(3245747602115485675L)
private[akka] case class Supervise(child: ActorRef, async: Boolean, uid: Int) extends SystemMessage // sent to supervisor ActorRef from ActorCell.start
/**
* INTERNAL API
*/
@SerialVersionUID(5513569382760799668L)
private[akka] case class ChildTerminated(child: ActorRef) extends SystemMessage // sent to supervisor from ActorCell.doTerminate
/**
* INTERNAL API
*/
@SerialVersionUID(3323205435124174788L)
private[akka] case class Watch(watchee: ActorRef, watcher: ActorRef) extends SystemMessage // sent to establish a DeathWatch
/**
* INTERNAL API
*/
@SerialVersionUID(6363620903363658256L)
private[akka] case class Unwatch(watchee: ActorRef, watcher: ActorRef) extends SystemMessage // sent to tear down a DeathWatch
/**
* INTERNAL API
*/
@SerialVersionUID(-5475916034683997987L)
private[akka] case object NoMessage extends SystemMessage // switched into the mailbox to signal termination

final case class TaskInvocation(eventStream: EventStream, runnable: Runnable, cleanup: () Unit) extends Batchable {
Expand Down

0 comments on commit feb413d

Please sign in to comment.