Skip to content
Permalink
Browse files
address most of the PR comments, except for what to do with exception…
…s in outputSchema
  • Loading branch information
jmalkin committed Jun 29, 2017
1 parent 580ff50 commit 50ccf9b2968b750d0189b241bdbc91153069c370
Show file tree
Hide file tree
Showing 10 changed files with 46 additions and 29 deletions.
@@ -86,8 +86,7 @@ public void accumulate(final Tuple inputTuple) throws IOException {
sketch_ = VarOptItemsSketch.newInstance(targetK_);
}

for (Tuple t : samples) {
// first element is weight
for (final Tuple t : samples) {
final double weight = (double) t.get(weightIdx_);
sketch_.update(t, weight);
}
@@ -47,8 +47,7 @@ static VarOptItemsSketch<Tuple> rawTuplesToSketch(final Tuple inputTuple,
final DataBag samples = (DataBag) inputTuple.get(0);
final VarOptItemsSketch<Tuple> sketch = VarOptItemsSketch.newInstance(k);

for (Tuple t : samples) {
// first element is weight
for (final Tuple t : samples) {
final double weight = (double) t.get(weightIdx);
sketch.update(t, weight);
}
@@ -66,7 +65,7 @@ static VarOptItemsUnion<Tuple> unionSketches(final Tuple inputTuple, final int k
final VarOptItemsUnion<Tuple> union = VarOptItemsUnion.newInstance(k);

final DataBag sketchBag = (DataBag) inputTuple.get(0);
for (Tuple t : sketchBag) {
for (final Tuple t : sketchBag) {
final DataByteArray dba = (DataByteArray) t.get(0);
final Memory mem = Memory.wrap(dba.get());
union.update(mem, SERDE);
@@ -78,9 +77,7 @@ static VarOptItemsUnion<Tuple> unionSketches(final Tuple inputTuple, final int k
// Serializes a sketch to a DataByteArray and wraps it in a Tuple
static Tuple wrapSketchInTuple(final VarOptItemsSketch<Tuple> sketch) throws IOException {
final DataByteArray dba = new DataByteArray(sketch.toByteArray(SERDE));
final Tuple outputTuple = TUPLE_FACTORY.newTuple(1);
outputTuple.set(0, dba);
return outputTuple;
return TUPLE_FACTORY.newTuple(dba);
}

// Produces a DataBag containing the samples from the input sketch
@@ -91,7 +88,7 @@ static DataBag createDataBagFromSketch(final VarOptItemsSketch<Tuple> sketch) {

try {
// create (weight, item) tuples to add to output bag
for (VarOptItemsSamples.WeightedSample ws : samples) {
for (final VarOptItemsSamples.WeightedSample ws : samples) {
final Tuple weightedSample = TUPLE_FACTORY.newTuple(2);
weightedSample.set(0, ws.getWeight());
weightedSample.set(1, ws.getItem());
@@ -87,8 +87,7 @@ public void accumulate(final Tuple inputTuple) throws IOException {
sketch_ = VarOptItemsSketch.newInstance(targetK_);
}

for (Tuple t : samples) {
// first element is weight
for (final Tuple t : samples) {
final double weight = (double) t.get(weightIdx_);
sketch_.update(t, weight);
}
@@ -115,14 +114,14 @@ public Schema outputSchema(final Schema input) {
throw new IllegalArgumentException("Degenerate input schema to VarOptSampling");
}

// first element must be a bag, first element of tuples must be the weight (float or double)
// first element must be a bag, weightIdx_ element of tuples must be a float or double
if (input.getField(0).type != DataType.BAG) {
throw new IllegalArgumentException("VarOpt input must be a data bag: "
+ input.toString());
}

final Schema record = input.getField(0).schema; // record has a tuple in field 0
final Schema fields = record.getField(0).schema; //
final Schema fields = record.getField(0).schema;
if (fields.getField(weightIdx_).type != DataType.DOUBLE
&& fields.getField(weightIdx_).type != DataType.FLOAT) {
throw new IllegalArgumentException("weightIndex item of VarOpt tuple must be a "
@@ -74,11 +74,7 @@ public void accumulate(final Tuple inputTuple) throws IOException {

@Override
public DataByteArray getValue() {
if (union_ == null) {
return null;
}

return new DataByteArray(union_.getResult().toByteArray(SERDE));
return union_ == null ? null : new DataByteArray(union_.getResult().toByteArray(SERDE));
}

@Override
@@ -39,18 +39,21 @@ public void checkConstructors() {

try {
new DataToVarOptSketch("-1");
fail("Accepted negative k");
} catch (final IllegalArgumentException e) {
// expected
}

try {
new DataToVarOptSketch("-1", "3");
fail("Accepted negative k");
} catch (final IllegalArgumentException e) {
// expected
}

try {
new DataToVarOptSketch("10", "-1");
fail("Accepted weight index");
} catch (final IllegalArgumentException e) {
// expected
}
@@ -153,26 +156,30 @@ public void badOutputSchemaTest() throws IOException {
// degenerate input schemas
try {
udf.outputSchema(null);
fail("Accepted null schema");
} catch (final IllegalArgumentException e) {
// expected
}

try {
udf.outputSchema(new Schema());
fail("Accepted empty schema");
} catch (final IllegalArgumentException e) {
// expected
}

// expecting weight in element 0
// expecting weight in element 0 (based on constructor arg)
try {
udf.outputSchema(inputSchema);
fail("Accepted non-weight value in weightIndex column");
} catch (final IllegalArgumentException e) {
// expected
}

// passing in Tuple instead of DataBag
try {
udf.outputSchema(tupleSchema);
fail("Accepted Tuple instead of DataBag");
} catch (final IllegalArgumentException e) {
// expected
}
@@ -53,7 +53,7 @@ public void checkExec() {
double cumWt = 0.0;
for (int i = 1; i <= n; ++i) {
final Tuple t = TupleFactory.getInstance().newTuple(2);
final double wt = 1.0 * i;
final double wt = 1.0 * i;
t.set(0, wt);
t.set(1, i);
vis.update(t, wt);
@@ -83,10 +83,7 @@ public void validOutputSchemaTest() throws IOException {

final Schema serializedSketch = new Schema();
serializedSketch.add(new Schema.FieldSchema("field1", DataType.BYTEARRAY));
//final Schema inputSchema = new Schema();
//inputSchema.add(new Schema.FieldSchema("record", serializedSketch, DataType.TUPLE));

//final Schema output = udf.outputSchema(inputSchema);
final Schema output = udf.outputSchema(serializedSketch);
assertEquals(output.size(), 1);
assertEquals(output.getField(0).type, DataType.BAG);
@@ -107,12 +104,14 @@ public void badOutputSchemaTest() throws IOException {

try {
udf.outputSchema(null);
fail("Accepted null schema");
} catch (final IllegalArgumentException e) {
// expected
}

try {
udf.outputSchema(new Schema());
fail("Accepted empty schema");
} catch (final IllegalArgumentException e) {
// expected
}
@@ -121,6 +120,7 @@ public void badOutputSchemaTest() throws IOException {
final Schema wrongSchema = new Schema();
wrongSchema.add(new Schema.FieldSchema("field", DataType.BOOLEAN));
udf.outputSchema(wrongSchema);
fail("Accepted schema with no DataByteArray");
} catch (final IllegalArgumentException e) {
// expected
}
@@ -46,18 +46,21 @@ public void rawTuplesToSketchConstructors() {

try {
new VarOptCommonImpl.RawTuplesToSketchTuple("-1");
fail("Accepted negative k");
} catch (final IllegalArgumentException e) {
// expected
}

try {
new VarOptCommonImpl.RawTuplesToSketchTuple("-1", "3");
fail("Accepted negative k");
} catch (final IllegalArgumentException e) {
// expected
}

try {
new VarOptCommonImpl.RawTuplesToSketchTuple("10", "-1");
fail("Accepted negative weight index");
} catch (final IllegalArgumentException e) {
// expected
}
@@ -78,18 +81,21 @@ public void unionSketchesAsSketchConstructors() {

try {
new VarOptCommonImpl.UnionSketchesAsTuple("-1");
fail("Accepted negative k");
} catch (final IllegalArgumentException e) {
// expected
}

try {
new VarOptCommonImpl.UnionSketchesAsTuple("-1", "3");
fail("Accepted negative k");
} catch (final IllegalArgumentException e) {
// expected
}

try {
new VarOptCommonImpl.UnionSketchesAsTuple("10", "-1");
fail("Accepted negative weight index");
} catch (final IllegalArgumentException e) {
// expected
}
@@ -110,18 +116,21 @@ public void unionSketchesAsByteArrayConstructors() {

try {
new VarOptCommonImpl.UnionSketchesAsByteArray("-1");
fail("Accepted negative k");
} catch (final IllegalArgumentException e) {
// expected
}

try {
new VarOptCommonImpl.UnionSketchesAsByteArray("-1", "3");
fail("Accepted negative k");
} catch (final IllegalArgumentException e) {
// expected
}

try {
new VarOptCommonImpl.UnionSketchesAsByteArray("10", "-1");
fail("Accepted negative weight index");
} catch (final IllegalArgumentException e) {
// expected
}
@@ -45,18 +45,21 @@ public void baseConstructors() {

try {
new VarOptSampling("-1");
fail("Accepted negative k");
} catch (final IllegalArgumentException e) {
// expected
}

try {
new VarOptSampling("-1", "3");
fail("Accepted negative k");
} catch (final IllegalArgumentException e) {
// expected
}

try {
new VarOptSampling("10", "-1");
fail("Accepted negative weight index");
} catch (final IllegalArgumentException e) {
// expected
}
@@ -76,18 +79,21 @@ public void algebraicFinalConstructors() {

try {
new VarOptSampling.Final("-1");
fail("Accepted negative k");
} catch (final IllegalArgumentException e) {
// expected
}

try {
new VarOptSampling.Final("-1", "3");
fail("Accepted negative k");
} catch (final IllegalArgumentException e) {
// expected
}

try {
new VarOptSampling.Final("10", "-1");
fail("Accepted negative weight index");
} catch (final IllegalArgumentException e) {
// expected
}
@@ -99,7 +105,6 @@ public void standardAccumulate() {
final VarOptSampling udf = new VarOptSampling(Integer.toString(k), "0");

final DataBag inputBag = BagFactory.getInstance().newDefaultBag();
final Tuple inputTuple = TupleFactory.getInstance().newTuple(1);
double cumWeight = 0.0;
try {
for (int i = 1; i < k; ++i) {
@@ -110,7 +115,7 @@ public void standardAccumulate() {
inputBag.add(t);
cumWeight += i;
}
inputTuple.set(0, inputBag);
final Tuple inputTuple = TupleFactory.getInstance().newTuple(inputBag);

assertNull(udf.getValue());
udf.accumulate(inputTuple);
@@ -251,26 +256,30 @@ public void badOutputSchemaTest() throws IOException {
// degenerate input schemas
try {
udf.outputSchema(null);
fail("Accepted null schema");
} catch (final IllegalArgumentException e) {
// expected
}

try {
udf.outputSchema(new Schema());
fail("Accepted empty schema");
} catch (final IllegalArgumentException e) {
// expected
}

// expecting weight in element 0
// expecting weight in element 0 (based on constructor args)
try {
udf.outputSchema(inputSchema);
fail("Accepted non-weight in weightIndex column");
} catch (final IllegalArgumentException e) {
// expected
}

// passing in Tuple instead of DataBag
try {
udf.outputSchema(tupleSchema);
fail("Accepted input Tuple instead of DataBag");
} catch (final IllegalArgumentException e) {
// expected
}
@@ -37,6 +37,7 @@ public void checkConstructors() {

try {
new VarOptUnion("-1");
fail("Accepted negative k");
} catch (final IllegalArgumentException e) {
// expected
}
@@ -20,7 +20,7 @@ public class PigUtil {
*/
public static Tuple objectsToTuple(final Object ... objects) {
final Tuple tuple = TupleFactory.getInstance().newTuple();
for (Object object: objects) { tuple.append(object); }
for (final Object object: objects) { tuple.append(object); }
return tuple;
}

@@ -31,7 +31,7 @@ public static Tuple objectsToTuple(final Object ... objects) {
*/
public static DataBag tuplesToBag(final Tuple ... tuples) {
final DataBag bag = BagFactory.getInstance().newDefaultBag();
for (Tuple tuple: tuples) { bag.add(tuple); }
for (final Tuple tuple: tuples) { bag.add(tuple); }
return bag;
}

@@ -43,7 +43,7 @@ public static DataBag tuplesToBag(final Tuple ... tuples) {
*/
public static <T> DataBag listToBagOfTuples(final List<T> list) {
final DataBag bag = BagFactory.getInstance().newDefaultBag();
for (Object object: list) {
for (final Object object: list) {
final Tuple tuple = TupleFactory.getInstance().newTuple();
tuple.append(object);
bag.add(tuple);

0 comments on commit 50ccf9b

Please sign in to comment.