Skip to content

Commit

Permalink
Ipv4.Fragments: use a mutable LRU cache (Lru.M.t instead of mutable c…
Browse files Browse the repository at this point in the history
…ache : Lru.F.t):

- is faster (O(1) instead of O(log n))
- still avoids (due to randomization) collision-based complexity
  • Loading branch information
hannesm committed Oct 13, 2019
1 parent 451c566 commit 2201224
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 99 deletions.
45 changes: 25 additions & 20 deletions src/ipv4/fragments.ml
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,12 @@ end
module K = struct
type t = Ipaddr.V4.t * Ipaddr.V4.t * int * int

let compare (src, dst, proto, id) (src', dst', proto', id') =
let (&&&) a b = match a with 0 -> b | x -> x in
let int_cmp : int -> int -> int = compare in
Ipaddr.V4.compare src src' &&&
Ipaddr.V4.compare dst dst' &&&
int_cmp proto proto' &&&
int_cmp id id'
let equal a b = a = b

let hash r k = Hashtbl.seeded_hash r k
end

module Cache = Lru.F.Make(K)(V)
module Cache = Lru.M.MakeSeeded(K)(V)

(* insert_sorted inserts a fragment in a list, sort is by frag_start, descending *)
let rec insert_sorted ((frag_start, _) as frag) = function
Expand Down Expand Up @@ -125,12 +121,12 @@ let max_duration = Duration.of_sec 10

let process cache ts (packet : Ipv4_packet.t) payload =
let add_trim key value cache =
let cache' = Cache.add key value cache in
Cache.trim cache'
Cache.add key value cache;
Cache.trim cache
in
if packet.off land 0x3FFF = 0 then (* ignore reserved and don't fragment *)
(* fastpath *)
cache, Some (packet, payload)
Some (packet, payload)
else
let offset, more =
(packet.off land 0x1FFF) lsl 3, (* of 8 byte blocks *)
Expand All @@ -141,13 +137,15 @@ let process cache ts (packet : Ipv4_packet.t) payload =
match Cache.find key cache with
| None ->
Log.debug (fun m -> m "%a none found, inserting into cache" Ipv4_packet.pp packet) ;
add_trim key v cache, None
add_trim key v cache;
None
| Some (ts', options, finished, cnt, frags) ->
if Int64.sub ts ts' >= max_duration then begin
Log.warn (fun m -> m "%a found some, but timestamp exceeded duration %a, dropping old segments and inserting new segment into cache" Ipv4_packet.pp packet Duration.pp max_duration) ;
add_trim key v cache, None
end else
let cache' = Cache.promote key cache in
add_trim key v cache;
None
end else begin
Cache.promote key cache;
let all_frags = insert_sorted (offset, payload) frags
and try_reassemble = finished || not more
and options' = if offset = 0 then packet.options else options
Expand All @@ -167,7 +165,8 @@ let process cache ts (packet : Ipv4_packet.t) payload =
| Ok p ->
Log.debug (fun m -> m "%a reassembled to payload %d" Ipv4_packet.pp packet (Cstruct.len p)) ;
let packet' = { packet with options = options' ; off = 0 } in
Cache.remove key cache', Some (packet', p)
Cache.remove key cache;
Some (packet', p)
| Error Bad ->
Log.warn (fun m -> m "%a dropping from cache, bad fragments (%a)"
Ipv4_packet.pp packet
Expand All @@ -176,10 +175,16 @@ let process cache ts (packet : Ipv4_packet.t) payload =
Log.debug (fun m -> m "full fragments: %a"
Fmt.(list ~sep:(unit "@.") Cstruct.hexdump_pp)
(List.map snd all_frags)) ;
Cache.remove key cache', None
| Error Hole -> maybe_add_to_cache cache', None
else
maybe_add_to_cache cache', None
Cache.remove key cache;
None
| Error Hole ->
maybe_add_to_cache cache;
None
else begin
maybe_add_to_cache cache;
None
end
end

(* TODO hdr.options is a Cstruct.t atm, but instead we need to parse all the
options, and distinguish based on the first bit -- only these with the bit
Expand Down
50 changes: 20 additions & 30 deletions src/ipv4/fragments.mli
Original file line number Diff line number Diff line change
Expand Up @@ -45,43 +45,33 @@
identifier, and protocol ID, is received, reassembly is attempted - also on
subsequent packets with the same quadruple. *)

module V : sig
type t = int64 * Cstruct.t * bool * int * (int * Cstruct.t) list
(** The type of values in the fragment cache: a timestamp of the first
received one, IP options (of the first fragment), whether or not the last
fragment was received (the one with more fragments cleared), amount of
received fragments, and a list of pairs of offset and fragment. *)
module V : Lru.Weighted with type t = int64 * Cstruct.t * bool * int * (int * Cstruct.t) list
(** The type of values in the fragment cache: a timestamp of the first
received one, IP options (of the first fragment), whether or not the last
fragment was received (the one with more fragments cleared), amount of
received fragments, and a list of pairs of offset and fragment. *)

val weight : t -> int
(** [weight t] is the data length of the received fragments. *)
end
module K : Hashtbl.SeededHashedType
with type t = Ipaddr.V4.t * Ipaddr.V4.t * int * int
(** The type of keys in the fragment cache: source IP address, destination
IP address, protocol type, and IP identifier. *)

module K : sig
type t = Ipaddr.V4.t * Ipaddr.V4.t * int * int
(** The type of keys in the fragment cache: source IP address, destination
IP address, protocol type, and IP identifier. *)

val compare : t -> t -> int
end

module Cache : sig
include Lru.F.S with type k = K.t and type v = V.t
end
module Cache : Lru.M.S with type k = K.t and type v = V.t

val max_duration : int64
(** [max_duration] is the maximum delta between first and last received
fragment, in nanoseconds. At the moment it is 10 seconds. *)

val process : Cache.t -> int64 -> Ipv4_packet.t -> Cstruct.t -> Cache.t *
(Ipv4_packet.t * Cstruct.t) option (** [process t timestamp hdr payload] is
[t'], a new cache, and maybe a fully reassembled IPv4 packet. If reassembly
fails, e.g. too many fragments, delta between receive timestamp of first and
last packet exceeds {!max_duration}, overlapping packets, these packets
will be dropped from the cache. The IPv4 header options are always taken from
the first fragment (where offset is 0). If the provided IPv4 header has an
fragmentation offset of 0, and the more fragments bit is not set, the given
header and payload is directly returned. Handles out-of-order fragments
gracefully. *)
val process : Cache.t -> int64 -> Ipv4_packet.t -> Cstruct.t ->
(Ipv4_packet.t * Cstruct.t) option
(** [process t timestamp hdr payload] is [t'], a new cache, and maybe a fully
reassembled IPv4 packet. If reassembly fails, e.g. too many fragments, delta
between receive timestamp of first and last packet exceeds {!max_duration},
overlapping packets, these packets will be dropped from the cache. The IPv4
header options are always taken from the first fragment (where offset is
0). If the provided IPv4 header has an fragmentation offset of 0, and the
more fragments bit is not set, the given header and payload is directly
returned. Handles out-of-order fragments gracefully. *)

val fragment : mtu:int -> Ipv4_packet.t -> Cstruct.t -> Cstruct.t list
(** [fragment ~mtu hdr payload] is called with the IPv4 header of the first
Expand Down
8 changes: 3 additions & 5 deletions src/ipv4/static_ipv4.ml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ module Make (R: Mirage_random.C) (C: Mirage_clock.MCLOCK) (Ethernet: Mirage_prot
mutable ip: Ipaddr.V4.t;
network: Ipaddr.V4.Prefix.t;
mutable gateway: Ipaddr.V4.t option;
mutable cache: Fragments.Cache.t;
cache: Fragments.Cache.t;
}

let write t ?(fragment = true) ?(ttl = 38) ?src dst proto ?(size = 0) headerf bufs =
Expand Down Expand Up @@ -151,9 +151,7 @@ module Make (R: Mirage_random.C) (C: Mirage_clock.MCLOCK) (Ethernet: Mirage_prot
Lwt.return_unit
end else
let ts = C.elapsed_ns t.clock in
let cache, res = Fragments.process t.cache ts packet payload in
t.cache <- cache ;
match res with
match Fragments.process t.cache ts packet payload with
| None -> Lwt.return_unit
| Some (packet, payload) ->
let src, dst = packet.src, packet.dst in
Expand All @@ -175,7 +173,7 @@ module Make (R: Mirage_random.C) (C: Mirage_clock.MCLOCK) (Ethernet: Mirage_prot
Arpv4.set_ips arp [ip] >>= fun () ->
(* TODO currently hardcoded to 256KB, should be configurable
and maybe limited per-src/dst-ip as well? *)
let cache = Fragments.Cache.empty (1024 * 256) in
let cache = Fragments.Cache.create ~random:true (1024 * 256) in
let t = { ethif; arp; ip; clock; network; gateway ; cache } in
Lwt.return t

Expand Down
113 changes: 69 additions & 44 deletions test/test_ipv4.ml
Original file line number Diff line number Diff line change
Expand Up @@ -64,129 +64,152 @@ let gray =
Cstruct.memset buf 0x55 ;
buf

let empty_cache = Fragments.Cache.empty 1000
let below_max = Int64.sub Fragments.max_duration 1L

let basic_fragments payload () =
let cache = Fragments.Cache.create 1000 in
Alcotest.(check (option (pair ipv4_packet cstruct)) __LOC__
(Some (test_packet, payload))
(snd @@ Fragments.process empty_cache 0L test_packet payload)) ;
(Fragments.process cache 0L test_packet payload));
let off_packet = { test_packet with off = 1 } in
Alcotest.(check (option (pair ipv4_packet cstruct)) __LOC__
None
(snd @@ Fragments.process empty_cache 0L off_packet payload)) ;
(Fragments.process cache 0L off_packet payload));
Lwt.return_unit

let basic_reassembly () =
let empty_cache = Fragments.Cache.create 1000 in
let more_frags = { test_packet with off = mf } in
let cache, res = Fragments.process empty_cache 0L more_frags black in
Alcotest.(check (option (pair ipv4_packet cstruct)) __LOC__ None res) ;
let res = Fragments.process empty_cache 0L more_frags black in
Alcotest.(check (option (pair ipv4_packet cstruct)) __LOC__ None res);
let off_packet = { test_packet with off = 2 } in
Alcotest.(check (option (pair ipv4_packet cstruct)) "reassembly of two segments works"
(Some (test_packet, Cstruct.append black white))
(snd @@ Fragments.process cache 0L off_packet white)) ;
(Fragments.process empty_cache 0L off_packet white));
Lwt.return_unit

let basic_reassembly_timeout () =
let cache = Fragments.Cache.create 1000 in
let more_frags = { test_packet with off = mf } in
let cache, res = Fragments.process empty_cache 0L more_frags black in
Alcotest.(check (option (pair ipv4_packet cstruct)) __LOC__ None res) ;
let res = Fragments.process cache 0L more_frags black in
Alcotest.(check (option (pair ipv4_packet cstruct)) __LOC__ None res);
let off_packet = { test_packet with off = 2 } in
let below_max = Int64.sub Fragments.max_duration 1L in
Alcotest.(check (option (pair ipv4_packet cstruct)) "even after just before max duration"
(Some (test_packet, Cstruct.append black white))
(snd @@ Fragments.process cache below_max off_packet white)) ;
Alcotest.(check (option (pair ipv4_packet cstruct)) "none after max duration"
None
(snd @@ Fragments.process cache Fragments.max_duration off_packet white)) ;
(Fragments.process cache Fragments.max_duration off_packet white));
Lwt.return_unit

let multiple_reassembly () =
let cache = Fragments.Cache.create 1000 in
let more_frags = { test_packet with off = mf } in
let res = Fragments.process cache 0L more_frags black in
Alcotest.(check (option (pair ipv4_packet cstruct)) __LOC__ None res);
let more_off_packet = { test_packet with off = mf lor 2 } in
let cache, res = Fragments.process cache below_max more_off_packet gray in
Alcotest.(check (option (pair ipv4_packet cstruct)) __LOC__ None res) ;
let res = Fragments.process cache below_max more_off_packet gray in
Alcotest.(check (option (pair ipv4_packet cstruct)) __LOC__ None res);
let final_packet = { test_packet with off = 4 } in
Alcotest.(check (option (pair ipv4_packet cstruct)) __LOC__
(Some (test_packet, Cstruct.concat [ black; gray; white]))
(snd @@ Fragments.process cache below_max final_packet white)) ;
(Fragments.process cache below_max final_packet white));
Lwt.return_unit

let multiple_reassembly_timeout () =
let cache = Fragments.Cache.create 1000 in
let more_frags = { test_packet with off = mf } in
let res = Fragments.process cache 0L more_frags black in
Alcotest.(check (option (pair ipv4_packet cstruct)) __LOC__ None res);
let more_off_packet = { test_packet with off = mf lor 2 } in
let res = Fragments.process cache below_max more_off_packet gray in
Alcotest.(check (option (pair ipv4_packet cstruct)) __LOC__ None res);
let final_packet = { test_packet with off = 4 } in
Alcotest.(check (option (pair ipv4_packet cstruct)) __LOC__
None
(snd @@ Fragments.process cache Fragments.max_duration off_packet white)) ;
Alcotest.(check (option (pair ipv4_packet cstruct)) __LOC__ None res) ;
(Fragments.process cache Fragments.max_duration final_packet white));
Lwt.return_unit

let reassembly_out_of_order () =
let cache = Fragments.Cache.create 1000 in
let more_frags = { test_packet with off = mf } in
let off_packet = { test_packet with off = 2 } in
let cache, res = Fragments.process empty_cache 0L off_packet gray in
Alcotest.(check (option (pair ipv4_packet cstruct)) __LOC__ None res) ;
let res = Fragments.process cache 0L off_packet gray in
Alcotest.(check (option (pair ipv4_packet cstruct)) __LOC__ None res);
Alcotest.(check (option (pair ipv4_packet cstruct)) "reassembly of two segments works"
(Some (test_packet, Cstruct.append black gray))
(snd @@ Fragments.process cache 0L more_frags black)) ;
(Fragments.process cache 0L more_frags black));
Lwt.return_unit

let reassembly_multiple_out_of_order packets final_payload () =
let _, res = List.fold_left (fun (cache, res) (off, payload) ->
Alcotest.(check (option (pair ipv4_packet cstruct)) __LOC__ None res) ;
let cache = Fragments.Cache.create 1000 in
let res = List.fold_left (fun res (off, payload) ->
Alcotest.(check (option (pair ipv4_packet cstruct))
__LOC__ None res);
let packet = { test_packet with off } in
Fragments.process cache 0L packet payload)
(empty_cache, None) packets
None packets
in
Alcotest.(check (option (pair ipv4_packet cstruct)) __LOC__
(Some (test_packet, final_payload))
res) ;
Lwt.return_unit

let basic_overlaps () =
let cache = Fragments.Cache.create 1000 in
let more_frags = { test_packet with off = mf } in
let off_packet = { test_packet with off = 1 } in
let cache, res = Fragments.process empty_cache 0L off_packet black in
Alcotest.(check (option (pair ipv4_packet cstruct)) __LOC__ None res) ;
let res = Fragments.process cache 0L off_packet black in
Alcotest.(check (option (pair ipv4_packet cstruct)) __LOC__ None res);
Alcotest.(check (option (pair ipv4_packet cstruct)) __LOC__ None
(snd @@ Fragments.process cache 0L more_frags white)) ;
(Fragments.process cache 0L more_frags white));
Lwt.return_unit

let basic_other_ip_flow () =
let cache = Fragments.Cache.create 1000 in
let more_frags = { test_packet with off = mf } in
let cache, res = Fragments.process empty_cache 0L more_frags black in
let res = Fragments.process cache 0L more_frags black in
let off_packet = { test_packet with off = 2 ; src = Ipaddr.V4.of_string_exn "127.0.0.2" } in
Alcotest.(check (option (pair ipv4_packet cstruct)) __LOC__ None res) ;
Alcotest.(check (option (pair ipv4_packet cstruct)) __LOC__ None res);
Alcotest.(check (option (pair ipv4_packet cstruct)) __LOC__ None
(snd @@ Fragments.process cache 0L off_packet white)) ;
(Fragments.process cache 0L off_packet white));
let off_packet' = { test_packet with off = 2 ; proto = 25 } in
Alcotest.(check (option (pair ipv4_packet cstruct)) __LOC__ None
(snd @@ Fragments.process cache 0L off_packet' white)) ;
(Fragments.process cache 0L off_packet' white));
Lwt.return_unit

let max_fragment () =
let cache = Fragments.Cache.create 1000 in
let all_16 = [ white; gray; black; white;
white; gray; black; white;
white; gray; black; white;
white; gray; black ; gray ]
in
let (cache, res), off =
List.fold_left (fun ((cache, res), off) payload ->
Alcotest.(check (option (pair ipv4_packet cstruct)) __LOC__ None res) ;
let res, off =
List.fold_left (fun (res, off) payload ->
Alcotest.(check (option (pair ipv4_packet cstruct)) __LOC__ None res);
let r = Fragments.process cache 0L { test_packet with off = off lor mf } payload in
(r, Cstruct.len payload / 8 + off))
((empty_cache, None), 0)
(None, 0)
all_16
in
Alcotest.(check (option (pair ipv4_packet cstruct)) __LOC__ None res) ;
Alcotest.(check (option (pair ipv4_packet cstruct)) __LOC__ None res);
Alcotest.(check (option (pair ipv4_packet cstruct)) __LOC__
(Some (test_packet, Cstruct.concat (all_16 @ [white ])))
(snd @@ Fragments.process cache 0L { test_packet with off } white)) ;
let cache, res = Fragments.process cache 0L { test_packet with off = off lor mf } white in
Alcotest.(check (option (pair ipv4_packet cstruct)) __LOC__ None res) ;
(Fragments.process cache 0L { test_packet with off } white));
let res = Fragments.process cache 0L { test_packet with off = off lor mf } white in
Alcotest.(check (option (pair ipv4_packet cstruct)) __LOC__ None res);
Alcotest.(check (option (pair ipv4_packet cstruct)) __LOC__
None
(snd @@ Fragments.process cache 0L { test_packet with off = off + 2 } black)) ;
(Fragments.process cache 0L { test_packet with off = off + 2 } black));
Lwt.return_unit

let none_returned packets () =
let _, res = List.fold_left (fun (cache, res) (off, payload) ->
Alcotest.(check (option (pair ipv4_packet cstruct)) __LOC__ None res) ;
let cache = Fragments.Cache.create 1000 in
let res = List.fold_left (fun res (off, payload) ->
Alcotest.(check (option (pair ipv4_packet cstruct)) __LOC__ None res);
let packet = { test_packet with off } in
Fragments.process cache 0L packet payload)
(empty_cache, None) packets
None packets
in
Alcotest.(check (option (pair ipv4_packet cstruct)) __LOC__ None res) ;
Alcotest.(check (option (pair ipv4_packet cstruct)) __LOC__ None res);
Lwt.return_unit

let ins_all_positions x l =
Expand Down Expand Up @@ -251,6 +274,8 @@ let suite = [
[ 0 ; 1 ; 2 ; 10 ; 100 ; 1000 ; 5000 ; 10000 ] @ [
"basic reassembly", `Quick, basic_reassembly;
"basic reassembly timeout", `Quick, basic_reassembly_timeout;
"multiple reassembly", `Quick, multiple_reassembly;
"multiple reassembly timeout", `Quick, multiple_reassembly_timeout;
"reassembly out of order", `Quick, reassembly_out_of_order ;
"other ip flow", `Quick, basic_other_ip_flow ;
"maximum amount of fragments", `Quick, max_fragment ] @
Expand Down

0 comments on commit 2201224

Please sign in to comment.