-
Notifications
You must be signed in to change notification settings - Fork 4
/
RPCChannel.scala
171 lines (149 loc) · 5.3 KB
/
RPCChannel.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
/*
* Copyright 2021 Hossein Naderi
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package lepus.std
import cats.effect.Concurrent
import cats.syntax.all.*
import fs2.Stream
import lepus.client.*
import lepus.client.apis.NormalMessagingChannel
import lepus.protocol.domains.*
trait RPCServer[F[_], I, O] {
def requests: Stream[F, RequestMethod[I]]
def respond(req: RequestMethod[I], o: O): F[Unit]
def ignore(i: RequestMethod[I]): F[Unit]
def reject(i: RequestMethod[I]): F[Unit]
}
trait RPCClient[F[_], I, O] {
def send(id: ShortString, i: I): F[Unit]
def responses: Stream[F, ResponseMethod[O]]
def processed(i: ResponseMethod[O]): F[Unit]
}
final case class RequestMethod[I](
id: ShortString,
sender: QueueName,
payload: I,
tag: DeliveryTag
)
final case class ResponseMethod[I](
requestId: ShortString,
payload: I,
tag: DeliveryTag
)
/** RPCChannel implements an async RPC communication channel topology.
*
* In this topology, each server has its own endpoint, where clients can send
* methods to, server then can decide to response to sender's address, ignore
* the request, or reject it. Clients can then consume responses, and mark them
* as processed. This topology models an point to point communication, with at
* least one delivery semantics, so your processing MUST be idempotent and
* async, as both responses and requests might be received several times, and
* with any ordering.
*/
object RPCChannel {
/** server peer in [[lepus.std.RPCChannel]] topology */
def server[F[_]: Concurrent, I, O](
endpoint: RPCDefinition[I, O]
)(ch: Channel[F, NormalMessagingChannel[F]]): F[RPCServer[F, I, O]] = for {
_ <- ch.queue.declare(endpoint.name, durable = true)
} yield new {
override def respond(req: RequestMethod[I], o: O): F[Unit] =
endpoint.serverCodec.encode(o) match {
case Left(error) => error.raiseError
case Right(msg) =>
ch.messaging.publishRaw(
ExchangeName.default,
routingKey = req.sender,
msg
) >> ch.messaging.ack(req.tag, false)
}
override def requests: Stream[F, RequestMethod[I]] = ch.messaging
.consumeRaw(endpoint.name, noAck = false)
.flatMap(env =>
endpoint.clientCodec.decode(env.message) match
case Left(error @ _) =>
Stream.exec(reject(env.deliveryTag))
case Right(value) =>
val senderQ =
value.properties.replyTo
.flatMap(QueueName.from(_).toOption)
val msgId = value.properties.messageId
msgId
.zip(senderQ)
.fold(Stream.exec(reject(env.deliveryTag))) { (id, sender) =>
Stream.emit(
RequestMethod(
id,
sender,
value.payload,
env.deliveryTag
)
)
}
)
override def ignore(i: RequestMethod[I]): F[Unit] =
ch.messaging.ack(i.tag)
override def reject(i: RequestMethod[I]): F[Unit] = reject(i.tag)
private def reject(dtag: DeliveryTag) =
ch.messaging.reject(dtag, false)
}
/** client peer in [[lepus.std.RPCChannel]] topology */
def client[F[_], I, O](
endpoint: RPCDefinition[I, O],
persistent: Option[QueueName] = None
)(
ch: Channel[F, NormalMessagingChannel[F]]
)(using F: Concurrent[F]): F[RPCClient[F, I, O]] = for {
q <- persistent match {
case None =>
ch.queue
.declare(QueueName.autoGen, exclusive = true)
.flatMap(
F.fromOption(_, new UnknownError("Must respond with Queue name"))
)
.map(_.queue)
case Some(value) => ch.queue.declare(value, durable = true).as(value)
}
} yield new {
override def send(id: ShortString, i: I): F[Unit] = endpoint.clientCodec
.encode(i)
.fold(
_.raiseError,
msg =>
ch.messaging.publishRaw(
ExchangeName.default,
routingKey = endpoint.name,
msg.withMessageId(id).withReplyTo(q)
)
)
override def responses: Stream[F, ResponseMethod[O]] = ch.messaging
.consumeRaw(q, noAck = false)
.flatMap(env =>
endpoint.serverCodec.decode(env.message) match
case Left(error) => Stream.exec(reject(env.deliveryTag))
case Right(value) =>
value.properties.correlationId
.fold(Stream.exec(reject(env.deliveryTag)))(responseTo =>
Stream.emit(
ResponseMethod(responseTo, value.payload, env.deliveryTag)
)
)
)
override def processed(i: ResponseMethod[O]): F[Unit] =
ch.messaging.ack(i.tag)
private def reject(dtag: DeliveryTag) =
ch.messaging.reject(dtag, requeue = false)
}
}