Skip to content
Permalink
Browse files
commenting and clean-up
  • Loading branch information
jmalkin committed Jun 13, 2017
1 parent b8cc5e0 commit a440f9d7be1e76f47f8b84386b3691588b11fda3
Showing 4 changed files with 26 additions and 11 deletions.
@@ -84,7 +84,7 @@ public void cleanup() {

@Override
public String getInitial() {
return VarOptCommonImpl.RawTuplesToSketchTupleImpl.class.getName();
return VarOptCommonImpl.RawTuplesToSketchTuple.class.getName();
}

@Override
@@ -7,7 +7,7 @@

import static com.yahoo.sketches.pig.sampling.VarOptCommonImpl.RECORD_ALIAS;
import static com.yahoo.sketches.pig.sampling.VarOptCommonImpl.WEIGHT_ALIAS;
import static com.yahoo.sketches.pig.sampling.VarOptCommonImpl.createResultFromSketch;
import static com.yahoo.sketches.pig.sampling.VarOptCommonImpl.createDataBagFromSketch;

import java.io.IOException;

@@ -45,7 +45,7 @@ public DataBag exec(final Tuple inputTuple) throws IOException {
final Memory mem = Memory.wrap(dba.get());
final VarOptItemsSketch<Tuple> sketch = VarOptItemsSketch.heapify(mem, SERDE);

return createResultFromSketch(sketch);
return createDataBagFromSketch(sketch);
}

@Override
@@ -16,6 +16,8 @@
import com.yahoo.sketches.sampling.VarOptItemsUnion;

/**
* A collection of methods and constants used across VarOpt UDFs.
*
* @author Jon Malkin
*/
class VarOptCommonImpl {
@@ -28,6 +30,7 @@ class VarOptCommonImpl {
private static final TupleFactory TUPLE_FACTORY = TupleFactory.getInstance();
private static final ArrayOfTuplesSerDe SERDE = new ArrayOfTuplesSerDe();

// Produces a sketch from a bag of input Tuples
static VarOptItemsSketch<Tuple> rawTuplesToSketch(final Tuple inputTuple, final int k)
throws IOException {
assert inputTuple != null;
@@ -46,6 +49,7 @@ static VarOptItemsSketch<Tuple> rawTuplesToSketch(final Tuple inputTuple, final
return sketch;
}

// Produces a union from a bag of serialized sketches
static VarOptItemsUnion<Tuple> unionSketches(final Tuple inputTuple, final int k)
throws IOException {
assert inputTuple != null;
@@ -64,14 +68,16 @@ static VarOptItemsUnion<Tuple> unionSketches(final Tuple inputTuple, final int k
return union;
}

// Serializes a sketch to a DataByteArray and wraps it in a Tuple
static Tuple wrapSketchInTuple(final VarOptItemsSketch<Tuple> sketch) throws IOException {
final DataByteArray dba = new DataByteArray(sketch.toByteArray(SERDE));
final Tuple outputTuple = TUPLE_FACTORY.newTuple(1);
outputTuple.set(0, dba);
return outputTuple;
}

static DataBag createResultFromSketch(final VarOptItemsSketch<Tuple> sketch) {
// Produces a DataBag containing the samples from the input sketch
static DataBag createDataBagFromSketch(final VarOptItemsSketch<Tuple> sketch) {
final DataBag output = BAG_FACTORY.newDefaultBag();

final VarOptItemsSamples<Tuple> samples = sketch.getSketchSamples();
@@ -91,18 +97,21 @@ static DataBag createResultFromSketch(final VarOptItemsSketch<Tuple> sketch) {
return output;
}

public static class RawTuplesToSketchTupleImpl extends EvalFunc<Tuple> {
/**
* Adds raw Tuples to a sketch, returns a Tuple with the serialized sketch.
*/
public static class RawTuplesToSketchTuple extends EvalFunc<Tuple> {
private final int targetK_;

public RawTuplesToSketchTupleImpl() {
public RawTuplesToSketchTuple() {
targetK_ = DEFAULT_TARGET_K;
}

/**
* VarOpt sampling constructor.
* @param kStr String indicating the maximum number of desired samples to return.
*/
public RawTuplesToSketchTupleImpl(final String kStr) {
public RawTuplesToSketchTuple(final String kStr) {
targetK_ = Integer.parseInt(kStr);

if (targetK_ < 1) {
@@ -122,6 +131,9 @@ public Tuple exec(final Tuple inputTuple) throws IOException {
}
}

/**
* Unions serialized sketches, returns a Tuple with a sketch obtained from the union result.
*/
public static class UnionSketchesAsTuple extends EvalFunc<Tuple> {
private final int targetK_;

@@ -153,6 +165,9 @@ public Tuple exec(final Tuple inputTuple) throws IOException {
}
}

/**
* Unions serialized sketches, returns a DataByteArray of the sketch obtained from the union.
*/
public static class UnionSketchesAsByteArray extends EvalFunc<DataByteArray> {
private final int targetK_;

@@ -7,7 +7,7 @@

import static com.yahoo.sketches.pig.sampling.VarOptCommonImpl.RECORD_ALIAS;
import static com.yahoo.sketches.pig.sampling.VarOptCommonImpl.WEIGHT_ALIAS;
import static com.yahoo.sketches.pig.sampling.VarOptCommonImpl.createResultFromSketch;
import static com.yahoo.sketches.pig.sampling.VarOptCommonImpl.createDataBagFromSketch;
import static com.yahoo.sketches.pig.sampling.VarOptCommonImpl.unionSketches;

import java.io.IOException;
@@ -78,7 +78,7 @@ public DataBag getValue() {
return null;
}

return createResultFromSketch(sketch_);
return createDataBagFromSketch(sketch_);
}

@Override
@@ -124,7 +124,7 @@ public Schema outputSchema(final Schema input) {

@Override
public String getInitial() {
return VarOptCommonImpl.RawTuplesToSketchTupleImpl.class.getName();
return VarOptCommonImpl.RawTuplesToSketchTuple.class.getName();
}

@Override
@@ -164,7 +164,7 @@ public DataBag exec(final Tuple inputTuple) throws IOException {
}

final VarOptItemsUnion<Tuple> union = unionSketches(inputTuple, targetK_);
return createResultFromSketch(union.getResult());
return createDataBagFromSketch(union.getResult());
}
}
}

0 comments on commit a440f9d

Please sign in to comment.