/
clone.ml
98 lines (90 loc) · 2.89 KB
/
clone.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
open Lwt
open Common
open Unix.LargeFile
(** Cloning of a node over the network: the client side pulls all tlog
    files from a running node ([receive_files]), the server side streams
    them ([send_files]), and [fill_store] rebuilds a local store from the
    tlogs that were fetched. *)
module Clone = struct

  (** [receive_files (ic, oc) ~target_dir] sends the [CLONE] command on
      [oc] and then, through [response], reads the reply from [ic]: a file
      count followed, for each file, by its name, its length (int64) and
      exactly that many bytes of content. Every file is written to
      [target_dir] under the received name.

      Fails with [Failure] when the announced file count is negative. *)
  let receive_files (ic,oc) ~target_dir =
    (* Serialise a request into a buffer, then write and flush it at once. *)
    let request f =
      let buf = Buffer.create 32 in
      let () = f buf in
      Lwt_io.write oc (Buffer.contents buf) >>= fun () ->
      Lwt_io.flush oc
    in
    let outgoing buf = command_to buf CLONE in
    (* Read one (name, length, content) record and store it on disk. *)
    let receive_one ic =
      Llio.input_string ic >>= fun name ->
      Llio.input_int64 ic >>= fun length ->
      Lwt_log.debug_f "receiving file %s with length %Li" name length
      >>= fun () ->
      (* NOTE(review): name comes straight from the peer and is joined onto
         target_dir without validation; confirm the peer is trusted or
         reject names that contain directory components. *)
      let file_name = Filename.concat target_dir name in
      Lwt_io.with_file
        (* O_TRUNC: without it, overwriting a longer pre-existing file would
           leave its old tail behind the freshly received content. *)
        ~flags:[Unix.O_CREAT; Unix.O_WRONLY; Unix.O_TRUNC]
        ~mode:Lwt_io.output
        file_name
        (fun file_oc -> Llio.copy_stream ~length ~ic ~oc:file_oc)
    in
    let incoming ic =
      Llio.input_int ic >>= fun n_files ->
      Lwt_log.debug_f "n_files:%i" n_files >>= fun () ->
      if n_files < 0
      then
        (* A negative count would never count down to zero; treat it as a
           protocol error instead of reading until the channel breaks. *)
        Lwt.fail
          (Failure
             (Printf.sprintf "Clone.receive_files: invalid file count %i"
                n_files))
      else
        let rec loop remaining =
          if remaining = 0
          then Lwt.return ()
          else receive_one ic >>= fun () -> loop (remaining - 1)
        in
        loop n_files
    in
    request outgoing >>= fun () ->
    response ic incoming

  (** [send_files (ic, oc) home_dir] is the serving counterpart of
      [receive_files]. It writes the integer [0] to [oc], then the number
      of tlog files found in [home_dir] by [Tlc2.get_tlog_names], then for
      each file its name, its length (int64) and its content. The input
      channel of the connection is not used. *)
  let send_files (_ic,oc) home_dir =
    Tlc2.get_tlog_names home_dir >>= fun file_names ->
    (* Every tlog name is sent; restricting to compressed ones is disabled:
       let ok_names = List.filter Tlc2.is_compressed file_names in *)
    let ok_names = file_names in
    let n_files = List.length ok_names in
    (* NOTE(review): the leading 0 is presumably the success code consumed
       by [response] on the receiving side; confirm against Common. *)
    Llio.output_int oc 0 >>= fun () ->
    Llio.output_int oc n_files >>= fun () ->
    let send_one name =
      let file_name = Filename.concat home_dir name in
      (* Use the Lwt variant of stat so the scheduler is not blocked. *)
      Lwt_unix.LargeFile.stat file_name >>= fun stats ->
      let length = stats.Unix.LargeFile.st_size in
      Lwt_log.debug_f "sending file %s with length %Li" file_name length
      >>= fun () ->
      Llio.output_string oc name >>= fun () ->
      Llio.output_int64 oc length >>= fun () ->
      Lwt_io.with_file
        ~flags:[Unix.O_RDONLY]
        ~mode:Lwt_io.input
        file_name
        (fun file_ic -> Llio.copy_stream ~length ~ic:file_ic ~oc)
    in
    let rec loop = function
      | [] -> Lwt.return ()
      | name :: rest -> send_one name >>= fun () -> loop rest
    in
    loop ok_names

  (** [fill_store ~tlog_dir ~store_dir ~node_name] opens the local store
      [<node_name>.db] inside [store_dir], opens the tlog collection in
      [tlog_dir], and runs [Catchup.catchup_store] up to the index returned
      by [get_last_i]. The store is closed afterwards, whether or not the
      catchup succeeded. *)
  let fill_store ~tlog_dir ~store_dir ~node_name =
    let db_name = Filename.concat store_dir (node_name ^ ".db") in
    Local_store.make_local_store db_name >>= fun store ->
    Lwt.finalize
      (fun () ->
         Tlc2.make_tlc2 tlog_dir >>= fun tlc ->
         tlc # get_last_i () >>= fun too_far_i ->
         Catchup.catchup_store node_name store tlc too_far_i >>= fun _ ->
         Lwt.return ())
      (* Close the store on failure as well, so it is not leaked. *)
      (fun () -> store # close ())

  (** [clone_node ip port ~tlog_dir ~store_dir ~node_name] installs a
      debug-level logger on stderr as the default Lwt logger, connects to
      [ip]:[port], downloads the tlog files into [tlog_dir], rebuilds the
      store in [store_dir], and returns [0].

      No exception handler is installed here: a failure in any step is not
      turned into a non-zero return value. *)
  let clone_node ip port ~tlog_dir ~store_dir ~node_name =
    let logger =
      Lwt_log.channel
        ~close_mode:`Keep
        ~channel:Lwt_io.stderr
        ~template:"$(date): $(level): $(message)"
        ()
    in
    Lwt_log.default := logger;
    Lwt_log.Section.set_level Lwt_log.Section.main Lwt_log.Debug;
    let t =
      let sa = Network.make_address ip port in
      Lwt_io.with_connection sa (receive_files ~target_dir:tlog_dir)
      >>= fun () ->
      fill_store ~tlog_dir ~store_dir ~node_name
    in
    Lwt_main.run t;
    0
end