Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 22 additions & 8 deletions src/main/java/org/apache/datasketches/kll/KllItemsSketch.java
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,6 @@ public static <T> KllItemsSketch<T> wrap(

//END of Constructors

@Override
public Class<T> getClassOfT() { return serDe.getClassOfT(); }

@Override
public double[] getCDF(final T[] splitPoints, final QuantileSearchCriteria searchCrit) {
if (isEmpty()) { throw new SketchesArgumentException(EMPTY_MSG); }
Expand All @@ -156,11 +153,29 @@ public double[] getCDF(final T[] splitPoints, final QuantileSearchCriteria searc
}

@Override
public GenericPartitionBoundaries<T> getPartitionBoundaries(final int numEquallySized,
public Class<T> getClassOfT() { return serDe.getClassOfT(); }

@Override
public Comparator<? super T> getComparator() {
return comparator;
}

@Override
public GenericPartitionBoundaries<T> getPartitionBoundariesFromNumParts(
final int numEquallySizedParts,
final QuantileSearchCriteria searchCrit) {
if (isEmpty()) { throw new IllegalArgumentException(EMPTY_MSG); }
refreshSortedView();
return itemsSV.getPartitionBoundariesFromNumParts(numEquallySizedParts, searchCrit);
}

@Override
public GenericPartitionBoundaries<T> getPartitionBoundariesFromPartSize(
final long nominalPartSizeItems,
final QuantileSearchCriteria searchCrit) {
if (isEmpty()) { throw new IllegalArgumentException(EMPTY_MSG); }
refreshSortedView();
return itemsSV.getPartitionBoundaries(numEquallySized, searchCrit);
return itemsSV.getPartitionBoundariesFromPartSize(nominalPartSizeItems, searchCrit);
}

@Override
Expand Down Expand Up @@ -424,9 +439,8 @@ ItemsSketchSortedView<T> getSV() {
quantiles = (T[]) Array.newInstance(serDe.getClassOfT(), numQuantiles);
cumWeights = new long[numQuantiles];
populateFromSketch(srcQuantiles, srcLevels, srcNumLevels, numQuantiles);
final double normRankErr = getNormalizedRankError(getK(), true);
return new ItemsSketchSortedView(
quantiles, cumWeights, getN(), comparator, getMaxItem(), getMinItem(), normRankErr);
final QuantilesGenericAPI<T> sk = KllItemsSketch.this;
return new ItemsSketchSortedView(quantiles, cumWeights, sk);
}

private void populateFromSketch(final Object[] srcQuantiles, final int[] srcLevels,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public List<PartitionBoundsRow<T>> partition(final S sk) {
this.numLevels = (int)max(1, ceil(log(guessNumParts) / log(maxPartsPerSk)));
final int partsPerSk = (int)round(pow(guessNumParts, 1.0 / numLevels));
this.partitionsPerSk = min(partsPerSk, maxPartsPerSk);
final GenericPartitionBoundaries<T> gpb = sk.getPartitionBoundaries(partitionsPerSk, criteria);
final GenericPartitionBoundaries<T> gpb = sk.getPartitionBoundariesFromNumParts(partitionsPerSk, criteria);
final StackElement<T> se = new StackElement<>(gpb, 0, "1");
stack.push(se);
partitionSearch(stack);
Expand All @@ -144,7 +144,7 @@ private void partitionSearch(final ArrayDeque<StackElement<T>> stack) {
if (++se.part <= numParts) {
final PartitionBoundsRow<T> row = new PartitionBoundsRow<>(se);
final S sk = fillReq.getRange(row.lowerBound, row.upperBound, row.rule);
final GenericPartitionBoundaries<T> gpb2 = sk.getPartitionBoundaries(this.partitionsPerSk, criteria);
final GenericPartitionBoundaries<T> gpb2 = sk.getPartitionBoundariesFromNumParts(this.partitionsPerSk, criteria);
final int level = stack.size() + 1;
final String partId = se.levelPartId + "." + se.part + "," + level;
final StackElement<T> se2 = new StackElement<>(gpb2, 0, partId);
Expand Down
33 changes: 21 additions & 12 deletions src/main/java/org/apache/datasketches/quantiles/ItemsSketch.java
Original file line number Diff line number Diff line change
Expand Up @@ -254,16 +254,21 @@ static <T> ItemsSketch<T> copy(final ItemsSketch<T> sketch) {

//END of Constructors

@Override
public Class<T> getClassOfT() { return clazz; }

@Override
public double[] getCDF(final T[] splitPoints, final QuantileSearchCriteria searchCrit) {
if (isEmpty()) { throw new IllegalArgumentException(QuantilesAPI.EMPTY_MSG); }
refreshSortedView();
return classicQisSV.getCDF(splitPoints, searchCrit);
}

@Override
public Class<T> getClassOfT() { return clazz; }

@Override
public Comparator<? super T> getComparator() {
return comparator_;
}

@Override
public T getMaxItem() {
if (isEmpty()) { throw new IllegalArgumentException(QuantilesAPI.EMPTY_MSG); }
Expand All @@ -277,11 +282,21 @@ public T getMinItem() {
}

@Override
public GenericPartitionBoundaries<T> getPartitionBoundaries(final int numEquallySized,
public GenericPartitionBoundaries<T> getPartitionBoundariesFromNumParts(
final int numEquallySizedParts,
final QuantileSearchCriteria searchCrit) {
if (isEmpty()) { throw new IllegalArgumentException(QuantilesAPI.EMPTY_MSG); }
refreshSortedView();
return classicQisSV.getPartitionBoundaries(numEquallySized, searchCrit);
return classicQisSV.getPartitionBoundariesFromNumParts(numEquallySizedParts, searchCrit);
}

@Override
public GenericPartitionBoundaries<T> getPartitionBoundariesFromPartSize(
final long nominalPartSizeItems,
final QuantileSearchCriteria searchCrit) {
if (isEmpty()) { throw new IllegalArgumentException(QuantilesAPI.EMPTY_MSG); }
refreshSortedView();
return classicQisSV.getPartitionBoundariesFromPartSize(nominalPartSizeItems, searchCrit);
}

@Override
Expand Down Expand Up @@ -577,10 +592,6 @@ Object[] getCombinedBuffer() {
return combinedBuffer_;
}

Comparator<? super T> getComparator() {
return comparator_;
}

/**
* Loads the Combined Buffer, min and max from the given items array.
* The Combined Buffer is always in non-compact form and must be pre-allocated.
Expand Down Expand Up @@ -656,9 +667,7 @@ private static <T> ItemsSketchSortedView<T> getSV(final ItemsSketch<T> sk) {
throw new SketchesStateException("Sorted View is misconfigured. TotalN does not match cumWeights.");
}

final double normRankErr = getNormalizedRankError(sk.getK(), true);
return new ItemsSketchSortedView<>(
svQuantiles, svCumWeights, sk.getN(), comparator, sk.getMaxItem(), sk.getMinItem(), normRankErr);
return new ItemsSketchSortedView<>(svQuantiles, svCumWeights, sk);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@
import org.apache.datasketches.common.SketchesStateException;

/**
* Implements PartitionBoundaries
* This defines the returned results of the getParitionBoundaries() function and
* includes the basic methods needed to construct actual partitions.
*/
final public class GenericPartitionBoundaries<T> implements PartitionBoundaries {
public final class GenericPartitionBoundaries<T> {
private long totalN; //totalN of source sketch
private T[] boundaries; //quantiles at the boundaries
private long[] natRanks; //natural ranks at the boundaries
Expand All @@ -36,7 +37,7 @@ final public class GenericPartitionBoundaries<T> implements PartitionBoundaries
private T minItem; //of the source sketch
private QuantileSearchCriteria searchCrit; //of the source sketch query to getPartitionBoundaries.
//computed
private long[] numDeltaItems; //num of items in each part
private long[] numDeltaItems; //num of items in each partition
private int numPartitions; //num of partitions

public GenericPartitionBoundaries(
Expand All @@ -48,15 +49,15 @@ public GenericPartitionBoundaries(
final T minItem,
final QuantileSearchCriteria searchCrit) {
this.totalN = totalN;
this.boundaries = boundaries; //SpotBugs EI_EXPOSE_REP2 copying from sketch class to this "friend" class.
this.boundaries = boundaries; //SpotBugs EI_EXPOSE_REP2 OK: copying from sketch class to this "friend" class.
this.natRanks = natRanks; // "
this.normRanks = normRanks; // "
this.maxItem = maxItem;
this.minItem = minItem;
this.searchCrit = searchCrit;
//check and compute
final int len = boundaries.length;
if (len < 2) { throw new SketchesStateException("Source sketch is empty"); }
if (len < 2) { throw new SketchesStateException("Source sketch is empty"); } //class is final, this is ok
numDeltaItems = new long[len];
numDeltaItems[0] = 0; // index 0 is always 0
for (int i = 1; i < len; i++) {
Expand All @@ -67,7 +68,10 @@ public GenericPartitionBoundaries(
this.numPartitions = len - 1;
}

@Override
/**
* Gets the length of the input stream offered to the underlying sketch.
* @return the length of the input stream offered to the underlying sketch.
*/
public long getN() { return totalN; }

/**
Expand Down Expand Up @@ -100,16 +104,32 @@ public GenericPartitionBoundaries(
*/
public T[] getBoundaries() { return boundaries.clone(); }

@Override
/**
* Gets an ordered array of natural ranks of the associated array of partition boundaries utilizing
* a specified search criterion. Natural ranks are integral values on the interval [1, N]
* @return an array of natural ranks.
*/
public long[] getNaturalRanks() { return natRanks.clone(); }

@Override
/**
* Gets an ordered array of normalized ranks of the associated array of partition boundaries utilizing
* a specified search criterion. Normalized ranks are double values on the interval [0.0, 1.0].
* @return an array of normalized ranks.
*/
public double[] getNormalizedRanks() { return normRanks.clone(); }

@Override
/**
* Gets the number of items to be included for each partition as an array.
* The count at index 0 is 0. The number of items included in the first partition, defined by the boundaries at
* index 0 and index 1, is at index 1 in this array, etc.
* @return the number of items to be included for each partition as an array.
*/
public long[] getNumDeltaItems() { return numDeltaItems.clone(); }

@Override
/**
* Gets the number of partitions
* @return the number of partitions
*/
public int getNumPartitions() { return numPartitions; }

/**
Expand All @@ -130,7 +150,10 @@ public GenericPartitionBoundaries(
*/
public T getMinItem() { return minItem; }

@Override
/**
* Gets the search criteria specified for the source sketch
* @return The search criteria specified for the source sketch
*/
public QuantileSearchCriteria getSearchCriteria() { return searchCrit; }

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
* @author Alexander Saydakov
* @author Lee Rhodes
*/
public interface GenericSortedView<T> extends PartitioningFeature<T>, SortedView {
public interface GenericSortedView<T> extends PartitioningFeature<T>, SketchPartitionLimits, SortedView {

/**
* Returns an approximation to the Cumulative Distribution Function (CDF) of the input stream
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.datasketches.quantilescommon;

import static java.lang.Math.min;
import static org.apache.datasketches.quantilescommon.GenericInequalitySearch.find;
import static org.apache.datasketches.quantilescommon.QuantileSearchCriteria.INCLUSIVE;
import static org.apache.datasketches.quantilescommon.QuantilesAPI.EMPTY_MSG;
Expand All @@ -38,43 +39,57 @@
* @author Lee Rhodes
*/
public class ItemsSketchSortedView<T> implements GenericSortedView<T> {
private static final double PARTITIONING_ERROR_FACTOR = 2.0;
private final T[] quantiles;
private final long[] cumWeights; //cumulative natural weights
private final long totalN;
private final Comparator<? super T> comparator;
private final T maxItem;
private final T minItem;
private final Class<T> clazz;
private final double normRankErr;//assumes PMF type error
private final double normRankError;
private final int numRetItems;

/**
* Construct from elements, also used in testing.
* @param quantiles sorted array of quantiles
* @param cumWeights sorted, monotonically increasing cumulative weights.
* @param totalN the total number of items presented to the sketch.
* @param comparator the Comparator for type T
* @param maxItem of type T
* @param minItem of type T
* @param normRankErr the normalized rank error of the originating sketch.
* @param sk the underlying quantile sketch.
*/
@SuppressWarnings("unchecked")
public ItemsSketchSortedView(
final T[] quantiles,
final long[] cumWeights, //or Natural Ranks
final QuantilesGenericAPI<T> sk) {
this.quantiles = quantiles;
this.cumWeights = cumWeights;
this.totalN = sk.getN();
this.comparator = sk.getComparator();
this.maxItem = sk.getMaxItem();
this.minItem = sk.getMinItem();
this.clazz = sk.getClassOfT();
this.normRankError = sk.getNormalizedRankError(true);
this.numRetItems = sk.getNumRetained();
}

//Used for testing
ItemsSketchSortedView(
final T[] quantiles,
final long[] cumWeights,
final long totalN,
final Comparator<? super T> comparator,
final T maxItem,
final T minItem,
final double normRankErr) {
final Class<T> clazz,
final double normRankError,
final int numRetItems) {
this.quantiles = quantiles;
this.cumWeights = cumWeights;
this.totalN = totalN;
this.comparator = comparator;
this.maxItem = maxItem;
this.minItem = minItem;
this.clazz = (Class<T>)quantiles[0].getClass();
this.normRankErr = normRankErr;
this.clazz = clazz;
this.normRankError = normRankError;
this.numRetItems = numRetItems;
}

//end of constructors
Expand Down Expand Up @@ -118,29 +133,38 @@ public int getNumRetained() {
}

@Override
@SuppressWarnings("unchecked")
public GenericPartitionBoundaries<T> getPartitionBoundaries(final int numEquallySized,
public int getMaxPartitions() {
return (int) min(1.0 / normRankError, numRetItems / 2.0);
}

@Override
public GenericPartitionBoundaries<T> getPartitionBoundariesFromPartSize(
final long nominalPartitionSize,
final QuantileSearchCriteria searchCrit) {
if (isEmpty()) { throw new SketchesArgumentException(QuantilesAPI.EMPTY_MSG); }
final long totalN = this.totalN;
final int maxParts = (int) (totalN / Math.ceil(normRankErr * PARTITIONING_ERROR_FACTOR) );
final int svLen = cumWeights.length;

if (numEquallySized > maxParts) {
final long minPartSizeItems = getMinPartitionSizeItems();
if (nominalPartitionSize < minPartSizeItems) {
throw new SketchesArgumentException(QuantilesAPI.UNSUPPORTED_MSG
+ "The requested number of partitions is too large for the 'k' of this sketch "
+ "if it exceeds the maximum number of partitions allowed by the error threshold for the 'k' of this sketch."
+ "Requested Partitions: " + numEquallySized + " > " + maxParts);
+ " The requested nominal partition size is too small for this sketch.");
}
if (numEquallySized > svLen / 2.0) {
final long totalN = this.totalN;
final int numEquallySizedParts = (int) min(totalN / minPartSizeItems, getMaxPartitions());
return getPartitionBoundariesFromNumParts(numEquallySizedParts);
}

@Override
@SuppressWarnings("unchecked")
public GenericPartitionBoundaries<T> getPartitionBoundariesFromNumParts(
final int numEquallySizedParts,
final QuantileSearchCriteria searchCrit) {
if (isEmpty()) { throw new SketchesArgumentException(QuantilesAPI.EMPTY_MSG); }
final int maxParts = getMaxPartitions();
if (numEquallySizedParts > maxParts) {
throw new SketchesArgumentException(QuantilesAPI.UNSUPPORTED_MSG
+ "The requested number of partitions is too large for the number of retained items "
+ "if it exceeds maximum number of retained items divided by 2."
+ "Requested Partitions: " + numEquallySized + " > "
+ "Retained Items / 2: " + (svLen / 2));
+ " The requested number of partitions is too large for this sketch.");
}

final double[] searchNormRanks = evenlySpacedDoubles(0, 1.0, numEquallySized + 1);
final double[] searchNormRanks = evenlySpacedDoubles(0, 1.0, numEquallySizedParts + 1);
final int partArrLen = searchNormRanks.length;
final T[] partQuantiles = (T[]) Array.newInstance(clazz, partArrLen);
final long[] partNatRanks = new long[partArrLen];
Expand All @@ -150,6 +174,7 @@ public GenericPartitionBoundaries<T> getPartitionBoundaries(final int numEqually
// which are absolutely required when partitioning, especially inner partitions.

//Are the minItem and maxItem already in place?
final int svLen = cumWeights.length;
int adjLen = svLen; //this will be the length of the local copies of quantiles and cumWeights
final boolean adjLow = quantiles[0] != minItem; //if true, adjust the low end
final boolean adjHigh = quantiles[svLen - 1] != maxItem; //if true, adjust the high end
Expand Down
Loading