Skip to content

Commit

Permalink
Merge pull request #110 from AlexeyRaga/sqs-lazy-messages
Browse files Browse the repository at this point in the history
SQS: Read messages lazily
  • Loading branch information
AlexeyRaga committed Oct 22, 2020
2 parents aaa9728 + 4593c03 commit 86ad3df
Show file tree
Hide file tree
Showing 12 changed files with 49 additions and 13 deletions.
2 changes: 1 addition & 1 deletion antiope-athena/antiope-athena.cabal
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
cabal-version: 2.4

name: antiope-athena
version: 7.5.2
version: 7.5.3
synopsis: Please see the README on Github at <https://github.com/arbor/antiope#readme>
description: Please see the README on Github at <https://github.com/arbor/antiope#readme>.
category: Services
Expand Down
2 changes: 1 addition & 1 deletion antiope-contract/antiope-contract.cabal
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
cabal-version: 2.4

name: antiope-contract
version: 7.5.2
version: 7.5.3
synopsis: Please see the README on Github at <https://github.com/arbor/antiope#readme>
description: Please see the README on Github at <https://github.com/arbor/antiope#readme>.
category: Services
Expand Down
2 changes: 1 addition & 1 deletion antiope-core/antiope-core.cabal
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
cabal-version: 2.4

name: antiope-core
version: 7.5.2
version: 7.5.3
description: Please see the README on Github at <https://github.com/arbor/antiope#readme>.
synopsis: Please see the README on Github at <https://github.com/arbor/antiope#readme>
category: Services
Expand Down
2 changes: 1 addition & 1 deletion antiope-dynamodb/antiope-dynamodb.cabal
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
cabal-version: 2.4

name: antiope-dynamodb
version: 7.5.2
version: 7.5.3
synopsis: Please see the README on Github at <https://github.com/arbor/antiope#readme>
description: Please see the README on Github at <https://github.com/arbor/antiope#readme>.
category: Services
Expand Down
2 changes: 1 addition & 1 deletion antiope-es/antiope-es.cabal
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
cabal-version: 2.4

name: antiope-es
version: 7.5.2
version: 7.5.3
synopsis: Please see the README on Github at <https://github.com/arbor/antiope#readme>
description: Please see the README on Github at <https://github.com/arbor/antiope#readme>.
category: Services
Expand Down
2 changes: 1 addition & 1 deletion antiope-messages/antiope-messages.cabal
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
cabal-version: 2.4

name: antiope-messages
version: 7.5.2
version: 7.5.3
synopsis: Please see the README on Github at <https://github.com/arbor/antiope#readme>
description: Please see the README on Github at <https://github.com/arbor/antiope#readme>.
category: Services
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
cabal-version: 2.4

name: antiope-optparse-applicative
version: 7.5.2
version: 7.5.3
synopsis: Please see the README on Github at <https://github.com/arbor/antiope#readme>
description: Please see the README on Github at <https://github.com/arbor/antiope#readme>.
category: Services
Expand Down
2 changes: 1 addition & 1 deletion antiope-s3/antiope-s3.cabal
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
cabal-version: 2.4

name: antiope-s3
version: 7.5.2
version: 7.5.3
synopsis: Please see the README on Github at <https://github.com/arbor/antiope#readme>
description: Please see the README on Github at <https://github.com/arbor/antiope#readme>.
category: Services
Expand Down
2 changes: 1 addition & 1 deletion antiope-shell/antiope-shell.cabal
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
cabal-version: 2.4

name: antiope-shell
version: 7.5.2
version: 7.5.3
synopsis: Please see the README on Github at <https://github.com/arbor/antiope#readme>
description: Please see the README on Github at <https://github.com/arbor/antiope#readme>.
category: Services
Expand Down
2 changes: 1 addition & 1 deletion antiope-sns/antiope-sns.cabal
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
cabal-version: 2.4

name: antiope-sns
version: 7.5.2
version: 7.5.3
synopsis: Please see the README on Github at <https://github.com/arbor/antiope#readme>
description: Please see the README on Github at <https://github.com/arbor/antiope#readme>.
category: Services
Expand Down
2 changes: 1 addition & 1 deletion antiope-sqs/antiope-sqs.cabal
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
cabal-version: 2.4

name: antiope-sqs
version: 7.5.2
version: 7.5.3
synopsis: Please see the README on Github at <https://github.com/arbor/antiope#readme>
description: Please see the README on Github at <https://github.com/arbor/antiope#readme>.
category: Services
Expand Down
40 changes: 38 additions & 2 deletions antiope-sqs/src/Antiope/SQS.hs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ module Antiope.SQS
, keepAliveMessage
, forAllMessages
, forAllMessages'
, lazyReadAllMessages
, lazyReadAllMessages'
, defaultReceiveMessage

-- * Re-exports
Expand All @@ -23,7 +25,7 @@ module Antiope.SQS

import Control.Lens
import Control.Monad (forM, forM_, join, void)
import Control.Monad.IO.Unlift (MonadUnliftIO)
import Control.Monad.IO.Unlift (MonadUnliftIO, askUnliftIO, liftIO, unliftIO)
import Control.Monad.Loops (unfoldWhileM)
import Control.Monad.Trans (lift)
import Data.Coerce (coerce)
Expand All @@ -37,7 +39,8 @@ import Network.AWS.SQS

import Antiope.SQS.Types

import qualified Network.AWS as AWS
import qualified Network.AWS as AWS
import qualified System.IO.Unsafe as IO

defaultReceiveMessage :: QueueUrl -> ReceiveMessage
defaultReceiveMessage (QueueUrl url)
Expand Down Expand Up @@ -139,6 +142,39 @@ forAllMessages' :: (MonadUnliftIO m, HasEnv env)
forAllMessages' env recMsg mode process =
forAllMessages'' env recMsg mode (const process)

-- | Reads all messages from the queue into a LAZY list.
-- This function will read the provided SQS queue one batch at a time
-- as the list is consumed.
--
-- Use 'lazyReadAllMessages'' to control the batch size.
lazyReadAllMessages :: (MonadUnliftIO m, HasEnv env)
=> env
-> QueueUrl
-> ConsumerMode
-> m [Message]
lazyReadAllMessages env queue mode =
lazyReadAllMessages' env (defaultReceiveMessage queue) mode

-- | Reads all messages from the queue into a LAZY list.
-- This function will read the provided SQS queue one batch at a time as
-- as the list is consumed.
lazyReadAllMessages' :: (MonadUnliftIO m, HasEnv env)
=> env
-> ReceiveMessage
-> ConsumerMode
-> m [Message]
lazyReadAllMessages' env recMsg mode = do
u <- askUnliftIO
liftIO $ IO.unsafeInterleaveIO (go u)
where
go u = do
msgs <- unliftIO u (runResourceT $ runAWS env (readQueue' recMsg))
case (mode, msgs) of
(Drain, []) -> pure []
_ -> do
rest <- IO.unsafeInterleaveIO (go u)
pure (msgs ++ rest)

forAllMessages'' :: (MonadUnliftIO m, HasEnv env)
=> env
-> ReceiveMessage
Expand Down

0 comments on commit 86ad3df

Please sign in to comment.