Skip to content
Permalink
Browse files
Merge pull request #41 from DataSketches/varopt_udf
varopt revisions
  • Loading branch information
leerho committed Jun 30, 2017
2 parents 580ff50 + 42b2a24 commit 4ca70246f843f6f5d6833fcae566fb12c1c4a4cd
Showing 14 changed files with 87 additions and 63 deletions.
@@ -15,6 +15,7 @@
import org.apache.pig.data.WritableByteArray;

import com.yahoo.memory.Memory;
import com.yahoo.memory.WritableMemory;
import com.yahoo.sketches.ArrayOfItemsSerDe;

/**
@@ -27,10 +28,9 @@ public class ArrayOfTuplesSerDe extends ArrayOfItemsSerDe<Tuple> {
@Override
public byte[] serializeToByteArray(final Tuple[] items) {
final WritableByteArray wba = new WritableByteArray();
final DataOutputStream os = new DataOutputStream(wba);
try {
try (final DataOutputStream os = new DataOutputStream(wba)) {
for (Tuple t : items) {
// BinInterSedes is more efficient, but only suitable for intermediate data within a job
// BinInterSedes is more efficient, but only suitable for intermediate data within a job.
DataReaderWriter.writeDatum(os, t);
}
} catch (final IOException e) {
@@ -42,20 +42,17 @@ public byte[] serializeToByteArray(final Tuple[] items) {

@Override
public Tuple[] deserializeFromMemory(final Memory mem, final int numItems) {
// if we could get the correct offset into the region, the following avoids a copy:
//final byte[] bytes = (byte[]) ((WritableMemory) mem).getArray();
final int size = (int) mem.getCapacity();
final byte[] bytes = new byte[size];
mem.getByteArray(0, bytes, 0, size);

final DataInputStream is = new DataInputStream(new ByteArrayInputStream(bytes));
final byte[] bytes = (byte[]) ((WritableMemory) mem).getArray();
final int offset = (int) ((WritableMemory) mem).getRegionOffset(0L);
final int length = (int) mem.getCapacity();

final Tuple[] result = new Tuple[numItems];
try {
try (final ByteArrayInputStream bais = new ByteArrayInputStream(bytes, offset, length);
final DataInputStream dis = new DataInputStream(bais)) {
for (int i = 0; i < numItems; ++i) {
// BinInterSedes is more efficient, but only suitable for intermediate data within a job
// we know we're getting Tuples back in this case
result[i] = (Tuple) DataReaderWriter.readDatum(is);
// BinInterSedes is more efficient, but only suitable for intermediate data within a job.
// We know we're getting Tuples back in this case so cast is safe
result[i] = (Tuple) DataReaderWriter.readDatum(dis);
}
} catch (final IOException e) {
throw new RuntimeException("Error deserializing tuple: " + e.getMessage());
@@ -86,8 +86,7 @@ public void accumulate(final Tuple inputTuple) throws IOException {
sketch_ = VarOptItemsSketch.newInstance(targetK_);
}

for (Tuple t : samples) {
// first element is weight
for (final Tuple t : samples) {
final double weight = (double) t.get(weightIdx_);
sketch_.update(t, weight);
}
@@ -148,7 +147,7 @@ public Schema outputSchema(final Schema input) {
.getClass().getName().toLowerCase(), input), DataType.BYTEARRAY));
}
catch (final FrontendException e) {
return null;
throw new RuntimeException(e);
}
}
}
@@ -62,7 +62,7 @@ public Schema outputSchema(final Schema input) {
return new Schema(new Schema.FieldSchema(getSchemaName(this
.getClass().getName().toLowerCase(), input), weightedSampleSchema, DataType.BAG));
} catch (final FrontendException e) {
return null;
throw new RuntimeException(e);
}
}
}
@@ -129,7 +129,7 @@ public Schema outputSchema(final Schema input) {
.getClass().getName().toLowerCase(), source), recordSchema, DataType.TUPLE));
}
catch (final FrontendException e) {
// fall through
throw new RuntimeException(e);
}
}
return null;
@@ -130,7 +130,7 @@ public Schema outputSchema(final Schema input) {
.getClass().getName().toLowerCase(), source), source, DataType.TUPLE));
}
} catch (final FrontendException e) {
// fall through
throw new RuntimeException(e);
}
}

@@ -47,8 +47,7 @@ static VarOptItemsSketch<Tuple> rawTuplesToSketch(final Tuple inputTuple,
final DataBag samples = (DataBag) inputTuple.get(0);
final VarOptItemsSketch<Tuple> sketch = VarOptItemsSketch.newInstance(k);

for (Tuple t : samples) {
// first element is weight
for (final Tuple t : samples) {
final double weight = (double) t.get(weightIdx);
sketch.update(t, weight);
}
@@ -66,7 +65,7 @@ static VarOptItemsUnion<Tuple> unionSketches(final Tuple inputTuple, final int k
final VarOptItemsUnion<Tuple> union = VarOptItemsUnion.newInstance(k);

final DataBag sketchBag = (DataBag) inputTuple.get(0);
for (Tuple t : sketchBag) {
for (final Tuple t : sketchBag) {
final DataByteArray dba = (DataByteArray) t.get(0);
final Memory mem = Memory.wrap(dba.get());
union.update(mem, SERDE);
@@ -78,9 +77,7 @@ static VarOptItemsUnion<Tuple> unionSketches(final Tuple inputTuple, final int k
// Serializes a sketch to a DataByteArray and wraps it in a Tuple
static Tuple wrapSketchInTuple(final VarOptItemsSketch<Tuple> sketch) throws IOException {
final DataByteArray dba = new DataByteArray(sketch.toByteArray(SERDE));
final Tuple outputTuple = TUPLE_FACTORY.newTuple(1);
outputTuple.set(0, dba);
return outputTuple;
return TUPLE_FACTORY.newTuple(dba);
}

// Produces a DataBag containing the samples from the input sketch
@@ -91,7 +88,7 @@ static DataBag createDataBagFromSketch(final VarOptItemsSketch<Tuple> sketch) {

try {
// create (weight, item) tuples to add to output bag
for (VarOptItemsSamples.WeightedSample ws : samples) {
for (final VarOptItemsSamples<Tuple>.WeightedSample ws : samples) {
final Tuple weightedSample = TUPLE_FACTORY.newTuple(2);
weightedSample.set(0, ws.getWeight());
weightedSample.set(1, ws.getItem());
@@ -87,8 +87,7 @@ public void accumulate(final Tuple inputTuple) throws IOException {
sketch_ = VarOptItemsSketch.newInstance(targetK_);
}

for (Tuple t : samples) {
// first element is weight
for (final Tuple t : samples) {
final double weight = (double) t.get(weightIdx_);
sketch_.update(t, weight);
}
@@ -115,14 +114,14 @@ public Schema outputSchema(final Schema input) {
throw new IllegalArgumentException("Degenerate input schema to VarOptSampling");
}

// first element must be a bag, first element of tuples must be the weight (float or double)
// first element must be a bag, weightIdx_ element of tuples must be a float or double
if (input.getField(0).type != DataType.BAG) {
throw new IllegalArgumentException("VarOpt input must be a data bag: "
+ input.toString());
}

final Schema record = input.getField(0).schema; // record has a tuple in field 0
final Schema fields = record.getField(0).schema; //
final Schema fields = record.getField(0).schema;
if (fields.getField(weightIdx_).type != DataType.DOUBLE
&& fields.getField(weightIdx_).type != DataType.FLOAT) {
throw new IllegalArgumentException("weightIndex item of VarOpt tuple must be a "
@@ -138,7 +137,7 @@ public Schema outputSchema(final Schema input) {
.getClass().getName().toLowerCase(), record), weightedSampleSchema, DataType.BAG));
}
catch (final FrontendException e) {
return null;
throw new RuntimeException(e);
}
}

@@ -74,11 +74,7 @@ public void accumulate(final Tuple inputTuple) throws IOException {

@Override
public DataByteArray getValue() {
if (union_ == null) {
return null;
}

return new DataByteArray(union_.getResult().toByteArray(SERDE));
return union_ == null ? null : new DataByteArray(union_.getResult().toByteArray(SERDE));
}

@Override
@@ -26,6 +26,7 @@

public class DataToVarOptSketchTest {
@Test
@SuppressWarnings("unused")
public void checkConstructors() {
// these three should work
DataToVarOptSketch udf = new DataToVarOptSketch();
@@ -39,18 +40,21 @@ public void checkConstructors() {

try {
new DataToVarOptSketch("-1");
fail("Accepted negative k");
} catch (final IllegalArgumentException e) {
// expected
}

try {
new DataToVarOptSketch("-1", "3");
fail("Accepted negative k");
} catch (final IllegalArgumentException e) {
// expected
}

try {
new DataToVarOptSketch("10", "-1");
fail("Accepted weight index");
} catch (final IllegalArgumentException e) {
// expected
}
@@ -153,26 +157,30 @@ public void badOutputSchemaTest() throws IOException {
// degenerate input schemas
try {
udf.outputSchema(null);
fail("Accepted null schema");
} catch (final IllegalArgumentException e) {
// expected
}

try {
udf.outputSchema(new Schema());
fail("Accepted empty schema");
} catch (final IllegalArgumentException e) {
// expected
}

// expecting weight in element 0
// expecting weight in element 0 (based on constructor arg)
try {
udf.outputSchema(inputSchema);
fail("Accepted non-weight value in weightIndex column");
} catch (final IllegalArgumentException e) {
// expected
}

// passing in Tuple instead of DataBag
try {
udf.outputSchema(tupleSchema);
fail("Accepted Tuple instead of DataBag");
} catch (final IllegalArgumentException e) {
// expected
}
@@ -53,7 +53,7 @@ public void checkExec() {
double cumWt = 0.0;
for (int i = 1; i <= n; ++i) {
final Tuple t = TupleFactory.getInstance().newTuple(2);
final double wt = 1.0 * i;
final double wt = 1.0 * i;
t.set(0, wt);
t.set(1, i);
vis.update(t, wt);
@@ -73,46 +73,49 @@ public void checkExec() {
}
assertEquals(cumResultWt, cumWt, EPS);
} catch (final IOException e) {
fail("Unexpected IOException");
fail("Unexpected IOException" + e.getMessage());
}
}

@Test
public void validOutputSchemaTest() throws IOException {
public void validOutputSchemaTest() {
final GetVarOptSamples udf = new GetVarOptSamples();

final Schema serializedSketch = new Schema();
serializedSketch.add(new Schema.FieldSchema("field1", DataType.BYTEARRAY));
//final Schema inputSchema = new Schema();
//inputSchema.add(new Schema.FieldSchema("record", serializedSketch, DataType.TUPLE));
try {
final Schema serializedSketch = new Schema();
serializedSketch.add(new Schema.FieldSchema("field1", DataType.BYTEARRAY));

//final Schema output = udf.outputSchema(inputSchema);
final Schema output = udf.outputSchema(serializedSketch);
assertEquals(output.size(), 1);
assertEquals(output.getField(0).type, DataType.BAG);
final Schema output = udf.outputSchema(serializedSketch);
assertEquals(output.size(), 1);
assertEquals(output.getField(0).type, DataType.BAG);

final List<Schema.FieldSchema> outputFields = output.getField(0).schema.getFields();
assertEquals(outputFields.size(), 2);
final List<Schema.FieldSchema> outputFields = output.getField(0).schema.getFields();
assertEquals(outputFields.size(), 2);

// check high-level structure
assertEquals(outputFields.get(0).alias, WEIGHT_ALIAS);
assertEquals(outputFields.get(0).type, DataType.DOUBLE);
assertEquals(outputFields.get(1).alias, RECORD_ALIAS);
assertEquals(outputFields.get(1).type, DataType.TUPLE);
// check high-level structure
assertEquals(outputFields.get(0).alias, WEIGHT_ALIAS);
assertEquals(outputFields.get(0).type, DataType.DOUBLE);
assertEquals(outputFields.get(1).alias, RECORD_ALIAS);
assertEquals(outputFields.get(1).type, DataType.TUPLE);
} catch (final IOException e) {
fail("Unexpected IOException: " + e.getMessage());
}
}

@Test
public void badOutputSchemaTest() throws IOException {
public void badOutputSchemaTest() {
final GetVarOptSamples udf = new GetVarOptSamples();

try {
udf.outputSchema(null);
fail("Accepted null schema");
} catch (final IllegalArgumentException e) {
// expected
}

try {
udf.outputSchema(new Schema());
fail("Accepted empty schema");
} catch (final IllegalArgumentException e) {
// expected
}
@@ -121,6 +124,7 @@ public void badOutputSchemaTest() throws IOException {
final Schema wrongSchema = new Schema();
wrongSchema.add(new Schema.FieldSchema("field", DataType.BOOLEAN));
udf.outputSchema(wrongSchema);
fail("Accepted schema with no DataByteArray");
} catch (final IllegalArgumentException e) {
// expected
}

0 comments on commit 4ca7024

Please sign in to comment.