Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 31 additions & 30 deletions src/Streaming/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -86,22 +86,23 @@ module Streaming.Internal (

) where

import Control.Applicative
import Control.Concurrent (threadDelay)
import Control.Monad
import Control.Monad.Trans
import Control.Monad.Error.Class
import Control.Monad.Morph
import Control.Monad.Reader.Class
import Control.Monad.State.Class
import Control.Monad.Error.Class
import Control.Applicative
import Control.Monad.Trans
import Data.Data (Typeable)
import Data.Function ( on )
import Control.Monad.Morph
import Data.Functor.Classes
import Data.Functor.Compose
import Data.Functor.Sum
import Data.Monoid (Monoid (..))
import Data.Semigroup (Semigroup (..))
import Data.Data (Typeable)
import Prelude hiding (splitAt)
import Data.Functor.Compose
import Data.Functor.Sum
import Data.Functor.Classes
import Control.Concurrent (threadDelay)

{- $stream

The 'Stream' data type is equivalent to @FreeT@ and can represent any effectful
Expand Down Expand Up @@ -234,7 +235,7 @@ instance (Functor f, Monad m) => Monad (Stream f m) where
(>>) = (*>)
{-# INLINE (>>) #-}
-- (>>=) = _bind
-- {-#INLINE (>>=) #-}
-- {-# INLINE (>>=) #-}
--
stream >>= f =
loop stream where
Expand All @@ -245,7 +246,7 @@ instance (Functor f, Monad m) => Monad (Stream f m) where
{-# INLINABLE (>>=) #-}

fail = lift . fail
{-#INLINE fail #-}
{-# INLINE fail #-}


-- _bind
Expand All @@ -258,7 +259,7 @@ instance (Functor f, Monad m) => Monad (Stream f m) where
-- Step fstr -> Step (fmap go fstr)
-- Effect m -> Effect (m >>= \s -> return (go s))
-- Return r -> f r
-- {-#INLINABLE _bind #-}
-- {-# INLINABLE _bind #-}
--
-- see https://github.com/Gabriel439/Haskell-Pipes-Library/pull/163
-- for a plan to delay inlining and manage interaction with other operations.
Expand Down Expand Up @@ -294,21 +295,21 @@ instance (Functor f, Monad m) => Applicative (Stream f m) where
-}
instance (Applicative f, Monad m) => Alternative (Stream f m) where
empty = never
{-#INLINE empty #-}
{-# INLINE empty #-}

str <|> str' = zipsWith' liftA2 str str'
{-#INLINE (<|>) #-}
{-# INLINE (<|>) #-}

instance (Functor f, Monad m, Semigroup w) => Semigroup (Stream f m w) where
a <> b = a >>= \w -> fmap (w <>) b
{-#INLINE (<>) #-}
{-# INLINE (<>) #-}

instance (Functor f, Monad m, Monoid w) => Monoid (Stream f m w) where
mempty = return mempty
{-#INLINE mempty #-}
{-# INLINE mempty #-}
#if !(MIN_VERSION_base(4,11,0))
mappend a b = a >>= \w -> fmap (w `mappend`) b
{-#INLINE mappend #-}
{-# INLINE mappend #-}
#endif

instance (Applicative f, Monad m) => MonadPlus (Stream f m) where
Expand Down Expand Up @@ -421,7 +422,7 @@ streamFold
:: (Functor f, Monad m) =>
(r -> b) -> (m b -> b) -> (f b -> b) -> Stream f m r -> b
streamFold done theEffect construct stream = destroy stream construct theEffect done
{-#INLINE streamFold #-}
{-# INLINE streamFold #-}

{- | Reflect a church-encoded stream; cp. @GHC.Exts.build@

Expand Down Expand Up @@ -742,7 +743,7 @@ distribute = loop where
Return r -> lift (Return r)
Effect tmstr -> hoist lift tmstr >>= loop
Step fstr -> join (lift (Step (fmap (Return . loop) fstr)))
{-#INLINABLE distribute #-}
{-# INLINABLE distribute #-}

-- | Repeat a functorial layer (a \"command\" or \"instruction\") forever.
repeats :: (Monad m, Functor f) => f () -> Stream f m r
Expand Down Expand Up @@ -889,7 +890,7 @@ unexposed = Effect . loop where
-}
wrap :: (Monad m, Functor f ) => f (Stream f m r) -> Stream f m r
wrap = Step
{-#INLINE wrap #-}
{-# INLINE wrap #-}


{- | Wrap an effect that returns a stream
Expand All @@ -899,7 +900,7 @@ wrap = Step
-}
effect :: (Monad m, Functor f ) => m (Stream f m r) -> Stream f m r
effect = Effect
{-#INLINE effect #-}
{-# INLINE effect #-}


{-| @yields@ is like @lift@ for items in the streamed functor.
Expand All @@ -916,7 +917,7 @@ effect = Effect

yields :: (Monad m, Functor f) => f r -> Stream f m r
yields fr = Step (fmap Return fr)
{-#INLINE yields #-}
{-# INLINE yields #-}

{-
Note that if the first stream produces Return, we don't inspect
Expand Down Expand Up @@ -1016,7 +1017,7 @@ interleaves = zipsWith' liftA2
-}
switch :: Sum f g r -> Sum g f r
switch s = case s of InL a -> InR a; InR a -> InL a
{-#INLINE switch #-}
{-# INLINE switch #-}



Expand Down Expand Up @@ -1070,7 +1071,7 @@ separate str = destroyExposed
(\x -> case x of InL fss -> wrap fss; InR gss -> effect (yields gss))
(effect . lift)
return
{-#INLINABLE separate #-}
{-# INLINABLE separate #-}



Expand All @@ -1080,7 +1081,7 @@ unseparate str = destroyExposed
(wrap . InL)
(join . maps InR)
return
{-#INLINABLE unseparate #-}
{-# INLINABLE unseparate #-}

-- | If 'Of' had a @Comonad@ instance, then we'd have
--
Expand Down Expand Up @@ -1119,7 +1120,7 @@ unzips str = destroyExposed
(\(Compose fgstr) -> Step (fmap (Effect . yields) fgstr))
(Effect . lift)
return
{-#INLINABLE unzips #-}
{-# INLINABLE unzips #-}

{-| Group layers in an alternating stream into adjoining sub-streams
of one type or another.
Expand Down Expand Up @@ -1156,7 +1157,7 @@ groups = loop
Left r -> return (return r)
Right (InL fstr) -> return (wrap (InL fstr))
Right (InR gstr) -> wrap (fmap go gstr)
{-#INLINABLE groups #-}
{-# INLINABLE groups #-}

-- groupInL :: (Monad m, Functor f, Functor g)
-- => Stream (Sum f g) m r
Expand Down Expand Up @@ -1249,14 +1250,14 @@ never :: (Monad m, Applicative f) => Stream f m r
-- The Monad m constraint should really be an Applicative one,
-- but we still support old versions of base.
never = let loop = Step $ pure (Effect (return loop)) in loop
{-#INLINABLE never #-}
{-# INLINABLE never #-}


delays :: (MonadIO m, Applicative f) => Double -> Stream f m r
delays seconds = loop where
loop = Effect $ liftIO (threadDelay delay) >> return (Step (pure loop))
delay = fromInteger (truncate (1000000 * seconds))
{-#INLINABLE delays #-}
{-# INLINABLE delays #-}

-- {-| Permit streamed actions to proceed unless the clock has run out.
--
Expand All @@ -1275,7 +1276,7 @@ delays seconds = loop where
-- loop str
-- where
-- cutoff = fromInteger (truncate (1000000000 * seconds))
-- {-#INLINABLE period #-}
-- {-# INLINABLE period #-}
--
--
-- {-| Divide a succession of phases according to a specified time interval. If time runs out
Expand Down
Loading