Skip to content

Commit

Permalink
Close partially-established TCP connections
Browse files Browse the repository at this point in the history
If a client sends SYN, we connect the external socket and reply with
SYN ACK. If the client responds with RST ACK then previously we would
leak the connection.

This patch extends the existing mechanism which closes connections when
switch ports are timed-out, adding a connection close when such an
"early reset" is encountered. Once the connection has been established
we assume we can use the existing closing mechanism: a client sending
a RST should cause the TCP/IP stack to close our flow.

Related to [docker/for-mac#1132]

Signed-off-by: David Scott <dave.scott@docker.com>
  • Loading branch information
djs55 committed Jun 6, 2018
1 parent 3a8cfb4 commit 5d7e3ab
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 28 deletions.
5 changes: 3 additions & 2 deletions src/hostnet/frame.ml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ and t =
| Icmp: { ty: int; code: int; raw: Cstruct.t; icmp: icmp } -> t
| Ipv4: ipv4 -> t
| Udp: { src: int; dst: int; len: int; raw: Cstruct.t; payload: t } -> t
| Tcp: { src: int; dst: int; syn: bool; raw: Cstruct.t; payload: t } -> t
| Tcp: { src: int; dst: int; syn: bool; rst: bool; raw: Cstruct.t; payload: t } -> t
| Payload: Cstruct.t -> t
| Unknown: t

Expand Down Expand Up @@ -97,7 +97,8 @@ let rec ipv4 inner =
let payload = Cstructs.shift inner ((offres lsr 4) * 4)
|> Cstructs.to_cstruct in
let syn = (flags land (1 lsl 1)) > 0 in
Ok (Tcp { src; dst; syn; raw = Cstructs.to_cstruct inner;
let rst = (flags land (1 lsl 2)) > 0 in
Ok (Tcp { src; dst; syn; rst; raw = Cstructs.to_cstruct inner;
payload = Payload payload })
| 17 ->
let raw = Cstructs.to_cstruct inner in
Expand Down
2 changes: 1 addition & 1 deletion src/hostnet/frame.mli
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ and t =
| Icmp: { ty: int; code: int; raw: Cstruct.t; icmp: icmp } -> t
| Ipv4: ipv4 -> t
| Udp: { src: int; dst: int; len: int; raw: Cstruct.t; payload: t } -> t
| Tcp: { src: int; dst: int; syn: bool; raw: Cstruct.t; payload: t } -> t
| Tcp: { src: int; dst: int; syn: bool; rst: bool; raw: Cstruct.t; payload: t } -> t
| Payload: Cstruct.t -> t
| Unknown: t

Expand Down
2 changes: 1 addition & 1 deletion src/hostnet/hostnet_dns.ml
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ struct
in
Lwt.pick [
loop ();
close
Lwt.map (fun _ -> ()) close
]
in
Some f
Expand Down
2 changes: 1 addition & 1 deletion src/hostnet/hostnet_dns.mli
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ sig
t:t -> udp:Udp.t -> src:Ipaddr.V4.t -> dst:Ipaddr.V4.t -> src_port:int ->
Cstruct.t -> (unit, Udp.error) result Lwt.t

val handle_tcp: t:t -> close:(unit Lwt.t) -> (int -> (Tcp.flow -> unit Lwt.t) option) Lwt.t
val handle_tcp: t:t -> close:([ `Port_disconnect | `Reset ] Lwt.t) -> (int -> (Tcp.flow -> unit Lwt.t) option) Lwt.t

val destroy: t -> unit Lwt.t
end
87 changes: 64 additions & 23 deletions src/hostnet/slirp.ml
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ struct
mutable pending: Tcp.Id.Set.t;
mutable last_active_time: float;
(* Tasks that will be signalled if the endpoint is destroyed *)
mutable on_destroy: unit Lwt.u Tcp.Id.Map.t;
mutable on_destroy: [ `Port_disconnect | `Reset ] Lwt.u Tcp.Id.Map.t;
}
(** A generic TCP/IP endpoint *)

Expand Down Expand Up @@ -271,7 +271,13 @@ struct
Lwt.return tcp_stack

let destroy t =
Tcp.Id.Map.iter (fun _ u -> Lwt.wakeup_later u ()) t.on_destroy;
Tcp.Id.Map.iter (fun _ u ->
try
Lwt.wakeup_later u `Port_disconnect
with Invalid_argument _ ->
(* If we've already woken up for a reset, don't worry *)
()
) t.on_destroy;
t.on_destroy <- Tcp.Id.Map.empty

let intercept_tcp_syn t ~id ~syn on_syn_callback (buf: Cstruct.t) =
Expand Down Expand Up @@ -312,8 +318,21 @@ struct
module Proxy =
Mirage_flow_lwt.Proxy(Clock)(Stack_tcp)(Host.Sockets.Stream.Tcp)

let input_tcp t ~id ~syn (ip, port) (buf: Cstruct.t) =
intercept_tcp_syn t ~id ~syn (fun close ->
let input_tcp t ~id ~syn ~rst (ip, port) (buf: Cstruct.t) =
if rst then begin
(* If we receive a reset, then signal the connection is closed *)
if not(Tcp.Id.Map.mem id t.on_destroy)
then Log.warn (fun f -> f "Received a RST for %s but there is no active connection" (string_of_id id))
else begin
let close_request = Tcp.Id.Map.find id t.on_destroy in
try
Lwt.wakeup_later close_request `Reset
with Invalid_argument _ ->
(* If we've already woken up with a port disconnection, don't worry *)
()
end
end;
intercept_tcp_syn t ~id ~syn (fun on_destroy ->
Host.Sockets.Stream.Tcp.connect (ip, port)
>>= function
| Error (`Msg m) ->
Expand All @@ -326,6 +345,23 @@ struct
let listeners port =
Log.debug (fun f ->
f "%a:%d handshake complete" Ipaddr.pp_hum ip port);
let close () =
Log.debug (fun f -> f "closing flow %s" (string_of_id tcp.Tcp.Flow.id));
t.on_destroy <- Tcp.Id.Map.remove id t.on_destroy;
Tcp.Flow.remove tcp.Tcp.Flow.id;
tcp.Tcp.Flow.socket <- None;
Host.Sockets.Stream.Tcp.close socket in
(* If the connection is timed out or reset before it's established then we
must close. *)
let early_cleanup_t =
on_destroy
>>= function
| `Port_disconnect ->
Log.info (fun f -> f "%s closing flow due to switch port disconnection" (Tcp.Flow.to_string tcp));
close ()
| `Reset ->
Log.info (fun f -> f "%s closing flow due to TCP RST" (Tcp.Flow.to_string tcp));
close () in
let f flow =
match tcp.Tcp.Flow.socket with
| None ->
Expand All @@ -334,21 +370,31 @@ struct
(Tcp.Flow.to_string tcp));
Lwt.return_unit
| Some socket ->
(* Take over responsibility for cleaning up the connection *)
Lwt.cancel early_cleanup_t;
Lwt.finalize (fun () ->
Lwt.pick [
Lwt.map
(function Error e -> Error (`Proxy e) | Ok x -> Ok x)
(Proxy.proxy t.clock flow socket);
Lwt.map
(fun () -> Error `Close)
close
]
let proxy_t =
Proxy.proxy t.clock flow socket
>>= function
| Ok x -> Lwt.return (Ok x)
| Error e -> Lwt.return (Error (`Proxy e)) in
let closed_t =
on_destroy
>>= function
| `Port_disconnect -> Lwt.return (Error `Port_disconnect)
| `Reset -> Lwt.return (Error `Reset) in
Lwt.pick [ proxy_t; closed_t ]
>>= function
| Error (`Close) ->
| Error `Port_disconnect ->
Log.info (fun f ->
f "%s proxy closed due to switch port disconnection"
(Tcp.Flow.to_string tcp));
Lwt.return_unit
| Error `Reset ->
Log.info (fun f ->
f "%s proxy closed due to TCP RST"
(Tcp.Flow.to_string tcp));
Lwt.return_unit
| Error (`Proxy e) ->
Log.debug (fun f ->
f "%s proxy failed with %a"
Expand All @@ -357,12 +403,7 @@ struct
| Ok (_l_stats, _r_stats) ->
Lwt.return_unit
) (fun () ->
Log.debug (fun f ->
f "closing flow %s" (string_of_id tcp.Tcp.Flow.id));
tcp.Tcp.Flow.socket <- None;
t.on_destroy <- Tcp.Id.Map.remove id t.on_destroy;
Tcp.Flow.remove tcp.Tcp.Flow.id;
Host.Sockets.Stream.Tcp.close socket
close ()
)
in
Some f
Expand Down Expand Up @@ -502,12 +543,12 @@ struct

(* TCP to local ports *)
| Ipv4 { src; dst;
payload = Tcp { src = src_port; dst = dst_port; syn; raw;
payload = Tcp { src = src_port; dst = dst_port; syn; rst; raw;
payload = Payload _; _ }; _ } ->
let id =
Stack_tcp_wire.v ~src_port:dst_port ~dst:src ~src:dst ~dst_port:src_port
in
Endpoint.input_tcp t.endpoint ~id ~syn
Endpoint.input_tcp t.endpoint ~id ~syn ~rst
(Ipaddr.V4 Ipaddr.V4.localhost, dst_port) raw
>|= ok
| _ ->
Expand Down Expand Up @@ -673,7 +714,7 @@ struct
(* Transparent HTTP intercept? *)
| Ipv4 { src = dest_ip ; dst = local_ip;
payload = Tcp { src = dest_port;
dst = local_port; syn; raw; _ }; _ } ->
dst = local_port; syn; rst; raw; _ }; _ } ->
let id =
Stack_tcp_wire.v
~src_port:local_port ~dst:dest_ip ~src:local_ip ~dst_port:dest_port
Expand All @@ -684,7 +725,7 @@ struct
in
begin match callback with
| None ->
Endpoint.input_tcp t.endpoint ~id ~syn (Ipaddr.V4 local_ip, local_port)
Endpoint.input_tcp t.endpoint ~id ~syn ~rst (Ipaddr.V4 local_ip, local_port)
raw (* common case *)
>|= ok
| Some cb ->
Expand Down

0 comments on commit 5d7e3ab

Please sign in to comment.