Skip to content
Permalink
Browse files
Merge pull request #376 from apache/FixSetOpLeakage
Fix set op leakage
  • Loading branch information
leerho committed Dec 3, 2021
2 parents 95a712a + ba37699 commit f77eb5ee27046c5eaf6fb64b5ab01376098af4e2
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 86 deletions.
@@ -25,7 +25,6 @@
import static org.apache.datasketches.Util.REBUILD_THRESHOLD;
import static org.apache.datasketches.Util.simpleLog2OfLong;

import java.lang.reflect.Array;
import java.lang.reflect.Method;
import java.util.Arrays;

@@ -119,7 +118,7 @@ public void setA(final Sketch<S> skA) {

empty_ = skA.isEmpty();
thetaLong_ = skA.getThetaLong();
final DataArrays<S> da = getDataArraysTuple(skA);
final DataArrays<S> da = getCopyOfDataArraysTuple(skA);
summaryArr_ = da.summaryArr; //it may be null
hashArr_ = da.hashArr; //it may be null
curCount_ = (hashArr_ == null) ? 0 : hashArr_.length;
@@ -174,7 +173,7 @@ public void notB(final Sketch<S> skB) {
}
case SKA_TRIM: {
thetaLong_ = min(thetaLong_, thetaLongB);
final DataArrays<S> da = trimDataArrays(hashArr_, summaryArr_, thetaLong_);
final DataArrays<S> da = trimAndCopyDataArrays(hashArr_, summaryArr_, thetaLong_, true);
hashArr_ = da.hashArr;
curCount_ = (hashArr_ == null) ? 0 : hashArr_.length;
summaryArr_ = da.summaryArr;
@@ -186,7 +185,7 @@ public void notB(final Sketch<S> skB) {
}
case FULL_ANOTB: { //both A and B should have valid entries.
thetaLong_ = min(thetaLong_, thetaLongB);
final DataArrays<S> daR = getAnotbResultArraysTuple(thetaLong_, curCount_, hashArr_, summaryArr_, skB);
final DataArrays<S> daR = getCopyOfResultArraysTuple(thetaLong_, curCount_, hashArr_, summaryArr_, skB);
hashArr_ = daR.hashArr;
curCount_ = (hashArr_ == null) ? 0 : hashArr_.length;
summaryArr_ = daR.summaryArr;
@@ -247,7 +246,7 @@ public void notB(final org.apache.datasketches.theta.Sketch skB) {
}
case SKA_TRIM: {
thetaLong_ = min(thetaLong_, thetaLongB);
final DataArrays<S> da = trimDataArrays(hashArr_, summaryArr_,thetaLong_);
final DataArrays<S> da = trimAndCopyDataArrays(hashArr_, summaryArr_,thetaLong_, true);
hashArr_ = da.hashArr;
curCount_ = (hashArr_ == null) ? 0 : hashArr_.length;
summaryArr_ = da.summaryArr;
@@ -258,7 +257,7 @@ public void notB(final org.apache.datasketches.theta.Sketch skB) {
}
case FULL_ANOTB: { //both A and B should have valid entries.
thetaLong_ = min(thetaLong_, thetaLongB);
final DataArrays<S> daB = getAnotbResultArraysTheta(thetaLong_, curCount_, hashArr_, summaryArr_, skB);
final DataArrays<S> daB = getCopyOfResultArraysTheta(thetaLong_, curCount_, hashArr_, summaryArr_, skB);
hashArr_ = daB.hashArr;
curCount_ = (hashArr_ == null) ? 0 : hashArr_.length;
summaryArr_ = daB.summaryArr;
@@ -282,7 +281,8 @@ public CompactSketch<S> getResult(final boolean reset) {
if (curCount_ == 0) {
result = new CompactSketch<>(null, null, thetaLong_, thetaLong_ == Long.MAX_VALUE);
} else {
result = new CompactSketch<>(hashArr_, summaryArr_, thetaLong_, false);

result = new CompactSketch<>(hashArr_, Util.copySummaryArray(summaryArr_), thetaLong_, false);
}
if (reset) { reset(); }
return result;
@@ -349,24 +349,24 @@ public static <S extends Summary> CompactSketch<S> aNotB(
break;
}
case SKA_TRIM: {
final DataArrays<S> daA = getDataArraysTuple(skA);
final DataArrays<S> daA = getCopyOfDataArraysTuple(skA);
final long[] hashArrA = daA.hashArr;
final S[] summaryArrA = daA.summaryArr;
final long minThetaLong = min(thetaLongA, thetaLongB);
final DataArrays<S> da = trimDataArrays(hashArrA, summaryArrA, minThetaLong);
final DataArrays<S> da = trimAndCopyDataArrays(hashArrA, summaryArrA, minThetaLong, false);
result = new CompactSketch<>(da.hashArr, da.summaryArr, minThetaLong, skA.empty_);
break;
}
case SKETCH_A: {
final DataArrays<S> daA = getDataArraysTuple(skA);
final DataArrays<S> daA = getCopyOfDataArraysTuple(skA);
result = new CompactSketch<>(daA.hashArr, daA.summaryArr, thetaLongA, skA.empty_);
break;
}
case FULL_ANOTB: { //both A and B should have valid entries.
final DataArrays<S> daA = getDataArraysTuple(skA);
final DataArrays<S> daA = getCopyOfDataArraysTuple(skA);
final long minThetaLong = min(thetaLongA, thetaLongB);
final DataArrays<S> daR =
getAnotbResultArraysTuple(minThetaLong, daA.hashArr.length, daA.hashArr, daA.summaryArr, skB);
getCopyOfResultArraysTuple(minThetaLong, daA.hashArr.length, daA.hashArr, daA.summaryArr, skB);
final int countR = (daR.hashArr == null) ? 0 : daR.hashArr.length;
if (countR == 0) {
result = new CompactSketch<>(null, null, minThetaLong, minThetaLong == Long.MAX_VALUE);
@@ -441,24 +441,24 @@ public static <S extends Summary> CompactSketch<S> aNotB(
break;
}
case SKA_TRIM: {
final DataArrays<S> daA = getDataArraysTuple(skA);
final DataArrays<S> daA = getCopyOfDataArraysTuple(skA);
final long[] hashArrA = daA.hashArr;
final S[] summaryArrA = daA.summaryArr;
final long minThetaLong = min(thetaLongA, thetaLongB);
final DataArrays<S> da = trimDataArrays(hashArrA, summaryArrA, minThetaLong);
final DataArrays<S> da = trimAndCopyDataArrays(hashArrA, summaryArrA, minThetaLong, false);
result = new CompactSketch<>(da.hashArr, da.summaryArr, minThetaLong, skA.empty_);
break;
}
case SKETCH_A: {
final DataArrays<S> daA = getDataArraysTuple(skA);
final DataArrays<S> daA = getCopyOfDataArraysTuple(skA);
result = new CompactSketch<>(daA.hashArr, daA.summaryArr, thetaLongA, skA.empty_);
break;
}
case FULL_ANOTB: { //both A and B should have valid entries.
final DataArrays<S> daA = getDataArraysTuple(skA);
final DataArrays<S> daA = getCopyOfDataArraysTuple(skA);
final long minThetaLong = min(thetaLongA, thetaLongB);
final DataArrays<S> daR =
getAnotbResultArraysTheta(minThetaLong, daA.hashArr.length, daA.hashArr, daA.summaryArr, skB);
getCopyOfResultArraysTheta(minThetaLong, daA.hashArr.length, daA.hashArr, daA.summaryArr, skB);
final int countR = (daR.hashArr == null) ? 0 : daR.hashArr.length;
if (countR == 0) {
result = new CompactSketch<>(null, null, minThetaLong, minThetaLong == Long.MAX_VALUE);
@@ -478,7 +478,8 @@ public static <S extends Summary> CompactSketch<S> aNotB(
S[] summaryArr;
}

private static <S extends Summary> DataArrays<S> getDataArraysTuple(
@SuppressWarnings("unchecked")
private static <S extends Summary> DataArrays<S> getCopyOfDataArraysTuple(
final Sketch<S> sk) {
final CompactSketch<S> csk;
final DataArrays<S> da = new DataArrays<>();
@@ -493,40 +494,14 @@ private static <S extends Summary> DataArrays<S> getDataArraysTuple(
da.summaryArr = null;
} else {
da.hashArr = csk.getHashArr().clone(); //deep copy, may not be sorted
da.summaryArr = csk.getSummaryArr().clone(); //shallow copy
da.summaryArr = Util.copySummaryArray(csk.getSummaryArr());
}
return da;
}

@SuppressWarnings("unchecked")
private static <S extends Summary> DataArrays<S> trimDataArrays(
final long[] hashArr,
final S[] summaryArr,
final long minThetaLong) {

//build temporary arrays
final int countIn = hashArr.length;
final long[] tmpHashArr = new long[countIn];
final Class<S> summaryType = (Class<S>) summaryArr.getClass().getComponentType();
final S[] tmpSummaryArr = (S[]) Array.newInstance(summaryType, countIn);
int countResult = 0;
for (int i = 0; i < countIn; i++) {
final long hash = hashArr[i];
if (hash < minThetaLong) {
tmpHashArr[countResult] = hash;
tmpSummaryArr[countResult] = summaryArr[i];
countResult++;
} else { continue; }
}
final DataArrays<S> da = new DataArrays<>();
da.hashArr = Arrays.copyOfRange(tmpHashArr, 0, countResult);
da.summaryArr = Arrays.copyOfRange(tmpSummaryArr, 0, countResult);
return da;
}

@SuppressWarnings("unchecked")
//Both skA and skB must have entries (count > 0)
private static <S extends Summary> DataArrays<S> getAnotbResultArraysTuple(
private static <S extends Summary> DataArrays<S> getCopyOfResultArraysTuple(
final long minThetaLong,
final int countA,
final long[] hashArrA,
@@ -548,8 +523,7 @@ private static <S extends Summary> DataArrays<S> getAnotbResultArraysTuple(

//build temporary arrays of skA
final long[] tmpHashArrA = new long[countA];
final Class<S> summaryType = (Class<S>) summaryArrA.getClass().getComponentType();
final S[] tmpSummaryArrA = (S[]) Array.newInstance(summaryType, countA);
final S[] tmpSummaryArrA = Util.newSummaryArray(summaryArrA, countA);

//search for non matches and build temp arrays
final int lgHTBLen = simpleLog2OfLong(hashTableB.length);
@@ -560,7 +534,7 @@ private static <S extends Summary> DataArrays<S> getAnotbResultArraysTuple(
final int index = hashSearch(hashTableB, lgHTBLen, hash);
if (index == -1) {
tmpHashArrA[nonMatches] = hash;
tmpSummaryArrA[nonMatches] = summaryArrA[i];
tmpSummaryArrA[nonMatches] = (S) summaryArrA[i].copy();
nonMatches++;
}
}
@@ -571,7 +545,7 @@ private static <S extends Summary> DataArrays<S> getAnotbResultArraysTuple(
}

@SuppressWarnings("unchecked")
private static <S extends Summary> DataArrays<S> getAnotbResultArraysTheta(
private static <S extends Summary> DataArrays<S> getCopyOfResultArraysTheta(
final long minThetaLong,
final int countA,
final long[] hashArrA,
@@ -595,8 +569,7 @@ private static <S extends Summary> DataArrays<S> getAnotbResultArraysTheta(

//build temporary result arrays of skA
final long[] tmpHashArrA = new long[countA];
final Class<S> summaryType = (Class<S>) summaryArrA.getClass().getComponentType();
final S[] tmpSummaryArrA = (S[]) Array.newInstance(summaryType, countA);
final S[] tmpSummaryArrA = Util.newSummaryArray(summaryArrA, countA);

//search for non matches and build temp arrays
final int lgHTBLen = simpleLog2OfLong(hashTableB.length);
@@ -607,7 +580,7 @@ private static <S extends Summary> DataArrays<S> getAnotbResultArraysTheta(
final int index = hashSearch(hashTableB, lgHTBLen, hash);
if (index == -1) { //not found
tmpHashArrA[nonMatches] = hash;
tmpSummaryArrA[nonMatches] = summaryArrA[i];
tmpSummaryArrA[nonMatches] = (S) summaryArrA[i].copy();
nonMatches++;
}
}
@@ -618,6 +591,33 @@ private static <S extends Summary> DataArrays<S> getAnotbResultArraysTheta(
return daB;
}

@SuppressWarnings("unchecked")
private static <S extends Summary> DataArrays<S> trimAndCopyDataArrays(
final long[] hashArr,
final S[] summaryArr,
final long minThetaLong,
final boolean copy) {

//build temporary arrays
final int countIn = hashArr.length;
final long[] tmpHashArr = new long[countIn];
final S[] tmpSummaryArr = Util.newSummaryArray(summaryArr, countIn);
int countResult = 0;
for (int i = 0; i < countIn; i++) {
final long hash = hashArr[i];
if (hash < minThetaLong) {
tmpHashArr[countResult] = hash;
tmpSummaryArr[countResult] = (S) (copy ? summaryArr[i].copy() : summaryArr[i]);
countResult++;
} else { continue; }
}
//Remove empty slots
final DataArrays<S> da = new DataArrays<>();
da.hashArr = Arrays.copyOfRange(tmpHashArr, 0, countResult);
da.summaryArr = Arrays.copyOfRange(tmpSummaryArr, 0, countResult);
return da;
}

/**
* Resets this operation back to the empty state.
*/
@@ -97,12 +97,10 @@ HashTables<S> getIntersectHashTables(
final long thetaLong,
final SummarySetOperations<S> summarySetOps) {

final Class<S> summaryType = (Class<S>) summaryTable_.getClass().getComponentType();

//Match nextSketch data with local instance data, filtering by theta
final int maxMatchSize = min(count_, nextTupleSketch.getRetainedEntries());
final long[] matchHashArr = new long[maxMatchSize];
final S[] matchSummariesArr = (S[]) Array.newInstance(summaryType, maxMatchSize);
final S[] matchSummariesArr = Util.newSummaryArray(summaryTable_, maxMatchSize);
int matchCount = 0;
final SketchIterator<S> it = nextTupleSketch.iterator();

@@ -25,8 +25,6 @@
import static org.apache.datasketches.Util.MIN_LG_NOM_LONGS;
import static org.apache.datasketches.Util.ceilingPowerOf2;

import java.lang.reflect.Array;

import org.apache.datasketches.SketchesArgumentException;
import org.apache.datasketches.SketchesStateException;

@@ -200,19 +198,18 @@ public CompactSketch<S> getResult() {
return new CompactSketch<>(null, null, thetaLong_, empty_);
}

//Compact the hash tables
final int tableSize = hashTables_.hashTable_.length;

final long[] hashArr = new long[countIn];
final Class<S> summaryType = (Class<S>) hashTables_.summaryTable_.getClass().getComponentType();
final S[] summaryArr = (S[]) Array.newInstance(summaryType, countIn);
final S[] summaryArr = Util.newSummaryArray(hashTables_.summaryTable_, countIn);

//compact the arrays
int cnt = 0;
for (int i = 0; i < tableSize; i++) {
final long hash = hashTables_.hashTable_[i];
if (hash == 0 || hash > thetaLong_) { continue; }
hashArr[cnt] = hash;
summaryArr[cnt] = hashTables_.summaryTable_[i];
summaryArr[cnt] = (S) hashTables_.summaryTable_[i].copy();
cnt++;
}
assert cnt == countIn;
@@ -299,8 +299,7 @@ public CompactSketch<S> compact() {
return new CompactSketch<>(null, null, thetaLong_, false);
}
final long[] hashArr = new long[getRetainedEntries()];
final S[] summaryArr = (S[])
Array.newInstance(summaryTable_.getClass().getComponentType(), getRetainedEntries());
final S[] summaryArr = Util.newSummaryArray(summaryTable_, getRetainedEntries());
int i = 0;
for (int j = 0; j < hashTable_.length; j++) {
if (summaryTable_[j] != null) {
@@ -418,7 +417,7 @@ void merge(final long hash, final S summary, final SummarySetOperations<S> summa
if (index < 0) {
insertSummary(~index, (S)summary.copy()); //did not find, so insert
} else {
insertSummary(index, summarySetOps.union(summaryTable_[index], summary));
insertSummary(index, summarySetOps.union(summaryTable_[index], (S) summary.copy()));
}
rebuildIfNeeded();
}
@@ -487,9 +486,8 @@ private void updateTheta() {
private void resize(final int newSize) {
final long[] oldHashTable = hashTable_;
final S[] oldSummaryTable = summaryTable_;
final Class<S> summaryType = (Class<S>) summaryTable_.getClass().getComponentType();
hashTable_ = new long[newSize];
summaryTable_ = (S[]) Array.newInstance(summaryType, newSize);
summaryTable_ = Util.newSummaryArray(summaryTable_, newSize);
lgCurrentCapacity_ = Integer.numberOfTrailingZeros(newSize);
count_ = 0;
for (int i = 0; i < oldHashTable.length; i++) {
@@ -22,8 +22,6 @@
import static java.lang.Math.min;
import static org.apache.datasketches.Util.DEFAULT_NOMINAL_ENTRIES;

import java.lang.reflect.Array;

import org.apache.datasketches.QuickSelect;
import org.apache.datasketches.SketchesArgumentException;

@@ -63,7 +61,7 @@ public Union(final int nomEntries, final SummarySetOperations<S> summarySetOps)

/**
* Perform a stateless, pair-wise union operation between two tuple sketches.
* The returned sketch will be cutback to the smaller of the two k values if required.
* The returned sketch will be cut back to the smaller of the two k values if required.
*
* <p>Nulls and empty sketches are ignored.</p>
*
@@ -82,7 +80,7 @@ public CompactSketch<S> union(final Sketch<S> tupleSketchA, final Sketch<S> tupl

/**
* Perform a stateless, pair-wise union operation between a tupleSketch and a thetaSketch.
* The returned sketch will be cutback to the smaller of the two k values if required.
* The returned sketch will be cut back to the smaller of the two k values if required.
*
* <p>Nulls and empty sketches are ignored.</p>
*
@@ -139,7 +137,7 @@ public void union(final org.apache.datasketches.theta.Sketch thetaSketch, final
if (thetaIn < thetaLong_) { thetaLong_ = thetaIn; }
final org.apache.datasketches.theta.HashIterator it = thetaSketch.iterator();
while (it.next()) {
qsk_.merge(it.get(), (S)summary.copy(), summarySetOps_);
qsk_.merge(it.get(), summary, summarySetOps_); //copies summary
}
if (qsk_.thetaLong_ < thetaLong_) {
thetaLong_ = qsk_.thetaLong_;
@@ -181,9 +179,8 @@ public CompactSketch<S> getResult() {
theta = QuickSelect.select(hashArr, 0, numHashes - 1, qsk_.getNominalEntries());
numHashes = qsk_.getNominalEntries();
}
final Class<S> summaryType = (Class<S>) qsk_.getSummaryTable().getClass().getComponentType();
final long[] hashArr = new long[numHashes];
final S[] summaries = (S[]) Array.newInstance(summaryType, numHashes);
final S[] summaries = Util.newSummaryArray(qsk_.getSummaryTable(), numHashes);
final SketchIterator<S> it = qsk_.iterator();
int i = 0;
while (it.next()) {

0 comments on commit f77eb5e

Please sign in to comment.