Skip to content

Commit

Permalink
Close partially-established TCP connections
Browse files Browse the repository at this point in the history
If a client sends SYN, we connect the external socket and reply with
SYN ACK. If the client responds with RST ACK then previously we would
leak the connection.

This patch extends the existing mechanism which closes connections when
switch ports are timed-out, adding a connection close when such an
"early reset" is encountered. Once the connection has been established
we assume we can use the existing closing mechanism: a client sending
a RST should cause the TCP/IP stack to close our flow.

Related to [docker/for-mac#1132]

Signed-off-by: David Scott <dave.scott@docker.com>
  • Loading branch information
djs55 committed Jun 6, 2018
1 parent 3a8cfb4 commit 7d19e82
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 28 deletions.
5 changes: 3 additions & 2 deletions src/hostnet/frame.ml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ and t =
| Icmp: { ty: int; code: int; raw: Cstruct.t; icmp: icmp } -> t
| Ipv4: ipv4 -> t
| Udp: { src: int; dst: int; len: int; raw: Cstruct.t; payload: t } -> t
| Tcp: { src: int; dst: int; syn: bool; raw: Cstruct.t; payload: t } -> t
| Tcp: { src: int; dst: int; syn: bool; rst: bool; raw: Cstruct.t; payload: t } -> t
| Payload: Cstruct.t -> t
| Unknown: t

Expand Down Expand Up @@ -97,7 +97,8 @@ let rec ipv4 inner =
let payload = Cstructs.shift inner ((offres lsr 4) * 4)
|> Cstructs.to_cstruct in
let syn = (flags land (1 lsl 1)) > 0 in
Ok (Tcp { src; dst; syn; raw = Cstructs.to_cstruct inner;
let rst = (flags land (1 lsl 2)) > 0 in
Ok (Tcp { src; dst; syn; rst; raw = Cstructs.to_cstruct inner;
payload = Payload payload })
| 17 ->
let raw = Cstructs.to_cstruct inner in
Expand Down
2 changes: 1 addition & 1 deletion src/hostnet/frame.mli
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ and t =
| Icmp: { ty: int; code: int; raw: Cstruct.t; icmp: icmp } -> t
| Ipv4: ipv4 -> t
| Udp: { src: int; dst: int; len: int; raw: Cstruct.t; payload: t } -> t
| Tcp: { src: int; dst: int; syn: bool; raw: Cstruct.t; payload: t } -> t
| Tcp: { src: int; dst: int; syn: bool; rst: bool; raw: Cstruct.t; payload: t } -> t
| Payload: Cstruct.t -> t
| Unknown: t

Expand Down
2 changes: 1 addition & 1 deletion src/hostnet/hostnet_dns.ml
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ struct
in
Lwt.pick [
loop ();
close
Lwt.map (fun _ -> ()) close
]
in
Some f
Expand Down
2 changes: 1 addition & 1 deletion src/hostnet/hostnet_dns.mli
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ sig
t:t -> udp:Udp.t -> src:Ipaddr.V4.t -> dst:Ipaddr.V4.t -> src_port:int ->
Cstruct.t -> (unit, Udp.error) result Lwt.t

val handle_tcp: t:t -> close:(unit Lwt.t) -> (int -> (Tcp.flow -> unit Lwt.t) option) Lwt.t
val handle_tcp: t:t -> close:([ `Port_disconnect | `Reset | `Fin ] Lwt.t) -> (int -> (Tcp.flow -> unit Lwt.t) option) Lwt.t

val destroy: t -> unit Lwt.t
end
86 changes: 63 additions & 23 deletions src/hostnet/slirp.ml
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ struct
mutable pending: Tcp.Id.Set.t;
mutable last_active_time: float;
(* Tasks that will be signalled if the endpoint is destroyed *)
mutable on_destroy: unit Lwt.u Tcp.Id.Map.t;
mutable on_destroy: [ `Port_disconnect | `Reset | `Fin ] Lwt.u Tcp.Id.Map.t;
}
(** A generic TCP/IP endpoint *)

Expand Down Expand Up @@ -270,8 +270,16 @@ struct
in
Lwt.return tcp_stack

let close_flow t ~id reason =
let close_request = Tcp.Id.Map.find id t.on_destroy in
try
Lwt.wakeup_later close_request reason
with Invalid_argument _ ->
(* If we've already woken up with a port disconnection, don't worry *)
()

let destroy t =
Tcp.Id.Map.iter (fun _ u -> Lwt.wakeup_later u ()) t.on_destroy;
Tcp.Id.Map.iter (fun id _ -> close_flow t ~id `Port_disconnect) t.on_destroy;
t.on_destroy <- Tcp.Id.Map.empty

let intercept_tcp_syn t ~id ~syn on_syn_callback (buf: Cstruct.t) =
Expand Down Expand Up @@ -312,8 +320,14 @@ struct
module Proxy =
Mirage_flow_lwt.Proxy(Clock)(Stack_tcp)(Host.Sockets.Stream.Tcp)

let input_tcp t ~id ~syn (ip, port) (buf: Cstruct.t) =
intercept_tcp_syn t ~id ~syn (fun close ->
let input_tcp t ~id ~syn ~rst (ip, port) (buf: Cstruct.t) =
if rst then begin
(* If we receive a reset, then signal the connection is closed *)
if not(Tcp.Id.Map.mem id t.on_destroy)
then Log.warn (fun f -> f "Received a RST for %s but there is no active connection" (string_of_id id))
else close_flow t ~id `Reset
end;
intercept_tcp_syn t ~id ~syn (fun on_destroy ->
Host.Sockets.Stream.Tcp.connect (ip, port)
>>= function
| Error (`Msg m) ->
Expand All @@ -326,6 +340,25 @@ struct
let listeners port =
Log.debug (fun f ->
f "%a:%d handshake complete" Ipaddr.pp_hum ip port);
(* Note that we must cleanup even when the connection is reset before it
is fully established. *)
Lwt.async
(fun () ->
on_destroy
>>= fun reason ->
begin match reason with
| `Port_disconnect ->
Log.warn (fun f -> f "%s closing flow due to idle port disconnection" (Tcp.Flow.to_string tcp))
| `Reset ->
Log.debug (fun f -> f "%s: closing flow due to TCP RST" (Tcp.Flow.to_string tcp))
| `Fin ->
Log.debug (fun f -> f "%s: closing flow due to TCP FIN" (Tcp.Flow.to_string tcp))
end;
t.on_destroy <- Tcp.Id.Map.remove id t.on_destroy;
Tcp.Flow.remove tcp.Tcp.Flow.id;
tcp.Tcp.Flow.socket <- None;
Host.Sockets.Stream.Tcp.close socket
);
let f flow =
match tcp.Tcp.Flow.socket with
| None ->
Expand All @@ -335,20 +368,31 @@ struct
Lwt.return_unit
| Some socket ->
Lwt.finalize (fun () ->
Lwt.pick [
Lwt.map
(function Error e -> Error (`Proxy e) | Ok x -> Ok x)
(Proxy.proxy t.clock flow socket);
Lwt.map
(fun () -> Error `Close)
close
]
let proxy_t =
Proxy.proxy t.clock flow socket
>>= function
| Ok x -> Lwt.return (Ok x)
| Error e -> Lwt.return (Error (`Proxy e)) in
let closed_t =
on_destroy
>>= function
| `Port_disconnect -> Lwt.return (Error `Port_disconnect)
| `Reset -> Lwt.return (Error `Reset)
| `Fin -> Lwt.return (Error `Fin) in
Lwt.pick [ proxy_t; closed_t ]
>>= function
| Error (`Close) ->
| Error `Port_disconnect ->
Log.info (fun f ->
f "%s proxy closed due to switch port disconnection"
(Tcp.Flow.to_string tcp));
Lwt.return_unit
| Error `Reset ->
Log.info (fun f ->
f "%s proxy closed due to TCP RST"
(Tcp.Flow.to_string tcp));
Lwt.return_unit
| Error `Fin ->
Lwt.return_unit
| Error (`Proxy e) ->
Log.debug (fun f ->
f "%s proxy failed with %a"
Expand All @@ -357,12 +401,8 @@ struct
| Ok (_l_stats, _r_stats) ->
Lwt.return_unit
) (fun () ->
Log.debug (fun f ->
f "closing flow %s" (string_of_id tcp.Tcp.Flow.id));
tcp.Tcp.Flow.socket <- None;
t.on_destroy <- Tcp.Id.Map.remove id t.on_destroy;
Tcp.Flow.remove tcp.Tcp.Flow.id;
Host.Sockets.Stream.Tcp.close socket
close_flow t ~id `Fin;
Lwt.return_unit
)
in
Some f
Expand Down Expand Up @@ -502,12 +542,12 @@ struct

(* TCP to local ports *)
| Ipv4 { src; dst;
payload = Tcp { src = src_port; dst = dst_port; syn; raw;
payload = Tcp { src = src_port; dst = dst_port; syn; rst; raw;
payload = Payload _; _ }; _ } ->
let id =
Stack_tcp_wire.v ~src_port:dst_port ~dst:src ~src:dst ~dst_port:src_port
in
Endpoint.input_tcp t.endpoint ~id ~syn
Endpoint.input_tcp t.endpoint ~id ~syn ~rst
(Ipaddr.V4 Ipaddr.V4.localhost, dst_port) raw
>|= ok
| _ ->
Expand Down Expand Up @@ -673,7 +713,7 @@ struct
(* Transparent HTTP intercept? *)
| Ipv4 { src = dest_ip ; dst = local_ip;
payload = Tcp { src = dest_port;
dst = local_port; syn; raw; _ }; _ } ->
dst = local_port; syn; rst; raw; _ }; _ } ->
let id =
Stack_tcp_wire.v
~src_port:local_port ~dst:dest_ip ~src:local_ip ~dst_port:dest_port
Expand All @@ -684,7 +724,7 @@ struct
in
begin match callback with
| None ->
Endpoint.input_tcp t.endpoint ~id ~syn (Ipaddr.V4 local_ip, local_port)
Endpoint.input_tcp t.endpoint ~id ~syn ~rst (Ipaddr.V4 local_ip, local_port)
raw (* common case *)
>|= ok
| Some cb ->
Expand Down

0 comments on commit 7d19e82

Please sign in to comment.