-
Notifications
You must be signed in to change notification settings - Fork 0
/
handler.gleam
138 lines (123 loc) · 4.11 KB
/
handler.gleam
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
//// This module provides handlers that subscribe to NATS subjects and pass
//// the received messages (or requests) on to your own callback function.
import gleam/dynamic.{Dynamic}
import gleam/result
import gleam/map.{Map}
import gleam/option.{None, Some}
import gleam/otp/actor
import gleam/erlang/process.{Subject}
import glats/connection.{Command}
import glats/message.{Message}
/// Callback that should be provided to `handle_subscription` to process the
/// messages received from a subscription.
///
/// It is called with the received message and the connection subject that was
/// given to `handle_subscription`. Returning `Error` makes the subscription
/// actor stop abnormally.
pub type MessageHandler =
fn(Message, Subject(Command)) -> Result(Nil, String)
/// Callback that should be provided to `handle_request` to process a received
/// request and return a response.
///
/// It is called with the request and the connection subject that was given to
/// `handle_request`. An `Ok(response)` is published to the request's reply
/// subject; an `Error(reason)` makes the request actor stop abnormally with
/// `reason`.
pub type RequestHandler =
fn(Request, Subject(Command)) -> Result(Response, String)
/// Request holds the headers and body taken from a received request message.
/// This is the value handed to a `RequestHandler`.
pub type Request {
Request(headers: Map(String, String), body: String)
}
/// Response holds the headers and body that a `RequestHandler` returns.
/// Both are copied into the message published to the request's reply subject.
pub type Response {
Response(headers: Map(String, String), body: String)
}
// State kept by the actor started in `handle_subscription`: the connection
// subject and the user's message callback. Both are passed along unchanged
// on every loop iteration.
type SubscriptionState {
SubscriptionState(conn: Subject(Command), handler: MessageHandler)
}
// State kept by the actor started in `handle_request`: the connection
// subject (used both for the callback and for publishing the reply) and the
// user's request callback.
type RequestState {
RequestState(conn: Subject(Command), handler: RequestHandler)
}
// External function bound to `convert_msg` in the Elixir module `Glats`.
// Takes a raw dynamic value and returns either a typed `Message` or an error
// string describing why it could not be converted.
// NOTE(review): presumably the raw value is what Gnat delivers to a
// subscriber process - confirm against the Elixir side.
external fn convert_msg(Dynamic) -> Result(Message, String) =
"Elixir.Glats" "convert_msg"
/// Starts an actor that subscribes to `subject` on the NATS connection `conn`
/// and calls `handler` for every message it receives.
///
/// - `conn`: subject of the connection process; it is used to subscribe and
///   is also passed to `handler` on every call.
/// - `subject`: the NATS subject to subscribe to.
/// - `handler`: callback invoked with each received message.
///
/// The actor fails to start when the subscription request returns an error.
/// While running, it continues as long as `handler` returns `Ok`; when
/// `handler` returns `Error(reason)` the actor stops abnormally with that
/// `reason`, so the cause of the failure is visible in the exit reason.
///
/// Returns the result of `actor.start_spec`.
pub fn handle_subscription(
  conn: Subject(Command),
  subject: String,
  handler: MessageHandler,
) {
  // Start a new actor that will subscribe to NATS messages.
  actor.start_spec(actor.Spec(
    init: fn() {
      let receiver = process.new_subject()
      // Accept anything that is delivered to the actor and convert it to a
      // typed `Message` with `map_message`.
      let selector =
        process.new_selector()
        |> process.selecting_anything(map_message)

      // Send a subscription request to the connection process.
      case connection.subscribe(conn, receiver, subject) {
        Ok(_) -> actor.Ready(SubscriptionState(conn, handler), selector)
        Error(err) -> actor.Failed(err)
      }
    },
    init_timeout: 10_000,
    loop: fn(msg: Message, state: SubscriptionState) {
      case state.handler(msg, state.conn) {
        Ok(_) -> actor.Continue(state)
        // Propagate the handler's own error as the exit reason instead of a
        // fixed placeholder string, matching the request handler loop.
        Error(err) -> actor.Stop(process.Abnormal(err))
      }
    },
  ))
}
/// Starts an actor that subscribes to `subject` on the NATS connection `conn`
/// and answers the requests that arrive there.
///
/// Each received message is processed by `request_handler_loop`, which calls
/// `handler` and publishes the response it returns. The actor fails to start
/// when the subscription request returns an error.
///
/// Returns the result of `actor.start_spec`.
pub fn handle_request(
  conn: Subject(Command),
  subject: String,
  handler: RequestHandler,
) {
  // Initialiser handed to the actor spec below.
  let init = fn() {
    let inbox = process.new_subject()
    // Accept anything that is delivered to the actor and convert it to a
    // typed `Message` with `map_message`.
    let incoming =
      process.selecting_anything(process.new_selector(), map_message)

    // Ask the connection process for a subscription; the actor is only
    // ready once that request has succeeded.
    case connection.subscribe(conn, inbox, subject) {
      Error(reason) -> actor.Failed(reason)
      Ok(_) -> actor.Ready(RequestState(conn, handler), incoming)
    }
  }

  actor.start_spec(actor.Spec(
    init: init,
    init_timeout: 10_000,
    loop: request_handler_loop,
  ))
}
// Handles one message received by a request handler actor.
//
// Messages without a reply subject are ignored. Otherwise the user's handler
// is called with the message's headers and body; its response is published to
// the reply subject. A handler error or a publish error stops the actor
// abnormally with that error as the reason.
fn request_handler_loop(msg: Message, state: RequestState) {
  case msg.reply_to {
    // Nowhere to send a reply, so there is nothing to do.
    None -> actor.Continue(state)
    Some(reply_to) -> {
      let request = Request(msg.headers, msg.body)
      case state.handler(request, state.conn) {
        Error(err) -> actor.Stop(process.Abnormal(err))
        Ok(response) -> publish_response(state, reply_to, response)
      }
    }
  }
}

// Publishes `response` to the `reply_to` subject over the connection held in
// `state`, and tells the actor whether to continue or stop.
fn publish_response(state: RequestState, reply_to, response: Response) {
  let reply =
    Message(
      subject: reply_to,
      headers: response.headers,
      reply_to: None,
      body: response.body,
    )

  case connection.publish_message(state.conn, reply) {
    Ok(Nil) -> actor.Continue(state)
    Error(err) -> actor.Stop(process.Abnormal(err))
  }
}
/// Converts the dynamic value received by the actor into a typed `Message`
/// using the external `convert_msg` function.
///
/// When the conversion fails the error is discarded and a placeholder message
/// with subject "error", no headers, no reply subject and body "body" is
/// returned instead, so handlers will receive that placeholder.
fn map_message(msg: Dynamic) {
  case convert_msg(msg) {
    Ok(converted) -> converted
    Error(_) -> Message("error", map.new(), None, "body")
  }
}