Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
bnet progress
Browse files Browse the repository at this point in the history
  • Loading branch information
bytemaster committed May 21, 2018
1 parent 1f832dc commit 01b0ffa
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 56 deletions.
148 changes: 92 additions & 56 deletions plugins/bnet_plugin/bnet_plugin.cpp
Expand Up @@ -151,6 +151,7 @@ namespace eosio {

struct transaction_status {
bool known_by_peer = false;
bool notice_to_peer = false;
time_point received;
time_point expired;
transaction_id_type id;
Expand Down Expand Up @@ -280,18 +281,20 @@ namespace eosio {
wlog( "resolve success" );
boost::asio::async_connect( _ws->next_layer(),
results.begin(), results.end(),
std::bind( &session::on_connect,
shared_from_this(),
std::placeholders::_1 ) );
boost::asio::bind_executor( _strand,
std::bind( &session::on_connect,
shared_from_this(),
std::placeholders::_1 ) ) );
}

// Completion handler for the async TCP connect started in the resolve step.
// On error, delegates to on_fail; on success, begins the websocket handshake
// whose completion is routed to session::on_handshake.
// NOTE(review): this span is a diff rendering — it contains both the removed
// pre-change line and the added post-change line (see note below).
void on_connect( boost::system::error_code ec ) {
if( ec ) return on_fail( ec, "connect" );

_ws->async_handshake( _remote_host, "/",
// Added by this commit: wrap the handler in bind_executor so the
// completion runs on the session's strand (serialized with other handlers).
boost::asio::bind_executor( _strand,
std::bind( &session::on_handshake,
shared_from_this(),
// NOTE(review): the next line is the REMOVED pre-change text; the line after
// it is the ADDED replacement carrying the extra ')' that closes bind_executor.
std::placeholders::_1 ) );
std::placeholders::_1 ) ) );
}

void on_handshake( boost::system::error_code ec ) {
Expand All @@ -309,7 +312,7 @@ namespace eosio {
return; /// this peer obviously knows this trx
}

idump((t->id));
//idump((t->id));
transaction_status stat;
stat.id = t->id;
stat.trx = t;
Expand Down Expand Up @@ -355,7 +358,9 @@ namespace eosio {
}
}

void on_accepted_block( block_id_type id, block_id_type prev, shared_ptr<vector<char>> msgdata ) {
void on_accepted_block( block_id_type id, block_id_type prev, shared_ptr<vector<char>> msgdata, block_state_ptr s ) {
if( !_strand.running_in_this_thread() ) { elog( "wrong strand" ); }

mark_block_known_by_peer( id );
auto itr = _block_status.find( id );

Expand All @@ -367,6 +372,17 @@ namespace eosio {
_local_head_block_id = id;
_local_head_block_num = block_header::num_from_id(id);

for( const auto& receipt: s->block->transactions ) {
if( receipt.trx.which() == 1 ) {
auto id = receipt.trx.get<packed_transaction>().id();
auto itr = _transaction_status.find( id );
_transaction_status.erase(itr);
}
}
if( _transaction_status.size() > 1000 ) {
idump((_transaction_status.size()));
}

do_send_next_message();
}

Expand Down Expand Up @@ -425,42 +441,50 @@ namespace eosio {
hello_msg.pending_block_ids = ids;

self->_local_lib = lib;
self->_state = sending_state;
self->send( hello_msg );
});
}

// Queues a pre-serialized message buffer for transmission on the websocket.
// Asserts the single outbound buffer is free, copies the payload into it,
// flips the session into sending_state, and issues one async_write whose
// completion is handled by session::on_write.
// NOTE(review): diff rendering — removed and added lines are interleaved below.
void send( const shared_ptr<vector<char> >& buffer ) {
try {
// NOTE(review): the next three lines are the REMOVED pre-change copy...
FC_ASSERT( !_out_buffer.size() );
_out_buffer.resize(buffer->size());
memcpy( _out_buffer.data(), buffer->data(), _out_buffer.size() );
// wdump((_out_buffer.size())(buffer->size()));
// ...and these three are the ADDED post-change duplicates (same logic,
// re-indented in the real file).
FC_ASSERT( !_out_buffer.size() );
_out_buffer.resize(buffer->size());
memcpy( _out_buffer.data(), buffer->data(), _out_buffer.size() );

// NOTE(review): REMOVED pre-change write path — handler bound without a strand.
_state = sending_state;
_ws->async_write( boost::asio::buffer(_out_buffer),
std::bind( &session::on_write,
shared_from_this(),
std::placeholders::_1,
std::placeholders::_2 ) );
// ADDED post-change write path — on_write is now dispatched through the
// session strand via bind_executor, serializing it with other handlers.
_state = sending_state;
// wlog( "state = sending" );
_ws->async_write( boost::asio::buffer(_out_buffer),
boost::asio::bind_executor(
_strand,
std::bind( &session::on_write,
shared_from_this(),
std::placeholders::_1,
std::placeholders::_2 ) ) );
} FC_LOG_AND_RETHROW()
}


// Serializes a bnet_message with fc::raw into the single outbound buffer and
// starts one async_write; completion goes to session::on_write.
// NOTE(review): diff rendering — the first line is the REMOVED pre-change
// signature, the second the ADDED one that opens a try block (closed by the
// FC_LOG_AND_RETHROW at the bottom, added in this commit).
void send( const bnet_message& msg ) {
void send( const bnet_message& msg ) { try {

// Only one in-flight outbound message is permitted per session.
FC_ASSERT( !_out_buffer.size() );

auto ps = fc::raw::pack_size(msg);
_out_buffer.resize(ps);
// wdump((_out_buffer.size()));
fc::datastream<char*> ds(_out_buffer.data(), ps);
fc::raw::pack(ds, msg);

_state = sending_state;
// wlog( "state = sending" );
_ws->async_write( boost::asio::buffer(_out_buffer),
// NOTE(review): REMOVED pre-change handler binding (no strand)...
std::bind( &session::on_write,
shared_from_this(),
std::placeholders::_1,
std::placeholders::_2 ) );
}
// ...ADDED post-change binding: on_write now runs on the session strand.
boost::asio::bind_executor(
_strand,
std::bind( &session::on_write,
shared_from_this(),
std::placeholders::_1,
std::placeholders::_2 ) ) );
} FC_LOG_AND_RETHROW() }

void mark_block_known_by_peer( block_id_type id, bool noticed_by_peer = false, bool recv_from_peer = false ) {
auto itr = _block_status.find(id);
Expand All @@ -478,14 +502,17 @@ namespace eosio {
void do_send_next_message() {
//wlog("");
if( _state == sending_state ) return; /// in process of sending
if( _out_buffer.size() ) return;
if( !_recv_remote_hello ) return;
// wlog( "do send next message" );

//wdump((_remote_head_block_num)(_local_lib) );

/// until we get caught up with last irreversible block we
/// will simply fetch blocks from the block log and send
if( _last_sent_block_num <= _local_lib ) {
_state = sending_state;
// wlog( "state = sending" );
async_get_block_num( _last_sent_block_num + 1,
[self=shared_from_this()]( auto sblockptr ) {
self->_last_sent_block_num++;
Expand All @@ -507,12 +534,15 @@ namespace eosio {
return itr->known_by_peer;
}

void send_next_trx() {
void send_next_trx() { try {
//wlog( "send next trx" );
auto& idx = _transaction_status.get<by_received>();
auto start = idx.begin();
while( start != idx.end() && start->expired < fc::time_point::now() ) {
idx.erase( start );
start = idx.begin();
}
if( idx.size() > 2000 ) {
idump((idx.size()));
}

Expand All @@ -523,40 +553,45 @@ namespace eosio {
stat.known_by_peer = true;
});
auto ptrx_ptr = std::make_shared<packed_transaction>( start->trx->packed_trx );
wlog("sending trx ${id}", ("id",start->id) );
// wlog("sending trx ${id}", ("id",start->id) );
send(ptrx_ptr);
return;
}
++start;
}
}
} FC_LOG_AND_RETHROW() }

// Fetches block (_last_sent_block_num + 1) asynchronously and sends it if the
// peer already knows its predecessor; otherwise assumes a fork, steps the
// cursor back one block, and recurses to find the most recent common block.
// NOTE(review): diff rendering — the ENTIRE pre-change body appears first
// (removed), followed by the ENTIRE post-change body (added, wrapped in
// try / FC_LOG_AND_RETHROW). The two versions differ only in the try wrapper
// and added log lines.
void send_next_block() {
// --- REMOVED pre-change body begins here ---
_state = sending_state;
async_get_block_num( _last_sent_block_num + 1,
[self=shared_from_this()]( auto sblockptr ) {
auto prev = sblockptr->previous;

bool peer_knows_prev = self->_last_sent_block_id == prev; /// shortcut
if( !peer_knows_prev )
peer_knows_prev = self->is_known_by_peer( prev );

if( peer_knows_prev ) {
self->_last_sent_block_id = sblockptr->id();
self->_last_sent_block_num = block_header::num_from_id(self->_last_sent_block_id);
self->mark_block_known_by_peer( self->_last_sent_block_id );
//ilog( "sending pending........... ${n}", ("n", sblockptr->block_num()) );
self->send( sblockptr );
} else {
wlog( "looks like we had a fork... see if peer knows previous block num" );
self->_state = idle_state;
/// we must have forked... peer doesn't know about previous,
/// we need to find the most recent block the peer does know about
self->_last_sent_block_num--;
self->send_next_block();
}
}
);
// --- ADDED post-change body begins here ---
//wlog( "send next trx" );
try {
_state = sending_state;
// wlog( "state = sending " );
async_get_block_num( _last_sent_block_num + 1,
[self=shared_from_this()]( auto sblockptr ) {
auto prev = sblockptr->previous;

bool peer_knows_prev = self->_last_sent_block_id == prev; /// shortcut
if( !peer_knows_prev )
peer_knows_prev = self->is_known_by_peer( prev );

if( peer_knows_prev ) {
self->_last_sent_block_id = sblockptr->id();
self->_last_sent_block_num = block_header::num_from_id(self->_last_sent_block_id);
self->mark_block_known_by_peer( self->_last_sent_block_id );
//ilog( "sending pending........... ${n}", ("n", sblockptr->block_num()) );
self->send( sblockptr );
} else {
wlog( "looks like we had a fork... see if peer knows previous block num" );
self->_state = idle_state;
wlog( "state = idle" );
/// we must have forked... peer doesn't know about previous,
/// we need to find the most recent block the peer does know about
self->_last_sent_block_num--;
self->send_next_block();
}
}
);
} FC_LOG_AND_RETHROW()
}

void on_fail( boost::system::error_code ec, const char* what ) {
Expand Down Expand Up @@ -658,7 +693,7 @@ namespace eosio {

void on( const packed_transaction_ptr& p ) {
auto id = p->id();
wlog( "received trx ${id}", ("id",id) );
//wlog( "received trx ${id}", ("id",id) );
auto itr = _transaction_status.find( id );
if( itr != _transaction_status.end() ) {
_transaction_status.modify( itr, [&]( auto& stat ) {
Expand Down Expand Up @@ -689,11 +724,12 @@ namespace eosio {

// Completion handler for the async_write issued by the send() overloads.
// On error, closes the underlying socket and reports via on_fail; on success,
// returns the session to idle, releases the outbound buffer, and tries to
// send the next queued message.
// NOTE(review): diff rendering — this commit MOVES the idle_state transition
// to after the error check; the early `_state = idle_state;` below is the
// REMOVED pre-change line, the later one is the ADDED replacement.
void on_write( boost::system::error_code ec, std::size_t bytes_transferred ) {
boost::ignore_unused(bytes_transferred);
_state = idle_state;

if( ec ) {
// Added by this commit: force-close the transport on a write error.
_ws->next_layer().close();
return on_fail( ec, "write" );
}
_state = idle_state;
//wlog( "state = idle" );
_out_buffer.resize(0);
do_send_next_message();
}
Expand Down Expand Up @@ -833,12 +869,12 @@ namespace eosio {

vector<const session*> removed;
for( const auto& item : _sessions ) {
auto ses = item.second.lock();
shared_ptr<session> ses = item.second.lock();
if( ses ) {
ses->_ios.post( boost::asio::bind_executor(
ses->_strand,
[ses,id,prev,msgdata](){
ses->on_accepted_block(id,prev,msgdata);
[ses,id,prev,msgdata,s](){
ses->on_accepted_block(id,prev,msgdata,s);
}
));
} else {
Expand Down
7 changes: 7 additions & 0 deletions plugins/producer_plugin/producer_plugin.cpp
Expand Up @@ -6,6 +6,7 @@
#include <eosio/chain/producer_object.hpp>
#include <eosio/chain/plugin_interface.hpp>
#include <eosio/chain/global_property_object.hpp>
#include <eosio/chain/transaction_object.hpp>

#include <fc/io/json.hpp>
#include <fc/smart_ref_impl.hpp>
Expand Down Expand Up @@ -232,6 +233,12 @@ class producer_plugin_impl {
}

transaction_trace_ptr on_incoming_transaction(const packed_transaction_ptr& trx) {
chain::controller& chain = app().get_plugin<chain_plugin>().chain();
auto id = trx->id();
if( chain.db().find<transaction_object, by_trx_id>(id) ) {
return transaction_trace_ptr();

This comment has been minimized.

Copy link
@wanderingbort

wanderingbort May 24, 2018

Contributor

This is not an expected return value from this function and we'd need to audit callers of the methods/channels it backs to see if they can handle a null shared pointer return. @heifner has identified that at least the RPC endpoint was not expecting this and handling it poorly.

We can either return a newly created empty trace with the except field set to the appropriate exception for duplicates and avoid the cost of getting into chain controller OR just throw the appropriate exception from here and avoid the cost.

}

return publish_results_of(trx, _transaction_ack_channel, [&]() -> transaction_trace_ptr {
while (true) {
chain::controller& chain = app().get_plugin<chain_plugin>().chain();
Expand Down

0 comments on commit 01b0ffa

Please sign in to comment.