Skip to content

Commit

Permalink
Merge branch 'candidate-5.2.4' into candidate-5.4.0
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
  • Loading branch information
richardkchapman committed May 18, 2015
2 parents 04569ca + f905ee6 commit 771aca1
Show file tree
Hide file tree
Showing 46 changed files with 820 additions and 275 deletions.
1 change: 1 addition & 0 deletions common/deftype/deftype.hpp
Expand Up @@ -128,6 +128,7 @@ enum typemod_t
typemod_outofline = 8,
typemod_attr = 9,
typemod_indirect = 10, // type definition needs to go via an ecl definition
typemod_mutable = 11,
typemod_max
};

Expand Down
2 changes: 2 additions & 0 deletions common/remote/remoteerr.hpp
Expand Up @@ -58,6 +58,7 @@
#define RFSERR_NoConnectSlave 8045
#define RFSERR_NoConnectSlaveXY 8046
#define RFSERR_VersionMismatch 8047
#define RFSERR_SetThrottleFailed 8048

//---- Text for all errors (make it easy to internationalise) ---------------------------

Expand All @@ -67,6 +68,7 @@
#define RFSERR_TimeoutFileIOHandle_Text "Remote fileio has been closed because of timeout"
#define RFSERR_MasterSeemsToHaveDied_Text "Master program seems to have died..."
#define RFSERR_VersionMismatch_Text "Slave version does not match, expected %d got %d"
#define RFSERR_SetThrottleFailed_Text "Failed to set throttle limit"

#define RFSERR_TimeoutWaitSlave_Text "Timeout waiting for slave %s to respond"
#define RFSERR_TimeoutWaitConnect_Text "Timeout waiting to connect to slave %s"
Expand Down
18 changes: 18 additions & 0 deletions common/remote/rmtfile.cpp
Expand Up @@ -504,6 +504,24 @@ extern REMOTE_API int setDafileSvrTraceFlags(const SocketEndpoint &_ep,byte flag
return -2;
}

extern REMOTE_API int setDafileSvrThrottleLimit(const SocketEndpoint &_ep, unsigned throttleLimit, unsigned throttleDelayMs, unsigned throttleCPULimit)
{
SocketEndpoint ep(_ep);
setDafsEndpointPort(ep);
if (ep.isNull())
return -3;
try {
Owned<ISocket> socket = ISocket::connect_wait(ep,5000);
return setDafsThrottleLimit(socket, throttleLimit, throttleDelayMs, throttleCPULimit);
}
catch (IException *e)
{
EXCLOG(e,"setDafileSvrThrottleLimit");
e->Release();
}
return -2;
}

extern REMOTE_API int getDafileSvrInfo(const SocketEndpoint &_ep,StringBuffer &retstr)
{
SocketEndpoint ep(_ep);
Expand Down
1 change: 1 addition & 0 deletions common/remote/rmtfile.hpp
Expand Up @@ -68,6 +68,7 @@ extern REMOTE_API int remoteExec(const SocketEndpoint &ep,const char *cmdline, c
extern REMOTE_API void remoteExtractBlobElements(const char * prefix, const RemoteFilename &file, ExtractedBlobArray & extracted);

extern REMOTE_API int setDafileSvrTraceFlags(const SocketEndpoint &ep,byte flags);
extern REMOTE_API int setDafileSvrThrottleLimit(const SocketEndpoint &_ep, unsigned throttleLimit, unsigned throttleDelayMs, unsigned throttleCPULimit);
extern REMOTE_API int getDafileSvrInfo(const SocketEndpoint &ep,StringBuffer &retstr);

extern REMOTE_API void disconnectRemoteFile(IFile *file);
Expand Down
236 changes: 188 additions & 48 deletions common/remote/sockfile.cpp
Expand Up @@ -161,7 +161,7 @@ struct dummyReadWrite
// backward compatible modes
typedef enum { compatIFSHnone, compatIFSHread, compatIFSHwrite, compatIFSHexec, compatIFSHall} compatIFSHmode;

static const char *VERSTRING= "DS V1.7e - 7 " // dont forget FILESRV_VERSION in header
static const char *VERSTRING= "DS V1.8" // dont forget FILESRV_VERSION in header
#ifdef _WIN32
"Windows ";
#else
Expand Down Expand Up @@ -308,6 +308,8 @@ enum {
RFCtreecopy,
// 1.7e - 1
RFCtreecopytmp,
// 1.8
RFCsetthrottle,
RFCmax,
};

Expand Down Expand Up @@ -336,45 +338,16 @@ static void mergeOnce(OnceKey &key,size32_t sz,const void *data)

//---------------------------------------------------------------------------

class CRemoteFileServer;
class CThrottler
{
Semaphore &sem;
CRemoteFileServer &owner;
bool got;
public:
CThrottler(Semaphore &_sem) : sem(_sem), got(false)
{
take();
}
~CThrottler()
{
release();
}
bool take()
{
assertex(!got);
got = false;
loop {
if (sem.wait(5000)) {
got = true;
break;
}
unsigned cpu = getLatestCPUUsage();
PROGLOG("Throttler stalled (%d%% cpu)",cpu);
if (getLatestCPUUsage()<75)
break;
}
return got;
}
bool release()
{
if (got)
{
got = false;
sem.signal();
return true;
}
return false;
}
CThrottler(CRemoteFileServer &_owner);
~CThrottler() { release(); }
void take();
bool release();
};

// temporarily release a throttler slot
Expand Down Expand Up @@ -2752,6 +2725,26 @@ int setDafsTrace(ISocket * socket,byte flags)
return -1;
}

int setDafsThrottleLimit(ISocket * socket, unsigned throttleLimit, unsigned throttleDelayMs, unsigned throttleCPULimit)
{
assertex(socket);
MemoryBuffer sendbuf;
initSendBuffer(sendbuf);
sendbuf.append((RemoteFileCommandType)RFCsetthrottle).append(throttleLimit).append(throttleDelayMs).append(throttleCPULimit);
MemoryBuffer replybuf;
try {
sendBuffer(socket, sendbuf);
receiveBuffer(socket, replybuf, NORMAL_RETRIES, 1024);
int retcode;
replybuf.read(retcode);
return retcode;
}
catch (IException *e) {
EXCLOG(e);
::Release(e);
}
return -1;
}

int getDafsInfo(ISocket * socket,StringBuffer &retstr)
{
Expand Down Expand Up @@ -3028,6 +3021,7 @@ static unsigned ClientCount = 0;
static unsigned MaxClientCount = 0;
static CriticalSection ClientCountSect;

#define TOTAL_THROTTLE_TIME_SECS 60 // log total throttled delay period

class CRemoteFileServer : public CInterface, implements IRemoteFileServer, implements IThreadFactory
{
Expand All @@ -3042,6 +3036,10 @@ class CRemoteFileServer : public CInterface, implements IRemoteFileServer, imple
unsigned closedclients;
CAsyncCommandManager asyncCommandManager;
Semaphore throttlesem;
unsigned throttleLimit, throttleDelayMs, throttleCPULimit, disabledThrottleLimit;
unsigned __int64 totalThrottleDelay;
CCycleTimer totalThrottleDelayTimer;
CriticalSection setThrottleCrit;
atomic_t globallasttick;

int getNextHandle()
Expand Down Expand Up @@ -3212,14 +3210,18 @@ class CRemoteFileServer : public CInterface, implements IRemoteFileServer, imple
PROGLOG("Previous handle(%d): %s",handles.item(previdx),opennames.item(previdx).text.get());
}


void processCommand()
{
CThrottler throttler(parent->throttleSem());
MemoryBuffer reply;
RemoteFileCommandType cmd;
buf.read(cmd);
parent->dispatchCommand(cmd, buf, initSendBuffer(reply), this, &throttler);
if (parent->throttleLimit && (RFCsetthrottle != cmd))
{
CThrottler throttler(*parent);
parent->dispatchCommand(cmd, buf, initSendBuffer(reply), this, &throttler);
}
else
parent->dispatchCommand(cmd, buf, initSendBuffer(reply), this, NULL);
buf.clear();
sendBuffer(socket, reply);
}
Expand Down Expand Up @@ -3399,9 +3401,13 @@ class CRemoteFileServer : public CInterface, implements IRemoteFileServer, imple

IMPLEMENT_IINTERFACE

CRemoteFileServer()
CRemoteFileServer(unsigned _throttleLimit, unsigned _throttleDelayMs, unsigned _throttleCPULimit)
: throttleLimit(_throttleLimit), throttleDelayMs(_throttleDelayMs), throttleCPULimit(_throttleCPULimit)
{
throttlesem.signal(10);
if (throttleLimit) // if 0, throttling not used
throttlesem.signal(throttleLimit);
totalThrottleDelay = 0;
disabledThrottleLimit = 0;
lasthandle = 0;
selecthandler.setown(createSocketSelectHandler(NULL));
threads.setown(createThreadPool("CRemoteFileServerPool",this,NULL,MAX_THREADS,60*1000,
Expand Down Expand Up @@ -3430,6 +3436,51 @@ class CRemoteFileServer : public CInterface, implements IRemoteFileServer, imple
#endif
}

bool takeThrottleSem()
{
bool got = false;
CCycleTimer timer;
loop {
if (throttlesem.wait(throttleDelayMs)) {
got = true;
break;
}
unsigned cpu = getLatestCPUUsage();
PROGLOG("Throttler: transaction delayed (%d%% cpu)", cpu);

// NB: getLatestCPUUsage() is based on interval monitoring, typically 60 secs
if (cpu<throttleCPULimit)
break;
}
unsigned ms = timer.elapsedMs();
if (ms >= 1000) {
if (ms>throttleDelayMs)
PROGLOG("Throttle: transaction delayed for : %d seconds", ms/1000);
}
totalThrottleDelay += ms;
if (totalThrottleDelay && (totalThrottleDelayTimer.elapsedCycles() >= (queryOneSecCycles() * TOTAL_THROTTLE_TIME_SECS)))
{
unsigned elapsedSecs = totalThrottleDelayTimer.elapsedMs()/1000;
time_t simple;
time(&simple);
simple -= elapsedSecs;

CDateTime dt;
dt.set(simple);
StringBuffer dateStr;
dt.getTimeString(dateStr, true);
PROGLOG("Throttler: total delay of %0.2f seconds, since: %s", ((double)totalThrottleDelay)/1000, dateStr.str());

totalThrottleDelayTimer.reset();
totalThrottleDelay = 0;
}
return got;
}

void releaseThrottleSem()
{
throttlesem.signal();
}

//MORE: The file handles should timeout after a while, and accessing an old (invalid handle)
// should throw a different exception
Expand Down Expand Up @@ -4357,6 +4408,77 @@ class CRemoteFileServer : public CInterface, implements IRemoteFileServer, imple
return false;
}

bool cmdSetThrottle(MemoryBuffer & msg, MemoryBuffer & reply,CRemoteClientHandler &client)
{
CriticalBlock b(setThrottleCrit);
try
{
unsigned _throttleLimit, _throttleDelayMs, _throttleCPULimit;
msg.read(_throttleLimit);
msg.read(_throttleDelayMs);
msg.read(_throttleCPULimit);
int delta = 0;
if (_throttleLimit)
{
if (disabledThrottleLimit) // if transitioning from disabled to some throttling
{
assertex(0 == throttleLimit);
delta = _throttleLimit - disabledThrottleLimit; // + or -
disabledThrottleLimit = 0;
}
else
delta = _throttleLimit - throttleLimit; // + or -
}
else if (0 == disabledThrottleLimit)
{
PROGLOG("Throttling disabled, previous limit: %d", throttleLimit);
/* disabling - set limit immediately to let all new transaction through.
* NB: the semaphore signals are not consumed in this case, because transactions could be waiting on it.
* Instead the existing 'throttleLimit' is kept in 'disabledThrottleLimit', so that if/when throttling is
* re-enabled, it is used as a basis for increasing or consuming the semaphore signal count.
*/
disabledThrottleLimit = throttleLimit;
throttleLimit = 0;
}
if (delta > 0)
{
PROGLOG("Increasing throttleLimit from %d to %d", throttleLimit, _throttleLimit);
throttlesem.signal(delta);
throttleLimit = _throttleLimit;
// NB: If throttling was off, this doesn't effect transactions in progress, i.e. will only throttle new transactions coming in.
}
else if (delta < 0)
{
PROGLOG("Reducing throttleLimit from %d to %d", throttleLimit, _throttleLimit);
// NB: This is not expected to take long
CCycleTimer timer;
while (delta < 0)
{
if (throttlesem.wait(1000))
++delta;
else
PROGLOG("Waited %0.2f seconds so far for a total of %d transactions to complete, %d completed", ((double)timer.elapsedMs())/1000, throttleLimit, -delta);
}
throttleLimit = _throttleLimit;
// NB: doesn't include transactions in progress, i.e. will only throttle new transactions coming in.
}
PROGLOG("New throttleDelayMs=%d, previous: %d", _throttleDelayMs, throttleDelayMs);
PROGLOG("New throttleCPULimit=%d, previous: %d", _throttleCPULimit, throttleCPULimit);
throttleDelayMs = _throttleDelayMs;
throttleCPULimit = _throttleCPULimit;
reply.append((unsigned)RFEnoerror);
return true;
}
catch (IException *e)
{
StringBuffer s;
e->errorMessage(s);
throwErr3(RFSERR_SetThrottleFailed,e->errorCode(),s.str());
e->Release();
}
return false;
}

bool dispatchCommand(RemoteFileCommandType cmd, MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler *client, CThrottler *throttler)
{
bool ret = true;
Expand Down Expand Up @@ -4396,6 +4518,7 @@ class CRemoteFileServer : public CInterface, implements IRemoteFileServer, imple
MAPCOMMANDCLIENT(RFCcopysection, cmdCopySection, *client);
MAPCOMMANDCLIENTTHROTTLER(RFCtreecopy, cmdTreeCopy, *client, throttler);
MAPCOMMANDCLIENTTHROTTLER(RFCtreecopytmp, cmdTreeCopyTmp, *client, throttler);
MAPCOMMANDCLIENT(RFCsetthrottle, cmdSetThrottle, *client);

default:
ret = cmdUnknown(msg,reply,cmd);
Expand Down Expand Up @@ -4757,11 +4880,6 @@ class CRemoteFileServer : public CInterface, implements IRemoteFileServer, imple
return threads->runningCount();
}

Semaphore &throttleSem()
{
return throttlesem;
}

unsigned idleTime()
{
unsigned t = (unsigned)atomic_read(&globallasttick);
Expand All @@ -4771,12 +4889,34 @@ class CRemoteFileServer : public CInterface, implements IRemoteFileServer, imple
};


CThrottler::CThrottler(CRemoteFileServer &_owner) : owner(_owner), got(false)
{
take();
}

void CThrottler::take()
{
assertex(!got);
got = owner.takeThrottleSem();
}

bool CThrottler::release()
{
if (got)
{
got = false;
owner.releaseThrottleSem();
return true;
}
return false;
}


IRemoteFileServer * createRemoteFileServer()
IRemoteFileServer * createRemoteFileServer(unsigned throttleLimit, unsigned throttleDelayMs, unsigned throttleCPULimit)
{
#if SIMULATE_PACKETLOSS
errorSimulationOn = false;
#endif
return new CRemoteFileServer();
return new CRemoteFileServer(throttleLimit, throttleDelayMs, throttleCPULimit);
}

0 comments on commit 771aca1

Please sign in to comment.