New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Question for understanding #26

Closed
buybackoff opened this Issue Nov 13, 2014 · 19 comments

Comments

Projects
None yet
3 participants
@buybackoff

I have started using Hopac as an alternative to Async/TPL and I love it. I understand basic usage, but some aspects are still not clear.

First, could we compare Alt to F# lazy, so that a job inside an Alt is only evaluated on Alt.pick?

Second, is this implementation of AutoResetEvent correct and idiomatic for Hopac?

/// <summary>
/// MSDN: The AutoResetEvent class represents a local wait handle event that resets automatically 
/// when signaled, after releasing a single waiting thread. An AutoResetEvent object is automatically 
/// reset to non-signaled by the system after a single waiting thread has been released. 
/// If no threads are waiting, the event object's state remains signaled.
///
/// Hopac's alternative to http://blogs.msdn.com/b/pfxteam/archive/2012/02/11/10266923.aspx
/// </summary>
type HopacAutoResetEvent (initialState : bool) =
    // We will wait on take, and set with send
    let setChannel : Ch<unit> = ch()
    do if initialState then start <| Ch.send setChannel ()
    new() = HopacAutoResetEvent(false)
    member this.Wait(timeout:int) : Job<bool> = 
            let timedOut : Alt<bool> = 
                ((float timeout) |> TimeSpan.FromMilliseconds |> Timer.Global.timeOut)
                >>=? fun () -> Job.result false
            let signaled = Ch.Alt.take setChannel >>=? fun () -> Job.result true
            signaled <|> timedOut

    // From docs, important for <|>:
    // The given alternatives are processed in a left-to-right order with short-cut evaluation. 
    // In other words, given an alternative of the form first <|> second, the first alternative 
    // is first instantiated and, if it is pickable, is committed to and the second alternative 
    // will not be instantiated at all.

    member this.Set() : Job<unit> = 
        // from MSDN: Also, if Set is called when there are no threads waiting and the EventWaitHandle 
        // is already signaled, the call has no effect.

        // try take and send covers all cases
        // if there was no waiters and state was signalled -> will steal the state and send it back immediately
        // if there were waiting thread or state was not signaled -> there was no signal and we steal nothing, just signal
        (Ch.Try.take setChannel) >>. Ch.send setChannel ()

Third, is this implementation of ManualResetEvent correct and idiomatic for Hopac?

/// <summary>
/// Hopac's alternative to http://blogs.msdn.com/b/pfxteam/archive/2012/02/11/10266920.aspx
/// </summary>
type HopacManualResetEvent (initialState : bool) =
    [<VolatileFieldAttribute>]
    let mutable state : bool = initialState
    let setChannel : MChan<bool> = run <| Multicast.create ()
    let lock = Lock.Now.create()
    new() = HopacManualResetEvent(false)

    member this.Wait() : Job<bool> =
        let rec loop () = 
            job {
                if state then return true 
                else 
                    let! port = Multicast.port setChannel
                    let! res = (Multicast.recv port) // waiting here
                    if res then return true
                    else return! loop ()
            }
        loop ()

    // From Multicast.fsi: **Sends** a message to all of the ports listening to the multicast channel.
    // Send must mean the same as in Ch
    member this.Set() : Job<unit> = 
        (Multicast.multicast setChannel true)   // there could be no waiters
        |>> (fun _ -> state <- true )   // in any case we set the state
        >>% ()                          // and return unit
        |> (Lock.duringJob lock)

    member this.Reset() : Job<unit> = 
        (Multicast.multicast setChannel false) // (redundant?) if there are takers, res in loop() will be false and loop will iterate
        |>> (fun _ -> state <- false )  // in any case we set the state
        >>% ()
        |> (Lock.duringJob lock)

Thanks in advance!

Cross-post: http://stackoverflow.com/questions/26912904/better-understanding-of-f-hopac-library

@polytypic

This comment has been minimized.

Show comment
Hide comment
@polytypic

polytypic Nov 13, 2014

Contributor

I'll answer here in a number of parts.

The first question was whether Alt could be compared to lazy. Alt<'x> would be slightly more accurately compared to a thunk aka a function of type unit -> 'x. The reason for this is that lazy implies performing any effect only once and then memoizing the result, while, in general, alternatives make no such guarantee. The effects of an alternative, such as offering to give a message on a channel, are performed each time the alternative is "instantiated".

Contributor

polytypic commented Nov 13, 2014

I'll answer here in a number of parts.

The first question was whether Alt could be compared to lazy. Alt<'x> would be slightly more accurately compared to a thunk aka a function of type unit -> 'x. The reason for this is that lazy implies performing any effect only once and then memoizing the result, while, in general, alternatives make no such guarantee. The effects of an alternative, such as offering to give a message on a channel, are performed each time the alternative is "instantiated".

@polytypic

This comment has been minimized.

Show comment
Hide comment
@polytypic

polytypic Nov 13, 2014

Contributor

Regarding AutoResetEvent, here is what I think would be a CML-style design using Hopac.

type AutoResetEvent (initialState: bool) =
  let waitCh = ch ()
  let setCh = ch ()

  let rec set () = (waitCh <-? () >>=? unset) <|> (setCh >>=? set)
  and unset () = setCh >>= set
  do start (if initialState then set () else unset ())

  member this.Set: Job<unit> = setCh <-- ()

  member this.Wait: Alt<unit> = upcast waitCh

Why would this be CML-style or idiomatic?

We keep the state, in this case whether the event is set or not, in a lightweight thread that implements the logic of the concurrent abstraction. This way the state is nicely protected from most concurrency issues.

We use channels to communicate with the lightweight thread, requesting state changes or responses according to state.

We don't implement timeouts, but rather we provide a simple alternative for waiting for the event. It is now possible for any client of the are: AutoResetEvent to wait for the event without:

are.Wait >>= // ...

or with timeout:

Alt.select [
  are.Wait >>=? // ...
  timeout >>=? // ...
] 

or as alternative to some completely different alternative:

Alt.select [
  are.Wait >>=? // ...
  sendMissiles >>=? // ...
]

But, of course, there are many ways to implement this functionality.

Contributor

polytypic commented Nov 13, 2014

Regarding AutoResetEvent, here is what I think would be a CML-style design using Hopac.

type AutoResetEvent (initialState: bool) =
  let waitCh = ch ()
  let setCh = ch ()

  let rec set () = (waitCh <-? () >>=? unset) <|> (setCh >>=? set)
  and unset () = setCh >>= set
  do start (if initialState then set () else unset ())

  member this.Set: Job<unit> = setCh <-- ()

  member this.Wait: Alt<unit> = upcast waitCh

Why would this be CML-style or idiomatic?

We keep the state, in this case whether the event is set or not, in a lightweight thread that implements the logic of the concurrent abstraction. This way the state is nicely protected from most concurrency issues.

We use channels to communicate with the lightweight thread, requesting state changes or responses according to state.

We don't implement timeouts, but rather we provide a simple alternative for waiting for the event. It is now possible for any client of the are: AutoResetEvent to wait for the event without:

are.Wait >>= // ...

or with timeout:

Alt.select [
  are.Wait >>=? // ...
  timeout >>=? // ...
] 

or as alternative to some completely different alternative:

Alt.select [
  are.Wait >>=? // ...
  sendMissiles >>=? // ...
]

But, of course, there are many ways to implement this functionality.

@polytypic

This comment has been minimized.

Show comment
Hide comment
@polytypic

polytypic Nov 13, 2014

Contributor

Here is a ManualResetEvent implementation in similar style:

type ManualResetEvent (initialState: bool) =
  let waitCh = ch ()
  let controlCh = ch ()

  do start << Job.iterate initialState <| function
      | false -> upcast controlCh
      | true  -> (waitCh <-? () >>%? true) <|> asAlt controlCh

  member this.Set: Job<unit> = controlCh <-- true
  member this.Reset: Job<unit> = controlCh <-- false

  member this.Wait: Alt<unit> = upcast waitCh

This implementation similarly maintains state in a lightweight server thread. This time the server thread loop is implemented using the Job.iterate combinator. Rather than a plain setCh: Ch<unit> the state change requests are now given on a controlCh: Ch<bool>.

There is one "tricky" detail in the above implementation. In the case that the state is true, the server thread first tries the alternative of giving a message on the waitCh. This means that the server will satisfy all wait requests before accepting another control message.

Again, there are many possible ways to implement the same functionality.

Contributor

polytypic commented Nov 13, 2014

Here is a ManualResetEvent implementation in similar style:

type ManualResetEvent (initialState: bool) =
  let waitCh = ch ()
  let controlCh = ch ()

  do start << Job.iterate initialState <| function
      | false -> upcast controlCh
      | true  -> (waitCh <-? () >>%? true) <|> asAlt controlCh

  member this.Set: Job<unit> = controlCh <-- true
  member this.Reset: Job<unit> = controlCh <-- false

  member this.Wait: Alt<unit> = upcast waitCh

This implementation similarly maintains state in a lightweight server thread. This time the server thread loop is implemented using the Job.iterate combinator. Rather than a plain setCh: Ch<unit> the state change requests are now given on a controlCh: Ch<bool>.

There is one "tricky" detail in the above implementation. In the case that the state is true, the server thread first tries the alternative of giving a message on the waitCh. This means that the server will satisfy all wait requests before accepting another control message.

Again, there are many possible ways to implement the same functionality.

@polytypic

This comment has been minimized.

Show comment
Hide comment
@polytypic

polytypic Nov 13, 2014

Contributor

Back to the AutoResetEvent. Here is a simplified version of the code you gave:

type HopacAutoResetEvent (initialState : bool) =
  let setChannel : Ch<unit> = ch()
  do if initialState then start <| Ch.send setChannel ()
  member this.Wait: Alt<unit> = Ch.Alt.take setChannel
  member this.Set: Job<unit> = Ch.Try.take setChannel >>. Ch.send setChannel ()

There is a problem in the above AutoResetEvent implementation. The problem is that there is a race condition. If multiple threads execute the Set operation concurrently, it is possible that more than one thread effectively simultaneously executes the Ch.Try.take operation before advancing to the Ch.sendoperation. This means that more than one messages will be left in the channel and subsequently multiple waits will be satisfied.

I think that the timeout logic you presented was basically correct, so I just left it out for simplicity.

BTW, looking at the timeout logic, I realized that there is a kind of bug in the current timeout implementation of Hopac, because it doesn't support the special case of infinite timeouts. I'll fix that ASAP.

Contributor

polytypic commented Nov 13, 2014

Back to the AutoResetEvent. Here is a simplified version of the code you gave:

type HopacAutoResetEvent (initialState : bool) =
  let setChannel : Ch<unit> = ch()
  do if initialState then start <| Ch.send setChannel ()
  member this.Wait: Alt<unit> = Ch.Alt.take setChannel
  member this.Set: Job<unit> = Ch.Try.take setChannel >>. Ch.send setChannel ()

There is a problem in the above AutoResetEvent implementation. The problem is that there is a race condition. If multiple threads execute the Set operation concurrently, it is possible that more than one thread effectively simultaneously executes the Ch.Try.take operation before advancing to the Ch.sendoperation. This means that more than one messages will be left in the channel and subsequently multiple waits will be satisfied.

I think that the timeout logic you presented was basically correct, so I just left it out for simplicity.

BTW, looking at the timeout logic, I realized that there is a kind of bug in the current timeout implementation of Hopac, because it doesn't support the special case of infinite timeouts. I'll fix that ASAP.

@polytypic

This comment has been minimized.

Show comment
Hide comment
@polytypic

polytypic Nov 13, 2014

Contributor

Thinking about the race condition, I think it is best to consider channels as stateless. Threads, on the other hand, can hold state and can then choose what to do with that state in response to communication with other threads.

BTW, thanks for the question! I hope my answers have helped. Feel free to ask further questions. I'll likely comment further on the HopacManualResetEvent implementation a bit later.

Contributor

polytypic commented Nov 13, 2014

Thinking about the race condition, I think it is best to consider channels as stateless. Threads, on the other hand, can hold state and can then choose what to do with that state in response to communication with other threads.

BTW, thanks for the question! I hope my answers have helped. Feel free to ask further questions. I'll likely comment further on the HopacManualResetEvent implementation a bit later.

@buybackoff

This comment has been minimized.

Show comment
Hide comment
@buybackoff

buybackoff Nov 13, 2014

Thank you very much! This helps a lot. I need to digest your answers and play with the examples to understand the idioms and subtleties.

How upcast for channels works? You return a channel as Alt - is it the same as Ch.Alt.take?. What happens with this Alt after give if several threads are waiting?

Thank you very much! This helps a lot. I need to digest your answers and play with the examples to understand the idioms and subtleties.

How upcast for channels works? You return a channel as Alt - is it the same as Ch.Alt.take?. What happens with this Alt after give if several threads are waiting?

@polytypic

This comment has been minimized.

Show comment
Hide comment
@polytypic

polytypic Nov 13, 2014

Contributor

Yes, upcast ch is the same as Ch.Alt.take ch. This is an optimization to reduce memory allocations. The inheritance hierarchy is Ch<'x> :> Alt<'x> :> Job<'x>.

If multiple threads wait for a Ch.Alt.take ch (or the exact same upcast ch) alternative and some other thread performs Ch.give ch x then one of the threads waiting on the channel is given (or takes) the message. The other threads continue waiting on the channel.

Contributor

polytypic commented Nov 13, 2014

Yes, upcast ch is the same as Ch.Alt.take ch. This is an optimization to reduce memory allocations. The inheritance hierarchy is Ch<'x> :> Alt<'x> :> Job<'x>.

If multiple threads wait for a Ch.Alt.take ch (or the exact same upcast ch) alternative and some other thread performs Ch.give ch x then one of the threads waiting on the channel is given (or takes) the message. The other threads continue waiting on the channel.

@buybackoff

This comment has been minimized.

Show comment
Hide comment
@buybackoff

buybackoff Nov 13, 2014

So in ManualResetEvent case how many thread will go through after Set? Am I correct that if several threads are waiting, then:

  • Before Set, iterator is waiting on the control channel here (after false case):
  do start << Job.iterate initialState <| function
      | false -> upcast controlCh                                                       //wait here before set
      | true  -> (waitCh <-? () >>%? true) <|> asAlt controlCh
  • After Set, controlCh returns and then loops to the true case. Th
  do start << Job.iterate initialState <| function
      | false -> upcast controlCh                                                       // return from here
      | true  -> (waitCh <-? () >>%? true) <|> asAlt controlCh          // iterate to here
  • If there is a waiter, first Alt (waitCh <-? () >>%? true) releases the waiter, and iterator goes back to the same place. If there are many waiters, we will iterate until we signal each them. When there are no more waiters, we block on asAlt controlCh in the true case.

What will happen if there are 10 threads waiting, then we iterate to release all of them, and right after releasing the 5th thread another thread calls Reset?

Here (waitCh <-? () >>%? true) <|> asAlt controlCh we will signal remaining 5 threads due to the order of Alts. But what if after releasing the 6th waiter other threads will call Wait before we release the 10th one? It looks like the new threads (11th, 12th,...) will also be released even though they called Wait after Reset?

Reversing the order asAlt controlCh <|> (waitCh <-? () >>%? true) won't help, since we will block threads 7-10. We need a lock while iterating on many waiter after set.

So in ManualResetEvent case how many thread will go through after Set? Am I correct that if several threads are waiting, then:

  • Before Set, iterator is waiting on the control channel here (after false case):
  do start << Job.iterate initialState <| function
      | false -> upcast controlCh                                                       //wait here before set
      | true  -> (waitCh <-? () >>%? true) <|> asAlt controlCh
  • After Set, controlCh returns and then loops to the true case. Th
  do start << Job.iterate initialState <| function
      | false -> upcast controlCh                                                       // return from here
      | true  -> (waitCh <-? () >>%? true) <|> asAlt controlCh          // iterate to here
  • If there is a waiter, first Alt (waitCh <-? () >>%? true) releases the waiter, and iterator goes back to the same place. If there are many waiters, we will iterate until we signal each them. When there are no more waiters, we block on asAlt controlCh in the true case.

What will happen if there are 10 threads waiting, then we iterate to release all of them, and right after releasing the 5th thread another thread calls Reset?

Here (waitCh <-? () >>%? true) <|> asAlt controlCh we will signal remaining 5 threads due to the order of Alts. But what if after releasing the 6th waiter other threads will call Wait before we release the 10th one? It looks like the new threads (11th, 12th,...) will also be released even though they called Wait after Reset?

Reversing the order asAlt controlCh <|> (waitCh <-? () >>%? true) won't help, since we will block threads 7-10. We need a lock while iterating on many waiter after set.

@polytypic

This comment has been minimized.

Show comment
Hide comment
@polytypic

polytypic Nov 13, 2014

Contributor

Your analysis is almost correct. Note that the Reset operation is synchronous. It will not be completed until the server thread responds to it. Let's say that some thread starts executing the Reset operation after the 5th thread is released. What happens is that the Reset operation blocks until all the threads waiting have been released.

This corresponds to the idea that Set is effectively an atomic operation: all threads waiting for the event will be released.

If the true case were written:

      | true  -> asAlt controlCh <|> (waitCh <-? () >>%? true)

Then an outside thread could Reset the event before all waiters have been released.

Contributor

polytypic commented Nov 13, 2014

Your analysis is almost correct. Note that the Reset operation is synchronous. It will not be completed until the server thread responds to it. Let's say that some thread starts executing the Reset operation after the 5th thread is released. What happens is that the Reset operation blocks until all the threads waiting have been released.

This corresponds to the idea that Set is effectively an atomic operation: all threads waiting for the event will be released.

If the true case were written:

      | true  -> asAlt controlCh <|> (waitCh <-? () >>%? true)

Then an outside thread could Reset the event before all waiters have been released.

@polytypic

This comment has been minimized.

Show comment
Hide comment
@polytypic

polytypic Nov 13, 2014

Contributor

Here is an implementation of AutoResetEvent that doesn't use a server thread:

type AutoResetEvent (initialState: bool) =
  let set = if initialState then MVar.Now.createFull () else mvar ()
  let unset = if initialState then mvar () else MVar.Now.createFull ()

  member this.Set: Job<unit> =
    (set <|> unset) >>= fun () -> set <<-= ()

  member this.Wait: Alt<unit> =
    set >>=? fun () -> unset <<-= ()

This implementation makes use of two MVars of which at most one holds a value at any time. This illustrates the idea of using MVars for passing a "permission token".

Contributor

polytypic commented Nov 13, 2014

Here is an implementation of AutoResetEvent that doesn't use a server thread:

type AutoResetEvent (initialState: bool) =
  let set = if initialState then MVar.Now.createFull () else mvar ()
  let unset = if initialState then mvar () else MVar.Now.createFull ()

  member this.Set: Job<unit> =
    (set <|> unset) >>= fun () -> set <<-= ()

  member this.Wait: Alt<unit> =
    set >>=? fun () -> unset <<-= ()

This implementation makes use of two MVars of which at most one holds a value at any time. This illustrates the idea of using MVars for passing a "permission token".

@buybackoff

This comment has been minimized.

Show comment
Hide comment
@buybackoff

buybackoff Nov 13, 2014

Just wow! It is so simple and powerful, and almost easy! (to both comments)

Just wow! It is so simple and powerful, and almost easy! (to both comments)

@buybackoff

This comment has been minimized.

Show comment
Hide comment
@buybackoff

buybackoff Nov 13, 2014

But wait,

Your analysis is almost correct. Note that the Reset operation is synchronous. It will not be completed until the server thread responds to it. Let's say that some thread starts executing the Reset operation after the 5th thread is released. What happens is that the Reset operation blocks until all the threads waiting have been released.

what if we have the same 10 waiters, then we call Reset after releasing the 5th one. Reset will block. But then after releasing 6th waiter 11th, 12th... waiter come before we release 10th and before we go to <|> asAlt controlCh. We won't block 7th-10th, but 11+th ones will be released. If waiters are added at the same speed as they are released then many threads will go through. Am I missing something?

But wait,

Your analysis is almost correct. Note that the Reset operation is synchronous. It will not be completed until the server thread responds to it. Let's say that some thread starts executing the Reset operation after the 5th thread is released. What happens is that the Reset operation blocks until all the threads waiting have been released.

what if we have the same 10 waiters, then we call Reset after releasing the 5th one. Reset will block. But then after releasing 6th waiter 11th, 12th... waiter come before we release 10th and before we go to <|> asAlt controlCh. We won't block 7th-10th, but 11+th ones will be released. If waiters are added at the same speed as they are released then many threads will go through. Am I missing something?

@polytypic

This comment has been minimized.

Show comment
Hide comment
@polytypic

polytypic Nov 13, 2014

Contributor

I was just in the middle of writing this reply... Yes, indeed. Thinking about this further, I think that my initial version of ManualResetEvent can be considered unfair, because it basically gives priority to releasing Wait operations. I think that could be used to contrive a program that live locks.

Here is an implementation on ManualResetEvent using the MVar permission token passing idiom:

type ManualResetEvent (initialState: bool) =
  let set = if initialState then MVar.Now.createFull () else mvar ()
  let unset = if initialState then mvar () else MVar.Now.createFull ()
  member this.Set = (set <|> unset) >>= MVar.fill set
  member this.Reset = (set <|> unset) >>= MVar.fill unset
  member this.Wait = MVar.Alt.read set

In this version none of the operations is given priority over other operations. The set variable effectively acts as a queue for the operations.

Contributor

polytypic commented Nov 13, 2014

I was just in the middle of writing this reply... Yes, indeed. Thinking about this further, I think that my initial version of ManualResetEvent can be considered unfair, because it basically gives priority to releasing Wait operations. I think that could be used to contrive a program that live locks.

Here is an implementation on ManualResetEvent using the MVar permission token passing idiom:

type ManualResetEvent (initialState: bool) =
  let set = if initialState then MVar.Now.createFull () else mvar ()
  let unset = if initialState then mvar () else MVar.Now.createFull ()
  member this.Set = (set <|> unset) >>= MVar.fill set
  member this.Reset = (set <|> unset) >>= MVar.fill unset
  member this.Wait = MVar.Alt.read set

In this version none of the operations is given priority over other operations. The set variable effectively acts as a queue for the operations.

@polytypic

This comment has been minimized.

Show comment
Hide comment
@polytypic

polytypic Nov 13, 2014

Contributor

Here is a fair version of ManualResetEvent using the server loop idiom:

type Msg =
  | Wait of IVar<Alt<unit>>
  | Set
  | Reset

type ManualResetEvent (initialState: bool) =
  let reqCh = ch ()

  let rec isSet (v: Alt<unit>) =
    reqCh >>= function
     | Wait r -> r <-= v >>. isSet v
     | Set -> isSet v
     | Reset -> isUnset (ivar ())
  and isUnset (v: IVar<unit>) =
    reqCh >>= function
     | Wait r -> r <-= upcast v >>. isUnset v
     | Set -> v <-= () >>. isSet v
     | Reset -> isUnset v

  do start <| if initialState
              then isSet (IVar.Now.createFull ())
              else isUnset (ivar ())

  member this.Set = reqCh <-- Set
  member this.Reset = reqCh <-- Reset

  member this.Wait =
    Alt.guard << Job.delay <| fun () ->
    let reply = ivar ()
    reqCh <-+ Wait reply >>.
    reply

When isSet, the state of the server is an ivar that has been filled. Transitioning to isUnset, a new empty ivar is created. Transitioning back to isSet, the empty ivar is filled. And so on. Wait operation requests the state from the server. This illustrates the use of Alt.guard to encapsulate an operation that sends a request to a server and then waits for the result.

Contributor

polytypic commented Nov 13, 2014

Here is a fair version of ManualResetEvent using the server loop idiom:

type Msg =
  | Wait of IVar<Alt<unit>>
  | Set
  | Reset

type ManualResetEvent (initialState: bool) =
  let reqCh = ch ()

  let rec isSet (v: Alt<unit>) =
    reqCh >>= function
     | Wait r -> r <-= v >>. isSet v
     | Set -> isSet v
     | Reset -> isUnset (ivar ())
  and isUnset (v: IVar<unit>) =
    reqCh >>= function
     | Wait r -> r <-= upcast v >>. isUnset v
     | Set -> v <-= () >>. isSet v
     | Reset -> isUnset v

  do start <| if initialState
              then isSet (IVar.Now.createFull ())
              else isUnset (ivar ())

  member this.Set = reqCh <-- Set
  member this.Reset = reqCh <-- Reset

  member this.Wait =
    Alt.guard << Job.delay <| fun () ->
    let reply = ivar ()
    reqCh <-+ Wait reply >>.
    reply

When isSet, the state of the server is an ivar that has been filled. Transitioning to isUnset, a new empty ivar is created. Transitioning back to isSet, the empty ivar is filled. And so on. Wait operation requests the state from the server. This illustrates the use of Alt.guard to encapsulate an operation that sends a request to a server and then waits for the result.

@polytypic polytypic added the question label Nov 13, 2014

@polytypic

This comment has been minimized.

Show comment
Hide comment
@polytypic

polytypic Nov 14, 2014

Contributor

Here is one more fair version of ManualResetEvent:

type ManualResetEvent (initialState: bool) =
  let state = mvarFull <| if initialState then ivarFull () else ivar ()
  member this.Set =
    state >>= fun v -> IVar.tryFill v () >>. (state <<-= v)
  member this.Reset =
    state >>= fun v ->
    state <<-= if IVar.Now.isFull v then ivar () else v
  member this.Wait = Alt.guard (MVar.read state |>> asAlt)

This version uses a mvar to serialize access to the state which is an ivar.

Contributor

polytypic commented Nov 14, 2014

Here is one more fair version of ManualResetEvent:

type ManualResetEvent (initialState: bool) =
  let state = mvarFull <| if initialState then ivarFull () else ivar ()
  member this.Set =
    state >>= fun v -> IVar.tryFill v () >>. (state <<-= v)
  member this.Reset =
    state >>= fun v ->
    state <<-= if IVar.Now.isFull v then ivar () else v
  member this.Wait = Alt.guard (MVar.read state |>> asAlt)

This version uses a mvar to serialize access to the state which is an ivar.

@buybackoff

This comment has been minimized.

Show comment
Hide comment
@buybackoff

buybackoff Nov 15, 2014

Thank you again! This is very helpful to feel Hopac.

Another question - how to run a synchronous blocking operations inside Hopac jobs, e.g. native non-async ARE/MREs that just block a thread? Should I always wrap them in Async/Task and then use Async.toJob or Task.awaitJob as in an example below? Otherwise I will block a worker thread of Hopac and effectively one core?

let jumpOutOfWorkerThread (blockingCall:'x->'y) (x:'x) : Job<'y> =
        let asAsync : Async<'y> = async { return blockingCall(x)}
        Hopac.Extensions.Async.toJob asAsync

Or using Job.lift is safe in this respect?

Thank you again! This is very helpful to feel Hopac.

Another question - how to run a synchronous blocking operations inside Hopac jobs, e.g. native non-async ARE/MREs that just block a thread? Should I always wrap them in Async/Task and then use Async.toJob or Task.awaitJob as in an example below? Otherwise I will block a worker thread of Hopac and effectively one core?

let jumpOutOfWorkerThread (blockingCall:'x->'y) (x:'x) : Job<'y> =
        let asAsync : Async<'y> = async { return blockingCall(x)}
        Hopac.Extensions.Async.toJob asAsync

Or using Job.lift is safe in this respect?

@polytypic

This comment has been minimized.

Show comment
Hide comment
@polytypic

polytypic Nov 15, 2014

Contributor

BTW, one thing I didn't emphasize earlier is that Job<'x> and Alt<'x> values (and Async<'x> values) are reusable. If you care for performance (space and time), then it can sometimes make a big difference (either way) whether you reallocate or reuse (when possible) operations. So, for example, this would also be a valid implementation of AutoResetEvent:

type AutoResetEvent =
  val Set: Job<unit>
  val Wait: Alt<unit>
  new (initialState: bool) =
    let set = if initialState then mvarFull () else mvar ()
    let unset = if initialState then mvar () else mvarFull ()
    {Set = (set <|> unset) >>= MVar.fill set
     Wait = set >>=? MVar.fill unset}
  new () = AutoResetEvent (false)
Contributor

polytypic commented Nov 15, 2014

BTW, one thing I didn't emphasize earlier is that Job<'x> and Alt<'x> values (and Async<'x> values) are reusable. If you care for performance (space and time), then it can sometimes make a big difference (either way) whether you reallocate or reuse (when possible) operations. So, for example, this would also be a valid implementation of AutoResetEvent:

type AutoResetEvent =
  val Set: Job<unit>
  val Wait: Alt<unit>
  new (initialState: bool) =
    let set = if initialState then mvarFull () else mvar ()
    let unset = if initialState then mvar () else mvarFull ()
    {Set = (set <|> unset) >>= MVar.fill set
     Wait = set >>=? MVar.fill unset}
  new () = AutoResetEvent (false)
@polytypic

This comment has been minimized.

Show comment
Hide comment
@polytypic

polytypic Nov 15, 2014

Contributor

One should not block Hopac worker threads or equivalently one should also not use a Hopac job to run a thread that never waits. Such operations will interfere with the scheduling mechanisms of Hopac.

If you just need to run some simple CPU bound synchronous computation that takes a relatively long time, but will ultimately finish, then you may be just fine synchronously calling that computation from within a Hopac job. You may be fine even if that computation does some IO.

For more involved synchronous operations that take long enough time that they interfere with responsiveness or spend a lot of time waiting for IO you will need to do something more clever. Job.lift is not safe in this respect.

Note that such involved, non-cooperative, synchronous operations don't really work nicely from within asyncs or Tasks either. Usually async and Task use ThreadPool underneath and what happens is that more worker threads will be created which is very expensive. Ultimately it would be better to rewrite CPU bound operations to take advantage of parallelism and IO bound operations to take advantage of non-blocking asynchronous IO.

If you really need to interface with such non-cooperative legacy APIs, then you could just as well use the legacy helper for that: ThreadPool.

Here is how one could wrap a WaitHandle as a job:

type WaitHandle with
  member this.awaitJob (timeout: TimeSpan) : Job<bool> =
    Job.scheduler () >>= fun sr ->
    let rV = ivar ()
    ThreadPool.RegisterWaitForSingleObject
     (this, (fun _ r -> Scheduler.start sr (rV <-= r)), null, timeout, true)
    upcast rV

Here is how one could wrap an expensive, non-cooperative, synchronous computation as a job:

type ThreadPool with
  static member runAsJob (op: unit -> 'x) : Job<'x> =
    Job.scheduler () >>= fun sr ->
    let rV = ivar ()
    ThreadPool.QueueUserWorkItem
     (fun _ -> Scheduler.start sr (try rV <-= op () with e -> IVar.fillFailure rV e))
    upcast rV

BTW, I'm currently in the process of rewriting/redesigning the scheduler mechanism of Hopac to allow for SynchronizationContext support. Once finished, the pattern you see above will change slightly.

Contributor

polytypic commented Nov 15, 2014

One should not block Hopac worker threads or equivalently one should also not use a Hopac job to run a thread that never waits. Such operations will interfere with the scheduling mechanisms of Hopac.

If you just need to run some simple CPU bound synchronous computation that takes a relatively long time, but will ultimately finish, then you may be just fine synchronously calling that computation from within a Hopac job. You may be fine even if that computation does some IO.

For more involved synchronous operations that take long enough time that they interfere with responsiveness or spend a lot of time waiting for IO you will need to do something more clever. Job.lift is not safe in this respect.

Note that such involved, non-cooperative, synchronous operations don't really work nicely from within asyncs or Tasks either. Usually async and Task use ThreadPool underneath and what happens is that more worker threads will be created which is very expensive. Ultimately it would be better to rewrite CPU bound operations to take advantage of parallelism and IO bound operations to take advantage of non-blocking asynchronous IO.

If you really need to interface with such non-cooperative legacy APIs, then you could just as well use the legacy helper for that: ThreadPool.

Here is how one could wrap a WaitHandle as a job:

type WaitHandle with
  member this.awaitJob (timeout: TimeSpan) : Job<bool> =
    Job.scheduler () >>= fun sr ->
    let rV = ivar ()
    ThreadPool.RegisterWaitForSingleObject
     (this, (fun _ r -> Scheduler.start sr (rV <-= r)), null, timeout, true)
    upcast rV

Here is how one could wrap an expensive, non-cooperative, synchronous computation as a job:

type ThreadPool with
  static member runAsJob (op: unit -> 'x) : Job<'x> =
    Job.scheduler () >>= fun sr ->
    let rV = ivar ()
    ThreadPool.QueueUserWorkItem
     (fun _ -> Scheduler.start sr (try rV <-= op () with e -> IVar.fillFailure rV e))
    upcast rV

BTW, I'm currently in the process of rewriting/redesigning the scheduler mechanism of Hopac to allow for SynchronizationContext support. Once finished, the pattern you see above will change slightly.

@dustinlacewell-wk

This comment has been minimized.

Show comment
Hide comment
@dustinlacewell-wk

dustinlacewell-wk Dec 15, 2017

Holy hell this thread was useful.

Holy hell this thread was useful.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment