Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

[fix] hlnet: fixed a huge overhead when a client was opening too many…

… one-use channels
  • Loading branch information...
commit 24ae1063b2d23329425a01dbc2413a7d8e04e47d 1 parent e4f239a
Louis Gesbert authored
Showing with 22 additions and 42 deletions.
  1. +22 −42 libnet/hlnet.ml
View
64 libnet/hlnet.ml
@@ -76,7 +76,7 @@ module rec Types : sig
(* don't look at this, it's a copy-paste of the definitions below (ocaml needs
an explicit sig for functors and here the module only contains type
definitions *)
- type ('out','in') channel_spec = { service: service_id; out_serialise: 'out' -> string; in_unserialise: ('out','in') channel -> 'in' stream_unserialise; } and ('out','in') channel = { id: channel_id; spec: ('out','in') channel_spec; connection: connection; mutable handler : ('in' -> ('out' -> unit) -> unit) option; mutable waiting_handler : ((exn -> unit) * ('in' -> unit)) IM.t; mutable pending: 'in' list IM.t ; on_disconnect: ((bool -> unit) -> unit) ref; mutable propagate_removal: bool ref option } and black_channel = (black,black) channel and connection = { remote: endpoint; mutable local: endpoint; scheduler: Scheduler.t; mutable info: Scheduler.connection_info option Cps.Lazy.t; channels: Wchannels.t; last_channels: black_channel Weak.t; mutable last_channels_ptr: int; mutable finalised: bool; }
+ type ('out','in') channel_spec = { service: service_id; out_serialise: 'out' -> string; in_unserialise: ('out','in') channel -> 'in' stream_unserialise; } and ('out','in') channel = { id: channel_id; spec: ('out','in') channel_spec; connection: connection; mutable handler : ('in' -> ('out' -> unit) -> unit) option; mutable waiting_handler : ((exn -> unit) * ('in' -> unit)) IM.t; mutable pending: 'in' list IM.t ; on_disconnect: ((bool -> unit) -> unit) ref; mutable propagate_removal: bool ref option } and black_channel = (black,black) channel and connection = { remote: endpoint; mutable local: endpoint; scheduler: Scheduler.t; mutable info: Scheduler.connection_info option Cps.Lazy.t; channels: Wchannels.t; mutable num_listeners: int; mutable finalised: bool; }
val _define_at_least_one_value_in_the_module_to_avoid_ocaml_bug_: unit
end = struct
type ('out','in') channel_spec = {
@@ -122,10 +122,8 @@ end = struct
channels: Wchannels.t;
(* each connection needs to know about all channels it hosts, to trigger
their [on_disconnect] handler in case of problem *)
- last_channels: black_channel Weak.t; mutable last_channels_ptr: int;
- (* Keep track of the last channels registered that are listening; this is
- only an optimisation for the is_listening function (avoids going
- through all reg. channels) *)
+ mutable num_listeners: int;
+ (* keep track of listeners, to detect when that list becomes empty *)
mutable finalised: bool; (* used to finalise only once *)
}
@@ -166,8 +164,8 @@ end = struct
end)
let dummy_connection = (* for lookup, we rewrite field [remote] and the others never get accessed *)
{ remote = Obj.magic(); local = Obj.magic(); scheduler = Obj.magic();
- info = Obj.magic(); channels = Obj.magic(); last_channels = Obj.magic();
- last_channels_ptr = 0; finalised = false; }
+ info = Obj.magic(); channels = Obj.magic(); num_listeners = 0;
+ finalised = false; }
let get_opt t remote = try Some (find t { dummy_connection with remote }) with Not_found -> None
end
@@ -816,7 +814,7 @@ end = struct
scheduler = sched;
info = Cps.Lazy.lazy_from_val (Some connection_info);
channels = Wchannels.create 17;
- last_channels = Weak.create 7; last_channels_ptr = 0;
+ num_listeners = 0;
finalised = false;
} in
gc_finalise sched disconnect connection;
@@ -888,7 +886,7 @@ end = struct
scheduler = sched;
info = Cps.Lazy.make (Scheduler.push sched) connect;
channels = Wchannels.create 17;
- last_channels = Weak.create 7; last_channels_ptr = 0;
+ num_listeners = 0;
finalised = false;
} in
update_local_ref := (fun local -> connection.local <- local);
@@ -899,33 +897,14 @@ end = struct
let find = Wconnections.get_opt table
- let push_last_channel ch =
- let conn = ch.connection in
- let last_chans = conn.last_channels in
- let ptr = conn.last_channels_ptr in
- Weak.set last_chans ptr (Some ch);
- conn.last_channels_ptr <- succ ptr mod Weak.length last_chans
-
let register_channel channel =
let channel = channel_to_black channel in
- Wchannels.add channel.connection.channels channel;
- if is_channel_listening channel then push_last_channel channel
+ Wchannels.add channel.connection.channels channel
let get_channel connection id =
Wchannels.get_opt connection.channels id
- let is_listening connection =
- try
- let last_channels = connection.last_channels in
- for i = 0 to Weak.length last_channels - 1 do
- Option.iter (fun ch -> if is_channel_listening ch then raise Exit) (Weak.get last_channels i)
- done;
- Wchannels.iter
- (fun ch ->
- if is_channel_listening ch then (push_last_channel ch; raise Exit))
- connection.channels;
- false
- with Exit -> true
+ let is_listening connection = connection.num_listeners > 0
let write connection message ~failure ~success =
Cps.Lazy.force connection.info
@@ -1100,7 +1079,9 @@ end = struct
let table = H.create 89
- let add chan = H.replace table chan.id (channel_to_black chan)
+ let add chan =
+ chan.connection.num_listeners <- chan.connection.num_listeners + 1;
+ H.replace table chan.id (channel_to_black chan)
(* Strong ref, CH is for channels with pending operations, we hold it in memory *)
let mem id = H.mem table id
@@ -1109,7 +1090,11 @@ end = struct
let remove id =
#<If$minlevel 20> debug "Channel %s unregistered" (channel_id_to_debug_string id) #<End>;
- H.remove table id
+ try
+ let ch = Hashtbl.find table id in
+ ch.connection.num_listeners <- ch.connection.num_listeners - 1;
+ H.remove table id
+ with Not_found -> ()
let to_string () =
H.fold
@@ -1261,12 +1246,7 @@ let first_message_treatment (channel:black_channel) reqid msg =
debug "-- Received a message, waiting handler for %s" (request_id_to_debug_string reqid)
#<End>;
channel.waiting_handler <- IM.remove reqid channel.waiting_handler;
- if Option.is_none channel.handler && IM.is_empty channel.waiting_handler && IM.is_empty channel.pending
- then
- (#<If$minlevel 25>
- debug "Remove channel %s (bec. no more handlers)" (channel_id_to_debug_string channel.id)
- #<End>;
- ChanH.remove channel.id);
+ if not (is_channel_listening channel) then ChanH.remove channel.id;
inh msg
| None ->
match channel.handler with
@@ -1510,10 +1490,9 @@ let receive_aux chan ?(request_id=RequestId.dummy_request_id) errcont inhandler
debug "-- Receive: adding a waiting handler for %s (%d)"
(request_id_to_debug_string request_id) (IM.size chan.waiting_handler)
#<End>;
- let cond = IM.is_empty chan.waiting_handler
- in
+ let should_register = not (is_channel_listening chan) in
chan.waiting_handler <- IM.add request_id (errcont,handl) chan.waiting_handler;
- if cond then ChanH.add chan;
+ if should_register then ChanH.add chan;
reading_loop chan.connection
| msg::y ->
(let newreqids =
@@ -1536,8 +1515,9 @@ let receive_aux chan ?(request_id=RequestId.dummy_request_id) errcont inhandler
let setup_respond chan inouthandler =
#<If> debug "Setting answering machine on channel %s" (channel_id_to_debug_string chan.id) #<End>;
+ let should_register = not (is_channel_listening chan) in
chan.handler <- Some inouthandler;
- ChanH.add chan;
+ if should_register then ChanH.add chan;
reading_loop chan.connection;
register_channel chan
Please sign in to comment.
Something went wrong with that request. Please try again.