Permalink
Browse files

[net] port Channel over to cstruct/Io_page, with some minor interface…

… changes

* write function never block as the interface requires explicit flushing
* read_line returns a list of buffers, as it may straddle page boundaries
* rename read_crlf to read_line
* rename write_view to write_buffer for consistency with cstruct
  • Loading branch information...
1 parent b534240 commit 262c582606744df6517f402c1a1fabd00759a040 @avsm committed Jun 6, 2012
Showing with 134 additions and 98 deletions.
  1. +115 −78 lib/net/direct/channel.ml
  2. +8 −9 lib/net/direct/channel.mli
  3. +1 −1 lib/net/direct/net.smlpack
  4. +5 −5 lib/net/direct/nettypes.ml
  5. +5 −5 lib/net/direct/nettypes.mli
@@ -31,53 +31,58 @@ module Make(Flow:FLOW) :
type t = {
flow: flow;
- mutable ibuf: Bitstring.t;
- mutable obuf: Bitstring.t list;
+ mutable ibuf: OS.Io_page.t option; (* Queue of incoming buf *)
+ mutable obufq: OS.Io_page.t list; (* Queue of completed writebuf *)
+ mutable obuf: OS.Io_page.t option; (* Active write buffer *)
+ mutable opos: int; (* Position in active write buffer *)
abort_t: unit Lwt.t;
abort_u: unit Lwt.u;
}
exception Closed
let create flow =
- let ibuf = "",0,0 in
- let obuf = [] in
+ let ibuf = None in
+ let obufq = [] in
+ let obuf = None in
+ let opos = 0 in
let abort_t, abort_u = Lwt.task () in
- { ibuf; obuf; flow; abort_t; abort_u }
+ { ibuf; obuf; flow; obufq; opos; abort_t; abort_u }
let ibuf_refill t =
match_lwt Flow.read t.flow with
|Some buf ->
- t.ibuf <- buf;
+ t.ibuf <- Some buf;
return ()
|None ->
fail Closed
+ let rec get_ibuf t =
+ match t.ibuf with
+ |None -> ibuf_refill t >> get_ibuf t
+ |Some buf when Cstruct.len buf = 0 -> ibuf_refill t >> get_ibuf t
+ |Some buf -> return buf
+
(* Read one character from the input channel *)
- let rec read_char t =
- bitmatch t.ibuf with
- | { c:8; rest:-1:bitstring } ->
- t.ibuf <- rest;
- return (Char.chr c)
- | { rest:-1:bitstring } when Bitstring.bitstring_length rest = 0 ->
- ibuf_refill t >>
- read_char t
+ let read_char t =
+ lwt buf = get_ibuf t in
+ let c = Cstruct.get_char buf 0 in
+ t.ibuf <- Some (Cstruct.shift buf 1);
+ return c
(* Read up to len characters from the input channel
and at most a full view. If not specified, read all *)
let read_some ?len t =
- lwt () = if Bitstring.bitstring_length t.ibuf = 0 then
- ibuf_refill t else return () in
- let avail = Bitstring.bitstring_length t.ibuf in
- let len = match len with |Some len -> len * 8 |None -> avail in
+ lwt buf = get_ibuf t in
+ let avail = Cstruct.len buf in
+ let len = match len with |Some len -> len |None -> avail in
if len < avail then begin
- let r = Bitstring.subbitstring t.ibuf 0 len in
- t.ibuf <- Bitstring.subbitstring t.ibuf len (avail-len);
- return r
+ let hd,tl = Cstruct.split buf len in
+ t.ibuf <- Some tl;
+ return hd
end else begin
- let r = t.ibuf in
- t.ibuf <- "",0,0;
- return r
+ t.ibuf <- None;
+ return buf
end
(* Read up to len characters from the input channel as a
@@ -93,75 +98,107 @@ module Make(Flow:FLOW) :
(* Read until a character is found *)
let read_until t ch =
- lwt () = if Bitstring.bitstring_length t.ibuf = 0 then
- ibuf_refill t else return () in
- try_lwt
- let buf,off,len = t.ibuf in
- let idx = (String.index_between buf (off/8) ((off+len)/8) ch) * 8 in
- let rlen = idx - off in
- (bitmatch t.ibuf with
- | { _:8; rest:-1:bitstring } when rlen = 0 ->
- t.ibuf <- rest;
- return (true, Bitstring.empty_bitstring)
- | { r:rlen:bitstring; _:8; rest:-1:bitstring } ->
- t.ibuf <- rest;
- return (true, r)
- | { _ } ->
- printf "Flow: unexpected bitmatch failure in read_until\n%!";
- exit 1
- )
- with Not_found -> begin
- let r = t.ibuf in
- t.ibuf <- "",0,0;
- return (false,r)
- end
+ lwt buf = get_ibuf t in
+ let len = Cstruct.len buf in
+ let rec scan off =
+ if off = len then None else begin
+ if Cstruct.get_char buf off = ch then
+ Some off else scan (off+1)
+ end
+ in
+ match scan 0 with
+ |None -> (* not found, return what we have until EOF *)
+ t.ibuf <- None;
+ return (false, buf)
+ |Some off -> (* found, so split the buffer *)
+ let hd = Cstruct.sub_buffer buf 0 off in
+ t.ibuf <- Some (Cstruct.shift buf (off+1));
+ return (true, hd)
(* This reads a line of input, which is terminated either by a CRLF
sequence, or the end of the channel (which counts as a line).
@return Returns a stream of views that terminates at EOF.
@raise Closed to signify EOF *)
- let read_crlf t =
+ let read_line t =
let rec get acc =
match_lwt read_until t '\n' with
|(false, v) ->
get (v :: acc)
|(true, v) -> begin
(* chop the CR if present *)
- let vlen = Bitstring.bitstring_length v in
- let v = bitmatch v with
- | { rest:vlen-8:bitstring; 13:8 } when vlen >= 8 -> rest
- | { _ } -> v in
+ let vlen = Cstruct.len v in
+ let v =
+ if vlen > 0 && (Cstruct.get_char v (vlen-1) = '\r') then
+ Cstruct.sub v 0 (vlen-1) else v
+ in
return (v :: acc)
end
in
- lwt res = get [] >|= List.rev in
- return (Bitstring.concat res)
+ get [] >|= List.rev
(* Output functions *)
let rec flush t =
- let l = List.rev t.obuf in
- lwt res = Flow.writev t.flow l in
- t.obuf <- [res];
- if Bitstring.bitstring_length res > 0 then
- flush t
- else
- return ()
-
- (* Stonkingly inefficient *)
+ let l = List.rev t.obufq in
+ t.obufq <- [];
+ Flow.writev t.flow l
+
+ let alloc_obuf t =
+ let buf = OS.Io_page.get () in
+ t.obuf <- Some buf;
+ t.opos <- 0;
+ buf
+
+ (* Queue the active write buffer onto the write queue, resizing the
+ * view if necessary to the correct size. *)
+ let queue_obuf t =
+ match t.obuf with
+ |None -> ()
+ |Some buf when Cstruct.len buf = t.opos -> (* obuf is full *)
+ t.obufq <- buf :: t.obufq;
+ t.obuf <- None
+ |Some buf when t.opos = 0 -> (* obuf wasnt ever used, so discard *)
+ t.obuf <- None
+ |Some buf -> (* partially filled obuf, so resize *)
+ let buf = Cstruct.sub buf 0 t.opos in
+ t.obufq <- buf :: t.obufq;
+ t.obuf <- None
+
+ (* Get an active output buffer, which will allocate it if needed.
+ * The position to write into is stored in t.opos *)
+ let get_obuf t =
+ match t.obuf with
+ |None -> alloc_obuf t
+ |Some buf when Cstruct.len buf = t.opos -> queue_obuf t; alloc_obuf t
+ |Some buf -> buf
+
+ (* Non-blocking character write, since Io page allocation never blocks.
+ * That may change in the future... *)
let write_char t ch =
- t.obuf <- ((String.make 1 ch),0,8) :: t.obuf;
- return ()
-
- let write_bitstring t buf =
- t.obuf <- buf :: t.obuf;
- return ()
-
- let write_string t buf =
- write_bitstring t (Bitstring.bitstring_of_string buf)
+ let buf = get_obuf t in
+ Cstruct.set_char buf t.opos ch;
+ t.opos <- t.opos + 1
+
+ (* This is zero copy; flush current IO page and queue up the incoming
+ * buffer directly. *)
+ let write_buffer t buf =
+ queue_obuf t;
+ t.obufq <- buf :: t.obufq
+
+ let rec write_string t s off len =
+ let buf = get_obuf t in
+ let avail = Cstruct.len buf - t.opos in
+ if avail < len then begin
+ Cstruct.set_buffer s off buf t.opos avail;
+ t.opos <- t.opos + avail;
+ write_string t s (off+avail) (len-avail)
+ end else begin
+ Cstruct.set_buffer s off buf t.opos len;
+ t.opos <- t.opos + len
+ end
let write_line t buf =
- write_string t buf >>
+ write_string t buf 0 (String.length buf);
write_char t '\n'
let close t =
@@ -199,9 +236,9 @@ let read_stream ?len = function
| TCPv4 t -> TCPv4.read_stream ?len t
| Shmem t -> Shmem.read_stream ?len t
-let read_crlf = function
- | TCPv4 t -> TCPv4.read_crlf t
- | Shmem t -> Shmem.read_crlf t
+let read_line = function
+ | TCPv4 t -> TCPv4.read_line t
+ | Shmem t -> Shmem.read_line t
let write_char = function
| TCPv4 t -> TCPv4.write_char t
@@ -211,9 +248,9 @@ let write_string = function
| TCPv4 t -> TCPv4.write_string t
| Shmem t -> Shmem.write_string t
-let write_bitstring = function
- | TCPv4 t -> TCPv4.write_bitstring t
- | Shmem t -> Shmem.write_bitstring t
+let write_buffer = function
+ | TCPv4 t -> TCPv4.write_buffer t
+ | Shmem t -> Shmem.write_buffer t
let write_line = function
| TCPv4 t -> TCPv4.write_line t
@@ -29,15 +29,15 @@ module Shmem : CHANNEL with
type t
val read_char: t -> char Lwt.t
-val read_some: ?len:int -> t -> Bitstring.t Lwt.t
-val read_until: t -> char -> (bool * Bitstring.t) Lwt.t
-val read_stream: ?len:int -> t -> Bitstring.t Lwt_stream.t
-val read_crlf: t -> Bitstring.t Lwt.t
+val read_some: ?len:int -> t -> OS.Io_page.t Lwt.t
+val read_until: t -> char -> (bool * OS.Io_page.t) Lwt.t
+val read_stream: ?len:int -> t -> OS.Io_page.t Lwt_stream.t
+val read_line: t -> OS.Io_page.t list Lwt.t
-val write_char : t -> char -> unit Lwt.t
-val write_string : t -> string -> unit Lwt.t
-val write_bitstring : t -> Bitstring.t -> unit Lwt.t
-val write_line : t -> string -> unit Lwt.t
+val write_char : t -> char -> unit
+val write_string : t -> string -> int -> int -> unit
+val write_buffer : t -> OS.Io_page.t -> unit
+val write_line : t -> string -> unit
val flush : t -> unit Lwt.t
val close : t -> unit Lwt.t
@@ -53,4 +53,3 @@ val listen :
| `Shmem of peer_uid * (peer_uid -> t -> unit Lwt.t)
| `TCPv4 of ipv4_src * (ipv4_dst -> t -> unit Lwt.t)
] -> unit Lwt.t
-
@@ -11,4 +11,4 @@ Config
Manager
Flow
Datagram
-#Channel
+Channel
@@ -145,12 +145,12 @@ module type CHANNEL = sig
val read_until: t -> char -> (bool * OS.Io_page.t) Lwt.t
val read_some: ?len:int -> t -> OS.Io_page.t Lwt.t
val read_stream: ?len:int -> t -> OS.Io_page.t Lwt_stream.t
- val read_crlf: t -> OS.Io_page.t Lwt.t
+ val read_line: t -> OS.Io_page.t list Lwt.t
- val write_char : t -> char -> unit Lwt.t
- val write_string : t -> string -> unit Lwt.t
- val write_bitstring : t -> OS.Io_page.t -> unit Lwt.t
- val write_line : t -> string -> unit Lwt.t
+ val write_char : t -> char -> unit
+ val write_string : t -> string -> int -> int -> unit
+ val write_buffer : t -> OS.Io_page.t -> unit
+ val write_line : t -> string -> unit
val flush : t -> unit Lwt.t
val close : t -> unit Lwt.t
@@ -87,12 +87,12 @@ module type CHANNEL = sig
val read_until: t -> char -> (bool * OS.Io_page.t) Lwt.t
val read_some: ?len:int -> t -> OS.Io_page.t Lwt.t
val read_stream: ?len: int -> t -> OS.Io_page.t Lwt_stream.t
- val read_crlf: t -> OS.Io_page.t Lwt.t
+ val read_line: t -> OS.Io_page.t list Lwt.t
- val write_char : t -> char -> unit Lwt.t
- val write_string : t -> string -> unit Lwt.t
- val write_bitstring : t -> OS.Io_page.t -> unit Lwt.t
- val write_line : t -> string -> unit Lwt.t
+ val write_char : t -> char -> unit
+ val write_string : t -> string -> int -> int -> unit
+ val write_buffer : t -> OS.Io_page.t -> unit
+ val write_line : t -> string -> unit
val flush : t -> unit Lwt.t
val close : t -> unit Lwt.t

0 comments on commit 262c582

Please sign in to comment.