Buffered-stream goes over capacity when put! and try-put! don't respect backpressure #110

Open
jeroenvandijk opened this issue Oct 5, 2016 · 4 comments


@jeroenvandijk

manifold.stream/buffered-stream allows the buffer to grow well beyond its configured size when s/put! or s/try-put! is used without respecting backpressure:

(require '[manifold.stream :as s])

(let [s (s/buffered-stream 10)
      f (future
          (dotimes [i 100]
            (s/put! s i)))]

  ;; Wait a bit to fill up the stream
  (Thread/sleep 100)

  (s/description s))
;;=> {:buffer-capacity 10, :buffer-size 100, :closed? false, :drained? false,
;;    :pending-puts 0, :pending-takes 0, :permanent? false, :sink? true,
;;    :source? true, :type "manifold"}

The same happens with manifold.stream/try-put!. I've confirmed that manifold.stream/stream doesn't have this issue.
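For contrast, here is a sketch of the same producer that dereferences each put!, so the producing thread blocks whenever the buffer is full (assuming the Manifold API above; the exact reported sizes may vary slightly):

```clojure
(require '[manifold.stream :as s])

;; Sketch: deref each put! so the producer blocks when the
;; buffer is full, instead of firing all puts asynchronously.
(let [st (s/buffered-stream 10)
      f  (future
           (dotimes [i 100]
             @(s/put! st i)))]   ; blocks until the stream accepts the value
  (Thread/sleep 100)
  ;; :buffer-size should now stay at (or very near) the configured capacity
  (s/description st))
```

With no consumer, the future stalls after roughly ten puts rather than piling all 100 items into the buffer.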

This was observed with [manifold "0.1.6-alpha1"], [org.clojure/clojure "1.8.0"], and Java 8.

Here is a complete test file covering all these cases: https://gist.github.com/jeroenvandijk/a753b5945b6cd1f8fa8811e4c393f974

Note that there is no limit on the buffer-size at all. In production I've seen buffers exceed their configured limit by a factor of 100.

Thanks to @dm3 for narrowing down this issue.

@jeroenvandijk (Author)

This may be a related but different issue: https://gist.github.com/jeroenvandijk/a753b5945b6cd1f8fa8811e4c393f974#file-manifold_test-clj-L90. In other words, I think buffered-stream cannot currently be used with an event-bus.

@ztellman (Collaborator)

Hey, sorry for not responding to this sooner. In general, if you don't respect backpressure then things won't work very well in Manifold. You'll notice that even in your examples where it's "respected", there are still a ton of entries under :pending-puts (https://gist.github.com/jeroenvandijk/a753b5945b6cd1f8fa8811e4c393f974#file-manifold_test-clj-L85). If that fills up past 16384, an error will be thrown and the stream will be automatically closed.

In general, I'd say this is not a bug so much as a quirk in how buffered-stream's description captures the fact that the producer isn't respecting backpressure. I'm not sure I understand what you mean by buffered-stream not being usable with an event-bus. Can you clarify?
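A non-blocking way to respect backpressure is to chain each put! on the previous one, so at most one put is ever pending. A sketch using manifold.deferred's loop/recur (the stream and count here are illustrative):

```clojure
(require '[manifold.stream :as s]
         '[manifold.deferred :as d])

;; Sketch: an asynchronous producer that only issues the next put!
;; after the previous one has been accepted, keeping :pending-puts
;; at most 1 instead of letting it grow toward the 16384 limit.
(let [st (s/buffered-stream 10)]
  (d/loop [i 0]
    (when (< i 100)
      (d/chain (s/put! st i)
               (fn [accepted?]
                 (when accepted?
                   (d/recur (inc i)))))))
  st)
```

With no consumer attached, this producer simply stalls once the buffer is full, rather than overfilling it.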

@cosineblast (Collaborator)

The documentation for buffered-stream says:

A stream which will buffer at most limit data, where the size of each message is defined by (metric message).

But in the provided example, while the :buffer-capacity is 10, :buffer-size is 100 and :pending-puts is 0. Shouldn't the :buffer-size be 10 and :pending-puts 90 here?

@KingMob (Collaborator) commented Feb 26, 2023

Well, sort of. As Zach mentions, it's a quirk of how description reports on a BufferedStream, though it could definitely be changed to make more sense.

The first thing to understand is that exceeding your buffer size/limit doesn't mean items are dropped; it just means the associated put! deferreds aren't resolved immediately. Instead, excess items are added separately, and their put! deferreds are resolved only when the items are added to the buffer or take!n out of the stream. When you're below the limit, the put! deferreds resolve immediately. If you have no buffer, they're resolved only once there's a corresponding take!. core.async behaves the exact same way.
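This resolution behavior can be observed directly on a plain stream (a sketch; realized? works here because Manifold deferreds implement clojure.lang.IPending):

```clojure
(require '[manifold.stream :as s])

(let [st (s/stream 1)          ; buffer with room for one item
      p1 (s/put! st :a)        ; fits in the buffer
      p2 (s/put! st :b)]       ; excess: its deferred stays pending
  (println (realized? p1))     ; should print true  (resolved immediately)
  (println (realized? p2))     ; should print false (waiting for room)
  @(s/take! st)                ; frees a slot; :b moves into the buffer
  (Thread/sleep 10)
  (println (realized? p2)))    ; should print true
```

Nothing was dropped at any point; the second put! was simply deferred until the buffer had room.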

Now, the key difference between using buffered-stream and calling (stream buffer-size) is that buffered-stream can accept a metric function, while (stream buffer-size) operates purely on the number of items.

This means BufferedStream has to keep track of how much of the "limit" is "consumed" separately from just counting the number of items it has. It does this by placing them all on the underlying stream anyway, and not resolving the put! deferreds while you're above the limit. (Though, hmm, there may be a bug in the buf+ logic, where all excess puts are resolved at once when you go below the limit....)

Regular streams with a buffer-size do something similar, where excess puts just end up on a producers list, but their deferreds aren't resolved until they get off that list.

In both cases, the deferreds returned by put! won't resolve until the buffer has room, and the excess puts can be taken as soon as you get through all the prior puts, no matter where they're tracked. But since BufferedStream isn't relying on the size of the producers list to know its remaining capacity, description reports it a little oddly. It's technically correct, but confusing.
