Skip to content
Permalink
Browse files
support weight index in more places, add some unit tests
  • Loading branch information
jmalkin committed Jun 22, 2017
1 parent bd3d7b7 commit 9cde0683479bbb03d37c77b730416a9735e3016f
Show file tree
Hide file tree
Showing 4 changed files with 561 additions and 4 deletions.
@@ -88,7 +88,7 @@ public void accumulate(final Tuple inputTuple) throws IOException {

for (Tuple t : samples) {
// first element is weight
final double weight = (double) t.get(0);
final double weight = (double) t.get(weightIdx_);
sketch_.update(t, weight);
}
}
@@ -32,7 +32,8 @@ class VarOptCommonImpl {
private static final ArrayOfTuplesSerDe SERDE = new ArrayOfTuplesSerDe();

// Produces a sketch from a bag of input Tuples
static VarOptItemsSketch<Tuple> rawTuplesToSketch(final Tuple inputTuple, final int k)
static VarOptItemsSketch<Tuple> rawTuplesToSketch(final Tuple inputTuple,
final int k, final int weightIdx)
throws IOException {
assert inputTuple != null;
assert inputTuple.size() >= 1;
@@ -43,7 +44,7 @@ static VarOptItemsSketch<Tuple> rawTuplesToSketch(final Tuple inputTuple, final

for (Tuple t : samples) {
// first element is weight
final double weight = (double) t.get(0);
final double weight = (double) t.get(weightIdx);
sketch.update(t, weight);
}

@@ -149,7 +150,7 @@ public Tuple exec(final Tuple inputTuple) throws IOException {
return null;
}

final VarOptItemsSketch<Tuple> sketch = rawTuplesToSketch(inputTuple, targetK_);
final VarOptItemsSketch<Tuple> sketch = rawTuplesToSketch(inputTuple, targetK_, weightIdx_);
return wrapSketchInTuple(sketch);
}
}
@@ -0,0 +1,308 @@
package com.yahoo.sketches.pig.sampling;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import java.io.IOException;
import java.util.HashMap;

import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.testng.annotations.Test;

import com.yahoo.memory.Memory;
import com.yahoo.sketches.sampling.VarOptItemsSamples;
import com.yahoo.sketches.sampling.VarOptItemsSketch;
import com.yahoo.sketches.sampling.VarOptItemsUnion;

public class VarOptCommonAlgebraicTest {
private static final ArrayOfTuplesSerDe serDe_ = new ArrayOfTuplesSerDe();

// constructors: just make sure result not null with valid args, throw exceptions if invalid
@Test
public void rawTuplesToSketchConstructors() {
VarOptCommonImpl.RawTuplesToSketchTuple udf;

udf = new VarOptCommonImpl.RawTuplesToSketchTuple();
assertNotNull(udf);

udf = new VarOptCommonImpl.RawTuplesToSketchTuple("5");
assertNotNull(udf);

udf = new VarOptCommonImpl.RawTuplesToSketchTuple("5", "3");
assertNotNull(udf);

try {
new VarOptCommonImpl.RawTuplesToSketchTuple("-1");
} catch (final IllegalArgumentException e) {
// expected
}

try {
new VarOptCommonImpl.RawTuplesToSketchTuple("-1", "3");
} catch (final IllegalArgumentException e) {
// expected
}

try {
new VarOptCommonImpl.RawTuplesToSketchTuple("10", "-1");
} catch (final IllegalArgumentException e) {
// expected
}
}

@Test
public void unionSketchesAsSketchConstructors() {
VarOptCommonImpl.UnionSketchesAsTuple udf;

udf = new VarOptCommonImpl.UnionSketchesAsTuple();
assertNotNull(udf);

udf = new VarOptCommonImpl.UnionSketchesAsTuple("5");
assertNotNull(udf);

udf = new VarOptCommonImpl.UnionSketchesAsTuple("5", "3");
assertNotNull(udf);

try {
new VarOptCommonImpl.UnionSketchesAsTuple("-1");
} catch (final IllegalArgumentException e) {
// expected
}

try {
new VarOptCommonImpl.UnionSketchesAsTuple("-1", "3");
} catch (final IllegalArgumentException e) {
// expected
}

try {
new VarOptCommonImpl.UnionSketchesAsTuple("10", "-1");
} catch (final IllegalArgumentException e) {
// expected
}
}

@Test
public void unionSketchesAsByteArrayConstructors() {
VarOptCommonImpl.UnionSketchesAsByteArray udf;

udf = new VarOptCommonImpl.UnionSketchesAsByteArray();
assertNotNull(udf);

udf = new VarOptCommonImpl.UnionSketchesAsByteArray("5");
assertNotNull(udf);

udf = new VarOptCommonImpl.UnionSketchesAsByteArray("5", "3");
assertNotNull(udf);

try {
new VarOptCommonImpl.UnionSketchesAsByteArray("-1");
} catch (final IllegalArgumentException e) {
// expected
}

try {
new VarOptCommonImpl.UnionSketchesAsByteArray("-1", "3");
} catch (final IllegalArgumentException e) {
// expected
}

try {
new VarOptCommonImpl.UnionSketchesAsByteArray("10", "-1");
} catch (final IllegalArgumentException e) {
// expected
}
}

// exec: sketches generally in sampling mode
@Test
public void rawTuplesToSketchTupleExec() {
final int k = 5;
final int wtIdx = 1;
final VarOptCommonImpl.RawTuplesToSketchTuple udf;

udf = new VarOptCommonImpl.RawTuplesToSketchTuple(Integer.toString(k), Integer.toString(wtIdx));

char id = 'a';
double wt = 1.0;

final DataBag inputBag = BagFactory.getInstance().newDefaultBag();
try {
for (int i = 0; i < k + 1; ++i) {
final Tuple t = TupleFactory.getInstance().newTuple(2);
t.set(0, Character.toString(id));
t.set(1, wt);
inputBag.add(t);

++id;
wt += 1.0;
}
} catch (final ExecException e) {
fail("Unexpected ExecException creating input data");
}

try {
// degenerate input first
Tuple result = udf.exec(null);
assertNull(result);

Tuple inputTuple = TupleFactory.getInstance().newTuple(0);
result = udf.exec(inputTuple);
assertNull(result);

inputTuple = TupleFactory.getInstance().newTuple(1);
inputTuple.set(0, null);
result = udf.exec(inputTuple);
assertNull(result);

// now test real input
inputTuple.set(0, inputBag);
result = udf.exec(inputTuple);
assertEquals(result.size(), 1);
final DataByteArray dba = (DataByteArray) result.get(0);

final VarOptItemsSketch<Tuple> vis;
vis = VarOptItemsSketch.heapify(Memory.wrap(dba.get()), serDe_);
assertEquals(vis.getN(), k + 1);
assertEquals(vis.getK(), k);

// just validating the original weights are within the expected range
for (VarOptItemsSamples<Tuple>.WeightedSample ws : vis.getSketchSamples()) {
final Tuple t = ws.getItem();
assertTrue((double) t.get(wtIdx) >= 1.0);
assertTrue((double) t.get(wtIdx) <= (k + 1.0));
}
} catch (final IOException e) {
fail("Unexpected IOException calling exec()");
}
}

@Test
public void unionSketchesDegenerateInput() {
try {
// Tuple version
final VarOptCommonImpl.UnionSketchesAsTuple udfTuple;
udfTuple = new VarOptCommonImpl.UnionSketchesAsTuple("4");
Tuple result = udfTuple.exec(null);
assertNull(result);

Tuple inputTuple = TupleFactory.getInstance().newTuple(0);
result = udfTuple.exec(inputTuple);
assertNull(result);

inputTuple = TupleFactory.getInstance().newTuple(1);
inputTuple.set(0, null);
result = udfTuple.exec(inputTuple);
assertNull(result);

// DataByteArray version
final VarOptCommonImpl.UnionSketchesAsByteArray udfBA;
udfBA = new VarOptCommonImpl.UnionSketchesAsByteArray("4");
DataByteArray output = udfBA.exec(null);
assertNull(output);

inputTuple = TupleFactory.getInstance().newTuple(0);
output = udfBA.exec(inputTuple);
assertNull(output);

inputTuple = TupleFactory.getInstance().newTuple(1);
inputTuple.set(0, null);
output = udfBA.exec(inputTuple);
assertNull(output);
} catch (final IOException e) {
fail("Unexpected IOException calling exec()");
}
}

@Test
public void unionSketchesExec() {
// Only difference between UnionSketchesAsTuple and UnionSketchesAsByteArray is that one wraps
// the resulting serialized sketch in a tuple. If the union result is still in exact mode, the
// two sketches should be identical.
final int numSketches = 3;
final int numItemsPerSketch = 10; // numSketches * numItemsPerSketch should be < k here
final int k = 100;
final String kStr = Integer.toString(k);
final VarOptCommonImpl.UnionSketchesAsTuple udfTuple;
final VarOptCommonImpl.UnionSketchesAsByteArray udfBA;

udfTuple = new VarOptCommonImpl.UnionSketchesAsTuple(kStr);
udfBA = new VarOptCommonImpl.UnionSketchesAsByteArray(kStr);

char id = 'a';
double wt = 1.0;

final DataBag inputBag = BagFactory.getInstance().newDefaultBag();
final VarOptItemsUnion<Tuple> union = VarOptItemsUnion.newInstance(k);
final VarOptItemsSketch<Tuple> vis = VarOptItemsSketch.newInstance(k);

// Create numSketches VarOpt sketches and serialize them. Also create a standard union to
// compare against at the end.
try {
for (int j = 0; j < numSketches; ++j) {
vis.reset();
for (int i = 0; i < numItemsPerSketch; ++i) {
final Tuple t = TupleFactory.getInstance().newTuple(2);
t.set(0, Character.toString(id));
t.set(1, wt);
vis.update(t, wt);

++id;
wt += 1.0;
}

final Tuple wrapper = TupleFactory.getInstance().newTuple(1);
wrapper.set(0, new DataByteArray(vis.toByteArray(serDe_)));
inputBag.add(wrapper);
union.update(vis);
}
} catch (final ExecException e) {
fail("Unexpected ExecException creating input data");
}

try {
final Tuple inputTuple = TupleFactory.getInstance().newTuple(1);
inputTuple.set(0, inputBag);

final DataByteArray outArray = udfBA.exec(inputTuple);
final VarOptItemsSketch<Tuple> sketch1
= VarOptItemsSketch.heapify(Memory.wrap(outArray.get()), serDe_);

final Tuple outTuple = udfTuple.exec(inputTuple);
final DataByteArray dba = (DataByteArray) outTuple.get(0);
final VarOptItemsSketch<Tuple> sketch2
= VarOptItemsSketch.heapify(Memory.wrap(dba.get()), serDe_);

final VarOptItemsSketch<Tuple> expectedResult = union.getResult();
compareResults(sketch1, expectedResult);
compareResults(sketch2, expectedResult);
} catch (final IOException e) {
fail("Unexpected IOException calling exec()");
}
}

void compareResults(final VarOptItemsSketch<Tuple> s1,
final VarOptItemsSketch<Tuple> s2) {
assertEquals(s1.getN(), s2.getN());
assertEquals(s1.getK(), s2.getK());

final HashMap<Tuple, Double> items = new HashMap<>(s1.getNumSamples());
for (VarOptItemsSamples<Tuple>.WeightedSample ws : s1.getSketchSamples()) {
items.put(ws.getItem(), ws.getWeight());
}

for (VarOptItemsSamples<Tuple>.WeightedSample ws : s2.getSketchSamples()) {
assertEquals(items.get(ws.getItem()), ws.getWeight());
}
}


}

0 comments on commit 9cde068

Please sign in to comment.