Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions src/main/java/com/yahoo/sketches/sampling/VarOptItemsUnion.java
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,18 @@ public void update(final Memory mem, final ArrayOfItemsSerDe<T> serDe) {
}
}

/**
* Union a reservoir sketch. The reservoir sample is treated as if all items were added with a
* weight of 1.0.
*
* @param reservoirIn The reservoir sketch to be merged
*/
public void update(final ReservoirItemsSketch<T> reservoirIn) {
if (reservoirIn != null) {
mergeReservoirInto(reservoirIn);
}
}

/**
* Gets the varopt sketch resulting from the union of any input sketches.
*
Expand Down Expand Up @@ -390,6 +402,61 @@ private void mergeInto(final VarOptItemsSketch<T> sketch) {
}
}

/**
* Used to merge a reservoir sample into varopt, assuming the reservoir was built with items
* of weight 1.0. Logic is very similar to mergeInto() for a sketch with no heavy items.
* @param reservoir Reservoir sketch to merge into this union
*/
private void mergeReservoirInto(final ReservoirItemsSketch<T> reservoir) {
final long reservoirN = reservoir.getN();
if (reservoirN == 0) {
return;
}

n_ += reservoirN;

final int reservoirK = reservoir.getK();
if (reservoir.getN() <= reservoirK) {
// exact mode, so just insert and be done
for (T item : reservoir.getRawSamplesAsList()) {
gadget_.update(item, 1.0, false);
}
} else {
// sampling mode. We'll replicate a weight-correcting iterator
final double reservoirTau = reservoir.getImplicitSampleWeight();

double cumWeight = 0.0;
final ArrayList<T> samples = reservoir.getRawSamplesAsList();
for (int i = 0; i < reservoirK - 1; ++i) {
gadget_.update(samples.get(i), reservoirTau, true);
cumWeight += reservoirTau;
}
// correct for any numerical discrepancies with the last item
gadget_.update(samples.get(reservoirK - 1), reservoir.getN() - cumWeight, true);

// resolve tau
final double outerTau = getOuterTau();

if (outerTauDenom == 0) {
// detect first estimation mode sketch and grab its tau
outerTauNumer = reservoirN;
outerTauDenom = reservoirK;
} else if (reservoirTau > outerTau) {
// switch to a bigger value of outerTau
outerTauNumer = reservoirN;
outerTauDenom = reservoirK;
} else if (reservoirTau == outerTau) {
// Ok if previous equality test isn't quite perfect. Mistakes in either direction should
// be fairly benign.
// Without conceptually changing outerTau, update number and denominator. In particular,
// add the total weight of the incoming reservoir to the running total.
outerTauNumer += reservoirN;
outerTauDenom += reservoirK;
}
// do nothing if reservoir "tau" is no smaller than outerTau
}
}

/**
* When there are no marked items in H, teh gadget is mathematically equivalent to a valid
* varopt sketch. This method simply returns a copy (without perserving marks).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,6 @@ public void checkEmptySketch() {
final Memory mem = Memory.wrap(sketchBytes);

// only minPreLongs bytes and should deserialize to empty
assert sketchBytes != null;
assertEquals(sketchBytes.length, Family.VAROPT.getMinPreLongs() << 3);
final ArrayOfStringsSerDe serDe = new ArrayOfStringsSerDe();
final VarOptItemsSketch<String> loadedVis = VarOptItemsSketch.heapify(mem, serDe);
Expand Down
174 changes: 167 additions & 7 deletions src/test/java/com/yahoo/sketches/sampling/VarOptItemsUnionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -187,12 +187,145 @@ public void unionSmallSamplingSketch() {
assertEquals(result.getTotalWtR(), 96.0, EPS); // n1+n2 light items, ignore the heavy one
}

@Test
public void unionExactReservoirSketch() {
// build a varopt union which contains both heavy and light items, then copy it and
// compare unioning:
// 1. A varopt sketch of items with weight 1.0
// 2. A reservoir sample made of the same input items as above
// and we should find that the resulting unions are equivalent.

final int k = 20;
final long n = 2 * k;

final VarOptItemsSketch<Long> baseVis = VarOptItemsSketch.newInstance(k);
for (long i = 1; i <= n; ++i) {
baseVis.update(-i, i);
}
baseVis.update(-n - 1L, n * n);
baseVis.update(-n - 2L, n * n);
baseVis.update(-n - 3L, n * n);

final VarOptItemsUnion<Long> union1 = VarOptItemsUnion.newInstance(k);
union1.update(baseVis);

final ArrayOfLongsSerDe serDe = new ArrayOfLongsSerDe();
final Memory unionImg = Memory.wrap(union1.toByteArray(serDe));
final VarOptItemsUnion<Long> union2 = VarOptItemsUnion.heapify(unionImg, serDe);

compareUnionsExact(union1, union2); // sanity check

final VarOptItemsSketch<Long> vis = VarOptItemsSketch.newInstance(k);
final ReservoirItemsSketch<Long> ris = ReservoirItemsSketch.newInstance(k);

union2.update((ReservoirItemsSketch<Long>) null);
union2.update(ris); // empty

compareUnionsExact(union1, union2); // union2 should be unchanged

for (long i = 1; i < k - 1; ++i) {
ris.update(i);
vis.update(i, 1.0);
}

union1.update(vis);
union2.update(ris);

compareUnionsEquivalent(union1, union2);
}

@Test
public void unionSamplingReservoirSketch() {
// Like unionExactReservoirSketch, but merge in reservoir first, with reservoir in sampling mode
final int k = 20;
final long n = k * k;

final VarOptItemsUnion<Long> union1 = VarOptItemsUnion.newInstance(k);
final VarOptItemsUnion<Long> union2 = VarOptItemsUnion.newInstance(k);

compareUnionsExact(union1, union2); // sanity check

final VarOptItemsSketch<Long> vis = VarOptItemsSketch.newInstance(k);
final ReservoirItemsSketch<Long> ris = ReservoirItemsSketch.newInstance(k);

for (long i = 1; i < n; ++i) {
ris.update(i);
vis.update(i, 1.0);
}

union1.update(vis);
union2.update(ris);
compareUnionsEquivalent(union1, union2);

// repeat to trigger equal tau scenario
union1.update(vis);
union2.update(ris);
compareUnionsEquivalent(union1, union2);

// create and add a sketch with some heavy items
final VarOptItemsSketch<Long> newVis = VarOptItemsSketch.newInstance(k);
for (long i = 1; i <= n; ++i) {
newVis.update(-i, i);
}
newVis.update(-n - 1L, n * n);
newVis.update(-n - 2L, n * n);
newVis.update(-n - 3L, n * n);

union1.update(newVis);
union2.update(newVis);
compareUnionsEquivalent(union1, union2);
}

@Test
public void unionReservoirVariousTauValues() {
final int k = 20;
final long n = 2 * k;

final VarOptItemsSketch<Long> baseVis = VarOptItemsSketch.newInstance(k);
for (long i = 1; i <= n; ++i) {
baseVis.update(-i, 1.0);
}

final VarOptItemsUnion<Long> union1 = VarOptItemsUnion.newInstance(k);
union1.update(baseVis);

final ArrayOfLongsSerDe serDe = new ArrayOfLongsSerDe();
final Memory unionImg = Memory.wrap(union1.toByteArray(serDe));
final VarOptItemsUnion<Long> union2 = VarOptItemsUnion.heapify(unionImg, serDe);

compareUnionsExact(union1, union2); // sanity check

// reservoir tau will be greater than gadget's tau
VarOptItemsSketch<Long> vis = VarOptItemsSketch.newInstance(k);
ReservoirItemsSketch<Long> ris = ReservoirItemsSketch.newInstance(k);
for (long i = 1; i < 2 * n; ++i) {
ris.update(i);
vis.update(i, 1.0);
}

union1.update(vis);
union2.update(ris);
compareUnionsEquivalent(union1, union2);

// reservoir tau will be smaller than gadget's tau
vis = VarOptItemsSketch.newInstance(k);
ris = ReservoirItemsSketch.newInstance(k);
for (long i = 1; i <= k + 1; ++i) {
ris.update(i);
vis.update(i, 1.0);
}

union1.update(vis);
union2.update(ris);
compareUnionsEquivalent(union1, union2);
}

@Test
public void serializeEmptyUnion() {
final int k = 100;
final VarOptItemsUnion<String> union = VarOptItemsUnion.newInstance(k);
// null inputs to update() should leave the union empty
union.update(null);
union.update((VarOptItemsSketch<String>) null);
union.update(null, new ArrayOfStringsSerDe());

final ArrayOfStringsSerDe serDe = new ArrayOfStringsSerDe();
Expand Down Expand Up @@ -225,7 +358,7 @@ public void serializeExactUnion() {
final Memory mem = Memory.wrap(unionBytes);

final VarOptItemsUnion<Long> rebuilt = VarOptItemsUnion.heapify(mem, serDe);
compareUnions(rebuilt, union);
compareUnionsExact(rebuilt, union);

assertEquals(rebuilt.toString(), union.toString());
}
Expand All @@ -252,21 +385,48 @@ public void serializeSamplingUnion() {
final Memory mem = Memory.wrap(unionBytes);

final VarOptItemsUnion<Long> rebuilt = VarOptItemsUnion.heapify(mem, serDe);
compareUnions(rebuilt, union);
compareUnionsExact(rebuilt, union);

assertEquals(rebuilt.toString(), union.toString());
}

static <T> void compareUnions(final VarOptItemsUnion<T> u1,
final VarOptItemsUnion<T> u2) {
private static <T> void compareUnionsExact(final VarOptItemsUnion<T> u1,
final VarOptItemsUnion<T> u2) {
assertEquals(u1.getOuterTau(), u2.getOuterTau());

final VarOptItemsSamples<T> s1 = u1.getResult().getSketchSamples();
final VarOptItemsSamples<T> s2 = u2.getResult().getSketchSamples();
final VarOptItemsSketch<T> sketch1 = u1.getResult();
final VarOptItemsSketch<T> sketch2 = u2.getResult();
assertEquals(sketch1.getN(), sketch2.getN());
assertEquals(sketch1.getHRegionCount(), sketch2.getHRegionCount());
assertEquals(sketch1.getRRegionCount(), sketch2.getRRegionCount());

final VarOptItemsSamples<T> s1 = sketch1.getSketchSamples();
final VarOptItemsSamples<T> s2 = sketch2.getSketchSamples();

assertEquals(s1.getNumSamples(), s2.getNumSamples());
assertEquals(s1.weights(), s2.weights());
assertEquals(s1.items(), s2.items());
}

private static <T> void compareUnionsEquivalent(final VarOptItemsUnion<T> u1,
final VarOptItemsUnion<T> u2) {
assertEquals(u1.getOuterTau(), u2.getOuterTau());

final VarOptItemsSketch<T> sketch1 = u1.getResult();
final VarOptItemsSketch<T> sketch2 = u2.getResult();
assertEquals(sketch1.getN(), sketch2.getN());
assertEquals(sketch1.getHRegionCount(), sketch2.getHRegionCount());
assertEquals(sketch1.getRRegionCount(), sketch2.getRRegionCount());

final VarOptItemsSamples<T> s1 = sketch1.getSketchSamples();
final VarOptItemsSamples<T> s2 = sketch2.getSketchSamples();

assertEquals(s1.getNumSamples(), s2.getNumSamples());
assertEquals(s1.weights(), s2.weights());
// only compare exact items; others can differ as long as weights match
for (int i = 0; i < sketch1.getHRegionCount(); ++i) {
assertEquals(s1.items(i), s2.items(i));
}
}

/**
Expand Down