Skip to content

Commit 97b5beb

Browse files
committed
wip: add eventsource abstraction for handling SSE
1 parent 9015595 commit 97b5beb

File tree

3 files changed

+204
-0
lines changed

3 files changed

+204
-0
lines changed

devkit.opam

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ build: [
1313
depends: [
1414
"ocaml" {>= "4.05.0"}
1515
"dune" {>= "2.0"}
16+
"angstrom"
1617
("extlib" {>= "1.7.1"} | "extlib-compat" {>= "1.7.1"})
1718
"ounit2"
1819
"camlzip"

eventsource.ml

Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
(* client SSE implementation, Similar to EventSource object but differs where sensible
2+
https://html.spec.whatwg.org/multipage/server-sent-events.html#server-sent-events *)
3+
4+
type event = Comment of string | Event of (string * string) | Data of string
5+
type ready_state = Connecting | Open | Closed
6+
7+
type t = {
8+
url : string;
9+
request : Curl.t;
10+
mutable reconnection_time : int; (* in miliseconds *)
11+
event : Buffer.t;
12+
data : Buffer.t;
13+
mutable last_event_id : string;
14+
mutable ready_state : ready_state;
15+
}
16+
17+
module Parse = struct
18+
open Angstrom
19+
20+
type ev = Comment of string | Field of (string * string)
21+
22+
let pp_event fmt = function
23+
| Comment s -> Format.fprintf fmt "Comment %s" s
24+
| Field (f, s) -> Format.fprintf fmt "Field (%s, %s)" f s
25+
26+
let char_list_to_string char_list =
27+
let len = List.length char_list in
28+
let bytes = Bytes.create len in
29+
List.iteri (Bytes.set bytes) char_list;
30+
Bytes.unsafe_to_string bytes
31+
32+
(* Characters *)
33+
let lf' = '\x0A'
34+
let cr' = '\x0D'
35+
let colon' = ':'
36+
let space' = ' '
37+
38+
(* Helper range checkers *)
39+
let is_any_char c = List.for_all (fun n -> not @@ Char.equal c n) [ lf'; cr' ]
40+
41+
let is_name_char c =
42+
List.for_all (fun n -> not @@ Char.equal c n) [ lf'; cr'; colon' ]
43+
44+
(* tokens *)
45+
let lf = char lf'
46+
let cr = char cr'
47+
let colon = char colon'
48+
let space = char space'
49+
let bom = string "\xFEFF"
50+
let any_char = satisfy is_any_char
51+
let name_char = satisfy is_name_char
52+
53+
(* Rules *)
54+
let end_of_line =
55+
choice [ both cr lf *> return (); cr *> return (); lf *> return () ]
56+
57+
let comment =
58+
lift3
59+
(fun _ comment _ -> Comment (char_list_to_string comment))
60+
colon (many any_char) end_of_line
61+
<?> "comment"
62+
63+
let field =
64+
lift3
65+
(fun name value _ ->
66+
Field (char_list_to_string name, char_list_to_string value))
67+
(many1 name_char)
68+
(option [] (colon *> option space' space *> many any_char))
69+
end_of_line
70+
<?> "field"
71+
72+
let event =
73+
many
74+
(choice ~failure_msg:"Couln't parse comment or field" [ field; comment ])
75+
<* end_of_line <?> "event"
76+
77+
let stream = option "" bom *> map (many event) ~f:List.flatten <?> "stream"
78+
79+
(* Parse *)
80+
let parse_string = Angstrom.parse_string ~consume:Prefix stream
81+
82+
let parse_string_debug s =
83+
match parse_string s with
84+
| Ok result ->
85+
let pp_event_list ppf =
86+
Format.(pp_print_list ~pp_sep:pp_print_cut pp_event ppf)
87+
in
88+
Format.printf "@[Parsed successfully: @[<v>%a@]@]@." pp_event_list
89+
result
90+
| Error msg -> Printf.printf "Parsing failed: %s\n" msg
91+
92+
let interpret_event t : ev -> event option = function
93+
| Comment s -> Some (Comment s)
94+
| Field (field, data) -> (
95+
match field with
96+
| "event" -> Some (Event (field, data))
97+
| "data" -> Some (Data data)
98+
| "id" ->
99+
if data.[0] <> '\x00' then t.last_event_id <- data;
100+
None
101+
| "retry" ->
102+
t.reconnection_time <- int_of_string data;
103+
None
104+
| f ->
105+
Printf.eprintf "Got unknown field \"%s\", ignoring\n" f;
106+
None)
107+
end
108+
109+
let make ?(reconnection_time = 3000) ?(max_reconnect_attempt = 3)
110+
?(headers = []) ?body ?(event_callback = ignore)
111+
?(comment_callback = ignore) ~url callback =
112+
let t =
113+
{
114+
url;
115+
request = Curl.init ();
116+
reconnection_time;
117+
event = Buffer.create 10;
118+
data = Buffer.create 4096;
119+
last_event_id = "";
120+
ready_state = Connecting;
121+
}
122+
in
123+
Curl.setopt t.request (CURLOPT_MAXREDIRS max_reconnect_attempt);
124+
Curl.set_httpheader t.request ("Accept" :: "text/event-stream" :: headers);
125+
Option.may
126+
(fun body ->
127+
Curl.set_postfields t.request body;
128+
Curl.set_postfieldsize t.request (String.length body))
129+
body;
130+
Curl.set_url t.request url;
131+
Curl.set_writefunction t.request (fun chunk ->
132+
(match Angstrom.parse_string ~consume:Prefix Parse.stream chunk with
133+
| Ok data ->
134+
List.iter
135+
(fun ev ->
136+
match Parse.interpret_event t ev with
137+
| Some (Comment s) -> comment_callback s
138+
| Some (Event pair) -> event_callback pair
139+
| Some (Data d) -> callback d
140+
| None -> ())
141+
data
142+
| Error e -> Printf.eprintf "Parse error: %s" e);
143+
String.length chunk);
144+
145+
(* Reconnection logic *)
146+
let rec perform_with_reconnect n =
147+
let%lwt curlCode = Curl_lwt.perform t.request in
148+
let code = Curl.int_of_curlCode curlCode in
149+
match code / 100 with
150+
| 2 ->
151+
t.ready_state <- Closed;
152+
Lwt.return_unit
153+
| _ ->
154+
Printf.eprintf "Connection broken: %d" code;
155+
if n <= 0 then (
156+
Printf.eprintf
157+
"Exceeded maximum connection retries, closing connection...";
158+
Lwt.return_unit)
159+
else (
160+
Printf.eprintf "Attempting to reconnect after %d ms"
161+
t.reconnection_time;
162+
let%lwt () =
163+
Lwt_unix.sleep (float_of_int (t.reconnection_time / 1000))
164+
in
165+
(* convert to seconds *)
166+
perform_with_reconnect (n - 1))
167+
in
168+
Lwt.async (fun () -> perform_with_reconnect max_reconnect_attempt);
169+
t
170+
171+
let ready_state { ready_state; _ } = ready_state
172+
173+
let close t =
174+
Curl.cleanup t.request;
175+
t.ready_state <- Closed

test_eventsource.ml

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
open Eventsource
2+
3+
(* Testing the parser *)
4+
let test_inputs =
5+
[
6+
(* Single event with data *)
7+
"data: Hello, World!\n\n";
8+
(* Event with multiple data lines *)
9+
"data: This is the first line\ndata: This is the second line\n\n";
10+
(* Event with ID and data *)
11+
"id: 1\ndata: Event data\n\n";
12+
(* Event with type, ID, and data *)
13+
"event: message\nid: 42\ndata: Hello, SSE!\n\n";
14+
(* Event with comments *)
15+
": This is a comment\ndata: Comment test\n\n";
16+
(* Multiple events in a stream *)
17+
"id: 1\ndata: Event 1\n\nid: 2\nevent: update\ndata: Event 2\n\n";
18+
(* Event with Unicode characters *)
19+
"data: Привет, мир!\nid: 3\n\n";
20+
(* Event with trailing space after colon *)
21+
"data: \n\n";
22+
(* Event with BOM *)
23+
"\xFEFFdata: Includes BOM\n\n";
24+
(* Empty event *)
25+
"\n";
26+
]
27+
28+
let () = List.iter Parse.parse_string_debug test_inputs

0 commit comments

Comments
 (0)