Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consuming events #45

Closed
vasily-kirichenko opened this issue Jan 20, 2015 · 2 comments
Closed

Consuming events #45

vasily-kirichenko opened this issue Jan 20, 2015 · 2 comments
Labels

Comments

@vasily-kirichenko
Copy link
Contributor

What's the best way to consume ordinary IEvent (or IObservable)? This is what I'm trying:

type RunningProcess = 
    { LineOutput: Ch<Line>
      ProcessExited: IVar<ExitResult> }

let execute (p: Process) =
    let lineOutput = ch()
    let processExited = ivar()

    let server() =
        Streams.subscribingTo p.OutputDataReceived <| Streams.iterJob (fun args -> 
            if args.Data <> null then 
                lineOutput <-+ args.Data
            else Job.unit())
        >>. 
        (Streams.subscribingTo p.Exited <| Streams.iterJob (fun _ ->
            Job.start (processExited <-= p.ExitCode)))

    start (server())
    if not <| p.Start() then failwithf "Cannot start %s." p.StartInfo.FileName
    p.BeginOutputReadLine()
    { LineOutput = lineOutput; ProcessExited = processExited }      

Here I basically create two streams based on two events and route each event to a dedicated channel.
Client code is as following:

let pr = <create a RunningProcess instance here>
let rec loop() =
    (pr.LineOutput >>=? fun line -> 
        Job.start (job { printfn "Line: %s" line }) >>. loop()) <|>?
    (pr.ProcessExited |>>? fun res ->
        printfn "Exited with code %A." res)
    <|>? ...other Alts...

start (loop())

Besides it does not work, is this code a good use case for streams? Or it's possible to consume event more easily in this case?

@polytypic
Copy link
Member

Cool to see that you are trying out choice streams! It is unfortunate, but the documentation on them is not yet ready. It is also cool to see that you are working on stuff like the process runner. These will make nice practical examples on how to use Hopac.

Basically, you can think of streams as a kind of asynchronous communication channel, similar to multicast channels, that are suitable for one-to-many and many-to-many communication, where each consumer gets to see all the messages and can keep as much of the communication history as desired (usually, though, consumers throw away messages as they consume them).

Regarding the streams usage, I think that in some scenarios they could be a good model for the clients of a process runner. In more complex scenarios streams can also be a good model to use internally, but I think that in this case streams are not perhaps the simplest approach. In this case, I think that a simple approach would be to offer the following kind of interface:

[<NoComparison>]
type RunningProcess = 
    { LineOutput: Alt<Line>
      ProcessExited: Alt<ExitResult> }

As you can see, there is no mention of streams above. The idea above is simply that LineOutput essentially reads from an internal mailbox and ProcessExited reads an internal ivar. execute would then add handlers to write to the mailbox and the ivar:

let execute (p: Process) : RunningProcess =
    let lineOutput = mb ()
    let processExited = ivar ()

    p.OutputDataReceived.Add <| fun args ->
      lineOutput <<-+ args.Data |> start
    p.Exited.Add <| fun _ ->
      processExited <-= Exited p.ExitCode |> start

    if not <| p.Start () then
      failwithf "Cannot start %s." p.StartInfo.FileName

    p.BeginOutputReadLine ()

    { LineOutput = lineOutput
      ProcessExited = processExited }

Note that the above example doesn't remove the event handlers from the process object. That should not be a problem unless one tries to reuse process objects rather than just let them be GC'ed.

BTW, using start like above, to execute a single message passing operation, guarantees that the operations can be/are observed in the order in which the events are triggered. (If you would use queue the lines could be sent to the mailbox in some non-deterministic order.)

Now, a client of that interface can, if so desired, wrap the LineOutput as a stream by saying:

let lineStream: Streams<Line> =
  Streams.foreverJob rp.LineOutput
  |> Streams.takeUntil rp.ProcessExited

The stream bound to the variable lineStream can be used to read all the lines of output assuming that it is the only direct consumer of the LineOutput. The use of Streams.takeUntil also makes it so that the stream ends once the process has been terminated. In many cases it would likely not be necessary to explicitly close the stream.

What you would normally do with the lineStream stream would then be to combine that with some more complex stream processing. For just consuming the output once, streams are overkill.

It is also possible to go from a stream to an alternative:

let lineAlt: Alt<Line> =
  Streams.values lineStream

The elements from the stream can be read one by one from the alternative returned by Streams.values as they are generated. If you look at the implementation of multicast channels (also see CML book), you can see that a combination of a stream and alternative created using Streams.values is almost exactly like a multicast channel with a port. Just like with multicast channels, Streams.values can be used any number of times to create independent "ports" to read the stream. Hopac code can quite flexibly switch between and combine streams and other message passing operations.

@vasily-kirichenko
Copy link
Contributor Author

Thanks! You are right, it seems that streams are overkill here, so I've ended up with RunningProcess, even your lineStream is not needed. I also added timeout helper https://github.com/Hopac/Hopac.Extras/blob/master/src/Hopac.Extras/ProcessRunner.fs#L79-L82:

let timeoutAlt timeout = Alt.delay <| fun _ ->
    match (startTime + timeout) - DateTime.UtcNow with
    | t when t > TimeSpan.Zero -> Timer.Global.timeOut t
    | _ -> Alt.always()

Client can synchronize on it (as well as on lineOutput and exited), then decide what to do with the process - kill it or something else. Oh, wait. I think I can create a special channel for this and do the same logic as here https://github.com/Hopac/Hopac.Extras/blob/master/src/Hopac.Extras/ObjectPool.fs#L31-L32 Will rewrite it.

About Streams, yes, a well documented example of their usage is very much appreciated.

@Hopac Hopac locked and limited conversation to collaborators Apr 17, 2017
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
Projects
None yet
Development

No branches or pull requests

2 participants