Skip to content

Commit

Permalink
=clt #18232 Serializer for ClusterSingleton
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw committed Aug 18, 2015
1 parent 7693f1e commit 33e05ea
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 5 deletions.
13 changes: 13 additions & 0 deletions akka-cluster-tools/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -153,3 +153,16 @@ akka.cluster.singleton-proxy {
buffer-size = 1000
}
# //#singleton-proxy-config

# Serializer for cluster ClusterSingleton messages
akka.actor {
serializers {
akka-singleton = "akka.cluster.singleton.protobuf.ClusterSingletonMessageSerializer"
}
serialization-bindings {
"akka.cluster.singleton.ClusterSingletonMessage" = akka-singleton
}
serialization-identifiers {
"akka.cluster.singleton.protobuf.ClusterSingletonMessageSerializer" = 14
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,11 @@ final class ClusterSingletonManagerSettings(
new ClusterSingletonManagerSettings(singletonName, role, removalMargin, handOverRetryInterval)
}

/**
* Marker trait for remote messages with special serializer.
*/
sealed trait ClusterSingletonMessage extends Serializable

object ClusterSingletonManager {

/**
Expand All @@ -136,33 +141,33 @@ object ClusterSingletonManager {
/**
* INTERNAL API
*/
private object Internal {
private[akka] object Internal {
/**
* Sent from new oldest to previous oldest to initiate the
* hand-over process. `HandOverInProgress` and `HandOverDone`
* are expected replies.
*/
case object HandOverToMe
case object HandOverToMe extends ClusterSingletonMessage
/**
* Confirmation by the previous oldest that the hand
* over process, shut down of the singleton actor, has
* started.
*/
case object HandOverInProgress
case object HandOverInProgress extends ClusterSingletonMessage
/**
* Confirmation by the previous oldest that the singleton
* actor has been terminated and the hand-over process is
* completed.
*/
case object HandOverDone
case object HandOverDone extends ClusterSingletonMessage
/**
* Sent from from previous oldest to new oldest to
* initiate the normal hand-over process.
* Especially useful when new node joins and becomes
* oldest immediately, without knowing who was previous
* oldest.
*/
case object TakeOverFromMe
case object TakeOverFromMe extends ClusterSingletonMessage

final case class HandOverRetry(count: Int)
final case class TakeOverRetry(count: Int)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster.singleton.protobuf

import akka.actor.ExtendedActorSystem
import akka.cluster.singleton.ClusterSingletonManager.Internal.HandOverDone
import akka.cluster.singleton.ClusterSingletonManager.Internal.HandOverInProgress
import akka.cluster.singleton.ClusterSingletonManager.Internal.HandOverToMe
import akka.cluster.singleton.ClusterSingletonManager.Internal.TakeOverFromMe
import akka.serialization.BaseSerializer
import akka.serialization.SerializationExtension
import akka.serialization.SerializerWithStringManifest

/**
* INTERNAL API: Serializer of ClusterSingleton messages.
* It is actually not using protobuf, but if we add more messages to
* the ClusterSingleton we want to make protobuf representations of them.
*/
private[akka] class ClusterSingletonMessageSerializer(val system: ExtendedActorSystem)
extends SerializerWithStringManifest with BaseSerializer {

private lazy val serialization = SerializationExtension(system)

private val HandOverToMeManifest = "A"
private val HandOverInProgressManifest = "B"
private val HandOverDoneManifest = "C"
private val TakeOverFromMeManifest = "D"

private val emptyByteArray = Array.empty[Byte]

private val fromBinaryMap = collection.immutable.HashMap[String, Array[Byte] AnyRef](
HandOverToMeManifest -> { _ HandOverToMe },
HandOverInProgressManifest -> { _ HandOverInProgress },
HandOverDoneManifest -> { _ HandOverDone },
TakeOverFromMeManifest -> { _ TakeOverFromMe })

override def manifest(obj: AnyRef): String = obj match {
case HandOverToMe HandOverToMeManifest
case HandOverInProgress HandOverInProgressManifest
case HandOverDone HandOverDoneManifest
case TakeOverFromMe TakeOverFromMeManifest
case _
throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]")
}

override def toBinary(obj: AnyRef): Array[Byte] = obj match {
case HandOverToMe emptyByteArray
case HandOverInProgress emptyByteArray
case HandOverDone emptyByteArray
case TakeOverFromMe emptyByteArray
case _
throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]")
}

override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef =
fromBinaryMap.get(manifest) match {
case Some(f) f(bytes)
case None throw new IllegalArgumentException(
s"Unimplemented deserialization of message with manifest [$manifest] in [${getClass.getName}]")
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster.singleton.protobuf

import akka.actor.ExtendedActorSystem
import akka.testkit.AkkaSpec
import akka.cluster.singleton.ClusterSingletonManager.Internal.HandOverDone
import akka.cluster.singleton.ClusterSingletonManager.Internal.HandOverInProgress
import akka.cluster.singleton.ClusterSingletonManager.Internal.HandOverToMe
import akka.cluster.singleton.ClusterSingletonManager.Internal.TakeOverFromMe

class ClusterSingletonMessageSerializerSpec extends AkkaSpec {

val serializer = new ClusterSingletonMessageSerializer(system.asInstanceOf[ExtendedActorSystem])

def checkSerialization(obj: AnyRef): Unit = {
val blob = serializer.toBinary(obj)
val ref = serializer.fromBinary(blob, serializer.manifest(obj))
ref should ===(obj)
}

"ClusterSingletonMessages" must {

"be serializable" in {
checkSerialization(HandOverDone)
checkSerialization(HandOverInProgress)
checkSerialization(HandOverToMe)
checkSerialization(TakeOverFromMe)
}
}
}

0 comments on commit 33e05ea

Please sign in to comment.