Skip to content

Commit

Permalink
p2p
Browse files Browse the repository at this point in the history
  • Loading branch information
magicbaer committed Sep 1, 2019
1 parent 15daeb3 commit d58baf4
Show file tree
Hide file tree
Showing 9 changed files with 96 additions and 31 deletions.
2 changes: 1 addition & 1 deletion build.sh
@@ -1,2 +1,2 @@
#!/bin/bash
make -j4 brcd
make -j8 brcd
20 changes: 10 additions & 10 deletions indexDb/database/src/exchangeOrder.cpp
Expand Up @@ -294,16 +294,16 @@ namespace dev {
}

bool exchange_plugin::commit_disk(int64_t version, bool first_commit) {
db->with_write_lock([&]() {
if (first_commit) {
const auto &obj = db->get<dynamic_object>();
db->modify(obj, [&](dynamic_object &obj) {
obj.version = version;
});
}
db->commit(version);
db->flush();
});
// db->with_write_lock([&]() {
// if (first_commit) {
// const auto &obj = db->get<dynamic_object>();
// db->modify(obj, [&](dynamic_object &obj) {
// obj.version = version;
// });
// }
// db->commit(version);
// db->flush();
// });
return true;
}

Expand Down
23 changes: 19 additions & 4 deletions libbrcdchain/BlockChainSync.cpp
Expand Up @@ -222,7 +222,10 @@ void BlockChainSync::syncPeer(NodeID const& _peerID, bool _force)
{
if (m_host.peer(_peerID).isConversing())
{
LOG(m_loggerDetail) << "Can't sync with this peer - outstanding asks.";
LOG(m_loggerDetail) << "Can't sync with this peer - outstanding asks."
<< " ask state " << int32_t(m_host.peer(_peerID).asking())
<< " lastest hash " << (m_host.peer(_peerID).latestHash())
<< " id " << _peerID;
return;
}

Expand All @@ -241,11 +244,23 @@ void BlockChainSync::syncPeer(NodeID const& _peerID, bool _force)
return;
}

bool ignore_sync = false;
if(height != 0 ){
auto latest_block = host().chain().info().timestamp();
if(peer_block_number > height){
int64_t time_offset = (peer_block_number - height) * host().chain().chainParams().blockInterval;
if(latest_block + time_offset > utcTimeMilliSec()){
ignore_sync = true;
LOG(m_loggerDetail) << "ignore sync " << _peerID << " self height " << height << " peer height " << peer_block_number
<< " last import h: " << last_block_num;
}
}
}

if( (_force || std::max(height, last_block_num) < peer_block_number ) && m_state != SyncState::Blocks){
if( (_force || std::max(height, last_block_num) < peer_block_number ) && m_state != SyncState::Blocks && !ignore_sync){
if(m_state == SyncState::Idle || m_state == SyncState::NotSynced){
LOG(m_loggerInfo) << "Starting full sync from " << _peerID << " self height " << height << " peer height " << peer_block_number
<< " last import h: " << last_block_num;
<< " last import h: " << last_block_num;
m_state = SyncState::Blocks;
}
peer.requestBlockHeaders(peer.latestHash(), 1, 0, false);
Expand Down Expand Up @@ -445,7 +460,7 @@ void BlockChainSync::onPeerBlockHeaders(NodeID const& _peerID, RLP const& _r)
clearPeerDownload(_peerID);
if (m_state != SyncState::Blocks && m_state != SyncState::Waiting)
{
LOG(m_logger) << "Ignoring unexpected blocks";
LOG(m_logger) << "Ignoring unexpected blocks " << (int32_t)m_state.load();
return;
}
if (m_state == SyncState::Waiting)
Expand Down
35 changes: 34 additions & 1 deletion libbrcdchain/BrcdChainCapability.cpp
Expand Up @@ -44,6 +44,8 @@ string toString(Asking _a)
return "WarpManifest";
case Asking::WarpData:
return "WarpData";
case Asking::UpdateStatus:
return "UpdateStatus";
}
return "?";
}
Expand Down Expand Up @@ -718,6 +720,7 @@ void BrcdChainCapability::onTransactionImported(

void BrcdChainCapability::onConnect(NodeID const& _peerID, u256 const& _peerCapabilityVersion)
{
LOG(m_logger) << "on connect " << _peerID << " version : " << _peerCapabilityVersion;
m_host->addNote(_peerID, "manners", m_host->isRude(_peerID, name()) ? "RUDE" : "nice");

BrcdChainPeer peer{m_host, _peerID, _peerCapabilityVersion};
Expand Down Expand Up @@ -932,6 +935,33 @@ bool BrcdChainCapability::interpretCapabilityPacket(
}
break;
}
case GetLatestStatus:
{
RLPStream s;
m_host->prep(_peerID, name(), s, UpdateStatus, 6)
<< c_protocolVersion << m_networkId << m_chain.details().totalDifficulty << m_chain.currentHash()
<< m_chain.genesisHash() << m_chain.details().number;
m_host->sealAndSend(_peerID, s);
break;
}
case UpdateStatus:{

auto const peerProtocolVersion = _r[0].toInt<unsigned>();
auto const networkId = _r[1].toInt<u256>();
auto const totalDifficulty = _r[2].toInt<u256>();
auto const latestHash = _r[3].toHash<h256>();
auto const genesisHash = _r[4].toHash<h256>();
auto const height = _r[5].toInt<u256>();

LOG(m_logger) << "update status Status: " << peerProtocolVersion << " / " << networkId << " / "
<< genesisHash << ", TD: " << totalDifficulty << " = " << latestHash << " height : " << height;

peer.setStatus(peerProtocolVersion, networkId, totalDifficulty, latestHash, genesisHash, height);
setIdle(_peerID);
m_peerObserver->onPeerStatus(peer);
LOG(m_logger) << " continue sync blocks.";
break;
};
default:
return false;
}
Expand All @@ -957,8 +987,11 @@ void BrcdChainCapability::setIdle(NodeID const& _peerID)
void BrcdChainCapability::setAsking(NodeID const& _peerID, Asking _a)
{
auto itPeerStatus = m_peers.find(_peerID);
if (itPeerStatus == m_peers.end())
if (itPeerStatus == m_peers.end()){
LOG(m_logger) << "cant find peer " << _peerID;
return;
}


auto& peerStatus = itPeerStatus->second;

Expand Down
11 changes: 11 additions & 0 deletions libbrcdchain/BrcdChainPeer.cpp
Expand Up @@ -31,6 +31,8 @@ string toString(Asking _a)
return "WarpManifest";
case Asking::WarpData:
return "WarpData";
case Asking::UpdateStatus:
return "UpdateStatus";
}
return "?";
}
Expand Down Expand Up @@ -77,6 +79,15 @@ void BrcdChainPeer::requestStatus(
m_host->sealAndSend(m_id, s);
}

void BrcdChainPeer::requestLatestStatus(){

RLPStream s;
setAsking(Asking::UpdateStatus);
LOG(m_logger) << "requestLatestStatus " << ::toString(m_asking) << " m_id: " << m_id;
m_host->prep(m_id, c_brcCapability, s, GetLatestStatus, 0);
m_host->sealAndSend(m_id, s);
}

void BrcdChainPeer::requestBlockHeaders(
unsigned _startNumber, unsigned _count, unsigned _skip, bool _reverse)
{
Expand Down
10 changes: 7 additions & 3 deletions libbrcdchain/BrcdChainPeer.h
Expand Up @@ -57,6 +57,7 @@ class BrcdChainPeer

void requestStatus(u256 _hostNetworkId, u256 _chainTotalDifficulty, h256 _chainCurrentHash, h256 _chainGenesPeersh, u256 height);

void requestLatestStatus();
/// Request hashes for given parent hash.
void requestBlockHeaders(
h256 const& _startHash, unsigned _count, unsigned _skip, bool _reverse);
Expand All @@ -74,8 +75,8 @@ class BrcdChainPeer
uint32_t get_request_zero_times() const { return m_request_zero_times;}
void set_request_zero_times(uint32_t r ) { m_request_zero_times = r;};

void set_last_request_number(uint32_t r ) { m_last_request_number = r;}
uint32_t get_last_request_number() const { return m_last_request_number;}
void set_last_request_times(uint32_t r ) { m_last_request_times = r;}
uint32_t get_last_request_times() const { return m_last_request_times;}
private:
// Request of type _packetType with _hashes as input parameters
void requestByHashes(h256s const& _hashes, Asking _asking, SubprotocolPacketType _packetType);
Expand Down Expand Up @@ -108,8 +109,11 @@ class BrcdChainPeer
unsigned m_unknownNewBlocks = 0; ///< Number of unknown NewBlocks received from this peer
unsigned m_lastAskedHeaders = 0; ///< Number of hashes asked




uint32_t m_request_zero_times = 0;
uint32_t m_last_request_number = 0; /// < request last block_header number.
uint32_t m_last_request_times = 0; /// < request last block_header number.
Logger m_logger{createLogger(VerbosityDebug, "peer")};
};
} // namespace brc
Expand Down
4 changes: 3 additions & 1 deletion libbrcdchain/CommonNet.h
Expand Up @@ -42,7 +42,8 @@ enum SubprotocolPacketType: byte
NodeDataPacket = 0x0e,
GetReceiptsPacket = 0x0f,
ReceiptsPacket = 0x10,

GetLatestStatus = 0x11,
UpdateStatus = 0x12,
PacketCount
};

Expand All @@ -55,6 +56,7 @@ enum class Asking
Receipts,
WarpManifest,
WarpData,
UpdateStatus,
Nothing
};

Expand Down
2 changes: 1 addition & 1 deletion libshdposseal/SHDpos.cpp
Expand Up @@ -103,7 +103,7 @@ bool dev::bacd::SHDpos::checkDeadline(uint64_t _now) {
return false;
}

if (_now < uint64_t(m_next_block_time))
if (_now < uint64_t(m_next_block_time + 50))
return false;

//得到每次出块的整数时间刻度,比较上次,现在和下次
Expand Down
20 changes: 10 additions & 10 deletions libshdposseal/SHDposClient.cpp
Expand Up @@ -322,16 +322,16 @@ void dev::bacd::SHDposClient::init(p2p::Host & _host, int _netWorkId)
{
//about SH-dpos net_host CapabilityHostFace 接口
cdebug << "capabilityHost :: SHDposHostCapability";
auto brcCapability = make_shared<SHDposHostcapality>(_host.capabilityHost(),
_netWorkId,
[this](NodeID _nodeid, unsigned _id, RLP const& _r){
dpos()->onDposMsg(_nodeid, _id, _r);
},
[this](NodeID const& _nodeid, u256 const& _peerCapabilityVersion){
dpos()->requestStatus(_nodeid, _peerCapabilityVersion);
});
_host.registerCapability(brcCapability);
dpos()->initNet(brcCapability);
// auto brcCapability = make_shared<SHDposHostcapality>(_host.capabilityHost(),
// _netWorkId,
// [this](NodeID _nodeid, unsigned _id, RLP const& _r){
// dpos()->onDposMsg(_nodeid, _id, _r);
// },
// [this](NodeID const& _nodeid, u256 const& _peerCapabilityVersion){
// dpos()->requestStatus(_nodeid, _peerCapabilityVersion);
// });
// _host.registerCapability(brcCapability);
// dpos()->initNet(brcCapability);
dpos()->initConfigAndGenesis(m_params);
dpos()->setDposClient(this);
m_bq.setOnBad([this](Exception& ex){ this->importBadBlock(ex); });
Expand Down

0 comments on commit d58baf4

Please sign in to comment.