Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PUBDEV-6938 GroupBy - support for grouping by String columns #4594

Merged
merged 3 commits into from
May 18, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 126 additions & 60 deletions h2o-core/src/main/java/water/rapids/ast/prims/mungers/AstGroup.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
* columns, and the last column(s) the reduction result(s).
* <p/>
* However, GroupBy operations will not be performed on String columns. These columns
* will be skipped.
* will be skipped. GroupBy for string columns cannot be performed directly using Ast API, but
* there is an internal implementation for such GroupBy operations -
* {@link AstGroup#performGroupingWithAggregations(Frame, int[], int[], AGG[])}.
* <p/>
* The returned column(s).
*/
Expand Down Expand Up @@ -286,21 +288,25 @@ public ValFrame apply(Env env, Env.StackHelp stk, AstRoot asts[]) {

return performGroupingWithAggregations(fr, gbCols, aggs);
}

public ValFrame performGroupingWithAggregations(Frame fr, int[] gbCols, AGG[] aggs) {
  // Numeric-only entry point: delegate with an empty string-column selection.
  final int[] noStringCols = new int[0];
  return performGroupingWithAggregations(fr, gbCols, noStringCols, aggs);
}

/**
 * Performs a GroupBy on {@code fr}, grouping by the given numeric and string
 * column indices, and computes the supplied aggregates per group.
 * Returns a frame with the group-key columns followed by one column per aggregate.
 * (The diff-rendering had left stale pre-refactor duplicate statements in this
 * method — duplicate {@code gss}/{@code applyOrdering}/{@code buildOutput} lines
 * referencing the old {@code gbCols} parameter — which are removed here.)
 */
public ValFrame performGroupingWithAggregations(Frame fr, int[] gbColsNum, int[] gbColsStr, AGG[] aggs) {
  final boolean hasMedian = hasMedian(aggs);
  // Find the groups and accumulate the aggregate reductions.
  final IcedHashSet<G> gss = doGroups(fr, gbColsNum, gbColsStr, aggs, hasMedian, _per_node_aggregates);
  final G[] grps = gss.toArray(new G[gss.size()]);

  applyOrdering(gbColsNum, gbColsStr, grps);

  // Median needs a second pass over the data; -1 means "no median aggregate".
  final int medianActionsNeeded = hasMedian ? calculateMediansForGRPS(fr, gbColsNum, gbColsStr, aggs, gss, grps) : -1;

  MRTask mrFill = prepareMRFillTask(grps, aggs, medianActionsNeeded);

  String[] fcNames = prepareFCNames(fr, aggs);

  Frame f = buildOutput(gbColsNum, gbColsStr, aggs.length, fr, fcNames, grps.length, mrFill);
  return new ValFrame(f);
}

Expand All @@ -318,9 +324,13 @@ public void map(Chunk[] c, NewChunk[] ncs) {
int start = (int) c[0].start();
for (int i = 0; i < c[0]._len; ++i) {
G g = grps[i + start]; // One Group per row
int gbColsCnt = g._gs.length + g._gsStr.length;
honzasterba marked this conversation as resolved.
Show resolved Hide resolved
int j;
for (j = 0; j < g._gs.length; j++) // The Group Key, as a row
ncs[j].addNum(g._gs[j]);
// TODO: Columns order - currently string columns are first
for (j = 0; j < g._gsStr.length; j++) // The string group key, as a row
ncs[j].addStr(g._gsStr[j]);
for (j = g._gsStr.length; j < gbColsCnt; j++) // The numeric group key, as a row
ncs[j].addNum(g._gs[j - g._gsStr.length]);
for (int a = 0; a < aggs.length; a++) {
if ((medianCount >=0) && g.medianR._isMedian[a])
ncs[j++].addNum(g.medianR._medians[a]);
Expand Down Expand Up @@ -383,15 +393,21 @@ private AGG[] constructAggregates(Frame fr, int numberOfAggregates, Env env, Ast
return aggs;
}

private void applyOrdering(final int[] gbCols, G[] grps) {
if (gbCols.length > 0)
// TODO: Currently - Order by string columns first then by numeric columns
private void applyOrdering(final int[] gbColsNum, final int[] gbColsStr, G[] grps) {
if (gbColsStr.length > 0 || gbColsNum.length > 0)
Arrays.sort(grps, new java.util.Comparator<G>() {
// Compare 2 groups. Iterate down _gs, stop when _gs[i] > that._gs[i],
// or _gs[i] < that._gs[i]. Order by various columns specified by
// gbCols. NaN is treated as least
@Override
public int compare(G g1, G g2) {
for (int i = 0; i < gbCols.length; i++) {
for (int i = 0; i < gbColsStr.length; i++) {
if (g1._gsStr[i] != null && g2._gsStr[i] == null) return -1;
if (g1._gsStr[i] == null && g2._gsStr[i] != null) return 1;
if (!g1._gsStr[i].equals(g2._gsStr[i])) return g1._gsStr[i].compareTo(g2._gsStr[i]);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

compareTo already covers the equality check, so doing both here is inefficient.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe that it should not return 0 if we hit values that are are equal - based on the previous ordering implementation, as explained in the comment below:

// Compare 2 groups.  Iterate down _gs, stop when _gs[i] > that._gs[i],
// or _gs[i] < that._gs[i].  Order by various columns specified by
// gbCols.  NaN is treated as least

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, that's true. Then maybe you could do:

int res = g1._gsStr[i].compareTo(g2._gsStr[i]); if (res != 0) return res;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, this looks cleaner and more efficient. I changed it.

}
for (int i = 0; i < gbColsNum.length; i++) {
if (Double.isNaN(g1._gs[i]) && !Double.isNaN(g2._gs[i])) return -1;
if (!Double.isNaN(g1._gs[i]) && Double.isNaN(g2._gs[i])) return 1;
if (g1._gs[i] != g2._gs[i]) return g1._gs[i] < g2._gs[i] ? -1 : 1;
Expand All @@ -407,7 +423,7 @@ public boolean equals(Object o) {
});
}

private int calculateMediansForGRPS(Frame fr, int[] gbCols, AGG[] aggs, IcedHashSet<G> gss, G[] grps) {
private int calculateMediansForGRPS(Frame fr, int[] gbColsNum, int[] gbColsStr, AGG[] aggs, IcedHashSet<G> gss, G[] grps) {
// median action exists, we do the following three things:
// 1. Find out how many columns over all groups we need to perform median on
// 2. Assign an index to the NewChunk that we will be storing the data for each median column for each group
Expand All @@ -420,7 +436,7 @@ private int calculateMediansForGRPS(Frame fr, int[] gbCols, AGG[] aggs, IcedHash
}
}
}
BuildGroup buildMedians = new BuildGroup(gbCols, aggs, gss, grps, numberOfMedianActionsNeeded);
BuildGroup buildMedians = new BuildGroup(gbColsNum, gbColsStr, aggs, gss, grps, numberOfMedianActionsNeeded);
Vec[] groupChunks = buildMedians.doAll(numberOfMedianActionsNeeded, Vec.T_NUM, fr).close();
buildMedians.calcMedian(groupChunks);
return numberOfMedianActionsNeeded;
Expand All @@ -443,47 +459,64 @@ public static AstNumList check(long dstX, AstRoot ast) {
// Do all the grouping work. Find groups in frame 'fr', grouped according to
// the selected 'gbCols' columns, and for each group compute aggregrate
// results using 'aggs'. Return an array of groups, with the aggregate results.
public static IcedHashSet<G> doGroups(Frame fr, int[] gbCols, AGG[] aggs) {
return doGroups(fr, gbCols, aggs, false, true);
public static IcedHashSet<G> doGroups(Frame fr, int[] gbColsNum, AGG[] aggs) {
return doGroups(fr, gbColsNum, new int[]{}, aggs, false, true);
}

private static IcedHashSet<G> doGroups(Frame fr, int[] gbCols, AGG[] aggs, boolean hasMedian, boolean perNodeAggregates) {
public static IcedHashSet<G> doGroups(Frame fr, int[] gbColsNum, int[] gbColsStr, AGG[] aggs) {
return doGroups(fr, gbColsNum, gbColsStr, aggs, false, true);
}

private static IcedHashSet<G> doGroups(Frame fr, int[] gbColsNum, int[] gbColsStr, AGG[] aggs, boolean hasMedian, boolean perNodeAggregates) {
// do the group by work now
long start = System.currentTimeMillis();
GBTask<?> p1 = makeGBTask(perNodeAggregates, gbCols, aggs, hasMedian).doAll(fr);
GBTask<?> p1 = makeGBTask(perNodeAggregates, gbColsNum, gbColsStr, aggs, hasMedian).doAll(fr);
Log.info("Group By Task done in " + (System.currentTimeMillis() - start) / 1000. + " (s)");
return p1.getGroups();
}

private static GBTask<? extends GBTask> makeGBTask(boolean perNodeAggregates, int[] gbCols, AGG[] aggs, boolean hasMedian) {
private static GBTask<? extends GBTask> makeGBTask(boolean perNodeAggregates, int[] gbColsNum, int[] gbColsStr, AGG[] aggs, boolean hasMedian) {
if (perNodeAggregates)
return new GBTaskAggsPerNode(gbCols, aggs, hasMedian);
return new GBTaskAggsPerNode(gbColsNum, gbColsStr, aggs, hasMedian);
else
return new GBTaskAggsPerMap(gbCols, aggs, hasMedian);
return new GBTaskAggsPerMap(gbColsNum, gbColsStr, aggs, hasMedian);
}

// Utility for AstDdply; return a single aggregate for counting rows-per-group
public static AGG[] aggNRows() {
  // A single nrow aggregate on column 0, NAs ignored.
  final AGG rowCount = new AGG(FCN.nrow, 0, NAHandling.IGNORE, 0);
  return new AGG[]{rowCount};
}

// Build output frame from the multi-column results
public static Frame buildOutput(int[] gbCols, int noutCols, Frame fr, String[] fcnames, int ngrps, MRTask mrfill) {

public static Frame buildOutput(int[] gbColsNum, int noutCols, Frame fr, String[] fcnames, int ngrps, MRTask mrfill) {
return buildOutput(gbColsNum, noutCols, fr, fcnames, ngrps, mrfill);
}

// Build output frame from the multi-column results
public static Frame buildOutput(int[] gbColsNum, int[] gbColsStr, int noutCols, Frame fr, String[] fcnames, int ngrps, MRTask mrfill) {
// Build the output!
// the names of columns
final int nCols = gbCols.length + noutCols;
final int gbColsCnt = gbColsNum.length + gbColsStr.length;
final int nCols = gbColsCnt + noutCols;
String[] names = new String[nCols];
String[][] domains = new String[nCols][];
byte[] types = new byte[nCols];
for (int i = 0; i < gbCols.length; i++) {
names[i] = fr.name(gbCols[i]);
domains[i] = fr.domains()[gbCols[i]];
// String GroupBy columns
for (int i = 0; i < gbColsStr.length; i++) {
names[i] = fr.name(gbColsStr[i]);
domains[i] = fr.domains()[gbColsStr[i]];
types[i] = fr.vec(names[i]).get_type();
}
// Numeric GroupBy columns
for (int i = gbColsStr.length; i < gbColsCnt; i++) {
names[i] = fr.name(gbColsNum[i - gbColsStr.length]);
domains[i] = fr.domains()[gbColsNum[i - gbColsStr.length]];
types[i] = fr.vec(names[i]).get_type();
}
// Output columns of GroupBy functions
for (int i = 0; i < fcnames.length; i++) {
names[i + gbCols.length] = fcnames[i];
types[i + gbCols.length] = Vec.T_NUM;
names[i + gbColsCnt] = fcnames[i];
types[i + gbColsCnt] = Vec.T_NUM;
}
Vec v = Vec.makeZero(ngrps); // dummy layout vec
// Convert the output arrays into a Frame, also doing the post-pass work
Expand Down Expand Up @@ -532,12 +565,14 @@ public double[] initVal() {
}

private static abstract class GBTask<E extends MRTask<E>> extends MRTask<E> {
final int[] _gbCols; // Columns used to define group
final int[] _gbColsNum; // Numeric columns used to define group
final int[] _gbColsStr; // String columns used to define group
final AGG[] _aggs; // Aggregate descriptions
final boolean _hasMedian;

// Capture the group-by column indices (numeric and string kept separately),
// the aggregate descriptions, and whether a median pass will be needed.
// (Stale pre-refactor ctor lines from the diff rendering removed.)
GBTask(int[] gbColsNum, int[] gbColsStr, AGG[] aggs, boolean hasMedian) {
  _gbColsNum = gbColsNum;
  _gbColsStr = gbColsStr;
  _aggs = aggs;
  _hasMedian = hasMedian;
}
Expand All @@ -555,23 +590,23 @@ private static abstract class GBTask<E extends MRTask<E>> extends MRTask<E> {
private static class GBTaskAggsPerNode extends GBTask<GBTaskAggsPerNode> {
final IcedHashSet<G> _gss; // Shared per-node, common, racy

// Per-node variant: all maps on a node share one IcedHashSet of groups.
// (Stale pre-refactor ctor lines from the diff rendering removed.)
GBTaskAggsPerNode(int[] gbColsNum, int[] gbColsStr, AGG[] aggs, boolean hasMedian) {
  super(gbColsNum, gbColsStr, aggs, hasMedian);
  _gss = new IcedHashSet<>();
}

@Override
public void map(Chunk[] cs) {
// Groups found in this Chunk
IcedHashSet<G> gs = new IcedHashSet<>();
G gWork = new G(_gbCols.length, _aggs, _hasMedian); // Working Group
G gWork = new G(_gbColsNum.length, _gbColsStr.length, _aggs, _hasMedian); // Working Group
G gOld; // Existing Group to be filled in
for (int row = 0; row < cs[0]._len; row++) {
// Find the Group being worked on
gWork.fill(row, cs, _gbCols); // Fill the worker Group for the hashtable lookup
gWork.fill(row, cs, _gbColsNum, _gbColsStr); // Fill the worker Group for the hashtable lookup
if (gs.addIfAbsent(gWork) == null) { // Insert if not absent (note: no race, no need for atomic)
gOld = gWork; // Inserted 'gWork' into table
gWork = new G(_gbCols.length, _aggs, _hasMedian); // need entirely new G
gWork = new G(_gbColsNum.length, _gbColsStr.length, _aggs, _hasMedian); // need entirely new G
} else gOld = gs.get(gWork); // Else get existing group

for (int i = 0; i < _aggs.length; i++) // Accumulate aggregate reductions
Expand Down Expand Up @@ -656,22 +691,22 @@ protected void map(final int subGroupId) {
public static class GBTaskAggsPerMap extends GBTask<GBTaskAggsPerMap> {
IcedHashSet<G> _gss; // each map will have its own IcedHashMap

// Per-map variant: each map() call builds its own group set, merged in reduce.
// (Stale pre-refactor ctor lines from the diff rendering removed.)
GBTaskAggsPerMap(int[] gbColsNum, int[] gbColsStr, AGG[] aggs, boolean hasMedian) {
  super(gbColsNum, gbColsStr, aggs, hasMedian);
}

@Override
public void map(Chunk[] cs) {
// Groups found in this Chunk
_gss = new IcedHashSet<>();
G gWork = new G(_gbCols.length, _aggs, _hasMedian); // Working Group
G gWork = new G(_gbColsNum.length, _gbColsStr.length, _aggs, _hasMedian); // Working Group
G gOld; // Existing Group to be filled in
for (int row = 0; row < cs[0]._len; row++) {
// Find the Group being worked on
gWork.fill(row, cs, _gbCols); // Fill the worker Group for the hashtable lookup
gWork.fill(row, cs, _gbColsNum, _gbColsStr); // Fill the worker Group for the hashtable lookup
if (_gss.addIfAbsent(gWork) == null) { // Insert if not absent (note: no race, no need for atomic)
gOld = gWork; // Inserted 'gWork' into table
gWork = new G(_gbCols.length, _aggs, _hasMedian); // need entirely new G
gWork = new G(_gbColsNum.length, _gbColsStr.length, _aggs, _hasMedian); // need entirely new G
} else gOld = _gss.get(gWork); // Else get existing group

for (int i = 0; i < _aggs.length; i++) // Accumulate aggregate reductions
Expand Down Expand Up @@ -712,11 +747,12 @@ public MedianResult(int len) {
_na = new NAHandling[len];
}
}
// Groups! Contains a Group Key - an array of doubles (often just 1 entry
// long) that defines the Group. Also contains an array of doubles for the
// Groups! Contains a Group Key - arrays of doubles and strings that
// define the Group. Also contains an array of doubles for the
// aggregate results, one per aggregate.
public static class G extends Iced<G> {
public final double[] _gs; // Group Key: Array is final; contents change with the "fill"
public final String[] _gsStr; // Group Key: Array is final; contents change with the "fill"
int _hash; // Hash is not final; changes with the "fill"

public final double _dss[][]; // Aggregates: usually sum or sum*2
Expand All @@ -728,12 +764,19 @@ public static class G extends Iced<G> {
public NAHandling[] _na;*/
public MedianResult medianR = null;

// Backwards-compatible: numeric group-by columns only, no median support.
// (Stale pre-refactor ctor lines from the diff rendering removed.)
public G(int ncolsNum, AGG[] aggs) {
  this(ncolsNum, 0, aggs, false);
}

// Group-key holder for ncolsNum numeric and ncolsStr string group-by
// columns, without median aggregation support.
public G(int ncolsNum, int ncolsStr, AGG[] aggs) {
  this(ncolsNum, ncolsStr, aggs, false);
}

// Backwards-compatible: numeric group-by columns only.
// (Stale pre-refactor ctor lines from the diff rendering removed.)
public G(int ncolsNum, AGG[] aggs, boolean hasMedian) { this(ncolsNum, 0, aggs, hasMedian); }

public G(int ncolsNum, int ncolsStr, AGG[] aggs, boolean hasMedian) {
_gs = new double[ncolsNum];
_gsStr = new String[ncolsStr];
int len = aggs == null ? 0 : aggs.length;
_dss = new double[len][];
_ns = new long[len];
Expand All @@ -753,15 +796,31 @@ public G(int ncols, AGG[] aggs, boolean hasMedian) {
}

public G fill(int row, Chunk chks[]) {
for (int c = 0; c < chks.length; c++) // For all selection cols
_gs[c] = chks[c].atd(row); // Load into working array
for (int c = 0; c < chks.length; c++) { // For all selection cols
Vec vec = chks[c].vec();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A lot of code in this PR handles switching between the 2 arrays (_gs nad _gsStr). This is due to the fact that we can have an arbitrary order of string/numerical columns on the input. However, in the implementation itself, we can re-order the columns to make sure we will have the exact order we want, eg. numerical first then string ones. Would that be something that would help with making the code less complex?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is nothing that comes to my mind that could solve this problem. Switching between _gs and _gsStr is done in 2 places - filling the output frame and sorting it. In both cases it is expected to preserve order of the columns. There is a possibility (at least for the "filling" part) of replacing gbColsTypes array with 2 arrays defining order for each of the columns in _gs and _gsStr. But I don't know whether it significantly reduces the complexity, if at all.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From my point of view, two arrays are ok in this case. One array with both types of values has the same complexity; there should also be if where we will have to decide what to do with string and numeric separately. Maybe some class can solve this, however, I think it is overengineering in this case.


// Load into working array
if (vec.isString())
_gsStr[c] = chks[c].stringAt(row);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is better to keep everything in BufferedStrings as much as possible (atStr method), this won't duplicate the memory needed to hold the string, BufferedString just references an existing byte array that holds the actual data

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. I will adjust it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed it to use BufferedString internally.

else
_gs[c] = chks[c].atd(row);
}

_hash = hash();
return this;
}

public G fill(int row, Chunk chks[], int cols[]) {
for (int c = 0; c < cols.length; c++) // For all selection cols
_gs[c] = chks[cols[c]].atd(row); // Load into working array
public G fill(int row, Chunk[] chks, int[] colsNum) { return fill(row, chks, colsNum, new int[]{}); }

public G fill(int row, Chunk[] chks, int[] colsNum, int[] colsStr) {
for (int c = 0; c < colsNum.length; c++) // For all numeric selection cols
// Load into working array
_gs[c] = chks[colsNum[c]].atd(row);

for (int c = 0; c < colsStr.length; c++) // For all string selection cols
// Load into working array
_gsStr[c] = chks[colsStr[c]].stringAt(row);

_hash = hash();
return this;
}
Expand All @@ -772,12 +831,17 @@ protected int hash() {
// Doubles are lousy hashes; mix up the bits some
h ^= (h >>> 20) ^ (h >>> 12);
h ^= (h >>> 7) ^ (h >>> 4);

for (String str : _gsStr) h = 37 * h + str.hashCode();

return (int) ((h ^ (h >> 32)) & 0x7FFFFFFF);
}

// Two groups are equal iff both the numeric and the string parts of the
// group key match. Must stay consistent with hash()/hashCode().
// (Stale pre-refactor return line from the diff rendering removed.)
@Override
public boolean equals(Object o) {
  return o instanceof G
      && Arrays.equals(_gs, ((G) o)._gs)
      && Arrays.equals(_gsStr, ((G) o)._gsStr);
}

@Override
Expand All @@ -787,7 +851,7 @@ public int hashCode() {

// Debug rendering of the group key: string part first, then numeric part.
// (Stale pre-refactor return line from the diff rendering removed.)
@Override
public String toString() {
  return Arrays.toString(_gsStr) + " - " + Arrays.toString(_gs);
}
}

Expand All @@ -796,15 +860,17 @@ public String toString() {
// extract the column per groupG per aggregate function into a NewChunk column
// here.
private static class BuildGroup extends MRTask<BuildGroup> {
final int[] _gbCols;
final int[] _gbColsNum;
final int[] _gbColsStr;
private final AGG[] _aggs; // Aggregate descriptions
private final int _medianCols;
IcedHashSet<G> _gss;
private G[] _grps;


BuildGroup(int[] gbCols, AGG[] aggs, IcedHashSet<G> gss, G[] grps, int medianCols) {
_gbCols = gbCols;
BuildGroup(int[] gbColsNum, int[] gbColsStr, AGG[] aggs, IcedHashSet<G> gss, G[] grps, int medianCols) {
_gbColsNum = gbColsNum;
_gbColsStr = gbColsStr;
_aggs = aggs;
_gss = gss;
_grps = grps;
Expand All @@ -813,11 +879,11 @@ private static class BuildGroup extends MRTask<BuildGroup> {

@Override
public void map(Chunk[] cs, NewChunk[] ncs) {
G gWork = new G(_gbCols.length, _aggs, _medianCols > 0); // Working Group
G gWork = new G(_gbColsNum.length, _gbColsStr.length, _aggs, _medianCols > 0); // Working Group
G gOld;

for (int row = 0; row < cs[0]._len; row++) { // for each
gWork.fill(row, cs, _gbCols);
gWork.fill(row, cs, _gbColsNum, _gbColsStr);
gOld = _gss.get(gWork);
for (int i = 0; i < gOld.medianR._isMedian.length; i++) { // Accumulate aggregate reductions
if (gOld.medianR._isMedian[i]) { // median action required on column and group
Expand Down
Loading