Permalink
Browse files

Added more functionality to support advertiseSplit. This involves pro…

…viding IO actions for pushing individual messages out.
  • Loading branch information...
1 parent 696dfa9 commit acb5327a56a9436d4d99d37e6b9c3433d37d52a9 Anthony Cowley committed Apr 29, 2011
Showing with 51 additions and 1 deletion.
  1. +51 −1 Ros/RosTcp.hs
View
52 Ros/RosTcp.hs
@@ -1,7 +1,8 @@
{-# LANGUAGE ScopedTypeVariables, BangPatterns #-}
module Ros.RosTcp (subStream, runServer) where
import Control.Applicative ((<$>))
-import Control.Concurrent (forkIO, killThread)
+import Control.Arrow (first)
+import Control.Concurrent (forkIO, killThread, newEmptyMVar, takeMVar, putMVar)
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TVar
import Control.Monad.Reader
@@ -107,6 +108,20 @@ pubStream t clients = liftIO $ go 0 t
mapM_ (flip writeChan bytes . snd) cs
go (n+1) t'
+-- |Produce a publishing action associated with a list of
+-- clients. This is used by runServers.
+pubStreamIO :: RosBinary a => IO (TVar [(b, RingChan ByteString)] -> Config (),
+ a -> IO ())
+pubStreamIO = do m <- newEmptyMVar
+ let feed clients =
+ let go !n = do x <- takeMVar m
+ let bytes = runPut $ putMsg n x
+ cs <- readTVarIO clients
+ mapM_ (flip writeChan bytes . snd) cs
+ go (n+1)
+ in liftIO $ go 0
+ return (feed, putMVar m)
+
-- Negotiate a TCPROS subscriber connection.
negotiateSub :: Socket -> String -> String -> String -> IO ()
negotiateSub sock tname ttype md5 =
@@ -202,3 +217,38 @@ runServer :: forall a. (RosBinary a, MsgInfo a) =>
Config (Config (), Int)
runServer stream = runServerAux (mkPubNegotiator (undefined::a))
(pubStream stream)
+
+-- |The 'MsgInfo' type class dictionary made explicit to strip off the
+-- actual message type.
+data MsgInfoRcd = MsgInfoRcd { md5, typeName :: String }
+
+-- |A 'Feeder' represents a 'Topic' fully prepared to accept
+-- subscribers.
+data Feeder = Feeder MsgInfoRcd -- ^Explicit MsgInfo dictionary
+ Int -- ^Transmit buffer size
+ (URI -> Int -> IO ()) -- ^Update topic stats
+ (TVar [(Config (), RingChan ByteString)] -> Config ())
+ -- ^'pubStream' partial application
+
+-- |Prepare an action for publishing messages. Arguments are a monadic
+-- function for updating topic statistics, and a transmit buffer
+-- size. The returned 'Feeder' value may be supplied to 'runServers',
+-- while the returned 'IO' function may be used to push out new
+-- messages.
+feedTopic :: forall a. (MsgInfo a, RosBinary a) =>
+ (URI -> Int -> IO ()) -> Int -> IO (Feeder, a -> IO ())
+feedTopic updateStats bufSize =
+ do (feed,pub) <- pubStreamIO
+ let f = Feeder info bufSize updateStats feed
+ return (f, pub)
+ where info = mkInfo (undefined::a)
+ mkInfo x = MsgInfoRcd (msgTypeName x) (sourceMD5 x)
+
+-- |Publish several 'Topic's. A single cleanup action for all 'Topic's
+-- is returned, along with each 'Topic's server port in the order of
+-- the input 'Feeder's.
+runServers :: [Feeder] -> Config (Config (), [Int])
+runServers = return . first sequence_ . unzip <=< mapM feed
+ where feed (Feeder info bufSize stats push) =
+ let pub = negotiatePub (typeName info) (md5 info)
+ in runServerAux pub push stats bufSize

0 comments on commit acb5327

Please sign in to comment.