-
Notifications
You must be signed in to change notification settings - Fork 125
/
schedulerKer.mli
415 lines (345 loc) · 11.5 KB
/
schedulerKer.mli
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
(*
Copyright © 2011 MLstate
This file is part of OPA.
OPA is free software: you can redistribute it and/or modify it under the
terms of the GNU Affero General Public License, version 3, as published by
the Free Software Foundation.
OPA is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for
more details.
You should have received a copy of the GNU Affero General Public License
along with OPA. If not, see <http://www.gnu.org/licenses/>.
*)
(**
The low level module for [Scheduler] and [SchedulerJob].
This module registers and processes jobs depending on special events or conditions.
@author Cedric Soulas
*)
(**
This module handles IO operations.
An IO operation is associated to an [id] (a file descriptor) and a [direction] ([In] to read, and [Out] to write).
Several operations can be queued for the same ([id], [direction]) couple.
The first operation, ready to be processed, is called a candidate.
A poll system (called with [wait]) is used to retrieve which
([id], [direction]) couples are ready to be processed (with [process], [process_error], [process_id_error] or [process_all]).
@inline doc
*)
module Operation :
sig
type t
(**
The store of queued operations; create one with [make].
*)
type key = int
(**
Identifies a single queued operation (see [add], [replace], [remove] and [mem_key]).
*)
type id = Unix.file_descr
(**
The file descriptor an operation is attached to.
*)
type direction = In | Out
(**
[In] to read, [Out] to write.
*)
exception Busy_direction
(**
This exception is not used by the module itself, but can be used
to restrict operation queuing.
*)
exception Not_found of (id * direction)
(**
Raised when ([id], [direction]) is not found.
*)
exception Unbound_key of key
(**
Raised when a [key] is not bound to any operation (see [replace]).
*)
exception Existent_key of key
(**
Raised when a [key] is already bound to an operation (see [add]).
*)
val make : unit -> t
(**
@return a fresh [t].
*)
(** {6 Test existence} *)
val length : t -> int
(**
@return the number of queues associated to a ([id], [direction]) couple.
*)
val is_empty : t -> bool
(**
[is_empty o] is equivalent to [length o = 0].
*)
val mem : t -> id -> direction -> bool
(**
@return true if a queue exists for the ([id], [direction]) couple.
*)
val mem_key : t -> key -> bool
(**
@return true if an operation is associated to [key].
*)
(** {6 Add / remove operation} *)
val add : t -> id -> direction -> key -> (unit -> unit) -> (exn -> unit) -> unit
(**
[add o id d key callback callback_error] adds a new operation for the [id] on the direction [d].
If another operation exists on that id and direction, the [callback] is queued,
otherwise, this operation is candidate for processing.
See [remove] for more information.
Raise [Existent_key] if [key] already exists.
*)
val replace : t -> key -> (unit -> unit) -> (exn -> unit) -> unit
(**
[replace o key callback callback_error] replaces the current callback
of the operation associated to [key] with a new [callback].
NOTE(review): the previous wording mentioned "the [id] on direction [d]",
but neither is a parameter here; presumably the operation keeps the
([id], [direction]) couple it was added with -- confirm against the implementation.
Raise [Unbound_key] if the [key] is unbound.
*)
val remove_id : t -> id -> unit
(**
[remove_id o id] removes all operations associated to the id [id].
*)
val remove : t -> key -> unit
(**
[remove o key] removes the operation associated to [key].
*)
val wait :
t -> int ->
(id * Epoll.supported_event list) array
(**
[wait o t] waits for a maximum of [t] milliseconds.
@return a maximum of 1000 ([id], [Epoll.supported_event list]) couples representing
operations ready to be processed.
See [process_all] for more information.
*)
(** {6 Process operations} *)
val process : t -> id -> direction -> unit
(**
[process o id d] processes the current candidate operation ([id], [d]).
It *doesn't* remove this candidate from the queue.
Raise [Not_found (id, d)] if no operation is candidate.
*)
val process_error : t -> id -> direction -> exn -> unit
(**
[process_error o id d e] processes the error callback of the current candidate ([id], [d]),
with [e] as argument.
It *doesn't* remove this candidate from the queue.
Raise [Not_found (id, d)] if no operation is candidate.
*)
val process_id_error : t -> id -> exn -> unit
(**
[process_id_error o id e] processes, in both directions and if it exists,
the error callback, with [e] as argument, of the first candidate of the queue.
It *doesn't* remove those candidates from the queue.
This function uses [process_error].
*)
val process_all : t -> (id * Epoll.supported_event list) array -> id list
(**
[process_all o a] processes the array [a] of ([id], [Epoll.supported_event list]).
The [Epoll.supported_event list] describes the list of events ready to be processed for the id [id].
@return the list of [id] associated to an error like connection hang up or closed, as described in the [Epoll.supported_event list].
This function uses [process].
*)
(** {6 Misc.} *)
val direction_to_string : direction -> string
(**
@return a string for the given [direction].
*)
end
(**
This module handles callbacks to be processed after a certain amount of time.
"After" means after, not necessarily exactly on timeout.
Each registered callback is associated to a [key] (see [add]).
The couple (callback, key) is called a job.
The highest-priority job can be processed with the [process] function.
@inline doc
*)
module Priority :
sig
exception Timeout
(**
This exception is not used by the module itself,
but can be used on timeout event.
*)
type t
type key = int
exception Existent_key of key
(**
Raised by [add] when the [key] is already bound to a job.
*)
val make : unit -> t
(**
@return a fresh [t].
*)
(** {6 Test existence} *)
val length : t -> int
(**
@return the number of jobs yet to be processed.
*)
val is_empty : t -> bool
(**
@return true if no jobs are registered, false otherwise.
[is_empty p] is equivalent to [length p = 0].
*)
(** {6 Add / remove jobs} *)
val add : t -> key -> Time.t -> (unit -> unit) -> unit
(**
[add p key t callback] adds a callback, associated to a [key], to be executed
through the [process] function after the time interval [t].
See [process] for more information.
Raise [Existent_key] if the [key] already exists.
*)
val mem : t -> key -> bool
(**
[mem p key] returns [true] if the job identified by the [key] exists.
*)
val remove : t -> key -> unit
(**
[remove p key] removes the job identified by the key [key].
It does nothing if [key] is not bound in [p].
See [add] for more information.
*)
val clear : t -> unit
(**
[clear p] removes all jobs.
*)
(** {6 Process a job} *)
val process : t -> int
(**
Process and *remove* the next priority job. A maximum of *one* job is processed.
The next priority job depends on the timeouts set for the jobs.
@return 0 if one or several other jobs have reached their timeouts and are ready to be processed,
-1 if no job was processed,
the timeout (in milliseconds) of the highest-priority job otherwise.
*)
end
(**
This module handles a set of file descriptors.
@inline doc
*)
module Descriptor :
sig
type t
type id = Unix.file_descr
type key = int
type mem_response = Alive | Replaced | Closed
(**
See [mem] for more information.
*)
val make : unit -> t
(**
@return a fresh [t].
*)
(** {6 Test existence} *)
val length : t -> int
(**
@return the number of [Alive] descriptors.
See [mem] for more information.
*)
val is_empty : t -> bool
(**
[is_empty d] is equivalent to [length d = 0].
*)
val mem : t -> id -> key -> mem_response
(**
@return a [mem_response] for a given ([id], [key]).
[Alive] means the [key] for that [id] is still present.
[Replaced] means [id] exists, but is not associated to [key] anymore.
[Closed] means [id] doesn't exist anymore.
*)
(** {6 Add / remove descriptors} *)
val add : t -> id -> key
(**
[add d id] generates and stores a new [key] for the [id].
If a key already exists for that id,
that key is removed without warning.
@return the generated key.
*)
val remove : t -> id -> unit
(**
[remove d id] removes the key associated to [id].
*)
end
(**
This module handles a set of tasks to execute. It is implemented with a
very simple structure and is ideal for simple operations.
@inline doc
*)
module Compute :
sig
type t
(** Create an empty scheduler. *)
val make : unit -> t
(** {6 Test existence} *)
(** Return the number of tasks in a scheduler. *)
val length : t -> int
(** Test whether the scheduler is empty. *)
val is_empty : t -> bool
(** {6 Push tasks}*)
(** Push one job to execute into a scheduler. *)
val push : t -> (unit -> unit) -> unit
(** Process some jobs that were pushed on a scheduler.
NOTE(review): how many jobs are processed per call is not stated here;
confirm against the implementation. *)
val process : t -> unit
(** Remove all jobs of a scheduler. *)
val clear : t -> unit
(** Transfer all jobs from the scheduler [src] to the scheduler [dest].
NOTE(review): the [rev_] prefix suggests the jobs end up in reverse order;
confirm against the implementation. *)
val rev_transfer : src:t -> dest:t -> unit
end
(**
It is not permitted to use [Gc.finalise f v] directly because [f] can
occur at any time.
This module stores those finalise callbacks, to be processed at a safe
moment.
@inline doc
*)
module Finalise :
sig
type t
val make : unit -> t
(**
@return a fresh [t].
*)
val length : t -> int
(**
@return the number of finalisation functions ready
to be processed.
*)
val is_empty : t -> bool
(**
[is_empty f] is equivalent to [length f = 0].
*)
val add : t -> ('a -> unit) -> 'a -> unit
(**
[add t f v] registers [f] as a finalisation function for [v].
Waiting finalisation functions are executed with [process_all].
*)
val process_all : t -> unit
(**
Process all waiting finalisation functions.
WARNING: contrary to Gc.finalise, the order of calls
to finalisation functions is not guaranteed.
*)
end
(**
This module handles the number of reentrant routines
and the number of synchronous computations authorized, to
guarantee a good quality of service.
It also manages a set of unique keys.
@inline doc
*)
module Counter :
sig
type t
exception Sync_limit
(**
Raised by [incr_sync] when the computation has to be asynchronous.
*)
val make : unit -> t
(** Return a fresh counter of type [t]. *)
(** {6 Key getter} *)
val get_next_int : t -> int
(**
Return an int and increase the counter.
[get_key] and [get_next_int] never return the same key.
You can only use this key to store data in a structure that hashes the key,
like hash tables.
Be aware that when max_int is reached, the counter will restart from min_int.
*)
val get_key : t -> int
(**
Return an unused int to be used as a key.
You have to release the key with [release_key]
as soon as you can because the number of those keys is limited.
If the limit is reached, the number of keys is automatically doubled.
This function is useful to store callbacks in a resizable array.
*)
val release_key : t -> int -> unit
(** Release a previously used key. *)
val incr_level : t -> unit
(**
Increment the reentrant level counter.
Raise [Reentrant_routine] if already incremented.
NOTE(review): [Reentrant_routine] is not declared in this signature;
confirm where it is defined and that it is still raised.
*)
val decr_level : t -> unit
(**
Decrement the reentrant level counter.
*)
val incr_sync : t -> unit
(**
Increment the synchronous counter.
Raise [Sync_limit] if, according to the counter value, the computation has to be asynchronous.
*)
val decr_sync : t -> unit
(**
Decrement the synchronous counter.
*)
end