Merge pull request #40 from AlainODea-haskell/TLS_using_connection
Add support for TLS connections (FIX #39)
hreinhardt committed Feb 19, 2014
2 parents 240be55 + a578c13 commit d80ea3a
Showing 5 changed files with 132 additions and 21 deletions.
3 changes: 2 additions & 1 deletion Network/AMQP.hs
Expand Up @@ -49,6 +49,7 @@ module Network.AMQP (
-- * Connection
Expand Down Expand Up @@ -519,7 +520,7 @@ flow chan active = do
-- * no limit on the number of used channels
defaultConnectionOpts :: ConnectionOpts
defaultConnectionOpts = ConnectionOpts [("localhost", 5672)] "/" [plain "guest" "guest"] (Just 131072) Nothing Nothing
defaultConnectionOpts = ConnectionOpts [("localhost", 5672)] "/" [plain "guest" "guest"] (Just 131072) Nothing Nothing Nothing

-- | @openConnection hostname virtualHost loginName loginPassword@ opens a connection to an AMQP server running on @hostname@.
-- @virtualHost@ is used as a namespace for AMQP resources (default is \"/\"), so different applications could use multiple virtual hosts on the same AMQP server.
66 changes: 47 additions & 19 deletions Network/AMQP/Internal.hs
Expand Up @@ -11,7 +11,6 @@ import Data.Maybe
import Data.Text (Text)
import Data.Typeable
import Network
import System.IO

import qualified Control.Exception as CE
import qualified Data.ByteString as BS
Expand All @@ -23,6 +22,7 @@ import qualified Data.IntMap as IM
import qualified Data.Sequence as Seq
import qualified Data.Text as T
import qualified Data.Text.Encoding as E
import qualified Network.Connection as Conn

import Network.AMQP.Protocol
import Network.AMQP.Types
Expand Down Expand Up @@ -126,7 +126,7 @@ Outgoing Data: Application -> Socket

data Connection = Connection {
connHandle :: Handle,
connHandle :: Conn.Connection,
connChannels :: (MVar (IM.IntMap (Channel, ThreadId))), --open channels (channelID => (Channel, ChannelThread))
connMaxFrameSize :: Int, --negotiated maximum frame size
connClosed :: MVar (Maybe String),
Expand All @@ -148,8 +148,19 @@ data ConnectionOpts = ConnectionOpts {
coAuth :: ![SASLMechanism], -- ^ The 'SASLMechanism's to use for authenticating with the broker.
coMaxFrameSize :: !(Maybe Word32), -- ^ The maximum frame size to be used. If not specified, no limit is assumed.
coHeartbeatDelay :: !(Maybe Word16), -- ^ The delay in seconds, after which the client expects a heartbeat frame from the broker. If 'Nothing', the value suggested by the broker is used. Use @Just 0@ to disable the heartbeat mechnism.
coMaxChannel :: !(Maybe Word16) -- ^ The maximum number of channels the client will use.
coMaxChannel :: !(Maybe Word16), -- ^ The maximum number of channels the client will use.
coTLSSettings :: Maybe TLSSettings -- ^ Whether or not to connect to servers using TLS. See for details.
-- | Represents the kind of TLS connection to establish.
data TLSSettings =
TLSTrusted -- ^ Require trusted certificates (Recommended).
| TLSUntrusted -- ^ Allow untrusted certificates (Discouraged. Vulnerable to man-in-the-middle attacks)

connectionTLSSettings :: TLSSettings -> Maybe Conn.TLSSettings
connectionTLSSettings tlsSettings =
Just $ case tlsSettings of
TLSTrusted -> Conn.TLSSettingsSimple False False False
TLSUntrusted -> Conn.TLSSettingsSimple True False False

-- | A 'SASLMechanism' is described by its name ('saslName'), its initial response ('saslInitialResponse'), and an optional function ('saslChallengeFunc') that
-- transforms a security challenge provided by the server into response, which is then sent back to the server for verification.
Expand Down Expand Up @@ -188,13 +199,13 @@ openConnection'' :: ConnectionOpts -> IO Connection
openConnection'' connOpts = withSocketsDo $ do
handle <- connect $ coServers connOpts
(maxFrameSize, heartbeatTimeout) <- CE.handle (\(_ :: CE.IOException) -> CE.throwIO $ ConnectionClosedException "Handshake failed. Please check the RabbitMQ logs for more information") $ do
BL.hPut handle $ BPut.runPut $ do
BPut.putByteString $ BC.pack "AMQP"
BPut.putWord8 1
BPut.putWord8 1 --TCP/IP
BPut.putWord8 0 --Major Version
BPut.putWord8 9 --Minor Version
hFlush handle
Conn.connectionPut handle $ BS.append (BC.pack "AMQP")
(BS.pack [
, 1 --TCP/IP
, 0 --Major Version
, 9 --Minor Version

-- S: connection.start
Frame 0 (MethodPayload (Connection_start _ _ _ (LongString serverMechanisms) _)) <- readFrame handle
Expand Down Expand Up @@ -236,7 +247,7 @@ openConnection'' connOpts = withSocketsDo $ do
--spawn the connectionReceiver
connThread <- forkIO $ CE.finally (connectionReceiver conn) $ do
-- try closing socket
CE.catch (hClose handle) (\(_ :: CE.SomeException) -> return ())
CE.catch (Conn.connectionClose handle) (\(_ :: CE.SomeException) -> return ())

-- mark as closed
modifyMVar_ cClosed $ return . Just . maybe "unknown reason" id
Expand All @@ -261,14 +272,21 @@ openConnection'' connOpts = withSocketsDo $ do
return conn
connect ((host, port) : rest) = do
result <- CE.try (connectTo host $ PortNumber port)
ctx <- Conn.initConnectionContext
result <- CE.try (Conn.connectTo ctx $ Conn.ConnectionParams
{ Conn.connectionHostname = host
, Conn.connectionPort = port
, Conn.connectionUseSecure = tlsSettings
, Conn.connectionUseSocks = Nothing
(\(ex :: CE.SomeException) -> do
putStrLn $ "Error connecting to "++show (host, port)++": "++show ex
connect rest)
connect [] = CE.throwIO $ ConnectionClosedException $ "Could not connect to any of the provided brokers: " ++ show (coServers connOpts)
tlsSettings = maybe Nothing connectionTLSSettings (coTLSSettings connOpts)
selectSASLMechanism handle serverMechanisms =
let serverSaslList = T.split (== ' ') $ E.decodeUtf8 serverMechanisms
clientMechanisms = coAuth connOpts
Expand Down Expand Up @@ -301,7 +319,7 @@ openConnection'' connOpts = withSocketsDo $ do
True))) -- insist; deprecated in 0-9-1

abortHandshake handle msg = do
hClose handle
Conn.connectionClose handle
CE.throwIO $ ConnectionClosedException msg

abortIfNothing m handle msg = case m of
Expand Down Expand Up @@ -375,13 +393,15 @@ addConnectionClosedHandler conn ifClosed handler = do
-- otherwise add it to the list
_ -> modifyMVar_ (connClosedHandlers conn) $ \old -> return $ handler:old

readFrame :: Handle -> IO Frame
readFrame :: Conn.Connection -> IO Frame
readFrame handle = do
dat <- BL.hGet handle 7
strictDat <- connectionGetExact handle 7
let dat = toLazy strictDat
-- NB: userError returns an IOException so it will be catched in 'connectionReceiver'
when (BL.null dat) $ CE.throwIO $ userError "connection not open"
let len = fromIntegral $ peekFrameSize dat
dat' <- BL.hGet handle (len+1) -- +1 for the terminating 0xCE
strictDat' <- connectionGetExact handle (len+1) -- +1 for the terminating 0xCE
let dat' = toLazy strictDat'
when (BL.null dat') $ CE.throwIO $ userError "connection not open"
let ret = runGetOrFail get (BL.append dat dat')
case ret of
Expand All @@ -390,10 +410,18 @@ readFrame handle = do
error $ "readFrame: parser should read " ++ show (len+8) ++ " bytes; but read " ++ show consumedBytes
Right (_, _, frame) -> return frame

writeFrame :: Handle -> Frame -> IO ()
-- belongs in connection package and will be removed once it lands there
connectionGetExact :: Conn.Connection -> Int -> IO BS.ByteString
connectionGetExact conn x = loop BS.empty 0
where loop bs y
| y == x = return bs
| otherwise = do
next <- Conn.connectionGet conn (x - y)
loop (BS.append bs next) (y + (BS.length next))

writeFrame :: Conn.Connection -> Frame -> IO ()
writeFrame handle f = do
BL.hPut handle . runPut . put $ f
hFlush handle
Conn.connectionPut handle . toStrict . runPut . put $ f

------------------------ CHANNEL -----------------------------

3 changes: 2 additions & 1 deletion amqp.cabal
Expand Up @@ -18,7 +18,7 @@ Extra-source-files: examples/ExampleConsumer.hs,

Build-Depends: base >= 4 && < 5, binary >= 0.7, containers>=0.2, bytestring>=0.9, network>=, data-binary-ieee754>=, text>=0.11.2, split>=0.2, clock >=, monad-control >= 0.3
Build-Depends: base >= 4 && < 5, binary >= 0.7, containers>=0.2, bytestring>=0.9, network>=, data-binary-ieee754>=, text>=0.11.2, split>=0.2, clock >=, monad-control >= 0.3, connection == 0.2.*
Exposed-modules: Network.AMQP, Network.AMQP.Types, Network.AMQP.Lifted
Other-modules: Network.AMQP.Generated, Network.AMQP.Helpers, Network.AMQP.Protocol, Network.AMQP.Internal
GHC-Options: -Wall
Expand Down Expand Up @@ -46,3 +46,4 @@ test-suite spec
base >= 4 && < 5, binary >= 0.7, containers>=0.2, bytestring>=0.9, network>=, data-binary-ieee754>=, text>=0.11.2, split>=0.2, clock >=
, hspec >= 1.3
, hspec-expectations >= 0.3.3
, connection == 0.2.*
46 changes: 46 additions & 0 deletions examples/ExampleTLSConsumer.hs
@@ -0,0 +1,46 @@
{-# OPTIONS -XOverloadedStrings #-}
import Network.AMQP

import qualified Data.ByteString.Lazy.Char8 as BL

main = do
let opts = defaultConnectionOpts {
coServers = [("", 5671)]
, coTLSSettings = Just TLSTrusted
conn <- openConnection'' opts
chan <- openChannel conn

--declare queues, exchanges and bindings
declareQueue chan newQueue {queueName = "myQueueDE"}
declareQueue chan newQueue {queueName = "myQueueEN"}

declareExchange chan newExchange {exchangeName = "topicExchg", exchangeType = "topic"}
bindQueue chan "myQueueDE" "topicExchg" "de.*"
bindQueue chan "myQueueEN" "topicExchg" "en.*"

--subscribe to the queues
consumeMsgs chan "myQueueDE" Ack myCallbackDE
consumeMsgs chan "myQueueEN" Ack myCallbackEN

getLine -- wait for keypress
closeConnection conn
putStrLn "connection closed"

myCallbackDE :: (Message,Envelope) -> IO ()
myCallbackDE (msg, env) = do
putStrLn $ "received from DE: "++(BL.unpack $ msgBody msg)
ackEnv env

myCallbackEN :: (Message,Envelope) -> IO ()
myCallbackEN (msg, env) = do
putStrLn $ "received from EN: "++(BL.unpack $ msgBody msg)
ackEnv env
35 changes: 35 additions & 0 deletions examples/ExampleTLSProducer.hs
@@ -0,0 +1,35 @@
{-# OPTIONS -XOverloadedStrings #-}
import Network.AMQP

import qualified Data.ByteString.Lazy.Char8 as BL

main = do
let opts = defaultConnectionOpts {
coServers = [("", 5671)]
, coTLSSettings = Just TLSTrusted
conn <- openConnection'' opts
chan <- openChannel conn

--declare queues, exchanges and bindings
declareQueue chan newQueue {queueName = "myQueueDE"}
declareQueue chan newQueue {queueName = "myQueueEN"}

declareExchange chan newExchange {exchangeName = "topicExchg", exchangeType = "topic"}
bindQueue chan "myQueueDE" "topicExchg" "de.*"
bindQueue chan "myQueueEN" "topicExchg" "en.*"

--publish messages
publishMsg chan "topicExchg" "de.hello"
(newMsg {msgBody = (BL.pack "hallo welt"),
msgDeliveryMode = Just NonPersistent}
publishMsg chan "topicExchg" "en.hello"
(newMsg {msgBody = (BL.pack "hello world"),
msgDeliveryMode = Just NonPersistent}

closeConnection conn

