/
Pool.hs
215 lines (169 loc) · 7.15 KB
/
Pool.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE PackageImports #-}
{-# LANGUAGE PatternGuards #-}
{-# LANGUAGE RecordWildCards #-}
module Database.Cassandra.Pool
( CPool
, Server
, defServer
, defServers
, KeySpace
, Cassandra (..)
, createCassandraPool
, withResource
-- * Low Level Utilities
, openThrift
) where
------------------------------------------------------------------------------
import Control.Applicative ((<$>))
import Control.Arrow
import Control.Concurrent
import Control.Concurrent.STM
import Control.Exception (SomeException, handle, onException)
import Control.Monad (forM_, forever, join, liftM2, unless, when)
import Data.ByteString (ByteString)
import Data.List (find, nub, partition)
import Data.Maybe
import Data.Pool
import Data.Set (Set)
import qualified Data.Set as S
import Data.Time.Clock (NominalDiffTime, UTCTime, diffUTCTime, getCurrentTime)
import qualified Database.Cassandra.Thrift.Cassandra_Client as C
import qualified Database.Cassandra.Thrift.Cassandra_Types as C
import Network
import Prelude hiding (catch)
import System.IO (Handle(..), hClose)
import System.Mem.Weak (addFinalizer)
import Thrift.Protocol.Binary
import Thrift.Transport
import Thrift.Transport.Framed
import Thrift.Transport.Handle
------------------------------------------------------------------------------
------------------------------------------------------------------------------
-- | A pool of 'Cassandra' connections, as managed by "Data.Pool".
--
-- Pools built with 'createCassandraPool' open each new connection against the
-- next server of a round-robin ring.
type CPool = Pool Cassandra
------------------------------------------------------------------------------
-- | A @(host name, port)@ pair identifying one server to connect to.
type Server = (HostName, Int)
-- | The default server: @127.0.0.1@ on port @9160@.
defServer :: Server
defServer = ("127.0.0.1", 9160)
-- | A server list containing only 'defServer'; convenient as the first
-- argument of 'createCassandraPool' when talking to a local node.
defServers :: [Server]
defServers = [defServer]
------------------------------------------------------------------------------
-- | Name of the keyspace that every connection of a pool is switched to
-- right after it is opened.
type KeySpace = String
------------------------------------------------------------------------------
-- | One open connection to a Cassandra server, as stored in a 'CPool'.
data Cassandra = Cassandra
    { cHandle :: Handle
      -- ^ Underlying handle; closing it is how the pool destroys a connection.
    , cFramed :: FramedTransport Handle
      -- ^ Framed transport layered over 'cHandle'.
    , cProto  :: BinaryProtocol (FramedTransport Handle)
      -- ^ Binary protocol over 'cFramed'; passed as both the input and the
      -- output protocol when issuing Thrift calls.
    }
-- | Create a pool of connections to a cluster of Cassandra boxes.
--
-- Every new connection is opened against the next server of a round-robin
-- ring built from the given list, so connections are spread over the boxes
-- in the cluster. Right after opening, the connection is switched to the
-- given keyspace.
--
-- Opening a connection is attempted up to five times, each attempt against
-- the next server in the ring and separated by a 100ms pause. When the last
-- attempt fails as well, an 'error' carrying the final exception is raised.
createCassandraPool
    :: [Server]
    -- ^ List of servers to connect to; must not be empty ('mkRing' fails on
    -- an empty list)
    -> Int
    -- ^ Number of stripes to maintain
    -> Int
    -- ^ Max connections per stripe
    -> NominalDiffTime
    -- ^ Kill each connection after this many seconds (handed to 'createPool'
    -- as its idle time)
    -> KeySpace
    -- ^ Each pool operates on a single KeySpace
    -> IO CPool
createCassandraPool servers numStripes perStripe maxIdle ks = do
    sring <- newTVarIO $ mkRing servers
    pool <- createPool (cr 4 sring) dest numStripes maxIdle perStripe
    -- forkIO (serverDiscoveryThread sring ks pool)
    return pool
  where
    -- Open one connection to the ring's current server and advance the ring;
    -- @n@ is the number of retries still allowed after this attempt.
    cr :: Int -> ServerRing -> IO Cassandra
    cr n sring = do
        s@(host, p) <- atomically $ do
            ring@Ring{..} <- readTVar sring
            writeTVar sring (next ring)
            return current
        handle (handler n sring s) $ do
            (h, ft, proto) <- openThrift host p
            -- If the keyspace cannot be selected the connection is useless
            -- and the retry below opens a fresh one, so close this handle
            -- instead of leaking it.
            C.set_keyspace (proto, proto) ks `onException` hClose h
            return $ Cassandra h ft proto

    -- Retry policy: give up once no retries are left, otherwise pause and
    -- try again (which picks the next server in the ring).
    handler :: Int -> ServerRing -> Server -> SomeException -> IO Cassandra
    handler 0 _ _ e = error $ "Can't connect to cassandra after several tries: " ++ show e
    handler n sring server e = do
        -- we need a temporary removal system for servers; something
        -- with a TTL just removing them from ring is dangerous, what if
        -- the network is partitioned for a little while?
        -- modifyServers sring (removeServer server)

        -- wait 100ms to avoid crazy loops
        threadDelay 100000
        cr (n-1) sring

    -- Pool destructor: closing the handle tears the connection down.
    dest h = hClose $ cHandle h
-------------------------------------------------------------------------------
-- | Open the underlying Thrift connection to @host@ on @port@.
--
-- Returns the raw handle, the framed transport wrapped around that handle,
-- and a binary protocol on top of the framed transport.
openThrift host port = do
    sock <- hOpen (host, PortNumber (fromIntegral port))
    framed <- openFramedTransport sock
    return (sock, framed, BinaryProtocol framed)
------------------------------------------------------------------------------
-- | Atomically replace the ring stored in the 'TVar' with the result of
-- applying the given function to it.
modifyServers :: TVar (Ring a) -> (Ring a -> Ring a) -> IO ()
modifyServers sring f = atomically $
    readTVar sring >>= \ring@Ring{} -> writeTVar sring (f ring)
------------------------------------------------------------------------------
-- | Loop forever: borrow a connection from the pool, refresh the server ring
-- through 'updateServers', then sleep for 60 seconds before the next round.
serverDiscoveryThread :: TVar (Ring Server)
                      -> String
                      -> Pool Cassandra
                      -> IO b
serverDiscoveryThread sring ks pool = forever $
    withResource pool (updateServers sring ks) >> threadDelay oneMinute
  where
    -- 'threadDelay' takes microseconds.
    oneMinute = 60000000
------------------------------------------------------------------------------
-- | Query the cluster's token ring for the given keyspace over an existing
-- connection and hand the de-duplicated list of reported endpoints to
-- 'addNewServers'. Every endpoint is paired with the port of 'defServer'.
updateServers :: TVar (Ring Server) -> String -> Cassandra -> IO ()
updateServers sring ks (Cassandra _ _ p) = do
    ranges <- C.describe_ring (p, p) ks
    let endpoints = concat (mapMaybe C.f_TokenRange_endpoints ranges)
        servers = nub [(endpoint, defaultPort) | endpoint <- endpoints]
    -- putStrLn $ "Cassy: Discovered new servers: " ++ show servers
    modifyServers sring (addNewServers servers)
  where
    (_, defaultPort) = defServer
------------------------------------------------------------------------------
-- | The shared, mutable round-robin ring of servers that new connections
-- are drawn from.
type ServerRing = TVar (Ring Server)
------------------------------------------------------------------------------
-- | A rotation of items that always has a 'current' element. 'next' steps
-- through 'upcoming' and starts over from the oldest 'used' item once
-- 'upcoming' is exhausted.
data Ring a = Ring
    { allItems :: Set a
      -- ^ Set of the items the ring was built from.
    , current  :: !a
      -- ^ The item at the head of the rotation.
    , used     :: [a]
      -- ^ Items already stepped past in this round, most recent first.
    , upcoming :: [a]
      -- ^ Items still to come in this round, in order.
    }
------------------------------------------------------------------------------
-- | Build a 'Ring' from a non-empty list. The first element becomes
-- 'current' and the remaining ones are queued, in order, as 'upcoming'.
--
-- Calls 'error' on an empty list, since a ring always needs a current item.
mkRing :: Ord a => [a] -> Ring a
mkRing [] = error "Can't make a ring from empty list"
mkRing items@(x : xs) = Ring (S.fromList items) x [] xs
------------------------------------------------------------------------------
-- | Advance the ring by one position.
--
-- While items are left in 'upcoming', the first of them becomes 'current'
-- and the old 'current' is pushed onto 'used'. Once 'upcoming' is empty the
-- ring starts a new round: the oldest used item becomes 'current' and the
-- rest of the previous round (ending with the old 'current') is queued up
-- again in its original order.
next :: Ring a -> Ring a
next Ring{..} =
    case upcoming of
      (n : rest) -> Ring allItems n (current : used) rest
      []         -> wrapAround (reverse used)
  where
    -- 'used' is most-recent-first, so its reversal is in rotation order.
    wrapAround []              = Ring allItems current [] []
    wrapAround (oldest : rest) = Ring allItems oldest [] (rest ++ [current])
------------------------------------------------------------------------------
-- | Remove an item from the ring.
--
-- When the item is a member, the ring is rebuilt from the remaining items in
-- set order, i.e. the rotation restarts at the smallest remaining item. The
-- ring is returned unchanged when the item is not a member, and also when it
-- is the only member: a 'Ring' cannot be empty, so the last item is kept
-- rather than producing a ring whose 'current' field cannot be evaluated.
removeServer :: Ord a => a -> Ring a -> Ring a
removeServer s r@Ring{..}
    | s `S.member` allItems
    , cur' : up' <- S.toList all' = Ring all' cur' [] up'
    | otherwise = r
  where
    all' = S.delete s allItems
------------------------------------------------------------------------------
-- | Reconcile the ring with a freshly discovered list of servers.
--
-- Servers missing from the list are dropped from the rotation and servers
-- not yet known are queued at the front of 'upcoming'. The rotation position
-- is kept where possible: the first surviving item of @current : upcoming@
-- becomes the new 'current'.
--
-- If none of those items survives, the new 'current' is taken from the newly
-- discovered servers, or failing that from the surviving 'used' items
-- (oldest first, starting a new round). If there is no candidate at all,
-- e.g. because the discovered list is empty, the ring is returned unchanged
-- instead of being left without a 'current' server.
addNewServers :: [Server] -> Ring Server -> Ring Server
addNewServers servers ring@Ring{..}
    | (c : rest) <- kept          = Ring all' c used' (new ++ rest)
    | (c : rest) <- new           = Ring all' c used' rest
    | (c : rest) <- reverse used' = Ring all' c [] rest
    | otherwise                   = ring
  where
    all' = S.fromList servers
    -- Servers reported now that the ring did not know about before.
    new = S.toList $ all' S.\\ allItems
    used' = filter (`S.member` all') used
    -- Surviving part of the rotation that is still ahead of us.
    kept = filter (`S.member` all') (current : upcoming)