diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index 1e3c0c0e175..7ff85ab94cd 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -47,6 +47,8 @@ akka { # This com.google.protobuf serialization binding is only used if the class can be loaded, # i.e. com.google.protobuf dependency has been added in the application project. "com.google.protobuf.GeneratedMessage" = proto + + "java.util.Optional" = akka-misc } # For the purpose of preserving protocol backward compatibility these bindings are not diff --git a/akka-remote/src/main/scala/akka/remote/serialization/MiscMessageSerializer.scala b/akka-remote/src/main/scala/akka/remote/serialization/MiscMessageSerializer.scala index b29c5a2f2de..9b2fcdf1319 100644 --- a/akka-remote/src/main/scala/akka/remote/serialization/MiscMessageSerializer.scala +++ b/akka-remote/src/main/scala/akka/remote/serialization/MiscMessageSerializer.scala @@ -7,6 +7,7 @@ import akka.actor._ import akka.protobuf.ByteString import akka.remote.{ ContainerFormats, RemoteWatcher } import akka.serialization.{ BaseSerializer, Serialization, SerializationExtension, SerializerWithStringManifest } +import java.util.Optional class MiscMessageSerializer(val system: ExtendedActorSystem) extends SerializerWithStringManifest with BaseSerializer { @@ -22,6 +23,7 @@ class MiscMessageSerializer(val system: ExtendedActorSystem) extends SerializerW case identity: ActorIdentity ⇒ serializeActorIdentity(identity) case Some(value) ⇒ serializeSome(value) case None ⇒ ParameterlessSerializedMessage + case o: Optional[_] ⇒ serializeOptional(o) case r: ActorRef ⇒ serializeActorRef(r) case s: Status.Success ⇒ serializeStatusSuccess(s) case f: Status.Failure ⇒ serializeStatusFailure(f) @@ -60,6 +62,16 @@ class MiscMessageSerializer(val system: ExtendedActorSystem) extends SerializerW .build() .toByteArray + private def serializeOptional(opt: Optional[_]): Array[Byte] = { + if (opt.isPresent) + ContainerFormats.Option.newBuilder() + .setValue(payloadSupport.payloadBuilder(opt.get)) + .build() + .toByteArray + else + ParameterlessSerializedMessage + } + private def serializeActorRef(ref: ActorRef): Array[Byte] = actorRefBuilder(ref).build().toByteArray @@ -95,6 +107,7 @@ class MiscMessageSerializer(val system: ExtendedActorSystem) extends SerializerW private val StatusFailureManifest = "E" private val ThrowableManifest = "F" private val ActorRefManifest = "G" + private val OptionalManifest = "H" private val PoisonPillManifest = "P" private val KillManifest = "K" private val RemoteWatcherHBManifest = "RWHB" @@ -110,18 +123,19 @@ class MiscMessageSerializer(val system: ExtendedActorSystem) extends SerializerW ThrowableManifest → throwableSupport.deserializeThrowable, ActorRefManifest → deserializeActorRefBytes, OptionManifest → deserializeOption, + OptionalManifest → deserializeOptional, PoisonPillManifest → ((_) ⇒ PoisonPill), KillManifest → ((_) ⇒ Kill), RemoteWatcherHBManifest → ((_) ⇒ RemoteWatcher.Heartbeat), RemoteWatcherHBRespManifest → deserializeHeartbeatRsp, - ActorInitializationExceptionManifest → deserializeActorInitializationException - ) + ActorInitializationExceptionManifest → deserializeActorInitializationException) override def manifest(o: AnyRef): String = o match { case _: Identify ⇒ IdentifyManifest case _: ActorIdentity ⇒ ActorIdentityManifest case _: Option[Any] ⇒ OptionManifest + case _: Optional[_] ⇒ OptionalManifest case _: ActorRef ⇒ ActorRefManifest case _: Status.Success ⇒ StatusSuccessManifest case _: Status.Failure ⇒ StatusFailureManifest @@ -174,6 +188,15 @@ class MiscMessageSerializer(val system: ExtendedActorSystem) extends SerializerW } } + private def deserializeOptional(bytes: Array[Byte]): Optional[Any] = { + if (bytes.length == 0) + Optional.empty() + else { + val optionProto = ContainerFormats.Option.parseFrom(bytes) + Optional.of(payloadSupport.deserializePayload(optionProto.getValue)) + } + } + private def deserializeStatusSuccess(bytes: Array[Byte]): Status.Success = Status.Success(payloadSupport.deserializePayload(ContainerFormats.Payload.parseFrom(bytes))) @@ -197,8 +220,7 @@ class MiscMessageSerializer(val system: ExtendedActorSystem) extends SerializerW ActorInitializationException( if (serializedEx.hasActor) ref else null, reconstructedMessage, - payloadSupport.deserializePayload(serializedEx.getCause).asInstanceOf[Throwable] - ) + payloadSupport.deserializePayload(serializedEx.getCause).asInstanceOf[Throwable]) } } diff --git a/akka-remote/src/test/scala/akka/remote/serialization/MiscMessageSerializerSpec.scala b/akka-remote/src/test/scala/akka/remote/serialization/MiscMessageSerializerSpec.scala index 5e0a2c3b4db..24b4f7a6c76 100644 --- a/akka-remote/src/test/scala/akka/remote/serialization/MiscMessageSerializerSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/serialization/MiscMessageSerializerSpec.scala @@ -11,6 +11,7 @@ import akka.testkit.AkkaSpec import com.typesafe.config.ConfigFactory import scala.util.control.NoStackTrace +import java.util.Optional object MiscMessageSerializerSpec { val serializationTestOverrides = @@ -77,11 +78,12 @@ class MiscMessageSerializerSpec extends AkkaSpec(MiscMessageSerializerSpec.testC "ActorRef" → ref, "Some" → Some("value"), "None" → None, + "Optional.present" → Optional.of("value2"), + "Optional.empty" → Optional.empty(), "Kill" → Kill, "PoisonPill" → PoisonPill, "RemoteWatcher.Heartbeat" → RemoteWatcher.Heartbeat, - "RemoteWatcher.HertbeatRsp" → RemoteWatcher.HeartbeatRsp(65537) - ).foreach { + "RemoteWatcher.HertbeatRsp" → RemoteWatcher.HeartbeatRsp(65537)).foreach { case (scenario, item) ⇒ s"resolve serializer for $scenario" in { val serializer = SerializationExtension(system)