Skip to content

Commit

Permalink
Fix gh-2828 - Clean up code using DelayedSizeMarker
Browse files Browse the repository at this point in the history
Use recently introduced DelayedSizeMarker utility class to tidyup and
clarify some size back patching code

Signed-off-by: Jake Smith <jake.smith@lexisnexis.com>
  • Loading branch information
jakesmith committed Jul 17, 2012
1 parent 115f67d commit 6daa189
Show file tree
Hide file tree
Showing 11 changed files with 35 additions and 62 deletions.
7 changes: 2 additions & 5 deletions thorlcr/activities/aggregate/thaggregateslave.cpp
Expand Up @@ -108,14 +108,11 @@ class AggregateSlaveBase : public CSlaveActivity, public CThorDataLink
void sendResult(const void *row, IOutputRowSerializer *serializer, rank_t dst)
{
CMessageBuffer mb;
size32_t start = mb.length();
size32_t sz = 0;
mb.append(sz);
DelayedSizeMarker sizeMark(mb);
if (row&&hadElement) {
CMemoryRowSerializer mbs(mb);
serializer->serialize(mbs,(const byte *)row);
sz = mb.length()-start-sizeof(size32_t);
mb.writeDirect(start,sizeof(size32_t),&sz);
sizeMark.write();
}
container.queryJob().queryJobComm().send(mb, dst, mpTag);
}
Expand Down
2 changes: 1 addition & 1 deletion thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp
Expand Up @@ -1904,7 +1904,7 @@ class CKeyedJoinSlave : public CSlaveActivity, public CThorDataLink, implements
{
unsigned numDataParts;
data.read(numDataParts);
unsigned offsetMapSz = 0;
size32_t offsetMapSz = 0;
if (numDataParts)
{
deserializePartFileDescriptors(data, dataParts);
Expand Down
9 changes: 4 additions & 5 deletions thorlcr/activities/result/thresultslave.cpp
Expand Up @@ -48,13 +48,12 @@ class CResultSlaveActivity : public ProcessSlaveActivity

OwnedConstThorRow row = input->ungroupedNextRow();
CMessageBuffer mb;
size32_t lenpos = mb.length(); // its 0 really
mb.append((size32_t)0);
if (row) {
DelayedSizeMarker sizeMark(mb);
if (row)
{
CMemoryRowSerializer msz(mb);
::queryRowSerializer(input)->serialize(msz,(const byte *)row.get());
size32_t sz = mb.length()-lenpos-sizeof(size32_t);
mb.writeDirect(lenpos,sizeof(size32_t),&sz);
sizeMark.write();
processed++;
}
container.queryJob().queryJobComm().send(mb, 0, masterMpTag);
Expand Down
15 changes: 5 additions & 10 deletions thorlcr/activities/rollup/throllupslave.cpp
Expand Up @@ -252,29 +252,24 @@ class CDedupRollupBaseActivity : public CSlaveActivity, implements IStopInput
return false;
CMessageBuffer msg;
msg.append(numKept);
unsigned msgPos = msg.length();
msg.append((size32_t)0);
DelayedSizeMarker sizeMark(msg);
if (kept.get())
{
CMemoryRowSerializer msz(msg);
rowif->queryRowSerializer()->serialize(msz,(const byte *)kept.get());
size32_t sz = msg.length()-(msgPos+sizeof(sz));
msg.writeDirect(msgPos, sizeof(sz), &sz);
sizeMark.write();
if (rollup)
{
msgPos = msg.length();
msg.append((size32_t)0);
sizeMark.restart();
if (kept.get()!=keptTransformed.get())
{
sz = msg.length();
rowif->queryRowSerializer()->serialize(msz,(const byte *)keptTransformed.get());
sz = msg.length()-(msgPos+sizeof(sz));
msg.writeDirect(msgPos, sizeof(sz), &sz);
sizeMark.write();
}
}
}
else if (rollup)
msg.append((size32_t)0);
sizeMark.restart(); // write (0 size) for keptTransformed row
container.queryJob().queryJobComm().send(msg, container.queryJob().queryMyRank()+1, mpTag); // to next node
return true;
}
Expand Down
24 changes: 8 additions & 16 deletions thorlcr/graph/thgraph.cpp
Expand Up @@ -513,21 +513,17 @@ void CGraphElementBase::serializeCreateContext(MemoryBuffer &mb)
{
if (!onCreateCalled) return;
mb.append(queryId());
unsigned pos = mb.length();
mb.append((size32_t)0);
DelayedSizeMarker sizeMark(mb);
queryHelper()->serializeCreateContext(mb);
size32_t sz = (mb.length()-pos)-sizeof(size32_t);
mb.writeDirect(pos, sizeof(sz), &sz);
sizeMark.write();
}

void CGraphElementBase::serializeStartContext(MemoryBuffer &mb)
{
assertex(onStartCalled);
unsigned pos = mb.length();
mb.append((size32_t)0);
DelayedSizeMarker sizeMark(mb);
queryHelper()->serializeStartContext(mb);
size32_t sz = (mb.length()-pos)-sizeof(size32_t);
mb.writeDirect(pos, sizeof(sz), &sz);
sizeMark.write();
}

void CGraphElementBase::deserializeCreateContext(MemoryBuffer &mb)
Expand Down Expand Up @@ -1052,23 +1048,20 @@ void CGraphBase::clean()

void CGraphBase::serializeCreateContexts(MemoryBuffer &mb)
{
unsigned pos = mb.length();
mb.append((unsigned)0);
DelayedSizeMarker sizeMark(mb);
Owned<IThorActivityIterator> iter = (queryOwner() && !isGlobal()) ? getIterator() : getTraverseIterator(true); // all if non-global-child, or graph with conditionals
ForEach (*iter)
{
CGraphElementBase &element = iter->query();
element.serializeCreateContext(mb);
}
mb.append((activity_id)0);
unsigned len=mb.length()-pos-sizeof(unsigned);
mb.writeDirect(pos, sizeof(len), &len);
sizeMark.write();
}

void CGraphBase::serializeStartContexts(MemoryBuffer &mb)
{
unsigned pos = mb.length();
mb.append((unsigned)0);
DelayedSizeMarker sizeMark(mb);
Owned<IThorActivityIterator> iter = getTraverseIterator();
ForEach (*iter)
{
Expand All @@ -1077,8 +1070,7 @@ void CGraphBase::serializeStartContexts(MemoryBuffer &mb)
element.serializeStartContext(mb);
}
mb.append((activity_id)0);
unsigned len=mb.length()-pos-sizeof(unsigned);
mb.writeDirect(pos, sizeof(len), &len);
sizeMark.write();
}

void CGraphBase::deserializeCreateContexts(MemoryBuffer &mb)
Expand Down
14 changes: 5 additions & 9 deletions thorlcr/graph/thgraphmaster.cpp
Expand Up @@ -2032,8 +2032,7 @@ void CMasterGraph::serializeCreateContexts(MemoryBuffer &mb)
bool CMasterGraph::serializeActivityInitData(unsigned slave, MemoryBuffer &mb, IThorActivityIterator &iter)
{
CriticalBlock b(createdCrit);
unsigned pos=mb.length();
mb.append((unsigned)0);
DelayedSizeMarker sizeMark1(mb);
ForEach (iter)
{
CMasterGraphElement &element = (CMasterGraphElement &)iter.query();
Expand All @@ -2043,19 +2042,16 @@ bool CMasterGraph::serializeActivityInitData(unsigned slave, MemoryBuffer &mb, I
if (activity)
{
mb.append(element.queryId());
unsigned pos = mb.length();
mb.append((size32_t)0);
DelayedSizeMarker sizeMark2(mb);
activity->serializeSlaveData(mb, slave);
size32_t sz = (mb.length()-pos)-sizeof(size32_t);
mb.writeDirect(pos, sizeof(sz), &sz);
sizeMark2.write();
}
}
}
if (pos == (mb.length()-sizeof(unsigned)))
if (0 == sizeMark1.size())
return false;
mb.append((activity_id)0); // terminator
unsigned len=mb.length()-pos-sizeof(unsigned);
mb.writeDirect(pos, sizeof(len), &len);
sizeMark1.write();
return true;
}

Expand Down
6 changes: 3 additions & 3 deletions thorlcr/graph/thgraphslave.cpp
Expand Up @@ -382,7 +382,7 @@ bool CSlaveGraph::recvActivityInitData(size32_t parentExtractSz, const byte *par
if (needActInit)
{
mptag_t replyTag = TAG_NULL;
unsigned len;
size32_t len;
CMessageBuffer actInitRtnData;
actInitRtnData.append(false);
CMessageBuffer msg;
Expand Down Expand Up @@ -555,7 +555,7 @@ void CSlaveGraph::create(size32_t parentExtractSz, const byte *parentExtract)
throw MakeStringException(0, "Error receiving createctx data for graph: %"GIDPF"d", graphId);
try
{
unsigned len;
size32_t len;
msg.read(len);
if (len)
{
Expand Down Expand Up @@ -595,7 +595,7 @@ void CSlaveGraph::create(size32_t parentExtractSz, const byte *parentExtract)
msg.append(graphId);
if (!queryJob().queryJobComm().sendRecv(msg, 0, queryJob().querySlaveMpTag(), LONGTIMEOUT))
throwUnexpected();
unsigned len;
size32_t len;
msg.read(len);
if (len)
deserializeCreateContexts(msg);
Expand Down
6 changes: 2 additions & 4 deletions thorlcr/master/thactivitymaster.cpp
Expand Up @@ -554,8 +554,7 @@ CSlavePartMapping::CSlavePartMapping(const char *_logicalName, IFileDescriptor &
void CSlavePartMapping::serializeFileOffsetMap(MemoryBuffer &mb)
{
mb.append(fileWidth);
unsigned pos = mb.length();
mb.append((unsigned)0);
DelayedSizeMarker sizeMark(mb);
ForEachItemIn(sm, maps)
{
CSlaveMap &map = maps.item(sm);
Expand All @@ -570,8 +569,7 @@ void CSlavePartMapping::serializeFileOffsetMap(MemoryBuffer &mb)
mb.append(sizeof(FPosTableEntry), &entry);
}
}
unsigned l = mb.length()-pos-sizeof(unsigned);
mb.writeDirect(pos, sizeof(unsigned), &l);
sizeMark.write();
}

CSlavePartMapping *getFileSlaveMaps(const char *logicalName, IFileDescriptor &fileDesc, IUserDescriptor *userDesc, IGroup &localGroup, bool local, bool index, IHash *hash, IDistributedSuperFile *super)
Expand Down
6 changes: 2 additions & 4 deletions thorlcr/msort/tsorta.cpp
Expand Up @@ -176,14 +176,12 @@ void CThorKeyArray::serialize(MemoryBuffer &mb)
mb.append(totalserialsize);
bool haskeyserializer = keyserializer!=NULL;
mb.append(haskeyserializer);
size32_t pos = mb.length();
mb.append((size32_t)0);
DelayedSizeMarker sizeMark(mb);
IOutputRowSerializer *serializer = haskeyserializer?keyif->queryRowSerializer():rowif->queryRowSerializer();
CMemoryRowSerializer msz(mb);
for (i=0;i<n;i++)
serializer->serialize(msz,(const byte *)keys.query(i));
size32_t l = mb.length()-pos-sizeof(size32_t);
mb.writeDirect(pos,sizeof(l),&l);
sizeMark.write();
}

void CThorKeyArray::deserialize(MemoryBuffer &mb,bool append)
Expand Down
2 changes: 1 addition & 1 deletion thorlcr/slave/slavmain.cpp
Expand Up @@ -274,7 +274,7 @@ class CJobListener : public CSimpleInterface
subGraph->createFromXGMML(graphNode, NULL, NULL, NULL);
PROGLOG("GraphInit: %s, graphId=%"GIDPF"d", jobKey.get(), subGraph->queryGraphId());
subGraph->setExecuteReplyTag(subGraph->queryJob().deserializeMPTag(msg));
unsigned len;
size32_t len;
msg.read(len);
MemoryBuffer initData;
initData.append(len, msg.readDirect(len));
Expand Down
6 changes: 2 additions & 4 deletions thorlcr/slave/slwatchdog.cpp
Expand Up @@ -123,11 +123,9 @@ class CGraphProgressHandlerBase : public CSimpleInterface, implements ISlaveWatc
LOG(MCdebugProgress, thorJob, "%s", str.append(graph.queryGraphId()).str());
if (mb)
{
unsigned pos=mb->length();
mb->append((size32_t)0); // placeholder
DelayedSizeMarker sizeMark(*mb);
gatherData(*mb);
size32_t len=(mb->length()-pos)-sizeof(size32_t);
mb->writeDirect(pos, sizeof(len), &len);
sizeMark.write();
}
activeGraphs.zap(graph);
}
Expand Down

0 comments on commit 6daa189

Please sign in to comment.