Permalink
Browse files

[unix] make the unix-socket backend work again

  • Loading branch information...
avsm committed Jun 9, 2012
1 parent ea28ab5 commit 3d886b5895cb8c597cb76d27cea0de062526be0f
View
@@ -93,7 +93,6 @@ function assemble_bin {
assemble_syntax
assemble_xen
assemble_unix "direct"
-#assemble_unix "socket"
-#assemble_node
+assemble_unix "socket"
assemble_scripts
assemble_bin
View
@@ -15,19 +15,13 @@ ifeq ($(OS) $(ARCH),linux x86_64)
XEN_BUILD=xen
endif
-ifeq ($(NODE),)
-NODE_BUILD=
-else
-NODE_BUILD=
-endif
-
-UNIX_BUILD= unix-direct
+UNIX_BUILD= unix-direct unix-socket
-all: $(XEN_BUILD) $(NODE_BUILD) $(UNIX_BUILD)
+all: $(XEN_BUILD) $(UNIX_BUILD)
@ :
doc:
- for spec in $(XEN_BUILD) $(NODE_BUILD) $(UNIX_BUILD); do \
+ for spec in $(XEN_BUILD) $(UNIX_BUILD); do \
SPEC=$$spec ocamlbuild $(OFLOAGS) $(JOBS) doc.otarget; \
done
View
@@ -31,51 +31,58 @@ module Make(Flow:FLOW) :
type t = {
flow: flow;
- mutable ibuf: Bitstring.t;
- mutable obuf: Bitstring.t list;
+ mutable ibuf: OS.Io_page.t option; (* Queue of incoming buf *)
+ mutable obufq: OS.Io_page.t list; (* Queue of completed writebuf *)
+ mutable obuf: OS.Io_page.t option; (* Active write buffer *)
+ mutable opos: int; (* Position in active write buffer *)
abort_t: unit Lwt.t;
abort_u: unit Lwt.u;
}
+ exception Closed
+
let create flow =
- let ibuf = "",0,0 in
- let obuf = [] in
+ let ibuf = None in
+ let obufq = [] in
+ let obuf = None in
+ let opos = 0 in
let abort_t, abort_u = Lwt.task () in
- { ibuf; obuf; flow; abort_t; abort_u }
+ { ibuf; obuf; flow; obufq; opos; abort_t; abort_u }
let ibuf_refill t =
match_lwt Flow.read t.flow with
|Some buf ->
- t.ibuf <- buf;
+ t.ibuf <- Some buf;
return ()
|None ->
fail Closed
+ let rec get_ibuf t =
+ match t.ibuf with
+ |None -> ibuf_refill t >> get_ibuf t
+ |Some buf when Cstruct.len buf = 0 -> ibuf_refill t >> get_ibuf t
+ |Some buf -> return buf
+
(* Read one character from the input channel *)
- let rec read_char t =
- bitmatch t.ibuf with
- | { c:8; rest:-1:bitstring } ->
- t.ibuf <- rest;
- return (Char.chr c)
- | { rest:-1:bitstring } when Bitstring.bitstring_length rest = 0 ->
- ibuf_refill t >>
- read_char t
+ let read_char t =
+ lwt buf = get_ibuf t in
+ let c = Cstruct.get_char buf 0 in
+ t.ibuf <- Some (Cstruct.shift buf 1);
+ return c
(* Read up to len characters from the input channel
and at most a full view. If not specified, read all *)
let read_some ?len t =
- lwt () = if Bitstring.bitstring_length t.ibuf = 0 then
- ibuf_refill t else return () in
- let avail = Bitstring.bitstring_length t.ibuf in
- let len = match len with |Some len -> len * 8 |None -> avail in
+ lwt buf = get_ibuf t in
+ let avail = Cstruct.len buf in
+ let len = match len with |Some len -> len |None -> avail in
if len < avail then begin
- let r = Bitstring.subbitstring t.ibuf 0 len in
- t.ibuf <- Bitstring.subbitstring t.ibuf len (avail-len);
- return r
+ let hd,tl = Cstruct.split buf len in
+ t.ibuf <- Some tl;
+ return hd
end else begin
- let r = t.ibuf in
- t.ibuf <- "",0,0;
- return r
+ t.ibuf <- None;
+ return buf
end
(* Read up to len characters from the input channel as a
@@ -91,77 +98,110 @@ module Make(Flow:FLOW) :
(* Read until a character is found *)
let read_until t ch =
- lwt () = if Bitstring.bitstring_length t.ibuf = 0 then
- ibuf_refill t else return () in
- try_lwt
- let buf,off,len = t.ibuf in
- let idx = (String.index_between buf (off/8) ((off+len)/8) ch) * 8 in
- let rlen = idx - off in
- (bitmatch t.ibuf with
- | { _:8; rest:-1:bitstring } when rlen = 0 ->
- t.ibuf <- rest;
- return (true, Bitstring.empty_bitstring)
- | { r:rlen:bitstring; _:8; rest:-1:bitstring } ->
- t.ibuf <- rest;
- return (true, r)
- | { _ } ->
- printf "Flow: unexpected bitmatch failure in read_until\n%!";
- exit 1
- )
- with Not_found -> begin
- let r = t.ibuf in
- t.ibuf <- "",0,0;
- return (false,r)
- end
+ lwt buf = get_ibuf t in
+ let len = Cstruct.len buf in
+ let rec scan off =
+ if off = len then None else begin
+ if Cstruct.get_char buf off = ch then
+ Some off else scan (off+1)
+ end
+ in
+ match scan 0 with
+ |None -> (* not found, return what we have until EOF *)
+ t.ibuf <- None;
+ return (false, buf)
+ |Some off -> (* found, so split the buffer *)
+ let hd = Cstruct.sub_buffer buf 0 off in
+ t.ibuf <- Some (Cstruct.shift buf (off+1));
+ return (true, hd)
(* This reads a line of input, which is terminated either by a CRLF
sequence, or the end of the channel (which counts as a line).
@return Returns a stream of views that terminates at EOF.
@raise Closed to signify EOF *)
- let read_crlf t =
+ let read_line t =
let rec get acc =
match_lwt read_until t '\n' with
|(false, v) ->
get (v :: acc)
|(true, v) -> begin
(* chop the CR if present *)
- let vlen = Bitstring.bitstring_length v in
- let v = bitmatch v with
- | { rest:vlen-8:bitstring; 13:8 } when vlen >= 8 -> rest
- | { _ } -> v in
+ let vlen = Cstruct.len v in
+ let v =
+ if vlen > 0 && (Cstruct.get_char v (vlen-1) = '\r') then
+ Cstruct.sub v 0 (vlen-1) else v
+ in
return (v :: acc)
end
in
- lwt res = get [] >|= List.rev in
- return (Bitstring.concat res)
+ get [] >|= List.rev
(* Output functions *)
- let rec flush t =
- let l = List.rev t.obuf in
- lwt res = Flow.writev t.flow l in
- t.obuf <- [res];
- if Bitstring.bitstring_length res > 0 then
- flush t
- else
- return ()
-
- (* Stonkingly inefficient *)
+ let alloc_obuf t =
+ let buf = OS.Io_page.get () in
+ t.obuf <- Some buf;
+ t.opos <- 0;
+ buf
+
+ (* Queue the active write buffer onto the write queue, resizing the
+ * view if necessary to the correct size. *)
+ let queue_obuf t =
+ match t.obuf with
+ |None -> ()
+ |Some buf when Cstruct.len buf = t.opos -> (* obuf is full *)
+ t.obufq <- buf :: t.obufq;
+ t.obuf <- None
+ |Some buf when t.opos = 0 -> (* obuf wasnt ever used, so discard *)
+ t.obuf <- None
+ |Some buf -> (* partially filled obuf, so resize *)
+ let buf = Cstruct.sub buf 0 t.opos in
+ t.obufq <- buf :: t.obufq;
+ t.obuf <- None
+
+ (* Get an active output buffer, which will allocate it if needed.
+ * The position to write into is stored in t.opos *)
+ let get_obuf t =
+ match t.obuf with
+ |None -> alloc_obuf t
+ |Some buf when Cstruct.len buf = t.opos -> queue_obuf t; alloc_obuf t
+ |Some buf -> buf
+
+ (* Non-blocking character write, since Io page allocation never blocks.
+ * That may change in the future... *)
let write_char t ch =
- t.obuf <- ((String.make 1 ch),0,8) :: t.obuf;
- return ()
-
- let write_bitstring t buf =
- t.obuf <- buf :: t.obuf;
- return ()
-
- let write_string t buf =
- write_bitstring t (Bitstring.bitstring_of_string buf)
+ let buf = get_obuf t in
+ Cstruct.set_char buf t.opos ch;
+ t.opos <- t.opos + 1
+
+ (* This is zero copy; flush current IO page and queue up the incoming
+ * buffer directly. *)
+ let write_buffer t buf =
+ queue_obuf t;
+ t.obufq <- buf :: t.obufq
+
+ let rec write_string t s off len =
+ let buf = get_obuf t in
+ let avail = Cstruct.len buf - t.opos in
+ if avail < len then begin
+ Cstruct.set_buffer s off buf t.opos avail;
+ t.opos <- t.opos + avail;
+ write_string t s (off+avail) (len-avail)
+ end else begin
+ Cstruct.set_buffer s off buf t.opos len;
+ t.opos <- t.opos + len
+ end
let write_line t buf =
- write_string t buf >>
+ write_string t buf 0 (String.length buf);
write_char t '\n'
+ let rec flush t =
+ queue_obuf t;
+ let l = List.rev t.obufq in
+ t.obufq <- [];
+ Flow.writev t.flow l
+
let close t =
flush t >>
Flow.close t.flow
@@ -197,9 +237,9 @@ let read_stream ?len = function
| TCPv4 t -> TCPv4.read_stream ?len t
| Pipe t -> Pipe.read_stream ?len t
-let read_crlf = function
- | TCPv4 t -> TCPv4.read_crlf t
- | Pipe t -> Pipe.read_crlf t
+let read_line = function
+ | TCPv4 t -> TCPv4.read_line t
+ | Pipe t -> Pipe.read_line t
let write_char = function
| TCPv4 t -> TCPv4.write_char t
@@ -209,9 +249,9 @@ let write_string = function
| TCPv4 t -> TCPv4.write_string t
| Pipe t -> Pipe.write_string t
-let write_bitstring = function
- | TCPv4 t -> TCPv4.write_bitstring t
- | Pipe t -> Pipe.write_bitstring t
+let write_buffer = function
+ | TCPv4 t -> TCPv4.write_buffer t
+ | Pipe t -> Pipe.write_buffer t
let write_line = function
| TCPv4 t -> TCPv4.write_line t
View
@@ -29,15 +29,15 @@ module Pipe : CHANNEL with
type t
val read_char: t -> char Lwt.t
-val read_some: ?len:int -> t -> Bitstring.t Lwt.t
-val read_until: t -> char -> (bool * Bitstring.t) Lwt.t
-val read_stream: ?len:int -> t -> Bitstring.t Lwt_stream.t
-val read_crlf: t -> Bitstring.t Lwt.t
+val read_some: ?len:int -> t -> OS.Io_page.t Lwt.t
+val read_until: t -> char -> (bool * OS.Io_page.t) Lwt.t
+val read_stream: ?len:int -> t -> OS.Io_page.t Lwt_stream.t
+val read_line: t -> OS.Io_page.t list Lwt.t
-val write_char : t -> char -> unit Lwt.t
-val write_string : t -> string -> unit Lwt.t
-val write_bitstring : t -> Bitstring.t -> unit Lwt.t
-val write_line : t -> string -> unit Lwt.t
+val write_char : t -> char -> unit
+val write_string : t -> string -> int -> int -> unit
+val write_buffer : t -> OS.Io_page.t -> unit
+val write_line : t -> string -> unit
val flush : t -> unit Lwt.t
val close : t -> unit Lwt.t
@@ -53,4 +53,3 @@ val listen :
| `Pipe of peer_uid * (peer_uid -> t -> unit Lwt.t)
| `TCPv4 of ipv4_src * (ipv4_dst -> t -> unit Lwt.t)
] -> unit Lwt.t
-
View
@@ -29,16 +29,15 @@ module UDPv4 = struct
type src = ipv4_addr option * int
type dst = ipv4_addr * int
- type msg = Bitstring.t
+ type msg = OS.Io_page.t
- let rec send mgr ?src (dstaddr, dstport) req =
- let (buf,off,len) = req in
+ let rec send mgr ?src (dstaddr, dstport) buf =
lwt fd = match src with
|None -> return (Manager.get_udpv4 mgr)
|Some src -> Manager.get_udpv4_listener mgr src
in
- let off = off / 8 in
- let len = len / 8 in
+ let off = Cstruct.base_offset buf in
+ let len = Cstruct.len buf in
let dst = (ipv4_addr_to_uint32 dstaddr, dstport) in
match R.udpv4_sendto fd buf off len dst with
|R.OK len' ->
@@ -48,18 +47,18 @@ module UDPv4 = struct
return ()
|R.Retry ->
Activations.write fd >>
- send mgr (dstaddr, dstport) req
+ send mgr (dstaddr, dstport) buf
|R.Err err -> fail (Error err)
let recv mgr (addr,port) fn =
lwt lfd = Manager.get_udpv4_listener mgr (addr,port) in
- let buf = String.create 4096 in
+ let buf = OS.Io_page.get () in
let rec listen () =
- match R.udpv4_recvfrom lfd buf 0 (String.length buf) with
+ match R.udpv4_recvfrom lfd buf 0 (Cstruct.len buf) with
|R.OK (frm_addr, frm_port, len) ->
let frm_addr = ipv4_addr_of_uint32 frm_addr in
let dst = (frm_addr, frm_port) in
- let req = (buf,0,(len * 8)) in
+ let req = Cstruct.sub buf 0 len in
(* Be careful to catch an exception here, as otherwise
ignore_result may raise it at some other random point *)
Lwt.ignore_result (
@@ -18,5 +18,5 @@ module UDPv4 : Nettypes.DATAGRAM with
type mgr = Manager.t
and type src = Nettypes.ipv4_src
and type dst = Nettypes.ipv4_dst
- and type msg = Bitstring.t
+ and type msg = OS.Io_page.t
Oops, something went wrong.

0 comments on commit 3d886b5

Please sign in to comment.