HPCC-19337 Add streaming remote (dafilesrv) disk read support #10979
https://track.hpccsystems.com/browse/HPCC-19337
NB: ValueSetTest::testKeyed2, ValueSetTest::testFilter, ValueSetTest::testKeyed1 fail at the moment, e.g. testKeyed(rows, record, "f1=[5]") fails in deserializeFieldFilter.
@mckellyln @ghalliday @richardkchapman - please review some/all of the changes.
rtl/eclrtl/rtlnewkey.cpp
Outdated
@@ -1632,6 +1649,16 @@ void RowFilter::addFilter(const IFieldFilter & filter)
numFieldsRequired = fieldNum+1;
}

void RowFilter::addFilter(const RtlRecord & record, const char * filterText)
{
//assertex(filter.queryField() == filters.ordinality()); //MORE - fill with wild filters and replace existing wild
Is this comment still accurate? Is there a plan to address it? A Jira? Should it be fixed in this PR?
I think the comment can be deleted - it was related to when they are used for keys, and that needs more work anyway.
A first review.
Main comments are to do with the remote activity implementation - particularly group handling.
common/remote/sockfile.cpp
Outdated
if (cmd > RFCmaxnormal)
cmd = RFCmaxnormal;
unsigned elems = sizeof(RFCStrings) / sizeof(RFCStrings[0]);
if (cmd >= (sizeof(RFCStrings) / sizeof(RFCStrings[0])))
I think you meant to use elems
yes. I'll change.
common/remote/sockfile.cpp
Outdated
cursorInfo.append("\"\n");
mrequest.append(cursorInfo.length(), cursorInfo.str());
}
DBGLOG("req = <%s}>", request.str());
Does this need to be less frequent - the logging may get excessive.
Yes, I'll make it conditional on the dafilesrv tracing level.
mutable bool eogPending = false;
mutable bool someInGroup = false;
const RtlRecord *record = nullptr;
RtlDynRow *filterRow = nullptr;
I think this probably leaks.
yes, will add delete to dtor.
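As an alternative to an explicit delete in the destructor, the lazily created row could be held in an owning smart pointer. The sketch below is only an illustration of that option: `DynRow` and `Activity` are hypothetical stand-ins for RtlDynRow and the remote activity class, and the PR itself may well use a plain delete as stated above.

```cpp
#include <memory>

// Hypothetical stand-in for RtlDynRow.
struct DynRow
{
    explicit DynRow(int fields) : numFields(fields) {}
    int numFields;
};

// Hypothetical stand-in for the activity that owns the filter row.
class Activity
{
    std::unique_ptr<DynRow> filterRow; // freed automatically when Activity is destroyed
public:
    // Create the row on first use and return the same instance afterwards.
    DynRow &ensureFilterRow(int fields)
    {
        if (!filterRow)
            filterRow.reset(new DynRow(fields));
        return *filterRow;
    }
};
```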
common/remote/sockfile.cpp
Outdated
const byte *next = prefetchBuffer.queryRow();
bool eog = false;
if (inputGrouped)
prefetchBuffer.read(sizeof(eog), &eog);
this could invalidate next. Need to call queryRow() after this line.
ok, will change.
common/remote/sockfile.cpp
Outdated
bool eog = false;
if (inputGrouped)
prefetchBuffer.read(sizeof(eog), &eog);
prefetchBuffer.finishedRow();
this could also invalidate next, needs to happen after the transform
ok, will change.
common/remote/sockfile.cpp
Outdated
if (!filterRow)
filterRow = new RtlDynRow(*record);
}
// IRemoteActivity impl.
virtual const void *nextRow(size32_t &retSz) override
This should really take outBuilder as a parameter - otherwise it is very confusing that the buffer is updated as a side-effect of the call.
common/remote/sockfile.cpp
Outdated
for (unsigned __int64 i=0; i<defaultDaFSNumRecs; i++)
{
bool *eog;
if (grouped && outputActivity->queryProcessed()) // i.e. not 1st row
I would expect this logic to be executed after reading enough rows (since returning null is the normal way of indicating end of group), rather than for each row, using a local eog variable that is appended to the buffer at the correct point.
common/remote/sockfile.cpp
Outdated
size32_t rowSz;
const void *row = outputActivity->nextRow(rowSz);
const void *row = outputActivity->nextRow(rowSz); // NB: row builder writes directly to reply buffer for effiency
This potentially invalidates eog pointer - so could corrupt memory.
(E.g., if a complex row is then filtered, followed by end of group).
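The hazard described here is the general one of holding a raw pointer into a buffer that can grow. One common way to avoid it is to remember an offset instead and resolve it only when the flag is written. The sketch below shows that idea using `std::vector` as a stand-in for the reply buffer; the helper names are hypothetical and this is not the approach the PR necessarily takes.

```cpp
#include <cstddef>
#include <vector>

// Reserve one byte for an end-of-group flag and return its offset.
// The offset remains valid if the buffer later reallocates; a raw pointer would not.
std::size_t reserveEogFlag(std::vector<unsigned char> &reply)
{
    reply.push_back(0);
    return reply.size() - 1;
}

// Append a serialized row; this may reallocate the underlying storage.
void appendRow(std::vector<unsigned char> &reply, const unsigned char *row, std::size_t len)
{
    reply.insert(reply.end(), row, row + len);
}

// Write the flag through the offset, resolved against the current storage.
void setEogFlag(std::vector<unsigned char> &reply, std::size_t offset, bool eog)
{
    reply[offset] = eog ? 1 : 0;
}
```

Because the flag position is resolved at write time, a large row appended between reserving and setting the flag cannot leave the writer pointing at freed memory.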
common/remote/sockfile.cpp
Outdated
break;
if (grouped)
{
if (outputActivity->queryProcessed()) // i.e. not 1st row
This seems a strange test to see if it is the first row - I would expect testing a local variable e.g., i != 0 or
virtual void serializeCursor(MemoryBuffer &tgt) const override
{
tgt.append(prefetchBuffer.tell());
tgt.append(processed);
tgt.append(someInGroup);
tgt.append(eogPending);
is it possible for eogPending to ever be true?
@jakesmith some initial comments
forgot to tag you @ghalliday
Looks very close, a few comments.
common/remote/sockfile.cpp
Outdated
}
virtual unsigned __int64 getStatistic(StatisticKind kind)
{
UNIMPLEMENTED;
Will this cause problems? Should it be implemented in some way in a later PR?
should at least return 0, as other implementations do that don't return stats.
common/remote/sockfile.cpp
Outdated
responseWriter->outputBeginNested("Row", true);
out->toXML((const byte *)row, *responseWriter);
responseWriter->outputEndNested("Row");
pastFirstRow = true;
Worth resetting outBuilder buffer, so it doesn't keep growing (and in the else below)?
oh yes, well spotted.
common/remote/sockfile.cpp
Outdated
checkOpen();
if (needTransform)
while (!eofSeen && ((chooseN == 0) || (processed < chooseN)))
Are you sure the chooseN == 0 check is correct?
It was copied from hthor, and it matches thor's behaviour too.
But it turns out that 0 should mean 0 rows, not all rows, which is how it is currently implemented here and in hthor and thor.
The behaviour was changed some years ago, but it looks like hthor and thor were never updated. They get away with it because the codegen always generates a subsequent choosen activity.
I've opened a separate JIRA to change the choosen==0 behaviour in hthor and thor: https://track.hpccsystems.com/browse/HPCC-19440
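To make the intended semantics concrete, here is a minimal sketch of a row limit where 0 means "return no rows" and "no limit" is expressed by a separate sentinel. The `kNoLimit` constant and `rowsReturned` function are hypothetical names for illustration, not taken from the HPCC sources.

```cpp
#include <cstdint>
#include <limits>

// Hypothetical sentinel meaning "no limit"; an assumption for this sketch only.
constexpr std::uint64_t kNoLimit = std::numeric_limits<std::uint64_t>::max();

// Count how many of `available` rows are returned under a CHOOSEN-style limit,
// treating chooseN == 0 as "return nothing" rather than "return everything".
std::uint64_t rowsReturned(std::uint64_t available, std::uint64_t chooseN)
{
    std::uint64_t processed = 0;
    while (processed < available && processed < chooseN)
        ++processed;
    return processed;
}
```

Note that the loop condition has no special case for 0, in contrast to the `(chooseN == 0) || (processed < chooseN)` test under review.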
ecl/hthor/hthor.cpp
Outdated
inputfile.setown(createIFile(rfilename));
if(compressed)
inputfile.setown(createIFile(rfilename));
if (rfilename.isLocal() && (!canSerialize || !readRemote(rfilename)))
Is this test correct?
Currently xml/csv needs to be read locally, even if the filename is not local.
Similarly, if you cannot serialize then the read needs to be local.
Possibly:
if (!isBinary() || !canSerialize || (isLocal && !readRemote()))
Agreed, also going to change readRemote() to forceRemote() for clarity.
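Written out as a standalone predicate, the condition suggested in the review reads as follows. This is a sketch of the logic only: the function name and the plain boolean parameters are hypothetical, with `forceRemote` standing in for the renamed readRemote() check.

```cpp
// Decide whether a disk read must be performed locally rather than streamed
// from dafilesrv. Parameter names are illustrative assumptions.
bool mustReadLocally(bool isBinary, bool canSerialize, bool isLocal, bool forceRemote)
{
    // Non-binary formats (xml/csv) and non-serializable records are always read locally;
    // a local file is read locally unless remote reading is forced.
    return !isBinary || !canSerialize || (isLocal && !forceRemote);
}
```

Under this predicate a non-local binary file with serializable metadata is the only case that streams remotely by default.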
ecl/hthor/hthor.cpp
Outdated
calcFixedDiskRecordSize();
actualFilter.appendFilters(fieldFilters);

bool canSerialize = actualDiskMeta->queryTypeInfo()->canSerialize() && projectedDiskMeta->queryTypeInfo()->canSerialize();
minor: canSerializeTypeInfo would be a better indication of what couldn't be serialized.
will change.
@ghalliday - please see changes following last review.
Automated Smoketest: ✅ Unit tests result:
Regression test result:
HPCC Stop: OK
Implementation of a streaming remote project/filtering disk i/o class for use by engines, plus initial use by hthor.

NB: This commit contains various fixes to rtl, hthor, dafilesrv discovered/required during implementation.

It also:
1) adds remote compression support
2) adds remote filtering
3) adds remote grouped file support
4) tidies up the dafilesrv exception handling and improves the error messages fed back to the client.

Signed-off-by: Jake Smith <jake.smith@lexisnexisrisk.com>
Type of change:
Checklist:
Testing:
Run through hthor regression suite with forceRemotePattern on for all hthor files.