Skip to content

Commit

Permalink
Rename Connection to ConnPool. Edit tutorial and some comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Tony Hannan committed Nov 1, 2010
1 parent ad13914 commit 36cc86f
Show file tree
Hide file tree
Showing 9 changed files with 118 additions and 124 deletions.
3 changes: 0 additions & 3 deletions Control/Monad/Util.hs
Expand Up @@ -20,9 +20,6 @@ instance (Monad m, Error e) => Applicative (ErrorT e m) where
class (MonadIO m, Applicative m, Functor m) => MonadIO' m
instance (MonadIO m, Applicative m, Functor m) => MonadIO' m

ignore :: (Monad m) => a -> m ()
ignore _ = return ()

loop :: (Functor m, Monad m) => m (Maybe a) -> m [a]
-- ^ Repeatedy execute action, collecting results, until it returns Nothing
loop act = act >>= maybe (return []) (\a -> (a :) <$> loop act)
Expand Down
4 changes: 2 additions & 2 deletions Database/MongoDB.hs
Expand Up @@ -10,8 +10,8 @@ Simple example below. Use with language extension /OvererloadedStrings/.
> import Control.Monad.Trans (liftIO)
>
> main = do
> conn <- connect 1 (host "127.0.0.1")
> e <- access safe Master conn run
> pool <- newConnPool 1 (host "127.0.0.1")
> e <- access safe Master pool run
> print e
>
> run = use (Database "baseball") $ do
Expand Down
87 changes: 47 additions & 40 deletions Database/MongoDB/Connection.hs
@@ -1,16 +1,16 @@
{- | A Mongo connection is a pool of TCP connections to a single server or a replica set of servers. -}
{- | A pool of TCP connections to a single server or a replica set of servers. -}

{-# LANGUAGE OverloadedStrings, ScopedTypeVariables, RecordWildCards, MultiParamTypeClasses, FlexibleContexts, TypeFamilies, DoRec, RankNTypes #-}
{-# LANGUAGE OverloadedStrings, ScopedTypeVariables, RecordWildCards, NamedFieldPuns, MultiParamTypeClasses, FlexibleContexts, TypeFamilies, DoRec, RankNTypes, FlexibleInstances #-}

module Database.MongoDB.Connection (
-- * Host
Host(..), PortID(..), host, showHostPort, readHostPort, readHostPortM,
-- * ReplicaSet
ReplicaSet(..),
ReplicaSet(..), Name,
-- * MasterOrSlaveOk
MasterOrSlaveOk(..),
-- * Connection
Server(..), replicaSet
-- * Connection Pool
Server(..), connHost, replicaSet
) where

import Database.MongoDB.Internal.Protocol
Expand All @@ -30,6 +30,7 @@ import Database.MongoDB.Internal.Util () -- PortID instances
import Var.Pool
import System.Random (newStdGen, randomRs)
import Data.List (delete, find, nub)
import System.IO.Unsafe (unsafePerformIO)

type Name = UString

Expand Down Expand Up @@ -103,7 +104,7 @@ getReplicaInfo pipe = do
return info

type ReplicaInfo = Document
-- ^ Configuration info of a host in a replica set. Contains all the hosts in the replica set plus its role in that set (master, slave, or arbiter)
-- ^ Configuration info of a host in a replica set (result of /ismaster/ command). Contains all the hosts in the replica set plus its role in that set (master, slave, or arbiter)

{- isPrimary :: ReplicaInfo -> Bool
-- ^ Is the replica described by this info a master/primary (not slave or arbiter)?
Expand Down Expand Up @@ -139,42 +140,45 @@ data MasterOrSlaveOk =
isMS Master i = isPrimary i
isMS SlaveOk i = isSecondary i || isPrimary i -}

-- * Connection
-- * Connection Pool

type Pool' = Pool IOError

-- | A Server is a single server ('Host') or a replica set of servers ('ReplicaSet')
class Server t where
data Connection t
-- ^ A Mongo connection is a pool of TCP connections to a host or a replica set of hosts
connect :: (MonadIO' m) => Int -> t -> m (Connection t)
-- ^ Create a Mongo Connection to a host or a replica set of hosts. Actual TCP connection is not attempted until 'getPipe' request, so no IOError can be raised here. Up to N TCP connections will be established to each host.
getPipe :: MasterOrSlaveOk -> Connection t -> ErrorT IOError IO Pipe
data ConnPool t
-- ^ A pool of TCP connections ('Pipe's) to a host or a replica set of hosts
newConnPool :: (MonadIO' m) => Int -> t -> m (ConnPool t)
-- ^ Create a ConnectionPool to a host or a replica set of hosts. Actual TCP connection is not attempted until 'getPipe' request, so no IOError can be raised here. Up to N TCP connections will be established to each host.
getPipe :: MasterOrSlaveOk -> ConnPool t -> ErrorT IOError IO Pipe
-- ^ Return a TCP connection (Pipe) to the master or a slave in the server. Master must connect to the master, SlaveOk may connect to a slave or master. To spread the load, SlaveOk requests are distributed amongst all hosts in the server. Throw IOError if failed to connect to right type of host (Master/SlaveOk).
killPipes :: Connection t -> IO ()
-- ^ Kill all open pipes (TCP Connections). Will cause any users of them to fail. Alternatively you can let them die on their own when this Connection is garbage collected.
killPipes :: ConnPool t -> IO ()
-- ^ Kill all open pipes (TCP Connections). Will cause any users of them to fail. Alternatively you can let them die on their own when they are garbage collected.

-- ** Connection Host
-- ** ConnectionPool Host

instance Server Host where
data Connection Host = HostConnection {connHost :: Host, connPool :: Pool' Pipe}
data ConnPool Host = HostConnPool {connHost :: Host, connPool :: Pool' Pipe}
-- ^ A pool of TCP connections ('Pipe's) to a server, handed out in round-robin style.
connect poolSize' host' = liftIO (connectHost poolSize' host')
-- ^ Create a Connection (pool of TCP connections) to server (host or replica set)
newConnPool poolSize' host' = liftIO (newHostConnPool poolSize' host')
-- ^ Create a connection pool to server (host or replica set)
getPipe _ = getHostPipe
-- ^ Return a TCP connection (Pipe). If SlaveOk, connect to a slave if available. Round-robin if multiple slaves are available. Throw IOError if failed to connect.
killPipes (HostConnection _ pool) = killAll pool
killPipes (HostConnPool _ pool) = killAll pool

connectHost :: Int -> Host -> IO (Connection Host)
instance Show (ConnPool Host) where
show HostConnPool{connHost} = "ConnPool " ++ show connHost

newHostConnPool :: Int -> Host -> IO (ConnPool Host)
-- ^ Create a pool of N 'Pipe's (TCP connections) to server. 'getHostPipe' will return one of those pipes, round-robin style.
connectHost poolSize' host' = HostConnection host' <$> newPool Factory{..} poolSize' where
newHostConnPool poolSize' host' = HostConnPool host' <$> newPool Factory{..} poolSize' where
newResource = tcpConnect host'
killResource = close
isExpired = isClosed

getHostPipe :: Connection Host -> ErrorT IOError IO Pipe
getHostPipe :: ConnPool Host -> ErrorT IOError IO Pipe
-- ^ Return next pipe (TCP connection) in connection pool, round-robin style. Throw IOError if can't connect to host.
getHostPipe (HostConnection _ pool) = aResource pool
getHostPipe (HostConnPool _ pool) = aResource pool

tcpConnect :: Host -> ErrorT IOError IO Pipe
-- ^ Create a TCP connection (Pipe) to the given host. Throw IOError if can't connect.
Expand All @@ -183,39 +187,42 @@ tcpConnect (Host hostname port) = ErrorT . E.try $ mkPipe =<< connectTo hostname
-- ** Connection ReplicaSet

instance Server ReplicaSet where
data Connection ReplicaSet = ReplicaSetConnection {
data ConnPool ReplicaSet = ReplicaSetConnPool {
repsetName :: Name,
currentMembers :: MVar [Connection Host] } -- master at head after a refresh
connect poolSize' repset = liftIO (connectSet poolSize' repset)
currentMembers :: MVar [ConnPool Host] } -- master at head after a refresh
newConnPool poolSize' repset = liftIO (newSetConnPool poolSize' repset)
getPipe = getSetPipe
killPipes ReplicaSetConnection{..} = withMVar currentMembers (mapM_ killPipes)
killPipes ReplicaSetConnPool{..} = withMVar currentMembers (mapM_ killPipes)

instance Show (ConnPool ReplicaSet) where
show r = "ConnPool " ++ show (unsafePerformIO $ replicaSet r)

replicaSet :: (MonadIO' m) => Connection ReplicaSet -> m ReplicaSet
-- ^ Set name with current members as seed list
replicaSet ReplicaSetConnection{..} = ReplicaSet repsetName . map connHost <$> readMVar currentMembers
replicaSet :: (MonadIO' m) => ConnPool ReplicaSet -> m ReplicaSet
-- ^ Return replicas set name with current members as seed list
replicaSet ReplicaSetConnPool{..} = ReplicaSet repsetName . map connHost <$> readMVar currentMembers

connectSet :: Int -> ReplicaSet -> IO (Connection ReplicaSet)
-- ^ Create a connection to each member of the replica set.
connectSet poolSize' repset = assert (not . null $ seedHosts repset) $ do
currentMembers <- newMVar =<< mapM (connect poolSize') (seedHosts repset)
return $ ReplicaSetConnection (setName repset) currentMembers
newSetConnPool :: Int -> ReplicaSet -> IO (ConnPool ReplicaSet)
-- ^ Create a connection pool to each member of the replica set.
newSetConnPool poolSize' repset = assert (not . null $ seedHosts repset) $ do
currentMembers <- newMVar =<< mapM (newConnPool poolSize') (seedHosts repset)
return $ ReplicaSetConnPool (setName repset) currentMembers

getMembers :: Name -> [Connection Host] -> ErrorT IOError IO [Host]
getMembers :: Name -> [ConnPool Host] -> ErrorT IOError IO [Host]
-- ^ Get members of replica set, master first. Query supplied connections until config found.
-- TODO: Verify config for request replica set name and not some other replica set. ismaster config should include replica set name in result but currently does not.
getMembers _repsetName connections = hosts <$> untilSuccess (getReplicaInfo <=< getHostPipe) connections

refreshMembers :: Name -> [Connection Host] -> ErrorT IOError IO [Connection Host]
refreshMembers :: Name -> [ConnPool Host] -> ErrorT IOError IO [ConnPool Host]
-- ^ Update current members with master at head. Reuse unchanged members. Throw IOError if can't connect to any and fetch config. Dropped connections are not closed in case they still have users; they will be closed when garbage collected.
refreshMembers repsetName connections = do
n <- liftIO . poolSize . connPool $ head connections
mapM (connection n) =<< getMembers repsetName connections
where
connection n host' = maybe (connect n host') return $ find ((host' ==) . connHost) connections
connection n host' = maybe (newConnPool n host') return $ find ((host' ==) . connHost) connections

getSetPipe :: MasterOrSlaveOk -> Connection ReplicaSet -> ErrorT IOError IO Pipe
getSetPipe :: MasterOrSlaveOk -> ConnPool ReplicaSet -> ErrorT IOError IO Pipe
-- ^ Return a pipe to primary or a random secondary in replica set. Use primary for SlaveOk if and only if no secondaries. Note, refreshes members each time (makes ismaster call to primary).
getSetPipe mos ReplicaSetConnection{..} = modifyMVar currentMembers $ \conns -> do
getSetPipe mos ReplicaSetConnPool{..} = modifyMVar currentMembers $ \conns -> do
connections <- refreshMembers repsetName conns -- master at head after refresh
pipe <- case mos of
Master -> getHostPipe (head connections)
Expand Down
2 changes: 1 addition & 1 deletion Database/MongoDB/Internal/Protocol.hs
Expand Up @@ -42,7 +42,7 @@ import Control.Monad.Error
-- * Pipe

type Pipe = P.Pipeline Handle ByteString
-- ^ Thread-safe TCP connection to server with pipelined requests
-- ^ Thread-safe TCP connection with pipelined requests

mkPipe :: Handle -> IO Pipe
-- ^ New thread-safe pipelined connection over handle
Expand Down
8 changes: 1 addition & 7 deletions Database/MongoDB/Internal/Util.hs
@@ -1,4 +1,4 @@
-- | Miscellaneous general functions
-- | Miscellaneous general functions and Show, Eq, and Ord instances for PortID

{-# LANGUAGE StandaloneDeriving #-}

Expand All @@ -14,12 +14,6 @@ deriving instance Show PortID
deriving instance Eq PortID
deriving instance Ord PortID

snoc :: [a] -> a -> [a]
-- ^ add element to end of list (/snoc/ is reverse of /cons/, which adds to front of list)
snoc list a = list ++ [a]

type Secs = Float

bitOr :: (Bits a) => [a] -> a
-- ^ bit-or all numbers together
bitOr = foldl (.|.) 0
Expand Down
51 changes: 27 additions & 24 deletions Database/MongoDB/Query.hs
@@ -1,4 +1,4 @@
-- | Query and update documents residing on a MongoDB server(s)
-- | Query and update documents

{-# LANGUAGE OverloadedStrings, RecordWildCards, NamedFieldPuns, TupleSections, FlexibleContexts, FlexibleInstances, UndecidableInstances, MultiParamTypeClasses, GeneralizedNewtypeDeriving, StandaloneDeriving, TypeSynonymInstances, RankNTypes, ImpredicativeTypes #-}

Expand All @@ -24,7 +24,7 @@ module Database.MongoDB.Query (
-- ** Delete
delete, deleteOne,
-- * Read
slaveOk,
readMode,
-- ** Query
Query(..), QueryOption(..), Projector, Limit, Order, BatchSize,
explain, find, findOne, count, distinct,
Expand Down Expand Up @@ -62,23 +62,12 @@ import Database.MongoDB.Internal.Util ((<.>), true1)
mapErrorIO :: (Throw e m, MonadIO m) => (e' -> e) -> ErrorT e' IO a -> m a
mapErrorIO f = throwLeft' f . liftIO . runErrorT

send :: (Context Pipe m, Throw Failure m, MonadIO m) => [Notice] -> m ()
-- ^ Send notices as a contiguous batch to server with no reply. Throw 'ConnectionFailure' if pipe fails.
send ns = mapErrorIO ConnectionFailure . flip P.send ns =<< context

call :: (Context Pipe m, Throw Failure m, MonadIO m) => [Notice] -> Request -> m (forall n. (Throw Failure n, MonadIO n) => n Reply)
-- ^ Send notices and request as a contiguous batch to server and return reply promise, which will block when invoked until reply arrives. This call will throw 'ConnectionFailure' if pipe fails on send, and promise will throw 'ConnectionFailure' if pipe fails on receive.
call ns r = do
pipe <- context
promise <- mapErrorIO ConnectionFailure (P.call pipe ns r)
return (mapErrorIO ConnectionFailure promise)

-- * Mongo Monad

access :: (Server s, MonadIO m) => WriteMode -> MasterOrSlaveOk -> Connection s -> Action m a -> m (Either Failure a)
-- ^ Run action with access to server or replica set via one of the 'Pipe's (TCP connections) in given 'Connection' pool
access w mos conn act = do
ePipe <- liftIO . runErrorT $ getPipe mos conn
access :: (Server s, MonadIO m) => WriteMode -> MasterOrSlaveOk -> ConnPool s -> Action m a -> m (Either Failure a)
-- ^ Run action under given write and read mode against the server or replicaSet behind given connection pool. Return Left Failure if there is a connection failure or read/write error.
access w mos pool act = do
ePipe <- liftIO . runErrorT $ getPipe mos pool
either (return . Left . ConnectionFailure) (runAction act w mos) ePipe

-- | A monad with access to a 'Pipe', 'MasterOrSlaveOk', and 'WriteMode', and throws 'Failure' on read, write, or pipe failure
Expand All @@ -93,10 +82,11 @@ instance MonadTrans Action where
lift = Action . lift . lift . lift . lift

runAction :: Action m a -> WriteMode -> MasterOrSlaveOk -> Pipe -> m (Either Failure a)
-- ^ Run action with access to pipe. It starts out assuming it is master (invoke 'slaveOk' inside it to change that) and that writes don't need to be check (invoke 'writeMode' to change that). Return Left Failure if error in execution. Throws IOError if pipe fails during execution.
-- ^ Run action with given write mode and read mode (master or slave-ok) against given pipe (TCP connection). Return Left Failure if read/write error or connection failure.
-- 'access' calls runAction. Use this directly if you want to use the same connection and not take from the pool again. However, the connection may still be used by other threads at the same time. For instance, the pool will still hand this connection out.
runAction (Action action) w mos = runReaderT (runReaderT (runReaderT (runErrorT action) w) mos)

-- | Read or write exception like cursor expired or inserting a duplicate key.
-- | A connection failure, or a read or write exception like cursor expired or inserting a duplicate key.
-- Note, unexpected data from the server is not a Failure, rather it is a programming error (you should call 'error' in this case) because the client and server are incompatible and requires a programming change.
data Failure =
ConnectionFailure IOError -- ^ TCP connection ('Pipe') failed. Make work if you try again on the same Mongo 'Connection' which will create a new Pipe.
Expand All @@ -115,7 +105,7 @@ newtype Database = Database {databaseName :: UString} deriving (Eq, Ord)

instance Show Database where show (Database x) = unpack x

-- | As 'Access' monad with access to a particular 'Database'
-- | 'Access' monad with a particular 'Database' in context
class (Context Database m, Access m) => DbAccess m
instance (Context Database m, Access m) => DbAccess m

Expand All @@ -124,7 +114,7 @@ allDatabases :: (Access m) => m [Database]
allDatabases = map (Database . at "name") . at "databases" <$> use (Database "admin") (runCommand1 "listDatabases")

use :: Database -> ReaderT Database m a -> m a
-- ^ Run Db action against given database
-- ^ Run action against given database
use = flip runReaderT

thisDatabase :: (DbAccess m) => m Database
Expand Down Expand Up @@ -297,9 +287,9 @@ delete' opts (Select sel col) = do

-- ** MasterOrSlaveOk

slaveOk :: (Access m) => m a -> m a
-- ^ Ok to execute given action against slave, ie. eventually consistent reads
slaveOk = push (const SlaveOk)
readMode :: (Access m) => MasterOrSlaveOk -> m a -> m a
-- ^ Execute action using given read mode. Master = consistent reads, SlaveOk = eventually consistent reads.
readMode = push . const

msOption :: MasterOrSlaveOk -> [P.QueryOption]
msOption Master = []
Expand Down Expand Up @@ -619,6 +609,19 @@ eval :: (DbAccess m) => Javascript -> m Document
-- ^ Run code on server
eval code = at "retval" <$> runCommand ["$eval" =: code]

-- * Primitives

send :: (Context Pipe m, Throw Failure m, MonadIO m) => [Notice] -> m ()
-- ^ Send notices as a contiguous batch to server with no reply. Throw 'ConnectionFailure' if pipe fails.
send ns = mapErrorIO ConnectionFailure . flip P.send ns =<< context

call :: (Context Pipe m, Throw Failure m, MonadIO m) => [Notice] -> Request -> m (forall n. (Throw Failure n, MonadIO n) => n Reply)
-- ^ Send notices and request as a contiguous batch to server and return reply promise, which will block when invoked until reply arrives. This call will throw 'ConnectionFailure' if pipe fails on send, and promise will throw 'ConnectionFailure' if pipe fails on receive.
call ns r = do
pipe <- context
promise <- mapErrorIO ConnectionFailure (P.call pipe ns r)
return (mapErrorIO ConnectionFailure promise)


{- Authors: Tony Hannan <tony@10gen.com>
Copyright 2010 10gen Inc.
Expand Down
5 changes: 3 additions & 2 deletions Var/Pool.hs
Expand Up @@ -10,6 +10,7 @@ import Data.Array.IO
import Data.Maybe (catMaybes)
import Control.Monad.Error
import System.Random (randomRIO)
import Control.Exception (assert)

-- | Creator, destroyer, and checker of resources of type r. Creator may throw error or type e.
data Factory e r = Factory {
Expand All @@ -18,8 +19,8 @@ data Factory e r = Factory {
isExpired :: r -> IO Bool }

newPool :: Factory e r -> Int -> IO (Pool e r)
-- ^ Create new pool of initial max size
newPool f n = do
-- ^ Create new pool of initial max size, which must be >= 1
newPool f n = assert (n > 0) $ do
arr <- newArray (0, n-1) Nothing
var <- newMVar arr
return (Pool f var)
Expand Down
2 changes: 1 addition & 1 deletion mongoDB.cabal
Expand Up @@ -23,7 +23,7 @@ homepage: http://github.com/TonyGen/mongoDB-haskell
package-url:
bug-reports:
synopsis: A driver for MongoDB
description: This module lets you connect to MongoDB, do inserts, queries, updates, etc.
description: This module lets you connect to MongoDB (www.mongodb.org) and do inserts, queries, updates, etc.
category: Database
author: Scott Parish <srp@srparish.net> & Tony Hannan <tony@10gen.com>
tested-with:
Expand Down

0 comments on commit 36cc86f

Please sign in to comment.