Skip to content

Commit

Permalink
Merge branch 'develop' into issue1393
Browse files Browse the repository at this point in the history
  • Loading branch information
oxarbitrage authored Dec 15, 2018
2 parents 115ed51 + 1471c05 commit 0171861
Show file tree
Hide file tree
Showing 46 changed files with 733 additions and 420 deletions.
13 changes: 7 additions & 6 deletions libraries/app/api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,18 +159,18 @@ namespace graphene { namespace app {
}
}

void network_broadcast_api::broadcast_transaction(const signed_transaction& trx)
void network_broadcast_api::broadcast_transaction(const precomputable_transaction& trx)
{
trx.validate();
_app.chain_database()->precompute_parallel( trx ).wait();
_app.chain_database()->push_transaction(trx);
if( _app.p2p_node() != nullptr )
_app.p2p_node()->broadcast_transaction(trx);
}

fc::variant network_broadcast_api::broadcast_transaction_synchronous(const signed_transaction& trx)
fc::variant network_broadcast_api::broadcast_transaction_synchronous(const precomputable_transaction& trx)
{
fc::promise<fc::variant>::ptr prom( new fc::promise<fc::variant>() );
broadcast_transaction_with_callback( [=]( const fc::variant& v ){
broadcast_transaction_with_callback( [prom]( const fc::variant& v ){
prom->set_value(v);
}, trx );

Expand All @@ -179,14 +179,15 @@ namespace graphene { namespace app {

void network_broadcast_api::broadcast_block( const signed_block& b )
{
_app.chain_database()->precompute_parallel( b ).wait();
_app.chain_database()->push_block(b);
if( _app.p2p_node() != nullptr )
_app.p2p_node()->broadcast( net::block_message( b ));
}

void network_broadcast_api::broadcast_transaction_with_callback(confirmation_callback cb, const signed_transaction& trx)
void network_broadcast_api::broadcast_transaction_with_callback(confirmation_callback cb, const precomputable_transaction& trx)
{
trx.validate();
_app.chain_database()->precompute_parallel( trx ).wait();
_callbacks[trx.id()] = cb;
_app.chain_database()->push_transaction(trx);
if( _app.p2p_node() != nullptr )
Expand Down
51 changes: 40 additions & 11 deletions libraries/app/application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <graphene/app/application.hpp>
#include <graphene/app/plugin.hpp>

#include <graphene/chain/db_with.hpp>
#include <graphene/chain/genesis_state.hpp>
#include <graphene/chain/protocol/fee_schedule.hpp>
#include <graphene/chain/protocol/types.hpp>
Expand Down Expand Up @@ -394,12 +395,34 @@ void application_impl::startup()
_chain_db->enable_standby_votes_tracking( _options->at("enable-standby-votes-tracking").as<bool>() );
}

if( _options->count("replay-blockchain") )
if( _options->count("replay-blockchain") || _options->count("revalidate-blockchain") )
_chain_db->wipe( _data_dir / "blockchain", false );

try
{
_chain_db->open( _data_dir / "blockchain", initial_state, GRAPHENE_CURRENT_DB_VERSION );
// these flags are used in open() only, i. e. during replay
uint32_t skip;
if( _options->count("revalidate-blockchain") ) // see also handle_block()
{
if( !loaded_checkpoints.empty() )
wlog( "Warning - revalidate will not validate before last checkpoint" );
if( _options->count("force-validate") )
skip = graphene::chain::database::skip_nothing;
else
skip = graphene::chain::database::skip_transaction_signatures;
}
else // no revalidate, skip most checks
skip = graphene::chain::database::skip_witness_signature |
graphene::chain::database::skip_block_size_check |
graphene::chain::database::skip_merkle_check |
graphene::chain::database::skip_transaction_signatures |
graphene::chain::database::skip_transaction_dupe_check |
graphene::chain::database::skip_tapos_check |
graphene::chain::database::skip_witness_schedule_check;

graphene::chain::detail::with_skip_flags( *_chain_db, skip, [this,&initial_state] () {
_chain_db->open( _data_dir / "blockchain", initial_state, GRAPHENE_CURRENT_DB_VERSION );
});
}
catch( const fc::exception& e )
{
Expand Down Expand Up @@ -517,13 +540,17 @@ bool application_impl::handle_block(const graphene::net::block_message& blk_msg,
FC_ASSERT( (latency.count()/1000) > -5000, "Rejecting block with timestamp in the future" );

try {
// TODO: in the case where this block is valid but on a fork that's too old for us to switch to,
// you can help the network code out by throwing a block_older_than_undo_history exception.
// when the net code sees that, it will stop trying to push blocks from that chain, but
// leave that peer connected so that they can get sync blocks from us
bool result = _chain_db->push_block( blk_msg.block,
(_is_block_producer | _force_validate) ?
database::skip_nothing : database::skip_transaction_signatures );
const uint32_t skip = (_is_block_producer | _force_validate) ?
database::skip_nothing : database::skip_transaction_signatures;
bool result = valve.do_serial( [this,&blk_msg,skip] () {
_chain_db->precompute_parallel( blk_msg.block, skip ).wait();
}, [this,&blk_msg,skip] () {
// TODO: in the case where this block is valid but on a fork that's too old for us to switch to,
// you can help the network code out by throwing a block_older_than_undo_history exception.
// when the net code sees that, it will stop trying to push blocks from that chain, but
// leave that peer connected so that they can get sync blocks from us
return _chain_db->push_block( blk_msg.block, skip );
});

// the block was accepted, so we now know all of the transactions contained in the block
if (!sync_mode)
Expand Down Expand Up @@ -573,6 +600,7 @@ void application_impl::handle_transaction(const graphene::net::trx_message& tran
trx_count = 0;
}

_chain_db->precompute_parallel( transaction_message.trx ).wait();
_chain_db->push_transaction( transaction_message.trx );
} FC_CAPTURE_AND_RETHROW( (transaction_message) ) }

Expand Down Expand Up @@ -961,9 +989,10 @@ void application::set_program_options(boost::program_options::options_descriptio
"Path to create a Genesis State at. If a well-formed JSON file exists at the path, it will be parsed and any "
"missing fields in a Genesis State will be added, and any unknown fields will be removed. If no file or an "
"invalid file is found, it will be replaced with an example Genesis State.")
("replay-blockchain", "Rebuild object graph by replaying all blocks")
("replay-blockchain", "Rebuild object graph by replaying all blocks without validation")
("revalidate-blockchain", "Rebuild object graph by replaying all blocks with full validation")
("resync-blockchain", "Delete all blocks and re-sync with network from scratch")
("force-validate", "Force validation of all transactions")
("force-validate", "Force validation of all transactions during normal operation")
("genesis-timestamp", bpo::value<uint32_t>(),
"Replace timestamp from genesis.json with current time plus this many seconds (experts only!)")
;
Expand Down
4 changes: 4 additions & 0 deletions libraries/app/application_impl.hxx
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#pragma once

#include <fc/network/http/websocket.hpp>
#include <fc/thread/parallel.hpp>

#include <graphene/app/application.hpp>
#include <graphene/app/api_access.hpp>
#include <graphene/chain/genesis_state.hpp>
Expand Down Expand Up @@ -194,6 +196,8 @@ class application_impl : public net::node_delegate
std::map<string, std::shared_ptr<abstract_plugin>> _available_plugins;

bool _is_finished_syncing = false;
private:
fc::serial_valve valve;
};

}}} // namespace graphene namespace app namespace detail
6 changes: 3 additions & 3 deletions libraries/app/database_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,7 @@ vector<vector<account_id_type>> database_api_impl::get_key_references( vector<pu
auto itr = refs.account_to_address_memberships.find(a);
if( itr != refs.account_to_address_memberships.end() )
{
result.reserve( itr->second.size() );
result.reserve( result.size() + itr->second.size() );
for( auto item : itr->second )
{
result.push_back(item);
Expand All @@ -587,7 +587,7 @@ vector<vector<account_id_type>> database_api_impl::get_key_references( vector<pu

if( itr != refs.account_to_key_memberships.end() )
{
result.reserve( itr->second.size() );
result.reserve( result.size() + itr->second.size() );
for( auto item : itr->second ) result.push_back(item);
}
final_result.emplace_back( std::move(result) );
Expand Down Expand Up @@ -2448,7 +2448,7 @@ void database_api_impl::on_applied_block()
}
if( market.valid() && _market_subscriptions.count(*market) )
// FIXME this may cause fill_order_operation be pushed before order creation
subscribed_markets_ops[*market].emplace_back( std::move( std::make_pair( op.op, op.result ) ) );
subscribed_markets_ops[*market].emplace_back(std::make_pair(op.op, op.result));
}
/// we need to ensure the database_api is not deleted for the life of the async operation
auto capture_this = shared_from_this();
Expand Down
6 changes: 3 additions & 3 deletions libraries/app/include/graphene/app/api.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -266,19 +266,19 @@ namespace graphene { namespace app {
* The transaction will be checked for validity in the local database prior to broadcasting. If it fails to
* apply locally, an error will be thrown and the transaction will not be broadcast.
*/
void broadcast_transaction(const signed_transaction& trx);
void broadcast_transaction(const precomputable_transaction& trx);

/** this version of broadcast transaction registers a callback method that will be called when the transaction is
* included into a block. The callback method includes the transaction id, block number, and transaction number in the
* block.
*/
void broadcast_transaction_with_callback( confirmation_callback cb, const signed_transaction& trx);
void broadcast_transaction_with_callback( confirmation_callback cb, const precomputable_transaction& trx);

/** this version of broadcast transaction registers a callback method that will be called when the transaction is
* included into a block. The callback method includes the transaction id, block number, and transaction number in the
* block.
*/
fc::variant broadcast_transaction_synchronous(const signed_transaction& trx);
fc::variant broadcast_transaction_synchronous(const precomputable_transaction& trx);

/**
* @brief Broadcast a signed block to the network
Expand Down
6 changes: 3 additions & 3 deletions libraries/chain/account_object.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,9 @@ set<account_id_type> account_member_index::get_account_members(const account_obj
result.insert(auth.first);
return result;
}
set<public_key_type, account_member_index::key_compare> account_member_index::get_key_members(const account_object& a)const
set<public_key_type, pubkey_comparator> account_member_index::get_key_members(const account_object& a)const
{
set<public_key_type, key_compare> result;
set<public_key_type, pubkey_comparator> result;
for( auto auth : a.owner.key_auths )
result.insert(auth.first);
for( auto auth : a.active.key_auths )
Expand Down Expand Up @@ -215,7 +215,7 @@ void account_member_index::object_modified(const object& after)


{
set<public_key_type, key_compare> after_key_members = get_key_members(a);
set<public_key_type, pubkey_comparator> after_key_members = get_key_members(a);

vector<public_key_type> removed; removed.reserve(before_key_members.size());
std::set_difference(before_key_members.begin(), before_key_members.end(),
Expand Down
99 changes: 79 additions & 20 deletions libraries/chain/db_block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include <graphene/chain/exceptions.hpp>
#include <graphene/chain/evaluator.hpp>

#include <fc/thread/parallel.hpp>
#include <fc/smart_ref_impl.hpp>

namespace graphene { namespace chain {
Expand Down Expand Up @@ -227,7 +228,7 @@ bool database::_push_block(const signed_block& new_block)
* queues full as well, it will be kept in the queue to be propagated later when a new block flushes out the pending
* queues.
*/
processed_transaction database::push_transaction( const signed_transaction& trx, uint32_t skip )
processed_transaction database::push_transaction( const precomputable_transaction& trx, uint32_t skip )
{ try {
processed_transaction result;
detail::with_skip_flags( *this, skip, [&]()
Expand All @@ -237,7 +238,7 @@ processed_transaction database::push_transaction( const signed_transaction& trx,
return result;
} FC_CAPTURE_AND_RETHROW( (trx) ) }

processed_transaction database::_push_transaction( const signed_transaction& trx )
processed_transaction database::_push_transaction( const precomputable_transaction& trx )
{
// If this is the first transaction pushed after applying a block, start a new undo session.
// This allows us to quickly rewind to the clean state of the head block, in case a new block arrives.
Expand Down Expand Up @@ -465,15 +466,17 @@ signed_block database::_generate_block(
void database::pop_block()
{ try {
_pending_tx_session.reset();
auto head_id = head_block_id();
optional<signed_block> head_block = fetch_block_by_id( head_id );
GRAPHENE_ASSERT( head_block.valid(), pop_empty_chain, "there are no blocks to pop" );

_fork_db.pop_block();
auto fork_db_head = _fork_db.head();
FC_ASSERT( fork_db_head, "Trying to pop() from empty fork database!?" );
if( fork_db_head->id == head_block_id() )
_fork_db.pop_block();
else
{
fork_db_head = _fork_db.fetch_block( head_block_id() );
FC_ASSERT( fork_db_head, "Trying to pop() block that's not in fork database!?" );
}
pop_undo();

_popped_tx.insert( _popped_tx.begin(), head_block->transactions.begin(), head_block->transactions.end() );

_popped_tx.insert( _popped_tx.begin(), fork_db_head->data.transactions.begin(), fork_db_head->data.transactions.end() );
} FC_CAPTURE_AND_RETHROW() }

void database::clear_pending()
Expand Down Expand Up @@ -621,22 +624,17 @@ processed_transaction database::_apply_transaction(const signed_transaction& trx
{ try {
uint32_t skip = get_node_properties().skip_flags;

if( true || !(skip&skip_validate) ) /* issue #505 explains why this skip_flag is disabled */
trx.validate();
trx.validate();

auto& trx_idx = get_mutable_index_type<transaction_index>();
const chain_id_type& chain_id = get_chain_id();
transaction_id_type trx_id;
if( !(skip & skip_transaction_dupe_check) )
{
trx_id = trx.id();
FC_ASSERT( trx_idx.indices().get<by_trx_id>().find(trx_id) == trx_idx.indices().get<by_trx_id>().end() );
}
FC_ASSERT( trx_idx.indices().get<by_trx_id>().find(trx.id()) == trx_idx.indices().get<by_trx_id>().end() );
transaction_evaluation_state eval_state(this);
const chain_parameters& chain_parameters = get_global_properties().parameters;
eval_state._trx = &trx;

if( !(skip & (skip_transaction_signatures | skip_authority_check) ) )
if( !(skip & skip_transaction_signatures) )
{
auto get_active = [&]( account_id_type id ) { return &id(*this).active; };
auto get_owner = [&]( account_id_type id ) { return &id(*this).owner; };
Expand Down Expand Up @@ -665,8 +663,8 @@ processed_transaction database::_apply_transaction(const signed_transaction& trx
//Insert transaction into unique transactions database.
if( !(skip & skip_transaction_dupe_check) )
{
create<transaction_object>([&trx_id,&trx](transaction_object& transaction) {
transaction.trx_id = trx_id;
create<transaction_object>([&trx](transaction_object& transaction) {
transaction.trx_id = trx.id();
transaction.trx = trx;
});
}
Expand Down Expand Up @@ -750,4 +748,65 @@ bool database::before_last_checkpoint()const
return (_checkpoints.size() > 0) && (_checkpoints.rbegin()->first >= head_block_num());
}


static const uint32_t skip_expensive = database::skip_transaction_signatures | database::skip_witness_signature
| database::skip_merkle_check | database::skip_transaction_dupe_check;

template<typename Trx>
void database::_precompute_parallel( const Trx* trx, const size_t count, const uint32_t skip )const
{
for( size_t i = 0; i < count; ++i, ++trx )
{
trx->validate(); // TODO - parallelize wrt confidential operations
if( !(skip&skip_transaction_dupe_check) )
trx->id();
if( !(skip&skip_transaction_signatures) )
trx->get_signature_keys( get_chain_id() );
}
}

fc::future<void> database::precompute_parallel( const signed_block& block, const uint32_t skip )const
{ try {
std::vector<fc::future<void>> workers;
if( !block.transactions.empty() )
{
if( (skip & skip_expensive) == skip_expensive )
_precompute_parallel( &block.transactions[0], block.transactions.size(), skip );
else
{
uint32_t chunks = fc::asio::default_io_service_scope::get_num_threads();
uint32_t chunk_size = ( block.transactions.size() + chunks - 1 ) / chunks;
workers.reserve( chunks + 1 );
for( size_t base = 0; base < block.transactions.size(); base += chunk_size )
workers.push_back( fc::do_parallel( [this,&block,base,chunk_size,skip] () {
_precompute_parallel( &block.transactions[base],
base + chunk_size < block.transactions.size() ? chunk_size : block.transactions.size() - base,
skip );
}) );
}
}

if( !(skip&skip_witness_signature) )
workers.push_back( fc::do_parallel( [&block] () { block.signee(); } ) );
if( !(skip&skip_merkle_check) )
block.calculate_merkle_root();
block.id();

if( workers.empty() )
return fc::future< void >( fc::promise< void >::ptr( new fc::promise< void >( true ) ) );

auto first = workers.begin();
auto worker = first;
while( ++worker != workers.end() )
worker->wait();
return *first;
} FC_LOG_AND_RETHROW() }

fc::future<void> database::precompute_parallel( const precomputable_transaction& trx )const
{
return fc::do_parallel([this,&trx] () {
_precompute_parallel( &trx, 1, skip_nothing );
});
}

} }
Loading

0 comments on commit 0171861

Please sign in to comment.