-
Notifications
You must be signed in to change notification settings - Fork 3.6k
/
TwoPhaseSetSerializer2.scala
54 lines (45 loc) · 1.78 KB
/
TwoPhaseSetSerializer2.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
/*
* Copyright (C) 2015-2023 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.ddata.protobuf
//#serializer
import akka.actor.ExtendedActorSystem
import akka.cluster.ddata.GSet
import akka.cluster.ddata.protobuf.SerializationSupport
import akka.serialization.Serializer
import docs.ddata.TwoPhaseSet
import docs.ddata.protobuf.msg.TwoPhaseSetMessages
class TwoPhaseSetSerializer2(val system: ExtendedActorSystem) extends Serializer with SerializationSupport {
override def includeManifest: Boolean = false
override def identifier = 99999
override def toBinary(obj: AnyRef): Array[Byte] = obj match {
case m: TwoPhaseSet => twoPhaseSetToProto(m).toByteArray
case _ => throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass}")
}
override def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = {
twoPhaseSetFromBinary(bytes)
}
def twoPhaseSetToProto(twoPhaseSet: TwoPhaseSet): TwoPhaseSetMessages.TwoPhaseSet2 = {
val b = TwoPhaseSetMessages.TwoPhaseSet2.newBuilder()
if (!twoPhaseSet.adds.isEmpty)
b.setAdds(otherMessageToProto(twoPhaseSet.adds).toByteString())
if (!twoPhaseSet.removals.isEmpty)
b.setRemovals(otherMessageToProto(twoPhaseSet.removals).toByteString())
b.build()
}
def twoPhaseSetFromBinary(bytes: Array[Byte]): TwoPhaseSet = {
val msg = TwoPhaseSetMessages.TwoPhaseSet2.parseFrom(bytes)
val adds =
if (msg.hasAdds)
otherMessageFromBinary(msg.getAdds.toByteArray).asInstanceOf[GSet[String]]
else
GSet.empty[String]
val removals =
if (msg.hasRemovals)
otherMessageFromBinary(msg.getRemovals.toByteArray).asInstanceOf[GSet[String]]
else
GSet.empty[String]
TwoPhaseSet(adds, removals)
}
}
//#serializer