Skip to content

Commit 750b789

Browse files
committed
server: init() instead of initial ctx, on_close callback in bistream
1 parent 9c3a426 commit 750b789

File tree

4 files changed

+59
-35
lines changed

4 files changed

+59
-35
lines changed

examples/routeguide/src/server.ml

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -71,11 +71,12 @@ let get_server (features : feature_list) clock =
7171
module ListFeatures = struct
7272
type context = Pb.feature Seq.t
7373

74-
let initial_context = list_take 10 features |> List.to_seq
74+
let init () =
75+
Printf.printf "[S_STREAMING] /ListFeatures\n%!";
76+
list_take 10 features |> List.to_seq
7577

7678
let handler : Pb.rectangle -> context -> Pb.feature option * context =
7779
fun rectangle ->
78-
Printf.printf "[S_STREAMING] /ListFeatures\n%!";
7980
Format.printf "[S_STREAMING] Received a rectangle %a@." Pb.pp_rectangle
8081
rectangle;
8182
fun feature_seq ->
@@ -93,10 +94,11 @@ let get_server (features : feature_list) clock =
9394
module RecordRoute = struct
9495
type context = int * int * int * Pb.point option * float
9596

96-
let initial_context = (0, 0, 0, None, 0.)
97+
let init () =
98+
Printf.printf "[C_STREAMING] /RecordRoute\n%!";
99+
(0, 0, 0, None, 0.)
97100

98101
let reader : context -> Pb.point -> context =
99-
(* Printf.printf "[C_STREAMING] /RecordRoute\n%!"; *)
100102
let start = Eio.Time.now clock in
101103
fun (point_count, feature_count, distance, last_point, _) point ->
102104
Format.printf "[C_STREAMING] Received a point: %a@." Pb.pp_point point;
@@ -132,7 +134,9 @@ let get_server (features : feature_list) clock =
132134
module RouteChat = struct
133135
type context = [ `Empty | `Note of Pb.route_note | `End ]
134136

135-
let initial_context = `Empty
137+
let init () =
138+
Printf.printf "[BI_STREAMING] /RouteChat\n%!";
139+
`Empty
136140

137141
let reader : context -> Pb.route_note option -> context =
138142
fun buffer note ->
@@ -151,6 +155,8 @@ let get_server (features : feature_list) clock =
151155
note;
152156
(Some note, `Empty)
153157
| `End -> (None, `End)
158+
159+
let on_close _ = print_endline "[BI_STREAMING] End of stream"
154160
end
155161
end in
156162
(module RouteGuideServerImplementation : Route_guide_server.Implementation)

lib/arpaca/bin/codegen.ml

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -254,21 +254,22 @@ let gen_service_server_struct ~proto_gen_module (service : Ot.service) top_scope
254254
| `Client_streaming ->
255255
p sc "type context";
256256
el sc;
257-
p sc "val initial_context : context";
257+
p sc "val init : unit -> context";
258258
p sc "val reader : (context -> %s -> context)" req_type;
259259
p sc "val respond : (context -> %s)" res_type
260260
| `Server_streaming ->
261261
p sc "type context";
262262
el sc;
263-
p sc "val initial_context : context";
263+
p sc "val init : unit -> context";
264264
p sc "val handler : %s -> (context -> %s option * context)" req_type
265265
res_type
266266
| `Bidirectional_streaming ->
267267
p sc "type context";
268268
el sc;
269-
p sc "val initial_context : context";
269+
p sc "val init : unit -> context";
270270
p sc "val reader : (context -> %s option -> context)" req_type;
271-
p sc "val writer : (context -> %s option * context)" res_type);
271+
p sc "val writer : (context -> %s option * context)" res_type;
272+
p sc "val on_close : (context -> unit)");
272273
p sc "end"
273274
in
274275

@@ -320,8 +321,7 @@ let gen_service_server_struct ~proto_gen_module (service : Ot.service) top_scope
320321
p sc "let msg, c = f c in";
321322
p sc "(Option.map %s msg, c)" encoder_func));
322323
p sc "in";
323-
p sc "Some (ServerStreaming.respond %s.initial_context handler)"
324-
impl)
324+
p sc "Some (ServerStreaming.respond %s.init handler)" impl)
325325
| `Client_streaming ->
326326
p sc {|| "%s", %S ->|}
327327
(String.concat "." (service.service_packages @ [ service_name ]))
@@ -333,9 +333,7 @@ let gen_service_server_struct ~proto_gen_module (service : Ot.service) top_scope
333333
impl decoder_func;
334334
p sc "let respond = (fun c -> %s.respond c |> %s) in" impl
335335
encoder_func;
336-
p sc
337-
"Some (ClientStreaming.respond %s.initial_context reader respond)"
338-
impl)
336+
p sc "Some (ClientStreaming.respond %s.init reader respond)" impl)
339337
| `Bidirectional_streaming ->
340338
p sc {|| "%s", %S ->|}
341339
(String.concat "." (service.service_packages @ [ service_name ]))
@@ -348,9 +346,9 @@ let gen_service_server_struct ~proto_gen_module (service : Ot.service) top_scope
348346
p sc "let reader = (fun c d -> %s.reader c (Option.map (%s) d)) in"
349347
impl decoder_func;
350348
p sc
351-
"Some (BidirectionalStreaming.respond %s.initial_context reader \
352-
writer)"
353-
impl)
349+
"Some (BidirectionalStreaming.respond %s.init reader writer \
350+
%s.on_close)"
351+
impl impl)
354352
in
355353

356354
let gen_connection_handler (sc : F.scope) =

lib/server.ml

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -153,14 +153,12 @@ module ServerStreaming = struct
153153
{ context with read_state; parse_state }
154154
| _ -> context
155155

156-
let respond : 'a -> (Decoder.t -> 'a stream_writer) -> Reqd.handler_result =
157-
fun context get_writer ->
156+
let respond :
157+
(unit -> 'a) -> (Decoder.t -> 'a stream_writer) -> Reqd.handler_result =
158+
fun init get_writer ->
159+
let user_context = init () in
158160
let context =
159-
{
160-
read_state = Reading get_writer;
161-
parse_state = Idle;
162-
user_context = context;
163-
}
161+
{ read_state = Reading get_writer; parse_state = Idle; user_context }
164162
in
165163
let response_writer = make_response_writer (body_writer ()) in
166164
Reqd.handle ~context ~response_writer ~error_handler:stream_error_handler
@@ -223,11 +221,15 @@ module ClientStreaming = struct
223221
{ context with parse_state; user_context; read_state }
224222

225223
let respond :
226-
'a -> 'a stream_reader -> ('a -> single_writer) -> Reqd.handler_result =
227-
fun context reader respond ->
224+
(unit -> 'a) ->
225+
'a stream_reader ->
226+
('a -> single_writer) ->
227+
Reqd.handler_result =
228+
fun init reader respond ->
229+
let user_context = init () in
228230
let context =
229231
{
230-
user_context = context;
232+
user_context;
231233
read_state = Reading;
232234
reader;
233235
respond;
@@ -246,6 +248,7 @@ module BidirectionalStreaming = struct
246248
reader : 'a stream_reader;
247249
writer : 'a stream_writer;
248250
errored : Header.t list option;
251+
on_close : 'a -> unit;
249252
}
250253

251254
let body_writer () : _ context Body.writer =
@@ -300,21 +303,30 @@ module BidirectionalStreaming = struct
300303

301304
{ context with parse_state; errored; user_context }
302305

306+
let on_close : _ context -> unit =
307+
fun { user_context; on_close; _ } -> on_close user_context
308+
303309
let respond :
304-
'a -> 'a stream_reader -> 'a stream_writer -> Reqd.handler_result =
305-
fun context reader writer ->
310+
(unit -> 'a) ->
311+
'a stream_reader ->
312+
'a stream_writer ->
313+
('a -> unit) ->
314+
Reqd.handler_result =
315+
fun init reader writer user_on_close ->
316+
let user_context = init () in
306317
let context =
307318
{
308-
user_context = context;
319+
user_context;
309320
parse_state = Idle;
310321
reader;
311322
writer;
312323
errored = None;
324+
on_close = user_on_close;
313325
}
314326
in
315327
let response_writer = make_response_writer (body_writer ()) in
316-
Reqd.handle ~context ~response_writer ~error_handler:stream_error_handler
317-
~body_reader ()
328+
Reqd.handle ~on_close ~context ~response_writer
329+
~error_handler:stream_error_handler ~body_reader ()
318330
end
319331

320332
let respond_not_found =

lib/server.mli

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,17 +11,25 @@ module Unary : sig
1111
end
1212

1313
module ServerStreaming : sig
14-
val respond : 'a -> (Decoder.t -> 'a stream_writer) -> Reqd.handler_result
14+
val respond :
15+
(unit -> 'a) -> (Decoder.t -> 'a stream_writer) -> Reqd.handler_result
1516
end
1617

1718
module ClientStreaming : sig
1819
val respond :
19-
'a -> 'a stream_reader -> ('a -> single_writer) -> Reqd.handler_result
20+
(unit -> 'a) ->
21+
'a stream_reader ->
22+
('a -> single_writer) ->
23+
Reqd.handler_result
2024
end
2125

2226
module BidirectionalStreaming : sig
2327
val respond :
24-
'a -> 'a stream_reader -> 'a stream_writer -> Reqd.handler_result
28+
(unit -> 'a) ->
29+
'a stream_reader ->
30+
'a stream_writer ->
31+
('a -> unit) ->
32+
Reqd.handler_result
2533
end
2634

2735
val connection_handler : route_getter -> _ Eio.Net.connection_handler

0 commit comments

Comments
 (0)