Skip to content

Commit

Permalink
Merge branch 'soapCallMS' of https://github.com/RussWhitehead/HPCC-Pl…
Browse files Browse the repository at this point in the history
…atform into RussWhitehead-soapCallMS

Conflicts:
	rtl/include/eclhelper.hpp

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
  • Loading branch information
richardkchapman committed Jul 13, 2012
2 parents cbffcea + 704abaa commit 18d3460
Show file tree
Hide file tree
Showing 7 changed files with 117 additions and 57 deletions.
122 changes: 75 additions & 47 deletions common/thorhelper/thorsoapcall.cpp
Expand Up @@ -335,7 +335,7 @@ class BlackLister : public CInterface, implements IThreadFactory
ISocket* connect(SocketEndpoint &ep,
const IContextLogger &logctx,
unsigned retries,
unsigned timeout,
unsigned timeoutMS,
IRoxieAbortMonitor * roxieAbortMonitor)
{
if (lookup(ep, logctx))
Expand All @@ -349,10 +349,10 @@ class BlackLister : public CInterface, implements IThreadFactory
{
checkRoxieAbortMonitor(roxieAbortMonitor);
Owned<ISocket> sock;
Owned<ISocketConnectWait> scw = nonBlockingConnect(ep, timeout == WAIT_FOREVER ? 60000 : timeout*retries*1000);
Owned<ISocketConnectWait> scw = nonBlockingConnect(ep, timeoutMS == WAIT_FOREVER ? 60000 : timeoutMS*(retries+1));
loop
{
sock.setown(scw->wait(1000));//throws if connect fails or timeout
sock.setown(scw->wait(1000));//throws if connect fails or timeoutMS
checkRoxieAbortMonitor(roxieAbortMonitor);
if (sock)
return sock.getLink();
Expand Down Expand Up @@ -388,11 +388,11 @@ class BlackLister : public CInterface, implements IThreadFactory
char const* host,
const IContextLogger &logctx,
unsigned retries,
unsigned timeout,
unsigned timeoutMS,
IRoxieAbortMonitor * roxieAbortMonitor )
{
SocketEndpoint ep(host, port);
return connect(ep, logctx, retries, timeout, roxieAbortMonitor);
return connect(ep, logctx, retries, timeoutMS, roxieAbortMonitor);
}

virtual IPooledThread *createNew()
Expand Down Expand Up @@ -564,7 +564,7 @@ interface IWSCAsyncFor: public IInterface
{
virtual void processException(const Url &url, const void *row, IException *e) = 0;
virtual void processException(const Url &url, ConstPointerArray &inputRows, IException *e) = 0;
virtual void checkTimeLimitExceeded() = 0;
virtual void checkTimeLimitExceeded(unsigned * _remainingMS) = 0;

virtual void createHttpRequest(Url &url, StringBuffer &request) = 0;
virtual int readHttpResponse(StringBuffer &response, ISocket *socket) = 0;
Expand Down Expand Up @@ -731,17 +731,39 @@ class CWSCHelper : public CInterface, implements IWSCHelper
logXML = (flags & SOAPFlog) != 0;
logUserMsg = (flags & SOAPFlogusermsg) != 0;

timeout = helper->getTimeout();
if (timeout == (unsigned)-1)
timeout = 300; // 300 second default
else if (timeout == 0)
timeout = WAIT_FOREVER;
IHThorWebServiceCallExtra2 * helperExtra2 = static_cast<IHThorWebServiceCallExtra2*>(helper->selectInterface(TAIsoapcallextra_2));
if (helperExtra2)
{
double dval = helperExtra2->getTimeoutMS();//double, indicating seconds and nanoseconds.
if (dval == -1.0)//not provided
timeoutMS = 300*1000; // 300 second default
else if (dval == 0)
timeoutMS = WAIT_FOREVER;
else
timeoutMS = dval * 1000;

timeLimit = helper->getTimeLimit();
if (timeLimit == 0 || timeLimit == (unsigned)-1)
timeLimit = WAIT_FOREVER; //default
dval = helperExtra2->getTimeLimitMS();
if (dval <= 0.0)
timeLimitMS = WAIT_FOREVER;
else
timeLimitMS = dval * 1000;
}
else
timeLimitMon.reset(timeLimit*1000);
{
timeoutMS = helper->getTimeout();//get timeout, in seconds
if (timeoutMS == (unsigned)-1)
timeoutMS = 300*1000; // 300 second default
else if (timeoutMS == 0)
timeoutMS = WAIT_FOREVER;
else
timeoutMS *= 1000;

timeLimitMS = helper->getTimeLimit();
if (timeLimitMS == 0 || timeLimitMS == (unsigned)-1)
timeLimitMS = WAIT_FOREVER; //default
else
timeLimitMS *= 1000;
}

if (wscType == STsoap)
{
Expand Down Expand Up @@ -855,7 +877,6 @@ class CWSCHelper : public CInterface, implements IWSCHelper
else
throw MakeStringException(0, "%sCALL specified no URLs",wscType == STsoap ? "SOAP" : "HTTP");


for (unsigned i=0; i<numRowThreads; i++)
threads.append(*new CWSCHelperThread(this));
}
Expand All @@ -879,6 +900,9 @@ class CWSCHelper : public CInterface, implements IWSCHelper

void start()
{
if (timeLimitMS != WAIT_FOREVER)
timeLimitMon.reset(timeLimitMS);

ForEachItemIn(i,threads)
threads.item(i).start();
}
Expand Down Expand Up @@ -965,17 +989,19 @@ class CWSCHelper : public CInterface, implements IWSCHelper
return secureContext->createSecureSocket(sock);
}

bool isTimeLimitExceeded()
bool isTimeLimitExceeded(unsigned *_remainingMS)
{
if (timeLimit != WAIT_FOREVER)
if (timeLimitMS != WAIT_FOREVER)
{
CriticalBlock block(timeoutCrit);
if (timeLimitExceeded || timeLimitMon.timedout())
if (timeLimitExceeded || timeLimitMon.timedout(_remainingMS))
{
timeLimitExceeded = true;
return true;
}
}
else
*_remainingMS = (unsigned)-1;
return false;
}

Expand Down Expand Up @@ -1045,8 +1071,8 @@ class CWSCHelper : public CInterface, implements IWSCHelper
unsigned numRowThreads;
unsigned numUrlThreads;
unsigned maxRetries;
unsigned timeout; //seconds
unsigned timeLimit; //seconds
unsigned timeoutMS;
unsigned timeLimitMS;
bool logXML;
bool logMin;
bool logUserMsg;
Expand Down Expand Up @@ -1294,10 +1320,10 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo
size32_t maxLen;
size32_t curPosn;
ISocket * socket;
unsigned timeout;
unsigned timeoutMS;
public:
CSocketDataProvider(const char * _buffer, size32_t _curPosn, size32_t _currLen, size32_t _maxLen, ISocket * _sock, unsigned _timeout )
: buffer(_buffer), currLen(_currLen), maxLen(_maxLen), curPosn(_curPosn), socket(_sock), timeout(_timeout)
: buffer(_buffer), currLen(_currLen), maxLen(_maxLen), curPosn(_curPosn), socket(_sock), timeoutMS(_timeout)
{
}
size32_t getBytes(char * buf, size32_t len)
Expand All @@ -1317,7 +1343,7 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo
count = 0;
do
{
socket->read(buf + count, 0, len - count, bytesRead, timeout);
socket->readtms(buf + count, 0, len - count, bytesRead, timeoutMS);
count += bytesRead;
} while (count != len);
currLen = curPosn = 0;
Expand All @@ -1331,7 +1357,7 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo
do
{
size32_t read;
socket->read(buf+avail+bytesRead, 0, len-avail-bytesRead, read, timeout);
socket->readtms(buf+avail+bytesRead, 0, len-avail-bytesRead, read, timeoutMS);
bytesRead += read;
} while (len != (bytesRead + avail));
count += bytesRead;
Expand All @@ -1350,6 +1376,7 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo
StringBuffer responsePath;
Owned<CSocketDataProvider> dataProvider;
XmlReaderOptions options;
unsigned remainingMS;

inline void checkRoxieAbortMonitor(IRoxieAbortMonitor * roxieAbortMonitor)
{
Expand Down Expand Up @@ -1438,10 +1465,10 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo
bool chunked;
size32_t read = 0;
do {
checkTimeLimitExceeded();
checkTimeLimitExceeded(&remainingMS);
checkRoxieAbortMonitor(master->roxieAbortMonitor);
socket->read(buffer+read, 0, WSCBUFFERSIZE-read, bytesRead, master->timeout);
checkTimeLimitExceeded();
socket->readtms(buffer+read, 0, WSCBUFFERSIZE-read, bytesRead, MIN(master->timeoutMS,remainingMS));
checkTimeLimitExceeded(&remainingMS);
checkRoxieAbortMonitor(master->roxieAbortMonitor);

read += bytesRead;
Expand Down Expand Up @@ -1485,7 +1512,8 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo
read chunk-size and CRLF
}
*/
dataProvider.setown(new CSocketDataProvider(buffer, payloadofs, read, WSCBUFFERSIZE, socket, master->timeout));
checkTimeLimitExceeded(&remainingMS);
dataProvider.setown(new CSocketDataProvider(buffer, payloadofs, read, WSCBUFFERSIZE, socket, MIN(master->timeoutMS,remainingMS)));
dataProvider->getBytes(&ch, 1);
while (isalpha(ch) || isdigit(ch))
{ //get chunk-size
Expand Down Expand Up @@ -1544,10 +1572,10 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo
response.append(read,payload);
if (payloadsize) { // read directly into response
while (read<payloadsize) {
checkTimeLimitExceeded();
checkTimeLimitExceeded(&remainingMS);
checkRoxieAbortMonitor(master->roxieAbortMonitor);
socket->read(response.reserve(payloadsize-read), 0, payloadsize-read, bytesRead, master->timeout);
checkTimeLimitExceeded();
socket->readtms(response.reserve(payloadsize-read), 0, payloadsize-read, bytesRead, MIN(master->timeoutMS,remainingMS));
checkTimeLimitExceeded(&remainingMS);
checkRoxieAbortMonitor(master->roxieAbortMonitor);

read += bytesRead;
Expand All @@ -1560,10 +1588,10 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo
}
else {
loop {
checkTimeLimitExceeded();
checkTimeLimitExceeded(&remainingMS);
checkRoxieAbortMonitor(master->roxieAbortMonitor);
socket->read(buffer, 0, WSCBUFFERSIZE, bytesRead, master->timeout);
checkTimeLimitExceeded();
socket->readtms(buffer, 0, WSCBUFFERSIZE, bytesRead, MIN(master->timeoutMS,remainingMS));
checkTimeLimitExceeded(&remainingMS);
checkRoxieAbortMonitor(master->roxieAbortMonitor);

if (bytesRead==0)
Expand Down Expand Up @@ -1690,10 +1718,10 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo
master->setErrorOwn(ne.getClear());
}

inline void checkTimeLimitExceeded()
inline void checkTimeLimitExceeded(unsigned * remainingMS)
{
if (master->isTimeLimitExceeded())
throw MakeStringException(TIMELIMIT_EXCEEDED, "%sCALL TIMELIMIT(%u) exceeded", master->wscType == STsoap ? "SOAP" : "HTTP", master->timeLimit);
if (master->isTimeLimitExceeded(remainingMS))
throw MakeStringException(TIMELIMIT_EXCEEDED, "%sCALL TIMELIMIT(%ums) exceeded", master->wscType == STsoap ? "SOAP" : "HTTP", master->timeLimitMS);
}

public:
Expand Down Expand Up @@ -1742,15 +1770,15 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo
{
try
{
checkTimeLimitExceeded();
checkTimeLimitExceeded(&remainingMS);
Url &connUrl = master->proxyUrlArray.empty() ? url : master->proxyUrlArray.item(0);
socket.setown(blacklist->connect(connUrl.port, connUrl.host, master->logctx, (unsigned)master->maxRetries, master->timeout, master->roxieAbortMonitor));
socket.setown(blacklist->connect(connUrl.port, connUrl.host, master->logctx, (unsigned)master->maxRetries, master->timeoutMS, master->roxieAbortMonitor));
if (stricmp(url.method, "https") == 0)
{
Owned<ISecureSocket> ssock = master->createSecureSocket(socket.getClear());
if (ssock)
{
checkTimeLimitExceeded();
checkTimeLimitExceeded(&remainingMS);
int status = ssock->secure_connect();
if (status < 0)
{
Expand All @@ -1769,7 +1797,7 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo
{
if (master->timeLimitExceeded)
{
master->logctx.CTXLOG("%sCALL exiting: time limit (%u) exceeded",master->wscType == STsoap ? "SOAP" : "HTTP", master->timeLimit);
master->logctx.CTXLOG("%sCALL exiting: time limit (%ums) exceeded",master->wscType == STsoap ? "SOAP" : "HTTP", master->timeLimitMS);
processException(url, inputRows, e);
return;
}
Expand Down Expand Up @@ -1798,12 +1826,12 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo
}
try
{
checkTimeLimitExceeded();
checkTimeLimitExceeded(&remainingMS);
checkRoxieAbortMonitor(master->roxieAbortMonitor);
socket->write(request.str(), request.length());
if (soapTraceLevel > 4)
master->logctx.CTXLOG("%sCALL: sent request (%s) to %s:%d", master->wscType == STsoap ? "SOAP" : "HTTP",master->service.str(), url.host.str(), url.port);
checkTimeLimitExceeded();
checkTimeLimitExceeded(&remainingMS);
checkRoxieAbortMonitor(master->roxieAbortMonitor);

int rval = readHttpResponse(response, socket);
Expand All @@ -1826,7 +1854,7 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo
throw MakeStringException(-1, "Zero length response in processQuery");
}
endTime = msTick();
checkTimeLimitExceeded();
checkTimeLimitExceeded(&remainingMS);
ColumnProvider * meta = (ColumnProvider*)CreateColumnProvider(endTime-startTime, master->flags&SOAPFencoding?true:false);
processResponse(url, response, meta);
delete meta;
Expand Down Expand Up @@ -1867,7 +1895,7 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo
if (master->timeLimitExceeded)
{
processException(url, inputRows, e);
master->logctx.CTXLOG("%sCALL exiting: time limit (%u) exceeded", master->wscType == STsoap ? "SOAP" : "HTTP", master->timeLimit);
master->logctx.CTXLOG("%sCALL exiting: time limit (%ums) exceeded", master->wscType == STsoap ? "SOAP" : "HTTP", master->timeLimitMS);
break;
}

Expand All @@ -1882,7 +1910,7 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo
StringBuffer s;
master->logctx.CTXLOG("Exception %s - retrying? (%d<%d)", e->errorMessage(s).str(), attempts, master->maxRetries);

if (attempts >= master->maxRetries)
if (attempts > master->maxRetries)
{
// error affects all inputRows
master->logctx.CTXLOG("Exiting: maxRetries exceeded");
Expand Down
4 changes: 2 additions & 2 deletions ecl/hql/hqlgram.y
Expand Up @@ -3129,13 +3129,13 @@ soapFlag
}
| TIMEOUT '(' expression ')'
{
parser->normalizeExpression($3, type_int, false);
parser->normalizeExpression($3, type_real, false);
$$.setExpr(createExprAttribute(timeoutAtom, $3.getExpr()));
$$.setPosition($1);
}
| TIMELIMIT '(' expression ')'
{
parser->normalizeExpression($3, type_int, false);
parser->normalizeExpression($3, type_real, false);
$$.setExpr(createExprAttribute(timeLimitAtom, $3.getExpr()));
$$.setPosition($1);
}
Expand Down
1 change: 1 addition & 0 deletions ecl/hqlcpp/hqlcpp.ipp
Expand Up @@ -1663,6 +1663,7 @@ public:
void doBuildVarStringFunction(BuildCtx & ctx, const char * name, IHqlExpression * value);
void doBuildDataFunction(BuildCtx & ctx, const char * name, IHqlExpression * value);
void doBuildStringFunction(BuildCtx & ctx, const char * name, IHqlExpression * value);
void doBuildDoubleFunction(BuildCtx & ctx, const char * name, IHqlExpression * value);
void doBuildFunction(BuildCtx & ctx, ITypeInfo * type, const char * name, IHqlExpression * value);
void doBuildFunctionReturn(BuildCtx & ctx, ITypeInfo * type, IHqlExpression * value);
void doBuildUserFunctionReturn(BuildCtx & ctx, ITypeInfo * type, IHqlExpression * value);
Expand Down
20 changes: 18 additions & 2 deletions ecl/hqlcpp/hqlhtcpp.cpp
Expand Up @@ -3121,6 +3121,12 @@ void HqlCppTranslator::doBuildUnsignedFunction(BuildCtx & ctx, const char * name
doBuildFunction(ctx, unsignedType, name, value);
}

void HqlCppTranslator::doBuildDoubleFunction(BuildCtx & ctx, const char * name, IHqlExpression * value)
{
Owned<ITypeInfo> type = makeRealType(8);
doBuildFunction(ctx, doubleType, name, value);
}

void HqlCppTranslator::doBuildUnsigned64Function(BuildCtx & ctx, const char * name, IHqlExpression * value)
{
Owned<ITypeInfo> type = makeIntType(8, false);
Expand Down Expand Up @@ -16377,6 +16383,12 @@ ABoundActivity * HqlCppTranslator::doBuildActivitySOAP(BuildCtx & ctx, IHqlExpre
//virtual unsigned getTimeLimit()
doBuildUnsignedFunction(instance->classctx, "getTimeLimit", queryPropertyChild(expr, timeLimitAtom, 0));

//virtual double getTimeoutMS()
doBuildDoubleFunction(instance->classctx, "getTimeoutMS", queryPropertyChild(expr, timeoutAtom, 0));

//virtual double getTimeLimitMS()
doBuildDoubleFunction(instance->classctx, "getTimeLimitMS", queryPropertyChild(expr, timeLimitAtom, 0));

if (namespaceAttr)
{
doBuildVarStringFunction(instance->startctx, "queryNamespaceName", namespaceAttr->queryChild(0));
Expand Down Expand Up @@ -16423,7 +16435,6 @@ ABoundActivity * HqlCppTranslator::doBuildActivitySOAP(BuildCtx & ctx, IHqlExpre
buildTransformBody(onFailCtx, onFailTransform, dataset, NULL, expr, selSeq);
}
}

buildInstanceSuffix(instance);
if (boundDataset)
buildConnectInputOutput(ctx, instance, boundDataset, 0, 0);
Expand Down Expand Up @@ -16529,6 +16540,12 @@ ABoundActivity * HqlCppTranslator::doBuildActivityHTTP(BuildCtx & ctx, IHqlExpre
//virtual unsigned getTimeLimit()
doBuildUnsignedFunction(instance->classctx, "getTimeLimit", queryPropertyChild(expr, timeLimitAtom, 0));

//virtual double getTimeoutMS()
doBuildDoubleFunction(instance->classctx, "getTimeoutMS", queryPropertyChild(expr, timeoutAtom, 0));

//virtual double getTimeLimitMS()
doBuildDoubleFunction(instance->classctx, "getTimeLimitMS", queryPropertyChild(expr, timeLimitAtom, 0));

if (namespaceAttr)
{
doBuildVarStringFunction(instance->startctx, "queryNamespaceName", namespaceAttr->queryChild(0));
Expand Down Expand Up @@ -16567,7 +16584,6 @@ ABoundActivity * HqlCppTranslator::doBuildActivityHTTP(BuildCtx & ctx, IHqlExpre
buildTransformBody(onFailCtx, onFail->queryChild(0), dataset, NULL, expr, selSeq);
}
}

buildInstanceSuffix(instance);
if (boundDataset)
buildConnectInputOutput(ctx, instance, boundDataset, 0, 0);
Expand Down

0 comments on commit 18d3460

Please sign in to comment.