Skip to content
Permalink
Browse files
For the various Yielder objects, don't create new Yielders and instea…
…d mutate state. (#12475)

Co-authored-by: imply-cheddar <86940447+imply-cheddar@users.noreply.github.com>
  • Loading branch information
gianm and imply-cheddar committed Apr 27, 2022
1 parent 2fe053c commit e7e49ec9c857641efe1548f438e2d203248de7ea
Showing 5 changed files with 82 additions and 74 deletions.
@@ -78,7 +78,23 @@ public <OutType> Yielder<OutType> toYielder(OutType initValue, final YieldingAcc
final Yielder<T> baseYielder = baseSequence.toYielder(null, combiningAccumulator);

try {
return makeYielder(baseYielder, combiningAccumulator, false);
// If the yielder is already done at this point, that means that it ran through all of the inputs
// without hitting a yield(), i.e. it's effectively just a single accumulate() call. As such we just
// return a done yielder with the correct accumulated value.
if (baseYielder.isDone()) {
if (combiningAccumulator.accumulatedSomething()) {
combiningAccumulator.accumulateLastValue();
}
// If we yielded, then the expectation is that we get a Yielder with the yielded value, followed by a done
// yielder. This will happen if we fall through to the normal makeYielder. If the accumulator did not yield
// then the code expects a single Yielder that returns whatever was left over from the accumulation on the
// get() call.
if (!combiningAccumulator.yielded()) {
return Yielders.done(combiningAccumulator.getRetVal(), baseYielder);
}
}

return makeYielder(baseYielder, combiningAccumulator);
}
catch (Throwable t1) {
try {
@@ -94,52 +110,37 @@ public <OutType> Yielder<OutType> toYielder(OutType initValue, final YieldingAcc

private <OutType> Yielder<OutType> makeYielder(
final Yielder<T> yielder,
final CombiningYieldingAccumulator<OutType, T> combiningAccumulator,
boolean finalValue
final CombiningYieldingAccumulator<OutType, T> combiningAccumulator
)
{
final Yielder<T> finalYielder;
final OutType retVal;
final boolean finalFinalValue;

if (!yielder.isDone()) {
retVal = combiningAccumulator.getRetVal();
finalYielder = null;
finalFinalValue = false;
} else {
if (!finalValue && combiningAccumulator.accumulatedSomething()) {
combiningAccumulator.accumulateLastValue();
retVal = combiningAccumulator.getRetVal();
finalFinalValue = true;

if (!combiningAccumulator.yielded()) {
return Yielders.done(retVal, yielder);
} else {
finalYielder = Yielders.done(null, yielder);
}
} else {
return Yielders.done(combiningAccumulator.getRetVal(), yielder);
}
}


return new Yielder<OutType>()
{
private Yielder<T> myYielder = yielder;
private CombiningYieldingAccumulator<OutType, T> accum = combiningAccumulator;

@Override
public OutType get()
{
return retVal;
return accum.getRetVal();
}

@Override
public Yielder<OutType> next(OutType initValue)
{
combiningAccumulator.reset();
return makeYielder(
finalYielder == null ? yielder.next(yielder.get()) : finalYielder,
combiningAccumulator,
finalFinalValue
);
accum.reset();
if (myYielder.isDone()) {
return Yielders.done(null, myYielder);
}

myYielder = myYielder.next(myYielder.get());
if (myYielder.isDone() && accum.accumulatedSomething()) {
accum.accumulateLastValue();
if (!accum.yielded()) {
return Yielders.done(accum.getRetVal(), myYielder);
}
}

return this;
}

@Override
@@ -151,7 +152,7 @@ public boolean isDone()
@Override
public void close() throws IOException
{
yielder.close();
myYielder.close();
}
};
}
@@ -66,7 +66,19 @@ public <OutType> Yielder<OutType> toYielder(
final IterType iterator = maker.make();

try {
return makeYielder(initValue, accumulator, iterator);
OutType retVal = initValue;
while (!accumulator.yielded() && iterator.hasNext()) {
retVal = accumulator.accumulate(retVal, iterator.next());
}

if (!accumulator.yielded()) {
return Yielders.done(
retVal,
(Closeable) () -> maker.cleanup(iterator)
);
}

return makeYielder(retVal, accumulator, iterator);
}
catch (Throwable t) {
try {
@@ -80,47 +92,34 @@ public <OutType> Yielder<OutType> toYielder(
}

private <OutType> Yielder<OutType> makeYielder(
final OutType initValue,
final OutType retValue,
final YieldingAccumulator<OutType, T> accumulator,
final IterType iter
)
{
OutType retVal = initValue;
while (!accumulator.yielded() && iter.hasNext()) {
retVal = accumulator.accumulate(retVal, iter.next());
}

if (!accumulator.yielded()) {
return Yielders.done(
retVal,
(Closeable) () -> maker.cleanup(iter)
);
}

final OutType finalRetVal = retVal;
return new Yielder<OutType>()
{
OutType retVal = retValue;

@Override
public OutType get()
{
return finalRetVal;
return retVal;
}

@Override
public Yielder<OutType> next(OutType initValue)
{
accumulator.reset();
try {
return makeYielder(initValue, accumulator, iter);
retVal = initValue;
while (!accumulator.yielded() && iter.hasNext()) {
retVal = accumulator.accumulate(retVal, iter.next());
}
catch (Throwable t) {
try {
maker.cleanup(iter);
}
catch (Exception e) {
t.addSuppressed(e);
}
throw t;

if (accumulator.yielded()) {
return this;
} else {
return Yielders.done(retVal, this);
}
}

@@ -26,7 +26,7 @@

final class WrappingYielder<OutType> implements Yielder<OutType>
{
private final Yielder<OutType> baseYielder;
private Yielder<OutType> baseYielder;
private final SequenceWrapper wrapper;

WrappingYielder(Yielder<OutType> baseYielder, SequenceWrapper wrapper)
@@ -50,7 +50,8 @@ public Yielder<OutType> next(final OutType initValue)
@Override
public Yielder<OutType> get()
{
return new WrappingYielder<>(baseYielder.next(initValue), wrapper);
baseYielder = baseYielder.next(initValue);
return WrappingYielder.this;
}
});
}
@@ -26,9 +26,13 @@
* necessarily good at this job, but it works. I think.
*
* Essentially, you can think of a Yielder as a linked list of items where the Yielder gives you access to the current
* head via get() and it will give you another Yielder representing the next item in the chain via next(). A Yielder
* that isDone() may return anything from both get() and next(), there is no contract and depending on those return
* values will likely lead to bugs.
* head via get() and it will give you another Yielder representing the next item in the chain via next(). When using
* a yielder object, a call to yield() on the yielding accumulator will result in a new Yielder being returned whose
* get() method will return the return value of the accumulator from the call that called yield().
*
* When a call to next() exhausts the underlying data stream without having a yield() call, various implementations
* of Sequences and Yielders assume that they will receive a Yielder where isDone() is true and get() will return the
* accumulated value up until that point.
*
* Once next is called, there is no guarantee and no requirement that references to old Yielder objects will continue
* to obey the contract.
@@ -60,9 +64,8 @@
Yielder<T> next(T initValue);

/**
* Returns true if this is the last Yielder in the chain. A Yielder that isDone() may return anything
* from both get() and next(), there is no contract and depending on those return values will likely lead to bugs.
* It will probably break your code to call next() on a Yielder that is done and expect something good from it.
* Returns true if this is the last Yielder in the chain. Review the class level javadoc for an understanding
* of the contract for other methods when isDone() is true.
*
* Once next() is called on this Yielder object, all further operations on this object are undefined.
*
@@ -26,6 +26,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.ExplodingSequence;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
@@ -254,6 +255,8 @@ private void testCombining(
int limit
) throws Exception
{
final String prefix = StringUtils.format("yieldEvery[%d], limit[%d]", yieldEvery, limit);

// Test that closing works too
final CountDownLatch closed = new CountDownLatch(1);
final Closeable closeable = closed::countDown;
@@ -276,7 +279,7 @@ private void testCombining(

List<Pair<Integer, Integer>> merged = seq.toList();

Assert.assertEquals(expected, merged);
Assert.assertEquals(prefix, expected, merged);

Yielder<Pair<Integer, Integer>> yielder = seq.toYielder(
null,
@@ -318,16 +321,17 @@ public boolean apply(
}
);

int i = 0;
if (expectedVals.hasNext()) {
while (!yielder.isDone()) {
final Pair<Integer, Integer> expectedVal = expectedVals.next();
final Pair<Integer, Integer> actual = yielder.get();
Assert.assertEquals(expectedVal, actual);
Assert.assertEquals(StringUtils.format("%s, i[%s]", prefix, i++), expectedVal, actual);
yielder = yielder.next(actual);
}
}
Assert.assertTrue(yielder.isDone());
Assert.assertFalse(expectedVals.hasNext());
Assert.assertTrue(prefix, yielder.isDone());
Assert.assertFalse(prefix, expectedVals.hasNext());
yielder.close();

Assert.assertTrue("resource closed", closed.await(10000, TimeUnit.MILLISECONDS));

0 comments on commit e7e49ec

Please sign in to comment.