Skip to content

Commit

Permalink
Merge pull request #16072 from richardkchapman/threadlist
Browse files Browse the repository at this point in the history
HPCC-27580 Revisit the ThreadList

Reviewed-by: Jake Smith <jake.smith@lexisnexisrisk.com>
Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Merged-by: Gavin Halliday <ghalliday@hpccsystems.com>
  • Loading branch information
ghalliday committed May 6, 2022
2 parents ddf220d + d9d1f8e commit 9529809
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 109 deletions.
5 changes: 1 addition & 4 deletions dali/base/dadiags.cpp
Expand Up @@ -115,10 +115,7 @@ class CDaliDiagnosticsServer: public IDaliServer, public Thread
StringAttr id;
StringBuffer buf;
params.read(id);
if (0 == stricmp(id,"threads")) {
mb.append(getThreadList(buf).str());
}
else if (0 == stricmp(id, "mpqueue")) {
if (0 == stricmp(id, "mpqueue")) {
mb.append(getReceiveQueueDetails(buf).str());
}
else if (0 == stricmp(id, "locks")) { // Legacy - newer diag clients should use querySDS().getLocks() directly
Expand Down
6 changes: 2 additions & 4 deletions system/jlib/jdebug.cpp
Expand Up @@ -1753,14 +1753,12 @@ class CProcessMonitor
return;
assertex(n);
processes.sort(compare);
StringBuffer name;
ForEachItemIn(i1,processes) {
CProcInfo &pi = processes.item(i1);
if ((pi.delta.system==0)&&(pi.delta.user==0))
break;
getThreadName(pi.pid(),0,name.clear());
str.appendf("\n TT: PI=%d PN=%s PC=%d ST=%d UT=%d%s%s",
pi.pid(),pi.info.cmd,(pi.delta.system+pi.delta.user)*100/tot_time,pi.delta.system,pi.delta.user,name.length()?" TN=":"",name.str());
str.appendf("\n TT: PI=%d PN=%s PC=%d ST=%d UT=%d",
pi.pid(),pi.info.cmd,(pi.delta.system+pi.delta.user)*100/tot_time,pi.delta.system,pi.delta.user);
if (--n==0)
break;
}
Expand Down
7 changes: 1 addition & 6 deletions system/jlib/jexcept.cpp
Expand Up @@ -915,10 +915,7 @@ static void doPrintStackReport( size_t ip, size_t _bp, size_t sp )


StackWalk( ip , _bp);
ModuleWalk();
StringBuffer threadlist;
IERRLOG( "ThreadList:\n%s",getThreadList(threadlist).str());

ModuleWalk();
}


Expand Down Expand Up @@ -1349,8 +1346,6 @@ NO_SANITIZE("alignment") void excsighandler(int signum, siginfo_t *info, void *e

#endif

StringBuffer threadlist;
PROGLOG( "ThreadList:\n%s",getThreadList(threadlist).str());
queryLogMsgManager()->flushQueue(10*1000);

// MCK - really should not return after recv'ing any of these signals
Expand Down
7 changes: 7 additions & 0 deletions system/jlib/jstring.cpp
Expand Up @@ -1436,6 +1436,13 @@ void StringAttr::set(const char * _text)
free(oldtext);
}

void StringAttr::swapWith(StringAttr & other)
{
char * temp = text;
text = other.text;
other.text = temp;
}

void StringAttr::set(const char * _text, size_t _len)
{
char * oldtext = text;
Expand Down
2 changes: 1 addition & 1 deletion system/jlib/jstring.hpp
Expand Up @@ -276,7 +276,7 @@ class jlib_decl StringAttr
void setown(StringBuffer & source);
void toLowerCase();
void toUpperCase();

void swapWith(StringAttr & other);
private:
char * text;
};
Expand Down
93 changes: 13 additions & 80 deletions system/jlib/jthread.cpp
Expand Up @@ -117,8 +117,7 @@ void enableThreadSEH() { SEHHandling=true; }
void disableThreadSEH() { SEHHandling=false; } // only prevents new threads from having SEH handler, no mech. for turning off existing threads SEH handling.


static ICopyArrayOf<Thread> ThreadList;
static CriticalSection ThreadListSem;
static std::atomic<unsigned> threadCount;
static size32_t defaultThreadStackSize=0;
static ICopyArrayOf<Thread> ThreadDestroyList;
static CriticalSection ThreadDestroyListLock;
Expand Down Expand Up @@ -347,8 +346,7 @@ void Thread::init(const char *_name)
threadid = 0;
tidlog = 0;
alive = false;
cthreadname.threadname = (NULL == _name) ? NULL : strdup(_name);
ithreadname = &cthreadname;
cthreadname.set(_name);
prioritydelta = 0;
nicelevel = 0;
stacksize = 0; // default is EXE default stack size (set by /STACK)
Expand Down Expand Up @@ -436,9 +434,6 @@ void Thread::startRelease()
IERRLOG("pthread_create returns %d",status);
PrintStackReport();
PrintMemoryReport();
StringBuffer s;
getThreadList(s);
IERRLOG("Running threads:\n %s",s.str());
throw makeOsException(status);
}
unsigned retryCount = 10;
Expand All @@ -454,12 +449,7 @@ void Thread::startRelease()
alive = true;
if (prioritydelta)
adjustPriority(prioritydelta);

{
CriticalBlock block(ThreadListSem);
ThreadList.zap(*this); // just in case restarting
ThreadList.append(*this);
}
threadCount++;
#ifdef _WIN32
DWORD count = ResumeThread(hThread);
assertex(count == 1);
Expand Down Expand Up @@ -520,7 +510,6 @@ bool Thread::join(unsigned timeout)

Thread::~Thread()
{
ithreadname = &cthreadname; // safer (as derived classes destroyed)
#ifdef _DEBUG
if (alive) {
if (!stopped.wait(0)) { // see if fell out of threadmain and signal stopped
Expand All @@ -531,54 +520,15 @@ Thread::~Thread()
}
#endif
Link();

// DBGLOG("Thread %x (%s) destroyed\n", threadid, threadname);
{
CriticalBlock block(ThreadListSem);
ThreadList.zap(*this);
}
free(cthreadname.threadname);
cthreadname.threadname = NULL;
threadCount--;
// DBGLOG("Thread %x (%s) destroyed\n", threadid, cthreadname.str());
}

unsigned getThreadCount()
{
CriticalBlock block(ThreadListSem);
return ThreadList.ordinality();
}

StringBuffer & getThreadList(StringBuffer &str)
{
CriticalBlock block(ThreadListSem);
ForEachItemIn(i,ThreadList) {
Thread &item=ThreadList.item(i);
item.getInfo(str).append("\n");
}
return str;
return threadCount;
}

StringBuffer &getThreadName(int thandle,unsigned tid,StringBuffer &name)
{
CriticalBlock block(ThreadListSem);
bool found=false;
ForEachItemIn(i,ThreadList) {
Thread &item=ThreadList.item(i);
int h;
unsigned t;
const char *s = item.getLogInfo(h,t);
if (s&&*s&&((thandle==0)||(h==thandle))&&((tid==0)||(t==tid))) {
if (found) {
name.clear();
break; // only return if unambiguous
}
name.append(s);
found = true;
}
}
return name;
}


// CThreadedPersistent

CThreadedPersistent::CThreadedPersistent(const char *name, IThreaded *_owner) : athread(*this, name), owner(_owner), state(s_ready)
Expand Down Expand Up @@ -914,7 +864,7 @@ class CPooledThreadWrapper: public Thread
IPooledThread *thread;
Semaphore sem;
CThreadPoolBase &parent;
char *runningname;
StringAttr runningName;
public:
CPooledThreadWrapper(CThreadPoolBase &_parent,
PooledThreadHandle _handle,
Expand All @@ -923,16 +873,15 @@ class CPooledThreadWrapper: public Thread
{
thread = _thread;
handle = _handle;
runningname = strdup(_parent.poolname);
runningName.set(_parent.poolname);
}

~CPooledThreadWrapper()
{
thread->Release();
free(runningname);
}

void setName(const char *name) { free(runningname); runningname=strdup(name); }
void setName(const char *name) { runningName.set(name); }
void setHandle(PooledThreadHandle _handle) { handle = _handle; }
PooledThreadHandle queryHandle() { return handle; }
IPooledThread &queryThread() { return *thread; }
Expand Down Expand Up @@ -965,30 +914,19 @@ class CPooledThreadWrapper: public Thread
parent.notifyStarted(this);
try
{
char *&threadname = cthreadname.threadname;
char *temp = threadname; // swap running name and threadname
threadname = runningname;
runningname = temp;
cthreadname.swapWith(runningName); // swap running name and threadname
thread->threadmain();
temp = threadname; // and back
threadname = runningname;
runningname = temp;
cthreadname.swapWith(runningName); // swap back
}
catch (IException *e)
{
char *&threadname = cthreadname.threadname;
char *temp = threadname; // swap back
threadname = runningname;
runningname = temp;
cthreadname.swapWith(runningName); // swap back
handleException(e);
}
#ifndef NO_CATCHALL
catch (...)
{
char *&threadname = cthreadname.threadname;
char *temp = threadname; // swap back
threadname = runningname;
runningname = temp;
cthreadname.swapWith(runningName); // swap back
handleException(MakeStringException(0, "Unknown exception in Thread from pool %s", parent.poolname.get()));
}
#endif
Expand Down Expand Up @@ -1206,11 +1144,6 @@ class CThreadPool: public CThreadPoolBase, implements IThreadPool, public CInter
return _start(param, NULL, true);
}

PooledThreadHandle startNoBlock(void *param,const char *name)
{
return _start(param, name, true);
}

PooledThreadHandle start(void *param)
{
return _start(param, NULL, false);
Expand Down
16 changes: 2 additions & 14 deletions system/jlib/jthread.hpp
Expand Up @@ -95,12 +95,7 @@ class jlib_decl Thread : public CInterface, public IThread
void adjustNiceLevel();

protected:
struct cThreadName: implements IThreadName
{
char *threadname;
const char *get() { return threadname; }
} cthreadname;
IThreadName *ithreadname;
StringAttr cthreadname;
public:
#ifndef _WIN32
Semaphore suspend;
Expand All @@ -118,7 +113,7 @@ class jlib_decl Thread : public CInterface, public IThread
bool isCurrentThread() const;
void setNice(int nicelevel);
void setStackSize(size32_t size); // required stack size in bytes - called before start() (obviously)
const char *getName() { const char *ret = ithreadname?ithreadname->get():NULL; return ret?ret:"unknown"; }
const char *getName() { return cthreadname.isEmpty() ? "unknown" : cthreadname.str(); }
bool isAlive() { return alive; }
bool join(unsigned timeout=INFINITE);

Expand All @@ -140,10 +135,6 @@ class jlib_decl Thread : public CInterface, public IThread

// run method not implemented - concrete derived classes must do so
static void setDefaultStackSize(size32_t size); // NB under windows requires linker setting (/stack:)

IThreadName *queryThreadName() { return ithreadname; }
void setThreadName(IThreadName *name) { ithreadname = name; }

};

interface IThreaded
Expand Down Expand Up @@ -275,7 +266,6 @@ interface IThreadPool : extends IInterface
virtual IPooledThreadIterator *running()=0; // return an iterator for all currently running threads
virtual unsigned runningCount()=0; // number of currently running threads
virtual PooledThreadHandle startNoBlock(void *param)=0; // starts a new thread if it can do so without blocking, else throws exception
virtual PooledThreadHandle startNoBlock(void *param,const char *name)=0; // starts a new thread if it can do so without blocking, else throws exception
virtual void setStartDelayTracing(unsigned secs) = 0; // set start delay tracing period
virtual bool waitAvailable(unsigned timeout) = 0; // wait until a pool member is available
};
Expand All @@ -291,9 +281,7 @@ extern jlib_decl IThreadPool *createThreadPool(
unsigned targetpoolsize=0 // target maximum size of pool (default same as defaultmax)
);

extern jlib_decl StringBuffer &getThreadList(StringBuffer &str);
extern jlib_decl unsigned getThreadCount();
extern jlib_decl StringBuffer &getThreadName(int thandle,unsigned logtid,StringBuffer &name); // either thandle or tid should be 0

// Simple pipe process support
interface ISimpleReadStream;
Expand Down

0 comments on commit 9529809

Please sign in to comment.