Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Newer
Older
100644 392 lines (323 sloc) 12.583 kb
fccc685 Initial open-source release
MLstate authored
1 (*
2 Copyright © 2011 MLstate
3
4 This file is part of OPA.
5
6 OPA is free software: you can redistribute it and/or modify it under the
7 terms of the GNU Affero General Public License, version 3, as published by
8 the Free Software Foundation.
9
10 OPA is distributed in the hope that it will be useful, but WITHOUT ANY
11 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for
13 more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with OPA. If not, see <http://www.gnu.org/licenses/>.
17 *)
18
19 exception Syscall (* temporary; could probably be removed for the new scheduler *)
20
21 (**
22 Module to manage asynchronous operations.
23
24 @author Henri Binsztok
25 @author Laurent Le Brun
26 @author Frederic Ye
27 @author Cedric Soulas
28 @author Hugo Venturini
29 *)
30
31 (**
32 This module provides functions to register
33 - asynchronous operations : [listen], [read] and [write] over sockets and file descriptors.
34 - asynchronous calculations : [sleep], [timer] and [yield].
35
36 The scheduler is asynchronous and non-preemptive, {e i.e.} operations and calculations are registered
37 to run at some later time, and they cannot interrupt one another.
38 The general mechanism is to register operations ([read], [write], [listen]) and calculations
39 ([sleep], [timer], [yield]) and then to call for their execution ([wait]).
40 *)
41
42 (** {6 Types} *)
43
44 type t
45 (** The abstract type of schedulers. Values are obtained with [make] or [default]. *)
46
47 type connection_info = { (* Temporarily exported for debugging *)
48 addr : NetAddr.t; (* the address of the connection; the original note calls it the connection type *)
49 conn_id : int (* the unique id of the connection *)
50 }
51 (** The type of connections. *)
52
53 type async_key
54 (** The type of a key associated with an asynchronous job. Such keys are returned by [listen] and [sleep] and consumed by [abort]. *)
55
56 (** {6 Exceptions} *)
57
58 exception Timeout
59 (** Raised when a timeout expires. *)
60
61 exception Connection_closed
62 (** Raised when trying to connect through a closed connection. *)
63
64 exception Busy_direction
65 (** Raised when another read event is already waiting for data on that connection. *)
66
67 exception StopTimer
68 (** Raised by a callback to stop the [timer] that runs it. *)
69
70 exception Empty
71 (** Raised when there is nothing queued in the scheduler. See [loop_until]. *)
72
73 exception Unbound_key
74 (** Raised when a key of type [async_key] is unbound. See [abort]. *)
75
76 (** {6 Control of the scheduler} *)
77
78 val make : ?is_server:bool -> unit -> t
79 (** [make ?is_server ()] @return a fresh [Scheduler.t]
80 @param is_server (optional argument) Setting this parameter to [true] adds a server startup message. Default: [false]
81 *)
82
83 val default : t
84 (** A default, ready-made scheduler value. *)
85
86 val wait : t -> block:bool -> bool
87 (** [wait sched ~block] performs the following actions, in this order:
88 - Executes the functions corresponding to operations which reached a deadline.
89 - Executes the next pending operation.
90
91 @param block Boolean stating whether the waiting blocks or not,
92 {e i.e.} if [block] is [true] then it waits until any incoming operation,
93 otherwise it returns right after the end of execution.
94
95 @return [true] if there are still any pending events or any open connections, [false] otherwise.
96 *)
97
98 val is_empty : t -> bool
99 (** @return [true] if and only if there is nothing queued in the scheduler, {e e.g.} no connections
100 open and no events or timeouts pending. This function is needed since [wait]
101 processes one event and may return [false] whether something was done or
102 not. *)
103
104 (** {6 Manipulation of connections} *)
105
106 val make_connection : t -> ?register:bool -> NetAddr.t -> connection_info
107 (** [make_connection sched ?register addr] @return a fresh connection for the address [addr].
108 @param register States whether the new connection should be registered in the given scheduler or not. Default: [true]
109 *)
110
111 val remove_connection :
112 t -> connection_info -> unit
113 (**
114 [remove_connection sched conn] removes the given connection [conn]. NOTE(review): presumably this unregisters [conn] from [sched] - confirm against the implementation.
115 *)
116
117 val check_connection :
118 t -> connection_info -> bool
119 (**
120 [check_connection sched conn] @return [true] if the connection [conn] is registered in the scheduler [sched], [false] otherwise.
121 *)
122
123 val get_connection_addr: connection_info -> NetAddr.t
124 (**
125 [get_connection_addr conn] @return the address ([NetAddr.t]) of the given connection.
126 *)
127
128 val get_connection_inet_addr : connection_info -> Unix.inet_addr
(** @return the [Unix.inet_addr] of the given connection.
NOTE(review): presumably extracted from the connection's [addr] field; whether every kind of address has one is not visible here - confirm against the implementation. *)
129
130 val get_connection_fd : connection_info -> Unix.file_descr
(** @return the [Unix.file_descr] of the given connection.
NOTE(review): presumably the descriptor underlying the connection's [addr] - confirm against the implementation. *)
131
132 val nb_of_connection : t -> int
133 (**
134 [nb_of_connection sched] @return the number of connections registered in the given scheduler.
135 *)
136
137 val get_connection_secured_from_normal : connection_info -> Ssl.socket -> connection_info
138 (**
139 [get_connection_secured_from_normal conn ssl_sock] creates a secured connection from the given connection [conn] and the given SSL socket [ssl_sock].
140 *)
141
142 (** {6 Asynchronous operation over [connection_info]} *)
143
144 (**
145 An operation is registered in a given scheduler and is set over a connection. It must
146 provide a continuation, among other parameters specific to each operation e.g. a string to write.
147
148 A {b continuation} is a function which returns [unit] and which will be executed when
149 the corresponding operation is triggered.
150
151 A continuation to handle errors can also be provided. If an error happens, this error continuation will be used {b instead} of the normal continuation.
152
153 Besides [listen], any operation can be registered with a timeout and/or an error continuation.
154 This allows you a fine-grained handling of continuations over time. But {b with great power comes
155 great responsibility}. The policy is the following: if you don't provide any error continuation,
156 we will handle the connection you provided at registration ourselves. But if you decide to provide an error
157 continuation, you will have to explicitly remove the connection (see [remove_connection] above).
158 *)
159
160 val listen :
161 t ->
162 connection_info ->
163 ?timeout:Time.t ->
164 ?err_cont:(exn -> unit) ->
165 (unit -> unit)
166 -> async_key
167 (**
168 [listen sched conn ?timeout ?err_cont cont] listens for events on the given connection.
169 @return the associated key that can be used with [abort].
170 *)
171
172 val listen_once :
173 t ->
174 connection_info ->
175 ?timeout:Time.t ->
176 ?err_cont:(exn -> unit) ->
177 (unit -> unit)
178 -> unit
179 (**
180 [listen_once sched conn ?timeout ?err_cont cont] listens for one event coming on the given connection. Unlike [listen], it returns no [async_key].
181 *)
182
183 val connect :
184 t ->
185 connection_info ->
186 ?timeout:Time.t ->
187 ?err_cont:(exn -> unit) ->
188 (unit -> unit)
189 -> unit
190 (**
191 [connect sched conn ?timeout ?err_cont cont] connects on the given connection.
192 *)
193
194 val read_more :
195 t ->
196 connection_info ->
197 ?read_max:int ->
198 ?block_size:int ->
199 ?timeout:Time.t ->
200 FBuffer.t ->
201 ?size_max:int ->
202 ?err_cont:(exn -> unit) ->
203 (int * FBuffer.t -> unit) -> unit
204 (** Reads more data over a [connection_info], appending the new data to the provided buffer.
205 @raise Busy_direction exception is raised when another callback is already waiting
206 for data on that [connection_info] *)
207
208 val read_content :
209 t ->
210 connection_info ->
211 ?read_max:int ->
212 ?block_size:int ->
213 ?timeout:Time.t ->
214 Rcontent.content ->
215 ?size_max:int ->
216 ?err_cont:(exn -> unit) ->
217 (int * Rcontent.content -> unit) -> unit
(** Takes the same parameters as [read_more], but works on a [Rcontent.content] instead of an [FBuffer.t];
the continuation receives an [int] and a [Rcontent.content].
NOTE(review): presumably the amount of data read and the extended content, as for [read_more] - confirm against the implementation. *)
218
219 val read_more2 :
220 t ->
221 connection_info ->
222 ?read_max:int ->
223 ?timeout:Time.t ->
224 Buffer.t ->
225 ?size_max:int ->
226 ?err_cont:(exn -> unit) ->
227 (int * Buffer.t -> unit) -> unit
(** Variant of [read_more] over a standard [Buffer.t] instead of an [FBuffer.t]; it takes no [block_size] argument.
NOTE(review): presumably it appends the data read to the given buffer, as [read_more] does - confirm against the implementation. *)
228
229 val read :
230 t ->
231 connection_info ->
232 ?timeout:Time.t ->
233 ?err_cont:(exn -> unit) ->
234 (int * string -> unit) -> unit
235 (** Reads over a [connection_info], a maximum of 4096 characters.
236 @raise Busy_direction exception is raised when another callback is already waiting
237 for data on that [connection_info] *)
238
239 val read_from :
240 t ->
241 connection_info ->
242 ?timeout:Time.t ->
243 ?err_cont:(exn -> unit) ->
244 (int * Unix.sockaddr * string -> unit) -> unit
245 (** Reads over a [connection_info], a maximum of 4096 characters. The difference with
246 [read] is that the callback is additionally given the address of the sender
247 of the message. This is useful for unconnected UDP communication.
248 @raise Busy_direction exception is raised when another callback is already waiting
249 for data on that [connection_info] *)
250
251 val read_until :
252 t ->
253 connection_info ->
254 (int * string -> bool) ->
255 ?block_size:int ->
256 ?timeout:Time.t ->
257 ?err_cont:(exn -> unit) ->
258 (int * string -> unit) -> unit
259 (** [read_until sched conn read_cond cont] reads over a [connection_info],
260 until [read_cond] returns [true] on the couple (number of characters, data) read so far.
261 It can be used, for example, to read a minimum number of characters or for your callback
262 to be called only if the data ends with certain characters.
263 @raise Busy_direction exception is raised when another callback is already waiting
264 for data on that [connection_info] *)
265
266 val read_min :
267 t ->
268 connection_info ->
269 int ->
270 ?block_size:int ->
271 ?timeout:Time.t ->
272 ?err_cont:(exn -> unit) ->
273 (int * string -> unit) -> unit
274 (** [read_min sched conn read_min cont] reads over a [connection_info],
275 a minimum of [read_min] characters.
276 @raise Busy_direction exception is raised when another callback is already waiting
277 for data on that [connection_info] *)
278
279 val read_lines :
280 t ->
281 connection_info ->
282 ?block_size:int ->
283 ?timeout:Time.t ->
284 ?err_cont:(exn -> unit) ->
285 (int * string -> unit) -> unit
286 (** [read_lines sched conn cont] reads over a [connection_info],
287 and calls your callback only if the last characters are "\r\n",
288 that is, only when the data is a line or a set of lines.
289 @raise Busy_direction exception is raised when another callback is already waiting
290 for data on that [connection_info] *)
291
292 val read_all :
293 t ->
294 connection_info ->
295 ?read_max:int option->
296 ?block_size:int ->
297 ?timeout:Time.t ->
298 ?buf:FBuffer.t ->
299 ?size_max:int ->
300 ?err_cont:(exn -> unit) ->
301 (int * FBuffer.t -> unit) -> unit
302 (** Reads until the connection is closed.
303 This means your callback will be called only at the end of the connection.
304 See [read] for further information.
305 @raise Connection_closed exception is raised when the connection on which it was reading is closed.
306 *)
307
308 val write :
309 t ->
310 connection_info ->
311 ?block_size:int ->
312 ?timeout:Time.t ->
313 string ->
314 ?err_cont:(exn -> unit) ->
315 (int -> unit) -> unit
316 (** Writes the given string over a [connection_info].
317 Several writes on the same connection are permitted. In this case,
318 the data will be written in the same order as the [write] declarations.
319 *)
320
321 val write_to :
322 t ->
323 connection_info ->
324 Unix.sockaddr ->
325 ?block_size:int ->
326 ?timeout:Time.t ->
327 string ->
328 ?err_cont:(exn -> unit) ->
329 (int -> unit) -> unit
330 (** Writes the given string over a [connection_info]; unlike [write], it also takes a [Unix.sockaddr] (NOTE(review): presumably the destination address, mirroring [read_from] - confirm).
331 Several writes on the same connection are permitted. In this case,
332 the data will be written in the same order as the [write_to] declarations.
333 *)
334
335 (** {6 Asynchronous calculation} *)
336
337 val sleep : t -> Time.t -> (unit -> unit) -> async_key
338 (**
339 [sleep sched x f] schedules, in [sched], the execution of [f] after the time interval [x].
340 @return the associated key that can be used with [abort].
341 *)
342
343 val abort : t -> async_key -> unit
344 (**
345 [abort sched key] aborts the pending job associated with [key].
346 @raise Unbound_key if the key is unbound
347 *)
348
349 val timer : t -> Time.t -> (unit -> unit) -> unit
350 (** [timer sched x f] schedules the execution of [f] every time interval [x] in [sched].
351
352 If you don't define an explicit stop to the timer, it will be called forever (and ever).
353 Although you might have excellent reasons not to stop the timer, feel free to copy the
354 following PATTERN:
355 {v
356 let f () =
357 if <condition> then <do_something>
358 else raise Scheduler.StopTimer;
359 in Scheduler.timer sched x f;
360
361 v}
362 *)
363
364
365 val push : t -> (unit -> unit) -> unit
366 (** [push sched f] pushes the task [f] in [sched]. *)
367
368 val at_exit : t -> (unit -> unit) -> unit
369 (** [at_exit sched f] pushes in [sched] the task [f], to be done at the end of the program. *)
370
371 val flush : ?f:(unit -> unit) -> t -> unit
372 (** [flush ?f sched] executes all asynchronous calculations and operations. If given, [f] will be evaluated after
373 performing any scheduled operation. *)
374
375 val loop_until : t -> (unit -> bool) -> unit
376 (** [loop_until sched cond] executes all asynchronous calculations and operations until [cond ()] is true.
377 @raise Empty if [cond] is not satisfied and there is nothing queued in the scheduler.
378 @raise Reentrant_routine if another [wait], [flush] or [loop_until] is already in progress. NOTE(review): no [Reentrant_routine] exception is declared in the visible part of this interface - confirm where it is defined.
379 *)
380
381 val finalise : t -> ('a -> unit) -> 'a -> unit
382 (**
383 [finalise sched f v] registers [f] as a finalisation function for [v].
384 It is permitted, and only with this finalisation function, to use the
385 scheduler inside the finalisation function.
386 WARNING: contrary to [Gc.finalise], the order of calls
387 to finalisation functions is not guaranteed.
388 *)
389
390 val run : t -> unit
391 (** Like [flush], but catches all exceptions. *)
Something went wrong with that request. Please try again.