Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix async scheduler starvation #9416

Merged
merged 15 commits into from
Oct 7, 2021
118 changes: 58 additions & 60 deletions src/app/cli/src/init/coda_run.ml
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ let setup_local_server ?(client_trustlist = []) ?rest_server_port
server_description port )
in
Option.iter rest_server_port ~f:(fun rest_server_port ->
trace_task "REST server" (fun () ->
trace_task "full GraphQL server" (fun () ->
create_graphql_server
~bind_to_address:
Tcp.Bind_to_address.(
Expand All @@ -502,7 +502,7 @@ let setup_local_server ?(client_trustlist = []) ?rest_server_port
rest_server_port ) ) ;
(*Second graphql server with limited queries exopsed*)
Option.iter limited_graphql_port ~f:(fun rest_server_port ->
trace_task "Second REST server (with limited queries)" (fun () ->
trace_task "limited GraphQL server" (fun () ->
create_graphql_server
~bind_to_address:
Tcp.Bind_to_address.(
Expand All @@ -514,64 +514,62 @@ let setup_local_server ?(client_trustlist = []) ?rest_server_port
Tcp.Where_to_listen.bind_to All_addresses
(On_port (Mina_lib.client_port coda))
in
don't_wait_for
(Deferred.ignore
(trace "client RPC handling" (fun () ->
Tcp.Server.create
~on_handler_error:
(`Call
(fun _net exn ->
[%log error]
"Exception while handling TCP server request: $error"
~metadata:
[ ("error", `String (Exn.to_string_mach exn))
; ("context", `String "rpc_tcp_server") ] ))
where_to_listen
(fun address reader writer ->
let address = Socket.Address.Inet.addr address in
if
not
(Set.exists !client_trustlist ~f:(fun cidr ->
Unix.Cidr.does_match cidr address ))
then (
[%log error]
!"Rejecting client connection from $address, it is not \
present in the trustlist."
~metadata:
[("$address", `String (Unix.Inet_addr.to_string address))] ;
Deferred.unit )
else
Rpc.Connection.server_with_close
~handshake_timeout:
(Time.Span.of_sec
Mina_compile_config.rpc_handshake_timeout_sec)
~heartbeat_config:
(Rpc.Connection.Heartbeat_config.create
~timeout:
(Time_ns.Span.of_sec
Mina_compile_config.rpc_heartbeat_timeout_sec)
~send_every:
(Time_ns.Span.of_sec
Mina_compile_config.rpc_heartbeat_send_every_sec))
reader writer
~implementations:
(Rpc.Implementations.create_exn
~implementations:(client_impls @ snark_worker_impls)
~on_unknown_rpc:`Raise)
~connection_state:(fun _ -> ())
~on_handshake_error:
(`Call
(fun exn ->
[%log warn]
"Handshake error while handling RPC server \
request from $address"
~metadata:
[ ("error", `String (Exn.to_string_mach exn))
; ("context", `String "rpc_server")
; ( "address"
, `String (Unix.Inet_addr.to_string address) )
] ;
Deferred.unit )) ) )))
trace_task "client RPC server" (fun () ->
Deferred.ignore
(Tcp.Server.create
~on_handler_error:
(`Call
(fun _net exn ->
[%log error]
"Exception while handling TCP server request: $error"
~metadata:
[ ("error", `String (Exn.to_string_mach exn))
; ("context", `String "rpc_tcp_server") ] ))
where_to_listen
(fun address reader writer ->
let address = Socket.Address.Inet.addr address in
if
not
(Set.exists !client_trustlist ~f:(fun cidr ->
Unix.Cidr.does_match cidr address ))
then (
[%log error]
!"Rejecting client connection from $address, it is not \
present in the trustlist."
~metadata:
[("$address", `String (Unix.Inet_addr.to_string address))] ;
Deferred.unit )
else
Rpc.Connection.server_with_close
~handshake_timeout:
(Time.Span.of_sec
Mina_compile_config.rpc_handshake_timeout_sec)
~heartbeat_config:
(Rpc.Connection.Heartbeat_config.create
~timeout:
(Time_ns.Span.of_sec
Mina_compile_config.rpc_heartbeat_timeout_sec)
~send_every:
(Time_ns.Span.of_sec
Mina_compile_config.rpc_heartbeat_send_every_sec))
reader writer
~implementations:
(Rpc.Implementations.create_exn
~implementations:(client_impls @ snark_worker_impls)
~on_unknown_rpc:`Raise)
~connection_state:(fun _ -> ())
~on_handshake_error:
(`Call
(fun exn ->
[%log warn]
"Handshake error while handling RPC server request \
from $address"
~metadata:
[ ("error", `String (Exn.to_string_mach exn))
; ("context", `String "rpc_server")
; ( "address"
, `String (Unix.Inet_addr.to_string address) ) ] ;
Deferred.unit )) )) )

let coda_crash_message ~log_issue ~action ~error =
let followup =
Expand Down
56 changes: 56 additions & 0 deletions src/external/ocaml-rocksdb/rocks.ml
Original file line number Diff line number Diff line change
Expand Up @@ -690,6 +690,62 @@ and RocksDb : (Rocks_intf.ROCKS with type batch := WriteBatch.t) = struct
| None ->
ReadOptions.with_t inner

let multi_get_raw =
foreign "rocksdb_multi_get"
( t
@-> ReadOptions.t
@-> Views.int_to_size_t
@-> ptr (ptr char)
@-> ptr size_t
@-> ptr (ptr char)
@-> ptr size_t
@-> ptr (ptr char)
@-> returning void )

let multi_get ?opts t keys =
let inner opts =
let cast_array_ptr typ arr = arr |> bigarray_start array1 |> to_voidp |> from_voidp typ in
let count = List.length keys in
(* could optimize this into a single allocation *)
let inputs = Bigarray.(Array1.create Nativeint C_layout) count in
let input_lengths = Bigarray.(Array1.create Nativeint C_layout) count in
let outputs = Bigarray.(Array1.create Nativeint C_layout) count in
let output_lengths = Bigarray.(Array1.create Nativeint C_layout) count in
let errors = Bigarray.(Array1.create Nativeint C_layout) count in
keys |> List.iteri (fun i key ->
mrmr1993 marked this conversation as resolved.
Show resolved Hide resolved
Bigarray.Array1.unsafe_set inputs i (bigarray_start array1 key |> to_voidp |> raw_address_of_ptr) ;
Bigarray.Array1.unsafe_set input_lengths i (Bigarray.Array1.dim key |> Nativeint.of_int) ) ;
multi_get_raw t opts count
(cast_array_ptr (ptr char) inputs)
mrmr1993 marked this conversation as resolved.
Show resolved Hide resolved
(cast_array_ptr size_t input_lengths)
(cast_array_ptr (ptr char) outputs)
(cast_array_ptr size_t output_lengths)
(cast_array_ptr (ptr char) errors) ;
keep_alive (inputs, input_lengths, keys) ;
let results = ref [] in
for i = count - 1 downto 0 do
let res =
let error_ptr = Bigarray.Array1.unsafe_get errors i |> ptr_of_raw_address in
if is_null error_ptr then (
let output_length = Nativeint.to_int (Bigarray.Array1.unsafe_get output_lengths i) in
let output_ptr = Bigarray.Array1.unsafe_get outputs i |> ptr_of_raw_address |> from_voidp char in
let output = bigarray_of_ptr array1 output_length Bigarray.char output_ptr in
Gc.finalise_last (fun () -> free (to_voidp output_ptr)) output ;
Some output )
else (
free error_ptr ;
None )
in
results := res :: !results
done ;
!results
in
match opts with
| Some opts ->
inner opts
| None ->
ReadOptions.with_t inner

let flush_raw =
foreign "rocksdb_flush" (t @-> FlushOptions.t @-> returning_error void)

Expand Down
7 changes: 7 additions & 0 deletions src/external/ocaml-rocksdb/rocks_common.ml
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
open Ctypes
open Foreign

(* Trick taken from Janestreet's core_kernel library.
* This function guarantees `o` will not be garbage collected.
* It is safer than using `ignore` to provide this guarantee, as
* the compiler won't optimize int_of_string away and will not perform
* constant folding on invocations of `keep_alive` *)
let rec keep_alive o = if Sys.opaque_identity (int_of_string "0") <> 0 then keep_alive (Sys.opaque_identity o)

module Views = struct
open Unsigned

Expand Down
3 changes: 3 additions & 0 deletions src/external/ocaml-rocksdb/rocks_intf.ml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ module type ROCKS = sig
val get : ?pos:int -> ?len:int -> ?opts:ReadOptions.t -> t -> bigarray -> bigarray option
val get_string : ?pos:int -> ?len:int -> ?opts:ReadOptions.t -> t -> string -> string option

(* there are more efficient encodings of this... *)
val multi_get : ?opts:ReadOptions.t -> t -> bigarray list -> bigarray option list

val put : ?key_pos:int -> ?key_len:int -> ?value_pos:int -> ?value_len:int -> ?opts:WriteOptions.t -> t -> bigarray -> bigarray -> unit
val put_string : ?key_pos:int -> ?key_len:int -> ?value_pos:int -> ?value_len:int -> ?opts:WriteOptions.t -> t -> string -> string -> unit

Expand Down
23 changes: 21 additions & 2 deletions src/external/ocaml-rocksdb/rocks_options.ml
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,22 @@ module BlockBasedTableOptions =
end

module Options = struct
module SliceTransform = struct
type t = Rocks_common.t

let t = Rocks_common.t

module Noop = struct
include CreateConstructors(struct
let super_name = "slicetransform"
let name = "noop"
let constructor = "rocksdb_" ^ super_name ^ "_create_" ^ name
let destructor = "rocksdb_" ^ super_name ^ "_destroy"
let setter_prefix = "rocksdb_" ^ super_name ^ "_" ^ name ^ "_"
end)
end
end

(* extern rocksdb_options_t* rocksdb_options_create(); *)
(* extern void rocksdb_options_destroy(rocksdb_options_t*\); *)
module C = CreateConstructors_(struct let name = "options" end)
Expand Down Expand Up @@ -210,8 +226,11 @@ module Options = struct

(* extern void rocksdb_options_set_compression_options( *)
(* rocksdb_options_t*, int, int, int); *)
(* extern void rocksdb_options_set_prefix_extractor( *)
(* rocksdb_options_t*, rocksdb_slicetransform_t*\); *)

(* extern void rocksdb_options_set_prefix_extractor(rocksdb_options_t* opt, rocksdb_slicetransform_t* trans); *)
let set_prefix_extractor =
create_setter "set_prefix_extractor" SliceTransform.t

(* extern void rocksdb_options_set_num_levels(rocksdb_options_t*, int); *)
(* extern void rocksdb_options_set_level0_file_num_compaction_trigger( *)
(* rocksdb_options_t*, int); *)
Expand Down
4 changes: 4 additions & 0 deletions src/lib/key_value_database/key_value_database.ml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ module Intf = struct

val get : t -> key:key -> value option M.t

val get_batch : t -> keys:key list -> value option list M.t

val set : t -> key:key -> data:value -> unit M.t

val remove : t -> key:key -> unit M.t
Expand Down Expand Up @@ -95,6 +97,8 @@ module Make_mock

let get t ~key = Key.Table.find t key

let get_batch t ~keys = List.map keys ~f:(Key.Table.find t)

let set = Key.Table.set

let remove t ~key = Key.Table.remove t key
Expand Down
5 changes: 5 additions & 0 deletions src/lib/merkle_ledger/any_ledger.ml
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ module Make_base (Inputs : Inputs_intf) :

let get (T ((module Base), t)) = Base.get t

let get_batch (T ((module Base), t)) = Base.get_batch t

let get_uuid (T ((module Base), t)) = Base.get_uuid t

let get_directory (T ((module Base), t)) = Base.get_directory t
Expand All @@ -148,6 +150,9 @@ module Make_base (Inputs : Inputs_intf) :

let location_of_account (T ((module Base), t)) = Base.location_of_account t

let location_of_account_batch (T ((module Base), t)) =
Base.location_of_account_batch t

let fold_until (T ((module Base), t)) = Base.fold_until t

let accounts (T ((module Base), t)) = Base.accounts t
Expand Down
9 changes: 8 additions & 1 deletion src/lib/merkle_ledger/base_ledger_intf.ml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ module type S = sig
module Path : Merkle_path.S with type hash := hash

module Location : sig
type t [@@deriving sexp, compare, hash, eq]
type t [@@deriving sexp, compare, hash]

include Comparable.S with type t := t
end

include
Expand Down Expand Up @@ -95,6 +97,9 @@ module type S = sig

val location_of_account : t -> account_id -> Location.t option

val location_of_account_batch :
t -> account_id list -> (account_id * Location.t option) list

(** This may return an error if the ledger is full. *)
val get_or_create_account :
t -> account_id -> account -> ([`Added | `Existed] * Location.t) Or_error.t
Expand All @@ -112,6 +117,8 @@ module type S = sig

val get : t -> Location.t -> account option

val get_batch : t -> Location.t list -> (Location.t * account option) list

val set : t -> Location.t -> account -> unit

val set_batch : t -> (Location.t * account) list -> unit
Expand Down