BlockingQueueSource should be closable #176

Open
SevereOverfl0w opened this issue Jun 13, 2019 · 3 comments
Labels
bug Confirmed bug

Comments

@SevereOverfl0w
Contributor

(require '[manifold.stream :as s])

(def a (s/->source (java.util.concurrent.LinkedBlockingQueue. [1 2 3])))
(def b (s/batch 100 1 a))

(s/close! a)
(s/description a)
;; {:type "java.util.concurrent.LinkedBlockingQueue", :buffer-size 0, :source? true}
(s/description b)
;; {:pending-puts 1, :drained? false, :buffer-size 0, :permanent? false, :type "manifold", :sink? true, :closed? false, :pending-takes 0, :buffer-capacity 0, :source? true}

I guess this is because the queue is not closable. Would be nice to have the downstreams disconnect in this case though.

@KingMob
Collaborator

KingMob commented Jul 4, 2021

Unless overridden, the default impl of .close does nothing, and it's not currently overridden for BlockingQueueSource. I agree the "source-wrapped" version should be closable, though, and prevent further takes.

EDIT 2023-2-28: Oops, closing should prevent puts, not takes. Takes can happen until drained.
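
To make the intended semantics concrete, here is a minimal sketch in plain Clojure of a closable wrapper around a queue. It is not Manifold's BlockingQueueSource; closable-queue, close-queue!, put-item!, take-item! and drained? are hypothetical names made up for illustration. Closing rejects further puts, while takes keep working until the queue is empty.

```clojure
(import '(java.util.concurrent LinkedBlockingQueue))

;; Hypothetical wrapper for illustration only; not Manifold's implementation.
;; The closed check and the offer are not atomic, so this is not safe under
;; concurrent close/put. It only demonstrates the intended semantics.
(defn closable-queue [q]
  {:queue q :closed (atom false)})

(defn close-queue!
  "Marks the wrapper as closed. Items already queued stay available."
  [{:keys [closed]}]
  (reset! closed true))

(defn put-item!
  "Returns true if the item was accepted, false once the wrapper is closed."
  [{:keys [queue closed]} x]
  (if @closed
    false
    (.offer queue x)))

(defn take-item!
  "Returns the next item, or `default` if the queue is currently empty."
  [{:keys [queue]} default]
  (let [x (.poll queue)]
    (if (nil? x) default x)))

(defn drained?
  "True once the wrapper is closed and every queued item has been taken."
  [{:keys [queue closed]}]
  (and @closed (.isEmpty queue)))

(def q (closable-queue (LinkedBlockingQueue. [1 2 3])))
(close-queue! q)
(put-item! q 4)        ;; => false, puts are rejected after close
(take-item! q ::none)  ;; => 1, takes still work
(drained? q)           ;; => false, 2 and 3 are still queued
```

A real fix would presumably need to do the equivalent inside the source's close implementation and notify downstream streams once drained; the sketch only covers the put/take contract.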

@KingMob KingMob added the bug Confirmed bug label Jul 4, 2021
@KingMob KingMob changed the title from "Closing a converted LBQ does nothing" to "BlockinkQueueSource should be closable" Jul 4, 2021
@KingMob KingMob changed the title from "BlockinkQueueSource should be closable" to "BlockingQueueSource should be closable" Jul 4, 2021
@cosineblast
Collaborator

cosineblast commented Feb 28, 2023

This doesn't seem to be specific to BlockingQueueSource, honestly.

(defn foo []
  (let [a (s/stream 4)
        b (s/batch 100 1 a)]

    (s/put-all! a [1 2 3 4])

    (s/close! a)

    (println (s/description b))))

This produces exactly the same behavior, using a regular stream.

However, if you remove the (s/put-all! a [1 2 3 4]) call, b is closed when a is closed. I believe that's because, when there's nothing to take from a, the pending takes issued by batch end up stored in a's consumers, and the close! call then resolves those consumers with their default values, which lets the close propagate downstream through the graph, in this case to b. This doesn't happen when the take!s are resolved by put!s, since the consumers are resolved normally and nothing is reported to the graph.

@KingMob
Collaborator

KingMob commented Feb 28, 2023

There's a bit of a race condition here, though. Behind the scenes, multiple threads are being used, so it's completely possible for the description of b to be printed out before the closing of a has fully propagated its effects to b. I ran that code snippet multiple times, and got different results.

In general, ordering around closing can get a little weird. Unfortunately, the current default close! behavior doesn't return a deferred we could wait on...should probably fix that.
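
Until close! returns something to wait on, one possible workaround is to register an on-closed callback on the downstream stream and block on a deferred before inspecting it. This is a sketch under a few assumptions: Manifold is on the classpath, closed-signal is a hypothetical helper (not part of Manifold), and a is empty when it is closed. If b is still holding an undelivered batch (as the :pending-puts 1 in the first description suggests), the wait may time out unless something takes from b.

```clojure
(require '[manifold.stream :as s]
         '[manifold.deferred :as d])

;; Hypothetical helper: returns a deferred that is realized with true
;; once `sink` has been closed.
(defn closed-signal [sink]
  (let [signal (d/deferred)]
    (s/on-closed sink #(d/success! signal true))
    signal))

(defn foo-waiting []
  (let [a        (s/stream 4)
        b        (s/batch 100 1 a)
        b-closed (closed-signal b)]
    (s/close! a)
    ;; Block for up to one second instead of racing the close propagation.
    ;; Yields true if b closed in time, ::timeout otherwise.
    (let [result (deref b-closed 1000 ::timeout)]
      (println result (s/description b))
      result)))
```

Calling (foo-waiting) prints the description only after b has reported closing or the timeout has elapsed, so repeated runs should no longer depend on thread scheduling.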
