Skip to content

Commit

Permalink
add logging to nanobit (#85)
Browse files Browse the repository at this point in the history
* add blockchain logging to nanobit

* switch to logger for errors
  • Loading branch information
es92 committed Feb 16, 2018
1 parent 973a473 commit 810e269
Show file tree
Hide file tree
Showing 14 changed files with 121 additions and 30 deletions.
5 changes: 3 additions & 2 deletions app/nanobit/src/blockchain_accumulator.ml
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@ module Update = struct
| New_chain of Blockchain.t
end

let accumulate ~init ~prover ~updates ~strongest_chain =
let accumulate ~init ~parent_log ~prover ~updates ~strongest_chain =
let log = Logger.child parent_log "blockchain_accumulator" in
don't_wait_for begin
let%map _last_block =
Linear_pipe.fold updates ~init ~f:(fun chain (Update.New_chain new_chain) ->
match%bind Prover.verify prover new_chain with
| Error e ->
eprintf "%s\n" (Error.to_string_hum (Error.tag e ~tag:"prover verify failed"));
Logger.error log "%s" (Error.to_string_hum (Error.tag e ~tag:"prover verify failed"));
return chain
| Ok false -> return chain
| Ok true ->
Expand Down
1 change: 1 addition & 0 deletions app/nanobit/src/blockchain_accumulator.mli
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ end

val accumulate
: init:Blockchain.t
-> parent_log:Logger.t
-> prover:Prover.t
-> updates:Update.t Linear_pipe.Reader.t
-> strongest_chain:Blockchain.t Linear_pipe.Writer.t
Expand Down
2 changes: 2 additions & 0 deletions app/nanobit/src/cli.ml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ let daemon =
| None -> Find_ip.find ()
| Some ip -> return ip
in
let log = Logger.create () in
Main.main
log
prover
(conf_dir ^/ "storage")
{ Blockchain.state = Blockchain.State.zero; proof = genesis_proof }
Expand Down
30 changes: 19 additions & 11 deletions app/nanobit/src/gossip_net.ml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ module type S =

type t =
{ timeout : Time.Span.t
; log : Logger.t
; target_peer_count : int
; new_peer_reader : Peer.t Linear_pipe.Reader.t
; broadcast_writer : Message.msg Linear_pipe.Writer.t
Expand All @@ -35,6 +36,7 @@ module type S =
val create
: Peer.Event.t Linear_pipe.Reader.t
-> Params.t
-> Logger.t
-> unit Rpc.Implementation.t list
-> t

Expand Down Expand Up @@ -64,6 +66,7 @@ module Make (Message : Message_intf) = struct

type t =
{ timeout : Time.Span.t
; log : Logger.t
; target_peer_count : int
; new_peer_reader : Peer.t Linear_pipe.Reader.t
; broadcast_writer : Message.msg Linear_pipe.Writer.t
Expand Down Expand Up @@ -102,30 +105,32 @@ module Make (Message : Message_intf) = struct
| Ok Error exn -> Error exn
| Error exn -> Or_error.of_exn exn

let broadcast_selected timeout peers msg =
let broadcast_selected t peers msg =
let send peer =
try_call_rpc
peer timeout (fun conn m -> return (Message.dispatch_multi conn m)) msg
peer t.timeout (fun conn m -> return (Message.dispatch_multi conn m)) msg
in
Deferred.List.iter
~how:`Parallel
peers
~f:(fun p -> match%map (send p) with
| Ok () -> ()
| Error e -> eprintf "%s\n" (Error.to_string_hum e))
| Error e -> Logger.error t.log "%s" (Error.to_string_hum e))
;;

let broadcast_random timeout peers n msg =
let selected_peers = random_sublist (Hash_set.to_list peers) n in
broadcast_selected timeout selected_peers msg
let broadcast_random t n msg =
let selected_peers = random_sublist (Hash_set.to_list t.peers) n in
broadcast_selected t selected_peers msg
;;

let create (peer_events : Peer.Event.t Linear_pipe.Reader.t) (params : Params.t) implementations =
let create (peer_events : Peer.Event.t Linear_pipe.Reader.t) (params : Params.t) parent_log implementations =
let log = Logger.child parent_log "gossip_net" in
let new_peer_reader, new_peer_writer = Linear_pipe.create () in
let broadcast_reader, broadcast_writer = Linear_pipe.create () in
let received_reader, received_writer = Linear_pipe.create () in
let t =
{ timeout = params.timeout
; log
; target_peer_count = params.target_peer_count
; new_peer_reader
; broadcast_writer
Expand All @@ -137,7 +142,9 @@ module Make (Message : Message_intf) = struct
Linear_pipe.iter_unordered
~max_concurrency:64
broadcast_reader
~f:(fun m -> broadcast_random t.timeout t.peers t.target_peer_count m)
~f:(fun m ->
Logger.trace log "broadcasting message";
broadcast_random t t.target_peer_count m)
end;
let broadcast_received_capacity = 64 in
let implementations =
Expand Down Expand Up @@ -166,7 +173,7 @@ module Make (Message : Message_intf) = struct
end;
ignore begin
Tcp.Server.create
~on_handler_error:(`Call (fun net exn -> eprintf "%s\n" (Exn.to_string_mach exn)))
~on_handler_error:(`Call (fun net exn -> Logger.error log "%s" (Exn.to_string_mach exn)))
(Tcp.Where_to_listen.of_port (Host_and_port.port params.address))
(fun address reader writer ->
Rpc.Connection.server_with_close
Expand All @@ -175,7 +182,7 @@ module Make (Message : Message_intf) = struct
~connection_state:(fun _ -> ())
~on_handshake_error:
(`Call (fun exn ->
eprintf "%s\n" (Exn.to_string_mach exn);
Logger.error log "%s" (Exn.to_string_mach exn);
Deferred.unit)))
end;
t
Expand All @@ -192,10 +199,11 @@ module Make (Message : Message_intf) = struct
fun () ->
let selected = List.take !to_broadcast t.target_peer_count in
to_broadcast := List.drop !to_broadcast t.target_peer_count;
let%map () = broadcast_selected t.timeout selected msg in
let%map () = broadcast_selected t selected msg in
if List.length !to_broadcast = 0 then `Done else `Continue)

let query_peer t (peer : Peer.t) rpc query =
Logger.trace t.log "querying peer" ~attrs:[ ("peer", [%sexp_of: Peer.t] peer) ];
try_call_rpc peer t.timeout rpc query

let query_random_peers t n rpc query =
Expand Down
2 changes: 2 additions & 0 deletions app/nanobit/src/gossip_net.mli
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ module type S =

type t =
{ timeout : Time.Span.t
; log : Logger.t
; target_peer_count : int
; new_peer_reader : Peer.t Linear_pipe.Reader.t
; broadcast_writer : Message.msg Linear_pipe.Writer.t
Expand All @@ -35,6 +36,7 @@ module type S =
val create
: Peer.Event.t Linear_pipe.Reader.t
-> Params.t
-> Logger.t
-> unit Rpc.Implementation.t list
-> t

Expand Down
1 change: 1 addition & 0 deletions app/nanobit/src/jbuild
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
swimlib
nanobit_base
camlsnark
logger
async
async_extra
async_ssl
Expand Down
77 changes: 70 additions & 7 deletions app/nanobit/src/main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ struct
module Gossip_net = Gossip_net(Message)
module Swim = Swim

let peer_strongest_blocks gossip_net
let peer_strongest_blocks gossip_net log
: Blockchain_accumulator.Update.t Linear_pipe.Reader.t
=
let from_new_peers_reader, from_new_peers_writer = Linear_pipe.create () in
Expand All @@ -90,7 +90,7 @@ struct
(Gossip_net.query_random_peers gossip_net fetch_peer_count Rpcs.Get_strongest_block.dispatch_multi ())
~f:(fun x -> match%bind x with
| Ok b -> Pipe.write from_new_peers_writer (Blockchain_accumulator.Update.New_chain b)
| Error e -> eprintf "%s\n" (Error.to_string_hum e); return ())
| Error e -> Logger.error log "%s" (Error.to_string_hum e); return ())
in
timer ()
in
Expand Down Expand Up @@ -137,9 +137,64 @@ struct
; body_changes_writer
}

let init_pipes_with_log log : pipes =
let strongest_block_reader, strongest_block_writer = Linear_pipe.create () in
let gossip_net_strongest_block_reader,
gossip_net_strongest_block_propagator,
body_changes_strongest_block_reader,
storage_strongest_block_reader,
latest_strongest_block_reader,
log_strongest_block_reader
= Linear_pipe.fork6 strongest_block_reader in
let body_changes_reader, body_changes_writer = Linear_pipe.create () in
let body_changes_reader, log_body_changes_reader = Linear_pipe.fork2 body_changes_reader in
don't_wait_for begin
let last_time = ref None in
Linear_pipe.iter
log_strongest_block_reader
~f:(fun blockchain ->
let state = blockchain.Blockchain.state in
let number = state.Blockchain_state.number in
let time = state.Blockchain_state.previous_time in
let target = (Nanobit_base.Target.to_bigint state.Blockchain_state.target) in
let strength = Bignum.Bigint.((Nanobit_base.Target.to_bigint Nanobit_base.Target.max) / target) in
let diff =
match !last_time with
| None -> ""
| Some previous_time ->
let a = Block_time.to_time time in
let b = Block_time.to_time previous_time in
Time.Span.to_string (Time.diff a b)
in
Logger.info log ~attrs:[ ("number", [%sexp_of: Int64.t] number )
; ("strength", [%sexp_of: Bignum.Bigint.t] strength)
; ("mining time", [%sexp_of: String.t] diff) ]
"new strongest blockchain";
last_time := Some time;
Deferred.unit)
end;
don't_wait_for begin
Linear_pipe.iter
log_body_changes_reader
~f:(function
| Miner.Update.Change_body body ->
Logger.debug log "new block body %s" (Int64.to_string body);
Deferred.unit)
end;
{ strongest_block_writer
; gossip_net_strongest_block_reader
; gossip_net_strongest_block_propagator
; body_changes_strongest_block_reader
; storage_strongest_block_reader
; latest_strongest_block_reader
; body_changes_reader
; body_changes_writer
}

let init_gossip_net
~me
~pipes:{gossip_net_strongest_block_reader}
~log
~swim
~latest_strongest_block
~latest_mined_block
Expand Down Expand Up @@ -170,6 +225,7 @@ struct
| Disconnect peers -> Disconnect (remap_ports peers)
))
params
log
implementations
in
(* someday this could be much more sophisticated
Expand Down Expand Up @@ -212,10 +268,11 @@ struct
end;
gossip_net

let start_mining ~prover ~pipes ~initial_blockchain =
let start_mining ~prover ~parent_log ~pipes ~initial_blockchain =
let mined_blocks_reader =
Miner_impl.mine ~prover
~initial:initial_blockchain
~parent_log
~body:(Int64.succ initial_blockchain.state.number)
(Linear_pipe.merge_unordered
[ Linear_pipe.map pipes.body_changes_strongest_block_reader ~f:(fun b -> Miner.Update.Change_previous b)
Expand All @@ -225,6 +282,7 @@ struct
Linear_pipe.fork2 mined_blocks_reader

let main_nowait
log
prover
storage_location
genesis_blockchain
Expand All @@ -236,21 +294,22 @@ struct
=
let open Let_syntax in
let%map initial_blockchain =
match%map Storage.load storage_location with
match%map Storage.load storage_location log with
| Some x -> x
| None -> genesis_blockchain
in
(* TODO: fix mined_block vs mined_blocks *)
let (blockchain_mined_block_reader, latest_mined_blocks_reader) =
if should_mine then
start_mining ~prover ~pipes ~initial_blockchain
start_mining ~prover ~parent_log:log ~pipes ~initial_blockchain
else
(Linear_pipe.of_list [], Linear_pipe.of_list [])
in
let swim = Swim.connect ~config:(Swim_config.create ()) ~initial_peers ~me in
let gossip_net =
init_gossip_net
~me
~log
~pipes
~latest_strongest_block:(
Linear_pipe.latest_ref ~initial:genesis_blockchain pipes.latest_strongest_block_reader)
Expand All @@ -271,17 +330,19 @@ struct
(Linear_pipe.map pipes.storage_strongest_block_reader ~f:(fun b -> `Change_head b));
Blockchain_accumulator.accumulate
~prover
~parent_log:log
~init:initial_blockchain
~strongest_chain:pipes.strongest_block_writer
~updates:(
Linear_pipe.merge_unordered
[ peer_strongest_blocks gossip_net
[ peer_strongest_blocks gossip_net log
; Linear_pipe.map blockchain_mined_block_reader ~f:(fun b ->
Blockchain_accumulator.Update.New_chain b)
]);
swim

let main
log
prover
storage_location
genesis_blockchain
Expand All @@ -291,16 +352,18 @@ struct
?(remap_addr_port=(fun addr -> addr))
()
=
Logger.info log "starting nanobit";
let%bind _ =
main_nowait
log
prover
storage_location
genesis_blockchain
initial_peers
should_mine
me
remap_addr_port
(init_pipes ())
(init_pipes_with_log log)
in
Async.never ()
;;
Expand Down
2 changes: 2 additions & 0 deletions app/nanobit/src/main_rpc.ml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ let main () =
in

let run_main _ { Rpcs.Main.start_prover; prover_port; storage_location; initial_peers; should_mine; me } =
let log = Logger.create () in
let pipes, rpc_strongest_block_reader = init_pipes () in
don't_wait_for begin
Linear_pipe.iter
Expand All @@ -110,6 +111,7 @@ let main () =
let genesis_chain = { Blockchain.state = Blockchain.State.zero; proof = genesis_proof } in
let%bind () = Main.assert_chain_verifies prover genesis_chain in
Main.main_nowait
log
prover
storage_location
{ Blockchain.state = Blockchain.State.zero; proof = genesis_proof }
Expand Down
7 changes: 5 additions & 2 deletions app/nanobit/src/miner.ml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ end
module type S = sig
val mine
: prover:Prover.t
-> parent_log:Logger.t
-> initial:Blockchain.t
-> body:Block.Body.t
-> Update.t Linear_pipe.Reader.t
Expand Down Expand Up @@ -41,7 +42,7 @@ module Cpu = struct
{ block0 with header = { header0 with nonce } }
in
let hash = Block.hash block in
if Target.meets_target_unchecked target ~hash && Random.int 1000 < 10
if Target.meets_target_unchecked target ~hash
then Some (block, hash)
else go (Nonce.succ nonce) (i + 1)
in
Expand All @@ -58,10 +59,12 @@ module Cpu = struct

let mine
~(prover : Prover.t)
~(parent_log : Logger.t)
~(initial : Blockchain.t)
~body
(updates : Update.t Linear_pipe.Reader.t)
=
let log = Logger.child parent_log "miner" in
let state =
{ State.previous = initial
; body
Expand Down Expand Up @@ -89,7 +92,7 @@ module Cpu = struct
state.id <- state.id + 1;
go ()
| Error e ->
eprintf "%s\n%!" Error.(to_string_hum (tag e ~tag:"Blockchain extend error"));
Logger.error log "%s" Error.(to_string_hum (tag e ~tag:"Blockchain extend error"));
go ()
end else
go ()
Expand Down

0 comments on commit 810e269

Please sign in to comment.