Skip to content

Commit

Permalink
working producer-consumer version but needs more docs and cleanup, co…
Browse files Browse the repository at this point in the history
…mmit is for backup purposes
  • Loading branch information
Dierk Koenig committed May 17, 2021
1 parent 509e70e commit e226dba
Show file tree
Hide file tree
Showing 8 changed files with 434 additions and 9 deletions.
244 changes: 244 additions & 0 deletions examples/LineProducerConsumer.fr
@@ -0,0 +1,244 @@
{--
Example module to show the use of producer-consumer piping by putting it in various contexts like
file, url, and command line processing but also pure in-memory operations.
-}

module examples.LineProducerConsumer where

import Control.pipe.Producer (produce, endWith, Producer, runProducer )
import Control.pipe.Consumer (consume, Consumer, runConsumer )
import Control.monad.trans.MonadTrans (MonadTrans (lift))
import Control.pipe.Pipe (pipe2)
import Java.IO (openReader)
import Java.Net (URL, MalformedURLException)

data StrGenData = StrGen {
line :: String, -- the currently generated line, "onLine" will be called with this
nextLine :: String -> String -- the generation function for the next line
}

type StringProducer = Ref StrGenData -- a mutable reference to the Producer data

data Person = Person {
name :: String,
birthyear :: Int
}

producePerson :: Producer Person IO ()
producePerson =
do
produce $ Person "Leonard Euler" 1707
produce $ Person "Carl Friedrich Gauss" 1777
produce $ Person "Gottlob Frege" 1848
endWith ()





produceLinesFromStringReader :: Producer String (ST s) ()
produceLinesFromStringReader =
do
stringReader <- lift $ StringReader.new $ unlines ["first","second","third"]
bufferedReader <- lift $ BufferedReader.new stringReader
produceLinesFromBufferedReader bufferedReader


produceLinesFromUrl :: String -> Producer String IO ()
produceLinesFromUrl url =
do
xurl <- lift $ URL.new url
uis <- lift $ URL.openStream xurl
isr <- lift $ InputStreamReader.new uis "UTF-8"
bufferedReader <- lift $ BufferedReader.new isr
produceLinesFromBufferedReader bufferedReader

produceLinesFromStrGenData :: Int -> Mutable s StringProducer -> Producer String (ST s) ()
produceLinesFromStrGenData n _ | n == 0 = pure ()
produceLinesFromStrGenData n strGen | otherwise =
do
state <- lift $ strGen.get
produce state.line
lift $ strGen.modify _.{line <- state.nextLine}
produceLinesFromStrGenData (n - 1) strGen

countLinesConsumer :: Show a => Int -> Consumer (Maybe a) (ST s) Int
countLinesConsumer count =
do
line <- consume
case line of
(Just l) ->
do
countLinesConsumer (count + 1)
(Nothing) -> pure count



oneLineConsumer :: Show a => Consumer (Maybe a) IO (Maybe a)
oneLineConsumer =
do
line <- consume
pure line

nLineCountAndPrintConsumer :: Show a => Int -> Int -> Consumer (Maybe a) IO Int
nLineCountAndPrintConsumer n count | n == count = pure count
nLineCountAndPrintConsumer n count | otherwise =
do
line <- consume
case line of
(Just l) ->
do
lift $ println $ show (count + 1) ++ ": " ++ (show l)
nLineCountAndPrintConsumer n (count + 1)
(Nothing) -> pure count

personConsumer :: Consumer (Maybe Person) IO ()
personConsumer =
do
maybePerson <- consume
case maybePerson of
(Just p) ->
do
lift $ println $ "Processing " ++ p.name
personConsumer
(Nothing) -> pure ()


prepareTestFile :: IO File
prepareTestFile = do
testFile = File.new "TestFile.txt"
println "----------------------------------------"
println "general file handling"
-- delete test file if it already existed
result <- testFile.delete
println $ "Test file deleted to clean up before start: " ++ show result
println "create test file with three lines"
writeFile testFile.getPath $ unlines ["first line","second line","third line"]
result <- testFile.exists
println $ "File now exists: " ++ show result
println "read test file in toto"
content <- readFile testFile.getPath
println "file content was:"
println content
println "append 2 lines to show that we can write"
appendFile testFile.getPath $ unlines ["fourth line","fifth line"]
return testFile

-- Basic support

--- A producer of Strings (lines) that runs in an (ST s) Monad (usually IO) and returns no result.
--- Used with many Reader types that a BufferedReader can decorate.
produceLinesFromBufferedReader :: Mutable s BufferedReader -> Producer String (ST s) ()
produceLinesFromBufferedReader bufferedReader = do
maybeLine <- lift $ bufferedReader.readLine -- lift into the Producer context monad
case maybeLine of
Just line -> do -- successful read
produce line -- produce the line, yield to consumer if any
produceLinesFromBufferedReader bufferedReader -- proceed with next line
Nothing -> do -- no more lines
lift $ bufferedReader.close -- cleanup
endWith () -- signal end of processing

-- use case 1:

processNamedFile :: String -> IO ()
processNamedFile filePath = do
println "----------------------------------------"
println "processing a named file line-by-line with a buffered reader, while keeping track of line numbers"
(end, count) <- pipe2 (produceLinesFromFilePath filePath) (countAndPrintLinesConsumer 0)
println $ "total number of lines: " ++ show count
println $ "producer end value is (): " ++ show (end == ())

produceLinesFromFilePath :: String -> Producer String IO ()
produceLinesFromFilePath filePath = do
bufferedReader <- lift $ openReader filePath -- get a buffered reader in the Producer context, now IO specific
produceLinesFromBufferedReader bufferedReader -- simple delegate, nothing specific to do

countAndPrintLinesConsumer :: Show a => Int -> Consumer (Maybe a) IO Int
countAndPrintLinesConsumer count = do
maybeLine <- consume -- try to consume
case maybeLine of
Just line -> do -- on success
lift $ println $ show (count + 1) ++ ": " ++ show line -- work in the Consumer context monad (IO) and lift
countAndPrintLinesConsumer (count + 1) -- proceed with next consumption
Nothing -> pure count -- on end, return result in Consumer context monad

-- use case 2:

fullyProcessFile :: String -> IO ()
fullyProcessFile filePath = do
println "----------------------------------------"
println "processing all lines from a file, pushing each line to a list"
println "without consumer and pipe, we can run the same producer to get the full result"
(list, _) <- runProducer (produceLinesFromFilePath filePath)
println $ "total result list: " ++ show list



main :: IO ()
main =
do
testFile <- prepareTestFile

processNamedFile testFile.getPath

fullyProcessFile testFile.getPath

println "----------------------------------------"
println "reading only one line (a header for example)"
(_, maybeHeader) <- pipe2 (produceLinesFromFilePath testFile.getPath) oneLineConsumer
case maybeHeader of
(Just header) -> println $ "the header line is: " ++ show header
(Nothing) -> println $ "The file is empty"

println "-----------Version 2 of processing just one line --------------------"
println "reading only one line (a header for example)"
(_, maybeHeader) <- pipe2 (produceLinesFromFilePath testFile.getPath) (consume >>= pure)
case maybeHeader of
(Just header) -> println $ "the header line is: " ++ show header
(Nothing) -> println $ "The file is empty"

println "----------------------------------------"
println "processing each line with a non-IO impure reader, here: StringReader. (great for testing)"
numLines =
do
(_, res) <- pipe2 (produceLinesFromStringReader) (countLinesConsumer 0)
pure res
println $ "processing strings with StringReader works as expected: " ++ show (3 == numLines.run)


println "----------------------------------------"
println "reading from a URL: http://google.com"
(_, urlLinesCount) <- pipe2 (produceLinesFromUrl "http://google.com") (nLineCountAndPrintConsumer 4 0)
println $ "processing strings from URL works as expected: " ++ show (urlLinesCount == 4)


println "----------------------------------------"
strGenLineCount =
do
strGenRef <- Ref.new $ StrGen { line = "", nextLine = (++ "x") }
(_, res) <- pipe2 (produceLinesFromStrGenData 4 strGenRef) (countLinesConsumer 0)
pure res
println $ "processing strings from String Producer works as expected: " ++ show (strGenLineCount.run == 4)

println "----------------------------------------"
gen =
do
produce 1
produce 2
endWith 42
iter =
do
lift $ print "Enter two numbers: "
a <- consume
b <- consume
lift $ println $ "Sum is " ++ show (a + b)
(xs, res) <- runProducer gen
println $ show xs ++ " " ++ show res
println $ "processing producers and consumers independently without the pipe2 function"
runConsumer xs iter

println "----------------------------------------"
(_, _) <- pipe2 (producePerson) (personConsumer)
println "processing custom data type Person works"
println "FIN"
5 changes: 5 additions & 0 deletions frege/StandardLibrary.fr
Expand Up @@ -24,6 +24,11 @@ import Control.monad.trans.MonadTrans public()

import Control.concurrent.STM public()

import Control.pipe.Coroutine public()
import Control.pipe.Producer public()
import Control.pipe.Consumer public()
import Control.pipe.Pipe public()

import Data.Bits public()
import Data.Dec64 public()
import Data.Char public()
Expand Down
23 changes: 23 additions & 0 deletions frege/control/pipe/Consumer.fr
@@ -0,0 +1,23 @@
module frege.control.pipe.Consumer where

import Control.monad.trans.MonadTrans (MonadTrans (lift))
import Control.pipe.Coroutine (Coroutine, resume, suspend)

--- An Consumer is a coroutine that uses @(->) a@ as the to-be-suspended functor.
type Consumer a m res = Coroutine ((->) a) m res

consume :: Monad m => Consumer a m a
consume = suspend pure

runConsumer :: Monad m => [a] -> Consumer a m res -> m res
runConsumer xs consumer = do
eitherCo <- resume consumer
case eitherCo of
(Left co) ->
case xs of
[] -> runConsumer [] $ co $ error "No more values to consume."
(x:xs') -> runConsumer xs' $ co x
(Right x) -> pure x

-- test cases see text.qc.ProducerConsumer
-- examples see examples.LineProducerConsumer
23 changes: 23 additions & 0 deletions frege/control/pipe/Coroutine.fr
@@ -0,0 +1,23 @@
--- Coroutine as layed out in M. Blazevic, “Coroutine pipelines,” The Monad Reader, vol. 19, pp. 29–50, 2011.

module frege.control.pipe.Coroutine where

import Control.monad.trans.MonadTrans

--- Coroutines are build for a to-be-suspended functor @suspendFun@, a monad @m@, and a result type @res@.
data Coroutine suspendFun m res = Coroutine
{ resume :: m (Either (suspendFun (Coroutine suspendFun m res)) res)
}

--- Monad instance for the Coroutine, generic in the result type
instance (Functor suspendFun, Monad m) => Monad (Coroutine suspendFun m) where
pure x = Coroutine $ pure $ Right x
co >>= f =
Coroutine( Coroutine.resume co >>= either (pure . Left . fmap (>>= f)) (Coroutine.resume . f))

--- Monad transformer instance that allows to interleave the to-be-suspended functor with effects from a monad over the result type
instance (Functor suspendFun) => MonadTrans (Coroutine suspendFun) where
lift = Coroutine . liftM Right

suspend :: (Functor suspendFun, Monad m) => suspendFun (Coroutine suspendFun m res) -> Coroutine suspendFun m res
suspend suspendFun = Coroutine $ pure $ Left suspendFun
51 changes: 51 additions & 0 deletions frege/control/pipe/Pipe.fr
@@ -0,0 +1,51 @@
module frege.control.pipe.Pipe where

import Control.pipe.Producer (produce, endWith, Producer )
import Control.pipe.Consumer (consume, Consumer )
import Control.pipe.Coroutine ( Coroutine(resume) )
import Control.monad.trans.MonadTrans ( lift )

bindM2 :: Monad m => (a -> b -> m c) -> m a -> m b -> m c
bindM2 f ma mb = do
a <- ma
b <- mb
f a b

-- todo: consider removing this "partial" function.
pipe :: Monad m => Producer a m x -> Consumer a m y -> m (x, y)
pipe producer consumer = bindM2 go (resume producer) (resume consumer) where
go (Left (x, prod)) (Left consume) = pipe prod (consume x)
go (Right x) (Right y) = pure (x, y)
go (Left (x, prod)) (Right y) = pipe prod $ return y
go (Right x) (Left consume) = error "The consumer has nothing to consume."

pipe2 :: Monad m => Producer a m x -> Consumer (Maybe a) m y -> m (x, y)
pipe2 producer consumer = bindM2 go (resume producer) (resume consumer) where -- when after resume ...
go (Left (x, prod)) (Left consume) = pipe2 prod (consume $ Just x) -- .. both are suspended -> consume and proceed
go (Right x) (Right y) = pure (x, y) -- .. both are done -> finish
go (Left (x, prod)) (Right y) = pipe2 prod $ return y -- .. more to do -> proceed
go (Right x) (Left consume) = pipe2 (return x) (consume Nothing) -- .. consumer waits but Nothing to do


-- -- -- Usage -- -- -- todo: consider making this a test case

produceInts :: Int -> Int -> Producer Int IO ()
produceInts start end | start == end = do endWith ()
| otherwise = do
produce start
produceInts (start + 1) end

printEachLine :: Show a => Consumer (Maybe a) IO ()
printEachLine = do
line <- consume
case line of
(Just x) -> do
lift $ println $ show x
printEachLine
(Nothing) -> pure ()

main :: IO ()
main =
do
_ <- pipe2 (produceInts 0 10) printEachLine
pure ()
24 changes: 24 additions & 0 deletions frege/control/pipe/Producer.fr
@@ -0,0 +1,24 @@

module frege.control.pipe.Producer where

import Control.pipe.Coroutine (Coroutine, resume, suspend)

--- A Producer is a coroutine that uses @(,) a@ as the to-be-suspended functor.
type Producer a m res = Coroutine ((,) a) m res

runProducer :: Monad m => Producer a m res -> m ([a], res)
runProducer = go id
where
go f gen = do
eitherGen <- resume gen
case eitherGen of
(Left (x, newGen)) -> go (f . (x:)) newGen
(Right x) -> pure (f [], x)

produce :: Monad m => a -> Producer a m ()
produce a = suspend (a, pure ())

endWith x = pure x

-- test cases see text.qc.ProducerConsumer
-- examples see examples.LineProducerConsumer

0 comments on commit e226dba

Please sign in to comment.