-
Notifications
You must be signed in to change notification settings - Fork 125
/
badop_protocol.ml
165 lines (142 loc) · 6.59 KB
/
badop_protocol.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
(*
Copyright © 2011 MLstate
This file is part of OPA.
OPA is free software: you can redistribute it and/or modify it under the
terms of the GNU Affero General Public License, version 3, as published by
the Free Software Foundation.
OPA is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for
more details.
You should have received a copy of the GNU Affero General Public License
along with OPA. If not, see <http://www.gnu.org/licenses/>.
*)
(*
@author Louis Gesbert
**)
module Dialog = Badop_lib.Dialog
module Dialog_aux = Badop_lib.Dialog_aux
(** This file defines the types used for message exchange between Badop_server
and Badop_client. As such, it is shared between the two. *)
(** [nocps f] runs the continuation-passing function [f] and returns, in direct
    style, the value that [f] handed to its continuation.

    This only makes sense for non-asynchronous CPS functions: [f] has to call
    the continuation before it returns. The value is captured in a local
    reference rather than being smuggled through the [unit] return value with
    [Obj.magic], so the result no longer depends on [f] returning exactly what
    the continuation returned. If the continuation is called several times, the
    last value passed wins.

    @raise Failure if [f] returns without having called its continuation. *)
let nocps : (('a -> unit) -> unit) -> 'a = fun f ->
  let captured = ref None in
  f (fun x -> captured := Some x);
  match !captured with
  | Some x -> x
  | None -> failwith "nocps: continuation was not called synchronously"
(** Protocol definitions, parameterised by the host side.

    [Host.spoken] and [Host.understood] are used as the ['which] tag of,
    respectively, the first and the second message type of the Hlnet channels
    defined below; [Host.revision] is the type of database revisions. *)
module F
  (Host : sig
     type spoken
     type understood
     type revision
   end)
=
struct
  type revision = Host.revision

  (** Version number attached to every transaction operation. *)
  type tr_version = int

  type 'which read_op = ('which, revision) Badop.generic_read_op

  type 'which internal_write_op = ('which, unit, revision) Badop.generic_write_op

  (** Operations exchanged on a transaction channel.

      The channel type is kept as a parameter ('transaction_channel) because it
      is replaced by [string] when the operation is serialised.

      Values of this type go through [Marshal] (see [transaction_op_serialise]):
      keep the constructors in this order. *)
  type ('which, 'transaction_channel) poly_transaction_op =
    | Read of Badop.path
        * ('which, tr_version * Dialog.query read_op, Dialog.response read_op Badop.answer) Dialog.t
    | Write of Badop.path * tr_version * 'which internal_write_op
    | WriteList of tr_version * ('which, (Badop.path * Dialog.query internal_write_op) list, unit) Dialog.t
    | Prepare of ('which, tr_version, bool) Dialog.t
    | Commit of ('which, tr_version, bool) Dialog.t
    | Abort of ('which, tr_version, unit) Dialog.t
    | Fork of ('which, tr_version, 'transaction_channel) Dialog.t

  (** Channel on which transaction operations travel. The type is recursive
      because a [Fork] response itself carries such a channel. *)
  type transaction_channel =
    ( (Host.spoken, transaction_channel) poly_transaction_op,
      (Host.understood, transaction_channel) poly_transaction_op )
    Hlnet.channel

  type 'which transaction_op =
    ('which, transaction_channel) poly_transaction_op

  (* NOTE(review): the meaning of [last] is not visible from this file; it is
     the only mutable field -- confirm its role against the client/server. *)
  type transaction = {
    channel : transaction_channel;
    version : tr_version;
    mutable last : bool;
  }

  type 'which write_op = ('which, transaction, revision) Badop.generic_write_op

  (** Requests understood on the database channel; the [Transaction] and
      [Transaction_at] responses carry a transaction channel. *)
  type 'which database_query =
    | Transaction of ('which, unit, transaction_channel) Dialog.t
    | Transaction_at of ('which, revision, transaction_channel) Dialog.t
    | Status of ('which, unit, Badop.status) Dialog.t

  (** [map_transaction_op convert op] changes the channel representation
      embedded in [op]. Only the response of a [Fork] dialog holds a channel;
      every other constructor is rebuilt unchanged, which is what lets its
      second type parameter vary. *)
  let map_transaction_op
    : 'which 'transaction1 'transaction2.
      ('transaction1 -> 'transaction2) -> ('which, 'transaction1) poly_transaction_op
      -> ('which, 'transaction2) poly_transaction_op
    = fun convert -> function
      | Read (path, read) -> Read (path, read)
      | Write (path, version, write) -> Write (path, version, write)
      | WriteList (version, writes) -> WriteList (version, writes)
      | Prepare d -> Prepare d
      | Commit d -> Commit d
      | Abort d -> Abort d
      | Fork d ->
        (* queries pass through untouched, responses get converted *)
        let query x k = k x in
        let response tr k = k (convert tr) in
        Fork (nocps (Dialog_aux.map_dialog ~query ~response d))

  (* These functions have to be spelled out even though Marshal is used
     internally, because embedded transaction channels must be processed by
     [Hlnet.channel_(un)serialise]. Maybe a map on the operation type, binding
     'transaction to string and back just for the transmission, would be
     nicer. *)

  (** Turns a transaction operation into its wire form: the embedded channel
      is first replaced by [Hlnet.serialise_channel]'s string, then the whole
      value is marshalled. *)
  let transaction_op_serialise
    : 'which transaction_op -> string
    = fun op ->
      let wire_op : ('which, string) poly_transaction_op =
        map_transaction_op Hlnet.serialise_channel op
      in
      Marshal.to_string wire_op []

  (** Inverse of [transaction_op_serialise]: reads an operation from [s] at
      [offset] with [Hlnet.Aux.magic_unserialise], then rebuilds the embedded
      channel from its string form, relative to [channel]. Yields
      [`failure "Bad embedded transaction"] when the embedded channel does not
      decode to a [`data] result.

      It is mutually recursive with [transaction_channel_spec]: decoding an
      embedded channel needs the spec, and the spec needs this function.

      NOTE(review): [magic_unserialise] presumably unmarshals without any type
      check -- confirm that peers on this channel are trusted. *)
  let rec transaction_op_unserialise
    : ('a,'b) Hlnet.channel -> 'which transaction_op Hlnet.stream_unserialise
    = fun channel s offset ->
      let embedded_channel serialised =
        match
          Hlnet.unserialise_remote_channel transaction_channel_spec channel serialised 0
        with
        | `data (chan, _) -> chan
        | _ -> raise Exit
      in
      let decode_op = map_transaction_op embedded_channel in
      try Hlnet.Aux.map_unserialise decode_op Hlnet.Aux.magic_unserialise s offset
      with Exit -> `failure "Bad embedded transaction"

  and transaction_channel_spec
    : (Host.spoken transaction_op, Host.understood transaction_op) Hlnet.channel_spec
    = {
      Hlnet.service = Hlnet.make_service_id ~name:"badop/trans" ~version:2;
      out_serialise = transaction_op_serialise;
      in_unserialise = transaction_op_unserialise;
    }

  (** Wire form of a database message: a one-character tag, followed by the
      payload if there is one. Queries use character codes 0, 1 and 2; the
      matching responses use the same code plus 100 (the escapes below are
      decimal). *)
  let database_op_serialise op =
    let tagged tag payload = tag ^ payload in
    match op with
    | Transaction (Dialog.Query ()) -> "\000"
    | Transaction_at (Dialog.Query rev) -> tagged "\001" (Marshal.to_string rev [])
    | Status (Dialog.Query ()) -> "\002"
    | Transaction (Dialog.Response transaction) ->
      tagged "\100" (Hlnet.serialise_channel transaction)
    | Transaction_at (Dialog.Response transaction) ->
      tagged "\101" (Hlnet.serialise_channel transaction)
    | Status (Dialog.Response status) -> tagged "\102" (Marshal.to_string status [])

  (** Inverse of [database_op_serialise]: dispatches on the tag found at
      [s.[offset]] and decodes the payload starting at [offset + 1]. An unknown
      tag yields [`failure "Bad database message"].

      NOTE(review): [s.[offset]] raises [Invalid_argument] when [offset] is not
      a valid index of [s] -- confirm that Hlnet never calls this on a buffer
      with no byte available at [offset]. *)
  let database_op_unserialise channel s offset =
    let payload = offset + 1 in
    (* payload is a serialised transaction channel *)
    let remote_channel wrap =
      Hlnet.Aux.map_unserialise wrap
        (Hlnet.unserialise_remote_channel transaction_channel_spec channel)
        s payload
    in
    (* payload is a marshalled value *)
    let marshalled wrap =
      Hlnet.Aux.map_unserialise wrap Hlnet.Aux.magic_unserialise s payload
    in
    match s.[offset] with
    | '\000' -> `data (Transaction (Dialog_aux.make_unsafe_query ()), payload)
    | '\001' ->
      marshalled (fun (rev : revision) -> Transaction_at (Dialog_aux.make_unsafe_query rev))
    | '\002' -> `data (Status (Dialog_aux.make_unsafe_query ()), payload)
    | '\100' ->
      remote_channel (fun tr -> Transaction (Dialog_aux.make_unsafe_response tr))
    | '\101' ->
      remote_channel (fun tr -> Transaction_at (Dialog_aux.make_unsafe_response tr))
    | '\102' ->
      marshalled (fun st -> Status (Dialog_aux.make_unsafe_response st))
    | _ -> `failure "Bad database message"

  (** Channel on which database queries travel. *)
  type database = (Host.spoken database_query, Host.understood database_query) Hlnet.channel

  (** Hlnet description of the database channel: service id and codecs. *)
  let database_channel_spec
    : (Host.spoken database_query, Host.understood database_query) Hlnet.channel_spec
    = {
      Hlnet.service = Hlnet.make_service_id ~name:"badop/db" ~version:2;
      out_serialise = database_op_serialise;
      in_unserialise = database_op_unserialise;
    }
end