Skip to content
Permalink
Browse files
Fix issues not initially caught by IntelliJ; let ArrayOfTuplesSerDe avoid copying data every time
  • Loading branch information
jmalkin committed Jun 30, 2017
1 parent 50ccf9b commit 7d2874262e3636a66828f3832c55d7800ee28616
Showing 7 changed files with 37 additions and 30 deletions.
@@ -15,6 +15,7 @@
import org.apache.pig.data.WritableByteArray;

import com.yahoo.memory.Memory;
import com.yahoo.memory.WritableMemory;
import com.yahoo.sketches.ArrayOfItemsSerDe;

/**
@@ -27,10 +28,9 @@ public class ArrayOfTuplesSerDe extends ArrayOfItemsSerDe<Tuple> {
@Override
public byte[] serializeToByteArray(final Tuple[] items) {
final WritableByteArray wba = new WritableByteArray();
final DataOutputStream os = new DataOutputStream(wba);
try {
try (final DataOutputStream os = new DataOutputStream(wba)) {
for (Tuple t : items) {
// BinInterSedes is more efficient, but only suitable for intermediate data within a job
// BinInterSedes is more efficient, but only suitable for intermediate data within a job.
DataReaderWriter.writeDatum(os, t);
}
} catch (final IOException e) {
@@ -42,20 +42,17 @@ public byte[] serializeToByteArray(final Tuple[] items) {

@Override
public Tuple[] deserializeFromMemory(final Memory mem, final int numItems) {
// if we could get the correct offset into the region, the following avoids a copy:
//final byte[] bytes = (byte[]) ((WritableMemory) mem).getArray();
final int size = (int) mem.getCapacity();
final byte[] bytes = new byte[size];
mem.getByteArray(0, bytes, 0, size);

final DataInputStream is = new DataInputStream(new ByteArrayInputStream(bytes));
final byte[] bytes = (byte[]) ((WritableMemory) mem).getArray();
final int offset = (int) ((WritableMemory) mem).getRegionOffset(0L);
final int length = (int) mem.getCapacity();

final Tuple[] result = new Tuple[numItems];
try {
try (final ByteArrayInputStream bais = new ByteArrayInputStream(bytes, offset, length);
final DataInputStream dis = new DataInputStream(bais)) {
for (int i = 0; i < numItems; ++i) {
// BinInterSedes is more efficient, but only suitable for intermediate data within a job
// we know we're getting Tuples back in this case
result[i] = (Tuple) DataReaderWriter.readDatum(is);
// BinInterSedes is more efficient, but only suitable for intermediate data within a job.
// We know we're getting Tuples back in this case so cast is safe
result[i] = (Tuple) DataReaderWriter.readDatum(dis);
}
} catch (final IOException e) {
throw new RuntimeException("Error deserializing tuple: " + e.getMessage());
@@ -88,7 +88,7 @@ static DataBag createDataBagFromSketch(final VarOptItemsSketch<Tuple> sketch) {

try {
// create (weight, item) tuples to add to output bag
for (final VarOptItemsSamples.WeightedSample ws : samples) {
for (final VarOptItemsSamples<Tuple>.WeightedSample ws : samples) {
final Tuple weightedSample = TUPLE_FACTORY.newTuple(2);
weightedSample.set(0, ws.getWeight());
weightedSample.set(1, ws.getItem());
@@ -26,6 +26,7 @@

public class DataToVarOptSketchTest {
@Test
@SuppressWarnings("unused")
public void checkConstructors() {
// these three should work
DataToVarOptSketch udf = new DataToVarOptSketch();
@@ -73,33 +73,37 @@ public void checkExec() {
}
assertEquals(cumResultWt, cumWt, EPS);
} catch (final IOException e) {
fail("Unexpected IOException");
fail("Unexpected IOException" + e.getMessage());
}
}

@Test
public void validOutputSchemaTest() throws IOException {
public void validOutputSchemaTest() {
final GetVarOptSamples udf = new GetVarOptSamples();

final Schema serializedSketch = new Schema();
serializedSketch.add(new Schema.FieldSchema("field1", DataType.BYTEARRAY));
try {
final Schema serializedSketch = new Schema();
serializedSketch.add(new Schema.FieldSchema("field1", DataType.BYTEARRAY));

final Schema output = udf.outputSchema(serializedSketch);
assertEquals(output.size(), 1);
assertEquals(output.getField(0).type, DataType.BAG);
final Schema output = udf.outputSchema(serializedSketch);
assertEquals(output.size(), 1);
assertEquals(output.getField(0).type, DataType.BAG);

final List<Schema.FieldSchema> outputFields = output.getField(0).schema.getFields();
assertEquals(outputFields.size(), 2);
final List<Schema.FieldSchema> outputFields = output.getField(0).schema.getFields();
assertEquals(outputFields.size(), 2);

// check high-level structure
assertEquals(outputFields.get(0).alias, WEIGHT_ALIAS);
assertEquals(outputFields.get(0).type, DataType.DOUBLE);
assertEquals(outputFields.get(1).alias, RECORD_ALIAS);
assertEquals(outputFields.get(1).type, DataType.TUPLE);
// check high-level structure
assertEquals(outputFields.get(0).alias, WEIGHT_ALIAS);
assertEquals(outputFields.get(0).type, DataType.DOUBLE);
assertEquals(outputFields.get(1).alias, RECORD_ALIAS);
assertEquals(outputFields.get(1).type, DataType.TUPLE);
} catch (final IOException e) {
fail("Unexpected IOException: " + e.getMessage());
}
}

@Test
public void badOutputSchemaTest() throws IOException {
public void badOutputSchemaTest() {
final GetVarOptSamples udf = new GetVarOptSamples();

try {
@@ -31,6 +31,7 @@ public class VarOptCommonAlgebraicTest {
private static final ArrayOfTuplesSerDe serDe_ = new ArrayOfTuplesSerDe();

// constructors: just make sure result not null with valid args, throw exceptions if invalid
@SuppressWarnings("unused")
@Test
public void rawTuplesToSketchConstructors() {
VarOptCommonImpl.RawTuplesToSketchTuple udf;
@@ -66,6 +67,7 @@ public void rawTuplesToSketchConstructors() {
}
}

@SuppressWarnings("unused")
@Test
public void unionSketchesAsSketchConstructors() {
VarOptCommonImpl.UnionSketchesAsTuple udf;
@@ -101,6 +103,7 @@ public void unionSketchesAsSketchConstructors() {
}
}

@SuppressWarnings("unused")
@Test
public void unionSketchesAsByteArrayConstructors() {
VarOptCommonImpl.UnionSketchesAsByteArray udf;
@@ -31,6 +31,7 @@ public class VarOptSamplingTest {
static final double EPS = 1e-10;
private static final ArrayOfTuplesSerDe serDe_ = new ArrayOfTuplesSerDe();

@SuppressWarnings("unused")
@Test
public void baseConstructors() {
// these three should work
@@ -26,6 +26,7 @@
import com.yahoo.sketches.sampling.VarOptItemsUnion;

public class VarOptUnionTest {
@SuppressWarnings("unused")
@Test
public void checkConstructors() {
// these three should work

0 comments on commit 7d28742

Please sign in to comment.