An analogue to Iterator for things that might throw, be closeable, block, etc. #973

Open
gissuebot opened this Issue Oct 31, 2014 · 19 comments

@gissuebot

gissuebot commented Oct 31, 2014

Original issue created by pettermahlen on 2012-04-16 at 05:56 PM


I have a use case where we're iterating over large files, and want to be able to (sometimes) continue processing the file in spite of errors for single entries. In that regard, there are two problems with the current AbstractIterator implementation:

  1. Any exception (from, for instance, being unable to parse the next entry in the file) will be thrown from the hasNext() method, making it impossible to write:

while (iterator.hasNext()) {
  try {
    iterator.next();
  } catch (SomeException e) {
    // report the bad entry, then carry on with the next one
  }
}

  2. Once an exception has been thrown, the AbstractIterator will be in State.FAILED, meaning that no further processing can happen.
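
Both problems can be reproduced with a small look-ahead iterator. The class below is a simplified stand-in written for illustration, not Guava's AbstractIterator source; `LookaheadIterator` and `LookaheadDemo` are made-up names, and the state handling is only assumed to mirror the behaviour described above.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Simplified look-ahead iterator: hasNext() computes the element eagerly.
abstract class LookaheadIterator<T> implements Iterator<T> {
  private enum State { READY, NOT_READY, DONE, FAILED }

  private State state = State.NOT_READY;
  private T next;

  protected abstract T computeNext();

  protected final T endOfData() {
    state = State.DONE;
    return null;
  }

  @Override
  public final boolean hasNext() {
    if (state == State.FAILED) {
      throw new IllegalStateException("a previous computeNext() call failed");
    }
    if (state == State.DONE) {
      return false;
    }
    if (state == State.READY) {
      return true;
    }
    state = State.FAILED; // stays FAILED if computeNext() throws
    next = computeNext();
    if (state == State.DONE) {
      return false;
    }
    state = State.READY;
    return true;
  }

  @Override
  public final T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    state = State.NOT_READY;
    T result = next;
    next = null;
    return result;
  }
}

final class LookaheadDemo {
  // Records what three successive hasNext()/next() rounds do.
  static List<String> trace(List<String> lines) {
    Iterator<String> input = lines.iterator();
    Iterator<Integer> parsed = new LookaheadIterator<Integer>() {
      @Override
      protected Integer computeNext() {
        return input.hasNext() ? Integer.valueOf(input.next()) : endOfData();
      }
    };
    List<String> events = new ArrayList<>();
    for (int i = 0; i < 3; i++) {
      try {
        events.add(parsed.hasNext() ? "value " + parsed.next() : "end");
      } catch (RuntimeException e) {
        events.add(e.getClass().getSimpleName());
      }
    }
    return events;
  }
}
```

For the input "1", "x", "2", the parse failure surfaces from hasNext(), and the valid third entry is never reached because the iterator refuses further calls.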

I've modified (and renamed) the AbstractIterator class as shown here: https://gist.github.com/2400295. Does this seem like a useful feature?

In implementation terms, I'm not sure that the mechanism of storing exception to wrap+rethrow later is that great, but it's the best I could think of.

@gissuebot

gissuebot commented Oct 31, 2014

Original comment posted by kevinb@google.com on 2012-04-16 at 06:36 PM


(No comment entered for this change.)


Status: Acknowledged
Labels: Type-Enhancement, Package-Collect

@gissuebot

gissuebot commented Oct 31, 2014

Original comment posted by cpovirk@google.com on 2012-05-10 at 07:45 PM


Iterators generally aren't expected to throw exceptions except in the case of programmer error, after which it's typically not possible to forge ahead with the remaining elements. Deviating from this may be useful on occasion, but I'd be nervous "endorsing" it by providing library support, particularly when our other Iterator utilities are likely to interact poorly with such an Iterator.

That said, I'm sympathetic to the idea of a nice, common interface to "heavier" streams than what Iterator is used with, and we've had various internal discussions about a "CheckedIterator." It could solve a number of problems:

  • hasNext()/next(), as check-then-act API, is poor for multiple client threads.
  • The methods could be made into proper blocking methods, with optional support for interruption and timeouts.
  • The interface might be parameterized on an exception type as well as on a value type so that the methods can be declared "throws X" instead of a generic "throws Exception."
  • The underlying operation may require a close() call at the end, and we could provide separate utility methods that make this harder to forget (and that know what to do in the case of an exception, as I described earlier, and in the case of an interruption/timeout).
  • The type won't look like an Iterator (as described above), and there will be less temptation to make the producer look like an Iterable, as initializing the stream may be expensive (and perhaps initializing it multiple times is outright impossible).

We may one day add such a type, but it's far off. As far as this issue goes, I would suggest any of:

  1. Wait for us. (But we're talking months to years of waiting, so I recommend against it.)
  2. Use your custom class. (I'm hesitant to put it in Guava as described above, but you may judge its benefits to outweigh its downsides in your particular use case.)
  3. Use an Iterator<ValueOrException> or Queue<ValueOrException>, where ValueOrException is some custom class containing, well, you know...
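
Option 3 might look roughly like this. The name `ValueOrException` comes from the comment above, but its shape here is an assumption made for illustration, as are `ValueOrExceptionDemo` and its methods. Each element carries either a parsed value or the exception that parsing raised, so hasNext()/next() never throw for a bad entry.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical holder: a null exception means the entry holds a value.
final class ValueOrException<T> {
  private final T value;
  private final Exception exception;

  private ValueOrException(T value, Exception exception) {
    this.value = value;
    this.exception = exception;
  }

  static <T> ValueOrException<T> ofValue(T value) {
    return new ValueOrException<>(value, null);
  }

  static <T> ValueOrException<T> ofException(Exception e) {
    if (e == null) {
      throw new NullPointerException("exception");
    }
    return new ValueOrException<>(null, e);
  }

  boolean isValue() {
    return exception == null;
  }

  T value() {
    if (exception != null) {
      throw new IllegalStateException("entry holds an exception", exception);
    }
    return value;
  }

  Exception exception() {
    return exception;
  }
}

final class ValueOrExceptionDemo {
  // Parses each line as an int; failures become elements instead of being thrown.
  static List<ValueOrException<Integer>> parseAll(List<String> lines) {
    List<ValueOrException<Integer>> out = new ArrayList<>();
    for (String line : lines) {
      try {
        out.add(ValueOrException.ofValue(Integer.parseInt(line.trim())));
      } catch (NumberFormatException e) {
        out.add(ValueOrException.ofException(e));
      }
    }
    return out;
  }

  // Returns {sum of good entries, number of bad entries} in a single pass.
  static int[] sumAndErrorCount(Iterator<ValueOrException<Integer>> it) {
    int sum = 0;
    int errors = 0;
    while (it.hasNext()) {
      ValueOrException<Integer> entry = it.next();
      if (entry.isValue()) {
        sum += entry.value();
      } else {
        errors++;
      }
    }
    return new int[] {sum, errors};
  }
}
```

The cost is that every consumer has to check isValue(), which is the trade-off against letting next() throw.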

As for this issue, I could generalize it to be about CheckedIterator, or you could make a renewed pitch for your class, or we could close it. Any preferences? In any case, thanks for the submission.

@gissuebot

gissuebot commented Oct 31, 2014

Original comment posted by pettermahlen on 2012-05-11 at 07:27 AM


I agree completely with your comments. I never felt really comfortable that the Iterator implementation I submitted doesn't properly support foreach(), for one thing. So the fit isn't ideal and I don't really want to make a pitch for that class.

However, I might be interested in experimenting a little bit with a version/implementation of the alternative interface you mention, out of personal interest mostly. Do you have any more details about the requirements you have in mind?

As for the issue, I have no preferences, feel free to close it or generalise it. Thanks for the thoughtful evaluation!

@gissuebot

gissuebot commented Oct 31, 2014

Original comment posted by cpovirk@google.com on 2012-05-11 at 01:47 PM


I think I got most of it, but here are a few other things that people have pointed out:

  • Probably we don't need remove().
  • The close() method should be part of implementing Closeable so that, under JDK7, the iterator can be used with "try with resources."
  • The interface might look a lot like Queue (but without the insertion methods) if we want full generality. (Possibly it would also benefit from methods like those in Uninterruptibles.)
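
A rough sketch of the first two points, assuming a hypothetical `LineSource` interface (no remove(), close() inherited from Closeable) and assuming that a null return marks the end of the input, as BufferedReader.readLine() does. None of these names exist in Guava.

```java
import java.io.BufferedReader;
import java.io.Closeable;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;

// Hypothetical interface: no remove(), and close() comes from Closeable.
interface LineSource extends Closeable {
  // Returns the next line, or null once the input is exhausted (an assumption).
  String next() throws IOException;
}

final class ReaderLineSource implements LineSource {
  private final BufferedReader reader;

  ReaderLineSource(BufferedReader reader) {
    this.reader = reader;
  }

  @Override
  public String next() throws IOException {
    return reader.readLine();
  }

  @Override
  public void close() throws IOException {
    reader.close();
  }
}

final class TryWithResourcesDemo {
  static int countLines(String text) {
    int count = 0;
    // close() runs when the block exits, normally or through an exception.
    try (LineSource source =
        new ReaderLineSource(new BufferedReader(new StringReader(text)))) {
      while (source.next() != null) {
        count++;
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return count;
  }
}
```

Because the interface extends Closeable, the JDK7 try-with-resources statement handles the close() call that is otherwise easy to forget.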

Now that I've dumped all that here, I might as well repurpose this bug for anything that you or others have to add. (We still probably won't get to it any time soon, but we might someday, and even if we don't, any discoveries might be useful to others writing their own.)


Labels: -Package-Collect, Package-Concurrent

@gissuebot

gissuebot commented Oct 31, 2014

Original comment posted by pettermahlen on 2012-05-14 at 12:47 PM


Your description - as well as my own use case - makes me wonder about the name of the type/s. If it's not an iterator, it should probably not be called anything to do with Iterator as that is a very specific and strong concept in Java. I was thinking about alternatives that are closer to Queue, such as Sequence/Succession/Series, when it struck me that what I want it for is a producer/consumer scenario, and it sounds like that's what you're describing too. So maybe Producer is a better name? It feels like it will be something that will be able to give you some undetermined/undeterminable number of things until it runs out, and then it should be closeable, which matches a Producer pretty well. I was also thinking about Generator (as it sounds a little like Iterator), but when I came up with Producer, I liked that better.

With the above reasoning, the main differences between a Producer and a Queue would be:

  • No visibility of the producer side - no methods for adding things.
  • The ability to explicitly close it, so it would have a lifecycle.
  • Less ability to manipulate the queue - no checking of anything but the head, no removal (but then remove would be the same as next + discard).

It would be more limited and more focused. I think I would rather use an interface like that than a regular Queue in the case I've got, but I'm not sure if it's different enough.

Regarding the exception handling, I was wondering if it might be a good idea to have different versions of the interface, something like:

import java.io.Closeable;
import java.util.concurrent.TimeUnit;

// E needs an exception bound; "throws E" does not compile for an unbounded type parameter.
public interface ThrowingProducer<T, E extends Exception> extends Closeable {
   T next(long timeout, TimeUnit timeUnit) throws InterruptedException, E; // etc.
}

public interface UninterruptibleThrowingProducer<T, E extends Exception> extends ThrowingProducer<T, E> {
   @Override
   T next(long timeout, TimeUnit timeUnit) throws E; // removes InterruptedException from the method signature.
}

public interface Producer<T> extends ThrowingProducer<T, Exception> {
  // no E, meaning you get one less checked exception to deal with.
   @Override
   T next(long timeout, TimeUnit timeUnit) throws InterruptedException;
}

For the uninterruptible version, it would be easy to wrap an interruptible Producer with something that forwards the call like the methods in Uninterruptibles do:

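public class Producers {
  // UninterruptibleProducer and UninterruptiblyForwardingProducer are not defined above; they are implied by the sketch.
  public static <T> UninterruptibleProducer<T> uninterruptible(Producer<T> toWrap) {
    return new UninterruptiblyForwardingProducer<T>(toWrap);
  }
}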
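
The forwarding class itself could follow the retry-and-restore-interrupt pattern that the Uninterruptibles methods use. The sketch below is an assumption about how that might look, using two simplified hypothetical interfaces (`InterruptibleSource`, `UninterruptibleSource`) rather than the Producer types above, and assuming that a null return means the timeout elapsed.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified stand-ins for the interruptible and uninterruptible types.
interface InterruptibleSource<T> {
  T next(long timeout, TimeUnit unit) throws InterruptedException;
}

interface UninterruptibleSource<T> {
  T next(long timeout, TimeUnit unit);
}

final class UninterruptiblyForwardingSource<T> implements UninterruptibleSource<T> {
  private final InterruptibleSource<T> delegate;

  UninterruptiblyForwardingSource(InterruptibleSource<T> delegate) {
    this.delegate = delegate;
  }

  @Override
  public T next(long timeout, TimeUnit unit) {
    boolean interrupted = false;
    try {
      long remainingNanos = unit.toNanos(timeout);
      long end = System.nanoTime() + remainingNanos;
      while (true) {
        try {
          return delegate.next(remainingNanos, TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
          // Remember the interrupt and retry with whatever time is left.
          interrupted = true;
          remainingNanos = end - System.nanoTime();
        }
      }
    } finally {
      if (interrupted) {
        // Restore the flag so callers further up can still observe it.
        Thread.currentThread().interrupt();
      }
    }
  }
}

final class UninterruptibleDemo {
  // Returns the fetched value plus whether the interrupt flag was restored.
  static String demo() {
    BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    queue.add("entry");
    InterruptibleSource<String> source = queue::poll; // poll(long, TimeUnit)
    UninterruptibleSource<String> wrapped = new UninterruptiblyForwardingSource<>(source);

    Thread.currentThread().interrupt(); // simulate a pending interrupt
    String value = wrapped.next(1, TimeUnit.SECONDS);
    boolean flagRestored = Thread.interrupted(); // also clears the flag again
    return value + ":" + flagRestored;
  }
}
```

The wrapper keeps the timeout budget across retries, so an interrupt does not extend the total wait.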

I don't like the combinatorial explosion of interfaces, but I also don't particularly like the idea of having four versions of each method in the API: no timeout, explicit timeout, uninterruptible without timeout and uninterruptible with timeout. And I somehow would prefer not to keep adding methods to the Uninterruptibles class as well.

Any comments on the above? Is any of it worth sketching out in code?

@gissuebot

gissuebot commented Oct 31, 2014

Original comment posted by cpovirk@google.com on 2012-05-14 at 06:59 PM


You may be right about distancing this from the "Iterator" name. "Producer" happens to have some Google-internal baggage, but maybe "Generator" is the way to go.

Exception-handling: Hmm. We've tried to stay out of the business of UninterruptibleFoo after a mildly bad experience with UninterruptibleFuture, preferring instead the Uninterruptibles.fooUninterruptibly methods (or, on occasion, foo and fooUninterruptibly methods both on the interface itself). That's partially because of the combinatorial explosion that you mention.

For the exception type, we could go a couple directions:

  • Throw ExecutionException, as in Future and Cache. Maybe provide wrappers along the lines of Futures.get(Cache, Class<? extends Exception>).
  • Throw a generic type (as we've discussed so far).

If we go with the latter, I'm inclined to give the short name ("Generator" or whatever, rather than "ThrowingGenerator") to the parameterized type, and I'd probably skip a subtype for <RuntimeException>, both because users probably don't have to write it more than once and because this "typedef" pattern isn't as seamless as a real typedef (notably, you can't assign a ThrowingGenerator<E, RuntimeException> to a NonThrowingGenerator<E>).
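
The assignment problem with the "typedef" pattern can be seen in a few lines. The interface names follow the comment above; their one-method shape and the `TypedefDemo` class are assumptions for illustration.

```java
// Parameterized on the exception type as well as the value type.
interface ThrowingGenerator<T, X extends Exception> {
  T next() throws X;
}

// The "typedef" subtype: fixes X to RuntimeException.
interface NonThrowingGenerator<T> extends ThrowingGenerator<T, RuntimeException> {
  @Override
  T next();
}

final class TypedefDemo {
  static String demo() {
    ThrowingGenerator<String, RuntimeException> wide = () -> "a";

    // Does not compile: the supertype is not assignable to the subtype,
    // even though both declare the same method.
    // NonThrowingGenerator<String> rejected = wide;

    // An explicit adapter is needed instead.
    NonThrowingGenerator<String> narrow = wide::next;

    // The other direction is an ordinary upcast.
    ThrowingGenerator<String, RuntimeException> back = narrow;

    return narrow.next() + back.next();
  }
}
```

With a real typedef the two names would be interchangeable; with a subtype, every API has to pick one of them.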

I hesitate to say "Code is up" because I admit that I've gotten far enough behind on other stuff that it will be hard to prioritize this, but this remains something that we should give serious attention to... at some point.

@gissuebot

gissuebot commented Oct 31, 2014

Original comment posted by kevinb@google.com on 2012-05-15 at 03:08 PM


Agree to avoid "iterator" in the name. Consider Source<T> (or Source<T, X>).

Null shouldn't be supported. Null return from next() should definitively mean end of stream.

Let's consider only the throwing case. (e.g. call ThrowingProducer just Producer, or whatever).
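
A minimal sketch of what those three points could add up to, assuming a single-method `Source<T, X>`; the `Sources` helper and its method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical shape: elements are never null, and a null return from
// next() means the stream has ended.
interface Source<T, X extends Exception> {
  T next() throws X;
}

final class Sources {
  // Adapts an Iterator; rejects null elements so that null stays unambiguous.
  static <T> Source<T, RuntimeException> fromIterator(Iterator<? extends T> it) {
    return () -> {
      if (!it.hasNext()) {
        return null;
      }
      T value = it.next();
      if (value == null) {
        throw new NullPointerException("null elements are not supported");
      }
      return value;
    };
  }

  // Drains a source with a read-until-null loop; X propagates to the caller.
  static <T, X extends Exception> List<T> drain(Source<T, X> source) throws X {
    List<T> out = new ArrayList<>();
    for (T value = source.next(); value != null; value = source.next()) {
      out.add(value);
    }
    return out;
  }
}
```

A single call replaces the hasNext()/next() pair, which also removes the check-then-act gap mentioned earlier in the thread.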

Should we consider adding the same goodies we have on FluentIterable directly to this?


Status: Accepted

@gissuebot

gissuebot commented Oct 31, 2014

Original comment posted by em...@soldal.org on 2012-05-15 at 09:18 PM


Source makes me think it will be related to Sink from the hashing package...

@gissuebot

gissuebot commented Oct 31, 2014

Original comment posted by kevinb@google.com on 2012-05-16 at 06:42 PM


That's exactly why that class is called PrimitiveSink instead of just Sink. :-)

@gissuebot

gissuebot commented Oct 31, 2014

Original comment posted by pettermahlen on 2012-05-30 at 03:13 PM


I haven't been able to find any contiguous time to work on this yet, but I've been thinking a little about it every now and then. One thing that I'm struggling with is clearly defining the rationale for the Producer - the discussion has been very much in terms of what features and behaviours are desirable, but not so much yet about why and what use cases to support, or why those use cases can't be handled by existing abstractions.

Here's our use case (which repeats with some variation in a couple of different services) with a little more detail than in the original issue:

  1. We need to parse and process files that are too large to fit in memory.
  2. The data in the files is unreliable, but we want to be able to use all the data we can parse, and report on the rest.
  3. We want to be able to report syntactical and semantic errors (detected during processing) using the same logic.
  4. Processing may be slow and may require external service calls, hence concurrent processing of parsed entries would be useful to reduce total processing latency.

So, a Producer (I personally like that name best, and not being a Google employee, I'm happy to disregard Google-internal baggage ;) - if that's unrealistic, my second choice would be Source, I think) as discussed up to this point would be a perfect fit. The Producer could do the parsing, and would simply throw a ParseException for bad entries. Later validation can also report errors, and something at the top can collect the information and aggregate and report it.
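
The use case might be sketched like this. `IntEntryParser` and `Report` are hypothetical names, the entries are assumed to be one integer per line, and "negative value" stands in for whatever semantic check the real processing would do.

```java
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical pull-style parser: next() returns null at end of input and
// throws ParseException for a bad entry without becoming unusable.
final class IntEntryParser {
  private final Iterator<String> lines;
  private int lineNumber = 0;

  IntEntryParser(Iterable<String> lines) {
    this.lines = lines.iterator();
  }

  Integer next() throws ParseException {
    if (!lines.hasNext()) {
      return null;
    }
    String line = lines.next();
    lineNumber++;
    try {
      return Integer.valueOf(line.trim());
    } catch (NumberFormatException e) {
      // The bad line has already been consumed, so the next call moves on.
      throw new ParseException("line " + lineNumber + ": " + line, 0);
    }
  }
}

final class Report {
  final List<Integer> values = new ArrayList<>();
  final List<String> errors = new ArrayList<>();

  // Collects good values and both kinds of error in one place.
  static Report process(Iterable<String> lines) {
    IntEntryParser parser = new IntEntryParser(lines);
    Report report = new Report();
    while (true) {
      try {
        Integer value = parser.next();
        if (value == null) {
          return report;
        }
        if (value < 0) {
          // Semantic errors go through the same reporting path.
          report.errors.add("negative value: " + value);
        } else {
          report.values.add(value);
        }
      } catch (ParseException e) {
        report.errors.add(e.getMessage());
      }
    }
  }
}
```

Unlike the AbstractIterator case, a failed next() leaves the parser positioned on the following entry, so the loop can simply continue.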

It would be interesting to get some input about other use cases.

I've also thought a bit about differentiation from other abstractions, in particular BlockingQueue and Iterator. Here's what I think would distinguish Producers from the above:

  • It doesn't define how things are produced, unlike a BlockingQueue.
  • It can throw exceptions, unlike BlockingQueue and Iterator.
  • It is intended for concurrent usage, unlike Iterator.
  • It defines a lifecycle (implements Closeable), unlike BlockingQueue and Iterator. I think there's another lifecycle aspect in that if null is used to signal 'end of stream', then once a Producer has returned null, it should return null for all subsequent requests.
  • The 'remove' operation as defined in Iterator doesn't make sense: I think the Producer should define about three ways to interact with it: next(), peek() and hasData(), where the first two are blocking and would have versions with timeouts, and the last is non-blocking. next() removes the next entry, whereas peek() just inspects it. Another option for peek() is to make it throw a NoSuchElementException, like BlockingQueue.element(), if there is no data. The point of the hasData() call would be to give consumers some approximate idea about whether they will have to wait or not for new data, which might be useful for instrumentation of an application.
  • size(), contains(), etc., also don't make sense.
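
The three operations could be sketched as below. This is only one possible reading: `PeekableSource` and `QueueBackedSource` are made-up names, a BlockingQueue is assumed as the backing store purely for the demo, and timeouts and end-of-stream handling are left out to keep it short.

```java
import java.io.Closeable;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical interface with the three operations described above.
interface PeekableSource<T> extends Closeable {
  T next() throws InterruptedException; // blocks, then removes the head

  T peek() throws InterruptedException; // blocks, leaves the head in place

  boolean hasData(); // non-blocking, approximate

  @Override
  void close(); // narrowed: nothing to throw in this sketch
}

final class QueueBackedSource<T> implements PeekableSource<T> {
  private final BlockingQueue<T> queue;
  // Element taken from the queue by peek() but not yet handed out by next().
  private volatile T head;

  QueueBackedSource(BlockingQueue<T> queue) {
    this.queue = queue;
  }

  @Override
  public synchronized T peek() throws InterruptedException {
    if (head == null) {
      head = queue.take();
    }
    return head;
  }

  @Override
  public synchronized T next() throws InterruptedException {
    T result = peek();
    head = null;
    return result;
  }

  // Deliberately unsynchronized so it never waits behind a blocked peek()/next().
  @Override
  public boolean hasData() {
    return head != null || !queue.isEmpty();
  }

  @Override
  public void close() {}
}

final class PeekableDemo {
  static String demo() {
    BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    queue.add("a");
    queue.add("b");
    try (QueueBackedSource<String> source = new QueueBackedSource<>(queue)) {
      String peeked = source.peek(); // head stays in place
      String first = source.next(); // same element, now removed
      String second = source.next();
      return peeked + first + second + source.hasData();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }
}
```

hasData() is only a hint here: another consumer may take the element between the check and a later next() call.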

In terms of our use case, the desire to collect statistics about individual entries using a single mechanism is what drives the need to throw exceptions, and is the main thing that makes it less than ideal to use a queue or iterator. There are adequate ways around the concurrent consumers and lifecycle management aspects, I think, although it would be somewhat nicer to have that baked into the abstraction.

@gissuebot

gissuebot commented Oct 31, 2014

Original comment posted by wasserman.louis on 2012-05-30 at 03:34 PM


I guess I must admit that I'm not sure how generally applicable this kind of thing would be. It feels to me more like it might be better modeled as a stream processing sort of thing than an iteration thing?

@gissuebot

gissuebot commented Oct 31, 2014

Original comment posted by kevinb@google.com on 2012-05-30 at 07:45 PM


(No comment entered for this change.)


Labels: -Type-Enhancement, Type-Addition

@gissuebot

gissuebot commented Oct 31, 2014

Original comment posted by pettermahlen on 2012-05-31 at 08:10 AM


Obviously, I also have doubts about general applicability - but I don't quite understand what you mean by stream processing as opposed to iteration. How would that look?

@gissuebot

gissuebot commented Oct 31, 2014

Original comment posted by wasserman.louis on 2012-05-31 at 08:45 AM


I'm not...quite sure yet, but the description of your application reminds me extremely strongly of Haskell stream processing abstractions and iteratees. Honestly, I don't understand that abstraction especially well myself at the moment, but...what I do know is that it's a very powerful abstraction for incremental, composable, and parallel parsing and processing operations, which sounds very much like your use case.

I don't know how much of it could be translated over to Java, but if we're hunting for abstractions to match this use case, there may be some ideas worth stealing there.

After this morning, I'll have the free time to do some more research, but if you're interested, the best sources I've located are probably
http://okmij.org/ftp/Streams.html (Oleg Kiselyov, who is generally credited with developing the abstraction)
http://themonadreader.wordpress.com/2010/05/12/issue-16/ pages 19-36 (from the Monad.Reader, a sort-of-journal; this is a gentler introduction than Oleg's page)

@gissuebot

gissuebot commented Oct 31, 2014

Original comment posted by wasserman.louis on 2012-05-31 at 08:55 AM


Other things that immediately pop out of those pages that seem appropriate to this discussion:
  • "all iteratee errors are hence potentially restartable"
  • "incremental processing of a single stream by several consumers in parallel" -- this doesn't refer to parallelism per se, but doing several independent operations over the same stream, but I nevertheless suspect that "parallelism" in the sense of multiple asynchronous operations could be worked in there.

Anyway, I'm going to play around with this some after this morning, even if it's entirely possible I'm on crack, and overkilling the problem.

@gissuebot

gissuebot commented Nov 1, 2014

Original comment posted by pettermahlen on 2012-08-24 at 01:49 PM


I've had a further look at this problem now, and I have some more opinions. First, I've changed my mind about the use of the name Producer - Producer/Consumer is about having (possibly) multiple producers that hand over work to multiple consumers. This discussion has been more about making it possible for multiple things to read from a single source (there you go), and process the contents they read. I don't think there's much generic value to be added on the producer side of things in that scenario, but there might be on the processing side. The producer side is, I think, very problem-specific. I also think that (at least in our case), the preferred solution in a multi-threaded application would be to have the processing-side threads pull stuff out of the producer; that is, consumer-side threads executing code on the producer side of the API. If so, there would be no need for InterruptedException, timeouts, etc. It seems like producers pushing things means you have to have a queue, and there's already a perfectly good API for that in the JDK.

On the processing side of things, there might be some more generally applicable things to do. Maybe having something like a reversed Callable (Invokable? Processor?) whose single method would be 'void invoke(T parameter)', and some code for fluently wiring up a source and some processors with specified amounts of parallelism, etc.
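
A rough sketch of that idea, with consumer-side threads pulling from the source themselves so that no queue or InterruptedException handling is needed on the producer side. `Processor` uses the method signature suggested above; `Pipeline`, `PipelineDemo` and the use of a plain Iterator as the source are assumptions for illustration.

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// The "reversed Callable": takes a parameter, returns nothing.
interface Processor<T> {
  void invoke(T parameter);
}

final class Pipeline {
  // Starts `parallelism` workers; each pulls one item at a time from the
  // shared source under a lock and processes it outside the lock.
  static <T> void run(Iterator<T> source, Processor<? super T> processor, int parallelism)
      throws InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(parallelism);
    for (int i = 0; i < parallelism; i++) {
      pool.execute(() -> {
        while (true) {
          T item;
          synchronized (source) {
            if (!source.hasNext()) {
              return;
            }
            item = source.next();
          }
          processor.invoke(item);
        }
      });
    }
    pool.shutdown();
    // Effectively waits until every worker has drained the source.
    pool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
  }
}

final class PipelineDemo {
  static int sum(List<Integer> values, int parallelism) {
    AtomicInteger total = new AtomicInteger();
    try {
      Pipeline.run(values.iterator(), total::addAndGet, parallelism);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return total.get();
  }
}
```

In this sketch a processor that throws simply ends its worker thread; a fuller version would need a policy for reporting such failures.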

I played around with adapting our code based on those insights, but I don't think there's enough payoff in changing it, so I'll have to keep squirming a little in my seat every time I look at the iterator-based code that shouldn't have been iterator-based... At least it works, and I think I learned something in the process. :)

@gissuebot

gissuebot commented Nov 1, 2014

Original comment posted by cpovirk@google.com on 2013-08-20 at 06:56 PM


Issue #1511 has been merged into this issue.

@gissuebot

gissuebot commented Nov 1, 2014

Original comment posted by michael.deardeuff on 2014-02-28 at 03:06 AM


I think this issue would probably duplicate the work of RxJava, though the contract of RxJava is push-based instead of pull-based like an Iterator.

@hepin1989

hepin1989 commented Apr 12, 2016

Haven't found Consumer<T> :(