Infinite loop? #685

@eddiemundo

Hi, I've been using streamly for a bit and it's a really cool library.

I have an application that batches IO actions and executes them asynchronously at various times; if an IO action fails, it is put into a retry queue to be retried at the appropriate time. In certain cases the application enters an infinite loop that also quickly eats up memory. I'm pretty sure the problem isn't in my code, although I can't be certain. I've managed to write a smaller dummy example that reproduces the infinite loop and the memory growth:

import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (async)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import Data.Functor (($>))
import Data.IORef (IORef, atomicModifyIORef', newIORef, readIORef)
import Data.Time.Clock (getCurrentTime)
import qualified Streamly as S
import qualified Streamly.Internal.Data.Fold as Fold
import qualified Streamly.Internal.Data.Stream.StreamD.Type as S
import Streamly.Internal.Data.Unfold.Types (Unfold (..))
import Streamly.Prelude ((|:))
import qualified Streamly.Prelude as S

signalAction :: IORef Int -> IORef Bool -> MVar Int -> Int -> IO Int
signalAction inFlightCountRef signalRef mvar d = do
  atomicModifyIORef' inFlightCountRef (\c -> (c + 1, ()))
  before <- getCurrentTime
  print $ ">>> signal " <> show before
  threadDelay d
  atomicModifyIORef' signalRef (\bool -> (True, ()))
  i <- takeMVar mvar
  after <- getCurrentTime
  print $ "<<< signal " <> show after
  atomicModifyIORef' inFlightCountRef (\c -> (c - 1, ()))
  pure i

regularAction :: IORef Int -> Int -> IO Int
regularAction inFlightCountRef d = do
  atomicModifyIORef' inFlightCountRef (\c -> (c + 1, ()))
  before <- getCurrentTime
  print $ ">>> regulr " <> show before
  threadDelay d
  after <- getCurrentTime
  print $ "<<< regulr " <> show after
  atomicModifyIORef' inFlightCountRef (\c -> (c - 1, ()))
  pure 2

stream :: S.IsStream s => IO (s IO Int)
stream = do
  inFlightCountRef <- newIORef 0
  signalRef <- newIORef False
  mvar <- newEmptyMVar
  pure $
    S.concatMapWith
      S.ahead
      id
      ( S.unfold
          ( Unfold
              ( \actions -> do
                  threadDelay $ 10 ^ 6
                  signal <- readIORef signalRef
                  if signal
                    then async (threadDelay (10 ^ 6 + 10 ^ 4) >> putMVar mvar 1) >> atomicModifyIORef' signalRef (\bool -> (False, ()))
                    else pure ()
                  let (currActions, remainingActions) = splitAt (if signal then 2 else 3) actions
                  print $ "action count: " <> show (length currActions)
                  if length currActions == 0
                    then do
                      inFlightCount <- readIORef inFlightCountRef
                      if inFlightCount > 0
                        then pure $ S.Skip actions
                        else pure $ S.Stop
                    else pure $ S.Yield (S.aheadly $ foldr (|:) S.nil currActions) remainingActions
              )
              pure
          )
          [ 
            regularAction inFlightCountRef $ 10 ^ 6 `div` 2,
            regularAction inFlightCountRef (10 ^ 6 + 10 ^ 4),
            regularAction inFlightCountRef (10 ^ 6 `div` 2 * 3),
            regularAction inFlightCountRef $ 10 ^ 6 `div` 2,
            signalAction inFlightCountRef signalRef mvar (10 ^ 6 + 10 ^ 4),
            regularAction inFlightCountRef (10 ^ 6 `div` 2 * 3),
            -- if you comment out the actions below, the program works
            regularAction inFlightCountRef $ 10 ^ 6 `div` 2,
            regularAction inFlightCountRef (10 ^ 6 + 10 ^ 4),
            regularAction inFlightCountRef (10 ^ 6 `div` 2 * 3),
            regularAction inFlightCountRef $ 10 ^ 6 `div` 2,
            regularAction inFlightCountRef (10 ^ 6 + 10 ^ 4),
            regularAction inFlightCountRef (10 ^ 6 `div` 2 * 3)
          ]
      )

main :: IO ()
main = do
  putStrLn "Hello, Haskell!"
  s <- stream
  (S.fold Fold.toList s) >>= print

The example works as follows. Every second, the unfold step outputs 3 actions wrapped in an Ahead stream. This stream of streams is flattened via concatMapWith and ahead, so from the outside it looks like 3 actions are executed asynchronously every second. When a signalAction is executed, it waits for a delay, writes to the signal IORef, and then blocks on an MVar. The unfold step checks whether the signal IORef has been set; if so, it asynchronously (again after a delay) writes a value to the MVar, which should unblock the signal action. Additionally, when the signal IORef was set, the unfold step groups one fewer action into the stream it outputs.
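To make the signalling part easier to look at on its own, here is a minimal sketch of just the IORef/MVar handshake, using only base and no streamly. It is a simplification and not the code from the reproduction: the name `handshake`, the `result` MVar, and the shortened delays are made up for illustration, and the poller fills the MVar directly instead of going through `async` with a further delay.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Data.IORef (atomicWriteIORef, newIORef, readIORef)

-- Worker: raise the flag, then block until the poller supplies a value.
-- Poller: check the flag periodically; once set, clear it and fill the MVar.
-- (Hypothetical helper; delays are much shorter than in the reproduction.)
handshake :: IO Int
handshake = do
  signalRef <- newIORef False
  mvar <- newEmptyMVar
  result <- newEmptyMVar
  _ <- forkIO $ do
    threadDelay 10000                  -- stands in for the action's own delay
    atomicWriteIORef signalRef True    -- tell the poller we are about to wait
    i <- takeMVar mvar                 -- blocks until the poller responds
    putMVar result (i :: Int)
  let poll = do
        threadDelay 5000               -- stands in for the 1-second unfold delay
        signal <- readIORef signalRef
        if signal
          then atomicWriteIORef signalRef False >> putMVar mvar 1
          else poll
  poll
  takeMVar result

main :: IO ()
main = handshake >>= print
```

In isolation this handshake completes and prints 1, which suggests the hang depends on how the handshake interacts with the stream scheduling rather than on the handshake alone.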

The delays in the action bodies affect whether the program loops forever. If they are all less than 1 second (10^6 microseconds), I think everything works fine. However, I've set it up so that in every group of 3 actions, the first waits about 0.5 seconds, the second a little more than 1 second, and the third about 1.5 seconds.
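For reference, the delay expressions used in the action list evaluate as shown below. The names are illustrative only (the reproduction inlines the expressions), and `unfoldTick` is the 10^6 microsecond delay at the top of the unfold step.

```haskell
-- Delays from the reproduction, in microseconds (the unit threadDelay expects).
-- `div` and * are both infixl 7, and ^ binds tighter than either.
halfSecond, justOverOneSecond, oneAndAHalfSeconds, unfoldTick :: Int
halfSecond         = 10 ^ 6 `div` 2      -- parsed as (10 ^ 6) `div` 2
justOverOneSecond  = 10 ^ 6 + 10 ^ 4
oneAndAHalfSeconds = 10 ^ 6 `div` 2 * 3  -- parsed as ((10 ^ 6) `div` 2) * 3
unfoldTick         = 10 ^ 6

main :: IO ()
main = do
  -- prints [500000,1010000,1500000]
  print [halfSecond, justOverOneSecond, oneAndAHalfSeconds]
  -- Which delays are longer than a single unfold delay?
  -- prints [False,True,True]
  print (map (> unfoldTick) [halfSecond, justOverOneSecond, oneAndAHalfSeconds])
```

So in each group of 3, the second and third actions take longer than one unfold delay, while the first does not.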

If I shorten the action list to 6 items it works, but with 7 or more items it doesn't.
If I move actions 3-5 to the front it works.
Hopefully I haven't made a silly mistake.
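
In case it helps when reading the observations above, the batching rule from the unfold step can be pulled out as a pure function. This is only a sketch: `batches` is a hypothetical helper, and the list of signal flags is passed in as an input, whereas in the real program whether the flag is set at a given step depends on timing.

```haskell
-- Split a list the way the unfold step does: 3 items per step, or 2 when
-- the signal flag is set for that step. Missing flags are treated as False.
batches :: [Bool] -> [a] -> [[a]]
batches _ [] = []
batches signals xs = curr : batches rest remaining
  where
    (signal, rest) = case signals of
      [] -> (False, [])
      (s : ss) -> (s, ss)
    (curr, remaining) = splitAt (if signal then 2 else 3) xs

main :: IO ()
main = do
  print (batches [] [1 .. 6 :: Int]) -- [[1,2,3],[4,5,6]]
  print (batches [] [1 .. 7 :: Int]) -- [[1,2,3],[4,5,6],[7]]
  -- Assuming the flag happens to be set at the third step:
  print (batches [False, False, True] [1 .. 12 :: Int]) -- [[1,2,3],[4,5,6],[7,8],[9,10,11],[12]]
```

With 6 items the list is exhausted after two full batches, while a 7th item forces a third yielding step. That matches the threshold I see, but it doesn't by itself explain the loop.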
