/
LineProducerConsumer.fr
244 lines (200 loc) · 9.17 KB
/
LineProducerConsumer.fr
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
{--
Example module to show the use of producer-consumer piping by putting it in various contexts like
file, url, and command line processing but also pure in-memory operations.
-}
module examples.LineProducerConsumer where
import Control.pipe.Producer (produce, endWith, Producer, runProducer )
import Control.pipe.Consumer (consume, Consumer, runConsumer )
import Control.monad.trans.MonadTrans (MonadTrans (lift))
import Control.pipe.Pipe (pipe2)
import Java.IO (openReader)
import Java.Net (URL, MalformedURLException)
data StrGenData = StrGen {
line :: String, -- the currently generated line, "onLine" will be called with this
nextLine :: String -> String -- the generation function for the next line
}
type StringProducer = Ref StrGenData -- a mutable reference to the Producer data
data Person = Person {
name :: String,
birthyear :: Int
}
producePerson :: Producer Person IO ()
producePerson =
do
produce $ Person "Leonard Euler" 1707
produce $ Person "Carl Friedrich Gauss" 1777
produce $ Person "Gottlob Frege" 1848
endWith ()
produceLinesFromStringReader :: Producer String (ST s) ()
produceLinesFromStringReader =
do
stringReader <- lift $ StringReader.new $ unlines ["first","second","third"]
bufferedReader <- lift $ BufferedReader.new stringReader
produceLinesFromBufferedReader bufferedReader
produceLinesFromUrl :: String -> Producer String IO ()
produceLinesFromUrl url =
do
xurl <- lift $ URL.new url
uis <- lift $ URL.openStream xurl
isr <- lift $ InputStreamReader.new uis "UTF-8"
bufferedReader <- lift $ BufferedReader.new isr
produceLinesFromBufferedReader bufferedReader
produceLinesFromStrGenData :: Int -> Mutable s StringProducer -> Producer String (ST s) ()
produceLinesFromStrGenData n _ | n == 0 = pure ()
produceLinesFromStrGenData n strGen | otherwise =
do
state <- lift $ strGen.get
produce state.line
lift $ strGen.modify _.{line <- state.nextLine}
produceLinesFromStrGenData (n - 1) strGen
countLinesConsumer :: Show a => Int -> Consumer (Maybe a) (ST s) Int
countLinesConsumer count =
do
line <- consume
case line of
(Just l) ->
do
countLinesConsumer (count + 1)
(Nothing) -> pure count
oneLineConsumer :: Show a => Consumer (Maybe a) IO (Maybe a)
oneLineConsumer =
do
line <- consume
pure line
nLineCountAndPrintConsumer :: Show a => Int -> Int -> Consumer (Maybe a) IO Int
nLineCountAndPrintConsumer n count | n == count = pure count
nLineCountAndPrintConsumer n count | otherwise =
do
line <- consume
case line of
(Just l) ->
do
lift $ println $ show (count + 1) ++ ": " ++ (show l)
nLineCountAndPrintConsumer n (count + 1)
(Nothing) -> pure count
personConsumer :: Consumer (Maybe Person) IO ()
personConsumer =
do
maybePerson <- consume
case maybePerson of
(Just p) ->
do
lift $ println $ "Processing " ++ p.name
personConsumer
(Nothing) -> pure ()
prepareTestFile :: IO File
prepareTestFile = do
testFile = File.new "TestFile.txt"
println "----------------------------------------"
println "general file handling"
-- delete test file if it already existed
result <- testFile.delete
println $ "Test file deleted to clean up before start: " ++ show result
println "create test file with three lines"
writeFile testFile.getPath $ unlines ["first line","second line","third line"]
result <- testFile.exists
println $ "File now exists: " ++ show result
println "read test file in toto"
content <- readFile testFile.getPath
println "file content was:"
println content
println "append 2 lines to show that we can write"
appendFile testFile.getPath $ unlines ["fourth line","fifth line"]
return testFile
-- Basic support
--- A producer of Strings (lines) that runs in an (ST s) Monad (usually IO) and returns no result.
--- Used with many Reader types that a BufferedReader can decorate.
produceLinesFromBufferedReader :: Mutable s BufferedReader -> Producer String (ST s) ()
produceLinesFromBufferedReader bufferedReader = do
maybeLine <- lift $ bufferedReader.readLine -- lift into the Producer context monad
case maybeLine of
Just line -> do -- successful read
produce line -- produce the line, yield to consumer if any
produceLinesFromBufferedReader bufferedReader -- proceed with next line
Nothing -> do -- no more lines
lift $ bufferedReader.close -- cleanup
endWith () -- signal end of processing
-- use case 1:
processNamedFile :: String -> IO ()
processNamedFile filePath = do
println "----------------------------------------"
println "processing a named file line-by-line with a buffered reader, while keeping track of line numbers"
(end, count) <- pipe2 (produceLinesFromFilePath filePath) (countAndPrintLinesConsumer 0)
println $ "total number of lines: " ++ show count
println $ "producer end value is (): " ++ show (end == ())
produceLinesFromFilePath :: String -> Producer String IO ()
produceLinesFromFilePath filePath = do
bufferedReader <- lift $ openReader filePath -- get a buffered reader in the Producer context, now IO specific
produceLinesFromBufferedReader bufferedReader -- simple delegate, nothing specific to do
countAndPrintLinesConsumer :: Show a => Int -> Consumer (Maybe a) IO Int
countAndPrintLinesConsumer count = do
maybeLine <- consume -- try to consume
case maybeLine of
Just line -> do -- on success
lift $ println $ show (count + 1) ++ ": " ++ show line -- work in the Consumer context monad (IO) and lift
countAndPrintLinesConsumer (count + 1) -- proceed with next consumption
Nothing -> pure count -- on end, return result in Consumer context monad
-- use case 2:
fullyProcessFile :: String -> IO ()
fullyProcessFile filePath = do
println "----------------------------------------"
println "processing all lines from a file, pushing each line to a list"
println "without consumer and pipe, we can run the same producer to get the full result"
(list, _) <- runProducer (produceLinesFromFilePath filePath)
println $ "total result list: " ++ show list
main :: IO ()
main =
do
testFile <- prepareTestFile
processNamedFile testFile.getPath
fullyProcessFile testFile.getPath
println "----------------------------------------"
println "reading only one line (a header for example)"
(_, maybeHeader) <- pipe2 (produceLinesFromFilePath testFile.getPath) oneLineConsumer
case maybeHeader of
(Just header) -> println $ "the header line is: " ++ show header
(Nothing) -> println $ "The file is empty"
println "-----------Version 2 of processing just one line --------------------"
println "reading only one line (a header for example)"
(_, maybeHeader) <- pipe2 (produceLinesFromFilePath testFile.getPath) (consume >>= pure)
case maybeHeader of
(Just header) -> println $ "the header line is: " ++ show header
(Nothing) -> println $ "The file is empty"
println "----------------------------------------"
println "processing each line with a non-IO impure reader, here: StringReader. (great for testing)"
numLines =
do
(_, res) <- pipe2 (produceLinesFromStringReader) (countLinesConsumer 0)
pure res
println $ "processing strings with StringReader works as expected: " ++ show (3 == numLines.run)
println "----------------------------------------"
println "reading from a URL: http://google.com"
(_, urlLinesCount) <- pipe2 (produceLinesFromUrl "http://google.com") (nLineCountAndPrintConsumer 4 0)
println $ "processing strings from URL works as expected: " ++ show (urlLinesCount == 4)
println "----------------------------------------"
strGenLineCount =
do
strGenRef <- Ref.new $ StrGen { line = "", nextLine = (++ "x") }
(_, res) <- pipe2 (produceLinesFromStrGenData 4 strGenRef) (countLinesConsumer 0)
pure res
println $ "processing strings from String Producer works as expected: " ++ show (strGenLineCount.run == 4)
println "----------------------------------------"
gen =
do
produce 1
produce 2
endWith 42
iter =
do
lift $ print "Enter two numbers: "
a <- consume
b <- consume
lift $ println $ "Sum is " ++ show (a + b)
(xs, res) <- runProducer gen
println $ show xs ++ " " ++ show res
println $ "processing producers and consumers independently without the pipe2 function"
runConsumer xs iter
println "----------------------------------------"
(_, _) <- pipe2 (producePerson) (personConsumer)
println "processing custom data type Person works"
println "FIN"