Skip to content

Commit

Permalink
Merge 4585ddc into 8044759
Browse files Browse the repository at this point in the history
  • Loading branch information
leerho committed Oct 21, 2021
2 parents 8044759 + 4585ddc commit 8f7a58d
Show file tree
Hide file tree
Showing 9 changed files with 1,533 additions and 39 deletions.
13 changes: 10 additions & 3 deletions src/main/java/org/apache/datasketches/theta/AnotBimpl.java
Expand Up @@ -78,6 +78,7 @@ public void setA(final Sketch skA) {

//process A
hashArr_ = getHashArrA(skA);
hashArr_ = (hashArr_ == null) ? new long[0] : hashArr_;
empty_ = false;
thetaLong_ = skA.getThetaLong();
curCount_ = hashArr_.length;
Expand All @@ -93,6 +94,7 @@ public void notB(final Sketch skB) {

//process B
hashArr_ = getResultHashArr(thetaLong_, curCount_, hashArr_, skB);
hashArr_ = (hashArr_ == null) ? new long[0] : hashArr_;
curCount_ = hashArr_.length;
empty_ = curCount_ == 0 && thetaLong_ == Long.MAX_VALUE;
}
Expand All @@ -119,17 +121,22 @@ public CompactSketch aNotB(final Sketch skA, final Sketch skB, final boolean dst
}
//Both skA & skB are not null

final long minThetaLong = Math.min(skA.getThetaLong(), skB.getThetaLong());

if (skA.isEmpty()) { return skA.compact(dstOrdered, dstMem); }
//A is not Empty
checkSeedHashes(skA.getSeedHash(), seedHash_);

if (skB.isEmpty()) { return skA.compact(dstOrdered, dstMem); }
if (skB.isEmpty()) {
return skA.compact(dstOrdered, dstMem);
}
checkSeedHashes(skB.getSeedHash(), seedHash_);
//Both skA & skB are not empty

//process A
final long[] hashArrA = getHashArrA(skA);
final int countA = hashArrA.length;
final long minThetaLong = Math.min(skA.getThetaLong(), skB.getThetaLong());
final int countA = (hashArrA == null) ? 0 : hashArrA.length;


//process B
final long[] hashArrOut = getResultHashArr(minThetaLong, countA, hashArrA, skB); //out is clone
Expand Down
105 changes: 81 additions & 24 deletions src/main/java/org/apache/datasketches/tuple/AnotB.java
Expand Up @@ -37,7 +37,7 @@
*
* <p>The stateful operation is as follows:</p>
* <pre><code>
* AnotB anotb = SetOperationBuilder.buildAnotB();
* AnotB anotb = new AnotB();
*
* anotb.setA(Sketch skA); //The first argument.
* anotb.notB(Sketch skB); //The second (subtraction) argument.
Expand All @@ -49,7 +49,7 @@
*
* <p>The stateless operation is as follows:</p>
* <pre><code>
* AnotB anotb = SetOperationBuilder.buildAnotB();
* AnotB anotb = new AnotB();
*
* CompactSketch csk = anotb.aNotB(Sketch skA, Sketch skB);
* </code></pre>
Expand All @@ -69,6 +69,7 @@ public final class AnotB<S extends Summary> {
private long[] hashArr_ = null; //always in compact form, not necessarily sorted
private S[] summaryArr_ = null; //always in compact form, not necessarily sorted
private int curCount_ = 0;
private SummaryFactory<S> summaryFactory_;

private static final Method GET_CACHE;

Expand All @@ -95,35 +96,45 @@ public final class AnotB<S extends Summary> {
* With a null as the first argument, we cannot know what the user's intent is.
* Since it is very likely that a <i>null</i> is a programming error, we throw an exception.</p>
*
* <p>An enpty input argument will set the internal state to empty.</p>
* <p>An empty input argument will set the internal state to empty.</p>
*
* <p>Rationale: An empty set is a mathematically legal concept. Although it makes any subsequent,
* valid argument for B irrelvant, we must allow this and assume the user knows what they are
* valid argument for B irrelevant, we must allow this and assume the user knows what they are
* doing.</p>
*
* <p>Performing {@link #getResult(boolean)} just after this step will return a compact form of
* the given argument.</p>
*
* @param skA The incoming sketch for the first argument, <i>A</i>.
*/
@SuppressWarnings("unchecked")
public void setA(final Sketch<S> skA) {
  if (skA == null) {
    // Clear any previously held state before reporting the invalid argument.
    reset();
    throw new SketchesArgumentException("The input argument <i>A</i> may not be null");
  }
  // Cache A's factory; notB(theta Sketch) reads summaryFactory_ when it has to
  // create an empty summary array.
  summaryFactory_ = skA.getSummaryFactory();
  if (skA.isEmpty()) {
    reset();
    return;
  }

  //skA is not empty
  empty_ = false;
  thetaLong_ = skA.getThetaLong();

  //process A
  final DataArrays<S> da = getDataArraysA(skA);

  // A missing hash array is normalized to a zero-length array so that the
  // retained count can always be read from its length.
  hashArr_ = (da.hashArr == null) ? new long[0] : da.hashArr;
  curCount_ = hashArr_.length;

  summaryArr_ = da.summaryArr;
  if (summaryArr_ == null) {
    // No summary array was returned: build a zero-length S[] whose component
    // type is the runtime class of a summary produced by the cached factory.
    final S summary = summaryFactory_.newSummary();
    final Class<S> summaryType = (Class<S>) summary.getClass();
    summaryArr_ = (S[]) Array.newInstance(summaryType, 0);
  }
}

/**
Expand All @@ -133,7 +144,7 @@ public void setA(final Sketch<S> skA) {
*
* <p>An input argument of null or empty is ignored.</p>
*
* <p>Rationale: A <i>null</i> for the second or following arguments is more tollerable because
* <p>Rationale: A <i>null</i> for the second or following arguments is more tolerable because
* <i>A NOT null</i> is still <i>A</i> even if we don't know exactly what the null represents. It
* clearly does not have any content that overlaps with <i>A</i>. Also, because this can be part of
* a multistep operation with multiple <i>notB</i> steps. Other following steps can still produce
Expand All @@ -143,18 +154,28 @@ public void setA(final Sketch<S> skA) {
*
* @param skB The incoming Tuple sketch for the second (or following) argument <i>B</i>.
*/
@SuppressWarnings("unchecked")
public void notB(final Sketch<S> skB) {
  // Leave the current state untouched when it is flagged empty, when no hash
  // array is held, or when skB is null or empty.
  if (empty_ || skB == null || skB.isEmpty() || hashArr_ == null) { return; }

  //skB is not empty
  thetaLong_ = Math.min(thetaLong_, skB.getThetaLong());
  summaryFactory_ = skB.getSummaryFactory();
  if (summaryArr_ == null) {
    // Build a zero-length S[] whose component type is the runtime class of a
    // summary produced by the factory.
    final S summary = summaryFactory_.newSummary();
    final Class<S> summaryType = (Class<S>) summary.getClass();
    summaryArr_ = (S[]) Array.newInstance(summaryType, 0);
  }

  //process B
  final DataArrays<S> daB =
      getResultArraysTuple(thetaLong_, curCount_, hashArr_, summaryArr_, skB);

  // A missing hash array is normalized to a zero-length array so that the
  // retained count can always be read from its length.
  hashArr_ = (daB.hashArr == null) ? new long[0] : daB.hashArr;
  curCount_ = hashArr_.length;
  summaryArr_ = daB.summaryArr;

  // The state is flagged empty only when nothing is retained and theta is
  // still at Long.MAX_VALUE.
  empty_ = curCount_ == 0 && thetaLong_ == Long.MAX_VALUE;
}

Expand All @@ -167,7 +188,7 @@ public void notB(final Sketch<S> skB) {
*
* <p>An input argument of null or empty is ignored.</p>
*
* <p>Rationale: A <i>null</i> for the second or following arguments is more tollerable because
* <p>Rationale: A <i>null</i> for the second or following arguments is more tolerable because
* <i>A NOT null</i> is still <i>A</i> even if we don't know exactly what the null represents. It
* clearly does not have any content that overlaps with <i>A</i>. Also, because this can be part of
* a multistep operation with multiple <i>notB</i> steps. Other following steps can still produce
Expand All @@ -177,23 +198,32 @@ public void notB(final Sketch<S> skB) {
*
* @param skB The incoming Theta sketch for the second (or following) argument <i>B</i>.
*/
@SuppressWarnings("unchecked")
public void notB(final org.apache.datasketches.theta.Sketch skB) {
  // Leave the current state untouched when it is flagged empty or when skB is
  // null or empty.
  if (empty_ || skB == null || skB.isEmpty()) { return; }

  //skB is not empty
  thetaLong_ = Math.min(thetaLong_, skB.getThetaLong());
  if (summaryArr_ == null) {
    // A Theta sketch is not asked for a factory here, so the factory already
    // held in summaryFactory_ is used to type the zero-length S[].
    final S summary = summaryFactory_.newSummary();
    final Class<S> summaryType = (Class<S>) summary.getClass();
    summaryArr_ = (S[]) Array.newInstance(summaryType, 0);
  }

  //process B
  final DataArrays<S> daB =
      getResultArraysTheta(thetaLong_, curCount_, hashArr_, summaryArr_, skB);

  // A missing hash array is normalized to a zero-length array so that the
  // retained count can always be read from its length.
  hashArr_ = (daB.hashArr == null) ? new long[0] : daB.hashArr;
  curCount_ = hashArr_.length;
  summaryArr_ = daB.summaryArr;

  // The state is flagged empty only when nothing is retained and theta is
  // still at Long.MAX_VALUE.
  empty_ = curCount_ == 0 && thetaLong_ == Long.MAX_VALUE;
}

/**
* Gets the result of the mutistep, stateful operation AnotB that have been executed with calls
* Gets the result of the multistep, stateful operation AnotB that has been executed with calls
* to {@link #setA(Sketch)} and ({@link #notB(Sketch)} or
* {@link #notB(org.apache.datasketches.theta.Sketch)}).
*
Expand Down Expand Up @@ -235,25 +265,39 @@ public CompactSketch<S> getResult(final boolean reset) {
* @param <S> Type of Summary
* @return the result as an unordered {@link CompactSketch}
*/
@SuppressWarnings("unchecked")
public static <S extends Summary>
CompactSketch<S> aNotB(final Sketch<S> skA, final Sketch<S> skB) {
if (skA == null || skB == null) {
throw new SketchesArgumentException("Neither argument may be null");
}
//Both skA & skB are not null

final long minThetaLong = Math.min(skA.getThetaLong(), skB.getThetaLong());

if (skA.isEmpty()) { return skA.compact(); }
if (skB.isEmpty()) { return skA.compact(); }
//Both skA & skB are not empty
//Both skA & skB are not empty, and skB has valid entries

//Process A
final DataArrays<S> da = getDataArraysA(skA);
final long[] hashArrA = da.hashArr;
final S[] summaryArrA = da.summaryArr;
long[] hashArrA = da.hashArr;
hashArrA = (hashArrA == null) ? new long[0] : hashArrA;
final int countA = hashArrA.length;

S[] summaryArrA = da.summaryArr;
if (summaryArrA == null) {
final S summary = skA.getSummaryFactory().newSummary();
final Class<S> summaryType = (Class<S>)summary.getClass();
summaryArrA = (S[]) Array.newInstance(summaryType, 0);
}

if (countA == 0) {
return new CompactSketch<S>(new long[0], summaryArrA, minThetaLong, false);
}

//Process B
final long minThetaLong = Math.min(skA.getThetaLong(), skB.getThetaLong());
final DataArrays<S> daB = getResultArraysTuple(minThetaLong, countA, hashArrA, summaryArrA, skB);

final long[] hashArr = daB.hashArr;
final S[] summaryArr = daB.summaryArr;
final int curCountOut = hashArr.length;
Expand Down Expand Up @@ -287,27 +331,40 @@ CompactSketch<S> aNotB(final Sketch<S> skA, final Sketch<S> skB) {
* @param <S> Type of Summary
* @return the result as an unordered {@link CompactSketch}
*/
@SuppressWarnings("unchecked")
public static <S extends Summary>
CompactSketch<S> aNotB(final Sketch<S> skA, final org.apache.datasketches.theta.Sketch skB) {
if (skA == null || skB == null) {
throw new SketchesArgumentException("Neither argument may be null");
}
//Both skA & skB are not null

final long minThetaLong = Math.min(skA.getThetaLong(), skB.getThetaLong());

if (skA.isEmpty()) { return skA.compact(); }
if (skB.isEmpty()) { return skA.compact(); }
//Both skA & skB are not empty
if (skB.isEmpty() && skB.getRetainedEntries() == 0) { return skA.compact(); }
//Both skA & skB are not empty, and skB has valid entries

//Process A
final DataArrays<S> da = getDataArraysA(skA);
final long[] hashArrA = da.hashArr;
final S[] summaryArrA = da.summaryArr;
long[] hashArrA = da.hashArr;
hashArrA = (hashArrA == null) ? new long[0] : hashArrA;
final int countA = hashArrA.length;

S[] summaryArrA = da.summaryArr;
if (summaryArrA == null) {
final S summary = skA.getSummaryFactory().newSummary();
final Class<S> summaryType = (Class<S>)summary.getClass();
summaryArrA = (S[]) Array.newInstance(summaryType, 0);
}

if (countA == 0) {
return new CompactSketch<S>(new long[0], summaryArrA, minThetaLong, false);
}

//Process B
final long minThetaLong = Math.min(skA.getThetaLong(), skB.getThetaLong());
final DataArrays<S> daB = getResultArraysTheta(minThetaLong, countA, hashArrA, summaryArrA, skB);

final DataArrays<S> daB = getResultArraysTheta(minThetaLong, countA, hashArrA, summaryArrA, skB);
final long[] hashArr = daB.hashArr;
final S[] summaryArr = daB.summaryArr;
final int countOut = hashArr.length;
Expand Down
Expand Up @@ -96,7 +96,7 @@ private enum Flags { IS_BIG_ENDIAN, IS_EMPTY, HAS_ENTRIES, IS_THETA_INCLUDED }
final boolean hasEntries = (flags & 1 << Flags.HAS_ENTRIES.ordinal()) > 0;
if (hasEntries) {
int classNameLength = 0;
if (version == serialVersionWithSummaryClassNameUID) {
if (version == serialVersionWithSummaryClassNameUID) { //Obsolete?
classNameLength = mem.getByte(offset++);
}
final int count = mem.getInt(offset);
Expand Down
22 changes: 19 additions & 3 deletions src/main/java/org/apache/datasketches/tuple/Intersection.java
Expand Up @@ -107,14 +107,19 @@ public void intersect(final Sketch<S> tupleSketch) {
if (tupleSketch == null) { throw new SketchesArgumentException("Sketch must not be null"); }
final boolean firstCall = firstCall_;
firstCall_ = false;
final boolean emptyIn = tupleSketch.isEmpty();
if (empty_ || emptyIn) { //empty rule
//Because of the definition of null above and the Empty Rule (which is OR), empty_ must be true.
//Whatever the current internal state, we make our local empty.
resetToEmpty();
return;
}

// input sketch could be first or next call
final long thetaLongIn = tupleSketch.getThetaLong();
final int countIn = tupleSketch.getRetainedEntries();
thetaLong_ = min(thetaLong_, thetaLongIn); //Theta rule
// Empty rule extended in case incoming sketch does not have empty bit properly set
final boolean emptyIn = countIn == 0 && thetaLongIn == Long.MAX_VALUE;
empty_ |= emptyIn; //empty rule

if (countIn == 0) {
hashTables_.clear();
return;
Expand Down Expand Up @@ -274,12 +279,23 @@ public boolean hasResult() {
* Resets the internal set to the initial state, which represents the Universal Set
*/
public void reset() {
// Delegates to the private hardReset(), which clears hashTables_ and sets
// empty_ = false, thetaLong_ = Long.MAX_VALUE and firstCall_ = true.
hardReset();
}

/**
 * Restores the state used by {@link #reset()}: not empty, theta at
 * Long.MAX_VALUE, hash tables cleared, and the next intersect treated as the
 * first call.
 */
private void hardReset() {
  applyResetState(false, true);
}

/**
 * Forces the empty state: empty flag set, theta at Long.MAX_VALUE, hash tables
 * cleared, and the first-call flag cleared.
 */
private void resetToEmpty() {
  applyResetState(true, false);
}

/**
 * Shared worker for the two reset flavours. Theta is always returned to
 * Long.MAX_VALUE and the hash tables are always cleared; only the two flags
 * differ between callers.
 *
 * @param emptyFlag value to store in empty_
 * @param firstCallFlag value to store in firstCall_
 */
private void applyResetState(final boolean emptyFlag, final boolean firstCallFlag) {
  empty_ = emptyFlag;
  thetaLong_ = Long.MAX_VALUE;
  hashTables_.clear();
  firstCall_ = firstCallFlag;
}

static int getLgTableSize(final int count) {
final int tableSize = max(ceilingPowerOf2((int) ceil(count / 0.75)), 1 << MIN_LG_NOM_LONGS);
return Integer.numberOfTrailingZeros(tableSize);
Expand Down
Expand Up @@ -51,7 +51,7 @@ private enum Flags { IS_BIG_ENDIAN, IS_IN_SAMPLING_MODE, IS_EMPTY, HAS_ENTRIES,
private int lgCurrentCapacity_;
private final int lgResizeFactor_;
private int count_;
private final SummaryFactory<S> summaryFactory_;
//private final SummaryFactory<S> summaryFactory_;
private final float samplingProbability_;
private int rebuildThreshold_;
private long[] hashTable_;
Expand Down Expand Up @@ -127,8 +127,8 @@ private enum Flags { IS_BIG_ENDIAN, IS_IN_SAMPLING_MODE, IS_EMPTY, HAS_ENTRIES,
nomEntries_ = ceilingPowerOf2(nomEntries);
lgResizeFactor_ = lgResizeFactor;
samplingProbability_ = samplingProbability;
summaryFactory_ = summaryFactory;
thetaLong_ = (long) (Long.MAX_VALUE * (double) samplingProbability);
summaryFactory_ = summaryFactory; //super
thetaLong_ = (long) (Long.MAX_VALUE * (double) samplingProbability); //super
lgCurrentCapacity_ = Integer.numberOfTrailingZeros(startingSize);
hashTable_ = new long[startingSize];
summaryTable_ = null; // wait for the first summary to call Array.newInstance()
Expand Down Expand Up @@ -292,7 +292,8 @@ public void reset() {
@SuppressWarnings("unchecked")
public CompactSketch<S> compact() {
if (getRetainedEntries() == 0) {
return new CompactSketch<>(null, null, thetaLong_, empty_);
if (empty_) { return new CompactSketch<>(null, null, Long.MAX_VALUE, true); }
return new CompactSketch<>(null, null, thetaLong_, false);
}
final long[] hashArr = new long[getRetainedEntries()];
final S[] summaryArr = (S[])
Expand Down Expand Up @@ -424,10 +425,6 @@ void setEmpty(final boolean value) {
empty_ = value;
}

// Accessor with no access modifier (package-private): returns the
// SummaryFactory reference held in summaryFactory_.
SummaryFactory<S> getSummaryFactory() {
return summaryFactory_;
}

int findOrInsert(final long hash) {
final int index = HashOperations.hashSearchOrInsert(hashTable_, lgCurrentCapacity_, hash);
if (index < 0) {
Expand Down

0 comments on commit 8f7a58d

Please sign in to comment.