Browse files

[feature] libnet: Partially functorised stream parser, instantiated o…

…n Buf and Buffer.
  • Loading branch information...
1 parent a12b805 commit d1b6eda64cb4e08b3429367bcf831f9a9d8eeba4 @nrs135 nrs135 committed Sep 13, 2011
Showing with 102 additions and 43 deletions.
  1. +98 −43 libnet/httpTools.ml
  2. +4 −0 libnet/httpTools.mli
View
141 libnet/httpTools.ml
@@ -379,50 +379,105 @@ let upto_mark_stream_cps3 ?inclusive sched conn (buf,pos) mark ?callback payload
upto_mark_stream_cps2 ?inclusive sched conn (buf,pos) mark ?callback payload ?blocksize ?err_cont ?timeout
(fun (buf,start,len) -> cont (Buffer.sub buf start len))
-let fixed_stream_cps2 sched conn (buf,pos) count ?callback payload ?blocksize ?err_cont ?timeout cont =
- let conn_id = conn.Scheduler.conn_id in
- buf_clean (buf,pos);
- let start = !pos in
- let call_callback = get_callback ?callback payload ?blocksize ?err_cont start pos buf () in
- pos := Buffer.length buf;
- let rec aux () =
- #<If$minlevel 2>Logger.debug "fixed_stream_cps2: conn_id:%d pos=%d start=%d count=%d buflen=%d"
- conn_id !pos start count (Buffer.length buf)#<End>;
- if call_callback ()
- then begin
- if !pos - start >= count
+(* TODO: Expand this to the other stream parser functions *)
+module type BUF_SIG =
+sig
+ type t
+ val length : t -> int
+ val sub : t -> int -> int -> string
+ val clear : t -> unit
+ val reset : t -> unit
+ val add_string : t -> string -> unit
+ val read_more :
+ Scheduler.t -> Scheduler.connection_info -> ?read_max:int -> ?timeout:Time.t ->
+ t -> ?size_max:int -> ?err_cont:(exn -> unit) -> (int * t -> unit) -> unit
+ end
+
+module type STREAM_PARSER_SIG =
+sig
+ module B : BUF_SIG
+ val fixed_stream_cps2 :
+ (*?oc_opt:out_channel option ->*) Scheduler.t -> Scheduler.connection_info -> (B.t * int ref) -> int ->
+ ?callback:('a -> int -> B.t -> bool) -> 'a -> ?blocksize:int ->
+ ?err_cont:(exn -> unit) -> ?timeout:Time.t -> (B.t * int * int -> unit) -> unit
+end
+
+module StreamParserF(B: BUF_SIG) : STREAM_PARSER_SIG with module B = B =
+struct
+
+ module B = B
+
+ let buf_clean (b,pos) =
+ #<If$minlevel 10>Logger.debug "buf_clean: blen=%d pos=%d" (B.length b) (!pos)#<End>;
+ let blen = B.length b in
+ if !pos >= blen
+ then (#<If$minlevel 2>Logger.debug "buf_clean: clear"#<End>; B.clear b; pos := 0)
+ else
+ let tq x = (x lsr 1) + (x lsr 2) in
+ if blen >= 1024 && !pos >= (tq blen)
then
- (if !pos - start > count then pos := start + count;
- (*#<If>Logger.debug "HttpTools.fixed_stream_cps2: returning pos=%d '%s'"
- !pos (String.escaped (Buffer.sub buf start count))#<End>;*)
- cont (buf,start,count))
- else
- let err_cont = Option.default (fun exn -> Logger.debug "fixed_stream_cps2(A): conn_id=%d exn=%s"
- conn_id (Printexc.to_string exn)) err_cont in
- (#<If$minlevel 2>Logger.debug "fixed_stream_cps2(read_more2): pos=%d buflen=%d" !pos (Buffer.length buf)#<End>;
- try Scheduler.read_more2 ?timeout sched conn buf
- ~err_cont:(function
- | End_of_file -> (Logger.debug "fixed_stream_cps2: got End_of_file";
- cont (buf,start,(!pos)-start))
- | exn -> (Logger.debug "fixed_stream_cps2(B): conn_id=%d exn=%s"
- conn_id (Printexc.to_string exn);
- err_cont exn))
- (fun (n,_buf) ->
- (*#<If$minlevel 10>match oc_opt with
- | Some oc -> (output_string oc (Buffer.sub buf !pos n); Pervasives.flush oc)
- | None -> ()#<End>;*)
- #<If$minlevel 2>Logger.debug "fixed_stream_cps2: conn_id=%d read %d" conn_id n;
- (*Logger.debug "fixed_stream_cps2: buf='%s'" (String.escaped (Buffer.contents buf))*)
- #<End>;
- if n <= 0
- then (Logger.debug "fixed_stream_cps2: raising End_of_file"; raise End_of_file)
- else (pos := !pos + n; aux ()))
- with exn -> (Logger.debug "fixed_stream_cps2(C): conn_id=%d exn=%s"
- conn_id (Printexc.to_string exn); cont (buf,start,(!pos)-start)))
- end
- else ()
- in
- aux ()
+ let str = B.sub b !pos (blen-(!pos)) in
+ B.clear b;
+ B.add_string b str;
+ #<If$minlevel 2>Logger.debug "buf_clean: remove %d shift %d" (blen - B.length b) (String.length str)#<End>;
+ pos := 0
+
+ let fixed_stream_cps2 sched conn (buf,pos) count ?callback payload ?blocksize ?err_cont ?timeout cont =
+ let conn_id = conn.Scheduler.conn_id in
+ buf_clean (buf,pos);
+ let start = !pos in
+ let call_callback = get_callback ?callback payload ?blocksize ?err_cont start pos buf () in
+ pos := B.length buf;
+ let rec aux () =
+ #<If$minlevel 2>Logger.debug "fixed_stream_cps2: conn_id:%d pos=%d start=%d count=%d buflen=%d"
+ conn_id !pos start count (B.length buf)#<End>;
+ if call_callback ()
+ then begin
+ if !pos - start >= count
+ then
+ (if !pos - start > count then pos := start + count;
+ (*#<If>Logger.debug "HttpTools.fixed_stream_cps2: returning pos=%d '%s'"
+ !pos (String.escaped (B.sub buf start count))#<End>;*)
+ cont (buf,start,count))
+ else
+ let err_cont = Option.default (fun exn -> Logger.debug "fixed_stream_cps2(A): conn_id=%d exn=%s"
+ conn_id (Printexc.to_string exn)) err_cont in
+ (#<If$minlevel 2>Logger.debug "fixed_stream_cps2(read_more2): pos=%d buflen=%d" !pos (B.length buf)#<End>;
+ try B.read_more ?timeout sched conn buf
+ ~err_cont:(function
+ | End_of_file -> (Logger.debug "fixed_stream_cps2: got End_of_file";
+ cont (buf,start,(!pos)-start))
+ | exn -> (Logger.debug "fixed_stream_cps2(B): conn_id=%d exn=%s"
+ conn_id (Printexc.to_string exn);
+ err_cont exn))
+ (fun (n,_buf) ->
+ (*#<If$minlevel 10>match oc_opt with
+ | Some oc -> (output_string oc (B.sub buf !pos n); Pervasives.flush oc)
+ | None -> ()#<End>;*)
+ #<If$minlevel 2>Logger.debug "fixed_stream_cps2: conn_id=%d read %d" conn_id n;
+ (*Logger.debug "fixed_stream_cps2: buf='%s'" (String.escaped (B.contents buf))*)
+ #<End>;
+ if n <= 0
+ then (Logger.debug "fixed_stream_cps2: raising End_of_file"; raise End_of_file)
+ else (pos := !pos + n; aux ()))
+ with exn -> (Logger.debug "fixed_stream_cps2(C): conn_id=%d exn=%s"
+ conn_id (Printexc.to_string exn); cont (buf,start,(!pos)-start)))
+ end
+ else ()
+ in
+ aux ()
+
+ end
+
+module Buf_ : BUF_SIG with type t = Buf.t =
+struct include Buf let read_more = Scheduler.read_more4 end
+module StreamParserBuf : STREAM_PARSER_SIG with module B = Buf_ = StreamParserF(Buf_)
+let fixed_stream_cps2_buf = StreamParserBuf.fixed_stream_cps2
+
+module Buffer_ : BUF_SIG with type t = Buffer.t =
+struct include Buffer let read_more = Scheduler.read_more2 end
+module StreamParserBuffer : STREAM_PARSER_SIG with module B = Buffer_ = StreamParserF(Buffer_)
+let fixed_stream_cps2 = StreamParserBuffer.fixed_stream_cps2
let fixed_stream_cps3 sched conn (buf,pos) count ?callback payload ?blocksize ?err_cont ?timeout cont =
fixed_stream_cps2 sched conn (buf,pos) count ?callback payload ?blocksize ?err_cont ?timeout
View
4 libnet/httpTools.mli
@@ -91,6 +91,10 @@ val read_fixed_stream_cps :
Scheduler.t ->
?err_cont:(exn -> unit) -> ?timeout:Time.t -> (string -> unit) -> unit
+val fixed_stream_cps2_buf :
+ (*?oc_opt:out_channel option ->*) Scheduler.t -> Scheduler.connection_info -> (Buf.t * int ref) -> int ->
+ ?callback:('a -> int -> Buf.t -> bool) -> 'a -> ?blocksize:int ->
+ ?err_cont:(exn -> unit) -> ?timeout:Time.t -> (Buf.t * int * int -> unit) -> unit
val fixed_stream_cps2 :
(*?oc_opt:out_channel option ->*) Scheduler.t -> Scheduler.connection_info -> (Buffer.t * int ref) -> int ->
?callback:('a -> int -> Buffer.t -> bool) -> 'a -> ?blocksize:int ->

0 comments on commit d1b6eda

Please sign in to comment.