Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

[fix] hlnet: fixed a huge overhead when a client was opening too many…

… one-use channels
  • Loading branch information...
commit 24ae1063b2d23329425a01dbc2413a7d8e04e47d 1 parent e4f239a
Louis Gesbert authored

Showing 1 changed file with 22 additions and 42 deletions. Show diff stats Hide diff stats

  1. +22 42 libnet/hlnet.ml
64 libnet/hlnet.ml
@@ -76,7 +76,7 @@ module rec Types : sig
76 76 (* don't look at this, it's a copy-paste of the definitions below (ocaml needs
77 77 an explicit sig for functors and here the module only contains type
78 78 definitions *)
79   - type ('out','in') channel_spec = { service: service_id; out_serialise: 'out' -> string; in_unserialise: ('out','in') channel -> 'in' stream_unserialise; } and ('out','in') channel = { id: channel_id; spec: ('out','in') channel_spec; connection: connection; mutable handler : ('in' -> ('out' -> unit) -> unit) option; mutable waiting_handler : ((exn -> unit) * ('in' -> unit)) IM.t; mutable pending: 'in' list IM.t ; on_disconnect: ((bool -> unit) -> unit) ref; mutable propagate_removal: bool ref option } and black_channel = (black,black) channel and connection = { remote: endpoint; mutable local: endpoint; scheduler: Scheduler.t; mutable info: Scheduler.connection_info option Cps.Lazy.t; channels: Wchannels.t; last_channels: black_channel Weak.t; mutable last_channels_ptr: int; mutable finalised: bool; }
  79 + type ('out','in') channel_spec = { service: service_id; out_serialise: 'out' -> string; in_unserialise: ('out','in') channel -> 'in' stream_unserialise; } and ('out','in') channel = { id: channel_id; spec: ('out','in') channel_spec; connection: connection; mutable handler : ('in' -> ('out' -> unit) -> unit) option; mutable waiting_handler : ((exn -> unit) * ('in' -> unit)) IM.t; mutable pending: 'in' list IM.t ; on_disconnect: ((bool -> unit) -> unit) ref; mutable propagate_removal: bool ref option } and black_channel = (black,black) channel and connection = { remote: endpoint; mutable local: endpoint; scheduler: Scheduler.t; mutable info: Scheduler.connection_info option Cps.Lazy.t; channels: Wchannels.t; mutable num_listeners: int; mutable finalised: bool; }
80 80 val _define_at_least_one_value_in_the_module_to_avoid_ocaml_bug_: unit
81 81 end = struct
82 82 type ('out','in') channel_spec = {
@@ -122,10 +122,8 @@ end = struct
122 122 channels: Wchannels.t;
123 123 (* each connection needs to know about all channels it hosts, to trigger
124 124 their [on_disconnect] handler in case of problem *)
125   - last_channels: black_channel Weak.t; mutable last_channels_ptr: int;
126   - (* Keep track of the last channels registered that are listening; this is
127   - only an optimisation for the is_listening function (avoids going
128   - through all reg. channels) *)
  125 + mutable num_listeners: int;
  126 + (* keep track of listeners, to detect when that list becomes empty *)
129 127 mutable finalised: bool; (* used to finalise only once *)
130 128 }
131 129
@@ -166,8 +164,8 @@ end = struct
166 164 end)
167 165 let dummy_connection = (* for lookup, we rewrite field [remote] and the others never get accessed *)
168 166 { remote = Obj.magic(); local = Obj.magic(); scheduler = Obj.magic();
169   - info = Obj.magic(); channels = Obj.magic(); last_channels = Obj.magic();
170   - last_channels_ptr = 0; finalised = false; }
  167 + info = Obj.magic(); channels = Obj.magic(); num_listeners = 0;
  168 + finalised = false; }
171 169 let get_opt t remote = try Some (find t { dummy_connection with remote }) with Not_found -> None
172 170 end
173 171
@@ -816,7 +814,7 @@ end = struct
816 814 scheduler = sched;
817 815 info = Cps.Lazy.lazy_from_val (Some connection_info);
818 816 channels = Wchannels.create 17;
819   - last_channels = Weak.create 7; last_channels_ptr = 0;
  817 + num_listeners = 0;
820 818 finalised = false;
821 819 } in
822 820 gc_finalise sched disconnect connection;
@@ -888,7 +886,7 @@ end = struct
888 886 scheduler = sched;
889 887 info = Cps.Lazy.make (Scheduler.push sched) connect;
890 888 channels = Wchannels.create 17;
891   - last_channels = Weak.create 7; last_channels_ptr = 0;
  889 + num_listeners = 0;
892 890 finalised = false;
893 891 } in
894 892 update_local_ref := (fun local -> connection.local <- local);
@@ -899,33 +897,14 @@ end = struct
899 897
900 898 let find = Wconnections.get_opt table
901 899
902   - let push_last_channel ch =
903   - let conn = ch.connection in
904   - let last_chans = conn.last_channels in
905   - let ptr = conn.last_channels_ptr in
906   - Weak.set last_chans ptr (Some ch);
907   - conn.last_channels_ptr <- succ ptr mod Weak.length last_chans
908   -
909 900 let register_channel channel =
910 901 let channel = channel_to_black channel in
911   - Wchannels.add channel.connection.channels channel;
912   - if is_channel_listening channel then push_last_channel channel
  902 + Wchannels.add channel.connection.channels channel
913 903
914 904 let get_channel connection id =
915 905 Wchannels.get_opt connection.channels id
916 906
917   - let is_listening connection =
918   - try
919   - let last_channels = connection.last_channels in
920   - for i = 0 to Weak.length last_channels - 1 do
921   - Option.iter (fun ch -> if is_channel_listening ch then raise Exit) (Weak.get last_channels i)
922   - done;
923   - Wchannels.iter
924   - (fun ch ->
925   - if is_channel_listening ch then (push_last_channel ch; raise Exit))
926   - connection.channels;
927   - false
928   - with Exit -> true
  907 + let is_listening connection = connection.num_listeners > 0
929 908
930 909 let write connection message ~failure ~success =
931 910 Cps.Lazy.force connection.info
@@ -1100,7 +1079,9 @@ end = struct
1100 1079
1101 1080 let table = H.create 89
1102 1081
1103   - let add chan = H.replace table chan.id (channel_to_black chan)
  1082 + let add chan =
  1083 + chan.connection.num_listeners <- chan.connection.num_listeners + 1;
  1084 + H.replace table chan.id (channel_to_black chan)
1104 1085 (* Strong ref, CH is for channels with pending operations, we hold it in memory *)
1105 1086
1106 1087 let mem id = H.mem table id
@@ -1109,7 +1090,11 @@ end = struct
1109 1090
1110 1091 let remove id =
1111 1092 #<If$minlevel 20> debug "Channel %s unregistered" (channel_id_to_debug_string id) #<End>;
1112   - H.remove table id
  1093 + try
  1094 + let ch = Hashtbl.find table id in
  1095 + ch.connection.num_listeners <- ch.connection.num_listeners - 1;
  1096 + H.remove table id
  1097 + with Not_found -> ()
1113 1098
1114 1099 let to_string () =
1115 1100 H.fold
@@ -1261,12 +1246,7 @@ let first_message_treatment (channel:black_channel) reqid msg =
1261 1246 debug "-- Received a message, waiting handler for %s" (request_id_to_debug_string reqid)
1262 1247 #<End>;
1263 1248 channel.waiting_handler <- IM.remove reqid channel.waiting_handler;
1264   - if Option.is_none channel.handler && IM.is_empty channel.waiting_handler && IM.is_empty channel.pending
1265   - then
1266   - (#<If$minlevel 25>
1267   - debug "Remove channel %s (bec. no more handlers)" (channel_id_to_debug_string channel.id)
1268   - #<End>;
1269   - ChanH.remove channel.id);
  1249 + if not (is_channel_listening channel) then ChanH.remove channel.id;
1270 1250 inh msg
1271 1251 | None ->
1272 1252 match channel.handler with
@@ -1510,10 +1490,9 @@ let receive_aux chan ?(request_id=RequestId.dummy_request_id) errcont inhandler
1510 1490 debug "-- Receive: adding a waiting handler for %s (%d)"
1511 1491 (request_id_to_debug_string request_id) (IM.size chan.waiting_handler)
1512 1492 #<End>;
1513   - let cond = IM.is_empty chan.waiting_handler
1514   - in
  1493 + let should_register = not (is_channel_listening chan) in
1515 1494 chan.waiting_handler <- IM.add request_id (errcont,handl) chan.waiting_handler;
1516   - if cond then ChanH.add chan;
  1495 + if should_register then ChanH.add chan;
1517 1496 reading_loop chan.connection
1518 1497 | msg::y ->
1519 1498 (let newreqids =
@@ -1536,8 +1515,9 @@ let receive_aux chan ?(request_id=RequestId.dummy_request_id) errcont inhandler
1536 1515
1537 1516 let setup_respond chan inouthandler =
1538 1517 #<If> debug "Setting answering machine on channel %s" (channel_id_to_debug_string chan.id) #<End>;
  1518 + let should_register = not (is_channel_listening chan) in
1539 1519 chan.handler <- Some inouthandler;
1540   - ChanH.add chan;
  1520 + if should_register then ChanH.add chan;
1541 1521 reading_loop chan.connection;
1542 1522 register_channel chan
1543 1523

0 comments on commit 24ae106

Please sign in to comment.
Something went wrong with that request. Please try again.