/
AsyncSerializer.scala
86 lines (70 loc) · 2.89 KB
/
AsyncSerializer.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
/*
* Copyright (C) 2018-2023 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.serialization
import java.util.concurrent.CompletionStage
import scala.concurrent.{ Await, Future }
import scala.concurrent.duration.Duration
import akka.actor.ExtendedActorSystem
import akka.event.Logging
/**
* Serializer that supports async serialization.
*
* Only used for Akka persistence journals that explicitly support async serializers.
*
* Implementations should typically extend [[AsyncSerializerWithStringManifest]] or
* [[AsyncSerializerWithStringManifestCS]] that delegates synchronous calls to their async equivalents.
*/
trait AsyncSerializer {
/**
* Serializes the given object into an Array of Byte.
*
* Note that the array must not be mutated by the serializer after it has been used to complete the returned `Future`.
*/
def toBinaryAsync(o: AnyRef): Future[Array[Byte]]
/**
* Produces an object from an array of bytes, with an optional type-hint.
*/
def fromBinaryAsync(bytes: Array[Byte], manifest: String): Future[AnyRef]
}
/**
* Scala API: Async serializer with string manifest that delegates synchronous calls to the asynchronous calls
* and blocks.
*/
abstract class AsyncSerializerWithStringManifest(system: ExtendedActorSystem)
extends SerializerWithStringManifest
with AsyncSerializer {
private val log = Logging(system, classOf[AsyncSerializerWithStringManifest])
final override def toBinary(o: AnyRef): Array[Byte] = {
log.warning(
"Async serializer called synchronously. This will block. Async serializers should only be used for akka persistence plugins that support them. Class: {}",
o.getClass)
Await.result(toBinaryAsync(o), Duration.Inf)
}
final override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = {
log.warning(
"Async serializer called synchronously. This will block. Async serializers should only be used for akka persistence plugins that support them. Manifest: [{}]",
manifest)
Await.result(fromBinaryAsync(bytes, manifest), Duration.Inf)
}
}
/**
* Java API: Async serializer with string manifest that delegates synchronous calls to the asynchronous calls
* and blocks.
*/
abstract class AsyncSerializerWithStringManifestCS(system: ExtendedActorSystem)
extends AsyncSerializerWithStringManifest(system) {
import scala.compat.java8.FutureConverters._
def toBinaryAsyncCS(o: AnyRef): CompletionStage[Array[Byte]]
def fromBinaryAsyncCS(bytes: Array[Byte], manifest: String): CompletionStage[AnyRef]
/**
* Delegates to [[AsyncSerializerWithStringManifestCS#toBinaryAsyncCS]]
*/
final def toBinaryAsync(o: AnyRef): Future[Array[Byte]] =
toBinaryAsyncCS(o).toScala
/**
* Delegates to [[AsyncSerializerWithStringManifestCS#fromBinaryAsyncCS]]
*/
def fromBinaryAsync(bytes: Array[Byte], manifest: String): Future[AnyRef] =
fromBinaryAsyncCS(bytes, manifest).toScala
}