Permalink
Browse files

WIP

  • Loading branch information...
1 parent fc2ac6d commit bdc18d60f8d51404327a20e58b1cb6fbda93969c Alan Zimmerman committed Mar 3, 2014
View
3 hroq.cabal
@@ -73,6 +73,7 @@ executable hroq
, thyme
, vector-space
, lens
+ , sqlite-simple
test-suite StatsGathererTests
type: exitcode-stdio-1.0
@@ -99,6 +100,7 @@ test-suite StatsGathererTests
, thyme
, vector-space
, lens
+ , sqlite-simple
hs-source-dirs:
tests src
ghc-options: -Wall -threaded -rtsopts -with-rtsopts=-N
@@ -133,6 +135,7 @@ test-suite AlarmServerTests
, thyme
, vector-space
, lens
+ , sqlite-simple
hs-source-dirs:
tests src
ghc-options: -Wall -threaded -rtsopts -with-rtsopts=-N
View
26 src/Data/Concurrent/Queue/Roq/Mnesia.hs
@@ -4,23 +4,23 @@
module Data.Concurrent.Queue.Roq.Mnesia
(
-- * Schema etc
- create_schema
- , delete_schema
- , create_table
+ -- create_schema
+ -- , delete_schema
+ create_table
, delete_table
, TableStorage(..)
- -- , Storable(..)
+
, change_table_copy_type
, dirty_write
, dirty_write_q
, dirty_write_q_sid
- -- , dirty_write_ls
+
, dirty_read
, dirty_read_q
- -- , dirty_read_ls
+
, dirty_delete_q
- -- , dirty_delete_ls
+
, dirty_all_keys
, wait_for_tables
@@ -205,6 +205,7 @@ data State = MnesiaState
, sRamQ :: !(Map.Map TableName [QEntry])
, sRamMeta :: !(Map.Map TableName [Meta])
, sEkg :: !EKG.Server
+ , sHpPid :: !(Maybe ProcessId)
} deriving (Show,Typeable,Generic)
instance Show EKG.Server where
@@ -427,7 +428,7 @@ startHroqMnesia initParams = do
-- init callback
initFunc :: InitHandler EKG.Server State
initFunc ekg = do
- let s = (MnesiaState Map.empty Map.empty Map.empty ekg)
+ let s = (MnesiaState Map.empty Map.empty Map.empty ekg Nothing)
ems <- liftIO $ Exception.try $ decodeFileSchema (tableNameToFileName schemaTable)
logm $ "HroqMnesia.initFunc:ems=" ++ (show ems)
let m = case ems of
@@ -736,11 +737,14 @@ do_dirty_write_q s tableName record = do
-- logm $ "HroqMnesia.not doing physical write" -- ++AZ++
-- liftIO $ defaultAppend (tableNameToFileName tableName) (encode record)
- hsid <- hroq_handle_pool_server_pid -- TODO: cache this in State
- append hsid (tableNameToFileName tableName) (encode record)
+ hpid <- case (sHpPid s) of
+ Just p -> return p
+ Nothing -> hroq_handle_pool_server_pid
+
+ append hpid (tableNameToFileName tableName) (encode record)
let s' = insertEntryQ s tableName record
- return s'
+ return s' { sHpPid = Just hpid }
{-
do_dirty_write_ls ::
View
7 src/Data/Concurrent/Queue/Roq/Queue.hs
@@ -455,8 +455,8 @@ enqueue_one_message mpid queueName v s = do
logt $ "enqueue_one_message 2"
-- HM.dirty_write_q enqueueWorkBucket msgRecord
- -- HM.dirty_write_q_sid (qsMnesiaSid s) enqueueWorkBucket msgRecord
-
+ HM.dirty_write_q_sid (qsMnesiaSid s) enqueueWorkBucket msgRecord
+{-
-- append (qsHandlePoolPid s) (HM.tableNameToFileName enqueueWorkBucket) (encode msgRecord)
(h,newHandleMap) <- case Map.lookup (HM.tableNameToFileName enqueueWorkBucket) (qsFileHandles s) of
@@ -467,6 +467,7 @@ enqueue_one_message mpid queueName v s = do
return (ha,fh)
liftIO $ B.hPut h (encode msgRecord) -- >> hFlush h
+-}
let newTotalQueuedMsg = (qsTotalQueueSize s) + 1
newEnqueueCount = (qsEnqueueCount s) + 1
@@ -514,7 +515,7 @@ enqueue_one_message mpid queueName v s = do
publish_queue_stats (qsStatsPid s) queueName (QStats appInfo newTotalQueuedMsg newEnqueueCount dequeueCount)
- return s' { qsFileHandles = newHandleMap }
+ return s' -- { qsFileHandles = newHandleMap }
-- ---------------------------------------------------------------------
View
4 src/Data/Concurrent/Queue/Roq/Util.hs
@@ -13,14 +13,12 @@ import Data.Concurrent.Queue.Roq.Hroq
import Data.Concurrent.Queue.Roq.Mnesia
-- import Data.Time.Clock
import Data.Thyme.Clock
-import System.IO.Unsafe
-- ---------------------------------------------------------------------
generate_key :: Process QKey
generate_key = do
- -- k <- liftIO $ getCurrentTime
- let k = unsafePerformIO $ getCurrentTime
+ k <- liftIO $ getCurrentTime
return $ QK (show k)
-- ---------------------------------------------------------------------
View
20 tests/TestAlarmServer.hs
@@ -45,21 +45,21 @@ logm = say
-- ---------------------------------------------------------------------
testQueueStats :: ProcessId -> TestResult Int -> Process ()
-testQueueStats _pid result = do
- SG.ReplyQStatsNotFound <- SG.get_queue_stats (QN "queue1")
- SG.publish_queue_stats (QN "queue1") (SG.QStats "info" 1 2 3)
- SG.publish_queue_stats (QN "queue2") (SG.QStats "info" 2 3 4)
- (SG.ReplyQStats (SG.QStats "info" 1 2 3)) <- SG.get_queue_stats (QN "queue1")
+testQueueStats pid result = do
+ SG.ReplyQStatsNotFound <- SG.get_queue_stats pid (QN "queue1")
+ SG.publish_queue_stats pid (QN "queue1") (SG.QStats "info" 1 2 3)
+ SG.publish_queue_stats pid (QN "queue2") (SG.QStats "info" 2 3 4)
+ (SG.ReplyQStats (SG.QStats "info" 1 2 3)) <- SG.get_queue_stats pid (QN "queue1")
stash result 5
-- ---------------------------------------------------------------------
testConsumerStats :: ProcessId -> TestResult Int -> Process ()
-testConsumerStats _pid result = do
- SG.ReplyCStatsNotFound <- SG.get_consumer_stats (CN "consumer1")
- SG.publish_consumer_stats (CN "consumer1") (SG.QStats "info" 1 2 3)
- SG.publish_consumer_stats (CN "consumer2") (SG.QStats "info" 2 3 4)
- (SG.ReplyCStats (SG.QStats "info" 1 2 3)) <- SG.get_consumer_stats (CN "consumer1")
+testConsumerStats pid result = do
+ SG.ReplyCStatsNotFound <- SG.get_consumer_stats pid (CN "consumer1")
+ SG.publish_consumer_stats pid (CN "consumer1") (SG.QStats "info" 1 2 3)
+ SG.publish_consumer_stats pid (CN "consumer2") (SG.QStats "info" 2 3 4)
+ (SG.ReplyCStats (SG.QStats "info" 1 2 3)) <- SG.get_consumer_stats pid (CN "consumer1")
stash result 5
-- --------------------------------------------------------------------
View
20 tests/TestStatsGatherer.hs
@@ -43,21 +43,21 @@ logm = say
-- ---------------------------------------------------------------------
testQueueStats :: ProcessId -> TestResult Int -> Process ()
-testQueueStats _pid result = do
- SG.ReplyQStatsNotFound <- SG.get_queue_stats (QN "queue1")
- SG.publish_queue_stats (QN "queue1") (SG.QStats "info" 1 2 3)
- SG.publish_queue_stats (QN "queue2") (SG.QStats "info" 2 3 4)
- (SG.ReplyQStats (SG.QStats "info" 1 2 3)) <- SG.get_queue_stats (QN "queue1")
+testQueueStats pid result = do
+ SG.ReplyQStatsNotFound <- SG.get_queue_stats pid (QN "queue1")
+ SG.publish_queue_stats pid (QN "queue1") (SG.QStats "info" 1 2 3)
+ SG.publish_queue_stats pid (QN "queue2") (SG.QStats "info" 2 3 4)
+ (SG.ReplyQStats (SG.QStats "info" 1 2 3)) <- SG.get_queue_stats pid (QN "queue1")
stash result 5
-- ---------------------------------------------------------------------
testConsumerStats :: ProcessId -> TestResult Int -> Process ()
-testConsumerStats _pid result = do
- SG.ReplyCStatsNotFound <- SG.get_consumer_stats (CN "consumer1")
- SG.publish_consumer_stats (CN "consumer1") (SG.QStats "info" 1 2 3)
- SG.publish_consumer_stats (CN "consumer2") (SG.QStats "info" 2 3 4)
- (SG.ReplyCStats (SG.QStats "info" 1 2 3)) <- SG.get_consumer_stats (CN "consumer1")
+testConsumerStats pid result = do
+ SG.ReplyCStatsNotFound <- SG.get_consumer_stats pid (CN "consumer1")
+ SG.publish_consumer_stats pid (CN "consumer1") (SG.QStats "info" 1 2 3)
+ SG.publish_consumer_stats pid (CN "consumer2") (SG.QStats "info" 2 3 4)
+ (SG.ReplyCStats (SG.QStats "info" 1 2 3)) <- SG.get_consumer_stats pid (CN "consumer1")
stash result 5
-- --------------------------------------------------------------------

0 comments on commit bdc18d6

Please sign in to comment.