Skip to content

Commit

Permalink
serializer for Optional, #21911
Browse files Browse the repository at this point in the history
* it's safe to add it to the serialization-bindings, because it had
  no previous binding (doesn't implement java.io.Serializable)

(cherry picked from commit bcce11e)
  • Loading branch information
patriknw committed Dec 1, 2016
1 parent 448c12d commit 1ef2d19
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 6 deletions.
2 changes: 2 additions & 0 deletions akka-remote/src/main/resources/reference.conf
Expand Up @@ -47,6 +47,8 @@ akka {
# This com.google.protobuf serialization binding is only used if the class can be loaded,
# i.e. com.google.protobuf dependency has been added in the application project.
"com.google.protobuf.GeneratedMessage" = proto

"java.util.Optional" = akka-misc
}

# For the purpose of preserving protocol backward compatibility these bindings are not
Expand Down
Expand Up @@ -7,6 +7,7 @@ import akka.actor._
import akka.protobuf.ByteString
import akka.remote.{ ContainerFormats, RemoteWatcher }
import akka.serialization.{ BaseSerializer, Serialization, SerializationExtension, SerializerWithStringManifest }
import java.util.Optional

class MiscMessageSerializer(val system: ExtendedActorSystem) extends SerializerWithStringManifest with BaseSerializer {

Expand All @@ -22,6 +23,7 @@ class MiscMessageSerializer(val system: ExtendedActorSystem) extends SerializerW
case identity: ActorIdentity serializeActorIdentity(identity)
case Some(value) serializeSome(value)
case None ParameterlessSerializedMessage
case o: Optional[_] serializeOptional(o)
case r: ActorRef serializeActorRef(r)
case s: Status.Success serializeStatusSuccess(s)
case f: Status.Failure serializeStatusFailure(f)
Expand Down Expand Up @@ -60,6 +62,16 @@ class MiscMessageSerializer(val system: ExtendedActorSystem) extends SerializerW
.build()
.toByteArray

private def serializeOptional(opt: Optional[_]): Array[Byte] = {
if (opt.isPresent)
ContainerFormats.Option.newBuilder()
.setValue(payloadSupport.payloadBuilder(opt.get))
.build()
.toByteArray
else
ParameterlessSerializedMessage
}

private def serializeActorRef(ref: ActorRef): Array[Byte] =
actorRefBuilder(ref).build().toByteArray

Expand Down Expand Up @@ -95,6 +107,7 @@ class MiscMessageSerializer(val system: ExtendedActorSystem) extends SerializerW
private val StatusFailureManifest = "E"
private val ThrowableManifest = "F"
private val ActorRefManifest = "G"
private val OptionalManifest = "H"
private val PoisonPillManifest = "P"
private val KillManifest = "K"
private val RemoteWatcherHBManifest = "RWHB"
Expand All @@ -110,18 +123,19 @@ class MiscMessageSerializer(val system: ExtendedActorSystem) extends SerializerW
ThrowableManifest throwableSupport.deserializeThrowable,
ActorRefManifest deserializeActorRefBytes,
OptionManifest deserializeOption,
OptionalManifest deserializeOptional,
PoisonPillManifest ((_) PoisonPill),
KillManifest ((_) Kill),
RemoteWatcherHBManifest ((_) RemoteWatcher.Heartbeat),
RemoteWatcherHBRespManifest deserializeHeartbeatRsp,
ActorInitializationExceptionManifest deserializeActorInitializationException
)
ActorInitializationExceptionManifest deserializeActorInitializationException)

override def manifest(o: AnyRef): String =
o match {
case _: Identify IdentifyManifest
case _: ActorIdentity ActorIdentityManifest
case _: Option[Any] OptionManifest
case _: Optional[_] OptionalManifest
case _: ActorRef ActorRefManifest
case _: Status.Success StatusSuccessManifest
case _: Status.Failure StatusFailureManifest
Expand Down Expand Up @@ -174,6 +188,15 @@ class MiscMessageSerializer(val system: ExtendedActorSystem) extends SerializerW
}
}

private def deserializeOptional(bytes: Array[Byte]): Optional[Any] = {
if (bytes.length == 0)
Optional.empty()
else {
val optionProto = ContainerFormats.Option.parseFrom(bytes)
Optional.of(payloadSupport.deserializePayload(optionProto.getValue))
}
}

private def deserializeStatusSuccess(bytes: Array[Byte]): Status.Success =
Status.Success(payloadSupport.deserializePayload(ContainerFormats.Payload.parseFrom(bytes)))

Expand All @@ -197,8 +220,7 @@ class MiscMessageSerializer(val system: ExtendedActorSystem) extends SerializerW
ActorInitializationException(
if (serializedEx.hasActor) ref else null,
reconstructedMessage,
payloadSupport.deserializePayload(serializedEx.getCause).asInstanceOf[Throwable]
)
payloadSupport.deserializePayload(serializedEx.getCause).asInstanceOf[Throwable])
}

}
Expand Up @@ -11,6 +11,7 @@ import akka.testkit.AkkaSpec
import com.typesafe.config.ConfigFactory

import scala.util.control.NoStackTrace
import java.util.Optional

object MiscMessageSerializerSpec {
val serializationTestOverrides =
Expand Down Expand Up @@ -77,11 +78,12 @@ class MiscMessageSerializerSpec extends AkkaSpec(MiscMessageSerializerSpec.testC
"ActorRef" ref,
"Some" Some("value"),
"None" None,
"Optional.present" Optional.of("value2"),
"Optional.empty" Optional.empty(),
"Kill" Kill,
"PoisonPill" PoisonPill,
"RemoteWatcher.Heartbeat" RemoteWatcher.Heartbeat,
"RemoteWatcher.HertbeatRsp" RemoteWatcher.HeartbeatRsp(65537)
).foreach {
"RemoteWatcher.HertbeatRsp" RemoteWatcher.HeartbeatRsp(65537)).foreach {
case (scenario, item)
s"resolve serializer for $scenario" in {
val serializer = SerializationExtension(system)
Expand Down

0 comments on commit 1ef2d19

Please sign in to comment.