Skip to content
This repository has been archived by the owner on Oct 28, 2021. It is now read-only.

Remove ThreadSanitizer warnings #4579

Merged
merged 10 commits into from Nov 29, 2017
5 changes: 3 additions & 2 deletions libethereum/Client.h
Expand Up @@ -320,6 +320,9 @@ class Client: public ClientBase, protected Worker

std::weak_ptr<EthereumHost> m_host; ///< Our Ethereum Host. Don't do anything if we can't lock.

std::condition_variable m_signalled;
Mutex x_signalled;

Handler<> m_tqReady;
Handler<h256 const&> m_tqReplaced;
Handler<> m_bqReady;
Expand All @@ -339,8 +342,6 @@ class Client: public ClientBase, protected Worker
SharedMutex x_functionQueue;
std::queue<std::function<void()>> m_functionQueue; ///< Functions waiting to be executed in the main thread.

std::condition_variable m_signalled;
Mutex x_signalled;
std::atomic<bool> m_syncTransactionQueue = {false};
std::atomic<bool> m_syncBlockQueue = {false};

Expand Down
28 changes: 14 additions & 14 deletions libethereum/EthereumHost.cpp
Expand Up @@ -54,19 +54,19 @@ namespace
class EthereumPeerObserver: public EthereumPeerObserverFace
{
public:
EthereumPeerObserver(BlockChainSync& _sync, TransactionQueue& _tq): m_sync(_sync), m_tq(_tq) {}
EthereumPeerObserver(shared_ptr<BlockChainSync> _sync, TransactionQueue& _tq): m_sync(_sync), m_tq(_tq) {}

void onPeerStatus(std::shared_ptr<EthereumPeer> _peer) override
{
try
{
m_sync.onPeerStatus(_peer);
m_sync->onPeerStatus(_peer);
}
catch (FailedInvariant const&)
{
// "fix" for https://github.com/ethereum/webthree-umbrella/issues/300
clog(NetWarn) << "Failed invariant during sync, restarting sync";
m_sync.restartSync();
m_sync->restartSync();
}
}

Expand All @@ -81,7 +81,7 @@ class EthereumPeerObserver: public EthereumPeerObserverFace
{
try
{
m_sync.onPeerAborting();
m_sync->onPeerAborting();
}
catch (Exception&)
{
Expand All @@ -93,55 +93,55 @@ class EthereumPeerObserver: public EthereumPeerObserverFace
{
try
{
m_sync.onPeerBlockHeaders(_peer, _headers);
m_sync->onPeerBlockHeaders(_peer, _headers);
}
catch (FailedInvariant const&)
{
// "fix" for https://github.com/ethereum/webthree-umbrella/issues/300
clog(NetWarn) << "Failed invariant during sync, restarting sync";
m_sync.restartSync();
m_sync->restartSync();
}
}

void onPeerBlockBodies(std::shared_ptr<EthereumPeer> _peer, RLP const& _r) override
{
try
{
m_sync.onPeerBlockBodies(_peer, _r);
m_sync->onPeerBlockBodies(_peer, _r);
}
catch (FailedInvariant const&)
{
// "fix" for https://github.com/ethereum/webthree-umbrella/issues/300
clog(NetWarn) << "Failed invariant during sync, restarting sync";
m_sync.restartSync();
m_sync->restartSync();
}
}

void onPeerNewHashes(std::shared_ptr<EthereumPeer> _peer, std::vector<std::pair<h256, u256>> const& _hashes) override
{
try
{
m_sync.onPeerNewHashes(_peer, _hashes);
m_sync->onPeerNewHashes(_peer, _hashes);
}
catch (FailedInvariant const&)
{
// "fix" for https://github.com/ethereum/webthree-umbrella/issues/300
clog(NetWarn) << "Failed invariant during sync, restarting sync";
m_sync.restartSync();
m_sync->restartSync();
}
}

void onPeerNewBlock(std::shared_ptr<EthereumPeer> _peer, RLP const& _r) override
{
try
{
m_sync.onPeerNewBlock(_peer, _r);
m_sync->onPeerNewBlock(_peer, _r);
}
catch (FailedInvariant const&)
{
// "fix" for https://github.com/ethereum/webthree-umbrella/issues/300
clog(NetWarn) << "Failed invariant during sync, restarting sync";
m_sync.restartSync();
m_sync->restartSync();
}
}

Expand All @@ -158,7 +158,7 @@ class EthereumPeerObserver: public EthereumPeerObserverFace
}

private:
BlockChainSync& m_sync;
shared_ptr<BlockChainSync> m_sync;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm afraid we have the same problem with m_syncMutex below. If EthereumHost gets destroyed before EthereumPeerObserver mutex is destroyed with it... Not sure yet what to do with it

m_tq is destroyed with the Client, also not sure if EthereumPeers can outlive Client

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(for myself) try to remove m_syncMutex.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Somehow I'm making other classes hold m_tq as a weak pointer.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And I was able to remove m_syncMutex and still seeing no warnings from ThreadSanitizer.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

About m_tq, I decided to use shared/weak pointers.

TransactionQueue& m_tq;
};

Expand Down Expand Up @@ -380,7 +380,7 @@ EthereumHost::EthereumHost(BlockChain const& _ch, OverlayDB const& _db, Transact
// TODO: Composition would be better. Left like that to avoid initialization
// issues as BlockChainSync accesses other EthereumHost members.
m_sync.reset(new BlockChainSync(*this));
m_peerObserver = make_shared<EthereumPeerObserver>(*m_sync, m_tq);
m_peerObserver = make_shared<EthereumPeerObserver>(m_sync, m_tq);
m_latestBlockSent = _ch.currentHash();
m_tq.onImport([this](ImportResult _ir, h256 const& _h, h512 const& _nodeId) { onTransactionImported(_ir, _h, _nodeId); });
}
Expand Down
2 changes: 1 addition & 1 deletion libethereum/EthereumHost.h
Expand Up @@ -133,7 +133,7 @@ class EthereumHost: public p2p::HostCapability<EthereumPeer>, Worker
bool m_newBlocks = false;

mutable Mutex x_transactions;
std::unique_ptr<BlockChainSync> m_sync;
std::shared_ptr<BlockChainSync> m_sync;
std::atomic<time_t> m_lastTick = { 0 };

std::shared_ptr<EthereumHostDataFace> m_hostData;
Expand Down
33 changes: 18 additions & 15 deletions libethereum/EthereumPeer.cpp
Expand Up @@ -109,8 +109,8 @@ void EthereumPeer::setRude()

void EthereumPeer::abortSync()
{
if (m_observer)
m_observer->onPeerAborting();
if (auto observer = m_observer.lock())
observer->onPeerAborting();
}


Expand Down Expand Up @@ -236,7 +236,10 @@ bool EthereumPeer::isCriticalSyncing() const

bool EthereumPeer::interpret(unsigned _id, RLP const& _r)
{
assert(m_observer);
auto observer = m_observer.lock();
auto hostData = m_hostData.lock();
if (!observer || !hostData)
return false;

m_lastAsk = std::chrono::system_clock::to_time_t(chrono::system_clock::now());
try
Expand All @@ -255,12 +258,12 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r)

clog(NetMessageSummary) << "Status:" << m_protocolVersion << "/" << m_networkId << "/" << m_genesisHash << ", TD:" << m_totalDifficulty << "=" << m_latestHash;
setIdle();
m_observer->onPeerStatus(dynamic_pointer_cast<EthereumPeer>(dynamic_pointer_cast<EthereumPeer>(shared_from_this())));
observer->onPeerStatus(dynamic_pointer_cast<EthereumPeer>(dynamic_pointer_cast<EthereumPeer>(shared_from_this())));
break;
}
case TransactionsPacket:
{
m_observer->onPeerTransactions(dynamic_pointer_cast<EthereumPeer>(dynamic_pointer_cast<EthereumPeer>(shared_from_this())), _r);
observer->onPeerTransactions(dynamic_pointer_cast<EthereumPeer>(dynamic_pointer_cast<EthereumPeer>(shared_from_this())), _r);
break;
}
case GetBlockHeadersPacket:
Expand All @@ -280,7 +283,7 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r)
break;
}

pair<bytes, unsigned> const rlpAndItemCount = m_hostData->blockHeaders(blockId, numHeadersToSend, skip, reverse);
pair<bytes, unsigned> const rlpAndItemCount = hostData->blockHeaders(blockId, numHeadersToSend, skip, reverse);

RLPStream s;
prep(s, BlockHeadersPacket, rlpAndItemCount.second).appendRaw(rlpAndItemCount.first, rlpAndItemCount.second);
Expand All @@ -295,7 +298,7 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r)
else
{
setIdle();
m_observer->onPeerBlockHeaders(dynamic_pointer_cast<EthereumPeer>(shared_from_this()), _r);
observer->onPeerBlockHeaders(dynamic_pointer_cast<EthereumPeer>(shared_from_this()), _r);
}
break;
}
Expand All @@ -311,7 +314,7 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r)
break;
}

pair<bytes, unsigned> const rlpAndItemCount = m_hostData->blockBodies(_r);
pair<bytes, unsigned> const rlpAndItemCount = hostData->blockBodies(_r);

addRating(0);
RLPStream s;
Expand All @@ -326,13 +329,13 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r)
else
{
setIdle();
m_observer->onPeerBlockBodies(dynamic_pointer_cast<EthereumPeer>(shared_from_this()), _r);
observer->onPeerBlockBodies(dynamic_pointer_cast<EthereumPeer>(shared_from_this()), _r);
}
break;
}
case NewBlockPacket:
{
m_observer->onPeerNewBlock(dynamic_pointer_cast<EthereumPeer>(shared_from_this()), _r);
observer->onPeerNewBlock(dynamic_pointer_cast<EthereumPeer>(shared_from_this()), _r);
break;
}
case NewBlockHashesPacket:
Expand All @@ -351,7 +354,7 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r)
for (unsigned i = 0; i < itemCount; ++i)
hashes[i] = std::make_pair(_r[i][0].toHash<h256>(), _r[i][1].toInt<u256>());

m_observer->onPeerNewHashes(dynamic_pointer_cast<EthereumPeer>(shared_from_this()), hashes);
observer->onPeerNewHashes(dynamic_pointer_cast<EthereumPeer>(shared_from_this()), hashes);
break;
}
case GetNodeDataPacket:
Expand All @@ -365,7 +368,7 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r)
}
clog(NetMessageSummary) << "GetNodeData (" << dec << count << " entries)";

strings const data = m_hostData->nodeData(_r);
strings const data = hostData->nodeData(_r);

addRating(0);
RLPStream s;
Expand All @@ -386,7 +389,7 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r)
}
clog(NetMessageSummary) << "GetReceipts (" << dec << count << " entries)";

pair<bytes, unsigned> const rlpAndItemCount = m_hostData->receipts(_r);
pair<bytes, unsigned> const rlpAndItemCount = hostData->receipts(_r);

addRating(0);
RLPStream s;
Expand All @@ -401,7 +404,7 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r)
else
{
setIdle();
m_observer->onPeerNodeData(dynamic_pointer_cast<EthereumPeer>(shared_from_this()), _r);
observer->onPeerNodeData(dynamic_pointer_cast<EthereumPeer>(shared_from_this()), _r);
}
break;
}
Expand All @@ -412,7 +415,7 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r)
else
{
setIdle();
m_observer->onPeerReceipts(dynamic_pointer_cast<EthereumPeer>(shared_from_this()), _r);
observer->onPeerReceipts(dynamic_pointer_cast<EthereumPeer>(shared_from_this()), _r);
}
break;
}
Expand Down
4 changes: 2 additions & 2 deletions libethereum/EthereumPeer.h
Expand Up @@ -192,8 +192,8 @@ class EthereumPeer: public p2p::Capability
unsigned m_unknownNewBlocks = 0; ///< Number of unknown NewBlocks received from this peer
unsigned m_lastAskedHeaders = 0; ///< Number of hashes asked

std::shared_ptr<EthereumPeerObserverFace> m_observer;
std::shared_ptr<EthereumHostDataFace> m_hostData;
std::weak_ptr<EthereumPeerObserverFace> m_observer;
std::weak_ptr<EthereumHostDataFace> m_hostData;
};

}
Expand Down