Skip to content
Permalink
Browse files
stricter input schema validation
  • Loading branch information
jmalkin committed Jun 23, 2017
1 parent 9a6d09e commit 72da232d06520ce69622f2c6b936689292233446
Showing 4 changed files with 71 additions and 64 deletions.
@@ -22,7 +22,7 @@
import com.yahoo.sketches.sampling.VarOptItemsSketch;

/**
* This UDF creates a binary version of a VarOpt sampling over input tuples. The resulting
* Creates a binary version of a VarOpt sampling over input tuples. The resulting
* <tt>DataByteArray</tt> can be read in Pig with <tt>GetVarOptSamples</tt>, although the
* per-record schema will be lost. It implements both the <tt>Accumulator</tt> and
* <tt>Algebraic</tt> interfaces for efficient performance.
@@ -45,7 +45,7 @@ public DataToVarOptSketch(final String kStr) {
weightIdx_ = DEFAULT_WEIGHT_IDX;

if (targetK_ < 1) {
throw new IllegalArgumentException("VarOptSampling requires target sample size >= 1: "
throw new IllegalArgumentException("DataToVarOptSketch requires target sample size >= 1: "
+ targetK_);
}
}
@@ -60,11 +60,11 @@ public DataToVarOptSketch(final String kStr, final String weightIdxStr) {
weightIdx_ = Integer.parseInt(weightIdxStr);

if (targetK_ < 1) {
throw new IllegalArgumentException("VarOptSampling requires target sample size >= 1: "
throw new IllegalArgumentException("DataToVarOptSketch requires target sample size >= 1: "
+ targetK_);
}
if (weightIdx_ < 0) {
throw new IllegalArgumentException("VarOptSampling requires weight index >= 0: "
throw new IllegalArgumentException("DataToVarOptSketch requires weight index >= 0: "
+ weightIdx_);
}
}
@@ -125,32 +125,31 @@ public String getFinal() {

@Override
public Schema outputSchema(final Schema input) {
if (input != null && input.size() > 0) {
try {
Schema record = input;

// first element must be a bag, first element of tuples must be the weight (float or double)
if (record.getField(0).type != DataType.BAG) {
throw new IllegalArgumentException("VarOpt input must be a data bag: "
+ record.toString());
}

record = record.getField(0).schema; // record has a tuple in field 0
final Schema fields = record.getField(weightIdx_).schema; //
if (fields.getField(weightIdx_).type != DataType.DOUBLE
&& fields.getField(0).type != DataType.FLOAT) {
throw new IllegalArgumentException("First item of VarOpt tuple must be a "
+ "weight (double/float), found " + fields.getField(0).type
+ ": " + fields.toString());

}
try {
if (input == null || input.size() == 0) {
throw new IllegalArgumentException("Degenerate input schema to VarOptSampling");
}
catch (final FrontendException e) {
// fall through

// first element must be a bag, first element of tuples must be the weight (float or double)
if (input.getField(0).type != DataType.BAG) {
throw new IllegalArgumentException("VarOpt input must be a data bag: "
+ input.toString());
}

final Schema record = input.getField(0).schema; // record has a tuple in field 0
final Schema fields = record.getField(weightIdx_).schema; //
if (fields.getField(weightIdx_).type != DataType.DOUBLE
&& fields.getField(0).type != DataType.FLOAT) {
throw new IllegalArgumentException("weightIndex item of VarOpt tuple must be a "
+ "weight (double/float), found " + fields.getField(0).type
+ ": " + fields.toString());
}
}

return new Schema(new Schema.FieldSchema(getSchemaName(this
.getClass().getName().toLowerCase(), input), DataType.BYTEARRAY));
return new Schema(new Schema.FieldSchema(getSchemaName(this
.getClass().getName().toLowerCase(), input), DataType.BYTEARRAY));
}
catch (final FrontendException e) {
return null;
}
}
}
@@ -110,38 +110,36 @@ public void cleanup() {

@Override
public Schema outputSchema(final Schema input) {
if (input != null && input.size() > 0) {
try {
Schema record = input;

// first element must be a bag, first element of tuples must be the weight (float or double)
if (record.getField(0).type != DataType.BAG) {
throw new IllegalArgumentException("VarOpt input must be a data bag: "
+ record.toString());
}

record = record.getField(0).schema; // record has a tuple in field 0
final Schema fields = record.getField(0).schema; //
if (fields.getField(weightIdx_).type != DataType.DOUBLE
&& fields.getField(weightIdx_).type != DataType.FLOAT) {
throw new IllegalArgumentException("First item of VarOpt tuple must be a "
+ "weight (double/float), found " + fields.getField(0).type
+ ": " + fields.toString());

}

final Schema weightedSampleSchema = new Schema();
weightedSampleSchema.add(new Schema.FieldSchema(WEIGHT_ALIAS, DataType.DOUBLE));
weightedSampleSchema.add(new Schema.FieldSchema(RECORD_ALIAS, record, DataType.TUPLE));

return new Schema(new Schema.FieldSchema(getSchemaName(this
.getClass().getName().toLowerCase(), record), weightedSampleSchema, DataType.BAG));
try {
if (input == null || input.size() == 0) {
throw new IllegalArgumentException("Degenerate input schema to VarOptSampling");
}
catch (final FrontendException e) {
// fall through

// first element must be a bag, first element of tuples must be the weight (float or double)
if (input.getField(0).type != DataType.BAG) {
throw new IllegalArgumentException("VarOpt input must be a data bag: "
+ input.toString());
}

final Schema record = input.getField(0).schema; // record has a tuple in field 0
final Schema fields = record.getField(0).schema; //
if (fields.getField(weightIdx_).type != DataType.DOUBLE
&& fields.getField(weightIdx_).type != DataType.FLOAT) {
throw new IllegalArgumentException("weightIndex item of VarOpt tuple must be a "
+ "weight (double/float), found " + fields.getField(0).type
+ ": " + fields.toString());
}

final Schema weightedSampleSchema = new Schema();
weightedSampleSchema.add(new Schema.FieldSchema(WEIGHT_ALIAS, DataType.DOUBLE));
weightedSampleSchema.add(new Schema.FieldSchema(RECORD_ALIAS, record, DataType.TUPLE));

return new Schema(new Schema.FieldSchema(getSchemaName(this
.getClass().getName().toLowerCase(), record), weightedSampleSchema, DataType.BAG));
}
catch (final FrontendException e) {
return null;
}
return null;
}

@Override
@@ -1,5 +1,7 @@
package com.yahoo.sketches.pig.sampling;

import static com.yahoo.sketches.pig.sampling.VarOptCommonImpl.DEFAULT_TARGET_K;

import java.io.IOException;

import org.apache.pig.AccumulatorEvalFunc;
@@ -25,7 +27,6 @@
* @author Jon Malkin
*/
public class VarOptUnion extends AccumulatorEvalFunc<DataByteArray> implements Algebraic {
private static final int DEFAULT_TARGET_K = 1024;
private static final ArrayOfTuplesSerDe SERDE = new ArrayOfTuplesSerDe();

private final int maxK_;
@@ -38,8 +39,8 @@ public class VarOptUnion extends AccumulatorEvalFunc<DataByteArray> implements A
public VarOptUnion(final String kStr) {
maxK_ = Integer.parseInt(kStr);

if (maxK_ < 2) {
throw new IllegalArgumentException("ReservoirUnion requires max reservoir size >= 2: "
if (maxK_ < 1) {
throw new IllegalArgumentException("VarOptUnion requires max sample size >= 1: "
+ maxK_);
}
}
@@ -179,10 +179,6 @@ public void algebraicFinal() {
public void validOutputSchemaTest() throws IOException {
VarOptSampling udf = new VarOptSampling("5", "1");

// first a couple degenerate cases
assertNull(udf.outputSchema(null));
assertNull(udf.outputSchema(new Schema()));

final Schema recordSchema = new Schema();
recordSchema.add(new Schema.FieldSchema("field1", DataType.CHARARRAY));
recordSchema.add(new Schema.FieldSchema("field2", DataType.DOUBLE));
@@ -231,6 +227,19 @@ public void badOutputSchemaTest() throws IOException {

final VarOptSampling udf = new VarOptSampling("5", "0");

// degenerate input schemas
try {
udf.outputSchema(null);
} catch (final IllegalArgumentException e) {
// expected
}

try {
udf.outputSchema(new Schema());
} catch (final IllegalArgumentException e) {
// expected
}

// expecting weight in element 0
try {
udf.outputSchema(inputSchema);

0 comments on commit 72da232

Please sign in to comment.