Permalink
Show file tree
Hide file tree
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Browse files
Merge branch 'master' of git@github.com:DataSketches/sketches-pig.git
- Loading branch information
Showing
38 changed files
with
4,077 additions
and
4 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,126 @@ | ||
/* | ||
* Copyright 2019, Verizon Media. | ||
* Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms. | ||
*/ | ||
|
||
package com.yahoo.sketches.pig.cpc; | ||
|
||
import java.io.IOException; | ||
|
||
import org.apache.log4j.Logger; | ||
import org.apache.pig.EvalFunc; | ||
import org.apache.pig.data.DataBag; | ||
import org.apache.pig.data.DataByteArray; | ||
import org.apache.pig.data.Tuple; | ||
|
||
import com.yahoo.sketches.cpc.CpcSketch; | ||
import com.yahoo.sketches.cpc.CpcUnion; | ||
|
||
/** | ||
* Class used to calculate the final pass of an <i>Algebraic</i> sketch | ||
* operation. It will receive a bag of values returned by either the <i>Intermediate</i> | ||
* stage or the <i>Initial</i> stages, so it needs to be able to differentiate between and | ||
* interpret both types. | ||
* | ||
* @author Alexander Saydakov | ||
*/ | ||
abstract class AlgebraicFinal extends EvalFunc<DataByteArray> { | ||
|
||
private final int lgK_; | ||
private final long seed_; | ||
private DataByteArray emptySketch_; // this is to cash an empty sketch tuple | ||
private boolean isFirstCall_ = true; // for logging | ||
|
||
/** | ||
* Constructor with primitives for the final passes of an Algebraic function. | ||
* | ||
* @param lgK parameter controlling the sketch size and accuracy | ||
* @param seed for the hash function | ||
*/ | ||
public AlgebraicFinal(final int lgK, final long seed) { | ||
lgK_ = lgK; | ||
seed_ = seed; | ||
} | ||
|
||
@Override | ||
public DataByteArray exec(final Tuple inputTuple) throws IOException { | ||
if (isFirstCall_) { | ||
Logger.getLogger(getClass()).info("Algebraic was used"); | ||
isFirstCall_ = false; | ||
} | ||
final DataByteArray dba = process(inputTuple, lgK_, seed_, isInputRaw()); | ||
if (dba == null) { | ||
return getEmptySketch(); | ||
} | ||
return dba; | ||
} | ||
|
||
static DataByteArray process(final Tuple inputTuple, final int lgK, final long seed, boolean isInputRaw) throws IOException { | ||
if (inputTuple == null || inputTuple.size() == 0) { | ||
return null; | ||
} | ||
CpcSketch sketch = null; | ||
CpcUnion union = null; | ||
final DataBag outerBag = (DataBag) inputTuple.get(0); | ||
if (outerBag == null) { | ||
return null; | ||
} | ||
for (final Tuple dataTuple: outerBag) { | ||
final Object f0 = dataTuple.get(0); // inputTuple.bag0.dataTupleN.f0 | ||
if (f0 == null) { | ||
continue; | ||
} | ||
if (f0 instanceof DataBag) { | ||
final DataBag innerBag = (DataBag) f0; // inputTuple.bag0.dataTupleN.f0:bag | ||
if (innerBag.size() == 0) { continue; } | ||
// If field 0 of a dataTuple is a Bag, all innerTuples of this inner bag | ||
// will be passed into the union. | ||
// It is due to system bagged outputs from multiple mapper Initial functions. | ||
// The Intermediate stage was bypassed. | ||
if (isInputRaw) { | ||
if (sketch == null) { | ||
sketch = new CpcSketch(lgK, seed); | ||
} | ||
DataToSketch.updateSketch(innerBag, sketch); | ||
} else { | ||
if (union == null) { | ||
union = new CpcUnion(lgK, seed); | ||
} | ||
UnionSketch.updateUnion(innerBag, union, seed); | ||
} | ||
} else if (f0 instanceof DataByteArray) { // inputTuple.bag0.dataTupleN.f0:DBA | ||
// If field 0 of a dataTuple is a DataByteArray, we assume it is a sketch | ||
// due to system bagged outputs from multiple mapper Intermediate functions. | ||
// Each dataTuple.DBA:sketch will merged into the union. | ||
final DataByteArray dba = (DataByteArray) f0; | ||
if (union == null) { | ||
union = new CpcUnion(lgK, seed); | ||
} | ||
union.update(CpcSketch.heapify(dba.get(), seed)); | ||
} else { // we should never get here | ||
throw new IllegalArgumentException("dataTuple.Field0 is not a DataBag or DataByteArray: " | ||
+ f0.getClass().getName()); | ||
} | ||
} | ||
if (sketch != null && union != null) { | ||
union.update(sketch); | ||
sketch = null; | ||
} | ||
if (sketch != null) { | ||
return new DataByteArray(sketch.toByteArray()); | ||
} else if (union != null) { | ||
return new DataByteArray(union.getResult().toByteArray()); | ||
} | ||
return null; | ||
} | ||
|
||
abstract boolean isInputRaw(); | ||
|
||
private DataByteArray getEmptySketch() { | ||
if (emptySketch_ == null) { | ||
emptySketch_ = new DataByteArray(new CpcSketch(lgK_, seed_).toByteArray()); | ||
} | ||
return emptySketch_; | ||
} | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
/* | ||
* Copyright 2019, Verizon Media. | ||
* Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms. | ||
*/ | ||
|
||
package com.yahoo.sketches.pig.cpc; | ||
|
||
import java.io.IOException; | ||
|
||
import org.apache.log4j.Logger; | ||
import org.apache.pig.EvalFunc; | ||
import org.apache.pig.data.Tuple; | ||
|
||
/** | ||
* Class used to calculate the initial pass of an Algebraic sketch operation. | ||
* | ||
* <p>The Initial class simply passes through all records unchanged so that they can be | ||
* processed by the intermediate processor instead. | ||
* | ||
* @author Alexander Saydakov | ||
*/ | ||
public class AlgebraicInitial extends EvalFunc<Tuple> { | ||
|
||
private boolean isFirstCall_ = true; | ||
|
||
/** | ||
* Default constructor for the initial pass of an Algebraic function. | ||
*/ | ||
public AlgebraicInitial() {} | ||
|
||
/** | ||
* Constructor for the initial pass of an Algebraic function. Pig will call this and pass the | ||
* same constructor arguments as the original UDF. In this case the arguments are ignored. | ||
* | ||
* @param lgK in a form of a String | ||
*/ | ||
public AlgebraicInitial(final String lgK) {} | ||
|
||
/** | ||
* Constructor for the initial pass of an Algebraic function. Pig will call this and pass the | ||
* same constructor arguments as the original UDF. In this case the arguments are ignored. | ||
* | ||
* @param lgK in a form of a String | ||
* @param seed in a form of a String | ||
*/ | ||
public AlgebraicInitial(final String lgK, final String seed) {} | ||
|
||
@Override | ||
public Tuple exec(final Tuple inputTuple) throws IOException { | ||
if (isFirstCall_) { | ||
Logger.getLogger(getClass()).info("Algebraic was used"); | ||
isFirstCall_ = false; | ||
} | ||
return inputTuple; | ||
} | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
/* | ||
* Copyright 2019, Verizon Media. | ||
* Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms. | ||
*/ | ||
|
||
package com.yahoo.sketches.pig.cpc; | ||
|
||
import java.io.IOException; | ||
|
||
import org.apache.log4j.Logger; | ||
import org.apache.pig.EvalFunc; | ||
import org.apache.pig.data.DataByteArray; | ||
import org.apache.pig.data.Tuple; | ||
import org.apache.pig.data.TupleFactory; | ||
|
||
import com.yahoo.sketches.cpc.CpcSketch; | ||
|
||
/** | ||
* Class used to calculate the intermediate combiner pass of an <i>Algebraic</i> sketch | ||
* operation. This is called from the combiner, and may be called multiple times (from a mapper | ||
* and from a reducer). It will receive a bag of values returned by either <i>Intermediate</i> | ||
* or <i>Initial</i> stages, so it needs to be able to differentiate between and | ||
* interpret both types. | ||
* | ||
* @author Alexander Saydakov | ||
*/ | ||
abstract class AlgebraicIntermediate extends EvalFunc<Tuple> { | ||
|
||
private static final TupleFactory TUPLE_FACTORY = TupleFactory.getInstance(); | ||
|
||
private final int lgK_; | ||
private final long seed_; | ||
private Tuple emptySketchTuple_; // this is to cash an empty sketch tuple | ||
private boolean isFirstCall_ = true; // for logging | ||
|
||
/** | ||
* Constructor with primitives for the intermediate pass of an Algebraic function. | ||
* | ||
* @param lgK parameter controlling the sketch size and accuracy | ||
* @param seed | ||
*/ | ||
public AlgebraicIntermediate(final int lgK, final long seed) { | ||
lgK_ = lgK; | ||
seed_ = seed; | ||
} | ||
|
||
@Override | ||
public Tuple exec(final Tuple inputTuple) throws IOException { | ||
if (isFirstCall_) { | ||
Logger.getLogger(getClass()).info("Algebraic was used"); | ||
isFirstCall_ = false; | ||
} | ||
final DataByteArray dba = AlgebraicFinal.process(inputTuple, lgK_, seed_, isInputRaw()); | ||
if (dba == null) { | ||
return getEmptySketchTuple(); | ||
} | ||
return TUPLE_FACTORY.newTuple(dba); | ||
} | ||
|
||
abstract boolean isInputRaw(); | ||
|
||
private Tuple getEmptySketchTuple() { | ||
if (emptySketchTuple_ == null) { | ||
emptySketchTuple_ = TUPLE_FACTORY.newTuple(new DataByteArray( | ||
new CpcSketch(lgK_, seed_).toByteArray())); | ||
} | ||
return emptySketchTuple_; | ||
} | ||
|
||
} |
Oops, something went wrong.