Skip to content
Permalink
Browse files
Fixed some tests. Other minor fixes.
  • Loading branch information
leerho committed Apr 7, 2022
1 parent f3d8029 commit 1d79763484cc82d6698303ecea2543cb1307ce4a
Showing 11 changed files with 101 additions and 42 deletions.
@@ -62,14 +62,14 @@ final class KllDirectDoublesSketch extends KllDoublesSketch {
WritableMemory itemsArrUpdatable;

/**
* The actual constructor.
* The constructor with Memory that can be off-heap.
* @param wmem the current WritableMemory
* @param memReqSvr the given MemoryRequestServer to request a larger WritableMemory
* @param memVal the MemoryValadate object
*/
KllDirectDoublesSketch(final WritableMemory wmem, final MemoryRequestServer memReqSvr,
final KllMemoryValidate memVal) {
super(SketchType.DOUBLES_SKETCH, wmem, memReqSvr);
super(wmem, memReqSvr);
updatableMemory = memVal.updatableMemory && memReqSvr != null;
levelsArrUpdatable = memVal.levelsArrUpdatable;
minMaxArrUpdatable = memVal.minMaxArrUpdatable;
@@ -62,14 +62,14 @@ final class KllDirectFloatsSketch extends KllFloatsSketch {
WritableMemory itemsArrUpdatable;

/**
* The actual constructor
* The constructor with Memory that can be off-heap.
* @param wmem the current WritableMemory
* @param memReqSvr the given MemoryRequestServer to request a larger WritableMemory
* @param memVal the MemoryValadate object
*/
KllDirectFloatsSketch(final WritableMemory wmem, final MemoryRequestServer memReqSvr,
final KllMemoryValidate memVal) {
super(SketchType.FLOATS_SKETCH, wmem, memReqSvr);
super(wmem, memReqSvr);
updatableMemory = memVal.updatableMemory && memReqSvr != null;
levelsArrUpdatable = memVal.levelsArrUpdatable;
minMaxArrUpdatable = memVal.minMaxArrUpdatable;
@@ -31,8 +31,8 @@

public abstract class KllDoublesSketch extends KllSketch {

KllDoublesSketch(final SketchType sketchType, final WritableMemory wmem, final MemoryRequestServer memReqSvr) {
super(sketchType, wmem, memReqSvr);
KllDoublesSketch(final WritableMemory wmem, final MemoryRequestServer memReqSvr) {
super(SketchType.DOUBLES_SKETCH, wmem, memReqSvr);
}

/**
@@ -110,7 +110,11 @@ public static KllDoublesSketch writableWrap(
final WritableMemory srcMem,
final MemoryRequestServer memReqSvr) {
final KllMemoryValidate memVal = new KllMemoryValidate(srcMem);
return new KllDirectDoublesSketch(srcMem, memReqSvr, memVal);
if (memVal.updatableMemory) {
return new KllDirectDoublesSketch(srcMem, memReqSvr, memVal);
} else {
return heapify(srcMem);
}
}

/**
@@ -129,18 +129,19 @@ static float[] getFloatsQuantiles(final KllSketch mine, final double[] fractions
static void mergeFloatImpl(final KllSketch mine, final KllSketch other) {
if (other.isEmpty()) { return; }
final long finalN = mine.getN() + other.getN();
//update this sketch with level0 items from the other sketch
final float[] otherFloatItemsArr = other.getFloatItemsArray();
final int otherNumLevels = other.getNumLevels();
final int[] otherLevelsArr = other.getLevelsArray();
for (int i = otherLevelsArr[0]; i < otherLevelsArr[1]; i++) {
KllFloatsHelper.updateFloat(mine, otherFloatItemsArr[i]);
}
// after the level 0 update, we capture the key mutable variables
//capture my min & max, minK
final float myMin = mine.getMinFloatValue();
final float myMax = mine.getMaxFloatValue();
final int myMinK = mine.getMinK();

//update this sketch with level0 items from the other sketch
for (int i = otherLevelsArr[0]; i < otherLevelsArr[1]; i++) {
KllFloatsHelper.updateFloat(mine, otherFloatItemsArr[i]);
}
// after the level 0 update, we capture the state of levels and items arrays
final int myCurNumLevels = mine.getNumLevels();
final int[] myCurLevelsArr = mine.getLevelsArray();
final float[] myCurFloatItemsArr = mine.getFloatItemsArray();
@@ -31,8 +31,8 @@

public abstract class KllFloatsSketch extends KllSketch {

KllFloatsSketch(final SketchType sketchType, final WritableMemory wmem, final MemoryRequestServer memReqSvr) {
super(sketchType, wmem, memReqSvr);
KllFloatsSketch(final WritableMemory wmem, final MemoryRequestServer memReqSvr) {
super(SketchType.FLOATS_SKETCH, wmem, memReqSvr);
}

/**
@@ -110,7 +110,11 @@ public static KllFloatsSketch writableWrap(
final WritableMemory srcMem,
final MemoryRequestServer memReqSvr) {
final KllMemoryValidate memVal = new KllMemoryValidate(srcMem);
return new KllDirectFloatsSketch(srcMem, memReqSvr, memVal);
if (memVal.updatableMemory) {
return new KllDirectFloatsSketch(srcMem, memReqSvr, memVal);
} else {
return heapify(srcMem);
}
}

/**
@@ -54,7 +54,7 @@ final class KllHeapDoublesSketch extends KllDoublesSketch {
* experimental as they have not been as well characterized.
*/
KllHeapDoublesSketch(final int k, final int m) {
super(SketchType.DOUBLES_SKETCH, null, null);
super(null, null);
KllHelper.checkM(m);
KllHelper.checkK(k, m);
this.k = k;
@@ -75,7 +75,7 @@ final class KllHeapDoublesSketch extends KllDoublesSketch {
* @param memVal the MemoryCheck object
*/
KllHeapDoublesSketch(final Memory mem, final KllMemoryValidate memVal) {
super(SketchType.DOUBLES_SKETCH,null, null );
super(null, null );
k = memVal.k;
m = memVal.m;
KllHelper.buildHeapKllSketchFromMemory(this, memVal);
@@ -54,7 +54,7 @@ final class KllHeapFloatsSketch extends KllFloatsSketch {
* experimental as they have not been as well characterized.
*/
KllHeapFloatsSketch(final int k, final int m) {
super(SketchType.FLOATS_SKETCH, null, null);
super(null, null);
KllHelper.checkM(m);
KllHelper.checkK(k, m);
this.k = k;
@@ -75,7 +75,7 @@ final class KllHeapFloatsSketch extends KllFloatsSketch {
* @param memVal the MemoryCheck object
*/
KllHeapFloatsSketch(final Memory mem, final KllMemoryValidate memVal) {
super(SketchType.FLOATS_SKETCH, null, null);
super(null, null);
k = memVal.k;
m = memVal.m;
KllHelper.buildHeapKllSketchFromMemory(this, memVal);
@@ -63,7 +63,7 @@
*
* @author Lee Rhodes, Kevin Lang
*/
public abstract class KllSketch {
abstract class KllSketch {

public enum SketchType { FLOATS_SKETCH, DOUBLES_SKETCH }

@@ -167,7 +167,7 @@ public static int getKFromEpsilon(final double epsilon, final boolean pmf) {
* @param k parameter that controls size of the sketch and accuracy of estimates
* @param n stream length
* @return upper bound on the compact serialized size
* @deprecated use {@link #getMaxSerializedSizeBytes(int, long, SketchType, boolean)} instead.
* @deprecated use getMaxSerializedSizeBytes(int, long, SketchType, boolean) instead.
* Version 3.2.0
*/
@Deprecated
@@ -292,6 +292,23 @@ public WritableMemory getWritableMemory() {
return wmem;
}

/**
* Returns true if this sketch's data structure is backed by WritableMemory.
* @return true if this sketch's data structure is backed by WritableMemory.
*/
public boolean hasMemory() {
return wmem != null;
}

/**
* Returns true if the backing resource is direct (off-heap) memory.
* This is the case for allocated direct memory, memory mapped files.
* @return true if the backing resource is direct (off-heap) memory.
*/
public boolean isDirect() {
return wmem.isDirect();
}

/**
* Returns true if this sketch is empty.
* @return empty flag
@@ -308,6 +325,10 @@ public final boolean isEstimationMode() {
return getNumLevels() > 1;
}

/**
* Returns true if the backing WritableMemory is in updatable format.
* @return true if the backing WritableMemory is in updatable format.
*/
public final boolean isUpdatableMemory() {
return updatablMemory;
}
@@ -28,11 +28,9 @@

import org.apache.datasketches.Family;
import org.apache.datasketches.SketchesArgumentException;
import org.apache.datasketches.kll.KllSketch;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.memory.WritableMemory;


/**
* This is a stochastic streaming sketch that enables near-real time analysis of the
* approximate distribution of real values from a very large stream in a single pass.
@@ -444,7 +442,6 @@ public double getNormalizedRankError(final boolean pmf) {
* Otherwise, it is the "single-sided" normalized rank error for all the other queries.
* @return if pmf is true, the normalized rank error for the getPMF() function.
* Otherwise, it is the "single-sided" normalized rank error for all the other queries.
* @see KllSketch
*/
public static double getNormalizedRankError(final int k, final boolean pmf) {
return Util.getNormalizedRankError(k, pmf);
@@ -458,7 +455,6 @@ public static double getNormalizedRankError(final int k, final boolean pmf) {
* returns the value of <em>k</em> assuming the input epsilon is the desired "single-sided"
* epsilon for all the other queries.
* @return the value of <i>k</i> given a value of epsilon.
* @see KllSketch
*/
public static int getKFromEpsilon(final double epsilon, final boolean pmf) {
return Util.getKFromEpsilon(epsilon, pmf);
@@ -607,7 +603,6 @@ public static int getCompactStorageBytes(final int k, final long n) {
return metaPreLongs + Util.computeRetainedItems(k, n) << 3;
}


/**
* Returns the number of bytes this sketch would require to store in native form: compact for
* a CompactDoublesSketch, non-compact for an UpdateDoublesSketch.
@@ -31,7 +31,6 @@
import org.apache.datasketches.memory.WritableMemory;
import org.testng.annotations.Test;

@SuppressWarnings("javadoc")
public class KllDirectDoublesSketchTest {

private static final double PMF_EPS_FOR_K_8 = 0.35; // PMF rank error (epsilon) for k=8
@@ -377,9 +376,7 @@ public void serializeDeserializeOneItemViaUpdatableWritableWrap() {
public void serializeDeserializeFullViaCompactHeapify() {
final KllDoublesSketch sketch1 = getDDSketch(200, 0);
final int n = 1000;
for (int i = 0; i < n; i++) {
sketch1.update(i);
}
for (int i = 0; i < n; i++) { sketch1.update(i); }
final byte[] bytes = sketch1.toByteArray();
final KllDoublesSketch sketch2 = KllDoublesSketch.heapify(Memory.wrap(bytes));
assertEquals(bytes.length, sketch1.getCurrentCompactSerializedSizeBytes());
@@ -599,6 +596,8 @@ public void checkMergeKllDoublesSketch() {
KllDoublesSketch sk2 = KllDoublesSketch.newHeapInstance(20);
for (int i = 1; i <= 21; i++ ) { sk2.update(i + 100); }
sk.merge(sk2);
assertEquals(sk.getMinValue(), 1.0);
assertEquals(sk.getMaxValue(), 121.0);
}

@Test
@@ -609,15 +608,18 @@ public void checkReverseMergeKllDoubleSketch() {
KllDoublesSketch sk2 = KllDoublesSketch.newHeapInstance(20);
for (int i = 1; i <= 21; i++ ) { sk2.update(i + 100); }
sk2.merge(sk);
assertEquals(sk2.getMinValue(), 1.0);
assertEquals(sk2.getMaxValue(), 121.0);
}

// @Test
// public void checkWrapKllDoubleSketch() {
// KllDoublesSketch sk = new KllDoublesSketch(20);
// for (int i = 1; i <= 21; i++ ) { sk.update(i); }
// Memory srcMem = Memory.wrap(sk.toByteArray());
// KllDoublesSketch sk2 = KllDoublesSketch.writableWrap(srcMem, memReqSvr);
// }
@Test
public void checkWrapCompactForm() {
KllDoublesSketch sk = KllDoublesSketch.newHeapInstance(20);
for (int i = 1; i <= 21; i++ ) { sk.update(i); }
WritableMemory srcMem = WritableMemory.writableWrap(sk.toByteArray()); //note: Not updatable
KllDoublesSketch sk2 = KllDoublesSketch.writableWrap(srcMem, memReqSvr);
println(sk2.toString(true, true));
}

private static KllDoublesSketch getDDSketch(final int k, final int n) {
KllDoublesSketch sk = KllDoublesSketch.newHeapInstance(k);
@@ -266,15 +266,14 @@ public void mergeMinMinValueFromOther() {

@Test
public void mergeMinAndMaxFromOther() {
final KllFloatsSketch sketch1 = getDFSketch(8, 0); //was 200
final KllFloatsSketch sketch2 = getDFSketch(8, 0); //was 200
for (int i = 1; i <= 9; i++) { //was 1_000_000
final KllFloatsSketch sketch1 = getDFSketch(200, 0);
final KllFloatsSketch sketch2 = getDFSketch(200, 0);
for (int i = 1; i <= 1_000_000; i++) {
sketch1.update(i);
}
//System.out.println(sketch1.toString(true, true));
sketch2.merge(sketch1);
assertEquals(sketch2.getMinValue(), 1F);
assertEquals(sketch2.getMaxValue(), 9F); //was 1_000_000
assertEquals(sketch2.getMaxValue(), 1_000_000F);
}

@SuppressWarnings("unused")
@@ -589,6 +588,39 @@ public void checkHeapify() {
assertEquals(sk2.getMaxValue(), 100.0);
}

@Test
public void checkMergeKllFloatsSketch() {
WritableMemory dstMem = WritableMemory.allocate(6000);
KllFloatsSketch sk = KllFloatsSketch.newDirectInstance(20, dstMem, memReqSvr);
for (int i = 1; i <= 21; i++) { sk.update(i); }
KllFloatsSketch sk2 = KllFloatsSketch.newHeapInstance(20);
for (int i = 1; i <= 21; i++ ) { sk2.update(i + 100); }
sk.merge(sk2);
assertEquals(sk.getMinValue(), 1.0);
assertEquals(sk.getMaxValue(), 121.0);
}

@Test
public void checkReverseMergeKllFloatsSketch() {
WritableMemory dstMem = WritableMemory.allocate(6000);
KllFloatsSketch sk = KllFloatsSketch.newDirectInstance(20, dstMem, memReqSvr);
for (int i = 1; i <= 21; i++) { sk.update(i); }
KllFloatsSketch sk2 = KllFloatsSketch.newHeapInstance(20);
for (int i = 1; i <= 21; i++ ) { sk2.update(i + 100); }
sk2.merge(sk);
assertEquals(sk2.getMinValue(), 1.0);
assertEquals(sk2.getMaxValue(), 121.0);
}

@Test
public void checkWrapCompactForm() {
KllFloatsSketch sk = KllFloatsSketch.newHeapInstance(20);
for (int i = 1; i <= 21; i++ ) { sk.update(i); }
WritableMemory srcMem = WritableMemory.writableWrap(sk.toByteArray()); //note: Not updatable
KllFloatsSketch sk2 = KllFloatsSketch.writableWrap(srcMem, memReqSvr);
println(sk2.toString(true, true));
}

private static KllFloatsSketch getDFSketch(final int k, final int n) {
KllFloatsSketch sk = KllFloatsSketch.newHeapInstance(k);
for (int i = 1; i <= n; i++) { sk.update(i); }

0 comments on commit 1d79763

Please sign in to comment.