Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

initial commit

  • Loading branch information...
commit b34c4904f3141ebbbd77fc4e75e21324c6f9a108 0 parents
@hreinhardt authored
4 .gitignore
@@ -0,0 +1,4 @@
+dist/
+*.o
+*.hi
+testing/
27 LICENSE
@@ -0,0 +1,27 @@
+Copyright (c) 2011, Holger Reinhardt
+
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions
+are met:
+1. Redistributions of source code must retain the above copyright
+ notice, this list of conditions and the following disclaimer.
+2. Redistributions in binary form must reproduce the above copyright
+ notice, this list of conditions and the following disclaimer in the
+ documentation and/or other materials provided with the distribution.
+3. Neither the name of the author nor the names of his contributors
+ may be used to endorse or promote products derived from this software
+ without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
+ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
+FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+SUCH DAMAGE.
1,006 Network/AMQP.hs
@@ -0,0 +1,1006 @@
+{-# OPTIONS -XBangPatterns -XScopedTypeVariables -XDeriveDataTypeable #-}
+{- |
+
+A client library for AMQP servers implementing the 0-8 spec; currently only supports RabbitMQ (see <http://www.rabbitmq.com>)
+
+A good introduction to AMQP can be found here (though it uses Python): <http://blogs.digitar.com/jjww/2009/01/rabbits-and-warrens/>
+
+/Example/:
+
+Connect to a server, declare a queue and an exchange and setup a callback for messages coming in on the queue. Then publish a single message to our new exchange
+
+>import Network.AMQP
+>import qualified Data.ByteString.Lazy.Char8 as BL
+>
+>main = do
+> conn <- openConnection "127.0.0.1" "/" "guest" "guest"
+> chan <- openChannel conn
+>
+> -- declare a queue, exchange and binding
+> declareQueue chan newQueue {queueName = "myQueue"}
+> declareExchange chan newExchange {exchangeName = "myExchange", exchangeType = "direct"}
+> bindQueue chan "myQueue" "myExchange" "myKey"
+>
+> -- subscribe to the queue
+> consumeMsgs chan "myQueue" Ack myCallback
+>
+> -- publish a message to our new exchange
+> publishMsg chan "myExchange" "myKey"
+> newMsg {msgBody = (BL.pack "hello world"),
+> msgDeliveryMode = Just Persistent}
+>
+> getLine -- wait for keypress
+> closeConnection conn
+> putStrLn "connection closed"
+>
+>
+>myCallback :: (Message,Envelope) -> IO ()
+>myCallback (msg, env) = do
+> putStrLn $ "received message: "++(BL.unpack $ msgBody msg)
+> -- acknowledge receiving the message
+> ackEnv env
+
+/Exception handling/:
+
+Some function calls can make the AMQP server throw an AMQP exception, which has the side-effect of closing the connection or channel. The AMQP exceptions are raised as Haskell exceptions (see 'AMQPException'). So upon receiving an 'AMQPException' you may have to reopen the channel or connection.
+
+-}
+module Network.AMQP (
+
+ -- * Connection
+ Connection,
+ openConnection,
+ openConnection',
+ closeConnection,
+ addConnectionClosedHandler,
+
+ -- * Channel
+ Channel,
+ openChannel,
+
+ -- * Exchanges
+ ExchangeOpts(..),
+ newExchange,
+ declareExchange,
+ deleteExchange,
+
+ -- * Queues
+ QueueOpts(..),
+ newQueue,
+ declareQueue,
+ bindQueue,
+ purgeQueue,
+ deleteQueue,
+
+
+ -- * Messaging
+ Message(..),
+ DeliveryMode(..),
+ newMsg,
+ Envelope(..),
+ ConsumerTag,
+ Ack(..),
+ consumeMsgs,
+ cancelConsumer,
+ publishMsg,
+ getMsg,
+ rejectMsg,
+ recoverMsgs,
+
+ ackMsg,
+ ackEnv,
+
+ -- * Transactions
+ txSelect,
+ txCommit,
+ txRollback,
+
+ -- * Flow Control
+ flow,
+
+
+ -- * Exceptions
+ AMQPException(..)
+
+
+
+
+
+) where
+
+
+
+import Data.Binary
+import Data.Binary.Get
+import Data.Binary.Put as BPut
+import Data.Typeable
+import qualified Data.Map as M
+import qualified Data.IntMap as IM
+import qualified Data.ByteString.Char8 as BS
+import qualified Data.ByteString.Lazy.Char8 as BL
+import Data.IORef
+import Data.Maybe
+import Data.Int
+
+
+import Control.Concurrent
+import Control.Monad
+import qualified Control.Exception as CE
+
+
+import Network.BSD
+import Network.Socket
+import qualified Network.Socket.ByteString as NB
+
+
+import Network.AMQP.Protocol
+import Network.AMQP.Types
+import Network.AMQP.Helpers
+import Network.AMQP.Generated
+
+
+
+{-
+TODO:
+- basic.qos
+- handle basic.return
+- connection.secure
+- connection.redirect
+-}
+
+
+
+----- EXCHANGE -----
+
+
+-- | A record that contains the fields needed when creating a new exhange using 'declareExchange'. The default values apply when you use 'newExchange'.
+data ExchangeOpts = ExchangeOpts
+ {
+ exchangeName :: String, -- ^ (must be set); the name of the exchange
+ exchangeType :: String, -- ^ (must be set); the type of the exchange (\"fanout\", \"direct\", \"topic\")
+
+ -- optional
+ exchangePassive :: Bool, -- ^ (default 'False'); If set, the server will not create the exchange. The client can use this to check whether an exchange exists without modifying the server state.
+ exchangeDurable :: Bool, -- ^ (default 'True'); If set when creating a new exchange, the exchange will be marked as durable. Durable exchanges remain active when a server restarts. Non-durable exchanges (transient exchanges) are purged if/when a server restarts.
+ exchangeAutoDelete :: Bool, -- ^ (default 'False'); If set, the exchange is deleted when all queues have finished using it.
+ exchangeInternal :: Bool -- ^ (default 'False'); If set, the exchange may not be used directly by publishers, but only when bound to other exchanges. Internal exchanges are used to construct wiring that is not visible to applications.
+ }
+
+-- | an 'ExchangeOpts' with defaults set; you must override at least the 'exchangeName' and 'exchangeType' fields.
+newExchange :: ExchangeOpts
+newExchange = ExchangeOpts "" "" False True False False
+
+-- | declares a new exchange on the AMQP server. Can be used like this: @declareExchange channel newExchange {exchangeName = \"myExchange\", exchangeType = \"fanout\"}@
+declareExchange :: Channel -> ExchangeOpts -> IO ()
+declareExchange chan exchg = do
+ (SimpleMethod Exchange_declare_ok) <- request chan (SimpleMethod (Exchange_declare
+ 1 -- ticket; ignored by rabbitMQ
+ (ShortString $ exchangeName exchg) -- exchange
+ (ShortString $ exchangeType exchg) -- typ
+ (exchangePassive exchg) -- passive
+ (exchangeDurable exchg) -- durable
+ (exchangeAutoDelete exchg) -- auto_delete
+ (exchangeInternal exchg) -- internal
+ False -- nowait
+ (FieldTable (M.fromList [])))) -- arguments
+ return ()
+
+
+-- | deletes the exchange with the provided name
+deleteExchange :: Channel -> String -> IO ()
+deleteExchange chan exchangeName = do
+ (SimpleMethod Exchange_delete_ok) <- request chan (SimpleMethod (Exchange_delete
+ 1 -- ticket; ignored by rabbitMQ
+ (ShortString exchangeName) -- exchange
+ False -- if_unused; If set, the server will only delete the exchange if it has no queue bindings.
+ False -- nowait
+ ))
+ return ()
+
+----- QUEUE -----
+
+-- | A record that contains the fields needed when creating a new queue using 'declareQueue'. The default values apply when you use 'newQueue'.
+data QueueOpts = QueueOpts
+ {
+ --must be set
+ queueName :: String, -- ^ (default \"\"); the name of the queue; if left empty, the server will generate a new name and return it from the 'declareQueue' method
+
+ --optional
+ queuePassive :: Bool, -- ^ (default 'False'); If set, the server will not create the queue. The client can use this to check whether a queue exists without modifying the server state.
+ queueDurable :: Bool, -- ^ (default 'True'); If set when creating a new queue, the queue will be marked as durable. Durable queues remain active when a server restarts. Non-durable queues (transient queues) are purged if/when a server restarts. Note that durable queues do not necessarily hold persistent messages, although it does not make sense to send persistent messages to a transient queue.
+ queueExclusive :: Bool, -- ^ (default 'False'); Exclusive queues may only be consumed from by the current connection. Setting the 'exclusive' flag always implies 'auto-delete'.
+ queueAutoDelete :: Bool -- ^ (default 'False'); If set, the queue is deleted when all consumers have finished using it. Last consumer can be cancelled either explicitly or because its channel is closed. If there was no consumer ever on the queue, it won't be deleted.
+ }
+
+-- | a 'QueueOpts' with defaults set; you should override at least 'queueName'.
+newQueue :: QueueOpts
+newQueue = QueueOpts "" False True False False
+
+-- | creates a new queue on the AMQP server; can be used like this: @declareQueue channel newQueue {queueName = \"myQueue\"}@.
+--
+-- Returns a tuple @(queueName, messageCount, consumerCount)@.
+-- @queueName@ is the name of the new queue (if you don't specify a queueName the server will autogenerate one).
+-- @messageCount@ is the number of messages in the queue, which will be zero for newly-created queues. @consumerCount@ is the number of active consumers for the queue.
+declareQueue :: Channel -> QueueOpts -> IO (String, Int, Int)
+declareQueue chan queue = do
+ (SimpleMethod (Queue_declare_ok (ShortString qName) messageCount consumerCount)) <- request chan $ (SimpleMethod (Queue_declare
+ 1 -- ticket
+ (ShortString $ queueName queue)
+ (queuePassive queue)
+ (queueDurable queue)
+ (queueExclusive queue)
+ (queueAutoDelete queue)
+ False -- no-wait; true means no answer from server
+ (FieldTable (M.fromList []))))
+
+ return (qName, fromIntegral messageCount, fromIntegral consumerCount)
+
+-- | @bindQueue chan queueName exchangeName routingKey@ binds the queue to the exchange using the provided routing key
+bindQueue :: Channel -> String -> String -> String -> IO ()
+bindQueue chan queueName exchangeName routingKey = do
+ (SimpleMethod Queue_bind_ok) <- request chan (SimpleMethod (Queue_bind
+ 1 -- ticket; ignored by rabbitMQ
+ (ShortString queueName)
+ (ShortString exchangeName)
+ (ShortString routingKey)
+ False -- nowait
+ (FieldTable (M.fromList [])))) -- arguments
+
+ return ()
+
+-- | remove all messages from the queue; returns the number of messages that were in the queue
+purgeQueue :: Channel -> String -> IO Word32
+purgeQueue chan queueName = do
+ (SimpleMethod (Queue_purge_ok msgCount)) <- request chan $ (SimpleMethod (Queue_purge
+ 1 -- ticket
+ (ShortString queueName) -- queue
+ False -- nowait
+ ))
+ return msgCount
+
+-- | deletes the queue; returns the number of messages that were in the queue before deletion
+deleteQueue :: Channel -> String -> IO Word32
+deleteQueue chan queueName = do
+ (SimpleMethod (Queue_delete_ok msgCount)) <- request chan $ (SimpleMethod (Queue_delete
+ 1 -- ticket
+ (ShortString queueName) -- queue
+ False -- if_unused
+ False -- if_empty
+ False -- nowait
+ ))
+
+ return msgCount
+
+
+----- MSG (the BASIC class in AMQP) -----
+
+type ConsumerTag = String
+
+-- | specifies whether you have to acknowledge messages that you receive from 'consumeMsgs' or 'getMsg'. If you use 'Ack', you have to call 'ackMsg' or 'ackEnv' after you have processed a message, otherwise it might be delivered again in the future
+data Ack = Ack | NoAck
+
+ackToBool :: Ack -> Bool
+ackToBool Ack = False
+ackToBool NoAck = True
+
+-- | @consumeMsgs chan queueName ack callback@ subscribes to the given queue and returns a consumerTag. For any incoming message, the callback will be run. If @ack == 'Ack'@ you will have to acknowledge all incoming messages (see 'ackMsg' and 'ackEnv')
+--
+-- NOTE: The callback will be run on the same thread as the channel thread (every channel spawns its own thread to listen for incoming data) so DO NOT perform any request on @chan@ inside the callback (however, you CAN perform requests on other open channels inside the callback, though I wouldn't recommend it).
+-- Functions that can safely be called on @chan@ are 'ackMsg', 'ackEnv', 'rejectMsg', 'recoverMsgs'. If you want to perform anything more complex, it's a good idea to wrap it inside 'forkIO'.
+consumeMsgs :: Channel -> String -> Ack -> ((Message,Envelope) -> IO ()) -> IO ConsumerTag
+consumeMsgs chan queueName ack callback = do
+ --generate a new consumer tag
+ newConsumerTag <- (liftM show) $ modifyMVar (lastConsumerTag chan) $ \c -> return (c+1,c+1)
+
+ --register the consumer
+ modifyMVar_ (consumers chan) $ \c -> return $ M.insert newConsumerTag callback c
+
+ writeAssembly chan (SimpleMethod $ Basic_consume
+ 1 -- ticket
+ (ShortString queueName) -- queue
+ (ShortString newConsumerTag) -- consumer_tag
+ False -- no_local; If the no-local field is set the server will not send messages to the client that published them.
+ (ackToBool ack) -- no_ack
+ False -- exclusive; Request exclusive consumer access, meaning only this consumer can access the queue.
+ True -- nowait
+ )
+ return newConsumerTag
+
+-- | stops a consumer that was started with 'consumeMsgs'
+cancelConsumer :: Channel -> ConsumerTag -> IO ()
+cancelConsumer chan consumerTag = do
+ --unregister the consumer
+ modifyMVar_ (consumers chan) $ \c -> return $ M.delete consumerTag c
+
+ (SimpleMethod (Basic_cancel_ok consumerTag')) <- request chan $ (SimpleMethod (Basic_cancel
+ (ShortString consumerTag) -- consumer_tag
+ False -- nowait
+ ))
+
+ return ()
+
+-- | @publishMsg chan exchangeName routingKey msg@ publishes @msg@ to the exchange with the provided @exchangeName@. The effect of @routingKey@ depends on the type of the exchange
+--
+-- NOTE: This method may temporarily block if the AMQP server requested us to stop sending content data (using the flow control mechanism). So don't rely on this method returning immediately
+publishMsg :: Channel -> String -> String -> Message -> IO ()
+publishMsg chan exchangeName routingKey msg = do
+ writeAssembly chan (ContentMethod (Basic_publish
+ 1 -- ticket; ignored by rabbitMQ
+ (ShortString exchangeName)
+ (ShortString routingKey)
+ False -- mandatory; if true, the server might return the msg, which is currently not handled
+ False) --immediate; if true, the server might return the msg, which is currently not handled
+
+ --TODO: add more of these to 'Message'
+ (CHBasic
+ (fmap ShortString $ msgContentType msg)
+ Nothing
+ Nothing
+ (fmap deliveryModeToInt $ msgDeliveryMode msg) -- delivery_mode
+ Nothing
+ Nothing
+ (fmap ShortString $ msgReplyTo msg)
+ Nothing
+ (fmap ShortString $ msgID msg)
+ (msgTimestamp msg)
+ Nothing
+ Nothing
+ Nothing
+ Nothing
+ )
+
+ (msgBody msg))
+ return ()
+
+
+-- | @getMsg chan ack queueName@ gets a message from the specified queue. If @ack=='Ack'@, you have to call 'ackMsg' or 'ackEnv' for any message that you get, otherwise it might be delivered again in the future (by calling 'recoverMsgs')
+getMsg :: Channel -> Ack -> String -> IO (Maybe (Message, Envelope))
+getMsg chan ack queueName = do
+ ret <- request chan (SimpleMethod (Basic_get
+ 1 -- ticket
+ (ShortString queueName) -- queue
+ (ackToBool ack) -- no_ack
+ ))
+
+ case ret of
+ ContentMethod (Basic_get_ok deliveryTag redelivered (ShortString exchangeName) (ShortString routingKey) msgCount) properties msgBody ->
+ return $ Just $ (msgFromContentHeaderProperties properties msgBody,
+ Envelope {envDeliveryTag = deliveryTag, envRedelivered = redelivered,
+ envExchangeName = exchangeName, envRoutingKey = routingKey, envChannel = chan})
+ _ -> return Nothing
+
+
+{- | @ackMsg chan deliveryTag multiple@ acknowledges one or more messages.
+
+if @multiple==True@, the @deliverTag@ is treated as \"up to and including\", so that the client can acknowledge multiple messages with a single method call. If @multiple==False@, @deliveryTag@ refers to a single message.
+
+If @multiple==True@, and @deliveryTag==0@, tells the server to acknowledge all outstanding mesages.
+-}
+ackMsg :: Channel -> LongLongInt -> Bool -> IO ()
+ackMsg chan deliveryTag multiple =
+ writeAssembly chan $ (SimpleMethod (Basic_ack
+ deliveryTag -- delivery_tag
+ multiple -- multiple
+ ))
+
+-- | Acknowledges a single message. This is a wrapper for 'ackMsg' in case you have the 'Envelope' at hand.
+ackEnv :: Envelope -> IO ()
+ackEnv env = ackMsg (envChannel env) (envDeliveryTag env) False
+
+-- | @rejectMsg chan deliveryTag requeue@ allows a client to reject a message. It can be used to interrupt and cancel large incoming messages, or return untreatable messages to their original queue. If @requeue==False@, the message will be discarded. If it is 'True', the server will attempt to requeue the message.
+--
+-- NOTE: RabbitMQ 1.7 doesn't implement this command
+rejectMsg :: Channel -> LongLongInt -> Bool -> IO ()
+rejectMsg chan deliveryTag requeue =
+ writeAssembly chan $ (SimpleMethod (Basic_reject
+ deliveryTag -- delivery_tag
+ requeue -- requeue
+ ))
+
+-- | @recoverMsgs chan requeue@ asks the broker to redeliver all messages that were received but not acknowledged on the specified channel.
+--If @requeue==False@, the message will be redelivered to the original recipient. If @requeue==True@, the server will attempt to requeue the message, potentially then delivering it to an alternative subscriber.
+recoverMsgs :: Channel -> Bool -> IO ()
+recoverMsgs chan requeue =
+ writeAssembly chan $ (SimpleMethod (Basic_recover
+ requeue -- requeue
+ ))
+
+
+
+------------------- TRANSACTIONS (TX) --------------------------
+
+-- | This method sets the channel to use standard transactions. The client must use this method at least once on a channel before using the Commit or Rollback methods.
+txSelect :: Channel -> IO ()
+txSelect chan = do
+ (SimpleMethod Tx_select_ok) <- request chan $ SimpleMethod Tx_select
+ return ()
+
+-- | This method commits all messages published and acknowledged in the current transaction. A new transaction starts immediately after a commit.
+txCommit :: Channel -> IO ()
+txCommit chan = do
+ (SimpleMethod Tx_commit_ok) <- request chan $ SimpleMethod Tx_commit
+ return ()
+
+-- | This method abandons all messages published and acknowledged in the current transaction. A new transaction starts immediately after a rollback.
+txRollback :: Channel -> IO ()
+txRollback chan = do
+ (SimpleMethod Tx_rollback_ok) <- request chan $ SimpleMethod Tx_rollback
+ return ()
+
+
+--------------------- FLOW CONTROL ------------------------
+
+
+{- | @flow chan active@ tells the AMQP server to pause or restart the flow of content
+ data. This is a simple flow-control mechanism that a peer can use
+ to avoid overflowing its queues or otherwise finding itself receiving
+ more messages than it can process.
+
+ If @active==True@ the server will start sending content data, if @active==False@ the server will stop sending content data.
+
+ A new channel is always active by default.
+
+ NOTE: RabbitMQ 1.7 doesn't implement this command.
+ -}
+flow :: Channel -> Bool -> IO ()
+flow chan active = do
+ (SimpleMethod (Channel_flow_ok _)) <- request chan $ SimpleMethod (Channel_flow active)
+ return ()
+
+
+
+
+-------------------------- MESSAGE / ENVELOPE ------------------
+
+-- | contains meta-information of a delivered message (through 'getMsg' or 'consumeMsgs')
+data Envelope = Envelope
+ {
+ envDeliveryTag :: LongLongInt,
+ envRedelivered :: Bool,
+ envExchangeName :: String,
+ envRoutingKey :: String,
+ envChannel :: Channel
+ }
+
+
+data DeliveryMode = Persistent -- ^ the message will survive server restarts (if the queue is durable)
+ | NonPersistent -- ^ the message may be lost after server restarts
+ deriving Show
+
+deliveryModeToInt NonPersistent = 1
+deliveryModeToInt Persistent = 2
+
+intToDeliveryMode 1 = NonPersistent
+intToDeliveryMode 2 = Persistent
+
+
+-- | An AMQP message
+data Message = Message {
+ msgBody :: BL.ByteString, -- ^ the content of your message
+ msgDeliveryMode :: Maybe DeliveryMode, -- ^ see 'DeliveryMode'
+ msgTimestamp :: Maybe Timestamp, -- ^ use in any way you like; this doesn't affect the way the message is handled
+ msgID :: Maybe String, -- ^ use in any way you like; this doesn't affect the way the message is handled
+ msgContentType :: Maybe String,
+ msgReplyTo :: Maybe String
+ }
+ deriving Show
+
+-- | a 'Msg' with defaults set; you should override at least 'msgBody'
+newMsg :: Message
+newMsg = Message (BL.empty) Nothing Nothing Nothing Nothing Nothing
+
+------------- ASSEMBLY -------------------------
+-- an assembly is a higher-level object consisting of several frames (like in amqp 0-10)
+data Assembly = SimpleMethod MethodPayload
+ | ContentMethod MethodPayload ContentHeaderProperties BL.ByteString --method, properties, content-data
+ deriving Show
+
+-- | reads all frames necessary to build an assembly
+readAssembly :: Chan FramePayload -> IO Assembly
+readAssembly chan = do
+ m <- readChan chan
+ case m of
+ MethodPayload p -> --got a method frame
+ if hasContent m
+ then do
+ --several frames containing the content will follow, so read them
+ (props, msg) <- collectContent chan
+ return $ ContentMethod p props msg
+ else do
+ return $ SimpleMethod p
+ x -> error $ "didn't expect frame: "++(show x)
+
+-- | reads a contentheader and contentbodies and assembles them
+collectContent :: Chan FramePayload -> IO (ContentHeaderProperties, BL.ByteString)
+collectContent chan = do
+ (ContentHeaderPayload _ _ bodySize props) <- readChan chan
+
+ content <- collect $ fromIntegral bodySize
+ return (props, BL.concat content)
+ where
+ collect x | x <= 0 = return []
+ collect rem = do
+ (ContentBodyPayload payload) <- readChan chan
+ r <- collect (rem - (BL.length payload))
+ return $ payload : r
+
+
+
+------------ CONNECTION -------------------
+
+{- general concept:
+Each connection has its own thread. Each channel has its own thread.
+Connection reads data from socket and forwards it to channel. Channel processes data and forwards it to application.
+Outgoing data is written directly onto the socket.
+
+Incoming Data: Socket -> Connection-Thread -> Channel-Thread -> Application
+Outgoing Data: Application -> Socket
+-}
+
+data Connection = Connection {
+ connSocket :: Socket,
+ connChannels :: (MVar (IM.IntMap (Channel, ThreadId))), --open channels (channelID => (Channel, ChannelThread))
+ connMaxFrameSize :: Int, --negotiated maximum frame size
+ connClosed :: MVar (Maybe String),
+ connClosedLock :: MVar (), -- used by closeConnection to block until connection-close handshake is complete
+ connWriteLock :: MVar (), -- to ensure atomic writes to the socket
+ connClosedHandlers :: MVar [IO ()],
+ lastChannelID :: MVar Int --for auto-incrementing the channelIDs
+ }
+
+
+-- | reads incoming frames from socket and forwards them to the opened channels
+connectionReceiver :: Connection -> IO ()
+connectionReceiver conn = do
+ (Frame chanID payload) <- readFrameSock (connSocket conn) (connMaxFrameSize conn)
+ forwardToChannel chanID payload
+ connectionReceiver conn
+ where
+
+ forwardToChannel 0 (MethodPayload Connection_close_ok) = do
+ modifyMVar_ (connClosed conn) $ \x -> return $ Just "closed by user"
+ killThread =<< myThreadId
+
+
+ forwardToChannel 0 (MethodPayload (Connection_close _ (ShortString errorMsg) _ _ )) = do
+ modifyMVar_ (connClosed conn) $ \x -> return $ Just errorMsg
+
+ killThread =<< myThreadId
+
+ forwardToChannel 0 payload = print $ "Got unexpected msg on channel zero: "++(show payload)
+
+ forwardToChannel chanID payload = do
+ --got asynchronous msg => forward to registered channel
+ withMVar (connChannels conn) $ \cs -> do
+ case IM.lookup (fromIntegral chanID) cs of
+ Just c -> writeChan (inQueue $ fst c) payload
+ Nothing -> print $ "ERROR: channel not open "++(show chanID)
+
+
+-- | @openConnection hostname virtualHost loginName loginPassword@ opens a connection to an AMQP server running on @hostname@.
+-- @virtualHost@ is used as a namespace for AMQP resources (default is \"/\"), so different applications could use multiple virtual hosts on the same AMQP server
+--
+-- NOTE: If the login name, password or virtual host are invalid, this method will throw a 'ConnectionClosedException'. The exception will not contain a reason why the connection was closed, so you'll have to find out yourself.
+openConnection :: String -> String -> String -> String -> IO Connection
+openConnection host vhost loginName loginPassword =
+ openConnection' host 5672 vhost loginName loginPassword
+
+-- | same as 'openConnection' but allows you to specify a non-default port-number as the 2nd parameter
+openConnection' :: String -> PortNumber -> String -> String -> String -> IO Connection
+openConnection' host port vhost loginName loginPassword = do
+ proto <- getProtocolNumber "tcp"
+ sock <- socket AF_INET Stream proto
+ addr <- inet_addr host
+ connect sock (SockAddrInet port addr)
+ NB.send sock $ toStrict $ BPut.runPut $ do
+ BPut.putByteString $ BS.pack "AMQP"
+ BPut.putWord8 1
+ BPut.putWord8 1 --TCP/IP
+ BPut.putWord8 9 --Major Version
+ BPut.putWord8 1 --Minor Version
+
+
+ -- S: connection.start
+ Frame 0 (MethodPayload (Connection_start version_major version_minor server_properties mechanisms locales)) <- readFrameSock sock 4096
+
+ -- C: start_ok
+ writeFrameSock sock start_ok
+ -- S: tune
+ Frame 0 (MethodPayload (Connection_tune channel_max frame_max heartbeat)) <- readFrameSock sock 4096
+ -- C: tune_ok
+ let maxFrameSize = (min 131072 frame_max)
+
+ writeFrameSock sock (Frame 0 (MethodPayload
+ --TODO: handle channel_max
+ (Connection_tune_ok 0 maxFrameSize 0)
+ ))
+ -- C: open
+ writeFrameSock sock open
+
+ -- S: open_ok
+ Frame 0 (MethodPayload (Connection_open_ok _)) <- readFrameSock sock $ fromIntegral maxFrameSize
+
+ -- Connection established!
+
+ --build Connection object
+ connChannels <- newMVar IM.empty
+ lastChanID <- newMVar 0
+ cClosed <- newMVar Nothing
+ writeLock <- newMVar ()
+ ccl <- newEmptyMVar
+ connClosedHandlers <- newMVar []
+ let conn = Connection sock connChannels (fromIntegral maxFrameSize) cClosed ccl writeLock connClosedHandlers lastChanID
+
+ --spawn the connectionReceiver
+ forkIO $ CE.finally (connectionReceiver conn)
+ (do
+ -- try closing socket
+ CE.catch (sClose sock) (\(e::CE.SomeException) -> return ())
+
+ -- mark as closed
+ modifyMVar_ cClosed $ \x -> return $ Just $ maybe "closed" id x
+
+ --kill all channel-threads
+ withMVar connChannels $ \cc -> mapM_ (\c -> killThread $ snd c) $ IM.elems cc
+ withMVar connChannels $ \cc -> return $ IM.empty
+
+ -- mark connection as closed, so all pending calls to 'closeConnection' can now return
+ tryPutMVar ccl ()
+
+ -- notify connection-close-handlers
+ withMVar connClosedHandlers sequence
+
+ )
+
+ return conn
+
+ where
+ start_ok = (Frame 0 (MethodPayload (Connection_start_ok (FieldTable (M.fromList []))
+ (ShortString "AMQPLAIN")
+ --login has to be a table without first 4 bytes
+ (LongString (drop 4 $ BL.unpack $ runPut $ put $ FieldTable (M.fromList [(ShortString "LOGIN",FVLongString $ LongString loginName), (ShortString "PASSWORD", FVLongString $ LongString loginPassword)])))
+ (ShortString "en_US")) ))
+ open = (Frame 0 (MethodPayload (Connection_open
+ (ShortString vhost) --virtual host
+ (ShortString "") -- capabilities
+ True))) --insist; True because we don't support redirect yet
+
+
+
+
+-- | closes a connection
+closeConnection :: Connection -> IO ()
+closeConnection c = do
+ CE.catch (
+ withMVar (connWriteLock c) $ \_ -> writeFrameSock (connSocket c) $ (Frame 0 (MethodPayload (Connection_close
+ --TODO: set these values
+ 0 -- reply_code
+ (ShortString "") -- reply_text
+ 0 -- class_id
+ 0 -- method_id
+ )))
+ )
+ (\ (e::CE.IOException) -> do
+ --do nothing if connection is already closed
+ return ()
+ )
+
+ -- wait for connection_close_ok by the server; this MVar gets filled in the CE.finally handler in openConnection'
+ readMVar $ connClosedLock c
+ return ()
+
+
+-- | @addConnectionClosedHandler conn ifClosed handler@ adds a @handler@ that will be called after the connection is closed (either by calling @closeConnection@ or by an exception). If the @ifClosed@ parameter is True and the connection is already closed, the handler will be called immediately. If @ifClosed == False@ and the connection is already closed, the handler will never be called
+addConnectionClosedHandler :: Connection -> Bool -> IO () -> IO ()
+addConnectionClosedHandler conn ifClosed handler = do
+ withMVar (connClosed conn) $ \cc -> do
+ case cc of
+ -- connection is already closed, so call the handler directly
+ Just _ | ifClosed == True -> handler
+
+ -- otherwise add it to the list
+ _ -> modifyMVar_ (connClosedHandlers conn) $ \old -> return $ handler:old
+
+
+
+
+
+readFrameSock :: Socket -> Int -> IO Frame
+readFrameSock sock maxFrameSize = do
+ dat <- recvExact 7
+ let len = fromIntegral $ peekFrameSize dat
+ dat' <- recvExact (len+1) -- +1 for the terminating 0xCE
+ let (frame, rest, consumedBytes) = runGetState get (BL.append dat dat') 0
+
+
+ if consumedBytes /= fromIntegral (len+8)
+ then error $ "readFrameSock: parser should read "++show (len+8)++" bytes; but read "++show consumedBytes
+ else return ()
+ return frame
+
+ where
+ recvExact bytes = do
+ b <- recvExact' bytes $ BL.empty
+ if BL.length b /= fromIntegral bytes
+ then error $ "recvExact wanted "++show bytes++" bytes; got "++show (fromIntegral $ BL.length b)++" bytes"
+ else return b
+ recvExact' bytes buf = do
+ dat <- NB.recv sock bytes
+ let len = BS.length dat
+ if len == 0
+ then CE.throwIO $ ConnectionClosedException "recv returned 0 bytes"
+ else do
+ let buf' = BL.append buf (toLazy dat)
+ if len >= bytes
+ then return buf'
+ else recvExact' (bytes-len) buf'
+
+
+
+
+
+writeFrameSock :: Socket -> Frame -> IO ()
+writeFrameSock sock x = do
+ NB.send sock $ toStrict $ runPut $ put x
+ return ()
+
+
+
+------------------------ CHANNEL -----------------------------
+
+{- | A connection to an AMQP server is made up of separate channels. It is recommended to use a separate channel for each thread in your application that talks to the AMQP server (but you don't have to as channels are thread-safe)
+-}
+data Channel = Channel {
+ connection :: Connection,
+ inQueue :: Chan FramePayload, --incoming frames (from Connection)
+ outstandingResponses :: Chan (MVar Assembly), -- for every request an MVar is stored here waiting for the response
+ channelID :: Word16,
+ lastConsumerTag :: MVar Int,
+
+ chanActive :: Lock, -- used for flow-control. if lock is closed, no content methods will be sent
+ chanClosed :: MVar (Maybe String),
+ consumers :: MVar (M.Map String ((Message, Envelope) -> IO ())) -- who is consumer of a queue? (consumerTag => callback)
+ }
+
+
+msgFromContentHeaderProperties :: ContentHeaderProperties -> BL.ByteString -> Message
+msgFromContentHeaderProperties
+ (CHBasic content_type content_encoding headers delivery_mode priority correlation_id reply_to expiration
+ message_id timestamp typ user_id app_id cluster_id) msgBody =
+ let msgId = fromShortString message_id
+ contentType = fromShortString content_type
+ replyTo = fromShortString reply_to
+
+ in
+ Message msgBody (fmap intToDeliveryMode delivery_mode) timestamp msgId contentType replyTo
+ where
+ fromShortString (Just (ShortString s)) = Just s
+ fromShortString _ = Nothing
+
+-- | The thread that is run for every channel
+channelReceiver :: Channel -> IO ()
+channelReceiver chan = do
+ --read incoming frames; they are put there by a Connection thread
+ p <- readAssembly $ inQueue chan
+
+ if isResponse p
+ then do
+ emp <- isEmptyChan $ outstandingResponses chan
+ if emp
+ then CE.throwIO $ userError "got response, but have no corresponding request"
+ else do
+ x <- readChan (outstandingResponses chan)
+ putMVar x p
+
+ --handle asynchronous assemblies
+ else handleAsync p
+
+ channelReceiver chan
+ where
+ isResponse :: Assembly -> Bool
+ isResponse (ContentMethod (Basic_deliver _ _ _ _ _) _ _) = False
+ isResponse (ContentMethod (Basic_return _ _ _ _) _ _) = False
+ isResponse (SimpleMethod (Channel_flow _)) = False
+ isResponse (SimpleMethod (Channel_close _ _ _ _)) = False
+ isResponse _ = True
+
+ --Basic.Deliver: forward msg to registered consumer
+ handleAsync (ContentMethod (Basic_deliver (ShortString consumerTag) deliveryTag redelivered (ShortString exchangeName)
+ (ShortString routingKey))
+ properties msgBody) =
+ withMVar (consumers chan) (\s -> do
+ let subscriber = fromJust $ M.lookup consumerTag s
+ let msg = msgFromContentHeaderProperties properties msgBody
+ let env = Envelope {envDeliveryTag = deliveryTag, envRedelivered = redelivered,
+ envExchangeName = exchangeName, envRoutingKey = routingKey, envChannel = chan}
+
+ subscriber (msg, env)
+ )
+
+ handleAsync (SimpleMethod (Channel_close errorNum (ShortString errorMsg) _ _)) = do
+
+ modifyMVar_ (chanClosed chan) $ \x -> return $ Just errorMsg
+ closeChannel' chan
+ killThread =<< myThreadId
+
+ handleAsync (SimpleMethod (Channel_flow active)) = do
+ if active
+ then openLock $ chanActive chan
+ else closeLock $ chanActive chan
+ -- in theory we should respond with flow_ok but rabbitMQ 1.7 ignores that, so it doesn't matter
+ return ()
+
+
+ --Basic.return
+ handleAsync (ContentMethod (Basic_return replyCode replyText exchange routingKey) properties msgData) =
+ --TODO: implement handling
+ -- this won't be called currently, because publishMsg sets "mandatory" and "immediate" to false
+ print "BASIC.RETURN not implemented"
+
+
+
+
+-- closes the channel internally; but doesn't tell the server
+closeChannel' c = do
+ modifyMVar_ (connChannels $ connection c) $ \old -> return $ IM.delete (fromIntegral $ channelID c) old
+ -- mark channel as closed
+ modifyMVar_ (chanClosed c) $ \x -> do
+ killLock $ chanActive c
+ killOutstandingResponses $ outstandingResponses c
+ return $ Just $ maybe "closed" id x
+ where
+ killOutstandingResponses :: Chan (MVar a) -> IO ()
+ killOutstandingResponses chan = do
+ emp <- isEmptyChan chan
+ if emp
+ then return ()
+ else do
+ x <- readChan chan
+ tryPutMVar x $ error "channel closed"
+ killOutstandingResponses chan
+
+
+
+-- | opens a new channel on the connection
+--
+-- There's currently no closeChannel method, but you can always just close the connection (the maximum number of channels is 65535).
+openChannel :: Connection -> IO Channel
+openChannel c = do
+ newInQueue <- newChan
+ outRes <- newChan
+ lastConsumerTag <- newMVar 0
+ ca <- newLock
+
+ chanClosed <- newMVar Nothing
+ consumers <- newMVar M.empty
+
+ --get a new unused channelID
+ newChannelID <- modifyMVar (lastChannelID c) $ \x -> return (x+1,x+1)
+
+ let newChannel = Channel c newInQueue outRes (fromIntegral newChannelID) lastConsumerTag ca chanClosed consumers
+
+
+ thrID <- forkIO $ CE.finally (channelReceiver newChannel)
+ (closeChannel' newChannel)
+
+ --add new channel to connection's channel map
+ modifyMVar_ (connChannels c) (\oldMap -> return $ IM.insert newChannelID (newChannel, thrID) oldMap)
+
+ (SimpleMethod Channel_open_ok) <- request newChannel (SimpleMethod (Channel_open (ShortString "")))
+ return newChannel
+
+
+
+
+-- | writes multiple frames to the channel atomically
+writeFrames :: Channel -> [FramePayload] -> IO ()
+writeFrames chan payloads =
+ let conn = connection chan in
+ withMVar (connChannels conn) $ \chans ->
+ if IM.member (fromIntegral $ channelID chan) chans
+ then
+ CE.catch
+ -- ensure at most one thread is writing to the socket at any time
+ (withMVar (connWriteLock conn) $ \_ ->
+ mapM_ (\payload -> writeFrameSock (connSocket conn) (Frame (channelID chan) payload)) payloads)
+ ( \(e :: CE.IOException) -> do
+ CE.throwIO $ userError "connection not open"
+ )
+ else do
+ CE.throwIO $ userError "channel not open"
+
+
+
+
+
+writeAssembly' :: Channel -> Assembly -> IO ()
+writeAssembly' chan (ContentMethod m properties msg) = do
+ -- wait iff the AMQP server instructed us to withhold sending content data (flow control)
+ waitLock $ chanActive chan
+
+ let !toWrite =
+ [(MethodPayload m),
+ (ContentHeaderPayload
+ (getClassIDOf properties) --classID
+ 0 --weight is deprecated in AMQP 0-9
+ (fromIntegral $ BL.length msg) --bodySize
+ properties)] ++
+ (if BL.length msg > 0
+ then do
+ --split into frames of maxFrameSize
+ map ContentBodyPayload
+ (splitLen msg (fromIntegral $ connMaxFrameSize $ connection chan))
+ else []
+ )
+ writeFrames chan toWrite
+
+ where
+ splitLen str len | BL.length str > len = (BL.take len str):(splitLen (BL.drop len str) len)
+ splitLen str _ = [str]
+
+writeAssembly' chan (SimpleMethod m) = do
+ writeFrames chan [MethodPayload m]
+
+
+
+-- most exported functions in this module will use either 'writeAssembly' or 'request' to talk to the server
+-- so we perform the exception handling here
+
+
+-- | writes an assembly to the channel
+writeAssembly :: Channel -> Assembly -> IO ()
+writeAssembly chan m =
+ CE.catches
+ (writeAssembly' chan m)
+
+ [CE.Handler (\ (ex :: AMQPException) -> throwMostRelevantAMQPException chan),
+ CE.Handler (\ (ex :: CE.ErrorCall) -> throwMostRelevantAMQPException chan),
+ CE.Handler (\ (ex :: CE.IOException) -> throwMostRelevantAMQPException chan)]
+
+
+
+
+-- | sends an assembly and receives the response
+request :: Channel -> Assembly -> IO Assembly
+request chan m = do
+ res <- newEmptyMVar
+ CE.catches (do
+ withMVar (chanClosed chan) $ \cc -> do
+ if isNothing cc
+ then do
+ writeChan (outstandingResponses chan) res
+ writeAssembly' chan m
+ else CE.throwIO $ userError "closed"
+
+ -- res might contain an exception, so evaluate it here
+ !r <- takeMVar res
+ return r
+ )
+ [CE.Handler (\ (ex :: AMQPException) -> throwMostRelevantAMQPException chan),
+ CE.Handler (\ (ex :: CE.ErrorCall) -> throwMostRelevantAMQPException chan),
+ CE.Handler (\ (ex :: CE.IOException) -> throwMostRelevantAMQPException chan)]
+
+-- this throws an AMQPException based on the status of the connection and the channel
+-- if both connection and channel are closed, it will throw a ConnectionClosedException
+throwMostRelevantAMQPException chan = do
+ cc <- readMVar $ connClosed $ connection chan
+ case cc of
+ Just r -> CE.throwIO $ ConnectionClosedException r
+ Nothing -> do
+ chc <- readMVar $ chanClosed chan
+ case chc of
+ Just r -> CE.throwIO $ ChannelClosedException r
+ Nothing -> CE.throwIO $ ConnectionClosedException "unknown reason"
+
+
+
+
+----------------------------- EXCEPTIONS ---------------------------
+
+data AMQPException =
+ -- | the 'String' contains the reason why the channel was closed
+ ChannelClosedException String
+ | ConnectionClosedException String -- ^ String may contain a reason
+ deriving (Typeable, Show, Ord, Eq)
+
+instance CE.Exception AMQPException
724 Network/AMQP/Generated.hs
@@ -0,0 +1,724 @@
+module Network.AMQP.Generated where
+
+import Network.AMQP.Types
+import Data.Maybe
+import Data.Binary
+import Data.Binary.Get
+import Data.Binary.Put
+import Data.Bits
+
+getContentHeaderProperties 10 = getPropBits 0 >>= \[] -> return CHConnection
+getContentHeaderProperties 20 = getPropBits 0 >>= \[] -> return CHChannel
+getContentHeaderProperties 30 = getPropBits 0 >>= \[] -> return CHAccess
+getContentHeaderProperties 40 = getPropBits 0 >>= \[] -> return CHExchange
+getContentHeaderProperties 50 = getPropBits 0 >>= \[] -> return CHQueue
+getContentHeaderProperties 60 = getPropBits 14 >>= \[a,b,c,d,e,f,g,h,i,j,k,l,m,n] -> condGet a >>= \a' -> condGet b >>= \b' -> condGet c >>= \c' -> condGet d >>= \d' -> condGet e >>= \e' -> condGet f >>= \f' -> condGet g >>= \g' -> condGet h >>= \h' -> condGet i >>= \i' -> condGet j >>= \j' -> condGet k >>= \k' -> condGet l >>= \l' -> condGet m >>= \m' -> condGet n >>= \n' -> return (CHBasic a' b' c' d' e' f' g' h' i' j' k' l' m' n' )
+getContentHeaderProperties 70 = getPropBits 9 >>= \[a,b,c,d,e,f,g,h,i] -> condGet a >>= \a' -> condGet b >>= \b' -> condGet c >>= \c' -> condGet d >>= \d' -> condGet e >>= \e' -> condGet f >>= \f' -> condGet g >>= \g' -> condGet h >>= \h' -> condGet i >>= \i' -> return (CHFile a' b' c' d' e' f' g' h' i' )
+getContentHeaderProperties 80 = getPropBits 5 >>= \[a,b,c,d,e] -> condGet a >>= \a' -> condGet b >>= \b' -> condGet c >>= \c' -> condGet d >>= \d' -> condGet e >>= \e' -> return (CHStream a' b' c' d' e' )
+getContentHeaderProperties 90 = getPropBits 0 >>= \[] -> return CHTx
+getContentHeaderProperties 100 = getPropBits 0 >>= \[] -> return CHDtx
+getContentHeaderProperties 110 = getPropBits 5 >>= \[a,b,c,d,e] -> condGet a >>= \a' -> condGet b >>= \b' -> condGet c >>= \c' -> condGet d >>= \d' -> condGet e >>= \e' -> return (CHTunnel a' b' c' d' e' )
+getContentHeaderProperties 120 = getPropBits 0 >>= \[] -> return CHTest
+
+putContentHeaderProperties CHConnection = putPropBits []
+putContentHeaderProperties CHChannel = putPropBits []
+putContentHeaderProperties CHAccess = putPropBits []
+putContentHeaderProperties CHExchange = putPropBits []
+putContentHeaderProperties CHQueue = putPropBits []
+putContentHeaderProperties (CHBasic a b c d e f g h i j k l m n) = putPropBits [isJust a,isJust b,isJust c,isJust d,isJust e,isJust f,isJust g,isJust h,isJust i,isJust j,isJust k,isJust l,isJust m,isJust n] >> condPut a >> condPut b >> condPut c >> condPut d >> condPut e >> condPut f >> condPut g >> condPut h >> condPut i >> condPut j >> condPut k >> condPut l >> condPut m >> condPut n
+putContentHeaderProperties (CHFile a b c d e f g h i) = putPropBits [isJust a,isJust b,isJust c,isJust d,isJust e,isJust f,isJust g,isJust h,isJust i] >> condPut a >> condPut b >> condPut c >> condPut d >> condPut e >> condPut f >> condPut g >> condPut h >> condPut i
+putContentHeaderProperties (CHStream a b c d e) = putPropBits [isJust a,isJust b,isJust c,isJust d,isJust e] >> condPut a >> condPut b >> condPut c >> condPut d >> condPut e
+putContentHeaderProperties CHTx = putPropBits []
+putContentHeaderProperties CHDtx = putPropBits []
+putContentHeaderProperties (CHTunnel a b c d e) = putPropBits [isJust a,isJust b,isJust c,isJust d,isJust e] >> condPut a >> condPut b >> condPut c >> condPut d >> condPut e
+putContentHeaderProperties CHTest = putPropBits []
+
+getClassIDOf (CHConnection) = 10
+getClassIDOf (CHChannel) = 20
+getClassIDOf (CHAccess) = 30
+getClassIDOf (CHExchange) = 40
+getClassIDOf (CHQueue) = 50
+getClassIDOf (CHBasic _ _ _ _ _ _ _ _ _ _ _ _ _ _) = 60
+getClassIDOf (CHFile _ _ _ _ _ _ _ _ _) = 70
+getClassIDOf (CHStream _ _ _ _ _) = 80
+getClassIDOf (CHTx) = 90
+getClassIDOf (CHDtx) = 100
+getClassIDOf (CHTunnel _ _ _ _ _) = 110
+getClassIDOf (CHTest) = 120
+
+data ContentHeaderProperties =
+ CHConnection
+
+ |CHChannel
+
+ |CHAccess
+
+ |CHExchange
+
+ |CHQueue
+
+ |CHBasic
+ (Maybe ShortString) -- content_type
+ (Maybe ShortString) -- content_encoding
+ (Maybe FieldTable) -- headers
+ (Maybe Octet) -- delivery_mode
+ (Maybe Octet) -- priority
+ (Maybe ShortString) -- correlation_id
+ (Maybe ShortString) -- reply_to
+ (Maybe ShortString) -- expiration
+ (Maybe ShortString) -- message_id
+ (Maybe Timestamp) -- timestamp
+ (Maybe ShortString) -- typ
+ (Maybe ShortString) -- user_id
+ (Maybe ShortString) -- app_id
+ (Maybe ShortString) -- cluster_id
+ |CHFile
+ (Maybe ShortString) -- content_type
+ (Maybe ShortString) -- content_encoding
+ (Maybe FieldTable) -- headers
+ (Maybe Octet) -- priority
+ (Maybe ShortString) -- reply_to
+ (Maybe ShortString) -- message_id
+ (Maybe ShortString) -- filename
+ (Maybe Timestamp) -- timestamp
+ (Maybe ShortString) -- cluster_id
+ |CHStream
+ (Maybe ShortString) -- content_type
+ (Maybe ShortString) -- content_encoding
+ (Maybe FieldTable) -- headers
+ (Maybe Octet) -- priority
+ (Maybe Timestamp) -- timestamp
+ |CHTx
+
+ |CHDtx
+
+ |CHTunnel
+ (Maybe FieldTable) -- headers
+ (Maybe ShortString) -- proxy_name
+ (Maybe ShortString) -- data_name
+ (Maybe Octet) -- durable
+ (Maybe Octet) -- broadcast
+ |CHTest
+
+
+ deriving Show
+
+--Bits need special handling because AMQP requires contiguous bits to be packed into a Word8
+-- | Packs up to 8 bits into a Word8
+putBits :: [Bit] -> Put
+putBits xs = putWord8 $ putBits' 0 xs
+putBits' _ [] = 0
+putBits' offset (x:xs) = (shiftL (toInt x) offset) .|. (putBits' (offset+1) xs)
+ where toInt True = 1
+ toInt False = 0
+getBits num = getWord8 >>= \x -> return $ getBits' num 0 x
+getBits' 0 offset _= []
+getBits' num offset x = ((x .&. (2^offset)) /= 0) : (getBits' (num-1) (offset+1) x)
+-- | Packs up to 15 Bits into a Word16 (=Property Flags)
+putPropBits :: [Bit] -> Put
+putPropBits xs = putWord16be $ (putPropBits' 0 xs)
+putPropBits' _ [] = 0
+putPropBits' offset (x:xs) = (shiftL (toInt x) (15-offset)) .|. (putPropBits' (offset+1) xs)
+ where toInt True = 1
+ toInt False = 0
+getPropBits num = getWord16be >>= \x -> return $ getPropBits' num 0 x
+getPropBits' 0 offset _= []
+getPropBits' num offset x = ((x .&. (2^(15-offset))) /= 0) : (getPropBits' (num-1) (offset+1) x)
+condGet False = return Nothing
+condGet True = get >>= \x -> return $ Just x
+
+condPut (Just x) = put x
+condPut _ = return ()
+
+instance Binary MethodPayload where
+ put (Connection_start a b c d e) = putWord16be 10 >> putWord16be 10 >> put a >> put b >> put c >> put d >> put e
+ put (Connection_start_ok a b c d) = putWord16be 10 >> putWord16be 11 >> put a >> put b >> put c >> put d
+ put (Connection_secure a) = putWord16be 10 >> putWord16be 20 >> put a
+ put (Connection_secure_ok a) = putWord16be 10 >> putWord16be 21 >> put a
+ put (Connection_tune a b c) = putWord16be 10 >> putWord16be 30 >> put a >> put b >> put c
+ put (Connection_tune_ok a b c) = putWord16be 10 >> putWord16be 31 >> put a >> put b >> put c
+ put (Connection_open a b c) = putWord16be 10 >> putWord16be 40 >> put a >> put b >> put c
+ put (Connection_open_ok a) = putWord16be 10 >> putWord16be 41 >> put a
+ put (Connection_redirect a b) = putWord16be 10 >> putWord16be 50 >> put a >> put b
+ put (Connection_close a b c d) = putWord16be 10 >> putWord16be 60 >> put a >> put b >> put c >> put d
+ put Connection_close_ok = putWord16be 10 >> putWord16be 61
+ put (Channel_open a) = putWord16be 20 >> putWord16be 10 >> put a
+ put Channel_open_ok = putWord16be 20 >> putWord16be 11
+ put (Channel_flow a) = putWord16be 20 >> putWord16be 20 >> put a
+ put (Channel_flow_ok a) = putWord16be 20 >> putWord16be 21 >> put a
+ put (Channel_alert a b c) = putWord16be 20 >> putWord16be 30 >> put a >> put b >> put c
+ put (Channel_close a b c d) = putWord16be 20 >> putWord16be 40 >> put a >> put b >> put c >> put d
+ put Channel_close_ok = putWord16be 20 >> putWord16be 41
+ put (Access_request a b c d e f) = putWord16be 30 >> putWord16be 10 >> put a >> putBits [b,c,d,e,f]
+ put (Access_request_ok a) = putWord16be 30 >> putWord16be 11 >> put a
+ put (Exchange_declare a b c d e f g h i) = putWord16be 40 >> putWord16be 10 >> put a >> put b >> put c >> putBits [d,e,f,g,h] >> put i
+ put Exchange_declare_ok = putWord16be 40 >> putWord16be 11
+ put (Exchange_delete a b c d) = putWord16be 40 >> putWord16be 20 >> put a >> put b >> putBits [c,d]
+ put Exchange_delete_ok = putWord16be 40 >> putWord16be 21
+ put (Queue_declare a b c d e f g h) = putWord16be 50 >> putWord16be 10 >> put a >> put b >> putBits [c,d,e,f,g] >> put h
+ put (Queue_declare_ok a b c) = putWord16be 50 >> putWord16be 11 >> put a >> put b >> put c
+ put (Queue_bind a b c d e f) = putWord16be 50 >> putWord16be 20 >> put a >> put b >> put c >> put d >> put e >> put f
+ put Queue_bind_ok = putWord16be 50 >> putWord16be 21
+ put (Queue_purge a b c) = putWord16be 50 >> putWord16be 30 >> put a >> put b >> put c
+ put (Queue_purge_ok a) = putWord16be 50 >> putWord16be 31 >> put a
+ put (Queue_delete a b c d e) = putWord16be 50 >> putWord16be 40 >> put a >> put b >> putBits [c,d,e]
+ put (Queue_delete_ok a) = putWord16be 50 >> putWord16be 41 >> put a
+ put (Basic_qos a b c) = putWord16be 60 >> putWord16be 10 >> put a >> put b >> put c
+ put Basic_qos_ok = putWord16be 60 >> putWord16be 11
+ put (Basic_consume a b c d e f g) = putWord16be 60 >> putWord16be 20 >> put a >> put b >> put c >> putBits [d,e,f,g]
+ put (Basic_consume_ok a) = putWord16be 60 >> putWord16be 21 >> put a
+ put (Basic_cancel a b) = putWord16be 60 >> putWord16be 30 >> put a >> put b
+ put (Basic_cancel_ok a) = putWord16be 60 >> putWord16be 31 >> put a
+ put (Basic_publish a b c d e) = putWord16be 60 >> putWord16be 40 >> put a >> put b >> put c >> putBits [d,e]
+ put (Basic_return a b c d) = putWord16be 60 >> putWord16be 50 >> put a >> put b >> put c >> put d
+ put (Basic_deliver a b c d e) = putWord16be 60 >> putWord16be 60 >> put a >> put b >> put c >> put d >> put e
+ put (Basic_get a b c) = putWord16be 60 >> putWord16be 70 >> put a >> put b >> put c
+ put (Basic_get_ok a b c d e) = putWord16be 60 >> putWord16be 71 >> put a >> put b >> put c >> put d >> put e
+ put (Basic_get_empty a) = putWord16be 60 >> putWord16be 72 >> put a
+ put (Basic_ack a b) = putWord16be 60 >> putWord16be 80 >> put a >> put b
+ put (Basic_reject a b) = putWord16be 60 >> putWord16be 90 >> put a >> put b
+ put (Basic_recover a) = putWord16be 60 >> putWord16be 100 >> put a
+ put (File_qos a b c) = putWord16be 70 >> putWord16be 10 >> put a >> put b >> put c
+ put File_qos_ok = putWord16be 70 >> putWord16be 11
+ put (File_consume a b c d e f g) = putWord16be 70 >> putWord16be 20 >> put a >> put b >> put c >> putBits [d,e,f,g]
+ put (File_consume_ok a) = putWord16be 70 >> putWord16be 21 >> put a
+ put (File_cancel a b) = putWord16be 70 >> putWord16be 30 >> put a >> put b
+ put (File_cancel_ok a) = putWord16be 70 >> putWord16be 31 >> put a
+ put (File_open a b) = putWord16be 70 >> putWord16be 40 >> put a >> put b
+ put (File_open_ok a) = putWord16be 70 >> putWord16be 41 >> put a
+ put File_stage = putWord16be 70 >> putWord16be 50
+ put (File_publish a b c d e f) = putWord16be 70 >> putWord16be 60 >> put a >> put b >> put c >> putBits [d,e] >> put f
+ put (File_return a b c d) = putWord16be 70 >> putWord16be 70 >> put a >> put b >> put c >> put d
+ put (File_deliver a b c d e f) = putWord16be 70 >> putWord16be 80 >> put a >> put b >> put c >> put d >> put e >> put f
+ put (File_ack a b) = putWord16be 70 >> putWord16be 90 >> put a >> put b
+ put (File_reject a b) = putWord16be 70 >> putWord16be 100 >> put a >> put b
+ put (Stream_qos a b c d) = putWord16be 80 >> putWord16be 10 >> put a >> put b >> put c >> put d
+ put Stream_qos_ok = putWord16be 80 >> putWord16be 11
+ put (Stream_consume a b c d e f) = putWord16be 80 >> putWord16be 20 >> put a >> put b >> put c >> putBits [d,e,f]
+ put (Stream_consume_ok a) = putWord16be 80 >> putWord16be 21 >> put a
+ put (Stream_cancel a b) = putWord16be 80 >> putWord16be 30 >> put a >> put b
+ put (Stream_cancel_ok a) = putWord16be 80 >> putWord16be 31 >> put a
+ put (Stream_publish a b c d e) = putWord16be 80 >> putWord16be 40 >> put a >> put b >> put c >> putBits [d,e]
+ put (Stream_return a b c d) = putWord16be 80 >> putWord16be 50 >> put a >> put b >> put c >> put d
+ put (Stream_deliver a b c d) = putWord16be 80 >> putWord16be 60 >> put a >> put b >> put c >> put d
+ put Tx_select = putWord16be 90 >> putWord16be 10
+ put Tx_select_ok = putWord16be 90 >> putWord16be 11
+ put Tx_commit = putWord16be 90 >> putWord16be 20
+ put Tx_commit_ok = putWord16be 90 >> putWord16be 21
+ put Tx_rollback = putWord16be 90 >> putWord16be 30
+ put Tx_rollback_ok = putWord16be 90 >> putWord16be 31
+ put Dtx_select = putWord16be 100 >> putWord16be 10
+ put Dtx_select_ok = putWord16be 100 >> putWord16be 11
+ put (Dtx_start a) = putWord16be 100 >> putWord16be 20 >> put a
+ put Dtx_start_ok = putWord16be 100 >> putWord16be 21
+ put (Tunnel_request a) = putWord16be 110 >> putWord16be 10 >> put a
+ put (Test_integer a b c d e) = putWord16be 120 >> putWord16be 10 >> put a >> put b >> put c >> put d >> put e
+ put (Test_integer_ok a) = putWord16be 120 >> putWord16be 11 >> put a
+ put (Test_string a b c) = putWord16be 120 >> putWord16be 20 >> put a >> put b >> put c
+ put (Test_string_ok a) = putWord16be 120 >> putWord16be 21 >> put a
+ put (Test_table a b c) = putWord16be 120 >> putWord16be 30 >> put a >> put b >> put c
+ put (Test_table_ok a b) = putWord16be 120 >> putWord16be 31 >> put a >> put b
+ put Test_content = putWord16be 120 >> putWord16be 40
+ put (Test_content_ok a) = putWord16be 120 >> putWord16be 41 >> put a
+ get = do
+ classID <- getWord16be
+ methodID <- getWord16be
+ case (classID, methodID) of
+ (10,10) -> get >>= \a -> get >>= \b -> get >>= \c -> get >>= \d -> get >>= \e -> return (Connection_start a b c d e)
+ (10,11) -> get >>= \a -> get >>= \b -> get >>= \c -> get >>= \d -> return (Connection_start_ok a b c d)
+ (10,20) -> get >>= \a -> return (Connection_secure a)
+ (10,21) -> get >>= \a -> return (Connection_secure_ok a)
+ (10,30) -> get >>= \a -> get >>= \b -> get >>= \c -> return (Connection_tune a b c)
+ (10,31) -> get >>= \a -> get >>= \b -> get >>= \c -> return (Connection_tune_ok a b c)
+ (10,40) -> get >>= \a -> get >>= \b -> get >>= \c -> return (Connection_open a b c)
+ (10,41) -> get >>= \a -> return (Connection_open_ok a)
+ (10,50) -> get >>= \a -> get >>= \b -> return (Connection_redirect a b)
+ (10,60) -> get >>= \a -> get >>= \b -> get >>= \c -> get >>= \d -> return (Connection_close a b c d)
+ (10,61) -> return Connection_close_ok
+ (20,10) -> get >>= \a -> return (Channel_open a)
+ (20,11) -> return Channel_open_ok
+ (20,20) -> get >>= \a -> return (Channel_flow a)
+ (20,21) -> get >>= \a -> return (Channel_flow_ok a)
+ (20,30) -> get >>= \a -> get >>= \b -> get >>= \c -> return (Channel_alert a b c)
+ (20,40) -> get >>= \a -> get >>= \b -> get >>= \c -> get >>= \d -> return (Channel_close a b c d)
+ (20,41) -> return Channel_close_ok
+ (30,10) -> get >>= \a -> getBits 5 >>= \[b,c,d,e,f] -> return (Access_request a b c d e f)
+ (30,11) -> get >>= \a -> return (Access_request_ok a)
+ (40,10) -> get >>= \a -> get >>= \b -> get >>= \c -> getBits 5 >>= \[d,e,f,g,h] -> get >>= \i -> return (Exchange_declare a b c d e f g h i)
+ (40,11) -> return Exchange_declare_ok
+ (40,20) -> get >>= \a -> get >>= \b -> getBits 2 >>= \[c,d] -> return (Exchange_delete a b c d)
+ (40,21) -> return Exchange_delete_ok
+ (50,10) -> get >>= \a -> get >>= \b -> getBits 5 >>= \[c,d,e,f,g] -> get >>= \h -> return (Queue_declare a b c d e f g h)
+ (50,11) -> get >>= \a -> get >>= \b -> get >>= \c -> return (Queue_declare_ok a b c)
+ (50,20) -> get >>= \a -> get >>= \b -> get >>= \c -> get >>= \d -> get >>= \e -> get >>= \f -> return (Queue_bind a b c d e f)
+ (50,21) -> return Queue_bind_ok
+ (50,30) -> get >>= \a -> get >>= \b -> get >>= \c -> return (Queue_purge a b c)
+ (50,31) -> get >>= \a -> return (Queue_purge_ok a)
+ (50,40) -> get >>= \a -> get >>= \b -> getBits 3 >>= \[c,d,e] -> return (Queue_delete a b c d e)
+ (50,41) -> get >>= \a -> return (Queue_delete_ok a)
+ (60,10) -> get >>= \a -> get >>= \b -> get >>= \c -> return (Basic_qos a b c)
+ (60,11) -> return Basic_qos_ok
+ (60,20) -> get >>= \a -> get >>= \b -> get >>= \c -> getBits 4 >>= \[d,e,f,g] -> return (Basic_consume a b c d e f g)
+ (60,21) -> get >>= \a -> return (Basic_consume_ok a)
+ (60,30) -> get >>= \a -> get >>= \b -> return (Basic_cancel a b)
+ (60,31) -> get >>= \a -> return (Basic_cancel_ok a)
+ (60,40) -> get >>= \a -> get >>= \b -> get >>= \c -> getBits 2 >>= \[d,e] -> return (Basic_publish a b c d e)
+ (60,50) -> get >>= \a -> get >>= \b -> get >>= \c -> get >>= \d -> return (Basic_return a b c d)
+ (60,60) -> get >>= \a -> get >>= \b -> get >>= \c -> get >>= \d -> get >>= \e -> return (Basic_deliver a b c d e)
+ (60,70) -> get >>= \a -> get >>= \b -> get >>= \c -> return (Basic_get a b c)
+ (60,71) -> get >>= \a -> get >>= \b -> get >>= \c -> get >>= \d -> get >>= \e -> return (Basic_get_ok a b c d e)
+ (60,72) -> get >>= \a -> return (Basic_get_empty a)
+ (60,80) -> get >>= \a -> get >>= \b -> return (Basic_ack a b)
+ (60,90) -> get >>= \a -> get >>= \b -> return (Basic_reject a b)
+ (60,100) -> get >>= \a -> return (Basic_recover a)
+ (70,10) -> get >>= \a -> get >>= \b -> get >>= \c -> return (File_qos a b c)
+ (70,11) -> return File_qos_ok
+ (70,20) -> get >>= \a -> get >>= \b -> get >>= \c -> getBits 4 >>= \[d,e,f,g] -> return (File_consume a b c d e f g)
+ (70,21) -> get >>= \a -> return (File_consume_ok a)
+ (70,30) -> get >>= \a -> get >>= \b -> return (File_cancel a b)
+ (70,31) -> get >>= \a -> return (File_cancel_ok a)
+ (70,40) -> get >>= \a -> get >>= \b -> return (File_open a b)
+ (70,41) -> get >>= \a -> return (File_open_ok a)
+ (70,50) -> return File_stage
+ (70,60) -> get >>= \a -> get >>= \b -> get >>= \c -> getBits 2 >>= \[d,e] -> get >>= \f -> return (File_publish a b c d e f)
+ (70,70) -> get >>= \a -> get >>= \b -> get >>= \c -> get >>= \d -> return (File_return a b c d)
+ (70,80) -> get >>= \a -> get >>= \b -> get >>= \c -> get >>= \d -> get >>= \e -> get >>= \f -> return (File_deliver a b c d e f)
+ (70,90) -> get >>= \a -> get >>= \b -> return (File_ack a b)
+ (70,100) -> get >>= \a -> get >>= \b -> return (File_reject a b)
+ (80,10) -> get >>= \a -> get >>= \b -> get >>= \c -> get >>= \d -> return (Stream_qos a b c d)
+ (80,11) -> return Stream_qos_ok
+ (80,20) -> get >>= \a -> get >>= \b -> get >>= \c -> getBits 3 >>= \[d,e,f] -> return (Stream_consume a b c d e f)
+ (80,21) -> get >>= \a -> return (Stream_consume_ok a)
+ (80,30) -> get >>= \a -> get >>= \b -> return (Stream_cancel a b)
+ (80,31) -> get >>= \a -> return (Stream_cancel_ok a)
+ (80,40) -> get >>= \a -> get >>= \b -> get >>= \c -> getBits 2 >>= \[d,e] -> return (Stream_publish a b c d e)
+ (80,50) -> get >>= \a -> get >>= \b -> get >>= \c -> get >>= \d -> return (Stream_return a b c d)
+ (80,60) -> get >>= \a -> get >>= \b -> get >>= \c -> get >>= \d -> return (Stream_deliver a b c d)
+ (90,10) -> return Tx_select
+ (90,11) -> return Tx_select_ok
+ (90,20) -> return Tx_commit
+ (90,21) -> return Tx_commit_ok
+ (90,30) -> return Tx_rollback
+ (90,31) -> return Tx_rollback_ok
+ (100,10) -> return Dtx_select
+ (100,11) -> return Dtx_select_ok
+ (100,20) -> get >>= \a -> return (Dtx_start a)
+ (100,21) -> return Dtx_start_ok
+ (110,10) -> get >>= \a -> return (Tunnel_request a)
+ (120,10) -> get >>= \a -> get >>= \b -> get >>= \c -> get >>= \d -> get >>= \e -> return (Test_integer a b c d e)
+ (120,11) -> get >>= \a -> return (Test_integer_ok a)
+ (120,20) -> get >>= \a -> get >>= \b -> get >>= \c -> return (Test_string a b c)
+ (120,21) -> get >>= \a -> return (Test_string_ok a)
+ (120,30) -> get >>= \a -> get >>= \b -> get >>= \c -> return (Test_table a b c)
+ (120,31) -> get >>= \a -> get >>= \b -> return (Test_table_ok a b)
+ (120,40) -> return Test_content
+ (120,41) -> get >>= \a -> return (Test_content_ok a)
+data MethodPayload =
+
+ Connection_start
+ Octet -- version_major
+ Octet -- version_minor
+ FieldTable -- server_properties
+ LongString -- mechanisms
+ LongString -- locales
+ |
+ Connection_start_ok
+ FieldTable -- client_properties
+ ShortString -- mechanism
+ LongString -- response
+ ShortString -- locale
+ |
+ Connection_secure
+ LongString -- challenge
+ |
+ Connection_secure_ok
+ LongString -- response
+ |
+ Connection_tune
+ ShortInt -- channel_max
+ LongInt -- frame_max
+ ShortInt -- heartbeat
+ |
+ Connection_tune_ok
+ ShortInt -- channel_max
+ LongInt -- frame_max
+ ShortInt -- heartbeat
+ |
+ Connection_open
+ ShortString -- virtual_host
+ ShortString -- capabilities
+ Bit -- insist
+ |
+ Connection_open_ok
+ ShortString -- known_hosts
+ |
+ Connection_redirect
+ ShortString -- host
+ ShortString -- known_hosts
+ |
+ Connection_close
+ ShortInt -- reply_code
+ ShortString -- reply_text
+ ShortInt -- class_id
+ ShortInt -- method_id
+ |
+ Connection_close_ok
+
+ |
+ Channel_open
+ ShortString -- out_of_band
+ |
+ Channel_open_ok
+
+ |
+ Channel_flow
+ Bit -- active
+ |
+ Channel_flow_ok
+ Bit -- active
+ |
+ Channel_alert
+ ShortInt -- reply_code
+ ShortString -- reply_text
+ FieldTable -- details
+ |
+ Channel_close
+ ShortInt -- reply_code
+ ShortString -- reply_text
+ ShortInt -- class_id
+ ShortInt -- method_id
+ |
+ Channel_close_ok
+
+ |
+ Access_request
+ ShortString -- realm
+ Bit -- exclusive
+ Bit -- passive
+ Bit -- active
+ Bit -- write
+ Bit -- read
+ |
+ Access_request_ok
+ ShortInt -- ticket
+ |
+ Exchange_declare
+ ShortInt -- ticket
+ ShortString -- exchange
+ ShortString -- typ
+ Bit -- passive
+ Bit -- durable
+ Bit -- auto_delete
+ Bit -- internal
+ Bit -- nowait
+ FieldTable -- arguments
+ |
+ Exchange_declare_ok
+
+ |
+ Exchange_delete
+ ShortInt -- ticket
+ ShortString -- exchange
+ Bit -- if_unused
+ Bit -- nowait
+ |
+ Exchange_delete_ok
+
+ |
+ Queue_declare
+ ShortInt -- ticket
+ ShortString -- queue
+ Bit -- passive
+ Bit -- durable
+ Bit -- exclusive
+ Bit -- auto_delete
+ Bit -- nowait
+ FieldTable -- arguments
+ |
+ Queue_declare_ok
+ ShortString -- queue
+ LongInt -- message_count
+ LongInt -- consumer_count
+ |
+ Queue_bind
+ ShortInt -- ticket
+ ShortString -- queue
+ ShortString -- exchange
+ ShortString -- routing_key
+ Bit -- nowait
+ FieldTable -- arguments
+ |
+ Queue_bind_ok
+
+ |
+ Queue_purge
+ ShortInt -- ticket
+ ShortString -- queue
+ Bit -- nowait
+ |
+ Queue_purge_ok
+ LongInt -- message_count
+ |
+ Queue_delete
+ ShortInt -- ticket
+ ShortString -- queue
+ Bit -- if_unused
+ Bit -- if_empty
+ Bit -- nowait
+ |
+ Queue_delete_ok
+ LongInt -- message_count
+ |
+ Basic_qos
+ LongInt -- prefetch_size
+ ShortInt -- prefetch_count
+ Bit -- global
+ |
+ Basic_qos_ok
+
+ |
+ Basic_consume
+ ShortInt -- ticket
+ ShortString -- queue
+ ShortString -- consumer_tag
+ Bit -- no_local
+ Bit -- no_ack
+ Bit -- exclusive
+ Bit -- nowait
+ |
+ Basic_consume_ok
+ ShortString -- consumer_tag
+ |
+ Basic_cancel
+ ShortString -- consumer_tag
+ Bit -- nowait
+ |
+ Basic_cancel_ok
+ ShortString -- consumer_tag
+ |
+ Basic_publish
+ ShortInt -- ticket
+ ShortString -- exchange
+ ShortString -- routing_key
+ Bit -- mandatory
+ Bit -- immediate
+ |
+ Basic_return
+ ShortInt -- reply_code
+ ShortString -- reply_text
+ ShortString -- exchange
+ ShortString -- routing_key
+ |
+ Basic_deliver
+ ShortString -- consumer_tag
+ LongLongInt -- delivery_tag
+ Bit -- redelivered
+ ShortString -- exchange
+ ShortString -- routing_key
+ |
+ Basic_get
+ ShortInt -- ticket
+ ShortString -- queue
+ Bit -- no_ack
+ |
+ Basic_get_ok
+ LongLongInt -- delivery_tag
+ Bit -- redelivered
+ ShortString -- exchange
+ ShortString -- routing_key
+ LongInt -- message_count
+ |
+ Basic_get_empty
+ ShortString -- cluster_id
+ |
+ Basic_ack
+ LongLongInt -- delivery_tag
+ Bit -- multiple
+ |
+ Basic_reject
+ LongLongInt -- delivery_tag
+ Bit -- requeue
+ |
+ Basic_recover
+ Bit -- requeue
+ |
+ File_qos
+ LongInt -- prefetch_size
+ ShortInt -- prefetch_count
+ Bit -- global
+ |
+ File_qos_ok
+
+ |
+ File_consume
+ ShortInt -- ticket
+ ShortString -- queue
+ ShortString -- consumer_tag
+ Bit -- no_local
+ Bit -- no_ack
+ Bit -- exclusive
+ Bit -- nowait
+ |
+ File_consume_ok
+ ShortString -- consumer_tag
+ |
+ File_cancel
+ ShortString -- consumer_tag
+ Bit -- nowait
+ |
+ File_cancel_ok
+ ShortString -- consumer_tag
+ |
+ File_open
+ ShortString -- identifier
+ LongLongInt -- content_size
+ |
+ File_open_ok
+ LongLongInt -- staged_size
+ |
+ File_stage
+
+ |
+ File_publish
+ ShortInt -- ticket
+ ShortString -- exchange
+ ShortString -- routing_key
+ Bit -- mandatory
+ Bit -- immediate
+ ShortString -- identifier
+ |
+ File_return
+ ShortInt -- reply_code
+ ShortString -- reply_text
+ ShortString -- exchange
+ ShortString -- routing_key
+ |
+ File_deliver
+ ShortString -- consumer_tag
+ LongLongInt -- delivery_tag
+ Bit -- redelivered
+ ShortString -- exchange
+ ShortString -- routing_key
+ ShortString -- identifier
+ |
+ File_ack
+ LongLongInt -- delivery_tag
+ Bit -- multiple
+ |
+ File_reject
+ LongLongInt -- delivery_tag
+ Bit -- requeue
+ |
+ Stream_qos
+ LongInt -- prefetch_size
+ ShortInt -- prefetch_count
+ LongInt -- consume_rate
+ Bit -- global
+ |
+ Stream_qos_ok
+
+ |
+ Stream_consume
+ ShortInt -- ticket
+ ShortString -- queue
+ ShortString -- consumer_tag
+ Bit -- no_local
+ Bit -- exclusive
+ Bit -- nowait
+ |
+ Stream_consume_ok
+ ShortString -- consumer_tag
+ |
+ Stream_cancel
+ ShortString -- consumer_tag
+ Bit -- nowait
+ |
+ Stream_cancel_ok
+ ShortString -- consumer_tag
+ |
+ Stream_publish
+ ShortInt -- ticket
+ ShortString -- exchange
+ ShortString -- routing_key
+ Bit -- mandatory
+ Bit -- immediate
+ |
+ Stream_return
+ ShortInt -- reply_code
+ ShortString -- reply_text
+ ShortString -- exchange
+ ShortString -- routing_key
+ |
+ Stream_deliver
+ ShortString -- consumer_tag
+ LongLongInt -- delivery_tag
+ ShortString -- exchange
+ ShortString -- queue
+ |
+ Tx_select
+
+ |
+ Tx_select_ok
+
+ |
+ Tx_commit
+
+ |
+ Tx_commit_ok
+
+ |
+ Tx_rollback
+
+ |
+ Tx_rollback_ok
+
+ |
+ Dtx_select
+
+ |
+ Dtx_select_ok
+
+ |
+ Dtx_start
+ ShortString -- dtx_identifier
+ |
+ Dtx_start_ok
+
+ |
+ Tunnel_request
+ FieldTable -- meta_data
+ |
+ Test_integer
+ Octet -- integer_1
+ ShortInt -- integer_2
+ LongInt -- integer_3
+ LongLongInt -- integer_4
+ Octet -- operation
+ |
+ Test_integer_ok
+ LongLongInt -- result
+ |
+ Test_string
+ ShortString -- string_1
+ LongString -- string_2
+ Octet -- operation
+ |
+ Test_string_ok
+ LongString -- result
+ |
+ Test_table
+ FieldTable -- table
+ Octet -- integer_op
+ Octet -- string_op
+ |
+ Test_table_ok
+ LongLongInt -- integer_result
+ LongString -- string_result
+ |
+ Test_content
+
+ |
+ Test_content_ok
+ LongInt -- content_checksum
+
+ deriving Show
48 Network/AMQP/Helpers.hs
@@ -0,0 +1,48 @@
+module Network.AMQP.Helpers where
+
+import Control.Exception
+
+
+import Control.Concurrent.Chan
+import Control.Concurrent.MVar
+import qualified Data.ByteString.Char8 as BS
+import qualified Data.ByteString.Lazy.Char8 as BL
+import Control.Applicative
+
+
+toStrict :: BL.ByteString -> BS.ByteString
+toStrict x = BS.concat $ BL.toChunks x
+
+toLazy :: BS.ByteString -> BL.ByteString
+toLazy x = BL.fromChunks [x]
+
+
+
+-- if the lock is open, calls to waitLock will immediately return. if it is closed, calls to waitLock will block. if the lock is killed, it will always be open and can't be closed anymore
+data Lock = Lock (MVar Bool) (MVar ())
+
+newLock = do
+ a <- newMVar False
+ b <- newMVar ()
+ return $ Lock a b
+
+openLock :: Lock -> IO ()
+openLock (Lock a b) = do
+ tryPutMVar b ()
+ return ()
+
+closeLock :: Lock -> IO ()
+closeLock (Lock a b) = do
+ withMVar a $ \killed ->
+ if killed
+ then return ()
+ else tryTakeMVar b >> return ()
+ return ()
+
+
+waitLock (Lock a b) = readMVar b
+
+killLock (Lock a b) = do
+ modifyMVar_ a $ \x -> return True
+ tryPutMVar b ()
+
98 Network/AMQP/Protocol.hs
@@ -0,0 +1,98 @@
+module Network.AMQP.Protocol where
+
+
+import Data.Binary
+import Data.Binary.Get
+import Data.Binary.Put
+
+import qualified Data.ByteString.Lazy.Char8 as BL
+import qualified Data.Binary.Put as BPut
+import Control.Monad
+
+import Data.Char
+import Data.Int
+
+import Network.AMQP.Types
+import Network.AMQP.Generated
+
+
+--True if a content (contentheader and possibly contentbody) will follow the method
+hasContent (MethodPayload (Basic_get_ok _ _ _ _ _)) = True
+hasContent (MethodPayload (Basic_deliver _ _ _ _ _)) = True
+hasContent (MethodPayload (Basic_return _ _ _ _)) = True
+hasContent _ = False
+
+
+data Frame = Frame ChannelID FramePayload --channel, payload
+ deriving Show
+instance Binary Frame where
+ get = do
+ frameType <- getWord8
+ channel <- get :: Get ChannelID
+ payloadSize <- get :: Get PayloadSize
+ payload <- getPayload frameType payloadSize :: Get FramePayload
+ 0xCE <- getWord8 --frame end
+ return $ Frame channel payload
+ put (Frame chan payload) = do
+ putWord8 $ frameType payload
+ put chan
+ let buf = runPut $ putPayload payload
+ put ((fromIntegral $ BL.length buf)::PayloadSize)
+ putLazyByteString buf
+ putWord8 0xCE
+
+
+-- gets the size of the frame
+-- the bytestring should be at least 7 bytes long, otherwise this method will fail
+peekFrameSize :: BL.ByteString -> PayloadSize
+peekFrameSize b = runGet f b
+ where
+ f = do
+ getWord8 -- 1 byte
+ get :: Get ChannelID -- 2 bytes
+ ps <- get :: Get PayloadSize -- 4 bytes
+ return ps
+
+
+data FramePayload =
+ MethodPayload MethodPayload
+ | ContentHeaderPayload ShortInt ShortInt LongLongInt ContentHeaderProperties --classID, weight, bodySize, propertyFields
+ | ContentBodyPayload BL.ByteString
+ deriving Show
+
+frameType (MethodPayload _) = 1
+frameType (ContentHeaderPayload _ _ _ _) = 2
+frameType (ContentBodyPayload _) = 3
+
+getPayload 1 payloadSize = do --METHOD FRAME
+ payLoad <- get :: Get MethodPayload
+ return (MethodPayload payLoad)
+
+getPayload 2 payloadSize = do --content header frame
+ classID <- get :: Get ShortInt
+ weight <- get :: Get ShortInt
+ bodySize <- get :: Get LongLongInt
+
+ props <- getContentHeaderProperties classID
+ return (ContentHeaderPayload classID weight bodySize props)
+
+getPayload 3 payloadSize = do --content body frame
+ payload <- getLazyByteString $ fromIntegral payloadSize
+ return (ContentBodyPayload payload)
+
+putPayload (MethodPayload payload) = do
+ put payload
+
+putPayload (ContentHeaderPayload classID weight bodySize p) = do
+ put classID
+ put weight
+ put bodySize
+
+ putContentHeaderProperties p
+
+putPayload (ContentBodyPayload payload) = do
+ putLazyByteString payload
+
+
+
+
157 Network/AMQP/Types.hs
@@ -0,0 +1,157 @@
+module Network.AMQP.Types
+ (Octet,
+ Bit,
+ ChannelID,
+ PayloadSize,
+ ShortInt,
+ LongInt,
+ LongLongInt,
+ ShortString(..),
+ LongString(..),
+ Timestamp,
+ FieldTable(..),
+ FieldValue(..),
+ Decimals,
+ DecimalValue(..)
+ )
+ where
+
+
+import Data.Int
+import Data.Char
+import Data.Binary
+import Data.Binary.Get
+import Data.Binary.Put
+import qualified Data.ByteString.Char8 as BS
+import qualified Data.ByteString.Lazy.Char8 as BL
+import qualified Data.ByteString.Lazy.Internal as BL
+import qualified Data.Binary.Put as BPut
+import Control.Monad
+import qualified Data.Map as M
+
+
+
+-- performs runGet on a bytestring until the string is empty
+readMany :: (Show t, Binary t) => BL.ByteString -> [t]
+readMany str = runGet (readMany' [] 0) str
+readMany' _ 1000 = error "readMany overflow"
+readMany' acc overflow = do
+ x <- get
+ rem <- remaining
+ if rem > 0
+ then readMany' (x:acc) (overflow+1)
+ else return (x:acc)
+
+putMany x = mapM_ put x
+
+-- Lowlevel Types
+type Octet = Word8
+type Bit = Bool
+
+type ChannelID = ShortInt
+type PayloadSize = LongInt
+
+type ShortInt = Word16
+type LongInt = Word32
+type LongLongInt = Word64
+
+
+
+newtype ShortString = ShortString String
+ deriving (Show, Ord, Eq)
+instance Binary ShortString where
+ get = do
+ len <- getWord8
+ dat <- getByteString (fromIntegral len)
+ return $ ShortString $ BS.unpack dat
+ put (ShortString x) = do
+ let s = BS.pack $ take 255 x --ensure string isn't longer than 255 bytes
+ putWord8 $ fromIntegral (BS.length s)
+ putByteString s
+
+newtype LongString = LongString String
+ deriving Show
+instance Binary LongString where
+ get = do
+ len <- getWord32be
+ dat <- getByteString (fromIntegral len)
+ return $ LongString $ BS.unpack dat
+ put (LongString x) = do
+ putWord32be $ fromIntegral (length x)
+ putByteString (BS.pack x)
+
+type Timestamp = LongLongInt
+
+
+
+--- field-table ---
+data FieldTable = FieldTable (M.Map ShortString FieldValue)
+ deriving Show
+instance Binary FieldTable where
+ get = do
+ len <- get :: Get LongInt --length of fieldValuePairs in bytes
+
+ if len > 0
+ then do
+ fvp <- getLazyByteString (fromIntegral len)
+ let !fields = readMany fvp
+
+ return $ FieldTable $ M.fromList fields
+ else return $ FieldTable $ M.empty
+
+ put (FieldTable fvp) = do
+ let bytes = runPut (putMany $ M.toList fvp) :: BL.ByteString
+ put ((fromIntegral $ BL.length bytes):: LongInt)
+ putLazyByteString bytes
+
+
+
+--- field-value ---
+
+data FieldValue = FVLongString LongString
+ | FVSignedInt Int32
+ | FVDecimalValue DecimalValue
+ | FVTimestamp Timestamp
+ | FVFieldTable FieldTable
+ deriving Show
+
+instance Binary FieldValue where
+ get = do
+ fieldType <- getWord8
+ case chr $ fromIntegral fieldType of
+ 'S' -> do
+ x <- get :: Get LongString
+ return $ FVLongString x
+ 'I' -> do
+ x <- get :: Get Int32
+ return $ FVSignedInt x
+ 'D' -> do
+ x <- get :: Get DecimalValue
+ return $ FVDecimalValue $ x
+ 'T' -> do
+ x <- get :: Get Timestamp
+ return $ FVTimestamp x
+ 'F' -> do
+ ft <- get :: Get FieldTable
+ return $ FVFieldTable ft
+ put (FVLongString s) = put 'S' >> put s
+ put (FVSignedInt s) = put 'I' >> put s
+ put (FVDecimalValue s) = put 'D' >> put s
+ put (FVTimestamp s) = put 'T' >> put s
+ put (FVFieldTable s) = put 'F' >> put s
+
+
+
+data DecimalValue = DecimalValue Decimals LongInt
+ deriving Show
+instance Binary DecimalValue where
+ get = do
+ a <- getWord8
+ b <- get :: Get LongInt
+ return $ DecimalValue a b
+ put (DecimalValue a b) = put a >> put b
+
+type Decimals = Octet
+
+
+
3  Setup.lhs
@@ -0,0 +1,3 @@
+#!/usr/bin/env runhaskell
+> import Distribution.Simple
+> main = defaultMain
366 Tools/Builder.hs
@@ -0,0 +1,366 @@
+--reads the AMQP xml spec file and builds the "Generated.hs" module
+--this is pretty much a BIG HACK
+
+
+import Text.XML.Light
+import Text.XML.Light.Input
+import Text.XML.Light.Proc
+
+import qualified Data.Map as M
+import qualified Data.List as L
+
+import Data.Char
+import Data.Maybe
+
+
+data Class = Class String Int [Method] [Field] --className, classID, methods, content-fields
+ deriving Show
+data Method = Method String Int [Field] --methodName, methodID, fields
+ deriving Show
+data Field = TypeField String String --fieldName, fieldType
+ | DomainField String String --fieldName, domainName
+ deriving Show
+
+
+fieldType domainMap (TypeField _ x) = x
+fieldType domainMap (DomainField _ domain) = fromJust $ M.lookup domain domainMap
+
+
+main = do
+ spec <- readFile "amqp0-8.xml"
+ let parsed = parseXML spec
+ let !(Elem e) = parsed!!2
+
+ -- read domains
+ let domains = findChildren (unqual "domain") e
+
+ --map from domainName => type
+ let domainMap = M.fromList $ map readDomain domains
+
+
+ -- read classes
+ let classes = map readClass $ findChildren (unqual "class") e :: [Class]
+
+ -- generate data declaration
+ let dataDecl = "data MethodPayload = \n"++ (concat $ L.intersperse "\t|" $ concatMap (writeDataDeclForClass domainMap) classes)++"\n\tderiving Show"
+
+ -- generate binary instances for data-type
+ let binaryGetInst = (concat $ map ("\t"++) $ concatMap (writeBinaryGetInstForClass domainMap) classes)
+ let binaryPutInst = (concat $ map ("\t"++) $ concatMap (writeBinaryPutInstForClass domainMap) classes)
+
+ -- generate content types
+ let contentHeaders = (concat $ L.intersperse "\t|" $ map (writeContentHeaderForClass domainMap) classes)
+ let contentHeadersGetInst = concatMap (writeContentHeaderGetInstForClass domainMap) classes
+ let contentHeadersPutInst = concatMap (writeContentHeaderPutInstForClass domainMap) classes
+ let contentHeadersClassIDs = concatMap (writeContentHeaderClassIDsForClass domainMap) classes
+
+ writeFile "Generated.hs" (
+ "module Network.AMQP.Generated where\n\n"++
+ "import Network.AMQP.Types\n"++
+ "import Data.Maybe\n"++
+ "import Data.Binary\n"++
+ "import Data.Binary.Get\n"++
+ "import Data.Binary.Put\n"++
+ "import Data.Bits\n\n"++
+
+
+ contentHeadersGetInst++"\n"++
+ contentHeadersPutInst++"\n"++
+ contentHeadersClassIDs++"\n"++
+
+ "data ContentHeaderProperties = \n\t"++
+ contentHeaders++
+ "\n\tderiving Show\n\n"++
+
+
+
+ "--Bits need special handling because AMQP requires contiguous bits to be packed into a Word8\n"++
+ "-- | Packs up to 8 bits into a Word8\n"++
+ "putBits :: [Bit] -> Put\n"++
+ "putBits xs = putWord8 $ putBits' 0 xs\n"++
+ "putBits' _ [] = 0\n"++
+ "putBits' offset (x:xs) = (shiftL (toInt x) offset) .|. (putBits' (offset+1) xs)\n"++
+ " where toInt True = 1\n"++
+ " toInt False = 0\n"++
+
+
+ "getBits num = getWord8 >>= \\x -> return $ getBits' num 0 x\n"++
+ "getBits' 0 offset _= []\n"++
+ "getBits' num offset x = ((x .&. (2^offset)) /= 0) : (getBits' (num-1) (offset+1) x)\n"++
+
+ "-- | Packs up to 15 Bits into a Word16 (=Property Flags) \n"++
+ "putPropBits :: [Bit] -> Put\n"++
+ "putPropBits xs = putWord16be $ (putPropBits' 0 xs) \n"++
+ "putPropBits' _ [] = 0\n"++
+ "putPropBits' offset (x:xs) = (shiftL (toInt x) (15-offset)) .|. (putPropBits' (offset+1) xs)\n"++
+ " where toInt True = 1\n"++
+ " toInt False = 0\n"++
+
+
+ "getPropBits num = getWord16be >>= \\x -> return $ getPropBits' num 0 x \n"++
+ "getPropBits' 0 offset _= []\n"++
+ "getPropBits' num offset x = ((x .&. (2^(15-offset))) /= 0) : (getPropBits' (num-1) (offset+1) x)\n"++
+
+ "condGet False = return Nothing\n"++
+ "condGet True = get >>= \\x -> return $ Just x\n\n"++
+
+ "condPut (Just x) = put x\n"++
+ "condPut _ = return ()\n\n"++
+
+
+ "instance Binary MethodPayload where\n"++
+
+ -- put instances
+ binaryPutInst++
+
+ -- get instances
+ "\tget = do\n"++
+ "\t\tclassID <- getWord16be\n"++
+ "\t\tmethodID <- getWord16be\n"++
+ "\t\tcase (classID, methodID) of\n"++
+ binaryGetInst++
+
+ -- data declaration
+ dataDecl)
+
+
+translateType "octet" = "Octet"
+translateType "longstr" = "LongString"
+translateType "shortstr" = "ShortString"
+translateType "short" = "ShortInt"
+translateType "long" = "LongInt"
+translateType "bit" = "Bit"
+translateType "table" = "FieldTable"
+translateType "longlong" = "LongLongInt"
+translateType "timestamp" = "Timestamp"
+translateType x = error x
+
+
+fixClassName s = (toUpper $ head s):(tail s)
+fixMethodName s = map f s
+ where
+ f '-' = '_'
+ f x = x
+
+fixFieldName "type" = "typ"
+fixFieldName s = map f s
+ where
+ f ' ' = '_'
+ f x = x
+
+
+
+---- data declaration ----
+
+writeDataDeclForClass :: M.Map String String -> Class -> [String]
+writeDataDeclForClass domainMap (Class nam index methods _) =
+ map ("\n\t"++) $ map (writeDataDeclForMethod domainMap nam) methods
+
+writeDataDeclForMethod :: M.Map String String -> String -> Method -> String
+writeDataDeclForMethod domainMap className (Method nam index fields) =
+ let fullName = (fixClassName className) ++ "_"++(fixMethodName nam) in
+ --data type declaration
+ (writeTypeDecl domainMap fullName fields)
+ --binary instances
+ --(writeBinaryInstance fullName fields)
+
+writeTypeDecl domainMap fullName fields =
+ fullName++"\n\t\t"++(concat $ L.intersperse "\n\t\t" $ map writeF fields)++"\n"
+ where
+ writeF (TypeField nam typ) = (translateType typ)++" -- "++(fixFieldName nam)
+ writeF f@(DomainField nam domain) = (translateType $ fieldType domainMap f)++" -- "++(fixFieldName nam)
+
+
+
+---- binary get instance ----
+
+writeBinaryGetInstForClass :: M.Map String String -> Class -> [String]
+writeBinaryGetInstForClass domainMap (Class nam index methods _) =
+ map (writeBinaryGetInstForMethod domainMap nam index) methods
+
+writeBinaryGetInstForMethod :: M.Map String String -> String -> Int -> Method -> String
+writeBinaryGetInstForMethod domainMap className classIndex (Method nam index fields) =
+ let fullName = (fixClassName className) ++ "_"++(fixMethodName nam) in
+ --binary instances
+ "\t"++(writeBinaryGetInstance domainMap fullName classIndex index fields)
+
+
+writeBinaryGetInstance domainMap fullName classIndex methodIndex fields =
+ "\t("++(show classIndex)++","++(show methodIndex)++") -> "++getDef++"\n"
+
+ where
+ manyLetters = map (:[]) ['a'..'z']
+
+ fieldTypes :: [(String,String)] --(a..z, fieldType)
+ fieldTypes = zip manyLetters $ map (fieldType domainMap) fields
+
+ --consecutive BITS have to be merged into a Word8
+ --TODO: more than 8bits have to be split into several Word8
+ grouped :: [ [(String, String)] ]
+ grouped = L.groupBy (\(_,x) (_,y) -> x=="bit" && y=="bit") fieldTypes
+
+ --concatMap (\x -> " get >>= \\"++x++" ->") (take (length fields) manyLetters)
+
+ showBlob xs | length xs == 1 = "get >>= \\"++(fst $ xs!!0)++" -> "
+ showBlob xs = "getBits "++(show $ length xs)++" >>= \\["++(concat $ L.intersperse "," $ map fst xs)++"] -> "
+
+ getStmt = concatMap showBlob grouped
+
+ getDef =
+ let wrap = if (length fields) /= 0 then ("("++) . (++")") else id
+ in
+ getStmt
+ ++ " return "
+ ++ wrap (fullName ++ concatMap (" "++) (take (length fields) manyLetters))
+
+
+---- binary put instance ----
+
+writeBinaryPutInstForClass :: M.Map String String -> Class -> [String]
+writeBinaryPutInstForClass domainMap (Class nam index methods _) =
+ map (writeBinaryPutInstForMethod domainMap nam index) methods
+
+writeBinaryPutInstForMethod :: M.Map String String -> String -> Int -> Method -> String
+writeBinaryPutInstForMethod domainMap className classIndex (Method nam index fields) =
+ let fullName = (fixClassName className) ++ "_"++(fixMethodName nam) in
+ --binary instances
+ (writeBinaryPutInstance domainMap fullName classIndex index fields)
+
+
+writeBinaryPutInstance domainMap fullName classIndex methodIndex fields =
+ putDef++"\n"
+ where
+ manyLetters = map (:[]) ['a'..'z']
+
+ fieldTypes :: [(String,String)] --(a..z, fieldType)
+ fieldTypes = zip manyLetters $ map (fieldType domainMap) fields
+
+ --consecutive BITS have to be merged into a Word8
+ --TODO: more than 8bits have to be split into several Word8
+ grouped :: [ [(String, String)] ]
+ grouped = L.groupBy (\(_,x) (_,y) -> x=="bit" && y=="bit") fieldTypes
+
+ showBlob xs | length xs == 1 = " >> put "++(fst $ xs!!0)
+ showBlob xs = " >> putBits ["++(concat $ L.intersperse "," $ map fst xs)++"]"
+
+ putStmt = concatMap showBlob grouped
+
+ putDef =
+ let wrap = if (length fields) /= 0 then ("("++) . (++")") else id
+ pattern = fullName ++ concatMap (' ':) (take (length fields) manyLetters)
+ in
+ "put " ++ wrap pattern ++" = "
+ ++ "putWord16be "++(show classIndex)++" >> putWord16be "++(show methodIndex)
+ ++ putStmt
+
+
+---- content header declaration ----
+
+writeContentHeaderForClass :: M.Map String String -> Class -> String
+writeContentHeaderForClass domainMap (Class nam index methods fields) =
+ let fullName = "CH"++(fixClassName nam) in
+ (writeContentHeaderDecl domainMap fullName fields)
+
+writeContentHeaderDecl domainMap fullName fields =
+ fullName++"\n\t\t"++(concat $ L.intersperse "\n\t\t" $ map writeF fields)++"\n"
+ where
+ writeF (TypeField nam typ) = "(Maybe "++(translateType typ)++") -- "++(fixFieldName nam)
+ writeF f@(DomainField nam domain) = "(Maybe "++(translateType $ fieldType domainMap f)++") -- "++(fixFieldName nam)
+
+
+---- contentheader get instance ----
+
+writeContentHeaderGetInstForClass :: M.Map String String -> Class -> String
+writeContentHeaderGetInstForClass domainMap (Class nam index methods fields) =
+ let fullName = "CH"++(fixClassName nam) in
+ --binary instances
+ (writeContentHeaderGetInstance domainMap fullName index fields)
+
+writeContentHeaderGetInstance domainMap fullName classIndex fields =
+ "getContentHeaderProperties "++(show classIndex)++" = "++getDef++"\n"
+
+ where
+ manyLetters = map (:[]) ['a'..'z']
+ usedLetters = take (length fields) manyLetters
+
+ showBlob x = "condGet "++x++" >>= \\"++x++"' -> "
+
+ getStmt = concatMap showBlob usedLetters
+
+ getDef =
+ let wrap = if (length fields) /= 0 then ("("++) . (++")") else id
+ in
+ "getPropBits "++(show $ length fields) ++" >>= \\["++(concat $ L.intersperse "," usedLetters )++"] -> "++
+ getStmt
+ ++ " return "
+ ++ wrap (fullName ++ " " ++ concatMap (++"' ") (take (length fields) usedLetters))
+
+
+
+
+
+---- contentheader put instance ----
+
+writeContentHeaderPutInstForClass :: M.Map String String -> Class -> String
+writeContentHeaderPutInstForClass domainMap (Class nam index methods fields) =
+ let fullName = "CH"++ (fixClassName nam) in
+ --binary instances
+ (writeContentHeaderPutInstance domainMap fullName index fields)
+
+
+writeContentHeaderPutInstance domainMap fullName classIndex fields =
+ "putContentHeaderProperties "++putDef++"\n"
+ where
+ manyLetters = map (:[]) ['a'..'z']
+ usedLetters = take (length fields) manyLetters
+
+
+ showBlob x = " >> condPut "++x
+
+ putStmt = concatMap showBlob usedLetters
+
+ putDef =
+ let wrap = if (length fields) /= 0 then ("("++) . (++")") else id
+ pattern = fullName ++ concatMap (' ':) (take (length fields) manyLetters)
+ in
+ wrap pattern ++" = "
+ ++ "putPropBits "++"["++(concat $ L.intersperse "," $ map ("isJust "++) usedLetters)++"] "
+ ++ putStmt
+
+
+---- contentheader class ids -----
+writeContentHeaderClassIDsForClass :: M.Map String String -> Class -> String
+writeContentHeaderClassIDsForClass domainMap (Class nam index methods fields) =
+ let fullName = "CH"++(fixClassName nam)