/
watchman.ml
897 lines (827 loc) · 31.6 KB
/
watchman.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
(*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*)
open Base
open Utils
module Let_syntax = Lwt.Infix.Let_syntax
(*
* Module for us to interface with Watchman, a file watching service.
* https://facebook.github.io/watchman/
*
* TODO:
* * Connect directly to the Watchman server socket instead of spawning
* a client process each time
* * Use the BSER protocol for enhanced performance
*)
(** Raised when an operation run through [with_timeout] does not finish in time, and by
    [blocking_read] when it was given a timeout and no input became available. *)
exception Timeout

(** Raised for errors reported by watchman itself (an "error" field in a response) and when
    [watchman get-sockname] fails or cannot be run. *)
exception Watchman_error of string

(** Throw this exception when we know there is something to read from
the watchman channel, but reading took too long. *)
exception Read_payload_too_long

(** Raised when a watchman response carries ["canceled": true], i.e. watchman canceled our
    subscription. *)
exception Subscription_canceled_by_watchman

(** Raised by [re_init] when the watchman server has restarted since the prior clockspec, so
    some file updates may have been missed. *)
exception Watchman_restarted
(** How the subscription is configured with respect to its states, which are always
    "hg.update" plus [init_settings.defer_states] (see [subscribe]). *)
type subscribe_mode =
| All_changes (* subscribe with neither the "defer" nor the "drop" option *)
| Defer_changes (* pass the states to watchman as the "defer" option *)
| Drop_changes
(** See also Watchman docs on drop. This means the subscriber will not
get a list of files changed during a repo update. Practically, this
is not useful for the typechecker process which needs to actually
know which files were changed. This is useful for the monitor to
aggressively kill the server. *)

(** An optional timeout in seconds, in the form accepted by [with_timeout]; [None] means no
    timeout. *)
type timeout = float option
(** Settings used to establish, and later re-establish, a watchman subscription. *)
type init_settings = {
debug_logging: bool; (** Log every watchman request sent and response received. *)
defer_states: string list; (** States, in addition to "hg.update", given to the "defer"/"drop" subscription option. *)
expression_terms: Hh_json.json list; (** See watchman expression terms. Combined with "allof" into every request built by [request_json]. *)
mergebase_with: string; (** symbolic commit to find changes against *)
roots: Path.t list; (** Paths to watch. They must all resolve to a single watchman watch root. *)
subscribe_mode: subscribe_mode;
subscription_prefix: string; (** The subscription is named "<prefix>.<pid>". *)
sync_timeout: int option; (** When set, sent as "sync_timeout" in requests built by [request_json]. *)
}
(** The message's clock: a watchman clock string, as returned by the "clock" command or carried
    in the "clock" field of subscription messages. *)
type clock = string

(** A message pushed to us on the subscription connection. *)
type pushed_changes =
(*
* State name and metadata.
*
* For example:
* State name: "hg.update"
* Metadata:
* {
* "partial":false,
* "rev":"780dab9ff0a01691c9b18a5ee1194810e555c78b",
* "distance":2,
* "status":"ok"
* }
*
* Note: The distance is HG Revision distance, not SVN revision distance.
*)
| State_enter of string * Hh_json.json option
| State_leave of string * Hh_json.json option
(* (mergebase, absolute paths of changed files, new clock) *)
| Changed_merge_base of string * SSet.t * clock
(* Absolute paths (watch root joined with the reported names) of changed files. *)
| Files_changed of SSet.t

(** Result of polling for changes: either watchman could not be reached (the instance is dead)
    or it pushed a message. *)
type changes =
| Watchman_unavailable
| Watchman_pushed of pushed_changes
module Jget = Hh_json_helpers.Jget
module J = Hh_json_helpers.AdhocJsonHelpers
(* Inspects a parsed watchman response for the fields watchman uses to report trouble:
 * - "warning": logged, not fatal;
 * - "error": logged to EventLogger, then raised as [Watchman_error];
 * - "canceled": true: logged to EventLogger, then raised as [Subscription_canceled_by_watchman].
 * Returns unit when none of these apply. *)
let assert_no_error obj =
  let json = Some obj in
  Option.iter (Jget.string_opt json "warning") ~f:(fun warning ->
      EventLogger.watchman_warning warning;
      Hh_logger.log "Watchman warning: %s\n" warning);
  Option.iter (Jget.string_opt json "error") ~f:(fun error ->
      EventLogger.watchman_error error;
      raise @@ Watchman_error error);
  if Option.value (Jget.bool_opt json "canceled") ~default:false then begin
    EventLogger.watchman_error "Subscription canceled by watchman";
    raise Subscription_canceled_by_watchman
  end
(* Parses one raw response line from watchman and checks it with [assert_no_error].
 * When [debug_logging] is set, the raw line is logged first. If the line is not valid JSON,
 * the parse failure (input, exception constructor, backtrace) is logged and the original
 * exception is re-raised. *)
let sanitize_watchman_response ~debug_logging output =
  if debug_logging then Hh_logger.info "Watchman response: %s" output;
  let log_parse_failure exn =
    Hh_logger.error
      "Failed to parse string as JSON: %s\nEXCEPTION:%s\nSTACK:%s\n"
      output
      (Exception.get_ctor_string exn)
      (Exception.get_backtrace_string exn)
  in
  let response =
    match Hh_json.json_of_string output with
    | json -> json
    | exception e ->
      let exn = Exception.wrap e in
      log_parse_failure exn;
      Exception.reraise exn
  in
  assert_no_error response;
  response
(* A connection to the watchman socket: a line reader over its input side, and an Lwt output
 * channel on which requests are written (see open_connection / close_connection). *)
type conn = Buffered_line_reader_lwt.t * Lwt_io.output_channel
(* Like [Lwt.catch], but hands the handler an [Exception.t] (which keeps the backtrace) and
 * never lets it intercept [Lwt.Canceled]: cancellation is always re-raised unchanged. *)
let catch ~f ~catch:on_error =
  let on_exn exn =
    let wrapped = Exception.wrap exn in
    match exn with
    | Lwt.Canceled -> Exception.reraise wrapped
    | _ -> on_error wrapped
  in
  Lwt.catch f on_exn
(** Maximum number of failed attempts to re-establish a dead watchman subscription. Once
    [maybe_restart_instance] sees this many, it exits with [Watchman_failed].
    This number is totally arbitrary. Just need some cap. *)
let max_reinit_attempts = 8
(** What we remember about a watchman subscription after it died, so that
    [maybe_restart_instance] can try to re-establish it. *)
type dead_env = {
(* Will reuse original settings to reinitializing watchman subscription. *)
prior_settings: init_settings;
(* Number of failed re-initialization attempts so far; starts at 0. *)
reinit_attempts: int;
(* Unix.time () at the moment the instance was declared dead. *)
dead_since: float;
(* The live instance's clockspec at the time of death, reused when re-initializing. *)
prior_clockspec: string;
}
(** State of a live watchman subscription. *)
type env = {
(* See https://facebook.github.io/watchman/docs/clockspec.html
*
* This is also used to reliably detect a crashed watchman. Watchman has a
* facility to detect watchman process crashes between two "since" queries. *)
mutable clockspec: string;
(* The connection the subscription was created on; pushed updates are read from it. *)
conn: conn;
(* The settings this subscription was created from. *)
settings: init_settings;
(* Subscription name, "<subscription_prefix>.<pid>". *)
subscription: string;
(* The single watchman watch root that all [settings.roots] resolve to. *)
watch_root: string;
(* An "anyof" term restricting results to the watched paths under [watch_root], or None when
* one of the roots is the watch root itself. *)
watched_path_expression_terms: Hh_json.json option;
}
(* Snapshots a live env into a dead_env: the settings and current clockspec are kept for a
 * later re-init, the attempt counter starts at zero, and the time of death is now. *)
let dead_env_from_alive { settings; clockspec; _ } =
  {
    prior_settings = settings;
    (* Keep the old clockspec rather than fetching a new one: if the same watchman server is
     * still around, nothing is lost; if not, the clockspec lets re-init detect that a new
     * server had to be started ("is_fresh_instance" in watchman's "since" response). *)
    prior_clockspec = clockspec;
    reinit_attempts = 0;
    dead_since = Unix.time ();
  }
(** Either a live subscription, or what remains of a dead one, from which
    [maybe_restart_instance] may re-establish it. *)
type watchman_instance =
(* Indicates a dead watchman instance (most likely due to chef upgrading,
* reconfiguration, or a user terminating watchman, or a timeout reading
* from the connection) detected by, for example, a pipe error or a timeout.
*
* TODO: Currently fallback to a Watchman_dead is only handled in calls
* wrapped by the with_crash_record. Pipe errors elsewhere (for example
* during request) will still result in Hack exiting. Need to cover those
* cases too. *)
| Watchman_dead of dead_env
| Watchman_alive of env
(****************************************************************************)
(* JSON methods. *)
(****************************************************************************)
(* Collapses a (bool * trace, error) result into a plain bool: a successful access yields
 * its boolean, and any access error counts as false. *)
let project_bool = function
  | Ok (value, _trace) -> value
  | Error _ -> false
(* The ["clock", <watch_root>] command, which asks watchman for the current clock of a root. *)
let clock watch_root = J.strlist ("clock" :: [watch_root])
(* The kind of request [request_json] builds: a "subscribe" request, which also carries the
* subscription name, or a one-off "query". *)
type watch_command =
| Subscribe
| Query
(** Builds a watchman "subscribe" or "query" request against [env.watch_root].

    The request's "expression" is an "allof" of, in this order: [env.watched_path_expression_terms]
    (when present), [extra_expressions], then [env.settings.expression_terms]. The directives
    object holds [extra_kv] (preceded by "sync_timeout" when [env.settings.sync_timeout] is set),
    followed by "fields" and "expression". A subscribe request also names the subscription
    ([env.subscription]). Fails an assertion if the expression list would be empty. *)
let request_json ?(extra_kv = []) ?(extra_expressions = []) watchman_command env =
  let open Hh_json in
  let header =
    match watchman_command with
    | Subscribe ->
      [JSON_String "subscribe"; JSON_String env.watch_root; JSON_String env.subscription]
    | Query -> [JSON_String "query"; JSON_String env.watch_root]
  in
  let expressions =
    let configured = extra_expressions @ env.settings.expression_terms in
    match env.watched_path_expression_terms with
    | None -> configured
    | Some terms -> terms :: configured
  in
  assert (not (List.is_empty expressions));
  let extra_kv =
    match env.settings.sync_timeout with
    | None -> extra_kv
    | Some sync_timeout -> ("sync_timeout", JSON_Number (Int.to_string sync_timeout)) :: extra_kv
  in
  let directives =
    JSON_Object
      (extra_kv
      @ [
          ("fields", J.strlist ["name"]);
          (* Watchman rejects an empty "allof"; the assertion above guarantees it is non-empty. *)
          ("expression", J.pred "allof" expressions);
        ])
  in
  JSON_Array (header @ [directives])
(* A "query" request for the files changed since the merge base with
 * [env.settings.mergebase_with], expressed with watchman's {"since": {"scm": ...}} form. *)
let get_changes_since_mergebase_query env =
  let since =
    Hh_json.(
      JSON_Object
        [("scm", JSON_Object [("mergebase-with", JSON_String env.settings.mergebase_with)])])
  in
  request_json ~extra_kv:[("since", since)] Query env
(* The "subscribe" request for [env]: changes since [env.clockspec], with "empty_on_fresh_instance"
 * set. Depending on [mode], "hg.update" plus [states] are passed as the "defer" or "drop"
 * option, or neither. *)
let subscribe ~mode ~states env =
  let states = "hg.update" :: states in
  let mode_kv =
    match mode with
    | All_changes -> []
    | Defer_changes -> [("defer", J.strlist states)]
    | Drop_changes -> [("drop", J.strlist states)]
  in
  let extra_kv =
    (("since", Hh_json.JSON_String env.clockspec) :: mode_kv)
    @ [("empty_on_fresh_instance", Hh_json.JSON_Bool true)]
  in
  request_json ~extra_kv Subscribe env
(* The ["watch-project", <path>] command; its response gives the watch root ("watch") and the
 * path's location relative to it ("relative_path"). *)
let watch_project path = J.strlist ("watch-project" :: [path])
(** True when a watchman response has ["is_fresh_instance": true]. Responses from get_changes
    are checked with this to detect that the Watchman server crashed or restarted.
    A missing or non-boolean field counts as false.

    See also Watchman docs on "since" query parameter. *)
let is_fresh_instance obj =
  let open Hh_json.Access in
  project_bool (return obj >>= get_bool "is_fresh_instance")
(****************************************************************************)
(* I/O stuff *)
(****************************************************************************)
(* Runs [f ()]. With [Some seconds], the run is bounded by Lwt_unix.with_timeout and an expiry
 * is reported as this module's [Timeout]; with [None], there is no limit. *)
let with_timeout timeout f =
  match timeout with
  | Some seconds -> (
    try%lwt Lwt_unix.with_timeout seconds f with
    | Lwt_unix.Timeout -> raise Timeout)
  | None -> f ()
(* Sends a request to the watchman process: the JSON is serialized onto a single line,
 * written followed by a newline, and the channel is flushed. *)
let send_request ~debug_logging oc json =
  let payload = Hh_json.json_to_string json in
  if debug_logging then Hh_logger.info "Watchman request: %s" payload;
  let%bind () = Lwt_io.fprintl oc payload in
  Lwt_io.flush oc
(* Runs [watchman --no-pretty get-sockname] and returns the "sockname" from its JSON output.
 * Any other outcome is logged to EventLogger and raised as [Watchman_error]:
 * - exit 127 (binary not on PATH): PATH is also logged;
 * - any other exit code: stderr is logged;
 * - killed or stopped by a signal. *)
let get_sockname () =
  let log_and_fail msg =
    EventLogger.watchman_error msg;
    raise (Watchman_error msg)
  in
  let%lwt { LwtSysUtils.status; stdout; stderr } =
    LwtSysUtils.exec "watchman" ["--no-pretty"; "get-sockname"]
  in
  match status with
  | Unix.WEXITED 0 -> Lwt.return (J.get_string_val "sockname" (Hh_json.json_of_string stdout))
  | Unix.WEXITED 127 ->
    let path = Option.value (Sys_utils.getenv_path ()) ~default:"(not set)" in
    let msg = spf "watchman not found on PATH: %s" path in
    EventLogger.watchman_error msg;
    Hh_logger.error "%s" msg;
    raise (Watchman_error "watchman not found on PATH")
  | Unix.WEXITED code ->
    EventLogger.watchman_error (spf "watchman exited code %d, stderr = %S" code stderr);
    raise (Watchman_error (spf "watchman exited code %d" code))
  | Unix.WSIGNALED signal ->
    log_and_fail (spf "watchman signaled with %s signal" (PrintSignal.string_of_signal signal))
  | Unix.WSTOPPED signal ->
    log_and_fail (spf "watchman stopped with %s signal" (PrintSignal.string_of_signal signal))
(* Opens a connection to the watchman process through the socket named by get_sockname.
 * Returns a line reader over the input side and an Lwt output channel, both using blocking
 * Lwt_unix descriptors. *)
let open_connection () =
  let%lwt sockname = get_sockname () in
  let (ic, oc) =
    if not (String.equal Sys.os_type "Unix") then
      (* On Windows, named pipes behave like regular files from the client's perspective, so we
       * just open the file and make in/out channels on it. The permission bits are irrelevant
       * because the file should already exist, but openfile requires some value. *)
      let fd = Unix.openfile sockname [Unix.O_RDWR] 0o640 in
      (Unix.in_channel_of_descr fd, Unix.out_channel_of_descr fd)
    else
      (* Unix.open_connection happens to use one fd for both directions; we deliberately don't
       * rely on that and treat ic and oc as if they might be backed by different fds. *)
      Unix.open_connection (Unix.ADDR_UNIX sockname)
  in
  let as_lwt_fd fd = Lwt_unix.of_unix_file_descr ~blocking:true fd in
  let reader = Buffered_line_reader_lwt.create (as_lwt_fd (Unix.descr_of_in_channel ic)) in
  let oc = Lwt_io.of_fd ~mode:Lwt_io.output (as_lwt_fd (Unix.descr_of_out_channel oc)) in
  Lwt.return (reader, oc)
(* Closes both sides of a connection: first the reader's fd, then the output channel. *)
let close_connection (reader, oc) =
  let input_fd = Buffered_line_reader_lwt.get_fd reader in
  let%lwt () = Lwt_unix.close input_fd in
  (* When the connection came from Unix.open_connection, input and output share one fd, which
   * is already closed by now, so closing [oc] will throw EBADF. That is expected and ignored.
   * NOTE(review): in principle the fd number could be reused by another open between the two
   * closes; confirm this cannot happen for our callers. *)
  try%lwt Lwt_io.close oc with
  | Unix.Unix_error (Unix.EBADF, _, _) -> Lwt.return_unit
(* Opens a fresh connection (bounded by [timeout]), runs [f] on it, and closes the connection
 * afterwards, whether [f] succeeded or raised. *)
let with_watchman_conn ~timeout f =
  let%lwt conn = with_timeout timeout open_connection in
  match%lwt f conn with
  | result ->
    let%lwt () = close_connection conn in
    Lwt.return result
  | exception exn ->
    let wrapped = Exception.wrap exn in
    let%lwt () = close_connection conn in
    Exception.reraise wrapped
(* Sends [json] to watchman and returns the sanitized response line. [timeout] bounds the read
 * of the response. With no [conn], a temporary connection is opened for this single request
 * and closed afterwards. *)
let rec request ~debug_logging ?conn ~timeout json =
  match conn with
  | Some (reader, oc) ->
    let%lwt () = send_request ~debug_logging oc json in
    let read_response () = Buffered_line_reader_lwt.get_next_line reader in
    let%lwt line = with_timeout timeout read_response in
    Lwt.return (sanitize_watchman_response ~debug_logging line)
  | None ->
    with_watchman_conn ~timeout (fun conn -> request ~debug_logging ~conn ~timeout json)
(* Whether the reader's fd has data to read.
 * - [None]: an immediate check via Lwt_unix.readable, without waiting.
 * - [Some seconds]: waits up to that long for the fd to become readable, and returns false
 *   on timeout.
 * NOTE(review): only the fd is inspected; if Buffered_line_reader_lwt already holds a buffered
 * line, this may still report false. Confirm whether that can happen. *)
let has_input ~timeout reader =
  let fd = Buffered_line_reader_lwt.get_fd reader in
  match timeout with
  | Some seconds ->
    let wait_until_readable () =
      let%lwt () = Lwt_unix.wait_read fd in
      Lwt.return true
    in
    (try%lwt Lwt_unix.with_timeout seconds wait_until_readable with
    | Lwt_unix.Timeout -> Lwt.return false)
  | None -> Lwt.return (Lwt_unix.readable fd)
(* Reads one pushed message from the connection.
 * - No input and no timeout: returns None.
 * - No input within the timeout: raises [Timeout].
 * - Input available: reads a full line (allowed 40 seconds, else [Read_payload_too_long]) and
 *   returns it sanitized. *)
let blocking_read ~debug_logging ~timeout ~conn:(reader, _) =
  let%lwt ready = has_input ~timeout reader in
  match (ready, timeout) with
  | (false, None) -> Lwt.return None
  | (false, Some _) -> raise Timeout
  | (true, _) ->
    let read_line () = Buffered_line_reader_lwt.get_next_line reader in
    let%lwt output =
      try%lwt Lwt_unix.with_timeout 40.0 read_line with
      | Lwt_unix.Timeout ->
        Hh_logger.log "blocking_read timed out";
        raise Read_payload_too_long
    in
    Lwt.return (Some (sanitize_watchman_response ~debug_logging output))
(****************************************************************************)
(* Initialization, reinitialization, and crash-tracking. *)
(****************************************************************************)
(* Runs [f]; if it fails (other than by cancellation), logs the exception prefixed with
 * "Watchman <source>: " and re-raises it. *)
let with_crash_record_exn source f =
  let log_and_reraise exn =
    let prefix = "Watchman " ^ source ^ ": " in
    Hh_logger.exception_ ~prefix exn;
    Exception.reraise exn
  in
  catch ~f ~catch:log_and_reraise
(* Like [with_crash_record_exn], but a failure becomes [None] instead of an exception.
 * Exceptions that must not be swallowed ([Exit_status.Exit_with], [Watchman_restarted], and
 * cancellation) still propagate. *)
let with_crash_record_opt source f =
  let attempt () =
    let%map result = with_crash_record_exn source f in
    Some result
  in
  let on_failure exn =
    match Exception.unwrap exn with
    | Exit_status.Exit_with _
    | Watchman_restarted ->
      Exception.reraise exn
    | _ -> Lwt.return None
  in
  catch ~f:attempt ~catch:on_failure
(* When we re-init our connection to Watchman, we use the old clockspec to get all the changes
 * since our last response. However, if Watchman has restarted and the old clockspec predates
 * the new Watchman, we may miss updates.
 *
 * The response to "subscribe" lacks the "is_fresh_instance" field, so we instead send a
 * small "query" whose expression matches a name that should never exist. It should return no
 * files, but its "is_fresh_instance" tells us whether the Watchman service has restarted
 * since [clockspec].
 *
 * Returns [Ok restarted], or [Error msg] when the response lacks "is_fresh_instance". *)
let has_watchman_restarted_since ~debug_logging ~conn ~timeout ~watch_root ~clockspec =
  let hard_to_match_name = "irrelevant.potato" in
  let directives =
    Hh_json.(
      JSON_Object
        [
          ("since", JSON_String clockspec);
          ("empty_on_fresh_instance", JSON_Bool true);
          ("expression", JSON_Array [JSON_String "name"; JSON_String hard_to_match_name]);
        ])
  in
  let query = Hh_json.(JSON_Array [JSON_String "query"; JSON_String watch_root; directives]) in
  let%map response = request ~debug_logging ~conn ~timeout query in
  match Jget.bool_opt (Some response) "is_fresh_instance" with
  | Some has_restarted -> Ok has_restarted
  | None ->
    (* This query's response should **always** include the boolean `is_fresh_instance`. If it
     * is missing, something has gone wrong with Watchman. Since we can't prove that Watchman
     * has not restarted, we must treat this as an error. *)
    Error
      (spf
         "Invalid Watchman response to `empty_on_fresh_instance` query:\n%s"
         (Hh_json.json_to_string ~pretty:true response))
(* Adds terms matching [relative_path] to the accumulated path terms.
 * - [None]: "watch everything" is sticky, so the result stays [None].
 * - Empty [relative_path]: the path is the watch root itself, and we are already subscribed to
 *   every change under it, so the result becomes [None].
 * - Otherwise: a "dirname" and a "name" term are prepended. We can't tell whether the path is,
 *   or will later become, a file or a directory, and no single term covers both. *)
let prepend_relative_path_term ~relative_path ~terms =
  match terms with
  | Some terms when not (String.is_empty relative_path) ->
    Some (J.strlist ["dirname"; relative_path] :: J.strlist ["name"; relative_path] :: terms)
  | Some _
  | None ->
    None
(** Opens a connection to watchman and (re-)establishes the file-change subscription described
    by the given settings.

    - Each of [roots] is registered with a "watch-project" request. All roots must resolve to a
      single watchman watch root. Paths that "watch-project" rejects (for example, includes that
      do not exist yet) are mapped onto the watch root by path prefix instead.
    - With [prior_clockspec], that clock is reused as the subscription's "since" clock, after
      checking that the Watchman server has not restarted since it was issued. Without it, the
      current clock is fetched.

    Returns [None] if initialization fails; see [with_crash_record_opt] for the exceptions that
    still propagate, which include [Watchman_restarted] and [Exit_status.Exit_with]. Whenever
    initialization fails after the connection was opened, that connection is closed before the
    failure propagates, because callers such as [maybe_restart_instance] retry repeatedly and
    would otherwise leak one connection per failed attempt. *)
let re_init
    ?prior_clockspec
    {
      debug_logging;
      defer_states;
      expression_terms;
      mergebase_with;
      roots;
      subscribe_mode;
      subscription_prefix;
      sync_timeout;
    } =
  with_crash_record_opt "init" @@ fun () ->
  let%bind conn = open_connection () in
  Lwt.catch
    (fun () ->
      let%bind (watched_path_expression_terms, watch_roots, failed_paths) =
        Lwt_list.fold_left_s
          (fun (terms, watch_roots, failed_paths) path ->
            (* Watch this root. If the path doesn't exist, watch_project will throw. In that case
             * catch the error and continue for now. *)
            let%map response =
              catch
                ~f:(fun () ->
                  let%map response =
                    request
                      ~debug_logging
                      ~conn
                      ~timeout:None (* the whole init process should be wrapped in a timeout *)
                      (watch_project (Path.to_string path))
                  in
                  Some response)
                ~catch:(fun _ -> Lwt.return None)
            in
            match response with
            | None -> (terms, watch_roots, SSet.add (Path.to_string path) failed_paths)
            | Some response ->
              let watch_root = J.get_string_val "watch" response in
              let relative_path = J.get_string_val "relative_path" ~default:"" response in
              let terms = prepend_relative_path_term ~relative_path ~terms in
              let watch_roots = SSet.add watch_root watch_roots in
              (terms, watch_roots, failed_paths))
          (Some [], SSet.empty, SSet.empty)
          roots
      in
      (* The failed_paths are likely includes which don't exist on the filesystem, so
       * watch_project returned an error. Let's do a best effort attempt to infer the watch root
       * and relative path for each bad include. *)
      let watched_path_expression_terms =
        SSet.fold
          (fun path terms ->
            (* SSet.find_first_opt requires a monotonically increasing predicate, which prefix
             * matching is not, so scan the (sorted) roots and take the first match instead. *)
            let matching_root =
              List.find (SSet.elements watch_roots) ~f:(fun root ->
                  String_utils.(string_starts_with path root))
            in
            match matching_root with
            | None -> failwith (spf "Cannot deduce watch root for path %s" path)
            | Some root ->
              let relative_path =
                String_utils.(lstrip (lstrip path root) Caml.Filename.dir_sep)
              in
              prepend_relative_path_term ~relative_path ~terms)
          failed_paths
          watched_path_expression_terms
      in
      (* All of our watched paths should have the same watch root. Let's assert that *)
      let watch_root =
        match SSet.elements watch_roots with
        | [] -> failwith "Cannot run watchman with fewer than 1 root"
        | [watch_root] -> watch_root
        | _ ->
          failwith
            (spf
               "Can't watch paths across multiple Watchman watch_roots. Found %d watch_roots"
               (SSet.cardinal watch_roots))
      in
      (* If we don't have a prior clockspec, grab the current clock *)
      let%bind clockspec =
        match prior_clockspec with
        | Some clockspec ->
          let%bind has_restarted =
            has_watchman_restarted_since
              ~debug_logging
              ~conn
              ~timeout:None (* the whole init process should be wrapped in a timeout *)
              ~watch_root
              ~clockspec
          in
          (match has_restarted with
          | Ok true ->
            Hh_logger.error "Watchman server restarted so we may have missed some updates";
            raise Watchman_restarted
          | Ok false -> Lwt.return clockspec
          | Error err ->
            Hh_logger.error "%s" err;
            raise Exit_status.(Exit_with Watchman_failed))
        | None ->
          let%map response =
            request
              ~debug_logging
              ~conn
              ~timeout:None (* the whole init process should be wrapped in a timeout *)
              (clock watch_root)
          in
          J.get_string_val "clock" response
      in
      let watched_path_expression_terms =
        Option.map watched_path_expression_terms ~f:(J.pred "anyof")
      in
      let env =
        {
          settings =
            {
              debug_logging;
              defer_states;
              expression_terms;
              mergebase_with;
              roots;
              subscribe_mode;
              subscription_prefix;
              sync_timeout;
            };
          conn;
          watch_root;
          watched_path_expression_terms;
          clockspec;
          subscription = Printf.sprintf "%s.%d" subscription_prefix (Unix.getpid ());
        }
      in
      let%map response =
        request
          ~debug_logging
          ~conn
          ~timeout:None (* the whole init process should be wrapped in a timeout *)
          (subscribe ~mode:subscribe_mode ~states:defer_states env)
      in
      ignore response;
      env)
    (fun exn ->
      (* Wrap first so the original backtrace is captured before any other work. *)
      let exn = Exception.wrap exn in
      (* Best-effort close: a failure here is logged, and the original error is what propagates. *)
      let%bind () =
        try%lwt close_connection conn with
        | close_exn ->
          (Hh_logger.exception_
             ~prefix:"Watchman init: failed to close connection: "
             (Exception.wrap close_exn);
           Lwt.return_unit)
      in
      Exception.reraise exn)
(* First-time initialization: [re_init] without a prior clockspec, so the current clock is used. *)
let init settings = re_init ?prior_clockspec:None settings
(* The names listed under the response's "files" key, each joined onto [env.watch_root], or []
 * when the key is absent. *)
let extract_file_names env json =
  let files =
    Hh_json.Access.(
      return json >>= get_array "files"
      |> counit_with (fun _ ->
             (* When an hg.update happens, it shows up in the watchman subscription
                as a notification with no files key present. *)
             []))
  in
  List.map files ~f:(fun file ->
      let relative = Hh_json.get_string_exn file in
      Caml.Filename.concat env.watch_root relative)
(* Despite the name, returns true once the backoff period has ELAPSED since [time], i.e. when
 * it is acceptable to try again. The backoff is 4 * 2^min(attempts, 3) seconds: 4, 8, 16, then
 * 32 seconds for every later attempt. *)
let within_backoff_time attempts time =
  let capped_attempts = Int.min attempts 3 in
  let backoff_seconds = 4 * (2 ** capped_attempts) in
  Float.(Unix.time () >= time +. of_int backoff_seconds)
(* Tries to bring a dead instance back to life; live instances are returned unchanged.
 * - After [max_reinit_attempts] failures, exits with [Watchman_failed].
 * - Before the backoff for the current attempt count has elapsed, returns the dead instance.
 * - Otherwise runs [re_init] (limited to 120 seconds) with the prior settings and clockspec.
 *   On failure or timeout the attempt count is incremented; on success the new live instance
 *   is returned.
 * NOTE(review): dead_since is never advanced after a failed attempt, so once 32s have passed
 * since death every call retries immediately; confirm this is intended. *)
let maybe_restart_instance instance =
  match instance with
  | Watchman_alive _ -> Lwt.return instance
  | Watchman_dead dead_env when dead_env.reinit_attempts >= max_reinit_attempts ->
    Hh_logger.log "Ran out of watchman reinit attempts. Exiting.";
    raise Exit_status.(Exit_with Watchman_failed)
  | Watchman_dead dead_env
    when not (within_backoff_time dead_env.reinit_attempts dead_env.dead_since) ->
    Lwt.return instance
  | Watchman_dead dead_env ->
    Hh_logger.log "Attemping to reestablish watchman subscription";
    let record_failure () =
      EventLogger.watchman_connection_reestablishment_failed ();
      Lwt.return (Watchman_dead { dead_env with reinit_attempts = dead_env.reinit_attempts + 1 })
    in
    (* TODO: don't hardcode this timeout *)
    let reinit_timeout = Some 120. in
    (match%lwt
       with_timeout reinit_timeout (fun () ->
           re_init ~prior_clockspec:dead_env.prior_clockspec dead_env.prior_settings)
     with
    | exception Timeout ->
      Hh_logger.log "Reestablishing watchman subscription timed out.";
      record_failure ()
    | None ->
      Hh_logger.log "Reestablishing watchman subscription failed.";
      record_failure ()
    | Some env ->
      Hh_logger.log "Watchman connection reestablished.";
      EventLogger.watchman_connection_reestablished ();
      Lwt.return (Watchman_alive env))
(* Closes the connection underlying a live subscription. *)
let close { conn; _ } = close_connection conn
(* Closes a live instance's connection, then returns the corresponding dead_env. *)
let close_channel_on_instance env =
  let%bind () = close env in
  Lwt.return (dead_env_from_alive env)
(* Optionally attempts a restart (see maybe_restart_instance), then dispatches on the resulting
 * instance to [on_alive] or [on_dead]. *)
let with_instance instance ~try_to_restart ~on_alive ~on_dead =
  let prepare = if try_to_restart then maybe_restart_instance else Lwt.return in
  let%bind current = prepare instance in
  match current with
  | Watchman_alive env -> on_alive env
  | Watchman_dead dead_env -> on_dead dead_env
(** Calls [on_alive] on the instance, maybe restarting it first if it's dead, and maybe
 * reverting it to a dead state if things go south. For example, if watchman shuts the
 * connection on us, or shuts down, or crashes, we revert to a dead instance, upon which a
 * restart will be attempted down the road. We also proactively revert to a dead instance if it
 * appears to be unresponsive (Timeout), or if reading the payload from it takes too long.
 *
 * [source] names the operation in crash logs. [on_dead] computes the result whenever the
 * instance is, or becomes, dead.
 *
 * Dead-connection errors are recognised both as [Sys_error] messages and as the
 * [Unix.Unix_error] codes that Lwt-based I/O reports (EPIPE, ECONNRESET, EBADF).
 * An [Exit_status.Exit_with] raised by [on_alive] (for example Watchman_fresh_instance) is
 * re-raised unchanged so its exit status is preserved. Any other exception is reported to
 * EventLogger and turned into [Exit_status.Exit_with Watchman_failed]. *)
let call_on_instance :
    watchman_instance ->
    string ->
    on_dead:(dead_env -> 'a) ->
    on_alive:(env -> (env * 'a) Lwt.t) ->
    (watchman_instance * 'a) Lwt.t =
  let on_dead' f dead_env = Lwt.return (Watchman_dead dead_env, f dead_env) in
  let on_alive' ~on_dead source f env =
    catch
      ~f:(fun () ->
        let%map (env, result) = with_crash_record_exn source (fun () -> f env) in
        (Watchman_alive env, result))
      ~catch:(fun exn ->
        let close_channel_on_instance' env =
          let%bind env = close_channel_on_instance env in
          on_dead' on_dead env
        in
        let log_died msg =
          Hh_logger.log "%s" msg;
          EventLogger.watchman_died_caught msg
        in
        (* The connection is unusable: log it, close our end and mark the instance dead. *)
        let died_closing msg =
          log_died msg;
          close_channel_on_instance' env
        in
        (* This happens when watchman is tearing itself down after we retrieved a sock address
         * and connected to it: opening the connection doesn't error even when the sock address
         * is no longer valid, but returns a channel that errors later when you actually try to
         * use it. We never had a usable channel, so there is nothing to close. *)
        let died_bad_fd () =
          log_died "Watchman bad file descriptor.";
          on_dead' on_dead (dead_env_from_alive env)
        in
        match Exception.unwrap exn with
        | Exit_status.Exit_with _ ->
          (* Deliberate exits must keep their exit status rather than being folded into
           * Watchman_failed by the catch-all below. *)
          Exception.reraise exn
        | Sys_error msg when String.equal msg "Broken pipe" -> died_closing "Watchman Pipe broken."
        | Unix.Unix_error (Unix.EPIPE, _, _) -> died_closing "Watchman Pipe broken."
        | Sys_error msg when String.equal msg "Connection reset by peer" ->
          died_closing "Watchman connection reset by peer."
        | Unix.Unix_error (Unix.ECONNRESET, _, _) ->
          died_closing "Watchman connection reset by peer."
        | Sys_error msg when String.equal msg "Bad file descriptor" -> died_bad_fd ()
        | Unix.Unix_error (Unix.EBADF, _, _) -> died_bad_fd ()
        | End_of_file -> died_closing "Watchman connection End_of_file. Closing channel"
        | Read_payload_too_long ->
          died_closing "Watchman reading payload too long. Closing channel"
        | Timeout -> died_closing "Watchman reading Timeout. Closing channel"
        | Watchman_error msg ->
          died_closing (Printf.sprintf "Watchman error: %s. Closing channel" msg)
        | _ ->
          let msg = Exception.to_string exn in
          EventLogger.watchman_uncaught_failure msg;
          raise Exit_status.(Exit_with Watchman_failed))
  in
  fun instance source ~on_dead ~on_alive ->
    with_instance
      instance
      ~try_to_restart:true
      ~on_dead:(on_dead' on_dead)
      ~on_alive:(on_alive' ~on_dead source on_alive)
(* Builds a State_enter / State_leave message for state [name], carrying the message's
 * "metadata" field when present. *)
let make_state_change_response state name data =
  match (state, J.try_get_val "metadata" data) with
  | (`Enter, metadata) -> State_enter (name, metadata)
  | (`Leave, metadata) -> State_leave (name, metadata)
(* Returns [Some (clock, mergebase)] when the response's "clock" object holds both a "clock"
 * string and an "scm" object with a "mergebase" string; otherwise [None]. *)
let extract_mergebase data =
  let open Hh_json.Access in
  let clock_obj = return data >>= get_obj "clock" in
  to_option
    ( clock_obj >>= get_string "clock" >>= fun (clock, _) ->
      clock_obj >>= get_obj "scm" >>= get_string "mergebase" >>= fun (mergebase, _) ->
      return (clock, mergebase) )
(* If [data] carries a mergebase, records its clock in [env.clockspec] and returns a
 * Changed_merge_base message with the listed files. Otherwise returns an Error and leaves
 * [env] untouched. *)
let make_mergebase_changed_response env data =
  match extract_mergebase data with
  | Some (clock, mergebase) ->
    let files = set_of_list (extract_file_names env data) in
    env.clockspec <- clock;
    Ok (env, Changed_merge_base (mergebase, files, clock))
  | None -> Error "Failed to extract mergebase"
(* Turns one pushed subscription message, if any, into a pushed_changes value:
 * - no message: empty Files_changed;
 * - a mergebase change: Changed_merge_base.
 * Otherwise the message's "clock" is recorded, a fresh watchman instance exits the process with
 * Watchman_fresh_instance, and a "state-enter" / "state-leave" message becomes a state change.
 * Anything else becomes Files_changed with the listed files. *)
let transform_asynchronous_get_changes_response env data =
  match data with
  | None -> (env, Files_changed SSet.empty)
  | Some data ->
    (match make_mergebase_changed_response env data with
    | Ok result -> result
    | Error _ ->
      let json = Some data in
      env.clockspec <- Jget.string_exn json "clock";
      if is_fresh_instance data then (
        Hh_logger.log "Watchman server is fresh instance. Exiting.";
        raise Exit_status.(Exit_with Watchman_fresh_instance));
      let response =
        match (Jget.string_opt json "state-enter", Jget.string_opt json "state-leave") with
        | (Some state, _) -> make_state_change_response `Enter state data
        | (None, Some state) -> make_state_change_response `Leave state data
        | (None, None) -> Files_changed (set_of_list (extract_file_names env data))
      in
      (env, response))
(* Waits for the next pushed message on the subscription connection. [deadline] is an absolute
 * Unix time; without it the read is a non-blocking poll (see blocking_read). A dead instance
 * yields Watchman_unavailable. *)
let get_changes ?deadline instance =
  let on_alive env =
    let timeout =
      Option.map deadline ~f:(fun deadline -> Float.max (deadline -. Unix.time ()) 0.0)
    in
    let debug_logging = env.settings.debug_logging in
    let%map response = blocking_read ~debug_logging ~timeout ~conn:env.conn in
    let (env, changes) = transform_asynchronous_get_changes_response env response in
    (env, Watchman_pushed changes)
  in
  call_on_instance instance "get_changes" ~on_dead:(fun _ -> Watchman_unavailable) ~on_alive
(* Absolute paths of files changed since the merge base, queried on a separate, temporary
 * connection (no ~conn is passed to [request]). *)
let get_changes_since_mergebase ~timeout env =
  let query = get_changes_since_mergebase_query env in
  let%map response = request ~timeout ~debug_logging:env.settings.debug_logging query in
  extract_file_names env response
(* Queries the current merge base on a temporary connection. Errors if the instance is dead or
 * the response lacks a mergebase. *)
let get_mergebase ~timeout instance =
  let query_mergebase env =
    let%map response =
      request
        ~timeout
        ~debug_logging:env.settings.debug_logging
        (get_changes_since_mergebase_query env)
    in
    let result =
      match extract_mergebase response with
      | None -> Error "Failed to extract mergebase from response"
      | Some (_clock, mergebase) -> Ok mergebase
    in
    (env, result)
  in
  call_on_instance
    instance
    "get_mergebase"
    ~on_dead:(fun _dead_env -> Error "Failed to connect to Watchman to get mergebase")
    ~on_alive:query_mergebase
(* Like get_mergebase, but also returns the set of files changed since that merge base. Both
 * come from the same query response. *)
let get_mergebase_and_changes ~timeout instance =
  let query_mergebase_and_changes env =
    let%map response =
      request
        ~timeout
        ~debug_logging:env.settings.debug_logging
        (get_changes_since_mergebase_query env)
    in
    let result =
      match extract_mergebase response with
      | None -> Error "Failed to extract mergebase from response"
      | Some (_clock, mergebase) ->
        Ok (mergebase, set_of_list (extract_file_names env response))
    in
    (env, result)
  in
  call_on_instance
    instance
    "get_mergebase_and_changes"
    ~on_dead:(fun _dead_env -> Error "Failed to connect to Watchman to get mergebase")
    ~on_alive:query_mergebase_and_changes
(* The connection of a live instance, or None when the instance is dead. *)
let conn_of_instance instance =
  match instance with
  | Watchman_alive env -> Some env.conn
  | Watchman_dead _ -> None
(* Helpers for unit tests: canned settings, a connection that reads from a null reader and
 * writes to /dev/null, and a fake env built on top of them. *)
module Testing = struct
  let test_settings =
    {
      debug_logging = false;
      defer_states = [];
      expression_terms = [];
      mergebase_with = "hash";
      roots = [Path.dummy_path];
      subscribe_mode = Defer_changes;
      subscription_prefix = "dummy_prefix";
      sync_timeout = None;
    }

  (* Reader from Buffered_line_reader_lwt.get_null_reader; output channel on /dev/null. *)
  let get_test_conn () =
    let%lwt reader = Buffered_line_reader_lwt.get_null_reader ()
    and oc = Lwt_io.open_file ~mode:Lwt_io.output "/dev/null" in
    Lwt.return (reader, oc)

  (* An env on a test conn under "/path/to/root", restricted to the path "foo". *)
  let get_test_env () =
    let%lwt conn = get_test_conn () in
    let watched_path_expression_terms =
      Some (J.pred "anyof" [J.strlist ["dirname"; "foo"]; J.strlist ["name"; "foo"]])
    in
    Lwt.return
      {
        settings = test_settings;
        conn;
        watch_root = "/path/to/root";
        clockspec = "";
        watched_path_expression_terms;
        subscription = "dummy_prefix.123456789";
      }

  let transform_asynchronous_get_changes_response = transform_asynchronous_get_changes_response
end