Skip to content

Commit

Permalink
make Watchman.blocking_read return a result
Browse files Browse the repository at this point in the history
Summary:
this stack converts all of the functions in watchman.ml to return results instead of throwing exceptions, so that we can ensure we handle all the error cases.

during the transition, we will call `raise_error` to maintain exception behavior, but eventually thread the `result` further and further up and eliminate `try/with`'s.

Reviewed By: nmote

Differential Revision: D28173779

fbshipit-source-id: a2f27bdb3d81583ec0a84d7789f0996a8af13325
  • Loading branch information
mroch authored and facebook-github-bot committed May 6, 2021
1 parent ceb6eb2 commit 43717a2
Showing 1 changed file with 33 additions and 25 deletions.
58 changes: 33 additions & 25 deletions src/hack_forked/watchman/watchman.ml
Expand Up @@ -409,6 +409,12 @@ let read_line reader =
let msg = "Connection closed (connection reset)" in
Lwt.return (Error (Socket_unavailable { msg }))

let read_line_with_timeout ~timeout reader =
try%lwt Lwt_unix.with_timeout timeout @@ fun () -> read_line reader
with Lwt_unix.Timeout ->
let msg = spf "Timed out reading payload after %f seconds" timeout in
Lwt.return (Error (Socket_unavailable { msg }))

(* Sends a request to watchman and returns the response. If we don't have a connection,
* a new connection will be created before the request and destroyed after the response *)
let rec request ~debug_logging ?conn json =
Expand All @@ -432,29 +438,29 @@ let request_exn ~debug_logging ?conn json =
| Ok result -> Lwt.return result
| Error err -> raise_error err

let wait_read reader =
match%lwt Lwt_unix.wait_read (Buffered_line_reader_lwt.get_fd reader) with
| () -> Lwt.return (Ok ())
| exception Unix.Unix_error (Unix.EBADF, _, _) ->
(* this is a curious error. it means that the file descriptor was already
closed via `Lwt_unix.close` before we called `wait_read`, and the only
place we do that is in `close_connection`. So that suggests that we're
calling `Watchman.close` and then still calling `blocking_read` on the
same instance again, but it's not clear where; we are cancelling those
promises. *)
let msg = "Connection closed (bad file descriptor)" in
Lwt.return (Error (Socket_unavailable { msg }))

let blocking_read ~debug_logging ~conn:(reader, _) =
let%lwt () =
try%lwt Lwt_unix.wait_read (Buffered_line_reader_lwt.get_fd reader)
with Unix.Unix_error (Unix.EBADF, _, _) ->
(* this is a curious error. it means that the file descriptor was already
closed via `Lwt_unix.close` before we called `wait_read`, and the only
place we do that is in `close_connection`. So that suggests that we're
calling `Watchman.close` and then still calling `blocking_read` on the
same instance again, but it's not clear where; we are cancelling those
promises. *)
raise (Watchman_error "Connection closed")
in
let read_timeout = 40.0 in
match%lwt Lwt_unix.with_timeout read_timeout @@ fun () -> read_line reader with
| exception Lwt_unix.Timeout ->
raise (Watchman_error (spf "Timed out reading payload after %f seconds" read_timeout))
| Error err -> raise_error err
| Ok output ->
(match parse_response ~debug_logging output with
| Ok response -> Lwt.return response
| Error msg ->
EventLogger.watchman_error ~response:(String_utils.truncate 100000 output) msg;
raise (Watchman_error msg))
match%lwt wait_read reader with
| Error _ as err -> Lwt.return err
| Ok () ->
(match%lwt read_line_with_timeout 40.0 reader with
| Error _ as err -> Lwt.return err
| Ok response ->
(match parse_response ~debug_logging response with
| Ok _ as response -> Lwt.return response
| Error msg -> Lwt.return (Error (Response_error { request = None; response; msg }))))

(****************************************************************************)
(* Initialization, reinitialization *)
Expand Down Expand Up @@ -837,9 +843,11 @@ let get_changes instance =
~on_dead:(fun _ -> Watchman_unavailable)
~on_alive:(fun env ->
let debug_logging = env.settings.debug_logging in
let%lwt response = blocking_read ~debug_logging ~conn:env.conn in
let (env, result) = transform_asynchronous_get_changes_response env response in
Lwt.return (env, Watchman_pushed result))
match%lwt blocking_read ~debug_logging ~conn:env.conn with
| Ok response ->
let (env, result) = transform_asynchronous_get_changes_response env response in
Lwt.return (env, Watchman_pushed result)
| Error err -> raise_error err)

let get_mergebase_and_changes instance =
call_on_instance
Expand Down

0 comments on commit 43717a2

Please sign in to comment.