From e226dbad4f764b50b677f1a64175f129739439a1 Mon Sep 17 00:00:00 2001 From: Dierk Koenig Date: Tue, 18 May 2021 00:26:22 +0200 Subject: [PATCH] working producer-consumer version but needs more docs and cleanup, commit is for backup purposes --- examples/LineProducerConsumer.fr | 244 +++++++++++++++++++++++++++++++ frege/StandardLibrary.fr | 5 + frege/control/pipe/Consumer.fr | 23 +++ frege/control/pipe/Coroutine.fr | 23 +++ frege/control/pipe/Pipe.fr | 51 +++++++ frege/control/pipe/Producer.fr | 24 +++ frege/java/IO.fr | 34 +++-- tests/qc/ProducerConsumer.fr | 39 +++++ 8 files changed, 434 insertions(+), 9 deletions(-) create mode 100644 examples/LineProducerConsumer.fr create mode 100644 frege/control/pipe/Consumer.fr create mode 100644 frege/control/pipe/Coroutine.fr create mode 100644 frege/control/pipe/Pipe.fr create mode 100644 frege/control/pipe/Producer.fr create mode 100644 tests/qc/ProducerConsumer.fr diff --git a/examples/LineProducerConsumer.fr b/examples/LineProducerConsumer.fr new file mode 100644 index 00000000..6e65ac22 --- /dev/null +++ b/examples/LineProducerConsumer.fr @@ -0,0 +1,244 @@ +{-- + Example module to show the use of producer-consumer piping by putting it in various contexts like + file, url, and command line processing but also pure in-memory operations. +-} + +module examples.LineProducerConsumer where + +import Control.pipe.Producer (produce, endWith, Producer, runProducer ) +import Control.pipe.Consumer (consume, Consumer, runConsumer ) +import Control.monad.trans.MonadTrans (MonadTrans (lift)) +import Control.pipe.Pipe (pipe2) +import Java.IO (openReader) +import Java.Net (URL, MalformedURLException) + +data StrGenData = StrGen { + line :: String, -- the currently generated line, "onLine" will be called with this + nextLine :: String -> String -- the generation function for the next line +} + +type StringProducer = Ref StrGenData -- a mutable reference to the Producer data + +data Person = Person { + name :: String, + birthyear :: Int +} + +producePerson :: Producer Person IO () +producePerson = + do + produce $ Person "Leonard Euler" 1707 + produce $ Person "Carl Friedrich Gauss" 1777 + produce $ Person "Gottlob Frege" 1848 + endWith () + + + + + +produceLinesFromStringReader :: Producer String (ST s) () +produceLinesFromStringReader = + do + stringReader <- lift $ StringReader.new $ unlines ["first","second","third"] + bufferedReader <- lift $ BufferedReader.new stringReader + produceLinesFromBufferedReader bufferedReader + + +produceLinesFromUrl :: String -> Producer String IO () +produceLinesFromUrl url = + do + xurl <- lift $ URL.new url + uis <- lift $ URL.openStream xurl + isr <- lift $ InputStreamReader.new uis "UTF-8" + bufferedReader <- lift $ BufferedReader.new isr + produceLinesFromBufferedReader bufferedReader + +produceLinesFromStrGenData :: Int -> Mutable s StringProducer -> Producer String (ST s) () +produceLinesFromStrGenData n _ | n == 0 = pure () +produceLinesFromStrGenData n strGen | otherwise = + do + state <- lift $ strGen.get + produce state.line + lift $ strGen.modify _.{line <- state.nextLine} + produceLinesFromStrGenData (n - 1) strGen + +countLinesConsumer :: Show a => Int -> Consumer (Maybe a) (ST s) Int +countLinesConsumer count = + do + line <- consume + case line of + (Just l) -> + do + countLinesConsumer (count + 1) + (Nothing) -> pure count + + + +oneLineConsumer :: Show a => Consumer (Maybe a) IO (Maybe a) +oneLineConsumer = + do + line <- consume + pure line + +nLineCountAndPrintConsumer :: Show a => Int -> Int -> Consumer (Maybe a) IO Int +nLineCountAndPrintConsumer n count | n == count = pure count +nLineCountAndPrintConsumer n count | otherwise = + do + line <- consume + case line of + (Just l) -> + do + lift $ println $ show (count + 1) ++ ": " ++ (show l) + nLineCountAndPrintConsumer n (count + 1) + (Nothing) -> pure count + +personConsumer :: Consumer (Maybe Person) IO () +personConsumer = + do + maybePerson <- consume + case maybePerson of + (Just p) -> + do + lift $ println $ "Processing " ++ p.name + personConsumer + (Nothing) -> pure () + + +prepareTestFile :: IO File +prepareTestFile = do + testFile = File.new "TestFile.txt" + println "----------------------------------------" + println "general file handling" + -- delete test file if it already existed + result <- testFile.delete + println $ "Test file deleted to clean up before start: " ++ show result + println "create test file with three lines" + writeFile testFile.getPath $ unlines ["first line","second line","third line"] + result <- testFile.exists + println $ "File now exists: " ++ show result + println "read test file in toto" + content <- readFile testFile.getPath + println "file content was:" + println content + println "append 2 lines to show that we can write" + appendFile testFile.getPath $ unlines ["fourth line","fifth line"] + return testFile + +-- Basic support + +--- A producer of Strings (lines) that runs in an (ST s) Monad (usually IO) and returns no result. +--- Used with many Reader types that a BufferedReader can decorate. +produceLinesFromBufferedReader :: Mutable s BufferedReader -> Producer String (ST s) () +produceLinesFromBufferedReader bufferedReader = do + maybeLine <- lift $ bufferedReader.readLine -- lift into the Producer context monad + case maybeLine of + Just line -> do -- successful read + produce line -- produce the line, yield to consumer if any + produceLinesFromBufferedReader bufferedReader -- proceed with next line + Nothing -> do -- no more lines + lift $ bufferedReader.close -- cleanup + endWith () -- signal end of processing + +-- use case 1: + +processNamedFile :: String -> IO () +processNamedFile filePath = do + println "----------------------------------------" + println "processing a named file line-by-line with a buffered reader, while keeping track of line numbers" + (end, count) <- pipe2 (produceLinesFromFilePath filePath) (countAndPrintLinesConsumer 0) + println $ "total number of lines: " ++ show count + println $ "producer end value is (): " ++ show (end == ()) + +produceLinesFromFilePath :: String -> Producer String IO () +produceLinesFromFilePath filePath = do + bufferedReader <- lift $ openReader filePath -- get a buffered reader in the Producer context, now IO specific + produceLinesFromBufferedReader bufferedReader -- simple delegate, nothing specific to do + +countAndPrintLinesConsumer :: Show a => Int -> Consumer (Maybe a) IO Int +countAndPrintLinesConsumer count = do + maybeLine <- consume -- try to consume + case maybeLine of + Just line -> do -- on success + lift $ println $ show (count + 1) ++ ": " ++ show line -- work in the Consumer context monad (IO) and lift + countAndPrintLinesConsumer (count + 1) -- proceed with next consumption + Nothing -> pure count -- on end, return result in Consumer context monad + +-- use case 2: + +fullyProcessFile :: String -> IO () +fullyProcessFile filePath = do + println "----------------------------------------" + println "processing all lines from a file, pushing each line to a list" + println "without consumer and pipe, we can run the same producer to get the full result" + (list, _) <- runProducer (produceLinesFromFilePath filePath) + println $ "total result list: " ++ show list + + + +main :: IO () +main = + do + testFile <- prepareTestFile + + processNamedFile testFile.getPath + + fullyProcessFile testFile.getPath + + println "----------------------------------------" + println "reading only one line (a header for example)" + (_, maybeHeader) <- pipe2 (produceLinesFromFilePath testFile.getPath) oneLineConsumer + case maybeHeader of + (Just header) -> println $ "the header line is: " ++ show header + (Nothing) -> println $ "The file is empty" + + println "-----------Version 2 of processing just one line --------------------" + println "reading only one line (a header for example)" + (_, maybeHeader) <- pipe2 (produceLinesFromFilePath testFile.getPath) (consume >>= pure) + case maybeHeader of + (Just header) -> println $ "the header line is: " ++ show header + (Nothing) -> println $ "The file is empty" + + println "----------------------------------------" + println "processing each line with a non-IO impure reader, here: StringReader. (great for testing)" + numLines = + do + (_, res) <- pipe2 (produceLinesFromStringReader) (countLinesConsumer 0) + pure res + println $ "processing strings with StringReader works as expected: " ++ show (3 == numLines.run) + + + println "----------------------------------------" + println "reading from a URL: http://google.com" + (_, urlLinesCount) <- pipe2 (produceLinesFromUrl "http://google.com") (nLineCountAndPrintConsumer 4 0) + println $ "processing strings from URL works as expected: " ++ show (urlLinesCount == 4) + + + println "----------------------------------------" + strGenLineCount = + do + strGenRef <- Ref.new $ StrGen { line = "", nextLine = (++ "x") } + (_, res) <- pipe2 (produceLinesFromStrGenData 4 strGenRef) (countLinesConsumer 0) + pure res + println $ "processing strings from String Producer works as expected: " ++ show (strGenLineCount.run == 4) + + println "----------------------------------------" + gen = + do + produce 1 + produce 2 + endWith 42 + iter = + do + lift $ print "Enter two numbers: " + a <- consume + b <- consume + lift $ println $ "Sum is " ++ show (a + b) + (xs, res) <- runProducer gen + println $ show xs ++ " " ++ show res + println $ "processing producers and consumers independently without the pipe2 function" + runConsumer xs iter + + println "----------------------------------------" + (_, _) <- pipe2 (producePerson) (personConsumer) + println "processing custom data type Person works" + println "FIN" diff --git a/frege/StandardLibrary.fr b/frege/StandardLibrary.fr index 5e82ed7c..8cd926b2 100644 --- a/frege/StandardLibrary.fr +++ b/frege/StandardLibrary.fr @@ -24,6 +24,11 @@ import Control.monad.trans.MonadTrans public() import Control.concurrent.STM public() +import Control.pipe.Coroutine public() +import Control.pipe.Producer public() +import Control.pipe.Consumer public() +import Control.pipe.Pipe public() + import Data.Bits public() import Data.Dec64 public() import Data.Char public() diff --git a/frege/control/pipe/Consumer.fr b/frege/control/pipe/Consumer.fr new file mode 100644 index 00000000..c61462c3 --- /dev/null +++ b/frege/control/pipe/Consumer.fr @@ -0,0 +1,23 @@ +module frege.control.pipe.Consumer where + +import Control.monad.trans.MonadTrans (MonadTrans (lift)) +import Control.pipe.Coroutine (Coroutine, resume, suspend) + +--- An Consumer is a coroutine that uses @(->) a@ as the to-be-suspended functor. +type Consumer a m res = Coroutine ((->) a) m res + +consume :: Monad m => Consumer a m a +consume = suspend pure + +runConsumer :: Monad m => [a] -> Consumer a m res -> m res +runConsumer xs consumer = do + eitherCo <- resume consumer + case eitherCo of + (Left co) -> + case xs of + [] -> runConsumer [] $ co $ error "No more values to consume." + (x:xs') -> runConsumer xs' $ co x + (Right x) -> pure x + +-- test cases see text.qc.ProducerConsumer +-- examples see examples.LineProducerConsumer diff --git a/frege/control/pipe/Coroutine.fr b/frege/control/pipe/Coroutine.fr new file mode 100644 index 00000000..73373314 --- /dev/null +++ b/frege/control/pipe/Coroutine.fr @@ -0,0 +1,23 @@ +--- Coroutine as layed out in M. Blazevic, “Coroutine pipelines,” The Monad Reader, vol. 19, pp. 29–50, 2011. + +module frege.control.pipe.Coroutine where + +import Control.monad.trans.MonadTrans + +--- Coroutines are build for a to-be-suspended functor @suspendFun@, a monad @m@, and a result type @res@. +data Coroutine suspendFun m res = Coroutine + { resume :: m (Either (suspendFun (Coroutine suspendFun m res)) res) + } + +--- Monad instance for the Coroutine, generic in the result type +instance (Functor suspendFun, Monad m) => Monad (Coroutine suspendFun m) where + pure x = Coroutine $ pure $ Right x + co >>= f = + Coroutine( Coroutine.resume co >>= either (pure . Left . fmap (>>= f)) (Coroutine.resume . f)) + +--- Monad transformer instance that allows to interleave the to-be-suspended functor with effects from a monad over the result type +instance (Functor suspendFun) => MonadTrans (Coroutine suspendFun) where + lift = Coroutine . liftM Right + +suspend :: (Functor suspendFun, Monad m) => suspendFun (Coroutine suspendFun m res) -> Coroutine suspendFun m res +suspend suspendFun = Coroutine $ pure $ Left suspendFun diff --git a/frege/control/pipe/Pipe.fr b/frege/control/pipe/Pipe.fr new file mode 100644 index 00000000..54bfe0af --- /dev/null +++ b/frege/control/pipe/Pipe.fr @@ -0,0 +1,51 @@ +module frege.control.pipe.Pipe where + +import Control.pipe.Producer (produce, endWith, Producer ) +import Control.pipe.Consumer (consume, Consumer ) +import Control.pipe.Coroutine ( Coroutine(resume) ) +import Control.monad.trans.MonadTrans ( lift ) + +bindM2 :: Monad m => (a -> b -> m c) -> m a -> m b -> m c +bindM2 f ma mb = do + a <- ma + b <- mb + f a b + +-- todo: consider removing this "partial" function. +pipe :: Monad m => Producer a m x -> Consumer a m y -> m (x, y) +pipe producer consumer = bindM2 go (resume producer) (resume consumer) where + go (Left (x, prod)) (Left consume) = pipe prod (consume x) + go (Right x) (Right y) = pure (x, y) + go (Left (x, prod)) (Right y) = pipe prod $ return y + go (Right x) (Left consume) = error "The consumer has nothing to consume." + +pipe2 :: Monad m => Producer a m x -> Consumer (Maybe a) m y -> m (x, y) +pipe2 producer consumer = bindM2 go (resume producer) (resume consumer) where -- when after resume ... + go (Left (x, prod)) (Left consume) = pipe2 prod (consume $ Just x) -- .. both are suspended -> consume and proceed + go (Right x) (Right y) = pure (x, y) -- .. both are done -> finish + go (Left (x, prod)) (Right y) = pipe2 prod $ return y -- .. more to do -> proceed + go (Right x) (Left consume) = pipe2 (return x) (consume Nothing) -- .. consumer waits but Nothing to do + + +-- -- -- Usage -- -- -- todo: consider making this a test case + +produceInts :: Int -> Int -> Producer Int IO () +produceInts start end | start == end = do endWith () + | otherwise = do + produce start + produceInts (start + 1) end + +printEachLine :: Show a => Consumer (Maybe a) IO () +printEachLine = do + line <- consume + case line of + (Just x) -> do + lift $ println $ show x + printEachLine + (Nothing) -> pure () + +main :: IO () +main = + do + _ <- pipe2 (produceInts 0 10) printEachLine + pure () diff --git a/frege/control/pipe/Producer.fr b/frege/control/pipe/Producer.fr new file mode 100644 index 00000000..0901e8c5 --- /dev/null +++ b/frege/control/pipe/Producer.fr @@ -0,0 +1,24 @@ + +module frege.control.pipe.Producer where + +import Control.pipe.Coroutine (Coroutine, resume, suspend) + +--- A Producer is a coroutine that uses @(,) a@ as the to-be-suspended functor. +type Producer a m res = Coroutine ((,) a) m res + +runProducer :: Monad m => Producer a m res -> m ([a], res) +runProducer = go id + where + go f gen = do + eitherGen <- resume gen + case eitherGen of + (Left (x, newGen)) -> go (f . (x:)) newGen + (Right x) -> pure (f [], x) + +produce :: Monad m => a -> Producer a m () +produce a = suspend (a, pure ()) + +endWith x = pure x + +-- test cases see text.qc.ProducerConsumer +-- examples see examples.LineProducerConsumer diff --git a/frege/java/IO.fr b/frege/java/IO.fr index e485f69a..b0f57075 100644 --- a/frege/java/IO.fr +++ b/frege/java/IO.fr @@ -193,9 +193,9 @@ data InputStream = native java.io.InputStream where native skip :: Mutable s InputStream -> Long -> ST s Long throws IOException data FileInputStream = native java.io.FileInputStream where - native new :: File -> IOMutable FileInputStream + native new :: File -> STMutable s FileInputStream throws FileNotFoundException - | String -> IOMutable FileInputStream + | String -> STMutable s FileInputStream throws FileNotFoundException data Reader = native java.io.Reader where @@ -261,19 +261,35 @@ data BufferedReader = native java.io.BufferedReader where -} getLine :: Mutable s BufferedReader -> ST s String getLine br = readLine br >>= maybe (throwST (EOFException.new "getLine")) return - + + + {-- + Convenience function to open a file and wrap it with a buffered 'Reader' for the named charset. + Charset name is a String like "UTF-8". See @java.nio.charset.Charset@. + + May throw 'FileNotFoundException' + -} +openReaderWithCharset :: String -> String -> IOMutable BufferedReader +openReaderWithCharset charsetName pathName = do + fis <- FileInputStream.new (pathName :: String ) + isr <- InputStreamReader.new fis charsetName + BufferedReader.new isr {-- - Convenience function to open a file and wrap it with an UTF-8 decoding - buffered 'Reader'. + Convenience function to open a file and wrap it with a + buffered 'Reader' for the UTF-8 charset. May throw 'FileNotFoundException' -} openReader :: String -> IOMutable BufferedReader -openReader fileName = do - fis <- FileInputStream.new fileName - isr <- InputStreamReader.new fis "UTF-8" - BufferedReader.new isr +openReader pathName = openReaderWithCharset "UTF-8" pathName + +{-- + A 'Reader' that reads from a String. + Often useful to replace an effectful Reader, e.g. for testing purposes. + -} +data StringReader = native java.io.StringReader where + native new :: String -> STMutable s StringReader {-- Convenience function to open a file for writing through an UTF-8 encoding diff --git a/tests/qc/ProducerConsumer.fr b/tests/qc/ProducerConsumer.fr new file mode 100644 index 00000000..4038ca95 --- /dev/null +++ b/tests/qc/ProducerConsumer.fr @@ -0,0 +1,39 @@ +--- This is an undocumented module +module tests.qc.ProducerConsumer where + +import Control.pipe.Producer +import Control.pipe.Consumer +import Control.pipe.Pipe + +import Control.monad.trans.MonadTrans (lift) + +import Test.QuickCheck + +-- separate producer test + +-- a producer of the numbers that allows consumption in the context Monad "m", ending with a String +produce12 :: (Num n, Monad m) => Producer n m String +produce12 = do + produce 1 -- produce a number + produce 2 + endWith "the end" + +-- for the test case, we use Maybe as the context monad, in real usages this is often IO +producerTestRun = once ( runProducer produce12 == Just ( [1, 2], "the end") ) + +-- separate consumer test + +-- we consume values of type (Maybe n) - since production might fail - as needed for pipe2 +consumeTwiceAndAdd :: (Num n) => Consumer (Maybe n) Maybe n +consumeTwiceAndAdd = do + a <- consume + b <- consume + -- lift $ Just ( unJust a + unJust b) -- using unJust is not nice, better sequence and map + lift $ fmap (foldr1 (+)) (sequence [a,b]) -- note that we must lift the result into the context monad + +consumerTestRun = once ( runConsumer [Just 1, Just 2] consumeTwiceAndAdd == Just 3) + +-- pipe test in combination + +pipeTestRun = once ( pipe2 produce12 consumeTwiceAndAdd == Just ("the end", 3) ) +