From 43717a25ae918870f049585078e1f37aca8b908a Mon Sep 17 00:00:00 2001 From: Marshall Roch Date: Thu, 6 May 2021 07:41:22 -0700 Subject: [PATCH] make Watchman.blocking_read return a result Summary: this stack converts all of the functions in watchman.ml to return results instead of throwing exceptions, so that we can ensure we handle all the error cases. during the transition, we will call `raise_error` to maintain exception behavior, but eventually thread the `result` further and further up and eliminate `try/with`'s. Reviewed By: nmote Differential Revision: D28173779 fbshipit-source-id: a2f27bdb3d81583ec0a84d7789f0996a8af13325 --- src/hack_forked/watchman/watchman.ml | 58 ++++++++++++++++------------ 1 file changed, 33 insertions(+), 25 deletions(-) diff --git a/src/hack_forked/watchman/watchman.ml b/src/hack_forked/watchman/watchman.ml index 68501039beb..819b21148d5 100644 --- a/src/hack_forked/watchman/watchman.ml +++ b/src/hack_forked/watchman/watchman.ml @@ -409,6 +409,12 @@ let read_line reader = let msg = "Connection closed (connection reset)" in Lwt.return (Error (Socket_unavailable { msg })) +let read_line_with_timeout ~timeout reader = + try%lwt Lwt_unix.with_timeout timeout @@ fun () -> read_line reader + with Lwt_unix.Timeout -> + let msg = spf "Timed out reading payload after %f seconds" timeout in + Lwt.return (Error (Socket_unavailable { msg })) + (* Sends a request to watchman and returns the response. If we don't have a connection, * a new connection will be created before the request and destroyed after the response *) let rec request ~debug_logging ?conn json = @@ -432,29 +438,29 @@ let request_exn ~debug_logging ?conn json = | Ok result -> Lwt.return result | Error err -> raise_error err +let wait_read reader = + match%lwt Lwt_unix.wait_read (Buffered_line_reader_lwt.get_fd reader) with + | () -> Lwt.return (Ok ()) + | exception Unix.Unix_error (Unix.EBADF, _, _) -> + (* this is a curious error. it means that the file descriptor was already + closed via `Lwt_unix.close` before we called `wait_read`, and the only + place we do that is in `close_connection`. So that suggests that we're + calling `Watchman.close` and then still calling `blocking_read` on the + same instance again, but it's not clear where; we are cancelling those + promises. *) + let msg = "Connection closed (bad file descriptor)" in + Lwt.return (Error (Socket_unavailable { msg })) + let blocking_read ~debug_logging ~conn:(reader, _) = - let%lwt () = - try%lwt Lwt_unix.wait_read (Buffered_line_reader_lwt.get_fd reader) - with Unix.Unix_error (Unix.EBADF, _, _) -> - (* this is a curious error. it means that the file descriptor was already - closed via `Lwt_unix.close` before we called `wait_read`, and the only - place we do that is in `close_connection`. So that suggests that we're - calling `Watchman.close` and then still calling `blocking_read` on the - same instance again, but it's not clear where; we are cancelling those - promises. *) - raise (Watchman_error "Connection closed") - in - let read_timeout = 40.0 in - match%lwt Lwt_unix.with_timeout read_timeout @@ fun () -> read_line reader with - | exception Lwt_unix.Timeout -> - raise (Watchman_error (spf "Timed out reading payload after %f seconds" read_timeout)) - | Error err -> raise_error err - | Ok output -> - (match parse_response ~debug_logging output with - | Ok response -> Lwt.return response - | Error msg -> - EventLogger.watchman_error ~response:(String_utils.truncate 100000 output) msg; - raise (Watchman_error msg)) + match%lwt wait_read reader with + | Error _ as err -> Lwt.return err + | Ok () -> + (match%lwt read_line_with_timeout 40.0 reader with + | Error _ as err -> Lwt.return err + | Ok response -> + (match parse_response ~debug_logging response with + | Ok _ as response -> Lwt.return response + | Error msg -> Lwt.return (Error (Response_error { request = None; response; msg })))) (****************************************************************************) (* Initialization, reinitialization *) @@ -837,9 +843,11 @@ let get_changes instance = ~on_dead:(fun _ -> Watchman_unavailable) ~on_alive:(fun env -> let debug_logging = env.settings.debug_logging in - let%lwt response = blocking_read ~debug_logging ~conn:env.conn in - let (env, result) = transform_asynchronous_get_changes_response env response in - Lwt.return (env, Watchman_pushed result)) + match%lwt blocking_read ~debug_logging ~conn:env.conn with + | Ok response -> + let (env, result) = transform_asynchronous_get_changes_response env response in + Lwt.return (env, Watchman_pushed result) + | Error err -> raise_error err) let get_mergebase_and_changes instance = call_on_instance