Skip to content

Commit

Permalink
Add compat module for wire compatibility with Akka
Browse files Browse the repository at this point in the history
  • Loading branch information
nvollmar committed Oct 16, 2023
1 parent 0f86f37 commit 0777a2b
Show file tree
Hide file tree
Showing 10 changed files with 227 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package io.altoo.akka.serialization.kryo.compat

import akka.actor.ExtendedActorSystem
import io.altoo.akka.serialization.kryo.DefaultKryoInitializer
import io.altoo.akka.serialization.kryo.compat.serializer.CompatActorRefSerializer
import io.altoo.akka.serialization.kryo.serializer.akka.ByteStringSerializer
import io.altoo.akka.serialization.kryo.serializer.scala.ScalaKryo

class PekkoCompatKryoInitializer extends DefaultKryoInitializer {

override def initAkkaSerializer(kryo: ScalaKryo, system: ExtendedActorSystem): Unit = {
super.initAkkaSerializer(kryo, system)

// registering dummy Akka ActorRef to provide wire compatibility
kryo.addDefaultSerializer(classOf[org.apache.pekko.actor.ActorRef], new CompatActorRefSerializer(system))
kryo.addDefaultSerializer(classOf[org.apache.pekko.actor.RepointableActorRef], new CompatActorRefSerializer(system))
// registering dummy Akka ByteString to provide wire compatibility
kryo.addDefaultSerializer(classOf[org.apache.pekko.util.ByteString], classOf[ByteStringSerializer])
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.altoo.akka.serialization.kryo.compat

import akka.actor.{ExtendedActorSystem, typed}
import io.altoo.akka.serialization.kryo.compat.serializer.{CompatActorRefSerializer, CompatTypedActorRefSerializer}
import io.altoo.akka.serialization.kryo.serializer.akka.ByteStringSerializer
import io.altoo.akka.serialization.kryo.serializer.scala.ScalaKryo
import io.altoo.akka.serialization.kryo.typed.TypedKryoInitializer

class TypedPekkoCompatKryoInitializer extends TypedKryoInitializer {

override def initAkkaSerializer(kryo: ScalaKryo, typedSystem: typed.ActorSystem[Nothing]): Unit = {
super.initAkkaSerializer(kryo, typedSystem)

// registering dummy Akka ActorRef to provide wire compatibility
kryo.addDefaultSerializer(classOf[org.apache.pekko.actor.ActorRef], new CompatActorRefSerializer(typedSystem.classicSystem.asInstanceOf[ExtendedActorSystem]))
kryo.addDefaultSerializer(classOf[org.apache.pekko.actor.RepointableActorRef], new CompatActorRefSerializer(typedSystem.classicSystem.asInstanceOf[ExtendedActorSystem]))
// registering dummy Akka ByteString to provide wire compatibility
kryo.addDefaultSerializer(classOf[org.apache.pekko.util.ByteString], classOf[ByteStringSerializer])

// registering dummy Akka ActorRef to provide wire compatibility
kryo.addDefaultSerializer(classOf[org.apache.pekko.actor.typed.ActorRef[Nothing]], new CompatTypedActorRefSerializer(typedSystem))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/**
* *****************************************************************************
* Copyright 2012 Roman Levenstein
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ****************************************************************************
*/

package io.altoo.akka.serialization.kryo.compat.serializer

import com.esotericsoftware.kryo.io.{Input, Output}
import com.esotericsoftware.kryo.{Kryo, Serializer}
import akka.actor.{ActorRef, ExtendedActorSystem}
import akka.serialization.Serialization

/**
* Specialized serializer for actor refs.
*
* @author Roman Levenstein
*/
class CompatActorRefSerializer(val system: ExtendedActorSystem) extends Serializer[ActorRef] {

override def read(kryo: Kryo, input: Input, typ: Class[? <: ActorRef]): ActorRef = {
val path = input.readString()
val newPath = path.replace("pekko://", "akka://")
system.provider.resolveActorRef(newPath)
}

override def write(kryo: Kryo, output: Output, obj: ActorRef): Unit = {
output.writeAscii(Serialization.serializedActorPath(obj))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/**
* *****************************************************************************
* Copyright 2012 Roman Levenstein
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ****************************************************************************
*/

package io.altoo.akka.serialization.kryo.compat.serializer

import com.esotericsoftware.kryo.io.{Input, Output}
import com.esotericsoftware.kryo.{Kryo, Serializer}
import akka.actor.typed.{ActorRef, ActorRefResolver, ActorSystem}

/**
* Specialized serializer for typed actor refs.
*
* @author Arman Bilge
*/
class CompatTypedActorRefSerializer(val system: ActorSystem[Nothing]) extends Serializer[ActorRef[Nothing]] {

private val resolver = ActorRefResolver(system)

override def read(kryo: Kryo, input: Input, typ: Class[_ <: ActorRef[Nothing]]): ActorRef[Nothing] = {
val path = input.readString()
val newPath = path.replace("akka://", "pekko://")
resolver.resolveActorRef(newPath)
}

override def write(kryo: Kryo, output: Output, obj: ActorRef[Nothing]): Unit = {
output.writeAscii(resolver.toSerializationFormat(obj))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.apache.pekko.actor

/**
* Dummy class to register a serializer for akka.actor.ActorRef on Pekko system
*/
class ActorRef
class RepointableActorRef
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package org.apache.pekko.actor.typed

/**
* Dummy class to register a serializer for akka.actor.typed.ActorRef on Pekko system
*/
class ActorRef[-T]
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.apache.pekko.util


/**
* Dummy class to register a serializer for akka.util.ByteString on Pekko system
*/
class ByteString
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package io.altoo.pekko.serialization.kryo.compat

import com.typesafe.config.ConfigFactory
import akka.actor.{Actor, ActorSystem, Props}
import akka.serialization.SerializationExtension
import akka.testkit.TestKit
import io.altoo.akka.serialization.kryo.KryoSerializer
import io.altoo.testing.SampleMessage
import org.scalatest.flatspec.AnyFlatSpecLike
import org.scalatest.matchers.should.Matchers
import org.scalatest.{BeforeAndAfterAll, Inside}

object PekkoCompatSerializerTest {
private val testConfig =
"""
|akka {
| actor {
| serializers {
| kryo = "io.altoo.akka.serialization.kryo.KryoSerializer"
| }
| serialization-bindings {
| "org.apache.pekko.actor.ActorRef" = kryo
| "akka.actor.ActorRef" = kryo
| "io.altoo.testing.SampleMessage" = kryo
| }
| }
|}
|akka-kryo-serialization {
| trace = true
| id-strategy = "default"
| implicit-registration-logging = true
| post-serialization-transformations = off
|
| kryo-initializer = "io.altoo.akka.serialization.kryo.compat.PekkoCompatKryoInitializer"
|}
|""".stripMargin

// serialized io.altoo.testing.SampleMessage(actorRef: org.apache.pekko.actor.ActorRef) with pekko-kryo-serialization
private val pekkoActorRefSerialized = Array[Byte](1, 0, 105, 111, 46, 97, 108, 116, 111, 111, 46, 116, 101, 115, 116, 105, 110, 103, 46, 83, 97, 109, 112, 108, 101, 77, 101, 115, 115, 97,
103, -27, 1, 1, 1, 111, 114, 103, 46, 97, 112, 97, 99, 104, 101, 46, 112, 101, 107, 107, 111, 46, 97, 99, 116, 111, 114, 46, 82, 101, 112, 111, 105, 110, 116, 97, 98, 108, 101, 65, 99,
116, 111, 114, 82, 101, -26, 1, 112, 101, 107, 107, 111, 58, 47, 47, 116, 101, 115, 116, 83, 121, 115, 116, 101, 109, 47, 117, 115, 101, 114, 47, 115, 97, 109, 112, 108, 101, 65, 99, 116,
111, 114, 35, 56, 48, 52, 54, 54, 57, 49, 52, -79)
}

class PekkoCompatSerializerTest extends TestKit(ActorSystem("testSystem", ConfigFactory.parseString(PekkoCompatSerializerTest.testConfig).withFallback(ConfigFactory.load())))
with AnyFlatSpecLike with Matchers with Inside with BeforeAndAfterAll {

private val serialization = SerializationExtension(system)

override protected def afterAll(): Unit = shutdown(system)

behavior of "ActorRefSerializer"

it should "deserialize actorRef from Pekko" in {
// create actor with path to not get deadLetter ref
system.actorOf(Props(new Actor { def receive: Receive = PartialFunction.empty }), "sampleActor")

val serializer = serialization.serializerFor(classOf[SampleMessage])
serializer shouldBe a[KryoSerializer]

// deserialize
val deserialized = serializer.fromBinary(PekkoCompatSerializerTest.pekkoActorRefSerialized)
deserialized shouldBe a[SampleMessage]
deserialized.asInstanceOf[SampleMessage].actorRef.path.toString shouldBe "akka://testSystem/user/sampleActor"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package io.altoo.testing

import akka.actor.ActorRef

// Mirror class using Pekko ActorRef instead of Akka ActorRef
case class SampleMessage(actorRef: ActorRef) extends Serializable
8 changes: 7 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ lazy val root: Project = project.in(file("."))
.settings(publish / skip := true)
.settings(OsgiKeys.privatePackage := Nil)
.settings(OsgiKeys.exportPackage := Seq("io.altoo.*"))
.aggregate(core, typed)
.aggregate(core, typed, pekkoCompat)

lazy val core: Project = Project("akka-kryo-serialization", file("akka-kryo-serialization"))
.settings(moduleSettings)
Expand All @@ -59,6 +59,12 @@ lazy val typed: Project = Project("akka-kryo-serialization-typed", file("akka-kr
.settings(libraryDependencies ++= typedDeps ++ testingDeps)
.dependsOn(core)

lazy val pekkoCompat: Project = Project("akka-kryo-serialization-pekko-compat", file("akka-kryo-serialization-pekko-compat"))
.settings(moduleSettings)
.settings(description := "akka-serialization implementation using kryo - extension for improved wire compatibility with Pekko")
.settings(libraryDependencies ++= testingDeps)
.dependsOn(core, typed)


// Dependencies
lazy val coreDeps = Seq(
Expand Down

0 comments on commit 0777a2b

Please sign in to comment.