Skip to content

Commit

Permalink
Initial commit.
Browse files Browse the repository at this point in the history
  • Loading branch information
acowley committed Mar 9, 2014
0 parents commit f003acf
Show file tree
Hide file tree
Showing 13 changed files with 847 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
/cabal.sandbox.config
dist
.cabal-sandbox
30 changes: 30 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
Copyright (c) 2014, Anthony Cowley

All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:

* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.

* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following
disclaimer in the documentation and/or other materials provided
with the distribution.

* Neither the name of Anthony Cowley nor the names of other
contributors may be used to endorse or promote products derived
from this software without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
2 changes: 2 additions & 0 deletions Setup.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
import Distribution.Simple
main = defaultMain
38 changes: 38 additions & 0 deletions concurrent-machines.cabal
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
name: concurrent-machines
version: 0.1.0.0
synopsis: Concurrent networked stream transducers
-- description:
license: BSD3
license-file: LICENSE
author: Anthony Cowley
maintainer: acowley@gmail.com
copyright: Copyright (C) 2014 Anthony Cowley
category: Concurrency, Control
build-type: Simple
-- extra-source-files:
cabal-version: >=1.10

library
exposed-modules: Data.Machine.Concurrent,
Data.Machine.Fanout,
Data.Machine.Regulated,
Data.Machine.Concurrent.AsyncStep,
Data.Machine.Concurrent.Buffer,
Data.Machine.Concurrent.Scatter,
Data.Machine.Concurrent.Tee,
Data.Machine.Concurrent.Wye
-- other-modules:
other-extensions: GADTs, FlexibleContexts, RankNTypes, TupleSections,
ScopedTypeVariables
build-depends: base >= 4.6 && < 5,
monad-control >= 0.3 && < 0.4,
transformers >= 0.3 && < 0.4,
time >= 1.4 && < 1.5,
containers >= 0.5 && < 0.6,
transformers-base >= 0.4 && < 0.5,
machines >= 0.2.3.1 && < 0.3,
async >= 2.0.1 && < 2.1,
lifted-async >= 0.1 && < 0.2,
semigroups >= 0.8 && < 0.13
hs-source-dirs: src
default-language: Haskell2010
68 changes: 68 additions & 0 deletions src/Data/Machine/Concurrent.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
{-# LANGUAGE GADTs, FlexibleContexts, RankNTypes, TupleSections #-}
-- | The primary use of concurrent machines is to establish a
-- pipelined architecture that can boost overall throughput by running
-- each stage of the pipeline at the same time. The processing, or
-- production, rate of each stage may not be identical, so facilities
-- are provided to loosen the temporal coupling between pipeline
-- stages using buffers.
--
-- This architecture also lends itself to operations where multiple
-- workers are available for procesisng inputs. If each worker is to
-- process the same set of inputs, consider 'fanout' and
-- 'fanoutSteps'. If each worker is to process a disjoint set of
-- inputs, consider 'scatter'.
module Data.Machine.Concurrent (module Data.Machine,
-- * Concurrent connection
(>~>), (<~<),
-- * Buffered machines
buffer, rolling,
bufferConnect, rollingConnect,
-- * Concurrent processing of shared inputs
fanout, fanoutSteps,
-- * Concurrent multiple-input machines
wye, tee, scatter, splitSum, mergeSum,
splitProd) where
import Control.Concurrent.Async.Lifted
import Control.Monad.Trans.Control
import Data.Machine hiding (tee, wye)
import Data.Machine.Concurrent.AsyncStep
import Data.Machine.Concurrent.Buffer
import Data.Machine.Concurrent.Fanout
import Data.Machine.Concurrent.Scatter
import Data.Machine.Concurrent.Wye
import Data.Machine.Concurrent.Tee

-- | Build a new 'Machine' by adding a 'Process' to the output of an
-- old 'Machine'. The upstream machine is run concurrently with
-- downstream with the aim that upstream will have a yielded value
-- ready as soon as downstream awaits. This effectively creates a
-- buffer between upstream and downstream, or source and sink, that
-- can contain up to one value.
--
-- @
-- ('<~<') :: 'Process' b c -> 'Process' a b -> 'Process' a c
-- ('<~<') :: 'Process' c d -> 'Data.Machine.Tee.Tee' a b c -> 'Data.Machine.Tee.Tee' a b d
-- ('<~<') :: 'Process' b c -> 'Machine' k b -> 'Machine' k c
-- @
(<~<) :: MonadBaseControl IO m
=> ProcessT m b c -> MachineT m k b -> MachineT m k c
mp <~< ma = MachineT $ asyncRun ma >>= go mp . Just
where go :: MonadBaseControl IO m
=> ProcessT m b c
-> Maybe (Async (StM m (MachineStep m k b)))
-> m (MachineStep m k c)
go snk src = runMachineT snk >>= \v -> case v of
Stop -> return Stop
Yield o k -> return . Yield o . MachineT $ go k src
Await f Refl ff -> maybe (return Stop) wait src >>= \u -> case u of
Stop -> go ff Nothing
Yield o k -> async (runMachineT k) >>= go (f o) . Just
Await g kg fg ->
asyncAwait g kg fg $ MachineT . go (encased v) . Just

-- | Flipped ('<~<').
(>~>) :: MonadBaseControl IO m
=> MachineT m k b -> ProcessT m b c -> MachineT m k c
ma >~> mp = mp <~< ma

infixl 7 >~>
54 changes: 54 additions & 0 deletions src/Data/Machine/Concurrent/AsyncStep.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
{-# LANGUAGE FlexibleContexts, GADTs, RankNTypes #-}
module Data.Machine.Concurrent.AsyncStep where
import Control.Concurrent.Async.Lifted (Async, async, wait)
import Control.Monad.Trans.Control (MonadBaseControl, StM)
import Data.Machine

type MachineStep m k o = Step k o (MachineT m k o)
type AsyncStep m k o = Async (StM m (MachineStep m k o))

-- | Build an 'Await' step given a continuation that provides
-- subsequent steps. @awaitStep f sel ff k@ is like applying the
-- 'Await' constructor directly, but the continuation @k@ is used to
-- continue the machine.
--
-- @awaitStep f sel ff k = Await (k . f) sel (k ff)@
awaitStep :: (a -> d) -> k' a -> d -> (d -> r) -> Step k' b r
awaitStep f sel ff k = Await (k . f) sel (k ff)

asyncRun :: MonadBaseControl IO m => MachineT m k o -> m (AsyncStep m k o)
asyncRun = async . runMachineT

-- | Satisfy a downstream Await by blocking on an upstream step.
stepAsync :: MonadBaseControl IO m
=> (forall c. k c -> k' c)
-> AsyncStep m k a'
-> (a' -> d)
-> d
-> d
-> (AsyncStep m k a' -> d -> MachineT m k' b)
-> MachineT m k' b
stepAsync sel src f def prev go = MachineT $ wait src >>= \u -> case u of
Stop -> go' stopped def
Yield a k -> go' k (f a)
Await g kg fg -> return $ awaitStep g (sel kg) fg (MachineT . flip go' prev)
where go' k d = asyncRun k >>= runMachineT . flip go d

-- | @asyncEncased f x@ launches @x@ and provides the resulting
-- 'AsyncStep' to @f@. Turn a function on 'AsyncStep' to a funciton on
-- 'MachineT'.
asyncEncased :: MonadBaseControl IO m
=> (AsyncStep m k1 o1 -> MachineT m k o)
-> MachineT m k1 o1
-> MachineT m k o
asyncEncased f x = MachineT $ asyncRun x >>= runMachineT . f

-- | Similar to 'awaitStep', but for continuations that want their inputs
-- to be run asynchronously.
asyncAwait :: MonadBaseControl IO m
=> (a -> MachineT m k o)
-> k' a
-> MachineT m k o
-> (AsyncStep m k o -> MachineT m k1 o1)
-> m (Step k' b (MachineT m k1 o1))
asyncAwait f sel ff = return . awaitStep f sel ff . asyncEncased
132 changes: 132 additions & 0 deletions src/Data/Machine/Concurrent/Buffer.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
{-# LANGUAGE FlexibleContexts, GADTs, TupleSections #-}
-- | Place buffers between two machines. This is most useful with
-- irregular production rates.
module Data.Machine.Concurrent.Buffer (
-- * Blocking buffers
bufferConnect, buffer,
-- * Non-blocking (rolling) buffers
rollingConnect, rolling,
-- * Internal helpers
mediatedConnect, BufferRoom(..)
) where
import Control.Applicative ((<$>), (<*>))
import Control.Concurrent.Async.Lifted (wait, waitEither)
import Control.Monad.Trans.Control (MonadBaseControl)
import Control.Monad (join, (>=>))
import Data.Machine.Concurrent.AsyncStep
import Data.Machine
import Data.Sequence (ViewL(..), (|>))
import qualified Data.Sequence as S
import Data.Traversable (traverse)

-- | Drain downstream until it awaits a value, then pass the awaiting
-- step to the given function.
drain :: (Functor m, Monad m)
=> MachineStep m k a
-> (MachineStep m k a -> m (MachineStep m k' a))
-> m (MachineStep m k' a)
drain z k = go z
where go Stop = return Stop
go (Yield o kd) = Yield o . MachineT . go <$> runMachineT kd
go aStep = k aStep

-- | Feed upstream until it yields a value, then pass the yielded
-- value and next step to the given function.
feedToBursting :: Monad m
=> MachineStep m k a
-> (Maybe (a, MachineT m k a) -> m (MachineStep m k b))
-> m (MachineStep m k b)
feedToBursting z k = go z
where go Stop = k Nothing
go (Await f kf ff) = return $
Await (\a -> go' (f a)) kf (go' ff)
go (Yield o kk) = k $ Just (o, kk)
go' step = MachineT $ runMachineT step >>= go

-- | Mediate a 'MachineT' and a 'ProcessT' with a bounded capacity
-- buffer. The source machine runs concurrently with the sink process,
-- and is only blocked when the buffer is full.
bufferConnect :: MonadBaseControl IO m
=> Int -> MachineT m k b -> ProcessT m b c -> MachineT m k c
bufferConnect n = mediatedConnect S.empty snoc view
where snoc acc x = (if S.length acc < n - 1 then Vacancy else NoVacancy) $
acc |> x
view acc = case S.viewl acc of
EmptyL -> Nothing
x :< acc' -> Just (x, acc')

-- | Mediate a 'MachineT' and a 'ProcessT' with a rolling buffer. The
-- source machine runs concurrently with the sink process and is never
-- blocked. If the sink process can not keep up with upstream, yielded
-- values will be dropped.
rollingConnect :: MonadBaseControl IO m
=> Int -> MachineT m k b -> ProcessT m b c -> MachineT m k c
rollingConnect n = mediatedConnect S.empty snoc view
where snoc acc x = Vacancy $ S.take (n-1) acc |> x
view acc = case S.viewl acc of
EmptyL -> Nothing
x :< acc' -> Just (x, acc')

-- | Eagerly request values from the wrapped machine. Values are
-- placed in a buffer of the given size. When the buffer is full
-- (i.e. downstream is running behind), we stop pumping the wrapped
-- machine.
buffer :: MonadBaseControl IO m => Int -> MachineT m k o -> MachineT m k o
buffer n src = bufferConnect n src echo

-- | Eagerly request values from the wrapped machine. Values are
-- placed in a rolling buffer of the given size. If downstream can not
-- catch up, values yielded by the wrapped machine will be dropped.
rolling :: MonadBaseControl IO m => Int -> MachineT m k o -> MachineT m k o
rolling n src = rollingConnect n src echo

-- | Indication if the payload value is "full" or not.
data BufferRoom a = NoVacancy a | Vacancy a deriving (Eq, Ord, Show)

-- | Mediate a 'MachineT' and a 'ProcessT' with a buffer.
--
-- @mediatedConnect z snoc view source sink@ pipes @source@ into
-- @sink@ through a buffer initialized to @z@ and updated with
-- @snoc@. Upstream is blocked if @snoc@ indicates that the buffer is
-- full after adding a new element. Downstream blocks if @view@
-- indicates that the buffer is empty. Otherwise, @view@ is expected
-- to return the next element to process and an updated buffer.
mediatedConnect :: MonadBaseControl IO m
=> t -> (t -> b -> BufferRoom t) -> (t -> Maybe (b,t))
-> MachineT m k b -> ProcessT m b c -> MachineT m k c
mediatedConnect z snoc view src0 snk0 =
MachineT $ do srcFuture <- asyncRun src0
snkFuture <- asyncRun snk0
go z (Just srcFuture) snkFuture
where -- Wait for the next available step
go acc src snk = maybe (Left <$> wait snk) (waitEither snk) src >>=
goStep acc . either (Right . (,src)) (Left . (,snk))
-- Kick off the next step of both the source and the sink
goAsync acc src snk =
join $ go acc <$> traverse asyncRun src <*> asyncRun snk
-- Handle whichever step is ready first
goStep acc step = case step of
-- @src@ stepped first
Left (Stop, snk) -> go acc Nothing snk
Left (Await g kg fg, snk) ->
asyncAwait g kg fg (MachineT . flip (go acc) snk . Just)
Left (Yield o k, snk) -> case snoc acc o of
-- add it to the right end of the buffer
Vacancy acc' -> asyncRun k >>= flip (go acc') snk . Just
-- buffer was full
NoVacancy acc' ->
let go' snk' = do src' <- asyncRun k
goStep acc' (Right (snk', Just src'))
in wait snk >>= flip drain go'

-- @snk@ stepped first
Right (Stop, _) -> return Stop
Right (Yield o k, src) ->
return $ Yield o (MachineT $ asyncRun k >>= go acc src)
Right (Await f Refl ff, src) ->
case view acc of
Nothing -> maybe (goAsync acc Nothing ff) (wait >=> demandSrc) src
Just (x, acc') -> asyncRun (f x) >>= go acc' src
where demandSrc = flip feedToBursting go'
go' Nothing = goAsync acc Nothing ff
go' (Just (o, k)) = goAsync acc (Just k) (f o)
Loading

0 comments on commit f003acf

Please sign in to comment.