Permalink
Browse files

Add support for TLS connections

See **examples/ExampleTLSClient.hs** and **examples/ExampleTLSServer.hs**.

 - Add **coTLSSettings** labeled field to **ConnectionOpts**

 - Switch from **GHC.IO.Handle** to **Network.Connection**

 - Allow TLS trust configuration with **TLSTrusted**/**TLSUntrusted**

To preserve the behavior of existing client code, unsecured TCP
connections are used by default.

TLSTrusted is the recommended setting where the AMQP servers support
it.  Using it will require that the AMQP servers identify themselves
with trusted certificates.  Private CAs and self-signed certificates can
be supported using mechanisms dependent on the trust store of the
underlying Operating System. This is out of the scope of this package to
explain, but examples are readily available for most Operating Systems.

TLSUntrusted is an insecure option and should never be used in
production.  Bypassing TLS certificate validation opens client code and
its users to man-in-the-middle attacks.  As described above it is
relatively straight-forward to trust a self-signed cert or a private CA,
so there is very rarely a valid use case for TLSUntrusted.
  • Loading branch information...
alain-odea-vgh committed Feb 18, 2014
1 parent 240be55 commit a578c1396abf09c92d2c350f79ef5bee8d6b45c0
Showing with 132 additions and 21 deletions.
  1. +2 −1 Network/AMQP.hs
  2. +47 −19 Network/AMQP/Internal.hs
  3. +2 −1 amqp.cabal
  4. +46 −0 examples/ExampleTLSConsumer.hs
  5. +35 −0 examples/ExampleTLSProducer.hs
View
@@ -49,6 +49,7 @@ module Network.AMQP (
-- * Connection
Connection,
ConnectionOpts(..),
+ TLSSettings(..),
defaultConnectionOpts,
openConnection,
openConnection',
@@ -519,7 +520,7 @@ flow chan active = do
-- * no limit on the number of used channels
--
defaultConnectionOpts :: ConnectionOpts
-defaultConnectionOpts = ConnectionOpts [("localhost", 5672)] "/" [plain "guest" "guest"] (Just 131072) Nothing Nothing
+defaultConnectionOpts = ConnectionOpts [("localhost", 5672)] "/" [plain "guest" "guest"] (Just 131072) Nothing Nothing Nothing
-- | @openConnection hostname virtualHost loginName loginPassword@ opens a connection to an AMQP server running on @hostname@.
-- @virtualHost@ is used as a namespace for AMQP resources (default is \"/\"), so different applications could use multiple virtual hosts on the same AMQP server.
View
@@ -11,7 +11,6 @@ import Data.Maybe
import Data.Text (Text)
import Data.Typeable
import Network
-import System.IO
import qualified Control.Exception as CE
import qualified Data.ByteString as BS
@@ -23,6 +22,7 @@ import qualified Data.IntMap as IM
import qualified Data.Sequence as Seq
import qualified Data.Text as T
import qualified Data.Text.Encoding as E
+import qualified Network.Connection as Conn
import Network.AMQP.Protocol
import Network.AMQP.Types
@@ -126,7 +126,7 @@ Outgoing Data: Application -> Socket
-}
data Connection = Connection {
- connHandle :: Handle,
+ connHandle :: Conn.Connection,
connChannels :: (MVar (IM.IntMap (Channel, ThreadId))), --open channels (channelID => (Channel, ChannelThread))
connMaxFrameSize :: Int, --negotiated maximum frame size
connClosed :: MVar (Maybe String),
@@ -148,8 +148,19 @@ data ConnectionOpts = ConnectionOpts {
coAuth :: ![SASLMechanism], -- ^ The 'SASLMechanism's to use for authenticating with the broker.
coMaxFrameSize :: !(Maybe Word32), -- ^ The maximum frame size to be used. If not specified, no limit is assumed.
coHeartbeatDelay :: !(Maybe Word16), -- ^ The delay in seconds, after which the client expects a heartbeat frame from the broker. If 'Nothing', the value suggested by the broker is used. Use @Just 0@ to disable the heartbeat mechnism.
- coMaxChannel :: !(Maybe Word16) -- ^ The maximum number of channels the client will use.
+ coMaxChannel :: !(Maybe Word16), -- ^ The maximum number of channels the client will use.
+ coTLSSettings :: Maybe TLSSettings -- ^ Whether or not to connect to servers using TLS. See http://www.rabbitmq.com/ssl.html for details.
}
+-- | Represents the kind of TLS connection to establish.
+data TLSSettings =
+ TLSTrusted -- ^ Require trusted certificates (Recommended).
+ | TLSUntrusted -- ^ Allow untrusted certificates (Discouraged. Vulnerable to man-in-the-middle attacks)
+
+connectionTLSSettings :: TLSSettings -> Maybe Conn.TLSSettings
+connectionTLSSettings tlsSettings =
+ Just $ case tlsSettings of
+ TLSTrusted -> Conn.TLSSettingsSimple False False False
+ TLSUntrusted -> Conn.TLSSettingsSimple True False False
-- | A 'SASLMechanism' is described by its name ('saslName'), its initial response ('saslInitialResponse'), and an optional function ('saslChallengeFunc') that
-- transforms a security challenge provided by the server into response, which is then sent back to the server for verification.
@@ -188,13 +199,13 @@ openConnection'' :: ConnectionOpts -> IO Connection
openConnection'' connOpts = withSocketsDo $ do
handle <- connect $ coServers connOpts
(maxFrameSize, heartbeatTimeout) <- CE.handle (\(_ :: CE.IOException) -> CE.throwIO $ ConnectionClosedException "Handshake failed. Please check the RabbitMQ logs for more information") $ do
- BL.hPut handle $ BPut.runPut $ do
- BPut.putByteString $ BC.pack "AMQP"
- BPut.putWord8 1
- BPut.putWord8 1 --TCP/IP
- BPut.putWord8 0 --Major Version
- BPut.putWord8 9 --Minor Version
- hFlush handle
+ Conn.connectionPut handle $ BS.append (BC.pack "AMQP")
+ (BS.pack [
+ 1
+ , 1 --TCP/IP
+ , 0 --Major Version
+ , 9 --Minor Version
+ ])
-- S: connection.start
Frame 0 (MethodPayload (Connection_start _ _ _ (LongString serverMechanisms) _)) <- readFrame handle
@@ -236,7 +247,7 @@ openConnection'' connOpts = withSocketsDo $ do
--spawn the connectionReceiver
connThread <- forkIO $ CE.finally (connectionReceiver conn) $ do
-- try closing socket
- CE.catch (hClose handle) (\(_ :: CE.SomeException) -> return ())
+ CE.catch (Conn.connectionClose handle) (\(_ :: CE.SomeException) -> return ())
-- mark as closed
modifyMVar_ cClosed $ return . Just . maybe "unknown reason" id
@@ -261,14 +272,21 @@ openConnection'' connOpts = withSocketsDo $ do
return conn
where
connect ((host, port) : rest) = do
- result <- CE.try (connectTo host $ PortNumber port)
+ ctx <- Conn.initConnectionContext
+ result <- CE.try (Conn.connectTo ctx $ Conn.ConnectionParams
+ { Conn.connectionHostname = host
+ , Conn.connectionPort = port
+ , Conn.connectionUseSecure = tlsSettings
+ , Conn.connectionUseSocks = Nothing
+ })
either
(\(ex :: CE.SomeException) -> do
putStrLn $ "Error connecting to "++show (host, port)++": "++show ex
connect rest)
(return)
result
connect [] = CE.throwIO $ ConnectionClosedException $ "Could not connect to any of the provided brokers: " ++ show (coServers connOpts)
+ tlsSettings = maybe Nothing connectionTLSSettings (coTLSSettings connOpts)
selectSASLMechanism handle serverMechanisms =
let serverSaslList = T.split (== ' ') $ E.decodeUtf8 serverMechanisms
clientMechanisms = coAuth connOpts
@@ -301,7 +319,7 @@ openConnection'' connOpts = withSocketsDo $ do
True))) -- insist; deprecated in 0-9-1
abortHandshake handle msg = do
- hClose handle
+ Conn.connectionClose handle
CE.throwIO $ ConnectionClosedException msg
abortIfNothing m handle msg = case m of
@@ -375,13 +393,15 @@ addConnectionClosedHandler conn ifClosed handler = do
-- otherwise add it to the list
_ -> modifyMVar_ (connClosedHandlers conn) $ \old -> return $ handler:old
-readFrame :: Handle -> IO Frame
+readFrame :: Conn.Connection -> IO Frame
readFrame handle = do
- dat <- BL.hGet handle 7
+ strictDat <- connectionGetExact handle 7
+ let dat = toLazy strictDat
-- NB: userError returns an IOException so it will be catched in 'connectionReceiver'
when (BL.null dat) $ CE.throwIO $ userError "connection not open"
let len = fromIntegral $ peekFrameSize dat
- dat' <- BL.hGet handle (len+1) -- +1 for the terminating 0xCE
+ strictDat' <- connectionGetExact handle (len+1) -- +1 for the terminating 0xCE
+ let dat' = toLazy strictDat'
when (BL.null dat') $ CE.throwIO $ userError "connection not open"
let ret = runGetOrFail get (BL.append dat dat')
case ret of
@@ -390,10 +410,18 @@ readFrame handle = do
error $ "readFrame: parser should read " ++ show (len+8) ++ " bytes; but read " ++ show consumedBytes
Right (_, _, frame) -> return frame
-writeFrame :: Handle -> Frame -> IO ()
+-- belongs in connection package and will be removed once it lands there
+connectionGetExact :: Conn.Connection -> Int -> IO BS.ByteString
+connectionGetExact conn x = loop BS.empty 0
+ where loop bs y
+ | y == x = return bs
+ | otherwise = do
+ next <- Conn.connectionGet conn (x - y)
+ loop (BS.append bs next) (y + (BS.length next))
+
+writeFrame :: Conn.Connection -> Frame -> IO ()
writeFrame handle f = do
- BL.hPut handle . runPut . put $ f
- hFlush handle
+ Conn.connectionPut handle . toStrict . runPut . put $ f
------------------------ CHANNEL -----------------------------
View
@@ -18,7 +18,7 @@ Extra-source-files: examples/ExampleConsumer.hs,
examples/ExampleProducer.hs
Library
- Build-Depends: base >= 4 && < 5, binary >= 0.7, containers>=0.2, bytestring>=0.9, network>=2.2.3.1, data-binary-ieee754>=0.4.2.1, text>=0.11.2, split>=0.2, clock >= 0.4.0.1, monad-control >= 0.3
+ Build-Depends: base >= 4 && < 5, binary >= 0.7, containers>=0.2, bytestring>=0.9, network>=2.2.3.1, data-binary-ieee754>=0.4.2.1, text>=0.11.2, split>=0.2, clock >= 0.4.0.1, monad-control >= 0.3, connection == 0.2.*
Exposed-modules: Network.AMQP, Network.AMQP.Types, Network.AMQP.Lifted
Other-modules: Network.AMQP.Generated, Network.AMQP.Helpers, Network.AMQP.Protocol, Network.AMQP.Internal
GHC-Options: -Wall
@@ -46,3 +46,4 @@ test-suite spec
base >= 4 && < 5, binary >= 0.7, containers>=0.2, bytestring>=0.9, network>=2.2.3.1, data-binary-ieee754>=0.4.2.1, text>=0.11.2, split>=0.2, clock >= 0.4.0.1
, hspec >= 1.3
, hspec-expectations >= 0.3.3
+ , connection == 0.2.*
@@ -0,0 +1,46 @@
+{-# OPTIONS -XOverloadedStrings #-}
+import Network.AMQP
+
+import qualified Data.ByteString.Lazy.Char8 as BL
+
+
+main = do
+ let opts = defaultConnectionOpts {
+ coServers = [("127.0.0.1", 5671)]
+ , coTLSSettings = Just TLSTrusted
+ }
+ conn <- openConnection'' opts
+ chan <- openChannel conn
+
+
+ --declare queues, exchanges and bindings
+ declareQueue chan newQueue {queueName = "myQueueDE"}
+ declareQueue chan newQueue {queueName = "myQueueEN"}
+
+ declareExchange chan newExchange {exchangeName = "topicExchg", exchangeType = "topic"}
+ bindQueue chan "myQueueDE" "topicExchg" "de.*"
+ bindQueue chan "myQueueEN" "topicExchg" "en.*"
+
+
+ --subscribe to the queues
+ consumeMsgs chan "myQueueDE" Ack myCallbackDE
+ consumeMsgs chan "myQueueEN" Ack myCallbackEN
+
+
+ getLine -- wait for keypress
+ closeConnection conn
+ putStrLn "connection closed"
+
+
+
+
+myCallbackDE :: (Message,Envelope) -> IO ()
+myCallbackDE (msg, env) = do
+ putStrLn $ "received from DE: "++(BL.unpack $ msgBody msg)
+ ackEnv env
+
+
+myCallbackEN :: (Message,Envelope) -> IO ()
+myCallbackEN (msg, env) = do
+ putStrLn $ "received from EN: "++(BL.unpack $ msgBody msg)
+ ackEnv env
@@ -0,0 +1,35 @@
+{-# OPTIONS -XOverloadedStrings #-}
+import Network.AMQP
+
+import qualified Data.ByteString.Lazy.Char8 as BL
+
+
+main = do
+ let opts = defaultConnectionOpts {
+ coServers = [("127.0.0.1", 5671)]
+ , coTLSSettings = Just TLSTrusted
+ }
+ conn <- openConnection'' opts
+ chan <- openChannel conn
+
+
+ --declare queues, exchanges and bindings
+ declareQueue chan newQueue {queueName = "myQueueDE"}
+ declareQueue chan newQueue {queueName = "myQueueEN"}
+
+ declareExchange chan newExchange {exchangeName = "topicExchg", exchangeType = "topic"}
+ bindQueue chan "myQueueDE" "topicExchg" "de.*"
+ bindQueue chan "myQueueEN" "topicExchg" "en.*"
+
+ --publish messages
+ publishMsg chan "topicExchg" "de.hello"
+ (newMsg {msgBody = (BL.pack "hallo welt"),
+ msgDeliveryMode = Just NonPersistent}
+ )
+ publishMsg chan "topicExchg" "en.hello"
+ (newMsg {msgBody = (BL.pack "hello world"),
+ msgDeliveryMode = Just NonPersistent}
+ )
+
+
+ closeConnection conn

0 comments on commit a578c13

Please sign in to comment.