Skip to content

Commit

Permalink
Proper handling end of streams (both producer and produced)
Browse files Browse the repository at this point in the history
  • Loading branch information
pavel-khritonenko committed Nov 12, 2016
1 parent ac37bc9 commit 51b7eda
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 7 deletions.
25 changes: 19 additions & 6 deletions Libs/Hopac/Stream.fs
Original file line number Diff line number Diff line change
Expand Up @@ -281,12 +281,25 @@ module Stream =

let ambAll (xxs: Stream<#Stream<_>>) = joinWith amb' xxs

let rec mergeAll' combined xss =
Alt.choose [ xss ^=> function Nil -> mergeAll' combined never
| Cons (s, xss) -> mergeAll' (merge' combined s) xss
combined ^=> function Nil -> nilj
| Cons (x, tail) -> consj x (mergeAll' tail xss |> memo) ]
and mergeAll (xxs: Stream<#Stream<'x>>) : Stream<'x> = mergeAll' never xxs |> memo
let rec mergeAll' (streams: Stream<'x> option) (producer: Stream<Stream<'x>> option) : Stream<'x> =
match streams, producer with
| None, Some p ->
p ^=> function Nil -> mergeAll' None None
| Cons (x, xs) -> mergeAll' (Some x) (Some xs)
|> memo
| Some s, None ->
s ^=> function Cons (x, xs) -> consj x (mergeAll' (Some xs) producer)
| Nil -> mergeAll' None producer :> _
|> memo
| Some s, Some p ->
Alt.choose [ s ^=> function Cons (x, xs) -> consj x (mergeAll' (Some xs) producer)
| Nil -> mergeAll' None producer :> _
p ^=> function Cons (x, xs) -> mergeAll' (merge s x |> Some) (Some xs)
| Nil -> mergeAll' streams None ] |> memo
| None, None -> memo nilj

let mergeAll producer = mergeAll' None (Some producer)


let appendAll (xxs: Stream<#Stream<_>>) = joinWith append' xxs
let switchAll (xxs: Stream<#Stream<_>>) = joinWith switch' xxs
Expand Down
2 changes: 1 addition & 1 deletion Libs/Hopac/Stream.fsi
Original file line number Diff line number Diff line change
Expand Up @@ -1161,7 +1161,7 @@ module Stream =
val ambAll: Stream<#Stream<'x>> -> Stream<'x>

/// Joins all the streams together with `merge`.
val mergeAll: Stream<#Stream<'x>> -> Stream<'x>
val mergeAll: Stream<Stream<'x>> -> Stream<'x>

/// Joins all the streams together with `switch`.
val switchAll: Stream<#Stream<'x>> -> Stream<'x>
Expand Down
13 changes: 13 additions & 0 deletions Tests/AdHocTests/StreamTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -174,3 +174,16 @@ let run () =
<|> timeOutMillis 100 ^-> Choice2Of2
|> run = Choice1Of2 xs

quick <| fun xs ->
let values =
[ Stream.ofList xs; Stream.once (timeOutMillis 50) ]
|> Stream.ofSeq
|> Stream.mergeAll
|> Stream.toList
|> memo
:> Alt<_>

values ^-> Choice1Of2
<|> timeOutMillis 100 ^-> Choice2Of2
|> run = Choice1Of2 (xs @ [ () ])

0 comments on commit 51b7eda

Please sign in to comment.