Skip to content

Commit

Permalink
added systematic robustness to message queue posting
Browse files Browse the repository at this point in the history
  • Loading branch information
jpswinski committed Mar 7, 2024
1 parent 25640d1 commit 24a6c79
Show file tree
Hide file tree
Showing 16 changed files with 127 additions and 37 deletions.
8 changes: 5 additions & 3 deletions packages/core/CsvDispatch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,8 @@ CsvDispatch::CsvDispatch (lua_State* L, const char* outq_name, const char** _col

/* Send Out Header Row */
int len = StringLib::size(hdrrow) + 1;
outQ->postCopy(hdrrow, len, SYS_TIMEOUT);
int rc = outQ->postCopy(hdrrow, len, SYS_TIMEOUT);
if(rc <= 0) mlog(CRITICAL, "Failed (%d) to post terminator to %s", rc, outQ->getName());
}

/*----------------------------------------------------------------------------
Expand Down Expand Up @@ -173,8 +174,9 @@ bool CsvDispatch::processRecord (RecordObject* record, okey_t key, recVec_t* rec

/* Send Out Row */
int len = StringLib::size(valrow) + 1;
int status = outQ->postCopy(valrow, len, SYS_TIMEOUT);
int rc = outQ->postCopy(valrow, len, SYS_TIMEOUT);
if(rc <= 0) mlog(CRITICAL, "Failed (%d) to post terminator to %s", rc, outQ->getName());

/* Check and Return Status */
return status > 0;
return rc > 0;
}
3 changes: 2 additions & 1 deletion packages/core/DeviceReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ void* DeviceReader::readerThread (void* parm)
delete [] buf;
dr->device->closeConnection();
dr->signalComplete();
dr->outq->postCopy("", 0); // send terminator
int rc = dr->outq->postCopy("", 0, SYS_TIMEOUT); // send terminator
if(rc <= 0) mlog(ERROR, "Device reader failed to post terminator %s: %d", dr->outq->getName(), rc);
return NULL;
}
19 changes: 10 additions & 9 deletions packages/core/LuaEndpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,11 +193,12 @@ void* LuaEndpoint::requestThread (void* parm)
/* Respond with Unauthorized Error */
char header[MAX_HDR_SIZE];
int header_length = buildheader(header, Unauthorized);
rspq->postCopy(header, header_length);
rspq->postCopy(header, header_length, POST_TIMEOUT_MS);
}

/* End Response */
rspq->postCopy("", 0);
int rc = rspq->postCopy("", 0, POST_TIMEOUT_MS);
if(rc <= 0) mlog(CRITICAL, "Failed to post terminator on %s: %d", rspq->getName(), rc);

/* Generate Metric for Endpoint */
double duration = TimeLib::latchtime() - start;
Expand Down Expand Up @@ -277,27 +278,27 @@ void LuaEndpoint::normalResponse (const char* scriptpath, Request* request, Publ
{
int result_length = StringLib::size(result);
int header_length = buildheader(header, OK, "text/plain", result_length, NULL, serverHead.c_str());
rspq->postCopy(header, header_length);
rspq->postCopy(result, result_length);
rspq->postCopy(header, header_length, POST_TIMEOUT_MS);
rspq->postCopy(result, result_length, POST_TIMEOUT_MS);
}
else
{
int header_length = buildheader(header, Not_Found);
rspq->postCopy(header, header_length);
rspq->postCopy(header, header_length, POST_TIMEOUT_MS);
}
}
else
{
mlog(ERROR, "Failed to execute request: %s", scriptpath);
int header_length = buildheader(header, Internal_Server_Error);
rspq->postCopy(header, header_length);
rspq->postCopy(header, header_length, POST_TIMEOUT_MS);
}
}
else
{
mlog(CRITICAL, "Memory (%d%%) exceeded threshold, not performing request: %s", (int)(mem * 100.0), scriptpath);
int header_length = buildheader(header, Service_Unavailable);
rspq->postCopy(header, header_length);
rspq->postCopy(header, header_length, POST_TIMEOUT_MS);
}

/* Clean Up */
Expand All @@ -320,7 +321,7 @@ void LuaEndpoint::streamResponse (const char* scriptpath, Request* request, Publ
{
/* Send Header */
int header_length = buildheader(header, OK, "application/octet-stream", 0, "chunked", serverHead.c_str());
rspq->postCopy(header, header_length);
rspq->postCopy(header, header_length, POST_TIMEOUT_MS);

/* Create Engine */
engine = new LuaEngine(scriptpath, (const char*)request->body, trace_id, NULL, true);
Expand All @@ -338,7 +339,7 @@ void LuaEndpoint::streamResponse (const char* scriptpath, Request* request, Publ
{
mlog(CRITICAL, "Memory (%d%%) exceeded threshold, not performing request: %s", (int)(mem * 100.0), scriptpath);
int header_length = buildheader(header, Service_Unavailable);
rspq->postCopy(header, header_length);
rspq->postCopy(header, header_length, POST_TIMEOUT_MS);
}

/* Clean Up */
Expand Down
3 changes: 2 additions & 1 deletion packages/core/LuaEndpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ class LuaEndpoint: public EndpointObject
static const double DEFAULT_NORMAL_REQUEST_MEMORY_THRESHOLD;
static const double DEFAULT_STREAM_REQUEST_MEMORY_THRESHOLD;

static const int MAX_RESPONSE_TIME_MS = 5000;
static const int POST_TIMEOUT_MS = 60000;
static const int MAX_RESPONSE_TIME_MS = 60000;
static const char* LUA_RESPONSE_QUEUE;
static const char* LUA_REQUEST_ID;

Expand Down
10 changes: 2 additions & 8 deletions packages/core/MsgQ.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -349,9 +349,7 @@ int Publisher::postRef(void* data, int size, int timeout)
*----------------------------------------------------------------------------*/
int Publisher::postCopy(const void* data, int size, int timeout)
{
int status = post((void*)data, ((unsigned int)size) | MSGQ_COPYQ_MASK, NULL, 0, timeout);
if(status == STATE_OKAY) return size;
return status;
return post((void*)data, ((unsigned int)size) | MSGQ_COPYQ_MASK, NULL, 0, timeout);
}

/*----------------------------------------------------------------------------
Expand All @@ -371,11 +369,7 @@ int Publisher::postCopy(const void* data, int size, int timeout)
int Publisher::postCopy(const void* data, int size, const void* secondary_data, int secondary_size, int timeout)
{
if(size < 0 || secondary_size < 0) return STATE_SIZE_ERROR;

int status = post((void*)data, ((unsigned int)size) | MSGQ_COPYQ_MASK, (void*)secondary_data, secondary_size, timeout);

if(status == STATE_OKAY) return size + secondary_size;
return status;
return post((void*)data, ((unsigned int)size) | MSGQ_COPYQ_MASK, (void*)secondary_data, secondary_size, timeout);
}

/*----------------------------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion packages/h5/H5File.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ int H5File::luaRead (lua_State* L)

/* Terminate Data */
Publisher outQ(outq_name);
outQ.postCopy("", 0);
outQ.postCopy("", 0, SYS_TIMEOUT);
}
catch(const RunTimeException& e)
{
Expand Down
8 changes: 5 additions & 3 deletions packages/netsvc/CurlLib.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,8 @@ long CurlLib::postAsStream (const char* url, const char* data, Publisher* outq,
/* Terminate Stream */
if(with_terminator)
{
outq->postCopy("", 0);
int rc = outq->postCopy("", 0, DATA_TIMEOUT * 1000);
if(rc <= 0) mlog(CRITICAL, "Failed to post terminator on %s: %d", outq->getName(), rc);
}

/* Return HTTP Code */
Expand Down Expand Up @@ -323,7 +324,8 @@ long CurlLib::postAsRecord (const char* url, const char* data, Publisher* outq,
/* Terminate Stream */
if(with_terminator)
{
outq->postCopy("", 0);
int rc = outq->postCopy("", 0, DATA_TIMEOUT * 1000);
if(rc <= 0) mlog(CRITICAL, "Failed to post terminator on %s: %d", outq->getName(), rc);
}

/* Return HTTP Code */
Expand Down Expand Up @@ -601,7 +603,7 @@ size_t CurlLib::postRecords(void *buffer, size_t size, size_t nmemb, void *userp
size_t CurlLib::postData(void *buffer, size_t size, size_t nmemb, void *userp)
{
Publisher* outq = static_cast<Publisher*>(userp);
return outq->postCopy(buffer, size * nmemb);
return outq->postCopy(buffer, size * nmemb, DATA_TIMEOUT * 1000);
}

/*----------------------------------------------------------------------------
Expand Down
11 changes: 10 additions & 1 deletion packages/netsvc/EndpointProxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,16 @@ void* EndpointProxy::collatorThread (void* parm)
/* Send Terminator */
if(proxy->sendTerminator)
{
proxy->outQ->postCopy("", 0);
int status = MsgQ::STATE_TIMEOUT;
while(proxy->active && (status == MsgQ::STATE_TIMEOUT))
{
status = proxy->outQ->postCopy("", 0, SYS_TIMEOUT);
if(status < 0)
{
mlog(CRITICAL, "Failed (%d) to post terminator", status);
break;
}
}
}

/* Signal Complete */
Expand Down
2 changes: 1 addition & 1 deletion plugins/gedi/plugin/FootprintReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ FootprintReader<footprint_t>::FootprintReader ( lua_State* L, Asset* _asset, con
else alert(RTE_RESOURCE_DOES_NOT_EXIST, e.level(), outQ, &active, "%s: (%s)", e.what(), resource);

/* Indicate End of Data */
if(sendTerminator) outQ->postCopy("", 0);
if(sendTerminator) outQ->postCopy("", 0, SYS_TIMEOUT);
signalComplete();
}
}
Expand Down
18 changes: 17 additions & 1 deletion plugins/gedi/plugin/Gedi01bReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,23 @@ void* Gedi01bReader::subsettingThread (void* parm)
/* Indicate End of Data */
if(reader->numComplete == reader->threadCount)
{
if(reader->sendTerminator) reader->outQ->postCopy("", 0);
if(reader->sendTerminator)
{
int status = MsgQ::STATE_TIMEOUT;
while(reader->active && (status == MsgQ::STATE_TIMEOUT))
{
status = reader->outQ->postCopy("", 0, SYS_TIMEOUT);
if(status < 0)
{
mlog(CRITICAL, "Failed (%d) to post terminator for %s", status, info->reader->resource);
break;
}
else if(status == MsgQ::STATE_TIMEOUT)
{
mlog(INFO, "Timeout posting terminator for %s ... trying again", info->reader->resource);
}
}
}
reader->signalComplete();
}
}
Expand Down
18 changes: 17 additions & 1 deletion plugins/gedi/plugin/Gedi02aReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,23 @@ void* Gedi02aReader::subsettingThread (void* parm)
/* Indicate End of Data */
if(reader->numComplete == reader->threadCount)
{
if(reader->sendTerminator) reader->outQ->postCopy("", 0);
if(reader->sendTerminator)
{
int status = MsgQ::STATE_TIMEOUT;
while(reader->active && (status == MsgQ::STATE_TIMEOUT))
{
status = reader->outQ->postCopy("", 0, SYS_TIMEOUT);
if(status < 0)
{
mlog(CRITICAL, "Failed (%d) to post terminator for %s", status, info->reader->resource);
break;
}
else if(status == MsgQ::STATE_TIMEOUT)
{
mlog(INFO, "Timeout posting terminator for %s ... trying again", info->reader->resource);
}
}
}
reader->signalComplete();
}
}
Expand Down
18 changes: 17 additions & 1 deletion plugins/gedi/plugin/Gedi04aReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,23 @@ void* Gedi04aReader::subsettingThread (void* parm)
/* Indicate End of Data */
if(reader->numComplete == reader->threadCount)
{
if(reader->sendTerminator) reader->outQ->postCopy("", 0);
if(reader->sendTerminator)
{
int status = MsgQ::STATE_TIMEOUT;
while(reader->active && (status == MsgQ::STATE_TIMEOUT))
{
status = reader->outQ->postCopy("", 0, SYS_TIMEOUT);
if(status < 0)
{
mlog(CRITICAL, "Failed (%d) to post terminator for %s", status, info->reader->resource);
break;
}
else if(status == MsgQ::STATE_TIMEOUT)
{
mlog(INFO, "Timeout posting terminator for %s ... trying again", info->reader->resource);
}
}
}
reader->signalComplete();
}
}
Expand Down
2 changes: 1 addition & 1 deletion plugins/icesat2/plugin/Atl03Indexer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ void* Atl03Indexer::indexerThread (void* parm)
if(indexer->numComplete == indexer->threadCount)
{
/* Indicate End of Data */
indexer->outQ->postCopy("", 0);
indexer->outQ->postCopy("", 0, SYS_TIMEOUT);
indexer->signalComplete();
}
}
Expand Down
20 changes: 18 additions & 2 deletions plugins/icesat2/plugin/Atl03Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ Atl03Reader::Atl03Reader (lua_State* L, Asset* _asset, const char* _resource, co
else alert(RTE_RESOURCE_DOES_NOT_EXIST, e.level(), outQ, &active, "Failure on resource %s: %s", resource, e.what());

/* Indicate End of Data */
if(sendTerminator) outQ->postCopy("", 0);
if(sendTerminator) outQ->postCopy("", 0, SYS_TIMEOUT);
signalComplete();
}
}
Expand Down Expand Up @@ -1533,7 +1533,23 @@ void* Atl03Reader::subsettingThread (void* parm)
mlog(INFO, "Completed processing resource %s", info->reader->resource);

/* Indicate End of Data */
if(reader->sendTerminator) reader->outQ->postCopy("", 0);
if(reader->sendTerminator)
{
int status = MsgQ::STATE_TIMEOUT;
while(reader->active && (status == MsgQ::STATE_TIMEOUT))
{
status = reader->outQ->postCopy("", 0, SYS_TIMEOUT);
if(status < 0)
{
mlog(CRITICAL, "Failed (%d) to post terminator for %s", status, info->reader->resource);
break;
}
else if(status == MsgQ::STATE_TIMEOUT)
{
mlog(INFO, "Timeout posting terminator for %s ... trying again", info->reader->resource);
}
}
}
reader->signalComplete();
}
}
Expand Down
20 changes: 18 additions & 2 deletions plugins/icesat2/plugin/Atl06Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ Atl06Reader::Atl06Reader (lua_State* L, Asset* _asset, const char* _resource, co
else alert(RTE_RESOURCE_DOES_NOT_EXIST, e.level(), outQ, &active, "Failure on resource %s: %s", resource, e.what());

/* Indicate End of Data */
if(sendTerminator) outQ->postCopy("", 0);
if(sendTerminator) outQ->postCopy("", 0, SYS_TIMEOUT);
signalComplete();
}
}
Expand Down Expand Up @@ -656,7 +656,23 @@ void* Atl06Reader::subsettingThread (void* parm)
mlog(INFO, "Completed processing resource %s", info->reader->resource);

/* Indicate End of Data */
if(reader->sendTerminator) reader->outQ->postCopy("", 0);
if(reader->sendTerminator)
{
int status = MsgQ::STATE_TIMEOUT;
while(reader->active && (status == MsgQ::STATE_TIMEOUT))
{
status = reader->outQ->postCopy("", 0, SYS_TIMEOUT);
if(status < 0)
{
mlog(CRITICAL, "Failed (%d) to post terminator for %s", status, info->reader->resource);
break;
}
else if(status == MsgQ::STATE_TIMEOUT)
{
mlog(INFO, "Timeout posting terminator for %s ... trying again", info->reader->resource);
}
}
}
reader->signalComplete();
}
}
Expand Down
2 changes: 1 addition & 1 deletion plugins/swot/plugin/SwotL2Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ void SwotL2Reader::checkComplete (void)
if(numComplete >= threadCount)
{
mlog(INFO, "Completed processing resource %s", resource);
if(sendTerminator) outQ->postCopy("", 0);
if(sendTerminator) outQ->postCopy("", 0, SYS_TIMEOUT);
signalComplete();
}
}
Expand Down

0 comments on commit 24a6c79

Please sign in to comment.