Skip to content
Permalink
Browse files
tuple api change, null quantiles and PMF from empty sketch
  • Loading branch information
AlexanderSaydakov committed Apr 3, 2018
1 parent 15b038d commit 83922ce894e0055f9e50264d981eba6166493187
Showing 17 changed files with 173 additions and 107 deletions.
@@ -145,7 +145,7 @@
<dependency>
<groupId>com.yahoo.datasketches</groupId>
<artifactId>sketches-core</artifactId>
<version>0.10.3</version>
<version>0.11.0</version>
</dependency>

<!-- Pig -->
@@ -46,7 +46,9 @@ public Tuple exec(final Tuple input) throws IOException {
}
splitPoints[i - 1] = (double) input.get(i);
}
return Util.doubleArrayToTuple(sketch.getPMF(splitPoints));
final double[] pmf = sketch.getPMF(splitPoints);
if (pmf == null) { return null; }
return Util.doubleArrayToTuple(pmf);
}

}
@@ -49,7 +49,9 @@ public Tuple exec(final Tuple input) throws IOException {
}
splitPoints[i - 1] = (String) input.get(i);
}
return Util.doubleArrayToTuple(sketch.getPMF(splitPoints));
final double[] pmf = sketch.getPMF(splitPoints);
if (pmf == null) { return null; }
return Util.doubleArrayToTuple(pmf);
}

}
@@ -8,7 +8,9 @@
import org.apache.pig.Algebraic;

import com.yahoo.sketches.tuple.DoubleSummary;
import com.yahoo.sketches.tuple.DoubleSummaryDeserializer;
import com.yahoo.sketches.tuple.DoubleSummaryFactory;
import com.yahoo.sketches.tuple.DoubleSummarySetOperations;

/**
* This UDF creates a Sketch&lt;DoubleSummary&gt; from raw data.
@@ -86,7 +88,7 @@ public Initial(final String sketchSize, final String summaryMode) {}
* Default sketch size and default mode
*/
public IntermediateFinal() {
super(new DoubleSummaryFactory());
super(new DoubleSummaryFactory(), new DoubleSummarySetOperations(), new DoubleSummaryDeserializer());
}

/**
@@ -95,7 +97,8 @@ public IntermediateFinal() {
* @param sketchSize String representation of sketch size
*/
public IntermediateFinal(final String sketchSize) {
super(Integer.parseInt(sketchSize), new DoubleSummaryFactory());
super(Integer.parseInt(sketchSize), new DoubleSummaryFactory(), new DoubleSummarySetOperations(),
new DoubleSummaryDeserializer());
}

/**
@@ -105,7 +108,9 @@ public IntermediateFinal(final String sketchSize) {
* @param summaryMode String representation of mode (sum, min or max)
*/
public IntermediateFinal(final String sketchSize, final String summaryMode) {
super(Integer.parseInt(sketchSize), new DoubleSummaryFactory(DoubleSummary.Mode.valueOf(summaryMode)));
super(Integer.parseInt(sketchSize), new DoubleSummaryFactory(DoubleSummary.Mode.valueOf(summaryMode)),
new DoubleSummarySetOperations(DoubleSummary.Mode.valueOf(summaryMode)),
new DoubleSummaryDeserializer());
}

}
@@ -16,7 +16,9 @@
import org.apache.pig.data.Tuple;

import com.yahoo.sketches.tuple.Sketch;
import com.yahoo.sketches.tuple.SummaryDeserializer;
import com.yahoo.sketches.tuple.SummaryFactory;
import com.yahoo.sketches.tuple.SummarySetOperations;
import com.yahoo.sketches.tuple.Union;
import com.yahoo.sketches.tuple.UpdatableSketch;
import com.yahoo.sketches.tuple.UpdatableSketchBuilder;
@@ -34,17 +36,21 @@
public abstract class DataToSketchAlgebraicIntermediateFinal<U, S extends UpdatableSummary<U>>
extends EvalFunc<Tuple> {
private final int sketchSize_;
private final SummaryFactory<S> summaryFactory_;
private final SummarySetOperations<S> summarySetOps_;
private final SummaryDeserializer<S> summaryDeserializer_;
private final UpdatableSketchBuilder<U, S> sketchBuilder_;
private boolean isFirstCall_ = true;

/**
* Constructs a function given a summary factory, default sketch size and default
* sampling probability of 1.
* @param summaryFactory an instance of SummaryFactory
* @param summarySetOps an instance of SummarySetOperations
* @param summaryDeserializer an instance of SummaryDeserializer
*/
public DataToSketchAlgebraicIntermediateFinal(final SummaryFactory<S> summaryFactory) {
this(DEFAULT_NOMINAL_ENTRIES, 1f, summaryFactory);
public DataToSketchAlgebraicIntermediateFinal(final SummaryFactory<S> summaryFactory,
final SummarySetOperations<S> summarySetOps, final SummaryDeserializer<S> summaryDeserializer) {
this(DEFAULT_NOMINAL_ENTRIES, 1f, summaryFactory, summarySetOps, summaryDeserializer);
}

/**
@@ -54,10 +60,13 @@ public DataToSketchAlgebraicIntermediateFinal(final SummaryFactory<S> summaryFac
* It represents nominal number of entries in the sketch. Forced to the nearest power of 2
* greater than given value.
* @param summaryFactory an instance of SummaryFactory
* @param summarySetOps an instance of SummarySetOperations
* @param summaryDeserializer an instance of SummaryDeserializer
*/
public DataToSketchAlgebraicIntermediateFinal(final int sketchSize,
final SummaryFactory<S> summaryFactory) {
this(sketchSize, 1f, summaryFactory);
final SummaryFactory<S> summaryFactory, final SummarySetOperations<S> summarySetOps,
final SummaryDeserializer<S> summaryDeserializer) {
this(sketchSize, 1f, summaryFactory, summarySetOps, summaryDeserializer);
}

/**
@@ -67,11 +76,15 @@ public DataToSketchAlgebraicIntermediateFinal(final int sketchSize,
* greater than given value.
* @param samplingProbability parameter from 0 to 1 inclusive
* @param summaryFactory an instance of SummaryFactory
* @param summarySetOps an instance of SummarySetOperations
* @param summaryDeserializer an instance of SummaryDeserializer
*/
public DataToSketchAlgebraicIntermediateFinal(final int sketchSize,
final float samplingProbability, final SummaryFactory<S> summaryFactory) {
public DataToSketchAlgebraicIntermediateFinal(final int sketchSize, final float samplingProbability,
final SummaryFactory<S> summaryFactory, final SummarySetOperations<S> summarySetOps,
final SummaryDeserializer<S> summaryDeserializer) {
sketchSize_ = sketchSize;
summaryFactory_ = summaryFactory;
summarySetOps_ = summarySetOps;
summaryDeserializer_ = summaryDeserializer;
sketchBuilder_ = new UpdatableSketchBuilder<U, S>(summaryFactory)
.setNominalEntries(sketchSize).setSamplingProbability(samplingProbability);
}
@@ -83,7 +96,7 @@ public Tuple exec(final Tuple inputTuple) throws IOException {
Logger.getLogger(getClass()).info("algebraic is used");
isFirstCall_ = false;
}
final Union<S> union = new Union<S>(sketchSize_, summaryFactory_);
final Union<S> union = new Union<S>(sketchSize_, summarySetOps_);

final DataBag bag = (DataBag) inputTuple.get(0);
if (bag == null) {
@@ -102,7 +115,7 @@ public Tuple exec(final Tuple inputTuple) throws IOException {
// This is a sketch from a prior call to the
// Intermediate function. merge it with the
// current sketch.
final Sketch<S> incomingSketch = Util.deserializeSketchFromTuple(dataTuple);
final Sketch<S> incomingSketch = Util.deserializeSketchFromTuple(dataTuple, summaryDeserializer_);
union.update(incomingSketch);
} else {
// we should never get here.
@@ -14,9 +14,11 @@

import com.yahoo.memory.Memory;
import com.yahoo.sketches.tuple.DoubleSummary;
import com.yahoo.sketches.tuple.DoubleSummaryDeserializer;
import com.yahoo.sketches.tuple.Sketch;
import com.yahoo.sketches.tuple.SketchIterator;
import com.yahoo.sketches.tuple.Sketches;
import com.yahoo.sketches.tuple.SummaryDeserializer;

/**
* This UDF converts a Sketch&lt;DoubleSummary&gt; to estimates.
@@ -29,14 +31,18 @@
*/
public class DoubleSummarySketchToEstimates extends EvalFunc<Tuple> {

private static final SummaryDeserializer<DoubleSummary> SUMMARY_DESERIALIZER =
new DoubleSummaryDeserializer();

@Override
public Tuple exec(final Tuple input) throws IOException {
if ((input == null) || (input.size() == 0)) {
return null;
}

final DataByteArray dba = (DataByteArray) input.get(0);
final Sketch<DoubleSummary> sketch = Sketches.heapifySketch(Memory.wrap(dba.get()));
final Sketch<DoubleSummary> sketch = Sketches.heapifySketch(
Memory.wrap(dba.get()), SUMMARY_DESERIALIZER);

final Tuple output = TupleFactory.getInstance().newTuple(2);
output.set(0, sketch.getEstimate());
@@ -15,9 +15,11 @@
import com.yahoo.sketches.quantiles.DoublesSketch;
import com.yahoo.sketches.quantiles.UpdateDoublesSketch;
import com.yahoo.sketches.tuple.DoubleSummary;
import com.yahoo.sketches.tuple.DoubleSummaryDeserializer;
import com.yahoo.sketches.tuple.Sketch;
import com.yahoo.sketches.tuple.SketchIterator;
import com.yahoo.sketches.tuple.Sketches;
import com.yahoo.sketches.tuple.SummaryDeserializer;

/**
* This UDF is to get a percentile value from a Sketch&lt;DoubleSummary&gt;.
@@ -29,6 +31,8 @@
*/
public class DoubleSummarySketchToPercentile extends EvalFunc<Double> {

private static final SummaryDeserializer<DoubleSummary> SUMMARY_DESERIALIZER =
new DoubleSummaryDeserializer();
private static final int QUANTILES_SKETCH_SIZE = 1024;

@Override
@@ -38,7 +42,8 @@ public Double exec(final Tuple input) throws IOException {
}

final DataByteArray dba = (DataByteArray) input.get(0);
final Sketch<DoubleSummary> sketch = Sketches.heapifySketch(Memory.wrap(dba.get()));
final Sketch<DoubleSummary> sketch = Sketches.heapifySketch(
Memory.wrap(dba.get()), SUMMARY_DESERIALIZER);

final double percentile = (double) input.get(1);
if ((percentile < 0) || (percentile > 100)) {
@@ -8,7 +8,10 @@
import org.apache.pig.Algebraic;

import com.yahoo.sketches.tuple.DoubleSummary;
import com.yahoo.sketches.tuple.DoubleSummaryDeserializer;
import com.yahoo.sketches.tuple.DoubleSummaryFactory;
import com.yahoo.sketches.tuple.DoubleSummarySetOperations;
import com.yahoo.sketches.tuple.SummarySetOperations;

/**
* This is to union Sketch&lt;DoubleSummary&gt;.
@@ -20,15 +23,15 @@ public class UnionDoubleSummarySketch extends UnionSketch<DoubleSummary> impleme
* Constructor with default sketch size and default mode (sum)
*/
public UnionDoubleSummarySketch() {
super(new DoubleSummaryFactory());
super(new DoubleSummarySetOperations(), new DoubleSummaryDeserializer());
}

/**
* Constructor with default mode (sum)
* @param sketchSize String representation of sketch size
*/
public UnionDoubleSummarySketch(final String sketchSize) {
super(Integer.parseInt(sketchSize), new DoubleSummaryFactory());
super(Integer.parseInt(sketchSize), new DoubleSummarySetOperations(), new DoubleSummaryDeserializer());
}

/**
@@ -37,7 +40,9 @@ public UnionDoubleSummarySketch(final String sketchSize) {
* @param summaryMode String representation of mode (sum, min or max)
*/
public UnionDoubleSummarySketch(final String sketchSize, final String summaryMode) {
super(Integer.parseInt(sketchSize), new DoubleSummaryFactory(DoubleSummary.Mode.valueOf(summaryMode)));
super(Integer.parseInt(sketchSize),
new DoubleSummarySetOperations(DoubleSummary.Mode.valueOf(summaryMode)),
new DoubleSummaryDeserializer());
}

@Override
@@ -84,7 +89,7 @@ public static class IntermediateFinal extends UnionSketchAlgebraicIntermediateFi
* Default sketch size and default mode.
*/
public IntermediateFinal() {
super(new DoubleSummaryFactory());
super(new DoubleSummarySetOperations(), new DoubleSummaryDeserializer());
}

/**
@@ -93,7 +98,7 @@ public IntermediateFinal() {
* @param sketchSize String representation of sketch size
*/
public IntermediateFinal(final String sketchSize) {
super(Integer.parseInt(sketchSize), new DoubleSummaryFactory());
super(Integer.parseInt(sketchSize), new DoubleSummarySetOperations(), new DoubleSummaryDeserializer());
}

/**
@@ -103,7 +108,9 @@ public IntermediateFinal(final String sketchSize) {
* @param summaryMode String representation of mode (sum, min or max)
*/
public IntermediateFinal(final String sketchSize, final String summaryMode) {
super(Integer.parseInt(sketchSize), new DoubleSummaryFactory(DoubleSummary.Mode.valueOf(summaryMode)));
super(Integer.parseInt(sketchSize),
new DoubleSummarySetOperations(DoubleSummary.Mode.valueOf(summaryMode)),
new DoubleSummaryDeserializer());
}
}

@@ -20,7 +20,8 @@
import com.yahoo.sketches.tuple.Sketch;
import com.yahoo.sketches.tuple.Sketches;
import com.yahoo.sketches.tuple.Summary;
import com.yahoo.sketches.tuple.SummaryFactory;
import com.yahoo.sketches.tuple.SummaryDeserializer;
import com.yahoo.sketches.tuple.SummarySetOperations;
import com.yahoo.sketches.tuple.Union;

/**
@@ -29,16 +30,19 @@
*/
public abstract class UnionSketch<S extends Summary> extends EvalFunc<Tuple> implements Accumulator<Tuple> {
private final int sketchSize_;
private final SummaryFactory<S> summaryFactory_;
private final SummarySetOperations<S> summarySetOps_;
private final SummaryDeserializer<S> summaryDeserializer_;
private Union<S> union_;
private boolean isFirstCall_ = true;

/**
* Constructs a function given summary set operations, a summary deserializer and default sketch size
* @param summarySetOps an instance of SummarySetOperations
* @param summaryDeserializer an instance of SummaryDeserializer
*/
public UnionSketch(final SummaryFactory<S> summaryFactory) {
this(DEFAULT_NOMINAL_ENTRIES, summaryFactory);
public UnionSketch(final SummarySetOperations<S> summarySetOps,
final SummaryDeserializer<S> summaryDeserializer) {
this(DEFAULT_NOMINAL_ENTRIES, summarySetOps, summaryDeserializer);
}

/**
@@ -47,11 +51,14 @@ public UnionSketch(final SummaryFactory<S> summaryFactory) {
* It represents nominal number of entries in the sketch. Forced to the nearest power of 2
* greater than given value.
* @param summarySetOps an instance of SummarySetOperations
* @param summaryDeserializer an instance of SummaryDeserializer
*/
public UnionSketch(final int sketchSize, final SummaryFactory<S> summaryFactory) {
public UnionSketch(final int sketchSize, final SummarySetOperations<S> summarySetOps,
final SummaryDeserializer<S> summaryDeserializer) {
super();
this.sketchSize_ = sketchSize;
this.summaryFactory_ = summaryFactory;
sketchSize_ = sketchSize;
summarySetOps_ = summarySetOps;
summaryDeserializer_ = summaryDeserializer;
}

@Override
@@ -65,8 +72,8 @@ public Tuple exec(final Tuple inputTuple) throws IOException {
return null;
}
final DataBag bag = (DataBag) inputTuple.get(0);
final Union<S> union = new Union<S>(sketchSize_, summaryFactory_);
updateUnion(bag, union);
final Union<S> union = new Union<S>(sketchSize_, summarySetOps_);
updateUnion(bag, union, summaryDeserializer_);
return Util.tupleFactory.newTuple(new DataByteArray(union.getResult().toByteArray()));
}

@@ -83,9 +90,9 @@ public void accumulate(final Tuple inputTuple) throws IOException {
final DataBag bag = (DataBag) inputTuple.get(0);
if (bag == null || bag.size() == 0) { return; }
if (union_ == null) {
union_ = new Union<S>(sketchSize_, summaryFactory_);
union_ = new Union<S>(sketchSize_, summarySetOps_);
}
updateUnion(bag, union_);
updateUnion(bag, union_, summaryDeserializer_);
}

@Override
@@ -101,13 +108,13 @@ public void cleanup() {
if (union_ != null) { union_.reset(); }
}

private static <S extends Summary> void updateUnion(final DataBag bag, final Union<S> union)
throws ExecException {
private static <S extends Summary> void updateUnion(final DataBag bag, final Union<S> union,
final SummaryDeserializer<S> summaryDeserializer) throws ExecException {
for (final Tuple innerTuple: bag) {
if ((innerTuple.size() != 1) || (innerTuple.get(0) == null)) {
continue;
}
final Sketch<S> incomingSketch = Util.deserializeSketchFromTuple(innerTuple);
final Sketch<S> incomingSketch = Util.deserializeSketchFromTuple(innerTuple, summaryDeserializer);
union.update(incomingSketch);
}
}

0 comments on commit 83922ce

Please sign in to comment.