Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC-27580 Revisit the ThreadList #16072

Merged
merged 3 commits into from May 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 1 addition & 4 deletions dali/base/dadiags.cpp
Expand Up @@ -115,10 +115,7 @@ class CDaliDiagnosticsServer: public IDaliServer, public Thread
StringAttr id;
StringBuffer buf;
params.read(id);
if (0 == stricmp(id,"threads")) {
mb.append(getThreadList(buf).str());
}
else if (0 == stricmp(id, "mpqueue")) {
if (0 == stricmp(id, "mpqueue")) {
mb.append(getReceiveQueueDetails(buf).str());
}
else if (0 == stricmp(id, "locks")) { // Legacy - newer diag clients should use querySDS().getLocks() directly
Expand Down
6 changes: 2 additions & 4 deletions system/jlib/jdebug.cpp
Expand Up @@ -1753,14 +1753,12 @@ class CProcessMonitor
return;
assertex(n);
processes.sort(compare);
StringBuffer name;
ForEachItemIn(i1,processes) {
CProcInfo &pi = processes.item(i1);
if ((pi.delta.system==0)&&(pi.delta.user==0))
break;
getThreadName(pi.pid(),0,name.clear());
str.appendf("\n TT: PI=%d PN=%s PC=%d ST=%d UT=%d%s%s",
pi.pid(),pi.info.cmd,(pi.delta.system+pi.delta.user)*100/tot_time,pi.delta.system,pi.delta.user,name.length()?" TN=":"",name.str());
str.appendf("\n TT: PI=%d PN=%s PC=%d ST=%d UT=%d",
pi.pid(),pi.info.cmd,(pi.delta.system+pi.delta.user)*100/tot_time,pi.delta.system,pi.delta.user);
if (--n==0)
break;
}
Expand Down
7 changes: 1 addition & 6 deletions system/jlib/jexcept.cpp
Expand Up @@ -915,10 +915,7 @@ static void doPrintStackReport( size_t ip, size_t _bp, size_t sp )


StackWalk( ip , _bp);
ModuleWalk();
StringBuffer threadlist;
IERRLOG( "ThreadList:\n%s",getThreadList(threadlist).str());

ModuleWalk();
}


Expand Down Expand Up @@ -1349,8 +1346,6 @@ NO_SANITIZE("alignment") void excsighandler(int signum, siginfo_t *info, void *e

#endif

StringBuffer threadlist;
PROGLOG( "ThreadList:\n%s",getThreadList(threadlist).str());
queryLogMsgManager()->flushQueue(10*1000);

// MCK - really should not return after recv'ing any of these signals
Expand Down
7 changes: 7 additions & 0 deletions system/jlib/jstring.cpp
Expand Up @@ -1436,6 +1436,13 @@ void StringAttr::set(const char * _text)
free(oldtext);
}

void StringAttr::swapWith(StringAttr & other)
{
char * temp = text;
text = other.text;
other.text = temp;
}

void StringAttr::set(const char * _text, size_t _len)
{
char * oldtext = text;
Expand Down
2 changes: 1 addition & 1 deletion system/jlib/jstring.hpp
Expand Up @@ -276,7 +276,7 @@ class jlib_decl StringAttr
void setown(StringBuffer & source);
void toLowerCase();
void toUpperCase();

void swapWith(StringAttr & other);
private:
char * text;
};
Expand Down
93 changes: 13 additions & 80 deletions system/jlib/jthread.cpp
Expand Up @@ -117,8 +117,7 @@ void enableThreadSEH() { SEHHandling=true; }
void disableThreadSEH() { SEHHandling=false; } // only prevents new threads from having SEH handler, no mech. for turning off existing threads SEH handling.


static ICopyArrayOf<Thread> ThreadList;
static CriticalSection ThreadListSem;
static std::atomic<unsigned> threadCount;
static size32_t defaultThreadStackSize=0;
static ICopyArrayOf<Thread> ThreadDestroyList;
static CriticalSection ThreadDestroyListLock;
Expand Down Expand Up @@ -347,8 +346,7 @@ void Thread::init(const char *_name)
threadid = 0;
tidlog = 0;
alive = false;
cthreadname.threadname = (NULL == _name) ? NULL : strdup(_name);
ithreadname = &cthreadname;
cthreadname.set(_name);
prioritydelta = 0;
nicelevel = 0;
stacksize = 0; // default is EXE default stack size (set by /STACK)
Expand Down Expand Up @@ -436,9 +434,6 @@ void Thread::startRelease()
IERRLOG("pthread_create returns %d",status);
PrintStackReport();
PrintMemoryReport();
StringBuffer s;
getThreadList(s);
IERRLOG("Running threads:\n %s",s.str());
throw makeOsException(status);
}
unsigned retryCount = 10;
Expand All @@ -454,12 +449,7 @@ void Thread::startRelease()
alive = true;
if (prioritydelta)
adjustPriority(prioritydelta);

{
CriticalBlock block(ThreadListSem);
ThreadList.zap(*this); // just in case restarting
ThreadList.append(*this);
}
threadCount++;
#ifdef _WIN32
DWORD count = ResumeThread(hThread);
assertex(count == 1);
Expand Down Expand Up @@ -520,7 +510,6 @@ bool Thread::join(unsigned timeout)

Thread::~Thread()
{
ithreadname = &cthreadname; // safer (as derived classes destroyed)
#ifdef _DEBUG
if (alive) {
if (!stopped.wait(0)) { // see if fell out of threadmain and signal stopped
Expand All @@ -531,54 +520,15 @@ Thread::~Thread()
}
#endif
Link();

// DBGLOG("Thread %x (%s) destroyed\n", threadid, threadname);
{
CriticalBlock block(ThreadListSem);
ThreadList.zap(*this);
}
free(cthreadname.threadname);
cthreadname.threadname = NULL;
threadCount--;
// DBGLOG("Thread %x (%s) destroyed\n", threadid, cthreadname.str());
}

unsigned getThreadCount()
{
CriticalBlock block(ThreadListSem);
return ThreadList.ordinality();
}

StringBuffer & getThreadList(StringBuffer &str)
{
CriticalBlock block(ThreadListSem);
ForEachItemIn(i,ThreadList) {
Thread &item=ThreadList.item(i);
item.getInfo(str).append("\n");
}
return str;
return threadCount;
}

StringBuffer &getThreadName(int thandle,unsigned tid,StringBuffer &name)
{
CriticalBlock block(ThreadListSem);
bool found=false;
ForEachItemIn(i,ThreadList) {
Thread &item=ThreadList.item(i);
int h;
unsigned t;
const char *s = item.getLogInfo(h,t);
if (s&&*s&&((thandle==0)||(h==thandle))&&((tid==0)||(t==tid))) {
if (found) {
name.clear();
break; // only return if unambiguous
}
name.append(s);
found = true;
}
}
return name;
}


// CThreadedPersistent

CThreadedPersistent::CThreadedPersistent(const char *name, IThreaded *_owner) : athread(*this, name), owner(_owner), state(s_ready)
Expand Down Expand Up @@ -914,7 +864,7 @@ class CPooledThreadWrapper: public Thread
IPooledThread *thread;
Semaphore sem;
CThreadPoolBase &parent;
char *runningname;
StringAttr runningName;
public:
CPooledThreadWrapper(CThreadPoolBase &_parent,
PooledThreadHandle _handle,
Expand All @@ -923,16 +873,15 @@ class CPooledThreadWrapper: public Thread
{
thread = _thread;
handle = _handle;
runningname = strdup(_parent.poolname);
runningName.set(_parent.poolname);
}

~CPooledThreadWrapper()
{
thread->Release();
free(runningname);
}

void setName(const char *name) { free(runningname); runningname=strdup(name); }
void setName(const char *name) { runningName.set(name); }
void setHandle(PooledThreadHandle _handle) { handle = _handle; }
PooledThreadHandle queryHandle() { return handle; }
IPooledThread &queryThread() { return *thread; }
Expand Down Expand Up @@ -965,30 +914,19 @@ class CPooledThreadWrapper: public Thread
parent.notifyStarted(this);
try
{
char *&threadname = cthreadname.threadname;
char *temp = threadname; // swap running name and threadname
threadname = runningname;
runningname = temp;
cthreadname.swapWith(runningName); // swap running name and threadname
thread->threadmain();
temp = threadname; // and back
threadname = runningname;
runningname = temp;
cthreadname.swapWith(runningName); // swap back
}
catch (IException *e)
{
char *&threadname = cthreadname.threadname;
char *temp = threadname; // swap back
threadname = runningname;
runningname = temp;
cthreadname.swapWith(runningName); // swap back
handleException(e);
}
#ifndef NO_CATCHALL
catch (...)
{
char *&threadname = cthreadname.threadname;
char *temp = threadname; // swap back
threadname = runningname;
runningname = temp;
cthreadname.swapWith(runningName); // swap back
handleException(MakeStringException(0, "Unknown exception in Thread from pool %s", parent.poolname.get()));
}
#endif
Expand Down Expand Up @@ -1206,11 +1144,6 @@ class CThreadPool: public CThreadPoolBase, implements IThreadPool, public CInter
return _start(param, NULL, true);
}

PooledThreadHandle startNoBlock(void *param,const char *name)
{
return _start(param, name, true);
}

PooledThreadHandle start(void *param)
{
return _start(param, NULL, false);
Expand Down
16 changes: 2 additions & 14 deletions system/jlib/jthread.hpp
Expand Up @@ -95,12 +95,7 @@ class jlib_decl Thread : public CInterface, public IThread
void adjustNiceLevel();

protected:
struct cThreadName: implements IThreadName
{
char *threadname;
const char *get() { return threadname; }
} cthreadname;
IThreadName *ithreadname;
StringAttr cthreadname;
public:
#ifndef _WIN32
Semaphore suspend;
Expand All @@ -118,7 +113,7 @@ class jlib_decl Thread : public CInterface, public IThread
bool isCurrentThread() const;
void setNice(int nicelevel);
void setStackSize(size32_t size); // required stack size in bytes - called before start() (obviously)
const char *getName() { const char *ret = ithreadname?ithreadname->get():NULL; return ret?ret:"unknown"; }
const char *getName() { return cthreadname.isEmpty() ? "unknown" : cthreadname.str(); }
bool isAlive() { return alive; }
bool join(unsigned timeout=INFINITE);

Expand All @@ -140,10 +135,6 @@ class jlib_decl Thread : public CInterface, public IThread

// run method not implemented - concrete derived classes must do so
static void setDefaultStackSize(size32_t size); // NB under windows requires linker setting (/stack:)

IThreadName *queryThreadName() { return ithreadname; }
void setThreadName(IThreadName *name) { ithreadname = name; }

};

interface IThreaded
Expand Down Expand Up @@ -275,7 +266,6 @@ interface IThreadPool : extends IInterface
virtual IPooledThreadIterator *running()=0; // return an iterator for all currently running threads
virtual unsigned runningCount()=0; // number of currently running threads
virtual PooledThreadHandle startNoBlock(void *param)=0; // starts a new thread if it can do so without blocking, else throws exception
virtual PooledThreadHandle startNoBlock(void *param,const char *name)=0; // starts a new thread if it can do so without blocking, else throws exception
virtual void setStartDelayTracing(unsigned secs) = 0; // set start delay tracing period
virtual bool waitAvailable(unsigned timeout) = 0; // wait until a pool member is available
};
Expand All @@ -291,9 +281,7 @@ extern jlib_decl IThreadPool *createThreadPool(
unsigned targetpoolsize=0 // target maximum size of pool (default same as defaultmax)
);

extern jlib_decl StringBuffer &getThreadList(StringBuffer &str);
extern jlib_decl unsigned getThreadCount();
extern jlib_decl StringBuffer &getThreadName(int thandle,unsigned logtid,StringBuffer &name); // either thandle or tid should be 0

// Simple pipe process support
interface ISimpleReadStream;
Expand Down