Skip to content
Newer
Older
100644 120 lines (97 sloc) 3.44 KB
fccc685 Initial open-source release
MLstate authored
1 (*
2 Copyright © 2011 MLstate
3
4 This file is part of OPA.
5
6 OPA is free software: you can redistribute it and/or modify it under the
7 terms of the GNU Affero General Public License, version 3, as published by
8 the Free Software Foundation.
9
10 OPA is distributed in the hope that it will be useful, but WITHOUT ANY
11 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for
13 more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with OPA. If not, see <http://www.gnu.org/licenses/>.
17 *)
18 (*
19 @author Adam Koprowski
20 **)
21
22
23 #<Debugvar:LIBNET_CLUSTER>
24
25 let (|>) = InfixOperator.(|>)
26
27 module Sched = Scheduler
28 module Ux = Unix
29 module NA = NetAddr
30
(** Scheduler-level handle of the connection used by this module
    (an alias of the scheduler's connection descriptor). *)
type connection = Scheduler.connection_info
32
(** Handle bundling everything needed to exchange datagrams within a cluster.

    The two type parameters are phantom: neither appears in any field.
    NOTE(review): presumably they stand for the types of outgoing and
    incoming messages; confirm against the module interface. *)
type ('out', 'in') t = {
  cluster : Cluster.t;   (* description of the cluster nodes *)
  sched : Scheduler.t;   (* scheduler driving reads and writes *)
  conn : connection;     (* connection registered with [sched] *)
}
38
(** Address of a peer, expressed as a plain Unix socket address. *)
type addr = Ux.sockaddr
40
(** [get_inet_addr sockaddr] extracts the IP address of an internet socket
    address, discarding its port.
    @raise Failure if [sockaddr] is a Unix-domain address. *)
let get_inet_addr sockaddr =
  match sockaddr with
  | Ux.ADDR_INET (host, _) -> host
  | Ux.ADDR_UNIX _ ->
      failwith "[Cluster] cannot use local Unix file descriptors to initialize a cluster"
44
(** [get_port sockaddr] extracts the port of an internet socket address,
    discarding its IP address.
    @raise Failure if [sockaddr] is a Unix-domain address. *)
let get_port sockaddr =
  match sockaddr with
  | Ux.ADDR_INET (_, port_number) -> port_number
  | Ux.ADDR_UNIX _ ->
      failwith "[Cluster] cannot use local Unix file descriptors to initialize a cluster"
48
(** [init_from ~protocol sched cluster] opens a UDP listening socket for the
    local node of [cluster], registers it with [sched] and returns the
    resulting handle.

    The socket always listens on the wildcard address; only the port is taken
    from the local node's address. When the cluster does not know which node
    is the local one ([Cluster.MeUnknown]), the wildcard address with port 0
    is used instead.
    @raise Failure if the local node's address is a Unix-domain address. *)
let init_from ~protocol sched cluster =
  (* Address of the local node, or the wildcard fallback. *)
  let local_addr =
    try Cluster.me cluster |> Cluster.get_addr cluster
    with Cluster.MeUnknown -> Ux.ADDR_INET (Unix.inet_addr_any, 0)
  in
  let fd =
    Connection.listen ~socket_type:Connection.UDP
      (Ux.ADDR_INET (Unix.inet_addr_any, get_port local_addr))
  in
  (* Wrap the raw socket into a scheduler connection. *)
  let net_addr = NA.mk_udp ~protocol ~fd ~addr:(get_inet_addr local_addr) in
  let conn = Scheduler.make_connection sched net_addr in
  { cluster = cluster; sched = sched; conn = conn }
72
(** [init ~protocol sched ?me others] builds a cluster description from the
    optional local node [me] and the other nodes [others], then initialises a
    datagram handle for it via [init_from]. *)
let init ~protocol sched ?me others =
  Cluster.init ?me others |> init_from ~protocol sched
76
(** [register_msg_handler dc msg_handler] starts a perpetual read loop on the
    connection of [dc]: every received datagram is unmarshalled and passed to
    [msg_handler dc sender_addr msg].

    The next read is registered {e before} the current datagram is decoded, so
    that a datagram which fails to unmarshal does not terminate the loop (the
    exception still propagates to the scheduler, as before).

    NOTE(review): the payload comes from the network and is decoded with
    [Marshal.from_string], which is not type-safe and should only be used on
    trusted input; make sure the cluster port is not reachable by untrusted
    peers. *)
let register_msg_handler dc msg_handler =
  let rec read_one () = Scheduler.read_from dc.sched dc.conn callback
  and callback (_, addr, msg_str) =
    (* Re-arm first: if decoding below raises, we must keep listening. *)
    read_one ();
    let msg = Marshal.from_string msg_str 0 in
    (* let nodeId = Cluster.get_id cluster addr in*)
    #<If> Logger.debug "%s processing response from %s\n%!" (Cluster.label dc.cluster) (NA.string_of_sockaddr addr) #<End>;
    msg_handler dc addr msg
  in
  read_one ()
87
(** [get_cluster dc] returns the cluster description stored in [dc]. *)
let get_cluster dc = dc.cluster
90
(** [close dc] removes the connection of [dc] from its scheduler. *)
let close dc =
  let sched = dc.sched and conn = dc.conn in
  Scheduler.remove_connection sched conn
93
(** [send_to_aux dc remote_addr v k] marshals [v] and writes it to
    [remote_addr] through the connection of [dc]; [k ()] is invoked from the
    scheduler's write callback, whose argument is ignored. No logging. *)
let send_to_aux dc remote_addr v k =
  let on_written _ = k () in
  let payload = Marshal.to_string v [] in
  Scheduler.write_to dc.sched dc.conn remote_addr payload on_written
97
(** [send_to dc remote_addr v k] sends the value [v] to the socket address
    [remote_addr] and then continues with [k]. Emits a debug trace naming the
    destination address when the debug variable is enabled. *)
let send_to dc remote_addr v k =
  begin
    #<If> Logger.debug "%s Sending msg to %s\n%!" (Cluster.label dc.cluster) (NA.string_of_sockaddr remote_addr) #<End>;
    send_to_aux dc remote_addr v k
  end
101
(** [send dc id v k] sends the value [v] to the cluster node identified by
    [id] and then continues with [k]. The node's address is looked up in the
    cluster of [dc] before anything is logged or sent. Emits a debug trace
    naming the destination node when the debug variable is enabled. *)
let send dc id v k =
  let target = Cluster.get_addr dc.cluster id in
  #<If> Logger.debug "%s Sending msg to %s\n%!" (Cluster.label dc.cluster) (Cluster.node_id_to_string dc.cluster id) #<End>;
  send_to_aux dc target v k
106
(** [broadcast ?including_myself dc v k] sends the value [v] to every server
    of the cluster of [dc] (the local node included unless
    [~including_myself:false] is given).

    The continuation [k] is attached to the send addressed to the last node
    only; the completion of the other sends is ignored. If there is no node
    to send to, [k ()] is invoked directly so that the continuation is never
    silently dropped. *)
let broadcast ?(including_myself = true) dc v k =
  let ignore_completion _ = () in
  let rec send_all = function
    | [] -> ()
    | [last] -> send dc last v k
    | id :: rest ->
        send dc id v ignore_completion;
        send_all rest
  in
  match Cluster.all_server_ids ~including_myself dc.cluster with
  | [] -> k () (* nothing to send: still honour the continuation *)
  | ids -> send_all ids
Something went wrong with that request. Please try again.