Skip to content

Commit

Permalink
Optimizing Merge speed performance. Simplifying some rather obscure
Browse files Browse the repository at this point in the history
code.
  • Loading branch information
leerho committed Apr 17, 2022
1 parent 876059e commit 231201f
Show file tree
Hide file tree
Showing 8 changed files with 106 additions and 90 deletions.
45 changes: 21 additions & 24 deletions src/main/java/org/apache/datasketches/kll/KllDoublesHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -129,18 +129,20 @@ static double[] getDoublesQuantiles(final KllSketch mine, final double[] fractio
static void mergeDoubleImpl(final KllSketch mine, final KllSketch other) {
if (other.isEmpty()) { return; }
final long finalN = mine.getN() + other.getN();
final double[] otherDoubleItemsArr = other.getDoubleItemsArray();
final int otherNumLevels = other.getNumLevels();
final int[] otherLevelsArr = other.getLevelsArray();
final double[] otherDoubleItemsArr;
//capture my min & max, minK
final double myMin = mine.getMinDoubleValue();
final double myMax = mine.getMaxDoubleValue();
final int myMinK = mine.getMinK();

//update this sketch with level0 items from the other sketch
if (other.isCompactSingleItem()) {
updateDouble(mine, other.getFloatSingleItem());
updateDouble(mine, other.getDoubleSingleItem());
otherDoubleItemsArr = new double[0];
} else {
otherDoubleItemsArr = other.getDoubleItemsArray();
for (int i = otherLevelsArr[0]; i < otherLevelsArr[1]; i++) {
KllDoublesHelper.updateDouble(mine, otherDoubleItemsArr[i]);
}
Expand All @@ -150,11 +152,11 @@ static void mergeDoubleImpl(final KllSketch mine, final KllSketch other) {
final int[] myCurLevelsArr = mine.getLevelsArray();
final double[] myCurDoubleItemsArr = mine.getDoubleItemsArray();

final int myNewNumLevels;
final int[] myNewLevelsArr;
final double[] myNewDoubleItemsArr;
int myNewNumLevels = myCurNumLevels;
int[] myNewLevelsArr = myCurLevelsArr;
double[] myNewDoubleItemsArr = myCurDoubleItemsArr;

if (otherNumLevels > 1) { //now merge other levels if they exist
if (otherNumLevels > 1 && !other.isCompactSingleItem()) { //now merge other levels if they exist
final int tmpSpaceNeeded = mine.getNumRetained()
+ KllHelper.getNumRetainedAboveLevelZero(otherNumLevels, otherLevelsArr);
final double[] workbuf = new double[tmpSpaceNeeded];
Expand All @@ -164,7 +166,9 @@ static void mergeDoubleImpl(final KllSketch mine, final KllSketch other) {

final int provisionalNumLevels = max(myCurNumLevels, otherNumLevels);

populateDoubleWorkArrays(mine, other, workbuf, worklevels, provisionalNumLevels);
populateDoubleWorkArrays(workbuf, worklevels, provisionalNumLevels,
myCurNumLevels, myCurLevelsArr, myCurDoubleItemsArr,
otherNumLevels, otherLevelsArr, otherDoubleItemsArr);

// notice that workbuf is being used as both the input and output
final int[] result = generalDoublesCompress(mine.getK(), mine.getM(), provisionalNumLevels,
Expand Down Expand Up @@ -202,11 +206,6 @@ static void mergeDoubleImpl(final KllSketch mine, final KllSketch other) {
if (mine.updatableMemFormat) {
mine.wmem = KllHelper.memorySpaceMgmt(mine, myNewLevelsArr.length, myNewDoubleItemsArr.length);
}

} else {
myNewNumLevels = myCurNumLevels;
myNewLevelsArr = myCurLevelsArr;
myNewDoubleItemsArr = myCurDoubleItemsArr;
}

//Update Preamble:
Expand Down Expand Up @@ -481,30 +480,28 @@ private static void incrementDoublesBucketsUnsortedLevel(
}
}

private static void populateDoubleWorkArrays(final KllSketch mine, final KllSketch other, final double[] workbuf,
final int[] worklevels, final int provisionalNumLevels) {
private static void populateDoubleWorkArrays(
final double[] workbuf, final int[] worklevels, final int provisionalNumLevels,
final int myCurNumLevels, final int[] myCurLevelsArr, final double[] myCurDoubleItemsArr,
final int otherNumLevels, final int[] otherLevelsArr, final double[] otherDoubleItemsArr) {
worklevels[0] = 0;
final int[] myLevelsArr = mine.getLevelsArray();
final int[] otherLevelsArr = other.getLevelsArray();
final double[] myDoubleItemsArr = mine.getDoubleItemsArray();
final double[] otherDoubleItemsArr = other.getDoubleItemsArray();

// Note: the level zero data from "other" was already inserted into "self"
final int selfPopZero = KllHelper.currentLevelSize(0, mine.getNumLevels(),myLevelsArr);
System.arraycopy(myDoubleItemsArr, myLevelsArr[0], workbuf, worklevels[0], selfPopZero);
final int selfPopZero = KllHelper.currentLevelSize(0, myCurNumLevels,myCurLevelsArr);
System.arraycopy(myCurDoubleItemsArr, myCurLevelsArr[0], workbuf, worklevels[0], selfPopZero);
worklevels[1] = worklevels[0] + selfPopZero;

for (int lvl = 1; lvl < provisionalNumLevels; lvl++) {
final int selfPop = KllHelper.currentLevelSize(lvl, mine.getNumLevels(), myLevelsArr);
final int otherPop = KllHelper.currentLevelSize(lvl, other.getNumLevels(), otherLevelsArr);
final int selfPop = KllHelper.currentLevelSize(lvl, myCurNumLevels, myCurLevelsArr);
final int otherPop = KllHelper.currentLevelSize(lvl, otherNumLevels, otherLevelsArr);
worklevels[lvl + 1] = worklevels[lvl] + selfPop + otherPop;

if (selfPop > 0 && otherPop == 0) {
System.arraycopy(myDoubleItemsArr, myLevelsArr[lvl], workbuf, worklevels[lvl], selfPop);
System.arraycopy(myCurDoubleItemsArr, myCurLevelsArr[lvl], workbuf, worklevels[lvl], selfPop);
} else if (selfPop == 0 && otherPop > 0) {
System.arraycopy(otherDoubleItemsArr, otherLevelsArr[lvl], workbuf, worklevels[lvl], otherPop);
} else if (selfPop > 0 && otherPop > 0) {
mergeSortedDoubleArrays(myDoubleItemsArr, myLevelsArr[lvl], selfPop, otherDoubleItemsArr,
mergeSortedDoubleArrays(myCurDoubleItemsArr, myCurLevelsArr[lvl], selfPop, otherDoubleItemsArr,
otherLevelsArr[lvl], otherPop, workbuf, worklevels[lvl]);
}
}
Expand Down
43 changes: 20 additions & 23 deletions src/main/java/org/apache/datasketches/kll/KllFloatsHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,9 @@ static float[] getFloatsQuantiles(final KllSketch mine, final double[] fractions
static void mergeFloatImpl(final KllSketch mine, final KllSketch other) {
if (other.isEmpty()) { return; }
final long finalN = mine.getN() + other.getN();
final float[] otherFloatItemsArr = other.getFloatItemsArray();
final int otherNumLevels = other.getNumLevels();
final int[] otherLevelsArr = other.getLevelsArray();
final float[] otherFloatItemsArr;
//capture my min & max, minK
final float myMin = mine.getMinFloatValue();
final float myMax = mine.getMaxFloatValue();
Expand All @@ -140,7 +140,9 @@ static void mergeFloatImpl(final KllSketch mine, final KllSketch other) {
//update this sketch with level0 items from the other sketch
if (other.isCompactSingleItem()) {
updateFloat(mine, other.getFloatSingleItem());
otherFloatItemsArr = new float[0];
} else {
otherFloatItemsArr = other.getFloatItemsArray();
for (int i = otherLevelsArr[0]; i < otherLevelsArr[1]; i++) {
updateFloat(mine, otherFloatItemsArr[i]);
}
Expand All @@ -150,11 +152,11 @@ static void mergeFloatImpl(final KllSketch mine, final KllSketch other) {
final int[] myCurLevelsArr = mine.getLevelsArray();
final float[] myCurFloatItemsArr = mine.getFloatItemsArray();

final int myNewNumLevels;
final int[] myNewLevelsArr;
final float[] myNewFloatItemsArr;
int myNewNumLevels = myCurNumLevels;
int[] myNewLevelsArr = myCurLevelsArr;
float[] myNewFloatItemsArr = myCurFloatItemsArr;

if (otherNumLevels > 1) { //now merge higher levels if they exist
if (otherNumLevels > 1 && !other.isCompactSingleItem()) { //now merge higher levels if they exist
final int tmpSpaceNeeded = mine.getNumRetained()
+ KllHelper.getNumRetainedAboveLevelZero(otherNumLevels, otherLevelsArr);
final float[] workbuf = new float[tmpSpaceNeeded];
Expand All @@ -164,7 +166,9 @@ static void mergeFloatImpl(final KllSketch mine, final KllSketch other) {

final int provisionalNumLevels = max(myCurNumLevels, otherNumLevels);

populateFloatWorkArrays(mine, other, workbuf, worklevels, provisionalNumLevels);
populateFloatWorkArrays(workbuf, worklevels, provisionalNumLevels,
myCurNumLevels, myCurLevelsArr, myCurFloatItemsArr,
otherNumLevels, otherLevelsArr, otherFloatItemsArr);

// notice that workbuf is being used as both the input and output
final int[] result = generalFloatsCompress(mine.getK(), mine.getM(), provisionalNumLevels,
Expand Down Expand Up @@ -202,11 +206,6 @@ static void mergeFloatImpl(final KllSketch mine, final KllSketch other) {
if (mine.updatableMemFormat) {
mine.wmem = KllHelper.memorySpaceMgmt(mine, myNewLevelsArr.length, myNewFloatItemsArr.length);
}

} else {
myNewNumLevels = myCurNumLevels;
myNewLevelsArr = myCurLevelsArr;
myNewFloatItemsArr = myCurFloatItemsArr;
}

//Update Preamble:
Expand Down Expand Up @@ -481,30 +480,28 @@ private static void incrementFloatBucketsUnsortedLevel(
}
}

private static void populateFloatWorkArrays(final KllSketch mine, final KllSketch other, final float[] workbuf,
final int[] worklevels, final int provisionalNumLevels) {
private static void populateFloatWorkArrays(
final float[] workbuf, final int[] worklevels, final int provisionalNumLevels,
final int myCurNumLevels, final int[] myCurLevelsArr, final float[] myCurFloatItemsArr,
final int otherNumLevels, final int[] otherLevelsArr, final float[] otherFloatItemsArr) {
worklevels[0] = 0;
final int[] myLevelsArr = mine.getLevelsArray();
final int[] otherLevelsArr = other.getLevelsArray();
final float[] myFloatItemsArr = mine.getFloatItemsArray();
final float[] otherFloatItemsArr = other.getFloatItemsArray();

// Note: the level zero data from "other" was already inserted into "self"
final int selfPopZero = KllHelper.currentLevelSize(0, mine.getNumLevels(), myLevelsArr);
System.arraycopy( myFloatItemsArr, myLevelsArr[0], workbuf, worklevels[0], selfPopZero);
final int selfPopZero = KllHelper.currentLevelSize(0, myCurNumLevels, myCurLevelsArr);
System.arraycopy( myCurFloatItemsArr, myCurLevelsArr[0], workbuf, worklevels[0], selfPopZero);
worklevels[1] = worklevels[0] + selfPopZero;

for (int lvl = 1; lvl < provisionalNumLevels; lvl++) {
final int selfPop = KllHelper.currentLevelSize(lvl, mine.getNumLevels(), myLevelsArr);
final int otherPop = KllHelper.currentLevelSize(lvl, other.getNumLevels(), otherLevelsArr);
final int selfPop = KllHelper.currentLevelSize(lvl, myCurNumLevels, myCurLevelsArr);
final int otherPop = KllHelper.currentLevelSize(lvl, otherNumLevels, otherLevelsArr);
worklevels[lvl + 1] = worklevels[lvl] + selfPop + otherPop;

if (selfPop > 0 && otherPop == 0) {
System.arraycopy( myFloatItemsArr, myLevelsArr[lvl], workbuf, worklevels[lvl], selfPop);
System.arraycopy(myCurFloatItemsArr, myCurLevelsArr[lvl], workbuf, worklevels[lvl], selfPop);
} else if (selfPop == 0 && otherPop > 0) {
System.arraycopy(otherFloatItemsArr, otherLevelsArr[lvl], workbuf, worklevels[lvl], otherPop);
} else if (selfPop > 0 && otherPop > 0) {
mergeSortedFloatArrays( myFloatItemsArr, myLevelsArr[lvl], selfPop, otherFloatItemsArr,
mergeSortedFloatArrays( myCurFloatItemsArr, myCurLevelsArr[lvl], selfPop, otherFloatItemsArr,
otherLevelsArr[lvl], otherPop, workbuf, worklevels[lvl]);
}
}
Expand Down
33 changes: 9 additions & 24 deletions src/main/java/org/apache/datasketches/kll/KllSketch.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ enum Error {
SRC_MUST_BE_FLOAT("Given sketch must be of type Float."),
MUST_NOT_CALL("This is an artifact of inheritance and should never be called."),
SINGLE_ITEM_IMPROPER_CALL("Improper method use for single-item sketch"),
ERROR_UNKNOWN("Possible sketch corruption: Unknown error.");
MRS_MUST_NOT_BE_NULL("MemoryRequestServer cannot be null.");

private String msg;

Expand Down Expand Up @@ -125,7 +125,7 @@ private String getMessage() {
WritableMemory wmem;

/**
* Constructor for writable updatable memory and for the Heap.
* Constructor for on-heap and off-heap.
* If both wmem and memReqSvr are null, this is a heap constructor.
* If wmem != null and wmem is not readOnly, then memReqSvr must not be null.
* If wmem was derived from an original Memory instance via a cast, it will be readOnly.
Expand All @@ -139,31 +139,16 @@ private String getMessage() {
if (wmem != null) {
this.updatableMemFormat = KllPreambleUtil.getMemoryUpdatableFormatFlag(wmem);
this.readOnly = wmem.isReadOnly() || !updatableMemFormat;
final int sw = (readOnly ? 1 : 0)
| (updatableMemFormat ? 2 : 0)
| ((memReqSvr != null) ? 4 : 0);
switch (sw) {
case 1: //no MemReqSvr, compact, readOnly -> ReadOnly Compact
case 5: //MemReqSvr, compact, readOnly -> ReadOnly Compact, ignore MemReqSvr
case 3: //no MemReqSvr, updatable, readOnly -> ReadOnly Updatable
case 7: { //MemReqSvr, updatable, readOnly -> ReadOnly Updatable, ignore MemReqSvr
this.memReqSvr = null;
break;
}
case 6: { //MemReqSvr, updatable, writable -> Normal Direct
this.memReqSvr = memReqSvr;
break;
}
default: { //unlikely
this.memReqSvr = null;
kllSketchThrow(Error.ERROR_UNKNOWN);
break;
}
if (readOnly) {
this.memReqSvr = null;
} else {
if (memReqSvr == null) { kllSketchThrow(Error.MRS_MUST_NOT_BE_NULL); }
this.memReqSvr = memReqSvr;
}
} else { //wmem is null, heap case
this.updatableMemFormat = false;
this.memReqSvr = null; //no matter what
this.readOnly = false; //heap sketch
this.memReqSvr = null;
this.readOnly = false;
}
}

Expand Down
16 changes: 8 additions & 8 deletions src/main/java/org/apache/datasketches/kll/package-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -152,16 +152,16 @@
*
* <ul>
* <li><b>KllFloatsSketch</b>: This operates on the Java heap and uses the java <i>float</i> primitive for the
* smallest possible size. It can be serialized to a compact, immutable format or to an updatable format suitable for
* use by the Kll Direct sketches.</li>
* smallest possible size. It can be serialized to a compact, immutable format or to an updatable format, which can
* be modified off-heap. </li>
* <li><b>KllDoublesSketch</b>: This operates on the Java heap and uses the java <i>double</i> primitive for a much
* larger range of numeric values, and is larger as a result. It can be serialized to a compact, immutable format or
* to an updatable format suitable for use by the Kll Direct sketches.</li>
* <li><b>KllDirectFloatsSketch</b>: This is intended to operate off-heap and performs all of its operations in one
* contiguous chunk of memory. It uses the java <i>float</i> primitive for the smallest possible size off-heap.</li>
* <li><b>KllDirectDoublesSketch</b>: This is intended to operate off-heap and performs all of its operations in one
* contiguous chunk of memory. It uses the java <i>double</i> primitive for a much larger range of numeric values,
* and is larger as a result.</li>
* to an updatable format, which can be modified off-heap.</li>
* <li><b>KllFloatsSketchIterator</b>: Iterates over the retained values and weights of the KllFloatsSketch.</li>
* <li><b>KllDoublesSketchIterator</b>: Iterates over the retained values and weights of the KllDoublesSketch.</li>
* <li><b>KllSketch</b>: The root of the Kll Sketch hierarchy and home of all methods that are in common with both
* the KllFloatsSketch and the KllDoublesSketch.
* </li>
* </ul>
*
* <p>Please visit our website: <a href="https://datasketches.apache.org">DataSketches Home Page</a> for more
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@
import static org.testng.Assert.fail;

import org.apache.datasketches.SketchesArgumentException;
import org.apache.datasketches.memory.DefaultMemoryRequestServer;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.memory.WritableMemory;
import org.testng.annotations.Test;

public class KllDirectCompactDoublesSketchTest {
private static final DefaultMemoryRequestServer memReqSvr = new DefaultMemoryRequestServer();

@Test
public void checkRODirectUpdatable() {
Expand All @@ -45,23 +47,23 @@ public void checkRODirectUpdatable() {
@Test
public void checkRODirectCompact() {
int k = 20;
KllDoublesSketch sk = KllDoublesSketch.newHeapInstance(k); //Heap
KllDoublesSketch sk = KllDoublesSketch.newHeapInstance(k);
for (int i = 1; i <= k + 1; i++) { sk.update(i); }
Memory srcMem = Memory.wrap(sk.toByteArray()); //case 1 compact readOnly no memReqSvr
Memory srcMem = Memory.wrap(sk.toByteArray());
KllDoublesSketch sk2 = KllDoublesSketch.wrap(srcMem);
println(sk2.toString(true, true));
assertEquals(sk2.getMinValue(), 1.0);
assertEquals(sk2.getMaxValue(), 21.0);
Memory srcMem2 = Memory.wrap(sk2.toByteArray());
KllDoublesSketch sk3 = KllDoublesSketch.writableWrap((WritableMemory)srcMem2, null); //case 1
KllDoublesSketch sk3 = KllDoublesSketch.writableWrap((WritableMemory)srcMem2, null);
assertEquals(sk3.getMinValue(), 1.0F);
assertEquals(sk3.getMaxValue(), 21.0F);
}

@Test
public void checkDirectCompactSingleItem() {
int k = 20;
KllDoublesSketch sk = KllDoublesSketch.newHeapInstance(k); //Heap
KllDoublesSketch sk = KllDoublesSketch.newHeapInstance(k);
sk.update(1);
KllDoublesSketch sk2 = KllDoublesSketch.wrap(Memory.wrap(sk.toByteArray()));
assertEquals(sk2.getDoubleSingleItem(), 1.0);
Expand Down Expand Up @@ -128,6 +130,21 @@ public void checkQuantile() {
println("Med2: " + med2);
}

/**
 * Merges a one-item sketch, wrapped from the serialized bytes of a heap sketch, first into
 * an empty heap sketch and then into an empty direct sketch, and checks that each target
 * reports N == 1 afterwards.
 */
@Test
public void checkCompactSingleItemMerge() {
  final int k = 20;

  // Build the merge source: one update on the heap, serialized, then wrapped.
  final KllDoublesSketch seed = KllDoublesSketch.newHeapInstance(k);
  seed.update(1);
  final byte[] image = seed.toByteArray();
  final KllDoublesSketch singleItem = KllDoublesSketch.wrap(Memory.wrap(image));

  // Target 1: a fresh heap sketch.
  final KllDoublesSketch heapTarget = KllDoublesSketch.newHeapInstance(k);
  heapTarget.merge(singleItem);
  assertEquals(heapTarget.getN(), 1);

  // Target 2: a fresh direct sketch backed by newly allocated writable memory.
  final WritableMemory backing = WritableMemory.allocate(500);
  final KllDoublesSketch directTarget = KllDoublesSketch.newDirectInstance(k, backing, memReqSvr);
  directTarget.merge(singleItem);
  assertEquals(directTarget.getN(), 1);
}

@Test
public void printlnTest() {
println("PRINTING: " + this.getClass().getName());
Expand Down
Loading

0 comments on commit 231201f

Please sign in to comment.