Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC-16476 Refactored Keyed Join #11024

Merged
merged 1 commit into from Apr 18, 2018
Merged

Conversation

@jakesmith
Copy link
Member

jakesmith commented Apr 13, 2018

Type of change:

  • This change is a bug fix (non-breaking change which fixes an issue).
  • This change is a new feature (non-breaking change which adds functionality).
  • This change improves the code (refactor or other change that does not change the functionality)
  • This change fixes warnings (the fix does not alter the functionality or the generated code)
  • This change is a breaking change (fix or feature that will cause existing behavior to change).
  • This change alters the query API (existing queries will have to be recompiled)

Checklist:

  • My code follows the code style of this project.
    • My code does not create any new warnings from compiler, build system, or lint.
  • The commit message is properly formatted and free of typos.
    • The commit message title makes sense in a changelog, by itself.
    • The commit is signed.
  • My change requires a change to the documentation.
    • I have updated the documentation accordingly, or...
    • I have created a JIRA ticket to update the documentation.
    • Any new interfaces or exported functions are appropriately commented.
  • I have read the CONTRIBUTORS document.
  • The change has been fully tested:
    • I have added tests to cover my changes.
    • All new and existing tests passed.
    • I have checked that this change does not introduce memory leaks.
    • I have used Valgrind or similar tools to check for potential issues.
  • I have given due consideration to all of the following potential concerns:
    • Scalability
    • Performance
    • Security
    • Thread-safety
    • Premature optimization
    • Existing deployed queries will not be broken
    • This change fixes the problem, not just the symptom
    • The target branch of this pull request is appropriate for such a change.
  • There are no similar instances of the same problem that should be addressed
    • I have addressed them here
    • I have raised JIRA issues to address them separately
  • This is a user interface / front-end modification
    • I have tested my changes in multiple modern browsers
    • The component(s) render as expected

Testing:

Regression suite in various configurations.
Extended some regression tests to force remote reads (to emulate multi node clusters).
Performance suite on 12 slaves (3 nodes, slavesPerNode=4)
Ran through some test through valgrind

@hpcc-jirabot

This comment has been minimized.

Copy link

hpcc-jirabot commented Apr 13, 2018

@jakesmith

This comment has been minimized.

Copy link
Member Author

jakesmith commented Apr 13, 2018

@ghalliday - please review

Copy link
Member

ghalliday left a comment

Jake a few initial comments.

#include "../hashdistrib/thhashdistrib.ipp"
#include "thkeyedjoin.ipp"
#include "jhtree.hpp"

static const std::array<StatisticKind, 8> progressKinds = {{ StNumIndexSeeks, StNumIndexScans, StNumIndexAccepted, StNumPostFiltered, StNumPreFiltered, StNumDiskSeeks, StNumDiskAccepted, StNumDiskRejected }};

This comment has been minimized.

Copy link
@ghalliday

ghalliday Apr 16, 2018

Member

Out of curiosity, why not
static const StatisticKind[8] = { ... }?

Read some more details. I can see it potentially being useful. I don't think the double {} are needed if RHS of an assignment.

This comment has been minimized.

Copy link
@jakesmith

jakesmith Apr 16, 2018

Author Member

yes, I can take the double braces out, can also abbreviate it to :
static const std::array<StatisticKind, 8> progressKinds{ StNumIndexSeeks, ...... };

{
slaveParts.push_back(p);
if (superFileDesc)
partIdx = superWidth*subfile+subPartIdx;

This comment has been minimized.

Copy link
@ghalliday

ghalliday Apr 16, 2018

Member

afaics this has already been done in line 127

This comment has been minimized.

Copy link
@jakesmith

jakesmith Apr 16, 2018

Author Member

yes, should be removed.

INode &groupNode = dfsGroup.queryNode(gn);
if ((partNode->equals(&groupNode)))
{
if (!partsOnSlaves->testSet(groupSize*p+gn))

This comment has been minimized.

Copy link
@ghalliday

ghalliday Apr 16, 2018

Member

This could do with a comment. I think it is ensuring that only the first copy that exists on the group is processed.

This comment has been minimized.

Copy link
@jakesmith

jakesmith Apr 16, 2018

Author Member

I'll add a comment.

initMb.append(tag);
initMb.append(remoteKeyedLookup);
initMb.append(remoteKeyedFetch);
indexMap.map(*this, indexFile, keyHasTlk, getOptBool("allLocalIndexParts"));

This comment has been minimized.

Copy link
@ghalliday

ghalliday Apr 16, 2018

Member

clearer if this happened before the appends? It isn't related to the line before or after as far as I can see.

This comment has been minimized.

Copy link
@jakesmith

jakesmith Apr 16, 2018

Author Member

agree, will move it up.

return;
RowArray &rowArray = rowArrays[partNo];
rowArray.rows[sequence].rhs = right;
++totalRows;

This comment has been minimized.

Copy link
@ghalliday

ghalliday Apr 16, 2018

Member

minor: Probably insignificant so I wouldn't change, but if most increments of this are within a critical section, it might be more efficient to protect this in a critical section and not use an atomic.

This comment has been minimized.

Copy link
@jakesmith

jakesmith Apr 16, 2018

Author Member

addRightMatchCompletePending is not called in a crit.

/* NB: encode *index* partNo into sequence
* This is used when result comes back to preserve order when calling joinGroup->addRightMatchCompletePending()
*/
sequence = sequence | (((unsigned __int64)partNo) << 32);

This comment has been minimized.

Copy link
@ghalliday

ghalliday Apr 16, 2018

Member

pedantic: should assert that sequence < 2^32?

This comment has been minimized.

Copy link
@jakesmith

jakesmith Apr 17, 2018

Author Member

I'll add a check.

dstNode = ((FPosTableEntry *)result)->index;
unsigned partNo = partCopy & partMask;
unsigned copy = partCopy >> 24;
Owned<IKeyIndex> keyIndex = activit

This comment has been minimized.

Copy link
@ghalliday

ghalliday Apr 16, 2018

Member

Out of interest, does this help?

dstNode = ((FPosTableEntry *)result)->index;
unsigned partNo = partCopy & partMask;
unsigned copy = partCopy >> 24;
Owned<IKeyIndex> keyIndex = activit

This comment has been minimized.

Copy link
@ghalliday

ghalliday Apr 16, 2018

Member

Not clear from reading this code that it is safe. It probably is ok, but reading into an unsigned would be safer.

This comment has been minimized.

Copy link
@jakesmith

jakesmith Apr 16, 2018

Author Member

Not clear from reading this code that it is safe. It probably is ok, but reading into an unsigned would be safer.

Can you re-add your comment - it does seem github has lost it's position..

dstNode = ((FPosTableEntry *)result)->index;
unsigned partNo = partCopy & partMask;
unsigned copy = partCopy >> 24;
Owned<IKeyIndex> keyIndex = activit

This comment has been minimized.

Copy link
@ghalliday

ghalliday Apr 16, 2018

Member

Need to double check - they might never be needed any more...

This comment has been minimized.

Copy link
@jakesmith

jakesmith Apr 16, 2018

Author Member

Not clear from reading this code that it is safe. It probably is ok, but reading into an unsigned would be safer.

Can you re-add your comment - it does seem github has lost it's position..

dstNode = ((FPosTableEntry *)result)->index;
unsigned partNo = partCopy & partMask;
unsigned copy = partCopy >> 24;
Owned<IKeyIndex> keyIndex = activit

This comment has been minimized.

Copy link
@ghalliday

ghalliday Apr 16, 2018

Member

another case where ScopedAtomic would help (probably several others in the same file)

This comment has been minimized.

Copy link
@jakesmith

jakesmith Apr 16, 2018

Author Member

Not clear from reading this code that it is safe. It probably is ok, but reading into an unsigned would be safer.

Can you re-add your comment - it does seem github has lost it's position..

@ghalliday

This comment has been minimized.

Copy link
Member

ghalliday commented Apr 16, 2018

The are some other potential inefficiencies e.g., allocation of CJoinGroup and possible clashes on variable size allocators that I would be concerned about if the data was in memory, but unlikely to be issues for a keyed join, so didn't comment.

@jakesmith

This comment has been minimized.

Copy link
Member Author

jakesmith commented Apr 16, 2018

@ghalliday - replied to a few comments, I don't know what happened with the 'show outdated' ones, can you try to re-add, so I can see what they refer to?

Copy link
Member

ghalliday left a comment

Added comments back on the lines.

There were also collapsed comments that you hadn't responded to. I am now going to walk through the code to see if I spot anything else.

dstNode = ((FPosTableEntry *)result)->index;
unsigned partNo = partCopy & partMask;
unsigned copy = partCopy >> 24;
Owned<IKeyIndex> keyIndex = activit

This comment has been minimized.

Copy link
@ghalliday

ghalliday Apr 17, 2018

Member

"Out of interest, does this help?"

This comment has been minimized.

Copy link
@ghalliday

ghalliday Apr 17, 2018

Member

line 1068

This comment has been minimized.

Copy link
@jakesmith

jakesmith Apr 17, 2018

Author Member

1068: fastLZDecompressToBuffer(mb, msg);

I didn't test without, but am trying to conserve network bandwidth and as there are multiple handles and receiver threads dealing with compression/decompression I wasn't overly worried about the cpu overhead.
I could add an option and test without compression.

dstNode = ((FPosTableEntry *)result)->index;
unsigned partNo = partCopy & partMask;
unsigned copy = partCopy >> 24;
Owned<IKeyIndex> keyIndex = activit

This comment has been minimized.

Copy link
@ghalliday

ghalliday Apr 17, 2018

Member

"Not clear from reading this code that it is safe. It probably is ok, but reading into an unsigned would be safer."

This comment has been minimized.

Copy link
@ghalliday

ghalliday Apr 17, 2018

Member

line 1081

This comment has been minimized.

Copy link
@jakesmith

jakesmith Apr 17, 2018

Author Member

1081: mb.read((unsigned &)flags);

GroupFlags is defined as :

enum GroupFlags:unsigned { gf_null=0x0, gf_limitatmost=0x01, gf_limitabort=0x02, gf_eog=0x04, gf_head=0x08 };

so should be safe?
I could use std::underlying_type to get at the underlying type..

dstNode = ((FPosTableEntry *)result)->index;
unsigned partNo = partCopy & partMask;
unsigned copy = partCopy >> 24;
Owned<IKeyIndex> keyIndex = activit

This comment has been minimized.

Copy link
@ghalliday

ghalliday Apr 17, 2018

Member

"Need to double check - they might never be needed any more..."

This comment has been minimized.

Copy link
@ghalliday

ghalliday Apr 17, 2018

Member

line 1117

This comment has been minimized.

Copy link
@jakesmith

jakesmith Apr 17, 2018

Author Member

1117: mb.read(matches * sizeof(unsigned __int64), &fposs[0]); // JCSMORE shame to serialize these if not needed, does codegen give me a tip?

ok, well would be good to remove, as it's causing extra traffic at the moment..

dstNode = ((FPosTableEntry *)result)->index;
unsigned partNo = partCopy & partMask;
unsigned copy = partCopy >> 24;
Owned<IKeyIndex> keyIndex = activit

This comment has been minimized.

Copy link
@ghalliday

ghalliday Apr 17, 2018

Member

Use ScopedAtomic in jatomic.hpp to avoid an atomic inc for each iteration around the loop.

Anywhere an atomic is being iterated in a loop it is likely to be more efficient to use ScopedAtomic - it it exception safe. Only complication is you would need to add the local to the stat in the line below.

This comment has been minimized.

Copy link
@ghalliday

ghalliday Apr 17, 2018

Member

line 1233

This comment has been minimized.

Copy link
@ghalliday

ghalliday Apr 17, 2018

Member

(and following)

This comment has been minimized.

Copy link
@jakesmith

jakesmith Apr 17, 2018

Author Member

1233: activity.statsArr[AS_DiskAccepted]++;

Ok, sounds good, will change to use ScopedAtomics.

dstNode = ((FPosTableEntry *)result)->index;
unsigned partNo = partCopy & partMask;
unsigned copy = partCopy >> 24;
Owned<IKeyIndex> keyIndex = activit

This comment has been minimized.

Copy link
@ghalliday

ghalliday Apr 17, 2018

Member

Another example for a ScopedAtomic

This comment has been minimized.

Copy link
@ghalliday

ghalliday Apr 17, 2018

Member

line 1379

This comment has been minimized.

Copy link
@jakesmith

jakesmith Apr 17, 2018

Author Member

1379: activity.statsArr[AS_DiskSeeks]++; // NB: really the seek happened on the remote side, but it can't be tracked into the activity stats there.

will change.

@hpcc-systems hpcc-systems deleted a comment from jakesmith Apr 17, 2018
@ghalliday

This comment has been minimized.

Copy link
Member

ghalliday commented Apr 17, 2018

thkeyedjoinslave.cpp(2090) should this be handlerContainer instead of keyLookupHandlers?

@ghalliday

This comment has been minimized.

Copy link
Member

ghalliday commented Apr 17, 2018

thkeyedjoinslave.cpp(2125) repeated condition (as line 2122)

@ghalliday

This comment has been minimized.

Copy link
Member

ghalliday commented Apr 17, 2018

thkeyedjoinslave.cpp(2361) should this be controlled by a tracing level?

@ghalliday

This comment has been minimized.

Copy link
Member

ghalliday commented Apr 17, 2018

thkeyedjoinslave.cpp(1759) How common is the case with multiple TLKs. If comment then it might be better to create the segment monitors once, and then clone them into each key manager. Possibly something for another jira.

@ghalliday

This comment has been minimized.

Copy link
Member

ghalliday commented Apr 17, 2018

curious why you have gone for "do {} while(true)" rather than "for (;;)"

@jakesmith

This comment has been minimized.

Copy link
Member Author

jakesmith commented Apr 17, 2018

thkeyedjoinslave.cpp(2090) should this be handlerContainer instead of keyLookupHandlers?

Yes, I'll change, although in reality handlerContainer will always be keyLookupHandlers in this context.

@jakesmith

This comment has been minimized.

Copy link
Member Author

jakesmith commented Apr 17, 2018

thkeyedjoinslave.cpp(2125) repeated condition (as line 2122)

yes, will remove.

@jakesmith

This comment has been minimized.

Copy link
Member Author

jakesmith commented Apr 17, 2018

thkeyedjoinslave.cpp(2361) should this be controlled by a tracing level?

probably, unfortunately , Thor doesn't have a dynamic log level tracing at the moment.
In this case, this is old tracing I'm happy to delete or make debug only.

I'll wrap it in a #ifdef _DEBUG

@jakesmith

This comment has been minimized.

Copy link
Member Author

jakesmith commented Apr 17, 2018

thkeyedjoinslave.cpp(1759) How common is the case with multiple TLKs. If comment then it might be better to create the segment monitors once, and then clone them into each key manager. Possibly something for another jira.

1759: helper->createSegmentMonitors(&keyManager, keyedFieldsRow);

SuperIndex reasonably common in some use cases.

, and then clone them into each key manager.

is that possible now?

@jakesmith

This comment has been minimized.

Copy link
Member Author

jakesmith commented Apr 17, 2018

curious why you have gone for "do {} while(true)" rather than "for (;;)"

I prefer while (true) { } , than for (;;)
it looks more natural / logical to me.

But I don't know why I've gone for do { } while (true); in some cases. I might change them all to while (true) { }

@jakesmith jakesmith force-pushed the jakesmith:hpcc-16476 branch from f27c504 to 9597f73 Apr 17, 2018
@@ -27,7 +27,7 @@
#include "thkeyedjoin.ipp"
#include "jhtree.hpp"

static const std::array<StatisticKind, 8> progressKinds = {{ StNumIndexSeeks, StNumIndexScans, StNumIndexAccepted, StNumPostFiltered, StNumPreFiltered, StNumDiskSeeks, StNumDiskAccepted, StNumDiskRejected }};
static const std::array<StatisticKind, 8> progressKinds{ StNumIndexSeeks, StNumIndexScans, StNumIndexAccepted, StNumPostFiltered, StNumPreFiltered, StNumDiskSeeks, StNumDiskAccepted, StNumDiskRejected };

This comment has been minimized.

Copy link
@ghalliday

ghalliday Apr 17, 2018

Member

I think I read that you DO need double {} on c++11 if you are not using = (but not c++14). See http://en.cppreference.com/w/cpp/container/array

activity.statsArr[AS_DiskAccepted]++;
if (activity.statsArr[AS_DiskAccepted] > activity.rowLimit)
diskAccepted++;
if (diskAccepted > activity.rowLimit)

This comment has been minimized.

Copy link
@ghalliday

ghalliday Apr 17, 2018

Member

I think this needs to be diskAccepted + activity.statsArr[AS_DiskAccepted], but I think what you are expecting is more natural. @richardkchapman what do you think about changing it?

@ghalliday

This comment has been minimized.

Copy link
Member

ghalliday commented Apr 17, 2018

and then clone them into each key manager.

is that possible now?

We would need to double check whether the segment monitors get modified once they are passed to the key manager. (They might be for optimization or smart stepping...) If they are not then it should be possible. The parameter passed to create segment monitors is an interface, which could save them, and then clone them from the array each time.

@jakesmith

This comment has been minimized.

Copy link
Member Author

jakesmith commented Apr 17, 2018

@ghalliday - please review last couple of commits and let me know when ready to squash.

@@ -1230,8 +1229,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor
// If !preserverOrder, right rows added to single array in jg, so pass 0
joinGroup->addRightMatchCompletePending(activity.preserveOrder ? indexPartNo : 0, sequence, fetchRow);

diskAccepted++;
if (diskAccepted > activity.rowLimit)
if (++activity.statsArr[AS_DiskAccepted] > activity.rowLimit)

This comment has been minimized.

Copy link
@ghalliday

ghalliday Apr 18, 2018

Member

This is less efficient, but I agree it is needed because you actually care about inter thread consistency.

if (activityCtx->useMessageCompression())
{
fastLZCompressToBuffer(replyMsg, tmpMB.length(), tmpMB.toByteArray());
replyMb.clear();

This comment has been minimized.

Copy link
@ghalliday

ghalliday Apr 18, 2018

Member

correct as it stands, but clearer if you use tempMB.clear()

This comment has been minimized.

Copy link
@ghalliday

ghalliday Apr 18, 2018

Member

same on line 904

@jakesmith jakesmith force-pushed the jakesmith:hpcc-16476 branch from 636065a to 8d1e8cc Apr 18, 2018
{
if (seq)
{
dbgassertex(actId <= 0xffff); // normal max is MAX_ACTIVITY_ID (0xffffff), reserving 0x00ff0000 byte for seq if present

This comment has been minimized.

Copy link
@ghalliday

ghalliday Apr 18, 2018

Member

This limit is too low. I have an example (issue16893) with over 200,000 activities in a graph.

Signed-off-by: Jake Smith <jake.smith@lexisnexisrisk.com>
@jakesmith jakesmith force-pushed the jakesmith:hpcc-16476 branch from dcc9ef2 to 9b84e2d Apr 18, 2018
@HPCCSmoketest

This comment has been minimized.

Copy link
Contributor

HPCCSmoketest commented Apr 18, 2018

Automated Smoketest:
OS: centos 7.2.1511 (Linux 3.10.0-327.28.3.el7.x86_64)
Sha: dcc9ef2
Build: success
Install hpccsystems-platform-community_6.5.0-trunk0.el7.x86_64.rpm
HPCC Start: OK

Unit tests result:

Test total passed failed errors timeout
unittest 88 88 0 0 0
wutoolTest(Dali) 19 19 0 0 0
wutoolTest(Cassandra) 19 19 0 0 0

Regression test result:

phase total pass fail
setup (hthor) 11 11 0
setup (thor) 11 11 0
setup (roxie) 11 11 0
test (hthor) 776 776 0
test (thor) 706 706 0
test (roxie) 819 819 0

HPCC Stop: OK
HPCC Uninstall: OK
Time stats:

Prep time Build time Package time Install time Start time Test time Stop time Summary
28 sec (00:00:28) 185 sec (00:03:05) 56 sec (00:00:56) 5 sec (00:00:05) 21 sec (00:00:21) 1157 sec (00:19:17) 16 sec (00:00:16) 1468 sec (00:24:28)
@ghalliday ghalliday merged commit 30d109d into hpcc-systems:master Apr 18, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
4 participants
You can’t perform that action at this time.