Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Navigating the stream graph upstream #85

Open
dm3 opened this issue Jul 14, 2016 · 2 comments
Open

Navigating the stream graph upstream #85

dm3 opened this issue Jul 14, 2016 · 2 comments

Comments

@dm3
Copy link
Contributor

dm3 commented Jul 14, 2016

Would it make sense to add an manifold.stream/upstream analogous to already existing manifold.stream/downstream? I don't think it would be needed for any functionality currently, but would be great for graph navigation purposes.

In our use-case we need to reconstruct stream topology starting from the sink. It seems limiting being able to get the graph from the source but not from the sink.

Would you accept such a PR?

@ztellman
Copy link
Collaborator

The reason this doesn't already exist is because it would introduce memory
leaks unless it is very carefully designed. I've been poking at something
in this vein, but don't have a timeline for release. I would be unlikely to
accept a PR unless it came with a convincing argument for why no memory
leaks would result.

If you can give me more detail about your use case, though, I may be able
to suggest an alternative.
On Thu, Jul 14, 2016 at 1:09 AM Vadim Platonov notifications@github.com
wrote:

Would it make sense to add an upstream analogous to already existing
downstream? I don't think it would be needed for any functionality
currently, but would be great for graph navigation purposes.

In our use-case we need to reconstruct stream topology starting from the
sink. It seems limiting being able to get the graph from the source but not
from the sink.

Would you accept such a PR?


You are receiving this because you are subscribed to this thread.
Reply to this email directly, view it on GitHub
#85, or mute the thread
https://github.com/notifications/unsubscribe/AAB6P2x83ULdEGA9_o77Qnuurk5nddQ0ks5qVe7GgaJpZM4JMLrz
.

@dm3
Copy link
Contributor Author

dm3 commented Jul 14, 2016

Actually, I wrote the following thing as a crutch:

(defn- unwrap [stream]
  (loop [s stream]
    (cond (instance? manifold.stream.Callback s)
          (recur (first (.downstream s)))

          (instance? manifold.stream.SourceProxy s)
          (recur (.source s))

          (instance? manifold.stream.SinkProxy s)
          (recur (.sink s))

          :else s)))

(defn upstream [sink]
  (let [sink (unwrap sink)
        is-sink? (fn [^manifold.stream.graph.Downstream d]
                   (= (unwrap (.sink d)) sink))
        source #(when (some is-sink? (val %))
                  (when-let [k (.get (key %))]
                    [k]))]
    (->> (.entrySet manifold.stream.graph/handle->downstreams)
         (mapcat source))))

which serves our purpose. This will produce inconsistent results if the streams are connected/disconnected concurrently but we have our topology finalized by the time we need this.

EDIT: some unwrapping needed to make the upstream above work for non-trivial cases.

EDIT2: relying on the graph being preserved through all the transformations is error-prone. Consider stream/zip which doesn't connect the input streams to the output stream, but rather relies on the deferred/loop and deferred/zip to do its work.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants