-
Notifications
You must be signed in to change notification settings - Fork 16
/
tcp_messaging.ml.old
248 lines (214 loc) · 7.61 KB
/
tcp_messaging.ml.old
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
open Message
open Messaging
open Lwt
open Log_extra
open Lwtq
let __open_connection socket_address =
(* Lwt_io.open_connection socket_address *)
let socket = Lwt_unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in
Lwt.catch
(fun () ->
Lwt_unix.connect socket socket_address >>= fun () ->
let fd_field = Obj.field (Obj.repr socket) 0 in
let (fdi:int) = Obj.magic (fd_field) in
Lwt_log.info_f "__open_connection SUCCEEDED (fd=%i)" fdi >>= fun () ->
let oc = Lwt_io.of_fd ~mode:Lwt_io.output socket in
let ic = Lwt_io.of_fd ~mode:Lwt_io.input socket in
Lwt.return (ic,oc))
(fun exn -> Lwt_log.info ~exn "__open_connection failed" >>= fun () ->
Lwt_unix.close socket;
Lwt.fail exn)
module LockedHT = struct
type ('a, 'b) t = { mutable entries: ('a * 'b) list;
lock : Lwt_mutex.t}
let create (n:int) = { entries = [] ; lock = Lwt_mutex.create()}
let _remove t a = t.entries <- List.remove_assoc a t.entries
let _mem t a = List.mem_assoc a t.entries
let _set t a b =
if _mem t a then _remove t a;
t.entries <- (a,b)::t.entries
let _length t = List.length t.entries
let _find t a = List.assoc a t.entries
let mem t a = Lwt_mutex.with_lock t.lock (fun () -> Lwt.return (_mem t a))
let set t a b = Lwt_mutex.with_lock t.lock (fun () -> let () = _set t a b in Lwt.return ())
let find t a = Lwt_mutex.with_lock t.lock (fun () -> let r = _find t a in Lwt.return r)
let remove t a = Lwt_mutex.with_lock t.lock (fun () -> let () = _remove t a in Lwt.return ())
let length t = Lwt_mutex.with_lock t.lock (fun () -> let r = _length t in Lwt.return r)
let remove_if_present t a = Lwt_mutex.with_lock t.lock
(fun () ->
if _mem t a then _remove t a;
Lwt.return ())
let with_lock t f = Lwt_mutex.with_lock t.lock (fun () -> f t)
end
type connection = Lwt_io.input_channel * Lwt_io.output_channel
type mq = (Message.t * id) LWTQ.t
class tcp_messaging my_address =
let never () = false in
object(self : # messaging )
val _id2address = Hashtbl.create 10
val _connections = LockedHT.create 10
val _connections_lock = Lwt_mutex.create ()
val _qs = Hashtbl.create 10
val _outgoing = LWTQ.create ()
method register_receivers mapping =
List.iter
(fun (id,address) -> Hashtbl.add _id2address id address) mapping
method private _get_target_address ~target =
Hashtbl.find _id2address target
method send_message m ~source ~target =
LWTQ.add (source, target, m) _outgoing
method private _get_q ~(target:id) =
try Hashtbl.find _qs target
with Not_found ->
begin
let tq = LWTQ.create () in
Hashtbl.add _qs target tq;
tq
end
method recv_message ~target =
let q = self # _get_q ~target in LWTQ.take q
method peek_message ~target =
let q = self # _get_q ~target in LWTQ.peek q
method private _establish_connection address =
let host_ip, port = address in
let socket_address = Network.make_address host_ip port in
Lwt_log.debug_f "establishing connection to (%s,%i)" host_ip port
>>= fun () ->
(*Backoff.backoff (fun () -> *) __open_connection socket_address (* ) *)
>>= fun (ic,oc) ->
Lwt_log.debug_f "got connection to (%s,%i)" host_ip port >>= fun () ->
let my_ip, my_port = my_address in
Llio.output_string oc my_ip >>= fun () ->
Llio.output_int oc my_port >>= fun () ->
Lwt.return (ic,oc)
(* open_connection can also fail with Unix.Unix_error (63, "connect",_)
on local host *)
method private _get_connection address =
LockedHT.with_lock _connections
(fun t ->
if LockedHT._mem t address
then
Lwt.return (LockedHT._find t address )
else
begin
self # _establish_connection address >>= fun conn ->
LockedHT._set t address conn;
Lwt_log.debug_f "connection added (%i in total)" (LockedHT._length t) >>= fun () ->
Lwt.return conn
end
)
method private _drop_connection address =
Lwt_log.debug "_drop_connection" >>= fun () ->
LockedHT.with_lock _connections
(fun t ->
if LockedHT._mem t address then
begin
let conn = LockedHT._find t address in
Lwt_log.debug "found connection, closing it" >>= fun () ->
let ic,oc = conn in
(* something with conn *)
Lwt.catch
(fun () ->
Lwt_io.close ic >>= fun () ->
Lwt_io.close oc >>= fun () ->
Lwt_log.debug "closed connection"
)
(fun exn -> Lwt_log.warning ~exn "exception while closing, too little too late" )
>>= fun () ->
let () = LockedHT._remove t address in
Lwt.return ()
end
else
begin
let h,p = address in
Lwt_log.debug_f "connection to (%s,%i) not found. we never had one..." h p
end
)
method private _pickle source target msg =
let buffer = Buffer.create 40 in
let () = Llio.string_to buffer source in
let () = Llio.string_to buffer target in
let () = Message.to_buffer msg buffer in
Buffer.contents buffer
method private _maybe_insert_connection address =
let host,port = address in
LockedHT.mem _connections address >>= function
| true -> Lwt_log.debug_f "XXX already have connection with (%S,%i)" host port
| false -> Lwt_log.debug_f "XXX first connection with (%S,%i)" host port
method run ?(stop=never) () =
let conditionally f =
let b = stop () in
if b then
begin
Lwt_log.info "ending loop" >>= fun () ->
Lwt.return ()
end
else f ()
in
let protocol (ic,oc) =
Llio.input_string ic >>= fun ip ->
Llio.input_int ic >>= fun port ->
self # _maybe_insert_connection (ip,port) >>= fun () ->
begin
let rec loop () =
begin
Llio.input_int ic >>= fun msg_size ->
let buffer = String.create msg_size in
Lwt_io.read_into_exactly ic buffer 0 msg_size >>= fun () ->
let (source:id), pos1 = Llio.string_from buffer 0 in
let target, pos2 = Llio.string_from buffer pos1 in
let msg, _ = Message.from_buffer buffer pos2 in
let q = self # _get_q target in
LWTQ.add (msg, source) q >>= fun () ->
conditionally loop
end
in
catch
(fun () -> loop ())
(fun exn ->
Lwt_log.info ~exn "going to drop outgoing connection as well" >>= fun () ->
let address = (ip,port) in
self # _drop_connection address >>= fun () ->
Lwt.fail exn)
end
in
let rec sender_loop () =
LWTQ.take _outgoing >>= fun (source, target, msg) ->
(* Lwt_log.debug_f "sender_loop got %S" (Message.string_of msg) >>= fun () -> *)
let address = self # _get_target_address ~target in
let try_send () =
self # _get_connection address >>= fun connection ->
let ic,oc = connection in
let pickled = self # _pickle source target msg in
Llio.output_string oc pickled >>= fun () ->
Lwt_io.flush oc
in
Lwt.catch
(fun () -> try_send ())
(function
| Unix.Unix_error(Unix.EPIPE,_,_) -> (* stale connection *)
begin
Lwt_log.debug_f "stale connection" >>= fun () ->
self # _drop_connection address >>= fun () ->
Lwt.catch
(fun () -> try_send ())
(fun exn -> Lwt_log.info_f ~exn "dropped message")
end
| exn ->
begin
Lwt_log.info_f ~exn
"dropping message %s with destination '%s' because of"
(Message.string_of msg) target >>= fun () ->
self # _drop_connection address >>= fun () ->
Lwt_log.debug "end of connection epilogue"
end
)
>>= fun () -> conditionally sender_loop
in
let _, my_port = my_address in
let server_t = Server.make_server_thread my_port protocol
in
Lwt.pick [server_t ();sender_loop ();] >>= fun () ->
Lwt_log.info "end of tcp_messaging" >>= fun () ->
Lwt.return ()
end