Skip to content

Commit

Permalink
Use Lwt_dllist instead of Lwt_sequence
Browse files Browse the repository at this point in the history
This is due to the latter being deprecated upstream in Lwt (ocsigen/lwt#361)
  • Loading branch information
avsm committed Jan 14, 2019
1 parent 64cb1d6 commit 5f29f57
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 48 deletions.
5 changes: 5 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## dev

* Use `Lwt_dllist` instead of `Lwt_sequence`, due to the latter being deprecated
upstream in Lwt (ocsigen/lwt#361).

### v3.6.0 (2019-01-04)

* The IPv4 implementation now supports reassembly of IPv4 fragments (#375 by @hannesm)
Expand Down
2 changes: 1 addition & 1 deletion src/tcp/jbuild
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
(library
((name tcp)
(public_name tcpip.tcp)
(libraries (logs mirage-protocols-lwt ipaddr cstruct
(libraries (logs mirage-protocols-lwt ipaddr cstruct lwt-dllist
rresult mirage-profile io-page tcpip duration
randomconv fmt mirage-time-lwt mirage-clock
mirage-random))
Expand Down
20 changes: 10 additions & 10 deletions src/tcp/segment.ml
Original file line number Diff line number Diff line change
Expand Up @@ -22,29 +22,29 @@ let src = Logs.Src.create "segment" ~doc:"Mirage TCP Segment module"
module Log = (val Logs.src_log src : Logs.LOG)

let lwt_sequence_add_l s seq =
let (_:'a Lwt_sequence.node) = Lwt_sequence.add_l s seq in
let (_:'a Lwt_dllist.node) = Lwt_dllist.add_l s seq in
()

let lwt_sequence_add_r s seq =
let (_:'a Lwt_sequence.node) = Lwt_sequence.add_r s seq in
let (_:'a Lwt_dllist.node) = Lwt_dllist.add_r s seq in
()

let peek_opt_l seq =
match Lwt_sequence.take_opt_l seq with
match Lwt_dllist.take_opt_l seq with
| None -> None
| Some s ->
lwt_sequence_add_l s seq;
Some s

let peek_l seq =
match Lwt_sequence.take_opt_l seq with
match Lwt_dllist.take_opt_l seq with
| None -> assert false
| Some s ->
let _ = Lwt_sequence.add_l s seq in
let _ = Lwt_dllist.add_l s seq in
s

let rec reset_seq segs =
match Lwt_sequence.take_opt_l segs with
match Lwt_dllist.take_opt_l segs with
| None -> ()
| Some _ -> reset_seq segs

Expand Down Expand Up @@ -258,7 +258,7 @@ module Tx (Time:Mirage_time_lwt.S) (Clock:Mirage_clock.MCLOCK) = struct

(* Queue of pre-transmission segments *)
type ('a, 'b) q = {
segs: seg Lwt_sequence.t; (* Retransmitted segment queue *)
segs: seg Lwt_dllist.t; (* Retransmitted segment queue *)
xmit: ('a, 'b) xmit; (* Transmit packet to the wire *)
rx_ack: Sequence.t Lwt_mvar.t; (* RX Ack thread that we've sent one *)
wnd: Window.t; (* TCP Window information *)
Expand Down Expand Up @@ -328,7 +328,7 @@ module Tx (Time:Mirage_time_lwt.S) (Clock:Mirage_clock.MCLOCK) = struct
| false -> Sequence.zero (* here we return 0l instead of ack_remaining in case
the ack was an old packet in the network *)
| true ->
match Lwt_sequence.take_opt_l segs with
match Lwt_dllist.take_opt_l segs with
| None ->
Log.debug (fun f -> f "Dubious ACK received");
ack_remaining
Expand Down Expand Up @@ -391,7 +391,7 @@ module Tx (Time:Mirage_time_lwt.S) (Clock:Mirage_clock.MCLOCK) = struct
let dupacktest () =
0l = Sequence.to_int32 ack_len &&
Window.tx_wnd_unscaled q.wnd = Int32.of_int win &&
not (Lwt_sequence.is_empty q.segs)
not (Lwt_dllist.is_empty q.segs)
in
serviceack (dupacktest ()) ack_len seq win
end >>= fun () ->
Expand All @@ -402,7 +402,7 @@ module Tx (Time:Mirage_time_lwt.S) (Clock:Mirage_clock.MCLOCK) = struct
tx_ack_t ()

let create ~clock ~xmit ~wnd ~state ~rx_ack ~tx_ack ~tx_wnd_update =
let segs = Lwt_sequence.create () in
let segs = Lwt_dllist.create () in
let dup_acks = 0 in
let expire = ontimer xmit state segs wnd in
let period_ns = Window.rto wnd in
Expand Down
74 changes: 37 additions & 37 deletions src/tcp/user_buffer.ml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
open Lwt.Infix

let lwt_sequence_add_l s seq =
let (_:'a Lwt_sequence.node) = Lwt_sequence.add_l s seq in
let (_:'a Lwt_dllist.node) = Lwt_dllist.add_l s seq in
()

(* A bounded queue to receive data segments and let readers block on
Expand All @@ -32,19 +32,19 @@ module Rx = struct
stops taking data the window closes so the other side stops sending *)

type t = {
q: Cstruct.t option Lwt_sequence.t;
q: Cstruct.t option Lwt_dllist.t;
wnd: Window.t;
writers: unit Lwt.u Lwt_sequence.t;
readers: Cstruct.t option Lwt.u Lwt_sequence.t;
writers: unit Lwt.u Lwt_dllist.t;
readers: Cstruct.t option Lwt.u Lwt_dllist.t;
mutable watcher: int32 Lwt_mvar.t option;
mutable max_size: int32;
mutable cur_size: int32;
}

let create ~max_size ~wnd =
let q = Lwt_sequence.create () in
let writers = Lwt_sequence.create () in
let readers = Lwt_sequence.create () in
let q = Lwt_dllist.create () in
let writers = Lwt_dllist.create () in
let readers = Lwt_dllist.create () in
let watcher = None in
let cur_size = 0l in
{ q; wnd; writers; readers; max_size; cur_size; watcher }
Expand All @@ -64,34 +64,34 @@ module Rx = struct
let add_r t s =
if t.cur_size > t.max_size then
let th,u = MProf.Trace.named_task "User_buffer.add_r" in
let node = Lwt_sequence.add_r u t.writers in
Lwt.on_cancel th (fun _ -> Lwt_sequence.remove node);
let node = Lwt_dllist.add_r u t.writers in
Lwt.on_cancel th (fun _ -> Lwt_dllist.remove node);
(* Update size before blocking, which may push cur_size above max_size *)
t.cur_size <- Int32.(add t.cur_size (of_int (seglen s)));
notify_size_watcher t >>= fun () ->
th >>= fun () ->
ignore(Lwt_sequence.add_r s t.q);
ignore(Lwt_dllist.add_r s t.q);
Lwt.return_unit
else match Lwt_sequence.take_opt_l t.readers with
else match Lwt_dllist.take_opt_l t.readers with
| None ->
t.cur_size <- Int32.(add t.cur_size (of_int (seglen s)));
ignore(Lwt_sequence.add_r s t.q);
ignore(Lwt_dllist.add_r s t.q);
notify_size_watcher t
| Some u ->
Lwt.return (Lwt.wakeup u s)

let take_l t =
if Lwt_sequence.is_empty t.q then begin
if Lwt_dllist.is_empty t.q then begin
let th,u = MProf.Trace.named_task "User_buffer.take_l" in
let node = Lwt_sequence.add_r u t.readers in
Lwt.on_cancel th (fun _ -> Lwt_sequence.remove node);
let node = Lwt_dllist.add_r u t.readers in
Lwt.on_cancel th (fun _ -> Lwt_dllist.remove node);
th
end else begin
let s = Lwt_sequence.take_l t.q in
let s = Lwt_dllist.take_l t.q in
t.cur_size <- Int32.(sub t.cur_size (of_int (seglen s)));
notify_size_watcher t >>= fun () ->
if t.cur_size < t.max_size then begin
match Lwt_sequence.take_opt_l t.writers with
match Lwt_dllist.take_opt_l t.writers with
|None -> ()
|Some w -> Lwt.wakeup w ()
end;
Expand All @@ -117,16 +117,16 @@ module Tx(Time:Mirage_time_lwt.S)(Clock:Mirage_clock.MCLOCK) = struct

type t = {
wnd: Window.t;
writers: unit Lwt.u Lwt_sequence.t;
writers: unit Lwt.u Lwt_dllist.t;
txq: TXS.t;
buffer: Cstruct.t Lwt_sequence.t;
buffer: Cstruct.t Lwt_dllist.t;
max_size: int32;
mutable bufbytes: int32;
}

let create ~max_size ~wnd ~txq =
let buffer = Lwt_sequence.create () in
let writers = Lwt_sequence.create () in
let buffer = Lwt_dllist.create () in
let writers = Lwt_dllist.create () in
let bufbytes = 0l in
{ wnd; writers; txq; buffer; max_size; bufbytes }

Expand Down Expand Up @@ -157,8 +157,8 @@ module Tx(Time:Mirage_time_lwt.S)(Clock:Mirage_clock.MCLOCK) = struct
end
else begin
let th,u = MProf.Trace.named_task "User_buffer.wait_for" in
let node = Lwt_sequence.add_r u t.writers in
Lwt.on_cancel th (fun _ -> Lwt_sequence.remove node);
let node = Lwt_dllist.add_r u t.writers in
Lwt.on_cancel th (fun _ -> Lwt_dllist.remove node);
th >>= fun () ->
wait_for t sz
end
Expand All @@ -167,20 +167,20 @@ module Tx(Time:Mirage_time_lwt.S)(Clock:Mirage_clock.MCLOCK) = struct

(* Wait until the user buffer is flushed *)
let rec wait_for_flushed t =
if Lwt_sequence.is_empty t.buffer then begin
if Lwt_dllist.is_empty t.buffer then begin
Lwt.return_unit
end
else begin
let th,u = MProf.Trace.named_task "User_buffer.wait_for_flushed" in
let node = Lwt_sequence.add_r u t.writers in
Lwt.on_cancel th (fun _ -> Lwt_sequence.remove node);
let node = Lwt_dllist.add_r u t.writers in
Lwt.on_cancel th (fun _ -> Lwt_dllist.remove node);
th >>= fun () ->
wait_for_flushed t
end

let rec clear_buffer t =
let rec addon_more curr_data l =
match Lwt_sequence.take_opt_l t.buffer with
match Lwt_dllist.take_opt_l t.buffer with
| None -> List.rev curr_data
| Some s ->
let s_len = len s in
Expand All @@ -194,7 +194,7 @@ module Tx(Time:Mirage_time_lwt.S)(Clock:Mirage_clock.MCLOCK) = struct
in
let get_pkt_to_send () =
let avail_len = min (available_cwnd t) (Int32.of_int (Window.tx_mss t.wnd)) in
let s = Lwt_sequence.take_l t.buffer in
let s = Lwt_dllist.take_l t.buffer in
let s_len = len s in
match s_len > avail_len with
| true -> begin
Expand All @@ -218,7 +218,7 @@ module Tx(Time:Mirage_time_lwt.S)(Clock:Mirage_clock.MCLOCK) = struct
t.bufbytes <- Int32.sub t.bufbytes s_len;
Some [s]
in
match Lwt_sequence.is_empty t.buffer with
match Lwt_dllist.is_empty t.buffer with
| true -> Lwt.return_unit
| false ->
match get_pkt_to_send () with
Expand Down Expand Up @@ -256,11 +256,11 @@ module Tx(Time:Mirage_time_lwt.S)(Clock:Mirage_clock.MCLOCK) = struct
let write t datav =
let l = lenv datav in
let mss = Int32.of_int (Window.tx_mss t.wnd) in
match Lwt_sequence.is_empty t.buffer &&
match Lwt_dllist.is_empty t.buffer &&
(l = mss || not (Window.tx_inflight t.wnd)) with
| false ->
t.bufbytes <- Int32.add t.bufbytes l;
List.iter (fun data -> ignore(Lwt_sequence.add_r data t.buffer)) datav;
List.iter (fun data -> ignore(Lwt_dllist.add_r data t.buffer)) datav;
if t.bufbytes < mss then
Lwt.return_unit
else
Expand All @@ -270,33 +270,33 @@ module Tx(Time:Mirage_time_lwt.S)(Clock:Mirage_clock.MCLOCK) = struct
match avail_len < l with
| true ->
t.bufbytes <- Int32.add t.bufbytes l;
List.iter (fun data -> ignore(Lwt_sequence.add_r data t.buffer)) datav;
List.iter (fun data -> ignore(Lwt_dllist.add_r data t.buffer)) datav;
Lwt.return_unit
| false ->
let max_size = Window.tx_mss t.wnd in
transmit_segments ~mss:max_size ~txq:t.txq datav

let write_nodelay t datav =
let l = lenv datav in
match Lwt_sequence.is_empty t.buffer with
match Lwt_dllist.is_empty t.buffer with
| false ->
t.bufbytes <- Int32.add t.bufbytes l;
List.iter (fun data -> ignore(Lwt_sequence.add_r data t.buffer)) datav;
List.iter (fun data -> ignore(Lwt_dllist.add_r data t.buffer)) datav;
Lwt.return_unit
| true ->
let avail_len = available_cwnd t in
match avail_len < l with
| true ->
t.bufbytes <- Int32.add t.bufbytes l;
List.iter (fun data -> ignore(Lwt_sequence.add_r data t.buffer)) datav;
List.iter (fun data -> ignore(Lwt_dllist.add_r data t.buffer)) datav;
Lwt.return_unit
| false ->
let max_size = Window.tx_mss t.wnd in
transmit_segments ~mss:max_size ~txq:t.txq datav


let inform_app t =
match Lwt_sequence.take_opt_l t.writers with
match Lwt_dllist.take_opt_l t.writers with
| None -> Lwt.return_unit
| Some w ->
Lwt.wakeup w ();
Expand All @@ -314,7 +314,7 @@ module Tx(Time:Mirage_time_lwt.S)(Clock:Mirage_clock.MCLOCK) = struct
let reset t =
(* FIXME: duplicated code with Segment.reset_seq *)
let rec reset_seq segs =
match Lwt_sequence.take_opt_l segs with
match Lwt_dllist.take_opt_l segs with
| None -> ()
| Some _ -> reset_seq segs
in
Expand Down
1 change: 1 addition & 0 deletions tcpip.opam
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ depends: [
"mirage-profile" {>= "0.5"}
"fmt"
"lwt" {>= "3.0.0"}
"lwt-dlllist"
"logs" {>= "0.6.0"}
"duration"
"io-page-unix"
Expand Down

0 comments on commit 5f29f57

Please sign in to comment.