Skip to content

Commit

Permalink
WIP: add advert/demand system to flooding
Browse files Browse the repository at this point in the history
  • Loading branch information
graydon committed Feb 11, 2020
1 parent 1ea79c8 commit bac04b5
Show file tree
Hide file tree
Showing 15 changed files with 347 additions and 13 deletions.
7 changes: 3 additions & 4 deletions docs/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ overlay.error.write | meter | error while sending a mes
overlay.fetch.txset | timer | time to complete fetching of a txset
overlay.fetch.qset | timer | time to complete fetching of a qset
overlay.flood.broadcast | meter | message sent as broadcast per peer
overlay.flood.advertized | meter | message advertized to other peers
overlay.flood.demanded | meter | message demanded in response to advert from other peer
overlay.flood.fulfilled | meter | message message sent in response to demand from other peer
overlay.flood.duplicate_recv | meter | number of bytes of flooded messages that have already been received
overlay.flood.unique_recv | meter | number of bytes of flooded messages that have not yet been received
overlay.inbound.attempt | meter | inbound connection attempted (accepted on socket)
Expand All @@ -97,10 +100,6 @@ overlay.outbound.establish | meter | outbound connection estab
overlay.recv.<X> | timer | received message <X>
overlay.send.<X> | meter | sent message <X>
overlay.timeout.idle | meter | idle peer timeout
overlay.recv.survey-request | timer | time spent in processing survey request
overlay.recv.survey-response | timer | time spent in processing survey response
overlay.send.survey-request | meter | sent survey request
overlay.send.survey-response | meter | sent survey response
scp.envelope.emit | meter | SCP message sent
scp.envelope.invalidsig | meter | envelope failed signature verification
scp.envelope.receive | meter | SCP message received
Expand Down
2 changes: 1 addition & 1 deletion src/main/Config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ Config::Config() : NODE_SEED(SecretKey::random())
MAXIMUM_LEDGER_CLOSETIME_DRIFT = 50;

OVERLAY_PROTOCOL_MIN_VERSION = 10;
OVERLAY_PROTOCOL_VERSION = 11;
OVERLAY_PROTOCOL_VERSION = 12;

VERSION_STR = STELLAR_CORE_VERSION;

Expand Down
163 changes: 158 additions & 5 deletions src/overlay/Floodgate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,13 @@ namespace stellar
{
Floodgate::FloodRecord::FloodRecord(StellarMessage const& msg, uint32_t ledger,
Peer::pointer peer)
: mLedgerSeq(ledger), mMessage(msg)
: mLedgerSeq(ledger), mMessage(std::make_unique<StellarMessage>(msg))
{
if (peer)
mPeersTold.insert(peer->toString());
}
Floodgate::FloodRecord::FloodRecord(uint32_t ledger, Peer::pointer peer)
: mLedgerSeq(ledger), mMessage(nullptr)
{
if (peer)
mPeersTold.insert(peer->toString());
Expand All @@ -30,8 +36,15 @@ Floodgate::Floodgate(Application& app)
app.getMetrics().NewCounter({"overlay", "memory", "flood-known"}))
, mSendFromBroadcast(app.getMetrics().NewMeter(
{"overlay", "flood", "broadcast"}, "message"))
, mMessagesAdvertized(app.getMetrics().NewMeter(
{"overlay", "flood", "advertized"}, "message"))
, mMessagesDemanded(app.getMetrics().NewMeter(
{"overlay", "flood", "demanded"}, "message"))
, mMessagesFulfilled(app.getMetrics().NewMeter(
{"overlay", "flood", "fulfilled"}, "message"))
, mShuttingDown(false)
{
mId = KeyUtils::toShortString(mApp.getConfig().NODE_SEED.getPublicKey());
}

// remove old flood records
Expand All @@ -40,9 +53,10 @@ Floodgate::clearBelow(uint32_t currentLedger)
{
for (auto it = mFloodMap.cbegin(); it != mFloodMap.cend();)
{
// give one ledger of leeway
// give ten ledgers of leeway
if (it->second->mLedgerSeq + 10 < currentLedger)
{
mPendingDemanded.erase(it->first);
it = mFloodMap.erase(it);
}
else
Expand All @@ -61,6 +75,7 @@ Floodgate::addRecord(StellarMessage const& msg, Peer::pointer peer, Hash& index)
{
return false;
}
mPendingDemanded.erase(index);
auto result = mFloodMap.find(index);
if (result == mFloodMap.end())
{ // we have never seen this message
Expand All @@ -71,6 +86,16 @@ Floodgate::addRecord(StellarMessage const& msg, Peer::pointer peer, Hash& index)
}
else
{
if (!result->second->mMessage)
{
// We're receiving the actual message for one we only
// knew about, but didn't have yet.
CLOG(TRACE, "Overlay")
<< mId << " upgrading " << hexAbbrev(index)
<< " from only-known to actually-have (in addRecord)";
result->second->mMessage = std::make_unique<StellarMessage>(msg);
return true;
}
result->second->mPeersTold.insert(peer->toString());
return false;
}
Expand All @@ -88,6 +113,7 @@ Floodgate::broadcast(StellarMessage const& msg, bool force,
Hash index = sha256(xdr::xdr_to_opaque(msg));
CLOG(TRACE, "Overlay") << "broadcast " << hexAbbrev(index);

mPendingDemanded.erase(index);
auto result = mFloodMap.find(index);
if (result == mFloodMap.end() || force)
{ // no one has sent us this message
Expand All @@ -96,6 +122,16 @@ Floodgate::broadcast(StellarMessage const& msg, bool force,
result = mFloodMap.insert(std::make_pair(index, record)).first;
mFloodMapSize.set_count(mFloodMap.size());
}
else if (result != mFloodMap.end() && !result->second->mMessage)
{
// We're receiving the actual message for one we only
// knew about, but didn't have yet.
CLOG(TRACE, "Overlay")
<< mId << " upgrading " << hexAbbrev(index)
<< " from only-known to actually-have (in broadcast)";
result->second->mMessage = std::make_unique<StellarMessage>(msg);
}

// send it to people that haven't sent it to us
auto& peersTold = result->second->mPeersTold;

Expand All @@ -108,15 +144,130 @@ Floodgate::broadcast(StellarMessage const& msg, bool force,
if (peersTold.find(peer.second->toString()) == peersTold.end() &&
peer.second->getRemoteOverlayVersion() >= minOverlayVersion)
{
mSendFromBroadcast.Mark();
peer.second->sendMessage(msg);
peersTold.insert(peer.second->toString());
if (peer.second->supportsAdverts())
{
CLOG(TRACE, "Overlay")
<< mId << " advertizing " << hexAbbrev(index) << " to "
<< KeyUtils::toShortString(peer.second->getPeerID());
mMessagesAdvertized.Mark();
peer.second->advertizeMessage(index);
}
else
{
mSendFromBroadcast.Mark();
peer.second->sendMessage(msg);
peersTold.insert(peer.second->toString());
}
}
}
CLOG(TRACE, "Overlay") << "broadcast " << hexAbbrev(index) << " told "
<< peersTold.size();
}

void
Floodgate::demandMissing(FloodAdvert const& adv, Peer::pointer fromPeer)
{
StellarMessage msg;
msg.type(FLOOD_DEMAND);
FloodDemand& demand = msg.floodDemand();
for (Hash const& h : adv.hashes)
{
auto i = mFloodMap.find(h);
bool haveMessage = false;
// Add to floodMap so it can be found by item-fetching.
if (i == mFloodMap.end())
{
CLOG(TRACE, "Overlay")
<< mId << " marking message " << hexAbbrev(h)
<< " advertized by "
<< KeyUtils::toShortString(fromPeer->getPeerID())
<< " known (but don't have it)";
mFloodMap[h] = std::make_shared<FloodRecord>(
mApp.getHerder().getCurrentLedgerSeq(), fromPeer);
mFloodMapSize.set_count(mFloodMap.size());
}
else
{
i->second->mPeersTold.insert(fromPeer->toString());
haveMessage = static_cast<bool>(i->second->mMessage);
if (haveMessage)
{
CLOG(TRACE, "Overlay")
<< mId << " know of message " << hexAbbrev(h)
<< " advertized by "
<< KeyUtils::toShortString(fromPeer->getPeerID())
<< " and already have it";
}
else
{
CLOG(TRACE, "Overlay")
<< mId << " know of message " << hexAbbrev(h)
<< " advertized by "
<< KeyUtils::toShortString(fromPeer->getPeerID())
<< " and don't have it";
}
}
if (mPendingDemanded.find(h) != mPendingDemanded.end())
{
CLOG(TRACE, "Overlay")
<< mId << " already demanded " << hexAbbrev(h)
<< " advertized by "
<< KeyUtils::toShortString(fromPeer->getPeerID());
}
if (!haveMessage && mPendingDemanded.find(h) == mPendingDemanded.end())
{
CLOG(TRACE, "Overlay")
<< mId << " demanding " << hexAbbrev(h) << " from "
<< KeyUtils::toShortString(fromPeer->getPeerID());
// We don't have this message in full and haven't
// demanded it yet from anyone who advertized it; ask
// now and leave a record that we've done so to avoid
// demanding it from others.
mMessagesDemanded.Mark();
mPendingDemanded.insert(h);
demand.hashes.emplace_back(h);
}
}
fromPeer->sendMessage(msg);
}

void
Floodgate::fulfillDemand(FloodDemand const& dmd, Peer::pointer fromPeer)
{
for (Hash const& h : dmd.hashes)
{
auto i = mFloodMap.find(h);
if (i != mFloodMap.end())
{
if (i->second->mMessage)
{
CLOG(TRACE, "Overlay")
<< mId << " fulfilling demand for " << hexAbbrev(h)
<< " demanded by "
<< KeyUtils::toShortString(fromPeer->getPeerID());
mMessagesFulfilled.Mark();
fromPeer->sendMessage(*(i->second->mMessage));
}
else
{
CLOG(TRACE, "Overlay")
<< mId << " can't fulfill demand for " << hexAbbrev(h)
<< " demanded by "
<< KeyUtils::toShortString(fromPeer->getPeerID())
<< " -- know of message but don't have it";
}
}
else
{
CLOG(TRACE, "Overlay")
<< mId << " can't fulfill demand for " << hexAbbrev(h)
<< " demanded by "
<< KeyUtils::toShortString(fromPeer->getPeerID())
<< " -- don't know of message";
}
}
}

std::set<Peer::pointer>
Floodgate::getPeersKnows(Hash const& h)
{
Expand Down Expand Up @@ -147,6 +298,8 @@ Floodgate::shutdown()
void
Floodgate::forgetRecord(Hash const& h)
{
CLOG(TRACE, "Overlay") << mId << " forgetting " << hexAbbrev(h);
mFloodMap.erase(h);
mPendingDemanded.erase(h);
}
}
13 changes: 12 additions & 1 deletion src/overlay/Floodgate.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@

#include "overlay/Peer.h"
#include "overlay/StellarXDR.h"
#include <memory>
#include <map>
#include <set>

/**
* FloodGate keeps track of which peers have sent us which broadcast messages,
Expand Down Expand Up @@ -37,17 +39,23 @@ class Floodgate
typedef std::shared_ptr<FloodRecord> pointer;

uint32_t mLedgerSeq;
StellarMessage mMessage;
std::unique_ptr<StellarMessage> mMessage;
std::set<std::string> mPeersTold;

FloodRecord(StellarMessage const& msg, uint32_t ledger,
Peer::pointer peer);
FloodRecord(uint32_t ledger, Peer::pointer peer);
};

std::map<Hash, FloodRecord::pointer> mFloodMap;
std::set<Hash> mPendingDemanded;
Application& mApp;
std::string mId;
medida::Counter& mFloodMapSize;
medida::Meter& mSendFromBroadcast;
medida::Meter& mMessagesAdvertized;
medida::Meter& mMessagesDemanded;
medida::Meter& mMessagesFulfilled;
bool mShuttingDown;

public:
Expand All @@ -63,6 +71,9 @@ class Floodgate
void broadcast(StellarMessage const& msg, bool force,
uint32_t minOverlayVersion);

void demandMissing(FloodAdvert const& adv, Peer::pointer fromPeer);
void fulfillDemand(FloodDemand const& dmd, Peer::pointer fromPeer);

// returns the list of peers that sent us the item with hash `msgID`
// NB: `msgID` is the hash of a `StellarMessage`
std::set<Peer::pointer> getPeersKnows(Hash const& msgID);
Expand Down
5 changes: 5 additions & 0 deletions src/overlay/OverlayManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ class OverlayManager
// message with the ID msgID will cause it to be broadcast to all peers
virtual void forgetFloodedMsg(Hash const& msgID) = 0;

virtual void demandMissing(FloodAdvert const& adv,
Peer::pointer fromPeer) = 0;
virtual void fulfillDemand(FloodDemand const& dmd,
Peer::pointer fromPeer) = 0;

// Return a list of random peers from the set of authenticated peers.
virtual std::vector<Peer::pointer> getRandomAuthenticatedPeers() = 0;

Expand Down
14 changes: 14 additions & 0 deletions src/overlay/OverlayManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -895,6 +895,20 @@ OverlayManagerImpl::broadcastMessage(StellarMessage const& msg, bool force,
mFloodGate.broadcast(msg, force, minOverlayVersion);
}

void
OverlayManagerImpl::demandMissing(FloodAdvert const& adv,
Peer::pointer fromPeer)
{
mFloodGate.demandMissing(adv, fromPeer);
}

void
OverlayManagerImpl::fulfillDemand(FloodDemand const& dmd,
Peer::pointer fromPeer)
{
mFloodGate.fulfillDemand(dmd, fromPeer);
}

void
OverlayManager::dropAll(Database& db)
{
Expand Down
2 changes: 2 additions & 0 deletions src/overlay/OverlayManagerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ class OverlayManagerImpl : public OverlayManager
void forgetFloodedMsg(Hash const& msgID) override;
void broadcastMessage(StellarMessage const& msg, bool force = false,
uint32_t minOverlayVersion = 0) override;
void demandMissing(FloodAdvert const& adv, Peer::pointer fromPeer) override;
void fulfillDemand(FloodDemand const& dmd, Peer::pointer fromPeer) override;
void connectTo(PeerBareAddress const& address) override;

void addInboundConnection(Peer::pointer peer) override;
Expand Down
9 changes: 9 additions & 0 deletions src/overlay/OverlayMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ OverlayMetrics::OverlayMetrics(Application& app)
, mRecvSurveyResponseTimer(
app.getMetrics().NewTimer({"overlay", "recv", "survey-response"}))

, mRecvFloodAdvertTimer(
app.getMetrics().NewTimer({"overlay", "recv", "flood-advert"}))
, mRecvFloodDemandTimer(
app.getMetrics().NewTimer({"overlay", "recv", "flood-demand"}))

, mMessageDelayInWriteQueueTimer(
app.getMetrics().NewTimer({"overlay", "delay", "write-queue"}))
, mMessageDelayInAsyncWriteTimer(
Expand Down Expand Up @@ -103,6 +108,10 @@ OverlayMetrics::OverlayMetrics(Application& app)
{"overlay", "send", "survey-request"}, "message"))
, mSendSurveyResponseMeter(app.getMetrics().NewMeter(
{"overlay", "send", "survey-response"}, "message"))
, mSendFloodAdvertMeter(app.getMetrics().NewMeter(
{"overlay", "send", "flood-advert"}, "message"))
, mSendFloodDemandMeter(app.getMetrics().NewMeter(
{"overlay", "send", "flood-demand"}, "message"))
, mMessagesBroadcast(app.getMetrics().NewMeter(
{"overlay", "message", "broadcast"}, "message"))
, mPendingPeersSize(
Expand Down
6 changes: 6 additions & 0 deletions src/overlay/OverlayMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ struct OverlayMetrics
medida::Timer& mRecvSurveyRequestTimer;
medida::Timer& mRecvSurveyResponseTimer;

medida::Timer& mRecvFloodAdvertTimer;
medida::Timer& mRecvFloodDemandTimer;

medida::Timer& mMessageDelayInWriteQueueTimer;
medida::Timer& mMessageDelayInAsyncWriteTimer;

Expand All @@ -79,6 +82,9 @@ struct OverlayMetrics
medida::Meter& mSendSurveyRequestMeter;
medida::Meter& mSendSurveyResponseMeter;

medida::Meter& mSendFloodAdvertMeter;
medida::Meter& mSendFloodDemandMeter;

medida::Meter& mMessagesBroadcast;
medida::Counter& mPendingPeersSize;
medida::Counter& mAuthenticatedPeersSize;
Expand Down
Loading

0 comments on commit bac04b5

Please sign in to comment.