Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Mq_sqlite_persistence: bugfix in count_queue_msgs.

  • Loading branch information...
commit 99a2dd4c38be7d0d1742e746153ef5c10b0b0fd0 1 parent 54d360f
mfp mfp authored
Showing with 7 additions and 14 deletions.
  1. +7 −14 mq_sqlite_persistence.ml
21 mq_sqlite_persistence.ml
View
@@ -26,15 +26,6 @@ type t = {
let get_first = function [x] -> x | _ -> assert false
-let msgs_acked_in_mem ?dst t =
- get_first
- (match dst with
- Some dst ->
- select t.db
- sqlc"SELECT @L{COUNT(*)} FROM acked_msgs WHERE destination=%s"
- dst
- | None -> select t.db sqlc"SELECT @L{COUNT(*)} FROM acked_msgs")
-
let flush_acked_msgs db =
execute db
sqlc"DELETE FROM ocamlmq_msgs WHERE msg_id IN (SELECT * FROM acked_msgs)";
@@ -46,7 +37,7 @@ let flush t =
printf "Flushing to disk: %d msgs, %d pending ACKS, %Ld ACKS%!"
(Hashtbl.length t.in_mem_msgs)
(SSET.cardinal t.ack_pending)
- (msgs_acked_in_mem t);
+ (select_one t.db sqlc"SELECT @L{COUNT(*)} FROM acked_msgs");
Hashtbl.iter
(fun _ msg ->
execute db
@@ -253,9 +244,11 @@ let count_queue_msgs t dst =
let in_db =
get_first
(select t.db
- sqlc"SELECT @L{COUNT(*)} FROM ocamlmq_msgs WHERE destination=%s"
- dst) in
- let in_db_acked = msgs_acked_in_mem ~dst t in
- return (Int64.sub (Int64.add (Int64.of_int in_mem) in_db) in_db_acked)
+ sqlc"SELECT @L{COUNT(*)} FROM ocamlmq_msgs as msg
+ WHERE destination=%s
+ AND NOT EXISTS (SELECT 1 FROM acked_msgs WHERE msg_id = msg.msg_id)"
+ dst)
+ in
+ return (Int64.add (Int64.of_int in_mem) in_db)
let crash_recovery t = return ()
Please sign in to comment.
Something went wrong with that request. Please try again.