Skip to content

Commit

Permalink
Merge pull request #9416 from MinaProtocol/fix/async-scheduler-starva…
Browse files Browse the repository at this point in the history
…tion

Fix async scheduler starvation
  • Loading branch information
lk86 committed Oct 7, 2021
2 parents 15a9c91 + 7b5bc5f commit 31e23f7
Show file tree
Hide file tree
Showing 24 changed files with 604 additions and 334 deletions.
118 changes: 58 additions & 60 deletions src/app/cli/src/init/coda_run.ml
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ let setup_local_server ?(client_trustlist = []) ?rest_server_port
server_description port )
in
Option.iter rest_server_port ~f:(fun rest_server_port ->
trace_task "REST server" (fun () ->
trace_task "full GraphQL server" (fun () ->
create_graphql_server
~bind_to_address:
Tcp.Bind_to_address.(
Expand All @@ -502,7 +502,7 @@ let setup_local_server ?(client_trustlist = []) ?rest_server_port
rest_server_port ) ) ;
(*Second graphql server with limited queries exopsed*)
Option.iter limited_graphql_port ~f:(fun rest_server_port ->
trace_task "Second REST server (with limited queries)" (fun () ->
trace_task "limited GraphQL server" (fun () ->
create_graphql_server
~bind_to_address:
Tcp.Bind_to_address.(
Expand All @@ -514,64 +514,62 @@ let setup_local_server ?(client_trustlist = []) ?rest_server_port
Tcp.Where_to_listen.bind_to All_addresses
(On_port (Mina_lib.client_port coda))
in
don't_wait_for
(Deferred.ignore
(trace "client RPC handling" (fun () ->
Tcp.Server.create
~on_handler_error:
(`Call
(fun _net exn ->
[%log error]
"Exception while handling TCP server request: $error"
~metadata:
[ ("error", `String (Exn.to_string_mach exn))
; ("context", `String "rpc_tcp_server") ] ))
where_to_listen
(fun address reader writer ->
let address = Socket.Address.Inet.addr address in
if
not
(Set.exists !client_trustlist ~f:(fun cidr ->
Unix.Cidr.does_match cidr address ))
then (
[%log error]
!"Rejecting client connection from $address, it is not \
present in the trustlist."
~metadata:
[("$address", `String (Unix.Inet_addr.to_string address))] ;
Deferred.unit )
else
Rpc.Connection.server_with_close
~handshake_timeout:
(Time.Span.of_sec
Mina_compile_config.rpc_handshake_timeout_sec)
~heartbeat_config:
(Rpc.Connection.Heartbeat_config.create
~timeout:
(Time_ns.Span.of_sec
Mina_compile_config.rpc_heartbeat_timeout_sec)
~send_every:
(Time_ns.Span.of_sec
Mina_compile_config.rpc_heartbeat_send_every_sec))
reader writer
~implementations:
(Rpc.Implementations.create_exn
~implementations:(client_impls @ snark_worker_impls)
~on_unknown_rpc:`Raise)
~connection_state:(fun _ -> ())
~on_handshake_error:
(`Call
(fun exn ->
[%log warn]
"Handshake error while handling RPC server \
request from $address"
~metadata:
[ ("error", `String (Exn.to_string_mach exn))
; ("context", `String "rpc_server")
; ( "address"
, `String (Unix.Inet_addr.to_string address) )
] ;
Deferred.unit )) ) )))
trace_task "client RPC server" (fun () ->
Deferred.ignore
(Tcp.Server.create
~on_handler_error:
(`Call
(fun _net exn ->
[%log error]
"Exception while handling TCP server request: $error"
~metadata:
[ ("error", `String (Exn.to_string_mach exn))
; ("context", `String "rpc_tcp_server") ] ))
where_to_listen
(fun address reader writer ->
let address = Socket.Address.Inet.addr address in
if
not
(Set.exists !client_trustlist ~f:(fun cidr ->
Unix.Cidr.does_match cidr address ))
then (
[%log error]
!"Rejecting client connection from $address, it is not \
present in the trustlist."
~metadata:
[("$address", `String (Unix.Inet_addr.to_string address))] ;
Deferred.unit )
else
Rpc.Connection.server_with_close
~handshake_timeout:
(Time.Span.of_sec
Mina_compile_config.rpc_handshake_timeout_sec)
~heartbeat_config:
(Rpc.Connection.Heartbeat_config.create
~timeout:
(Time_ns.Span.of_sec
Mina_compile_config.rpc_heartbeat_timeout_sec)
~send_every:
(Time_ns.Span.of_sec
Mina_compile_config.rpc_heartbeat_send_every_sec))
reader writer
~implementations:
(Rpc.Implementations.create_exn
~implementations:(client_impls @ snark_worker_impls)
~on_unknown_rpc:`Raise)
~connection_state:(fun _ -> ())
~on_handshake_error:
(`Call
(fun exn ->
[%log warn]
"Handshake error while handling RPC server request \
from $address"
~metadata:
[ ("error", `String (Exn.to_string_mach exn))
; ("context", `String "rpc_server")
; ( "address"
, `String (Unix.Inet_addr.to_string address) ) ] ;
Deferred.unit )) )) )

let coda_crash_message ~log_issue ~action ~error =
let followup =
Expand Down
13 changes: 9 additions & 4 deletions src/app/trace-tool/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ extern crate memmap;

use std::collections::HashMap;
use std::io::prelude::*;
use std::str;

use clap::{App, Arg};
use nom::{le_u64, le_u8};
Expand Down Expand Up @@ -92,7 +93,7 @@ fn complete_event(
Some(tid) => match tids.get(&tid) {
Some(tname) => println!(
r#"{{"name":"{}","pid":{},"ph":"X","ts":{},"dur":{},"tid":{}}},"#,
tname,
escape(tname),
pid,
prev_ts / 1000.0,
(cur_ts - prev_ts) / 1000.0,
Expand All @@ -111,6 +112,10 @@ fn complete_event(
}
}

fn escape(s: &str) -> String {
str::replace(&s, "\"", "\\\"")
}

fn main() {
let matches = App::new("trace-tool")
.arg(
Expand Down Expand Up @@ -166,7 +171,7 @@ fn main() {
let real = recurring_map.entry(s.clone()).or_insert(t);
tidmap.insert(t, *real);
}
println!(r#"{{"name":"thread_name","ph":"M","pid":{},"tid":{},"args":{{"name":"{}"}}}},"#, cur_pid, t.0, s);
println!(r#"{{"name":"thread_name","ph":"M","pid":{},"tid":{},"args":{{"name":"{}"}}}},"#, cur_pid, t.0, escape(&s));
seen_tids.insert(t, s);
(prev_ts, prev_task)
}
Expand Down Expand Up @@ -196,13 +201,13 @@ fn main() {
(prev_ts, prev_task)
}
Event(s) => {
println!(r#"{{"name":"{}","ph":"i","ts":{},"pid":{},"tid":{},"s":"t"}},"#, s, cur_ts/1000.0, cur_pid, prev_task.unwrap_or(Tid(0)).0);
println!(r#"{{"name":"{}","ph":"i","ts":{},"pid":{},"tid":{},"s":"t"}},"#, escape(&s), cur_ts/1000.0, cur_pid, prev_task.unwrap_or(Tid(0)).0);
(prev_ts, prev_task)
}
Start(s) => {
println!(
r#"{{"name":"{}","ph":"B","ts":{},"pid":{},"tid":{}}},"#,
s,
escape(&s),
cur_ts / 1000.0,
cur_pid,
prev_task.unwrap_or(Tid(0)).0
Expand Down
57 changes: 57 additions & 0 deletions src/external/ocaml-rocksdb/rocks.ml
Original file line number Diff line number Diff line change
Expand Up @@ -690,6 +690,63 @@ and RocksDb : (Rocks_intf.ROCKS with type batch := WriteBatch.t) = struct
| None ->
ReadOptions.with_t inner

let multi_get_raw =
foreign "rocksdb_multi_get"
( t
@-> ReadOptions.t
@-> Views.int_to_size_t
@-> ptr (ptr char)
@-> ptr size_t
@-> ptr (ptr char)
@-> ptr size_t
@-> ptr (ptr char)
@-> returning void )

let multi_get ?opts t keys =
let inner opts =
let cast_array_ptr typ arr = arr |> bigarray_start array1 |> to_voidp |> from_voidp typ in
let count = List.length keys in
(* could optimize this into a single allocation *)
let inputs = Bigarray.(Array1.create Nativeint C_layout) count in
let input_lengths = Bigarray.(Array1.create Nativeint C_layout) count in
let outputs = Bigarray.(Array1.create Nativeint C_layout) count in
let output_lengths = Bigarray.(Array1.create Nativeint C_layout) count in
let errors = Bigarray.(Array1.create Nativeint C_layout) count in
keys |> List.iteri (fun i key ->
Bigarray.Array1.unsafe_set inputs i (bigarray_start array1 key |> to_voidp |> raw_address_of_ptr) ;
Bigarray.Array1.unsafe_set input_lengths i (Bigarray.Array1.dim key |> Nativeint.of_int) ) ;
multi_get_raw t opts count
(cast_array_ptr (ptr char) inputs)
(cast_array_ptr size_t input_lengths)
(cast_array_ptr (ptr char) outputs)
(cast_array_ptr size_t output_lengths)
(cast_array_ptr (ptr char) errors) ;
keep_alive (inputs, input_lengths, keys) ;
let results = ref [] in
for i = count - 1 downto 0 do
let res =
let error_ptr = Bigarray.Array1.unsafe_get errors i |> ptr_of_raw_address in
if is_null error_ptr then (
let output_length = Nativeint.to_int (Bigarray.Array1.unsafe_get output_lengths i) in
let output_ptr = Bigarray.Array1.unsafe_get outputs i |> ptr_of_raw_address in
if output_ptr = null then None else (
let output = bigarray_of_ptr array1 output_length Bigarray.char (from_voidp char output_ptr) in
Gc.finalise_last (fun () -> free output_ptr) output ;
Some output ))
else (
free error_ptr ;
None )
in
results := res :: !results
done ;
!results
in
match opts with
| Some opts ->
inner opts
| None ->
ReadOptions.with_t inner

let flush_raw =
foreign "rocksdb_flush" (t @-> FlushOptions.t @-> returning_error void)

Expand Down
7 changes: 7 additions & 0 deletions src/external/ocaml-rocksdb/rocks_common.ml
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
open Ctypes
open Foreign

(* Trick taken from Janestreet's core_kernel library.
* This function guarantees `o` will not be garbage collected.
* It is safer than using `ignore` to provide this guarantee, as
* the compiler won't optimize int_of_string away and will not perform
* constant folding on invocations of `keep_alive` *)
let rec keep_alive o = if Sys.opaque_identity (int_of_string "0") <> 0 then keep_alive (Sys.opaque_identity o)

module Views = struct
open Unsigned

Expand Down
3 changes: 3 additions & 0 deletions src/external/ocaml-rocksdb/rocks_intf.ml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ module type ROCKS = sig
val get : ?pos:int -> ?len:int -> ?opts:ReadOptions.t -> t -> bigarray -> bigarray option
val get_string : ?pos:int -> ?len:int -> ?opts:ReadOptions.t -> t -> string -> string option

(* there are more efficient encodings of this... *)
val multi_get : ?opts:ReadOptions.t -> t -> bigarray list -> bigarray option list

val put : ?key_pos:int -> ?key_len:int -> ?value_pos:int -> ?value_len:int -> ?opts:WriteOptions.t -> t -> bigarray -> bigarray -> unit
val put_string : ?key_pos:int -> ?key_len:int -> ?value_pos:int -> ?value_len:int -> ?opts:WriteOptions.t -> t -> string -> string -> unit

Expand Down
23 changes: 21 additions & 2 deletions src/external/ocaml-rocksdb/rocks_options.ml
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,22 @@ module BlockBasedTableOptions =
end

module Options = struct
module SliceTransform = struct
type t = Rocks_common.t

let t = Rocks_common.t

module Noop = struct
include CreateConstructors(struct
let super_name = "slicetransform"
let name = "noop"
let constructor = "rocksdb_" ^ super_name ^ "_create_" ^ name
let destructor = "rocksdb_" ^ super_name ^ "_destroy"
let setter_prefix = "rocksdb_" ^ super_name ^ "_" ^ name ^ "_"
end)
end
end

(* extern rocksdb_options_t* rocksdb_options_create(); *)
(* extern void rocksdb_options_destroy(rocksdb_options_t*\); *)
module C = CreateConstructors_(struct let name = "options" end)
Expand Down Expand Up @@ -210,8 +226,11 @@ module Options = struct

(* extern void rocksdb_options_set_compression_options( *)
(* rocksdb_options_t*, int, int, int); *)
(* extern void rocksdb_options_set_prefix_extractor( *)
(* rocksdb_options_t*, rocksdb_slicetransform_t*\); *)

(* extern void rocksdb_options_set_prefix_extractor(rocksdb_options_t* opt, rocksdb_slicetransform_t* trans); *)
let set_prefix_extractor =
create_setter "set_prefix_extractor" SliceTransform.t

(* extern void rocksdb_options_set_num_levels(rocksdb_options_t*, int); *)
(* extern void rocksdb_options_set_level0_file_num_compaction_trigger( *)
(* rocksdb_options_t*, int); *)
Expand Down
4 changes: 4 additions & 0 deletions src/lib/key_value_database/key_value_database.ml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ module Intf = struct

val get : t -> key:key -> value option M.t

val get_batch : t -> keys:key list -> value option list M.t

val set : t -> key:key -> data:value -> unit M.t

val remove : t -> key:key -> unit M.t
Expand Down Expand Up @@ -95,6 +97,8 @@ module Make_mock

let get t ~key = Key.Table.find t key

let get_batch t ~keys = List.map keys ~f:(Key.Table.find t)

let set = Key.Table.set

let remove t ~key = Key.Table.remove t key
Expand Down
5 changes: 5 additions & 0 deletions src/lib/merkle_ledger/any_ledger.ml
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ module Make_base (Inputs : Inputs_intf) :

let get (T ((module Base), t)) = Base.get t

let get_batch (T ((module Base), t)) = Base.get_batch t

let get_uuid (T ((module Base), t)) = Base.get_uuid t

let get_directory (T ((module Base), t)) = Base.get_directory t
Expand All @@ -148,6 +150,9 @@ module Make_base (Inputs : Inputs_intf) :

let location_of_account (T ((module Base), t)) = Base.location_of_account t

let location_of_account_batch (T ((module Base), t)) =
Base.location_of_account_batch t

let fold_until (T ((module Base), t)) = Base.fold_until t

let accounts (T ((module Base), t)) = Base.accounts t
Expand Down
9 changes: 8 additions & 1 deletion src/lib/merkle_ledger/base_ledger_intf.ml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ module type S = sig
module Path : Merkle_path.S with type hash := hash

module Location : sig
type t [@@deriving sexp, compare, hash, eq]
type t [@@deriving sexp, compare, hash]

include Comparable.S with type t := t
end

include
Expand Down Expand Up @@ -95,6 +97,9 @@ module type S = sig

val location_of_account : t -> account_id -> Location.t option

val location_of_account_batch :
t -> account_id list -> (account_id * Location.t option) list

(** This may return an error if the ledger is full. *)
val get_or_create_account :
t -> account_id -> account -> ([`Added | `Existed] * Location.t) Or_error.t
Expand All @@ -112,6 +117,8 @@ module type S = sig

val get : t -> Location.t -> account option

val get_batch : t -> Location.t list -> (Location.t * account option) list

val set : t -> Location.t -> account -> unit

val set_batch : t -> (Location.t * account) list -> unit
Expand Down

0 comments on commit 31e23f7

Please sign in to comment.