Permalink
Browse files

Initial import, fleshing out the API

  • Loading branch information...
0 parents commit 52983af2ef43526be318aa6e748d6f37a09c1756 @ozataman ozataman committed Aug 22, 2011
Showing with 434 additions and 0 deletions.
  1. +1 −0 .ghci
  2. +4 −0 .gitignore
  3. +30 −0 LICENSE
  4. +2 −0 Setup.hs
  5. +58 −0 cassy.cabal
  6. +87 −0 src/Database/Cassandra/ColumnFamily.hs
  7. +175 −0 src/Database/Cassandra/Pool.hs
  8. +77 −0 src/Database/Cassandra/Types.hs
1 .ghci
@@ -0,0 +1 @@
+:set -isrc
@@ -0,0 +1,4 @@
+*.csv
+*.o
+*.hi
+dist
30 LICENSE
@@ -0,0 +1,30 @@
+Copyright (c)2011, Ozgun Ataman
+
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+ * Redistributions of source code must retain the above copyright
+ notice, this list of conditions and the following disclaimer.
+
+ * Redistributions in binary form must reproduce the above
+ copyright notice, this list of conditions and the following
+ disclaimer in the documentation and/or other materials provided
+ with the distribution.
+
+ * Neither the name of Ozgun Ataman nor the names of other
+ contributors may be used to endorse or promote products derived
+ from this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
@@ -0,0 +1,2 @@
+import Distribution.Simple
+main = defaultMain
@@ -0,0 +1,58 @@
+-- cassy.cabal auto-generated by cabal init. For additional options,
+-- see
+-- http://www.haskell.org/cabal/release/cabal-latest/doc/users-guide/authors.html#pkg-descr.
+-- The name of the package.
+Name: cassy
+
+-- The package version. See the Haskell package versioning policy
+-- (http://www.haskell.org/haskellwiki/Package_versioning_policy) for
+-- standards guiding when and how versions should be incremented.
+Version: 0.1
+
+-- A short (one-line) description of the package.
+Synopsis: A high level driver for Cassandra datastore
+
+-- A longer description of the package.
+-- Description:
+
+-- The license under which the package is released.
+License: BSD3
+
+-- The file containing the license text.
+License-file: LICENSE
+
+-- The package author(s).
+Author: Ozgun Ataman
+
+-- An email address to which users can send suggestions, bug reports,
+-- and patches.
+Maintainer: ozataman@gmail.com
+
+-- A copyright notice.
+-- Copyright:
+
+Category: Data
+
+Build-type: Simple
+
+-- Extra files to be distributed with the package, such as examples or
+-- a README.
+-- Extra-source-files:
+
+-- Constraint on the version of Cabal needed to build this package.
+Cabal-version: >=1.2
+
+
+Library
+ -- Modules exported by the library.
+ -- Exposed-modules:
+
+ -- Packages needed in order to build this package.
+ -- Build-depends:
+
+ -- Modules not exported by this package.
+ -- Other-modules:
+
+ -- Extra tools (e.g. alex, hsc2hs, ...) needed to build the source.
+ -- Build-tools:
+
@@ -0,0 +1,87 @@
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE PatternGuards, NamedFieldPuns, RecordWildCards #-}
+
+
+module Database.Cassandra.ColumnFamily where
+
+import Control.Exception
+import Data.ByteString (ByteString)
+import qualified Database.Cassandra.Thrift.Cassandra_Client as C
+import qualified Database.Cassandra.Thrift.Cassandra_Types as T
+import Database.Cassandra.Thrift.Cassandra_Types
+ (ConsistencyLevel(..))
+import Network
+import Prelude hiding (catch)
+
+import Database.Cassandra.Pool
+import Database.Cassandra.Types
+
+test = do
+ pool <- createCassandraPool [("127.0.0.1", PortNumber 9160)] 3 300 "Keyspace1"
+ withPool pool $ \ Cassandra{..} -> do
+ let cp = T.ColumnParent (Just "CF1") Nothing
+ let sr = Just $ T.SliceRange (Just "") (Just "") (Just False) (Just 100)
+ let ks = Just ["eben"]
+ let sp = T.SlicePredicate Nothing sr
+ C.get_slice (cProto, cProto) "darak" cp sp ONE
+
+
+get
+ :: (BS k)
+ => Pool Cassandra Server
+ -> k
+ -> ColumnFamily
+ -> Selector
+ -> ConsistencyLevel
+ -> IO (Either CassandraException [Column])
+get p k cf s cl = undefined
+ where
+ k' = bs k
+
+
+getMulti
+ :: (BS k)
+ => Pool Cassandra Server
+ -> [k]
+ -> ColumnFamily
+ -> Selector
+ -> ConsistencyLevel
+ -> IO (Either CassandraException [Row])
+getMulti = undefined
+
+
+insert
+ :: Pool Cassandra Server
+ -> k
+ -> [Column]
+ -> ColumnFamily
+ -> ConsistencyLevel
+ -> Maybe Int
+ -> IO (Either CassandraException Integer)
+insert = undefined
+
+
+remove
+ :: (BS k)
+ => Pool Cassandra Server
+ -> k
+ -> Selector
+ -> ColumnFamily
+ -> ConsistencyLevel
+ -> IO (Either CassandraException Integer)
+remove = undefined
+
+
+wrapException :: IO a -> IO (Either CassandraException a)
+wrapException a =
+ (a >>= return . Right)
+ `catch` (\(T.NotFoundException) -> return $ Left NotFoundException)
+ `catch` (\(T.InvalidRequestException e) ->
+ return . Left . InvalidRequestException $ maybe "" id e)
+ `catch` (\T.UnavailableException -> return $ Left UnavailableException)
+ `catch` (\T.TimedOutException -> return $ Left TimedOutException)
+ `catch` (\(T.AuthenticationException e) ->
+ return . Left . AuthenticationException $ maybe "" id e)
+ `catch` (\(T.AuthorizationException e) ->
+ return . Left . AuthorizationException $ maybe "" id e)
+ `catch` (\T.SchemaDisagreementException -> return $ Left SchemaDisagreementException)
@@ -0,0 +1,175 @@
+{-# LANGUAGE PatternGuards, NamedFieldPuns, RecordWildCards #-}
+
+
+module Database.Cassandra.Pool where
+
+
+
+import Control.Applicative ((<$>))
+import Control.Concurrent.STM
+import Control.Exception (SomeException, catch, onException)
+import Control.Monad (forM_, forever, join, liftM2, unless, when)
+import Control.Monad.IO.Class (liftIO)
+import Data.ByteString (ByteString)
+import Data.List (partition)
+import Data.Time.Clock (NominalDiffTime, UTCTime, diffUTCTime, getCurrentTime)
+import Prelude hiding (catch)
+import System.Mem.Weak (addFinalizer)
+import System.IO (hClose, Handle(..))
+
+
+import qualified Database.Cassandra.Thrift.Cassandra_Client as C
+import Thrift.Transport
+import Thrift.Transport.Handle
+import Thrift.Transport.Framed
+import Thrift.Protocol.Binary
+import Network
+
+
+type Server = (HostName, PortID)
+
+
+data Cassandra = Cassandra {
+ cHandle :: Handle
+ , cFramed :: FramedTransport Handle
+ , cProto :: BinaryProtocol (FramedTransport Handle)
+}
+
+
+createCassandraPool
+ :: [Server]
+ -> Int
+ -> NominalDiffTime
+ -> String
+ -> IO (Pool Cassandra Server)
+createCassandraPool servers n maxIdle ks = createPool cr dest n maxIdle servers
+ where
+ cr :: Server -> IO Cassandra
+ cr (host, p) = do
+ h <- hOpen (host, p)
+ ft <- openFramedTransport h
+ let p = BinaryProtocol ft
+ C.set_keyspace (p,p) ks
+ return $ Cassandra h ft p
+ dest h = hClose $ cHandle h
+
+
+newtype Pool a s = Pool { stripes :: TVar (Ring (Stripe a s)) }
+
+
+createPool cr dest n maxIdle servers = do
+ when (maxIdle < 0.5) $
+ modError "pool " $ "invalid idle time " ++ show maxIdle
+ when (n < 1) $
+ modError "pool " $ "invalid maximum resource count " ++ show n
+ stripes' <- mapM (createStripe cr dest n maxIdle) servers
+ -- reaperId <- forkIO $ reaper destroy idleTime localPools
+ -- addFinalizer p $ killThread reaperId
+ tv <- atomically $ newTVar (mkRing stripes')
+ return $ Pool tv
+
+
+
+withPool :: Pool a s -> (a -> IO b) -> IO b
+withPool Pool{..} f = do
+ Ring{..} <- atomically $ do
+ r <- readTVar stripes
+ writeTVar stripes $ next r
+ return r
+ withStripe current f
+
+
+data Ring a = Ring {
+ current :: !a
+ , used :: [a]
+ , upcoming :: [a]
+ }
+
+
+mkRing [] = error "Can't make a ring from empty list"
+mkRing (a:as) = Ring a [] as
+
+
+next :: Ring a -> Ring a
+next Ring{..}
+ | (n:rest) <- upcoming
+ = Ring n (current : used) rest
+next Ring{..}
+ | (n:rest) <- reverse (current : used)
+ = Ring n [] rest
+
+
+data Stripe a s = Stripe {
+ idle :: TVar [Connection a]
+ -- ^ FIFO buffer of idle connections
+ , inUse :: TVar Int
+ -- ^ Set of in-use connections
+ , server :: s
+ -- ^ Server this strip is connected to
+ , create :: s -> IO a
+ -- ^ Create action
+ , destroy :: (a -> IO ())
+ -- ^ Destroy action
+ , cxns :: Int
+ -- ^ Max connections
+ , ttl :: NominalDiffTime
+ -- ^ TTL for each connection
+ }
+
+
+createStripe
+ :: (s -> IO a)
+ -> (a -> IO ())
+ -> Int
+ -> NominalDiffTime
+ -> s
+ -> IO (Stripe a s)
+createStripe cr dest n maxIdle s = atomically $ do
+ idles <- newTVar []
+ used <- newTVar 0
+ return $ Stripe {
+ idle = idles
+ , inUse = used
+ , server = s
+ , create = cr
+ , destroy = dest
+ , cxns = n
+ , ttl = maxIdle
+ }
+
+
+withStripe :: Stripe a s -> (a -> IO b) -> IO b
+withStripe Stripe{..} f = do
+ res <- join . atomically $ do
+ cs <- readTVar idle
+ case cs of
+ (Connection{..}:rest) -> writeTVar idle rest >> return (return cxn)
+ [] -> do
+ used <- readTVar inUse
+ when (used == cxns) retry
+ writeTVar inUse $! used + 1
+ return $ create server
+ `onException` atomically (modifyTVar_ inUse (subtract 1))
+ ret <- f res `onException` (destroy res `onException` return ())
+ now <- getCurrentTime
+ atomically $ modifyTVar_ idle (Connection res now : )
+ return ret
+
+
+
+data Connection a = Connection {
+ cxn :: a
+ , lastUse :: UTCTime
+ }
+
+
+
+modifyTVar_ :: TVar a -> (a -> a) -> STM ()
+modifyTVar_ v f = readTVar v >>= \a -> writeTVar v $! f a
+
+
+modError :: String -> String -> a
+modError func msg =
+ error $ "Data.Pool." ++ func ++ ": " ++ msg
+
+
Oops, something went wrong.

0 comments on commit 52983af

Please sign in to comment.