Skip to content

Commit

Permalink
Tsan failure fixes (#964)
Browse files Browse the repository at this point in the history
* pull in some changes from CIRCLE_CI_FIXES branch

* update containers library

* add in locking for transmitters

* fix some other errors with `this->` in  lambda

* remove use of nullptr for insert in ValueFederateManager

* fix incorrect unlock call in connect method for commsInterface

* add a thread join in the destructor oc commsInterface to make sure the join is called.

* add a shared_from_this into an additional callback for tcpconnections

* update concurrency library with a different fix for triggerVariable

* switch tsan to the clang-8 version

* move octave build/test to buildenv image

* Automated formatting of source files (#965)

* bump concurrency to a tagged version

* Tweak the formatting of one of the lambdas
  • Loading branch information
phlptp committed Dec 9, 2019
1 parent 9e3b3b7 commit 283b76c
Show file tree
Hide file tree
Showing 13 changed files with 84 additions and 63 deletions.
4 changes: 2 additions & 2 deletions .circleci/config.yml
Expand Up @@ -47,7 +47,7 @@ aliases:
jobs:
helicsoctave:
docker:
- image: helics/octave:latest
- image: helics/buildenv:octave
environment:
OCTAVETEST: "/root/project/tests/octave"
CMAKE_FLAGS: "-DBUILD_OCTAVE_INTERFACE=ON -DHELICS_BUILD_TESTS=ON"
Expand All @@ -61,7 +61,7 @@ jobs:
- image: helics/clang-tsan:latest
environment:
CMAKE_FLAGS: '-DCMAKE_CXX_COMPILER=clang++ -DCMAKE_C_COMPILER=clang -DCMAKE_CXX_FLAGS="-fsanitize=thread -stdlib=libc++ -L/root/develop/libcxx_tsan/lib -lc++abi -I/root/develop/libcxx_tsan/include -I/root/develop/libcxx_tsan/include/c++/v1 -Wno-unused-command-line-argument -g -O1 -Wl,-rpath=/root/develop/libcxx_tsan/lib" -DHELICS_BUILD_TESTS=ON -DHELICS_ZMQ_SUBPROJECT=ON -DHELICS_ZMQ_FORCE_SUBPROJECT=ON -DHELICS_DISABLE_SYSTEM_CALL_TESTS=ON'
TSAN_OPTIONS: 'second_deadlock_stack=1 suppressions=/root/project/.circleci/tsan_suppression.txt'
TSAN_OPTIONS: 'second_deadlock_stack=1 suppressions=/root/project/.circleci/tsan_suppression.txt history_size=4'

steps:
- checkout
Expand Down
2 changes: 1 addition & 1 deletion .clang-format
Expand Up @@ -19,7 +19,7 @@ AllowShortLoopsOnASingleLine: 'false'
AlwaysBreakAfterDefinitionReturnType: None
AlwaysBreakAfterReturnType: None
AlwaysBreakBeforeMultilineStrings: 'false'
AlwaysBreakTemplateDeclarations: 'Yes'
AlwaysBreakTemplateDeclarations: 'true'
BinPackArguments: 'false'
BinPackParameters: 'false'
BreakBeforeBraces: Custom
Expand Down
2 changes: 1 addition & 1 deletion ThirdParty/concurrency
Submodule concurrency updated 49 files
+1 −1 .circleci/config.yml
+78 −52 .clang-format
+0 −3 .gitmodules
+0 −18 .travis.yml
+9 −6 CMakeLists.txt
+89 −30 config/cmake/AddGooglebenchmark.cmake
+111 −38 config/cmake/AddGoogletest.cmake
+157 −0 config/cmake/GitUtils.cmake
+32 −0 config/cmake/Utils.cmake
+0 −1 extern/googletest
+7 −4 gmlc/CMakeLists.txt
+38 −47 gmlc/concurrency/Barrier.hpp
+113 −140 gmlc/concurrency/DelayedDestructor.hpp
+81 −90 gmlc/concurrency/DelayedObjects.hpp
+33 −39 gmlc/concurrency/Latch.hpp
+107 −127 gmlc/concurrency/SearchableObjectHolder.hpp
+84 −101 gmlc/concurrency/TriggerVariable.hpp
+79 −104 gmlc/concurrency/TripWire.hpp
+64 −67 gmlc/libguarded/atomic_guarded.hpp
+169 −187 gmlc/libguarded/cow_guarded.hpp
+205 −228 gmlc/libguarded/deferred_guarded.hpp
+87 −82 gmlc/libguarded/guarded.hpp
+92 −89 gmlc/libguarded/guarded_opt.hpp
+163 −206 gmlc/libguarded/handles.hpp
+137 −172 gmlc/libguarded/lr_guarded.hpp
+148 −151 gmlc/libguarded/ordered_guarded.hpp
+100 −110 gmlc/libguarded/rcu_guarded.hpp
+459 −514 gmlc/libguarded/rcu_list.hpp
+109 −115 gmlc/libguarded/shared_guarded.hpp
+117 −132 gmlc/libguarded/shared_guarded_opt.hpp
+112 −139 gmlc/libguarded/staged_guarded.hpp
+2 −2 tests/BarrierTests.cpp
+8 −5 tests/CMakeLists.txt
+2 −2 tests/DelayedObjectsTests.cpp
+2 −2 tests/LatchTests.cpp
+2 −2 tests/SearchableObjectHolderTests.cpp
+6 −10 tests/TriggerVariableTests.cpp
+2 −2 tests/TripWireTests.cpp
+9 −0 tests/libguarded/CMakeLists.txt
+1 −2 tests/libguarded/atomic_guardedTests.cpp
+6 −13 tests/libguarded/cow_guardedTests.cpp
+16 −28 tests/libguarded/deferred_guardedTests.cpp
+8 −15 tests/libguarded/guardedTests.cpp
+8 −15 tests/libguarded/guarded_optTests.cpp
+8 −14 tests/libguarded/lr_guardedTests.cpp
+21 −37 tests/libguarded/ordered_guardedTests.cpp
+33 −52 tests/libguarded/rcu_guardedTests.cpp
+22 −42 tests/libguarded/shared_guardedTests.cpp
+22 −42 tests/libguarded/shared_guarded_optTests.cpp
2 changes: 1 addition & 1 deletion ThirdParty/containers
Submodule containers updated 38 files
+2 −2 .circleci/config.yml
+78 −52 .clang-format
+1 −0 .codecov.yml
+2 −2 CMakeLists.txt
+11 −14 benchmarks/CircularBufferBenchmarks.cpp
+138 −236 benchmarks/SimpleQueueBenchmarks.cpp
+60 −74 gmlc/containers/AirLock.hpp
+140 −114 gmlc/containers/BlockIterator.hpp
+311 −389 gmlc/containers/BlockingPriorityQueue.hpp
+252 −303 gmlc/containers/BlockingQueue.hpp
+236 −286 gmlc/containers/CircularBuffer.hpp
+268 −324 gmlc/containers/DualMappedPointerVector.hpp
+284 −313 gmlc/containers/DualMappedVector.hpp
+11 −14 gmlc/containers/MapTraits.hpp
+129 −154 gmlc/containers/MappedPointerVector.hpp
+151 −148 gmlc/containers/MappedVector.hpp
+183 −254 gmlc/containers/SimpleQueue.hpp
+509 −496 gmlc/containers/StableBlockDeque.hpp
+311 −337 gmlc/containers/StableBlockVector.hpp
+242 −270 gmlc/containers/StackBuffer.hpp
+338 −382 gmlc/containers/WorkQueue.hpp
+11 −14 gmlc/containers/mapOps.hpp
+6 −6 gmlc/containers/optionalDefinition.hpp
+24 −10 tests/AirLockTests.cpp
+26 −53 tests/BlockingQueueTests.cpp
+69 −13 tests/CircularBufferTests.cpp
+3 −3 tests/DualMappedPointerVectorTests.cpp
+162 −6 tests/DualMappedVectorTests.cpp
+132 −37 tests/DualMappedVectorTestsStable.cpp
+3 −3 tests/MappedPointerVectorTests.cpp
+106 −5 tests/MappedVectorTests.cpp
+23 −17 tests/MappedVectorTestsStable.cpp
+114 −134 tests/PriorityBlockingQueueTests.cpp
+79 −45 tests/SimpleQueueTests.cpp
+287 −52 tests/StableBlockDequeTests.cpp
+311 −29 tests/StableBlockVectorTests.cpp
+40 −7 tests/StackBufferTests.cpp
+14 −31 tests/WorkQueueTests.cpp
8 changes: 3 additions & 5 deletions src/helics/application_api/ValueFederateManager.cpp
Expand Up @@ -61,7 +61,7 @@ Publication& ValueFederateManager::registerPublication(
if (!key.empty()) {
active = pubHandle->insert(key, coreID, fed, coreID, key, type, units);
} else {
active = pubHandle->insert(nullptr, coreID, fed, coreID, key, type, units);
active = pubHandle->insert(no_search, coreID, fed, coreID, key, type, units);
}

if (active) {
Expand All @@ -81,7 +81,7 @@ Input& ValueFederateManager::registerInput(
if (!key.empty()) {
active = inpHandle->insert(key, coreID, fed, coreID, key, units);
} else {
active = inpHandle->insert(nullptr, coreID, fed, coreID, key, units);
active = inpHandle->insert(no_search, coreID, fed, coreID, key, units);
}
if (active) {
auto& ref = inpHandle->back();
Expand Down Expand Up @@ -487,9 +487,7 @@ int ValueFederateManager::getInputCount() const

void ValueFederateManager::clearUpdates()
{
for (auto& inp : inputs.lock()) {
inp.clearUpdate();
}
inputs.lock()->apply([](auto& inp) { inp.clearUpdate(); });
}

void ValueFederateManager::clearUpdate(const Input& inp)
Expand Down
25 changes: 11 additions & 14 deletions src/helics/core/CommonCore.cpp
Expand Up @@ -2119,7 +2119,7 @@ void CommonCore::processPriorityCommand(ActionMessage&& command)
case CMD_REG_FED: {
// this one in the core needs to be the thread-safe version
auto fed = getFederate(command.name);
loopFederates.insert(command.name, nullptr, fed);
loopFederates.insert(command.name, no_search, fed);
}
if (global_broker_id_local != parent_broker_id) {
// forward on to Broker
Expand Down Expand Up @@ -2296,11 +2296,11 @@ void CommonCore::sendErrorToFederates(int error_code)
{
ActionMessage errorCom(CMD_ERROR);
errorCom.messageID = error_code;
for (auto& fed : loopFederates) {
loopFederates.apply([&errorCom](auto& fed) {
if ((fed) && (!fed.disconnected)) {
fed->addAction(errorCom);
}
}
});
}

void CommonCore::transmitDelayedMessages(global_federate_id source)
Expand Down Expand Up @@ -2411,9 +2411,7 @@ void CommonCore::processCommand(ActionMessage&& command)
break;
case CMD_BROADCAST_DISCONNECT: {
timeCoord->processTimeMessage(command);
for (auto& fed : loopFederates) {
fed->addAction(command);
}
loopFederates.apply([&command](auto& fed) { fed->addAction(command); });
checkAndProcessDisconnect();
} break;
case CMD_STOP:
Expand Down Expand Up @@ -2686,9 +2684,7 @@ void CommonCore::processCommand(ActionMessage&& command)
if (brokerState.compare_exchange_strong(
exp, broker_state_t::operating)) { // forward the grant to all federates
organizeFilterOperations();
for (auto& fed : loopFederates) {
fed->addAction(command);
}
loopFederates.apply([&command](auto& fed) { fed->addAction(command); });
timeCoord->enteringExecMode();
auto res = timeCoord->checkExecEntry();
if (res == message_processing_result::next_step) {
Expand Down Expand Up @@ -3176,7 +3172,7 @@ void CommonCore::checkDependencies()
{
bool isobs = false;
bool issource = false;
for (auto& fed : loopFederates) {
auto checkdep = [this, &isobs, &issource](auto& fed) {
if (fed->endpointCount() > 0) {
if (fed->getOptionFlag(defs::flags::observer)) {
timeCoord->removeDependency(fed->global_id);
Expand All @@ -3196,7 +3192,8 @@ void CommonCore::checkDependencies()
issource = true;
}
}
}
};
loopFederates.apply(checkdep);

// if the core has filters we need to be a timeCoordinator
if (hasFilters) {
Expand Down Expand Up @@ -3412,14 +3409,14 @@ void CommonCore::processCommandsForCore(const ActionMessage& cmd)
}
ActionMessage bye(CMD_DISCONNECT_FED_ACK);
bye.source_id = parent_broker_id;
for (auto& fed : loopFederates) {
loopFederates.apply([&bye](auto& fed) {
auto state = fed->getState();
if ((HELICS_FINISHED == state) || (HELICS_ERROR == state)) {
continue;
return;
}
bye.dest_id = fed->global_id.load();
fed->addAction(bye);
}
});

addActionMessage(CMD_STOP);
} else {
Expand Down
52 changes: 39 additions & 13 deletions src/helics/core/CommsInterface.cpp
Expand Up @@ -19,15 +19,7 @@ CommsInterface::CommsInterface(thread_generation threads):
/** destructor*/
CommsInterface::~CommsInterface()
{
std::lock_guard<std::mutex> syncLock(threadSyncLock);
if (!singleThread) {
if (queue_watcher.joinable()) {
queue_watcher.join();
}
}
if (queue_transmitter.joinable()) {
queue_transmitter.join();
}
join_tx_rx_thread();
}

void CommsInterface::loadNetworkInfo(const NetworkBrokerData& netInfo)
Expand Down Expand Up @@ -227,18 +219,28 @@ bool CommsInterface::connect()
logError(std::string("error in transmitter >") + e.what());
}
});
syncLock.unlock();
txTrigger.waitActivation();
rxTrigger.waitActivation();
if (rx_status != connection_status::connected) {
logError("receiver connection failure");
if (tx_status == connection_status::connected) {
syncLock.lock();
if (queue_transmitter.joinable()) {
syncLock.unlock();
closeTransmitter();
queue_transmitter.join();
syncLock.lock();
if (queue_transmitter.joinable()) {
queue_transmitter.join();
}
}
syncLock.unlock();
}
if (!singleThread) {
queue_watcher.join();
syncLock.lock();
if (queue_watcher.joinable()) {
queue_watcher.join();
}
}
return false;
}
Expand All @@ -247,13 +249,22 @@ bool CommsInterface::connect()
logError("transmitter connection failure");
if (!singleThread) {
if (rx_status == connection_status::connected) {
syncLock.lock();
if (queue_watcher.joinable()) {
syncLock.unlock();
closeReceiver();
queue_watcher.join();
syncLock.lock();
if (queue_watcher.joinable()) {
queue_watcher.join();
}
}
syncLock.unlock();
}
}
queue_transmitter.join();
syncLock.lock();
if (queue_transmitter.joinable()) {
queue_transmitter.join();
}
return false;
}
return true;
Expand All @@ -273,6 +284,7 @@ void CommsInterface::disconnect()
if (propertyLock()) {
setRxStatus(connection_status::terminated);
setTxStatus(connection_status::terminated);
join_tx_rx_thread();
return;
}
}
Expand Down Expand Up @@ -335,6 +347,20 @@ void CommsInterface::disconnect()
return;
}
}
join_tx_rx_thread();
}

void CommsInterface::join_tx_rx_thread()
{
std::lock_guard<std::mutex> syncLock(threadSyncLock);
if (!singleThread) {
if (queue_watcher.joinable()) {
queue_watcher.join();
}
}
if (queue_transmitter.joinable()) {
queue_transmitter.join();
}
}

bool CommsInterface::reconnect()
Expand Down
2 changes: 2 additions & 0 deletions src/helics/core/CommsInterface.hpp
Expand Up @@ -164,6 +164,8 @@ class CommsInterface {
these functions should be called in a pair*/
bool propertyLock();
void propertyUnLock();
/** function to join the processing threads*/
void join_tx_rx_thread();

private:
gmlc::concurrency::TripWireDetector
Expand Down
40 changes: 19 additions & 21 deletions src/helics/core/CoreBroker.cpp
Expand Up @@ -262,7 +262,7 @@ void CoreBroker::processPriorityCommand(ActionMessage&& command)
transmit(getRoute(command.source_id), badName);
return;
}
_federates.insert(command.name, nullptr, command.name);
_federates.insert(command.name, no_search, command.name);
_federates.back().route = getRoute(command.source_id);
_federates.back().parent = command.source_id;
if (!isRootc) {
Expand Down Expand Up @@ -376,7 +376,7 @@ void CoreBroker::processPriorityCommand(ActionMessage&& command)
}
return;
}
auto inserted = _brokers.insert(command.name, nullptr, command.name);
auto inserted = _brokers.insert(command.name, no_search, command.name);
if (!inserted) {
route_id newroute;
bool route_created = false;
Expand Down Expand Up @@ -482,11 +482,12 @@ void CoreBroker::processPriorityCommand(ActionMessage&& command)
higher_broker_id = command.source_id;
timeCoord->source_id = global_broker_id_local;
transmitDelayedMessages();
for (auto& brk : _brokers) {
_brokers.apply([localid = global_broker_id_local](auto& brk) {
if (!brk._nonLocal) {
brk.parent = global_broker_id_local;
brk.parent = localid;
}
}
});

timeoutMon->setParentId(higher_broker_id);
timeoutMon->reset();
return;
Expand Down Expand Up @@ -620,34 +621,31 @@ void CoreBroker::transmitDelayedMessages()

void CoreBroker::labelAsDisconnected(global_broker_id brkid)
{
for (auto& brk : _brokers) {
if (brk.parent == brkid) {
brk.isDisconnected = true;
}
}
for (auto& fed : _federates) {
if (fed.parent == brkid) {
fed.isDisconnected = true;
auto disconnect_procedure = [brkid](auto& obj) {
if (obj.parent == brkid) {
obj.isDisconnected = true;
}
}
};
_brokers.apply(disconnect_procedure);
_federates.apply(disconnect_procedure);
}

void CoreBroker::sendDisconnect()
{
ActionMessage bye(CMD_DISCONNECT);
bye.source_id = global_broker_id_local;
for (auto& brk : _brokers) {
_brokers.apply([this, &bye](auto& brk) {
if (!brk.isDisconnected) {
if (brk.parent == global_broker_id_local) {
routeMessage(bye, brk.global_id);
this->routeMessage(bye, brk.global_id);
brk.isDisconnected = true;
}
if (hasTimeDependency) {
timeCoord->removeDependency(brk.global_id);
timeCoord->removeDependent(brk.global_id);
}
}
}
});
if (hasTimeDependency) {
timeCoord->disconnect();
}
Expand Down Expand Up @@ -899,17 +897,17 @@ void CoreBroker::processCommand(ActionMessage&& command)
case CMD_DISCONNECT_BROKER_ACK:
if ((command.dest_id == global_broker_id_local) &&
(command.source_id == higher_broker_id)) {
for (auto& brk : _brokers) {
_brokers.apply([this](auto& brk) {
if (!brk._sent_disconnect_ack) {
ActionMessage dis(
(brk._core) ? CMD_DISCONNECT_CORE_ACK : CMD_DISCONNECT_BROKER_ACK);
dis.source_id = global_broker_id_local;
dis.dest_id = brk.global_id;
transmit(brk.route, dis);
this->transmit(brk.route, dis);
brk._sent_disconnect_ack = true;
removeRoute(brk.route);
this->removeRoute(brk.route);
}
}
});
addActionMessage(CMD_STOP);
}
break;
Expand Down
4 changes: 2 additions & 2 deletions src/helics/core/tcp/TcpHelperClasses.cpp
Expand Up @@ -37,8 +37,8 @@ namespace tcp {
if (!triggerhalt) {
socket_.async_receive(
asio::buffer(data.data() + residBufferSize, data.size() - residBufferSize),
[this](const std::error_code& error, size_t bytes_transferred) {
handle_read(error, bytes_transferred);
[ptr = shared_from_this()](const std::error_code& err, size_t bytes) {
ptr->handle_read(err, bytes);
});
if (triggerhalt) {
// cancel previous operation if triggerhalt is now active
Expand Down
2 changes: 1 addition & 1 deletion src/helics/shared_api_library/.clang-format
Expand Up @@ -19,7 +19,7 @@ AllowShortLoopsOnASingleLine: 'false'
AlwaysBreakAfterDefinitionReturnType: None
AlwaysBreakAfterReturnType: None
AlwaysBreakBeforeMultilineStrings: 'false'
AlwaysBreakTemplateDeclarations: 'Yes'
AlwaysBreakTemplateDeclarations: 'true'
BinPackArguments: 'false'
BinPackParameters: 'false'
BreakBeforeBraces: Custom
Expand Down
2 changes: 1 addition & 1 deletion tests/helics/system_tests/QueryTests.cpp
Expand Up @@ -451,4 +451,4 @@ TEST_F(query_tests, test_queries_query)
vFed1->finalize();
vFed2->finalize();
helics::cleanupHelicsLibrary();
}
}
2 changes: 1 addition & 1 deletion tests/helics/system_tests/networkTests.cpp
Expand Up @@ -118,4 +118,4 @@ TEST_F(network_tests, test_external_udp_ipv4)
vFed1->finalize();
}

#endif
#endif

0 comments on commit 283b76c

Please sign in to comment.