Skip to content

Commit

Permalink
github: use an unlimited number of fids and walk in parallel
Browse files Browse the repository at this point in the history
This speeds up initialisation of the bridge massively.

Signed-off-by: Thomas Gazagnaire <thomas@gazagnaire.org>
  • Loading branch information
samoht committed Dec 12, 2016
1 parent 1203383 commit 21c3b90
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 52 deletions.
97 changes: 52 additions & 45 deletions bridge/github/datakit_github_conv.ml
Expand Up @@ -151,17 +151,20 @@ module Make (DK: Datakit_S.CLIENT) = struct
Log.debug (fun l -> l "repo %a -> true" Repo.pp repo);
Some repo

(* [reduce_repos sets] is the union of all the repository sets in [sets];
   an empty list gives [Repo.Set.empty]. *)
let reduce_repos sets =
  List.fold_left Repo.Set.union Repo.Set.empty sets

(* [repos tree] reads the top-level directory of [tree] (one entry per user)
   and then each user directory (one entry per repository). A repository is
   kept when reading its [.monitor] file yields [Some _]. The resulting set
   is logged at debug level and returned.

   NOTE(review): this span is a rendered diff. The [Lwt_list.fold_left_s]
   lines threading [acc] and the [Lwt_list.map_p] lines building one set per
   entry (merged with [reduce_repos]) are two versions of the same code shown
   side by side. *)
let repos tree =
let root = Datakit_path.empty in
safe_read_dir tree root >>= fun users ->
Lwt_list.fold_left_s (fun acc user ->
Lwt_list.map_p (fun user ->
safe_read_dir tree (root / user) >>= fun repos ->
Lwt_list.fold_left_s (fun acc repo ->
Lwt_list.map_p (fun repo ->
safe_read_file tree (root / user /repo / ".monitor") >|= function
| None -> acc
| Some _ -> Repo.Set.add (Repo.v ~user ~repo) acc
) acc repos
) Repo.Set.empty users >|= fun repos ->
| None -> Repo.Set.empty
| Some _ -> Repo.Set.singleton (Repo.v ~user ~repo)
) repos >|= reduce_repos
) users >|= fun repos ->
let repos = reduce_repos repos in
Log.debug (fun l -> l "repos -> @;@[<2>%a@]" Repo.Set.pp repos);
repos

Expand Down Expand Up @@ -244,14 +247,17 @@ module Make (DK: Datakit_S.CLIENT) = struct
in
Some (PR.v ~state ~title ~base head number)

(* [reduce_prs sets] merges a list of pull-request sets into a single set,
   starting from [PR.Set.empty]. *)
let reduce_prs sets =
  List.fold_left (fun merged s -> PR.Set.union merged s) PR.Set.empty sets

(* [prs_of_repo tree repo] lists the entries of the [pr] directory under
   [root repo], converts each entry name to an integer and looks it up with
   [pr]. Entries for which [pr] returns [None] are dropped. The resulting
   set is logged at debug level and returned.

   NOTE(review): [int_of_string n] raises [Failure] on a non-numeric entry
   name and nothing in this function catches it; [dirty_prs] does guard the
   same conversion. Confirm whether that difference is intended.

   NOTE(review): this span is a rendered diff, so the [fold_left_s] and
   [map_p] variants appear side by side. *)
let prs_of_repo tree repo =
let dir = root repo / "pr" in
safe_read_dir tree dir >>= fun nums ->
Lwt_list.fold_left_s (fun acc n ->
Lwt_list.map_p (fun n ->
pr tree (repo, int_of_string n) >|= function
| None -> acc
| Some p -> PR.Set.add p acc
) PR.Set.empty nums >|= fun prs ->
| None -> PR.Set.empty
| Some p -> PR.Set.singleton p
) nums >|= fun prs ->
let prs = reduce_prs prs in
Log.debug (fun l ->
l "prs_of_repo %a -> @;@[<2>%a@]" Repo.pp repo PR.Set.pp prs);
prs
Expand All @@ -262,11 +268,8 @@ module Make (DK: Datakit_S.CLIENT) = struct

(* [prs ?repos tree] resolves the repositories to inspect with
   [maybe_repos tree repos], calls [prs_of_repo] on each of them and returns
   the union of the per-repository sets, after logging it at debug level.
   [maybe_repos] is defined outside this hunk; presumably it falls back to
   every repository in [tree] when [repos] is omitted (TODO confirm).

   NOTE(review): this span is a rendered diff, so the [fold_left_s] and
   [map_p] variants appear side by side. *)
let prs ?repos:rs tree =
maybe_repos tree rs >>= fun repos ->
Lwt_list.fold_left_s (fun acc r ->
prs_of_repo tree r >|= fun prs ->
PR.Set.union prs acc
) PR.Set.empty (Repo.Set.elements repos)
>|= fun prs ->
Lwt_list.map_p (prs_of_repo tree) (Repo.Set.elements repos) >|= fun prs ->
let prs = reduce_prs prs in
Log.debug (fun l -> l "prs -> @;@[<2>%a@]" PR.Set.pp prs);
prs

Expand Down Expand Up @@ -294,13 +297,13 @@ module Make (DK: Datakit_S.CLIENT) = struct
Commit.Set.pp cs);
cs

(* [reduce_commits sets] is the union of all the commit sets in [sets];
   an empty list gives [Commit.Set.empty]. *)
let reduce_commits sets =
  List.fold_left Commit.Set.union Commit.Set.empty sets

(* [commits ?repos tree] resolves the repositories to inspect with
   [maybe_repos tree repos], calls [commits_of_repo] on each of them and
   returns the union of the per-repository commit sets, after logging it at
   debug level.

   NOTE(review): this span is a rendered diff, so the [fold_left_s] and
   [map_p] variants appear side by side. *)
let commits ?repos:rs tree =
maybe_repos tree rs >>= fun repos ->
Lwt_list.fold_left_s (fun acc r ->
commits_of_repo tree r >|= fun commits ->
Commit.Set.union commits acc
) Commit.Set.empty (Repo.Set.elements repos)
Lwt_list.map_p (commits_of_repo tree) (Repo.Set.elements repos)
>|= fun cs ->
let cs = reduce_commits cs in
Log.debug (fun l -> l "commits -> @;@[<2>%a@]" Commit.Set.pp cs);
cs

Expand All @@ -321,7 +324,7 @@ module Make (DK: Datakit_S.CLIENT) = struct
"state" , Some (Status_state.to_string @@ Status.state s);
"target_url" , mapo Uri.to_string (Status.url s);
] in
Lwt_list.iter_s (fun (k, v) -> match v with
Lwt_list.iter_p (fun (k, v) -> match v with
| None -> safe_remove t (dir / k)
| Some v ->
let v = Cstruct.of_string (v ^ "\n") in
Expand Down Expand Up @@ -355,16 +358,17 @@ module Make (DK: Datakit_S.CLIENT) = struct
let url = mapo Uri.of_string url in
Some (Status.v ?description ?url commit context state)

(* [reduce_status sets] merges a list of status sets into a single set,
   starting from [Status.Set.empty]. *)
let reduce_status sets =
  List.fold_left (fun merged s -> Status.Set.union merged s) Status.Set.empty
    sets

(* [statuses_of_commits tree commits] visits, for every commit in [commits],
   the directory [commit/<hash>/status] under the root of the commit
   repository. It calls [walk] there with the [state] file name and a
   callback that builds a status via [status tree (commit, c)]. The
   per-commit sets are merged, logged at debug level and returned.
   [walk] is defined outside this hunk; presumably it recurses through the
   sub-directories and calls the callback wherever a [state] file is found
   (TODO confirm).

   NOTE(review): this span is a rendered diff, so the [fold_left_s] and
   [map_p] variants appear side by side. *)
let statuses_of_commits tree commits =
Lwt_list.fold_left_s (fun acc commit ->
Lwt_list.map_p (fun commit ->
let dir = root (Commit.repo commit) / "commit" in
let dir = dir / Commit.hash commit / "status" in
walk (module Status.Set) tree dir
("state", fun c -> status tree (commit, c))
>|= fun status ->
Status.Set.union status acc
) Status.Set.empty (Commit.Set.elements commits)
) (Commit.Set.elements commits)
>|= fun status ->
let status = reduce_status status in
Log.debug (fun l -> l "statuses_of_commits %a -> @;@[<2>%a@]"
Commit.Set.pp commits Status.Set.pp status);
status
Expand Down Expand Up @@ -401,13 +405,12 @@ module Make (DK: Datakit_S.CLIENT) = struct
l "refs_of_repo %a -> @;@[<2>%a@]" Repo.pp repo Ref.Set.pp refs);
refs

(* [reduce_refs sets] is the union of all the reference sets in [sets];
   an empty list gives [Ref.Set.empty]. *)
let reduce_refs sets =
  List.fold_left Ref.Set.union Ref.Set.empty sets

(* [refs ?repos tree] resolves the repositories to inspect with
   [maybe_repos tree repos], calls [refs_of_repo] on each of them and returns
   the union of the per-repository reference sets, after logging it at debug
   level.

   NOTE(review): this span is a rendered diff, so the [fold_left_s] and
   [map_p] variants appear side by side. *)
let refs ?repos:rs tree =
maybe_repos tree rs >>= fun repos ->
Lwt_list.fold_left_s (fun acc r ->
refs_of_repo tree r >|= fun refs ->
Ref.Set.union acc refs
) Ref.Set.empty (Repo.Set.elements repos)
>|= fun refs ->
Lwt_list.map_p (refs_of_repo tree) (Repo.Set.elements repos) >|= fun refs ->
let refs = reduce_refs refs in
Log.debug (fun l -> l "refs -> @;@[<2>%a@]" Ref.Set.pp refs);
refs

Expand Down Expand Up @@ -455,29 +458,31 @@ module Make (DK: Datakit_S.CLIENT) = struct

(* Dirty *)

(* [reduce_elts sets] merges a list of element-id sets into a single set,
   starting from [Elt.IdSet.empty]. *)
let reduce_elts sets =
  List.fold_left (fun merged s -> Elt.IdSet.union merged s) Elt.IdSet.empty
    sets

(* [dirty_repos tree] reads the top-level directory of [tree] (one entry per
   user) and then each user directory (one entry per repository). It returns
   a [`Repo] id for every repository where [safe_exists_file] reports a
   [.dirty] file.

   NOTE(review): this span is a rendered diff, so the [fold_left_s] lines
   threading [acc] and the [map_p] lines merged with [reduce_elts] appear
   side by side. *)
let dirty_repos tree =
let root = Datakit_path.empty in
safe_read_dir tree root >>= fun users ->
Lwt_list.fold_left_s (fun acc user ->
Lwt_list.map_p (fun user ->
safe_read_dir tree (root / user) >>= fun repos ->
Lwt_list.fold_left_s (fun acc repo ->
Lwt_list.map_p (fun repo ->
safe_exists_file tree (root / user /repo / ".dirty") >|= function
| false -> acc
| true -> Elt.IdSet.add (`Repo (Repo.v ~user ~repo)) acc
) acc repos
) Elt.IdSet.empty users
| false -> Elt.IdSet.empty
| true -> Elt.IdSet.singleton (`Repo (Repo.v ~user ~repo))
) repos >|= reduce_elts
) users >|= reduce_elts

(* [dirty_prs tree repo] lists the entries of the [pr] directory under
   [root repo] and returns a [`PR] id for every entry that contains a
   [.dirty] file. An entry whose name is not an integer is skipped: the
   [Failure] raised by [int_of_string] is caught and contributes nothing to
   the result.

   NOTE(review): this span is a rendered diff, so the [fold_left_s] lines
   threading [acc] and the [map_p] lines merged with [reduce_elts] appear
   side by side. *)
let dirty_prs tree repo =
let dir = root repo / "pr" in
safe_read_dir tree dir >>= fun nums ->
Lwt_list.fold_left_s (fun acc n ->
Lwt_list.map_p (fun n ->
let d = dir / n / ".dirty" in
safe_exists_file tree d >|= function
| false -> acc
| false -> Elt.IdSet.empty
| true ->
try Elt.IdSet.add (`PR (repo, int_of_string n)) acc
with Failure _ -> acc
) Elt.IdSet.empty nums
try Elt.IdSet.singleton (`PR (repo, int_of_string n))
with Failure _ -> Elt.IdSet.empty
) nums >|= reduce_elts

let dirty_refs tree repo =
let dir = root repo / "ref" in
Expand All @@ -491,11 +496,13 @@ module Make (DK: Datakit_S.CLIENT) = struct
dirty_repos t >>= fun dirty_repos ->
repos t >>= fun repos ->
(* we check for dirty prs/refs in monitored repos only *)
Lwt_list.fold_left_s (fun acc r ->
Lwt_list.map_p (fun r ->
dirty_prs t r >>= fun prs ->
dirty_refs t r >|= fun refs ->
acc ++ prs ++ refs
) dirty_repos (Repo.Set.elements repos)
prs ++ refs
) (Repo.Set.elements repos)
>|= fun more ->
dirty_repos ++ reduce_elts more

let dirty_file: Elt.id -> Datakit_path.t = function
| `Repo r -> root r / ".dirty"
Expand Down Expand Up @@ -661,7 +668,7 @@ module Make (DK: Datakit_S.CLIENT) = struct
let f tr =
Log.debug
(fun l -> l "remove_snapshot (from %s):@;%a" debug Elt.IdSet.pp t);
Lwt_list.iter_s (remove_elt tr) (Elt.IdSet.elements t)
Lwt_list.iter_p (remove_elt tr) (Elt.IdSet.elements t)
in
Some f

Expand All @@ -671,7 +678,7 @@ module Make (DK: Datakit_S.CLIENT) = struct
let f tr =
Log.debug
(fun l -> l "update_snapshot (from %s):@;%a" debug Elt.Set.pp t);
Lwt_list.iter_s (update_elt tr) (Elt.Set.elements t)
Lwt_list.iter_p (update_elt tr) (Elt.Set.elements t)
in
Some f

Expand Down
2 changes: 1 addition & 1 deletion bridge/github/main.ml
Expand Up @@ -130,7 +130,7 @@ let start () no_listen listen_urls datakit cap webhook resync_interval =
failwith "connect to datakit"
in
Lwt.catch
(fun () -> Client9p.connect proto address ())
(fun () -> Client9p.connect proto address ~max_fids:Int32.max_int ())
(fun e -> Lwt.fail_with @@ Fmt.strf "%a" Fmt.exn e)
>>= function
| Error (`Msg e) ->
Expand Down
10 changes: 5 additions & 5 deletions tests/test_github.ml
Expand Up @@ -1511,7 +1511,7 @@ let rec read_state ~user ~repo ~commit tree path context =
[ Status.v ?description ?url commit context state]
end
>>= fun this_state ->
items |> Lwt_list.map_s (function
items |> Lwt_list.map_p (function
| "status" | "description" | "target_url" -> Lwt.return []
| item ->
read_state ~user ~repo ~commit tree (path / item) (context @ [item])
Expand All @@ -1528,7 +1528,7 @@ let read_opt_dir tree path =
let read_commits tree ~user ~repo =
let path = Datakit_path.of_steps_exn [user; repo; "commit"] in
read_opt_dir tree path >>=
Lwt_list.map_s (fun commit ->
Lwt_list.map_p (fun commit ->
let path =
Datakit_path.of_steps_exn [user; repo; "commit"; commit; "status"]
in
Expand All @@ -1539,7 +1539,7 @@ let read_commits tree ~user ~repo =
let read_prs tree ~user ~repo =
let path = Datakit_path.of_steps_exn [user; repo; "pr"] in
read_opt_dir tree path >>=
Lwt_list.map_s (fun number ->
Lwt_list.map_p (fun number ->
let path = Datakit_path.of_steps_exn [user; repo; "pr"; number] in
let number = int_of_string number in
let read name =
Expand Down Expand Up @@ -1742,11 +1742,11 @@ let test_random_datakit ~quick _repo conn =
let update_datakit users =
let events = Users.diff_events users (Users.empty ()) in
DK.Branch.with_transaction branch (fun tr ->
Lwt_list.iter_s (fun { Repo.user; repo } ->
Lwt_list.iter_p (fun { Repo.user; repo } ->
safe_remove tr Datakit_path.(empty / user / repo)
) (Repo.Set.elements all_repos)
>>= fun () ->
Lwt_list.iter_s (Conv.update_event tr) events >>= fun () ->
Lwt_list.iter_p (Conv.update_event tr) events >>= fun () ->
DK.Transaction.commit tr ~message:"User updates"
) >>= function
| Error e -> Lwt.fail (DK_error e)
Expand Down
2 changes: 1 addition & 1 deletion tests/test_utils.ml
Expand Up @@ -179,7 +179,7 @@ let run fn =
Server.accept ~root ~msg:"test" for_server >>*= Lwt.return
in
Lwt.finalize
(fun () -> Client.connect for_client () >>*= fn repo)
(fun () -> Client.connect for_client ~max_fids:Int32.max_int () >>*= fn repo)
(fun () -> Lwt.cancel server_thread; Lwt.return ())
end

Expand Down

0 comments on commit 21c3b90

Please sign in to comment.