Limited example of interoperability with the Streaming library. #61

Open
torgeirsh opened this issue Aug 30, 2023 · 4 comments

@torgeirsh

The Interop.Streaming.fromStreaming example assumes "r ~ ()", but I rarely find that to be the case in code that uses Streaming. Is this an area where Streaming's model is more "powerful" than Streamly's due to different trade-offs, or is there a way to represent "Stream (Of a) m r" in Streamly?

@harendra-kumar
Member

I am not very familiar with how the return value in Streaming is actually used. If there is a practical use case for it, we can try to model that in Streamly and see how the two models differ.

@torgeirsh
Author

It is the value you end up with after consuming all the elements of the stream. Since you can't access the return value before all the elements are consumed, it lets you encode data dependencies in the types (enforced by the compiler). This is useful in several situations:

  • Ensure correct ordering of effects for functions like this, without having to accumulate values:

    splitAt :: (Monad m, Functor f) => Int -> Stream f m r -> Stream f m (Stream f m r)

  • Return an error for infinite streams produced by an effect that can fail:

    Stream f m NetworkError

  • When the return type is an unconstrained type variable (like "r" in the splitAt example above), you know that the function can't terminate the stream, so there's no need to document it, as proposed in "Document folds with termination annotation" (streamly#772). (Unfortunately, it only works one way: you are not guaranteed that a "Stream f m ()" will terminate.)
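
The first bullet can be sketched with the streaming library itself. This is only a minimal illustration, assuming the `streaming` package is installed; `splitSums` and its inputs are hypothetical names made up for the example. The remainder of the stream is the return value of the prefix, so it cannot be reached until the prefix has been consumed:

```haskell
import Streaming (Of (..))
import qualified Streaming.Prelude as S

-- | Hypothetical helper: sum the first n elements and the rest separately,
-- without accumulating the elements in memory.
splitSums :: Int -> [Int] -> IO (Int, Int)
splitSums n xs = do
    -- S.sum pairs the fold result with the stream's return value, which
    -- here is the not-yet-consumed remainder produced by S.splitAt.
    firstSum :> rest <- S.sum (S.splitAt n (S.each xs))
    -- The remainder ends with (), the return value of S.each.
    restSum :> () <- S.sum rest
    return (firstSum, restSum)

main :: IO ()
main = splitSums 3 [1 .. 10] >>= print -- prints (6,49)
```

Because `rest` only exists as the result of running the first `S.sum`, the compiler rules out consuming the two halves in the wrong order.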

This was just off the top of my head, so I'll let you know if anything else comes up. Thank you for looking into it!

@torgeirsh
Author

@harendra-kumar Have you had the chance to model the described behaviour using Streamly?

@harendra-kumar
Member

harendra-kumar commented Feb 2, 2024

Sorry, I missed these updates earlier.

There is a fundamental difference between the model that streamly uses and the one that other streaming libraries use. Other libraries use the same type for a producer stream and a consumer stream, whereas streamly uses different types. For example, conduit uses the following type; I guess the same applies to Pipes and Streaming as well:

data ConduitT i o m r

Streamly has different types for producers, pipes and consumers. The first half of the story is the producer type:

data Stream m o -- producer

The other half of the story is the consumer types. The output type o above would become the input type of the consumers when you consume the generated stream:

data Fold m o r -- consumer
data Parser o m r -- more powerful fold

The Pipe type is not yet implemented properly, but this is what it would look like:

data Pipe m i o -- pipe

If you put all these together you can imagine getting something like ConduitT i o m r.
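
As a rough sketch of how the producer and consumer halves fit together, the following assumes streamly-core 0.9 or later; `producer` and `consumer` are illustrative names, and `fmap` stands in for the transformation step since the Pipe type is not available yet:

```haskell
import Streamly.Data.Fold (Fold)
import Streamly.Data.Stream (Stream)
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream as Stream

-- Producer: emits Int values and carries no return value of its own.
producer :: Monad m => Stream m Int
producer = Stream.fromList [1 .. 10]

-- Consumer: accepts Int inputs; its last type parameter is the result.
consumer :: Monad m => Fold m Int Int
consumer = Fold.sum

main :: IO ()
main =
    -- Transform the stream, then drive the consumer with it.
    Stream.fold consumer (fmap (* 2) producer) >>= print -- prints 110
```

The result type appears only on `consumer`, which is where the `r` of `ConduitT i o m r` ends up in this model.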

These streaming libraries use the monad instance for two purposes: (1) appending the producers, and (2) splitting the consumers. Both of these are possible with streamly, and in a very similar fashion.

For appending, conduit would do something like this, example from the conduit README:

source :: Monad m => ConduitT i Int m ()
source = do
    yieldMany [1..10]
    yieldMany [11..20]

With streamly you do this with the append operation instead of the monad instance:

source :: Monad m => Stream m Int
source =
    Stream.fromList [1..10] `Stream.append`
    Stream.fromList [11..20]

When consuming, the monad instance is used to split the consumers, and this is where the return type you mentioned originally comes into the picture. Consider this example from the conduit README:

sink :: Monad m => ConduitT Int o m (String, Int)
sink = do
    x <- takeC 5 .| mapC show .| foldC
    y <- sumC
    return (x, y)

Streamly also supports the return type, but you find it in consumers, not in producers. The r in the Fold m i r type can be considered equivalent to the stream return type in Conduit and Pipes. The above conduit example translates directly to folds in streamly. Since folds do not have a monad instance, I will use the Parser type, which is essentially a more powerful fold. Folds and parsers can be converted into each other:

import Data.Function ((&))
import Streamly.Data.Parser (Parser)
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Internal.Data.Parser as Parser
import qualified Streamly.Data.Stream as Stream

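sink :: Monad m => Parser Int m (String, Int)
sink = do
    -- Consume the first five elements and render the list as a string.
    x <- Parser.fromFold (Fold.take 5 Fold.toList & fmap show)
    -- Sum whatever remains of the input.
    y <- Parser.fromFold Fold.sum
    return (x, y)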

main :: IO ()
main = Stream.fold (Parser.toFold sink) (Stream.fromList [1..10]) >>= print

We can add a monad instance to Fold as well, and then we won't need to convert to and from Parser. But that is just a difference in boilerplate.

Running the above example would give:

$ ./example
("[1,2,3,4,5]",40)
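
The same parser can also be run without the Internal module. This variant is a sketch assuming streamly-core 0.9 or later, where `Stream.parse` returns an `Either` with a parse error on the left; it is an alternative to the `Parser.toFold` approach above, not a replacement endorsed by the thread:

```haskell
import Streamly.Data.Parser (Parser)
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Parser as Parser
import qualified Streamly.Data.Stream as Stream

sink :: Monad m => Parser Int m (String, Int)
sink = do
    -- First consumer: take five elements and show them as a list.
    x <- Parser.fromFold (show <$> Fold.take 5 Fold.toList)
    -- Second consumer: sum the elements the first one left behind.
    y <- Parser.fromFold Fold.sum
    return (x, y)

main :: IO ()
main = do
    -- Run the parser directly on the stream instead of converting to a Fold.
    result <- Stream.parse sink (Stream.fromList [1 .. 10])
    case result of
        Left err -> putStrLn ("parse failed: " ++ show err)
        Right r -> print r -- prints ("[1,2,3,4,5]",40)
```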
