Skip to content

Commit

Permalink
Merge pull request #10895 from MinaProtocol/georgeee/fix-validation-t…
Browse files Browse the repository at this point in the history
…imeout-for-catchup

Pass validation_callback through catchup process
  • Loading branch information
georgeee committed Jun 7, 2022
2 parents 9d77e4a + 0056e03 commit 2b9d50f
Show file tree
Hide file tree
Showing 9 changed files with 261 additions and 180 deletions.
1 change: 1 addition & 0 deletions src/lib/ledger_catchup/dune
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,5 @@
o1trace
marlin_plonk_bindings_pasta_fp
pasta
mina_net2
))
11 changes: 7 additions & 4 deletions src/lib/ledger_catchup/ledger_catchup.mli
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@ val run :
-> frontier:Transition_frontier.t
-> catchup_job_reader:
( State_hash.t
* ( Mina_block.initial_valid_block Envelope.Incoming.t
, State_hash.t )
Cached.t
* ( ( Mina_block.initial_valid_block Envelope.Incoming.t
, State_hash.t )
Cached.t
* Mina_net2.Validation_callback.t option )
Rose_tree.t
list )
Strict_pipe.Reader.t
-> catchup_breadcrumbs_writer:
( (Transition_frontier.Breadcrumb.t, State_hash.t) Cached.t Rose_tree.t
( ( (Transition_frontier.Breadcrumb.t, State_hash.t) Cached.t
* Mina_net2.Validation_callback.t option )
Rose_tree.t
list
* [ `Ledger_catchup of unit Ivar.t | `Catchup_scheduler ]
, Strict_pipe.crash Strict_pipe.buffered
Expand Down
122 changes: 67 additions & 55 deletions src/lib/ledger_catchup/normal_catchup.ml

Large diffs are not rendered by default.

139 changes: 88 additions & 51 deletions src/lib/ledger_catchup/super_catchup.ml

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions src/lib/transition_frontier/dune
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
rose_tree
quickcheck_lib
protocol_version
mina_net2
)
(instrumentation (backend bisect_ppx))
(preprocess (pps ppx_jane ppx_coda ppx_version ppx_compare ppx_deriving_yojson)))
16 changes: 9 additions & 7 deletions src/lib/transition_frontier/full_catchup_tree.ml
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,18 @@ module Node = struct
| To_download of Downloader_job.t
| To_initial_validate of Mina_block.t Envelope.Incoming.t
| To_verify of
( Mina_block.initial_valid_block Envelope.Incoming.t
(( Mina_block.initial_valid_block Envelope.Incoming.t
, State_hash.t )
Cached.t
Cached.t * Mina_net2.Validation_callback.t option)
| Wait_for_parent of
( Mina_block.almost_valid_block Envelope.Incoming.t
(( Mina_block.almost_valid_block Envelope.Incoming.t
, State_hash.t )
Cached.t
Cached.t * Mina_net2.Validation_callback.t option)
| To_build_breadcrumb of
( [`Parent of State_hash.t]
* ( Mina_block.almost_valid_block Envelope.Incoming.t
, State_hash.t )
Cached.t )
Cached.t * Mina_net2.Validation_callback.t option)
(* TODO: Name this to Initial_root *)
| Root of Breadcrumb.t Ivar.t

Expand Down Expand Up @@ -293,11 +293,13 @@ let remove_node' t (node : Node.t) =
()
| To_initial_validate _ ->
()
| To_verify c ->
| To_verify (c, vc) ->
Option.value_map ~default:ignore ~f:Mina_net2.Validation_callback.fire_if_not_already_fired vc `Ignore;
ignore
( Cached.invalidate_with_failure c
: Mina_block.initial_valid_block Envelope.Incoming.t )
| To_build_breadcrumb (_parent, c) ->
| To_build_breadcrumb (_parent, c, vc) ->
Option.value_map ~default:ignore ~f:Mina_net2.Validation_callback.fire_if_not_already_fired vc `Ignore;
ignore
( Cached.invalidate_with_failure c
: Mina_block.almost_valid_block Envelope.Incoming.t )
Expand Down
28 changes: 13 additions & 15 deletions src/lib/transition_handler/breadcrumb_builder.ml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ open Mina_base
open Core
open Async
open Cache_lib
open Mina_block
open Network_peer

let build_subtrees_of_breadcrumbs ~logger ~precomputed_values ~verifier
Expand All @@ -26,7 +25,7 @@ let build_subtrees_of_breadcrumbs ~logger ~precomputed_values ~verifier
, `List
(List.map subtrees_of_enveloped_transitions ~f:(fun subtree ->
Rose_tree.to_yojson
(fun enveloped_transitions ->
(fun (enveloped_transitions, _vc) ->
let transition, _ =
enveloped_transitions |> Cached.peek
|> Envelope.Incoming.data
Expand All @@ -44,21 +43,19 @@ let build_subtrees_of_breadcrumbs ~logger ~precomputed_values ~verifier
in
Deferred.Or_error.List.map subtrees_of_enveloped_transitions
~f:(fun subtree_of_enveloped_transitions ->
let open Deferred.Or_error.Let_syntax in
let%bind init_breadcrumb =
let%bind.Deferred.Or_error init_breadcrumb =
breadcrumb_if_present
(Logger.extend logger
[ ("Check", `String "Before creating breadcrumb") ] )
|> Deferred.return
in
Rose_tree.Deferred.Or_error.fold_map_over_subtrees
subtree_of_enveloped_transitions ~init:(Cached.pure init_breadcrumb)
~f:(fun
cached_parent
(Rose_tree.T (cached_enveloped_transition, _) as subtree)
->
let open Deferred.Let_syntax in
let%map cached_result =
subtree_of_enveloped_transitions
~init:(Cached.pure init_breadcrumb, None)
~f:(fun (cached_parent, _parent_vc)
( Rose_tree.T ((cached_enveloped_transition, valid_cb), _) as
subtree ) ->
let%map.Deferred cached_result =
Cached.transform cached_enveloped_transition
~f:(fun enveloped_transition ->
let open Deferred.Or_error.Let_syntax in
Expand All @@ -73,7 +70,7 @@ let build_subtrees_of_breadcrumbs ~logger ~precomputed_values ~verifier
(* TODO: handle this edge case more gracefully *)
(* since we are building a disconnected subtree of breadcrumbs,
* we skip this step in validation *)
Validation.skip_frontier_dependencies_validation
Mina_block.Validation.skip_frontier_dependencies_validation
`This_block_belongs_to_a_detached_subtree
transition_with_initial_validation
in
Expand All @@ -84,7 +81,7 @@ let build_subtrees_of_breadcrumbs ~logger ~precomputed_values ~verifier
in
let actual_parent_hash =
transition_with_hash |> With_hash.data |> Mina_block.header
|> Header.protocol_state
|> Mina_block.Header.protocol_state
|> Mina_state.Protocol_state.previous_state_hash
in
let%bind () =
Expand Down Expand Up @@ -132,7 +129,7 @@ let build_subtrees_of_breadcrumbs ~logger ~precomputed_values ~verifier
in
List.fold subtree_nodes
~init:(Set.empty (module Network_peer.Peer))
~f:(fun inet_addrs node ->
~f:(fun inet_addrs (node, _vc) ->
match sender_from_tree_node node with
| Local ->
failwith
Expand Down Expand Up @@ -163,4 +160,5 @@ let build_subtrees_of_breadcrumbs ~logger ~precomputed_values ~verifier
Deferred.return (Or_error.of_exn exn) ) ) )
|> Cached.sequence_deferred
in
Cached.sequence_result cached_result ) )
Result.map ~f:(Fn.flip Tuple2.create valid_cb)
@@ Cached.sequence_result cached_result ) )
83 changes: 54 additions & 29 deletions src/lib/transition_handler/catchup_scheduler.ml
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,17 @@ open Otp_lib
open Mina_base
open Mina_block
open Network_peer
open Mina_net2

type t =
{ logger : Logger.t
; time_controller : Block_time.Controller.t
; catchup_job_writer :
( State_hash.t
* ( Mina_block.initial_valid_block Envelope.Incoming.t
, State_hash.t )
Cached.t
* ( ( Mina_block.initial_valid_block Envelope.Incoming.t
, State_hash.t )
Cached.t
* Mina_net2.Validation_callback.t option )
Rose_tree.t
list
, crash buffered
Expand All @@ -45,45 +47,50 @@ type t =
Cached.t
list
State_hash.Table.t
(* Validation callbacks for state hashes that are being processed *)
; validation_callbacks : Mina_net2.Validation_callback.t State_hash.Table.t
(** `parent_root_timeouts` stores the timeouts for catchup job. The
keys are the missing transitions, and the values are the
timeouts. *)
; parent_root_timeouts : unit Block_time.Timeout.t State_hash.Table.t
; breadcrumb_builder_supervisor :
( State_hash.t
* ( Mina_block.initial_valid_block Envelope.Incoming.t
, State_hash.t )
Cached.t
* ( ( Mina_block.initial_valid_block Envelope.Incoming.t
, State_hash.t )
Cached.t
* Mina_net2.Validation_callback.t option )
Rose_tree.t
list )
Capped_supervisor.t
}

let create ~logger ~precomputed_values ~verifier ~trust_system ~frontier
~time_controller
~(catchup_job_writer :
( State_hash.t
* ( Mina_block.initial_valid_block Envelope.Incoming.t
, State_hash.t )
Cached.t
Rose_tree.t
list
, crash buffered
, unit )
Writer.t )
~time_controller ~catchup_job_writer
~(catchup_breadcrumbs_writer :
( (Transition_frontier.Breadcrumb.t, State_hash.t) Cached.t Rose_tree.t
( ( (Transition_frontier.Breadcrumb.t, State_hash.t) Cached.t
* Validation_callback.t option )
Rose_tree.t
list
* [ `Ledger_catchup of unit Ivar.t | `Catchup_scheduler ]
, crash buffered
, unit )
Writer.t ) ~clean_up_signal =
let collected_transitions = State_hash.Table.create () in
let parent_root_timeouts = State_hash.Table.create () in
let validation_callbacks = State_hash.Table.create () in
upon (Ivar.read clean_up_signal) (fun () ->
Hashtbl.iter collected_transitions ~f:(fun cached_transitions ->
List.iter cached_transitions
~f:(Fn.compose ignore Cached.invalidate_with_failure) ) ;
Hashtbl.iter collected_transitions
~f:
(List.iter ~f:(fun b ->
let hash =
Cached.peek b |> Envelope.Incoming.data
|> Validation.block_with_hash
|> State_hash.With_state_hashes.state_hash
in
let vc = Hashtbl.find validation_callbacks hash in
Option.value_map ~default:ignore
~f:Validation_callback.fire_if_not_already_fired vc `Ignore ;
ignore @@ Cached.invalidate_with_failure b ) ) ;
Hashtbl.iter parent_root_timeouts ~f:(fun timeout ->
Block_time.Timeout.cancel time_controller timeout () ) ) ;
let breadcrumb_builder_supervisor =
Expand All @@ -107,7 +114,12 @@ let create ~logger ~precomputed_values ~verifier ~trust_system ~frontier
$error"
~metadata:[ ("error", Error_json.error_to_yojson err) ] ;
List.iter transition_branches ~f:(fun subtree ->
Rose_tree.iter subtree ~f:(fun cached_transition ->
Rose_tree.iter subtree ~f:(fun (cached_transition, vc) ->
(* TODO consider rejecting the callback in some cases,
see https://github.com/MinaProtocol/mina/issues/11087 *)
Option.value_map vc ~default:ignore
~f:Mina_net2.Validation_callback.fire_if_not_already_fired
`Ignore ;
ignore
( Cached.invalidate_with_failure cached_transition
: Mina_block.initial_valid_block Envelope.Incoming.t ) ) ) )
Expand All @@ -118,6 +130,7 @@ let create ~logger ~precomputed_values ~verifier ~trust_system ~frontier
; catchup_job_writer
; parent_root_timeouts
; breadcrumb_builder_supervisor
; validation_callbacks
}

let mem t transition =
Expand Down Expand Up @@ -159,7 +172,9 @@ let rec extract_subtree t cached_transition =
let successors =
Option.value ~default:[] (Hashtbl.find t.collected_transitions hash)
in
Rose_tree.T (cached_transition, List.map successors ~f:(extract_subtree t))
Rose_tree.T
( (cached_transition, Hashtbl.find t.validation_callbacks hash)
, List.map successors ~f:(extract_subtree t) )

let extract_forest t hash =
let successors =
Expand All @@ -179,7 +194,7 @@ let rec remove_tree t parent_hash =
let transition, _ = Envelope.Incoming.data (Cached.peek child) in
remove_tree t (State_hash.With_state_hashes.state_hash transition) )

let watch t ~timeout_duration ~cached_transition =
let watch t ~timeout_duration ~cached_transition ~valid_cb =
let transition_with_hash, _ =
Envelope.Incoming.data (Cached.peek cached_transition)
in
Expand All @@ -189,6 +204,16 @@ let watch t ~timeout_duration ~cached_transition =
|> Mina_block.header |> Header.protocol_state
|> Mina_state.Protocol_state.previous_state_hash
in
Option.value_map valid_cb ~default:() ~f:(fun data ->
match Hashtbl.add t.validation_callbacks ~key:hash ~data with
| `Ok ->
(* Clean up entry upon callback resolution *)
upon
(Deferred.ignore_m @@ Mina_net2.Validation_callback.await data)
(fun () -> Hashtbl.remove t.validation_callbacks hash)
| `Duplicate ->
[%log' warn t.logger] "Double validation callback for $state_hash"
~metadata:[ ("state_hash", Mina_base.State_hash.to_yojson hash) ] ) ;
let make_timeout duration =
Block_time.Timeout.create t.time_controller duration ~f:(fun _ ->
let forest = extract_forest t parent_hash in
Expand Down Expand Up @@ -336,7 +361,7 @@ let%test_module "Transition_handler.Catchup_scheduler tests" =
create ~frontier ~precomputed_values ~verifier ~catchup_job_writer
~catchup_breadcrumbs_writer ~clean_up_signal:(Ivar.create ())
in
watch scheduler ~timeout_duration
watch scheduler ~timeout_duration ~valid_cb:None
~cached_transition:
(Cached.pure @@ downcast_breadcrumb disjoint_breadcrumb) ;
Async.Thread_safe.block_on_async_exn (fun () ->
Expand Down Expand Up @@ -394,7 +419,7 @@ let%test_module "Transition_handler.Catchup_scheduler tests" =
create ~precomputed_values ~frontier ~verifier ~catchup_job_writer
~catchup_breadcrumbs_writer ~clean_up_signal:(Ivar.create ())
in
watch scheduler ~timeout_duration
watch scheduler ~timeout_duration ~valid_cb:None
~cached_transition:
(Cached.transform ~f:downcast_breadcrumb breadcrumb_2) ;
Async.Thread_safe.block_on_async_exn (fun () ->
Expand Down Expand Up @@ -433,7 +458,7 @@ let%test_module "Transition_handler.Catchup_scheduler tests" =
failwith "pipe closed unexpectedly"
| `Ok
(`Ok
( [ Rose_tree.T (received_breadcrumb, []) ]
( [ Rose_tree.T ((received_breadcrumb, _vc), []) ]
, `Catchup_scheduler ) ) ->
[%test_eq: State_hash.t]
(Transition_frontier.Breadcrumb.state_hash
Expand Down Expand Up @@ -473,7 +498,7 @@ let%test_module "Transition_handler.Catchup_scheduler tests" =
let[@warning "-8"] (oldest_breadcrumb :: dependent_breadcrumbs) =
List.rev branch
in
watch scheduler ~timeout_duration
watch scheduler ~timeout_duration ~valid_cb:None
~cached_transition:
(Cached.pure @@ downcast_breadcrumb oldest_breadcrumb) ;
assert (
Expand All @@ -482,7 +507,7 @@ let%test_module "Transition_handler.Catchup_scheduler tests" =
ignore
( List.fold dependent_breadcrumbs ~init:oldest_breadcrumb
~f:(fun prev_breadcrumb curr_breadcrumb ->
watch scheduler ~timeout_duration
watch scheduler ~timeout_duration ~valid_cb:None
~cached_transition:
(Cached.pure @@ downcast_breadcrumb curr_breadcrumb) ;
assert (
Expand Down
Loading

0 comments on commit 2b9d50f

Please sign in to comment.