Skip to content

Commit

Permalink
enh(#30): Add ability for ARGoS to pin threads to cores
Browse files Browse the repository at this point in the history
  • Loading branch information
jharwell committed May 18, 2021
1 parent 7c8a51b commit 3fc27a4
Show file tree
Hide file tree
Showing 11 changed files with 255 additions and 127 deletions.
2 changes: 2 additions & 0 deletions src/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ set(ARGOS3_HEADERS_SIMULATOR_SPACE
simulator/space/space.h
simulator/space/space_multi_thread_balance_length.h
simulator/space/space_multi_thread_balance_quantity.h
simulator/space/space_multi_thread.h
simulator/space/space_no_threads.h)
# argos3/core/wrappers/lua
set(ARGOS3_HEADERS_WRAPPERS_LUA
Expand Down Expand Up @@ -197,6 +198,7 @@ if(ARGOS_BUILD_FOR_SIMULATOR)
simulator/space/space.cpp
simulator/space/space_multi_thread_balance_length.cpp
simulator/space/space_multi_thread_balance_quantity.cpp
simulator/space/space_multi_thread.cpp
simulator/space/space_no_threads.cpp)
else(ARGOS_BUILD_FOR_SIMULATOR)
# Real-robot only code
Expand Down
72 changes: 35 additions & 37 deletions src/core/simulator/simulator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ namespace argos {
m_pcLoopFunctions(nullptr),
m_unMaxSimulationClock(0),
m_bWasRandomSeedSet(false),
m_unThreads(0),
m_pcProfiler(nullptr),
m_bHumanReadableProfile(true),
m_bRealTimeClock(false),
Expand Down Expand Up @@ -304,42 +303,8 @@ namespace argos {

void CSimulator::InitFramework(TConfigurationNode& t_tree) {
try {
/* Parse the 'system' node */
if(NodeExists(t_tree, "system")) {
TConfigurationNode tSystem;
tSystem = GetNode(t_tree, "system");
GetNodeAttributeOrDefault(tSystem, "threads", m_unThreads, m_unThreads);
if(m_unThreads == 0) {
LOG << "[INFO] Not using threads" << std::endl;
m_pcSpace = new CSpaceNoThreads();
}
else {
LOG << "[INFO] Using " << m_unThreads << " parallel threads" << std::endl;
std::string strThreadingMethod = "balance_quantity";
GetNodeAttributeOrDefault(tSystem, "method", strThreadingMethod, strThreadingMethod);
if(strThreadingMethod == "balance_quantity") {
LOG << "[INFO] Chosen method \"balance_quantity\": threads will be assigned the same"
<< std::endl
<< "[INFO] number of tasks, independently of the task length."
<< std::endl;
m_pcSpace = new CSpaceMultiThreadBalanceQuantity();
}
else if(strThreadingMethod == "balance_length") {
LOG << "[INFO] Chosen method \"balance_length\": threads will be assigned different"
<< std::endl
<< "[INFO] numbers of tasks, depending on the task length."
<< std::endl;
m_pcSpace = new CSpaceMultiThreadBalanceLength();
}
else {
THROW_ARGOSEXCEPTION("Error parsing the <system> tag. Unknown threading method \"" << strThreadingMethod << "\". Available methods: \"balance_quantity\" and \"balance_length\".");
}
}
}
else {
LOG << "[INFO] Not using threads" << std::endl;
m_pcSpace = new CSpaceNoThreads();
}
/* Parse the 'system' node and initialize */
InitFrameworkSystem(t_tree);
/* Get 'experiment' node */
TConfigurationNode tExperiment;
tExperiment = GetNode(t_tree, "experiment");
Expand Down Expand Up @@ -413,6 +378,39 @@ namespace argos {
THROW_ARGOSEXCEPTION_NESTED("Failed to initialize the simulator. Parse error inside the <framework> tag.", ex);
}
}
/****************************************/
/****************************************/

void CSimulator::InitFrameworkSystem(TConfigurationNode& t_tree) {
if(NodeExists(t_tree, "system")) {
TConfigurationNode tSystem;
tSystem = GetNode(t_tree, "system");
GetNodeAttributeOrDefault(tSystem, "threads", m_unThreads, m_unThreads);

if(m_unThreads == 0) {
m_pcSpace = new CSpaceNoThreads();
}
else {
bool bPinThreadsToCores = false;
GetNodeAttributeOrDefault(tSystem, "pin_threads_to_cores", bPinThreadsToCores, bPinThreadsToCores);
std::string strThreadingMethod = "balance_quantity";
GetNodeAttributeOrDefault(tSystem, "method", strThreadingMethod, strThreadingMethod);

if(strThreadingMethod == "balance_quantity") {
m_pcSpace = new CSpaceMultiThreadBalanceQuantity(m_unThreads, bPinThreadsToCores);
}
else if(strThreadingMethod == "balance_length") {
m_pcSpace = new CSpaceMultiThreadBalanceLength(m_unThreads, bPinThreadsToCores);
}
else {
THROW_ARGOSEXCEPTION("Error parsing the <system> tag. Unknown threading method \"" << strThreadingMethod << "\". Available methods: \"balance_quantity\" and \"balance_length\".");
}
}
}
else {
m_pcSpace = new CSpaceNoThreads();
}
}

/****************************************/
/****************************************/
Expand Down
1 change: 1 addition & 0 deletions src/core/simulator/simulator.h
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ namespace argos {
private:

void InitFramework(TConfigurationNode& t_tree);
void InitFrameworkSystem(TConfigurationNode& t_tree);
void InitLoopFunctions(TConfigurationNode& t_tree);
void InitControllers(TConfigurationNode& t_tree);
void InitSpace(TConfigurationNode& t_tree);
Expand Down
78 changes: 78 additions & 0 deletions src/core/simulator/space/space_multi_thread.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/**
* @file <argos3/core/simulator/space/space_multi_thread.cpp>
*
* @author John Harwell - <john.r.harwell@gmail.com>
*/

#include <argos3/core/simulator/space/space_multi_thread.h>

namespace argos {

/****************************************/
/****************************************/

CSpaceMultiThread::CSpaceMultiThread(UInt32 un_n_threads,
bool b_pin_threads_to_cores) :
m_bPinThreadsToCores(b_pin_threads_to_cores),
m_vecThreads(un_n_threads) {
LOG << "[INFO] Using " << GetNumThreads() << " parallel threads" << std::endl;
if (m_bPinThreadsToCores) {
LOG << "[INFO] Pinning threads to cores by thread ID" << std::endl;
}
}

/****************************************/
/****************************************/
void CSpaceMultiThread::DestroyAllThreads(void) {
int nErrors;
if (!m_vecThreads.empty()) {
for(UInt32 i = 0; i < GetNumThreads(); ++i) {
if((nErrors = pthread_cancel(m_vecThreads[i]))) {
THROW_ARGOSEXCEPTION("Error canceling threads " << ::strerror(nErrors));
}
}
auto** ppJoinResult = new void*[GetNumThreads()];
for(UInt32 i = 0; i < GetNumThreads(); ++i) {
if((nErrors = pthread_join(m_vecThreads[i], ppJoinResult + i))) {
THROW_ARGOSEXCEPTION("Error joining threads " << ::strerror(nErrors));
}
if(ppJoinResult[i] != PTHREAD_CANCELED) {
LOGERR << "[WARNING] Thread #" << i<< " not canceled" << std::endl;
}
}
delete[] ppJoinResult;
}
}

/****************************************/
/****************************************/

void CSpaceMultiThread::CreateSingleThread(UInt32 unThreadId,
void *(*start_routine) (void *),
void *arg) {
if (unThreadId > GetNumThreads()) {
THROW_ARGOSEXCEPTION("Cannot create thread " << unThreadId << ": Id out of range");
}
/* Create the thread */
int nErrors = pthread_create(&m_vecThreads[unThreadId],
nullptr,
start_routine,
arg);
if(nErrors != 0) {
THROW_ARGOSEXCEPTION("Error creating thread: " << ::strerror(nErrors));
}
/* pin thread i to core i */
if (m_bPinThreadsToCores) {
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(unThreadId, &cpuset);
int rc = pthread_setaffinity_np(m_vecThreads[unThreadId],
sizeof(cpu_set_t),
&cpuset);
if (rc != 0) {
THROW_ARGOSEXCEPTION("Error setting thread " << unThreadId << " affinity: " << ::strerror(rc));
}
}
}

}
71 changes: 71 additions & 0 deletions src/core/simulator/space/space_multi_thread.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/**
* @file <argos3/core/simulator/space/space_multi_thread.h>
*
* @brief This file provides the definition of the Swarmanoid 3D Space with
* threads.
*
* @author John Harwell - <john.r.harwell@gmail.com>
*/

#ifndef INCLUDE_SPACE_MULTI_THREAD_H_
#define INCLUDE_SPACE_MULTI_THREAD_H_

#include <vector>

namespace argos {
class CSpace;
}

#include <argos3/core/simulator/space/space.h>

namespace argos {

/**
* @brief Base class for common threading functionality used by @ref
* CSpaceMultithreadBalanceLength, @ref CSpaceMultithreadBalanceQuantity.
*/
class CSpaceMultiThread : public CSpace {
public:

CSpaceMultiThread(UInt32 un_n_threads, bool b_pin_threads);

/**
* @brief Return the # of configured threads. If threads were not
* specified, returns 0.
*/
inline UInt32 GetNumThreads() const { return m_vecThreads.size(); }

protected:

/**
* @brief After ARGoS finishes running the experiment, dispose of all
* threads, which are no longer needed.
*/
void DestroyAllThreads();

/**
* @brief During initialization, create a single thread.
*
* @param unThreadId The index/ID of the thread.
* @param start_routine Pointer to the routine that the thread should start
* execution in.
* @param arg The single argument which will be passed to @p
* start_routine. Can be NULL.
*/

void CreateSingleThread(UInt32 unThreadId,
void *(*start_routine) (void *),
void *arg);

private:

/** Should threads be pinned to cores ? */
bool m_bPinThreadsToCores;

/** The slave thread array */
std::vector<pthread_t> m_vecThreads;
};

}

#endif /* INCLUDE_SPACE_MULTI_THREAD_H_ */
78 changes: 34 additions & 44 deletions src/core/simulator/space/space_multi_thread_balance_length.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,18 @@ namespace argos {
/****************************************/
/****************************************/

CSpaceMultiThreadBalanceLength::CSpaceMultiThreadBalanceLength(UInt32 un_n_threads,
bool b_pin_threads_to_cores) :
CSpaceMultiThread(un_n_threads, b_pin_threads_to_cores) {
LOG << "[INFO] Chosen method \"balance_length\": threads will be assigned different"
<< std::endl
<< "[INFO] numbers of tasks, depending on the task length."
<< std::endl;
}

/****************************************/
/****************************************/

void CSpaceMultiThreadBalanceLength::Init(TConfigurationNode& t_tree) {
/* Initialize the space */
CSpace::Init(t_tree);
Expand All @@ -88,11 +100,11 @@ namespace argos {
THROW_ARGOSEXCEPTION("Error creating thread conditionals " << ::strerror(nErrors));
}
/* Reset the idle thread count */
m_unSenseControlPhaseIdleCounter = CSimulator::GetInstance().GetNumThreads();
m_unActPhaseIdleCounter = CSimulator::GetInstance().GetNumThreads();
m_unPhysicsPhaseIdleCounter = CSimulator::GetInstance().GetNumThreads();
m_unMediaPhaseIdleCounter = CSimulator::GetInstance().GetNumThreads();
m_unEntityIterPhaseIdleCounter = CSimulator::GetInstance().GetNumThreads();
m_unSenseControlPhaseIdleCounter = GetNumThreads();
m_unActPhaseIdleCounter = GetNumThreads();
m_unPhysicsPhaseIdleCounter = GetNumThreads();
m_unMediaPhaseIdleCounter = GetNumThreads();
m_unEntityIterPhaseIdleCounter = GetNumThreads();
/* Start threads */
StartThreads();
}
Expand All @@ -101,29 +113,12 @@ namespace argos {
/****************************************/

void CSpaceMultiThreadBalanceLength::Destroy() {
/* Destroy the threads to update the controllable entities */
int nErrors;
if(m_ptThreads != nullptr) {
for(UInt32 i = 0; i < CSimulator::GetInstance().GetNumThreads(); ++i) {
if((nErrors = pthread_cancel(m_ptThreads[i]))) {
THROW_ARGOSEXCEPTION("Error canceling threads " << ::strerror(nErrors));
}
}
auto** ppJoinResult = new void*[CSimulator::GetInstance().GetNumThreads()];
for(UInt32 i = 0; i < CSimulator::GetInstance().GetNumThreads(); ++i) {
if((nErrors = pthread_join(m_ptThreads[i], ppJoinResult + i))) {
THROW_ARGOSEXCEPTION("Error joining threads " << ::strerror(nErrors));
}
if(ppJoinResult[i] != PTHREAD_CANCELED) {
LOGERR << "[WARNING] Thread #" << i<< " not canceled" << std::endl;
}
}
delete[] ppJoinResult;
}
delete[] m_ptThreads;
/* Destroy the threads */
DestroyAllThreads();

/* Destroy the thread launch info */
if(m_psThreadData != nullptr) {
for(UInt32 i = 0; i < CSimulator::GetInstance().GetNumThreads(); ++i) {
for(UInt32 i = 0; i < GetNumThreads(); ++i) {
delete m_psThreadData[i];
}
}
Expand Down Expand Up @@ -151,11 +146,11 @@ namespace argos {

void CSpaceMultiThreadBalanceLength::Update() {
/* Reset the idle thread count */
m_unSenseControlPhaseIdleCounter = CSimulator::GetInstance().GetNumThreads();
m_unActPhaseIdleCounter = CSimulator::GetInstance().GetNumThreads();
m_unPhysicsPhaseIdleCounter = CSimulator::GetInstance().GetNumThreads();
m_unMediaPhaseIdleCounter = CSimulator::GetInstance().GetNumThreads();
m_unEntityIterPhaseIdleCounter = CSimulator::GetInstance().GetNumThreads();
m_unSenseControlPhaseIdleCounter = GetNumThreads();
m_unActPhaseIdleCounter = GetNumThreads();
m_unPhysicsPhaseIdleCounter = GetNumThreads();
m_unMediaPhaseIdleCounter = GetNumThreads();
m_unEntityIterPhaseIdleCounter = GetNumThreads();
/* Update the space */
CSpace::Update();
}
Expand All @@ -172,7 +167,7 @@ namespace argos {

#define MAIN_WAIT_FOR_END_OF(PHASE) \
pthread_mutex_lock(&m_tStart ## PHASE ## PhaseMutex); \
while(m_un ## PHASE ## PhaseIdleCounter < CSimulator::GetInstance().GetNumThreads()) { \
while(m_un ## PHASE ## PhaseIdleCounter < GetNumThreads()) { \
pthread_cond_wait(&m_tStart ## PHASE ## PhaseCond, &m_tStart ## PHASE ## PhaseMutex); \
} \
pthread_mutex_unlock(&m_tStart ## PHASE ## PhaseMutex);
Expand Down Expand Up @@ -232,20 +227,15 @@ namespace argos {
/****************************************/

void CSpaceMultiThreadBalanceLength::StartThreads() {
int nErrors;
/* Create the threads to update the controllable entities */
m_ptThreads = new pthread_t[CSimulator::GetInstance().GetNumThreads()];
m_psThreadData = new SThreadLaunchData*[CSimulator::GetInstance().GetNumThreads()];
for(UInt32 i = 0; i < CSimulator::GetInstance().GetNumThreads(); ++i) {
m_psThreadData = new SThreadLaunchData*[GetNumThreads()];
/* Create the threads */
for(UInt32 i = 0; i < GetNumThreads(); ++i) {
/* Create the struct with the info to launch the thread */
m_psThreadData[i] = new SThreadLaunchData(i, this);
/* Create the thread */
if((nErrors = pthread_create(m_ptThreads + i,
nullptr,
LaunchThreadBalanceLength,
reinterpret_cast<void*>(m_psThreadData[i])))) {
THROW_ARGOSEXCEPTION("Error creating thread: " << ::strerror(nErrors));
}
CreateSingleThread(i,
LaunchThreadBalanceLength,
reinterpret_cast<void*>(m_psThreadData[i]));
}
}

Expand All @@ -254,7 +244,7 @@ namespace argos {

#define THREAD_WAIT_FOR_START_OF(PHASE) \
pthread_mutex_lock(&m_tStart ## PHASE ## PhaseMutex); \
while(m_un ## PHASE ## PhaseIdleCounter == CSimulator::GetInstance().GetNumThreads()) { \
while(m_un ## PHASE ## PhaseIdleCounter == GetNumThreads()) { \
pthread_cond_wait(&m_tStart ## PHASE ## PhaseCond, &m_tStart ## PHASE ## PhaseMutex); \
} \
pthread_mutex_unlock(&m_tStart ## PHASE ## PhaseMutex); \
Expand Down
Loading

0 comments on commit 3fc27a4

Please sign in to comment.