Skip to content

Commit

Permalink
Coalesce writing messages to disk + don't write if not needed
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeff Jenkins committed Dec 5, 2015
1 parent fa8a40f commit 96b92b5
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 109 deletions.
1 change: 1 addition & 0 deletions consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ func (consumer *Consumer) consume(id uint16) {
// TODO: what is this doing?
consumer.cqueue.MaybeReady() <- false
for _ = range consumer.incoming {

consumer.consumeOne()
}
}
Expand Down
222 changes: 113 additions & 109 deletions msgstore/msgstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,24 +35,23 @@ func (qmf *QueueMessageFactory) New() proto.Unmarshaler {
return &amqp.QueueMessage{}
}

const FLAG_USE_ASYNC_PERSIST = false

const (
opAdd = iota
opDelete = iota
opIncrDelivered = iota
)

type PersistOp struct {
type PersistKey struct {
id int64
opType int
queueName string
}

type MessageStore struct {
index map[int64]*amqp.IndexMessage
messages map[int64]*amqp.Message
persistOps []*PersistOp
addOps map[PersistKey]*amqp.QueueMessage
delOps map[PersistKey]*amqp.QueueMessage
deliveredOps map[PersistKey]*amqp.QueueMessage
persistLock sync.Mutex
db *bolt.DB
msgLock sync.RWMutex
Expand All @@ -67,9 +66,12 @@ func NewMessageStore(fileName string) (*MessageStore, error) {
return nil, err
}
ms := &MessageStore{
index: make(map[int64]*amqp.IndexMessage),
messages: make(map[int64]*amqp.Message),
db: db,
index: make(map[int64]*amqp.IndexMessage),
messages: make(map[int64]*amqp.Message),
db: db,
addOps: make(map[PersistKey]*amqp.QueueMessage),
delOps: make(map[PersistKey]*amqp.QueueMessage),
deliveredOps: make(map[PersistKey]*amqp.QueueMessage),
}
// Stats
ms.statAdd = stats.MakeHistogram("add-message")
Expand Down Expand Up @@ -114,49 +116,88 @@ func (ms *MessageStore) periodicPersist() {
}
}

func (ms *MessageStore) clearOps() {
ms.delOps = make(map[PersistKey]*amqp.QueueMessage)
ms.addOps = make(map[PersistKey]*amqp.QueueMessage)
ms.deliveredOps = make(map[PersistKey]*amqp.QueueMessage)
}

func (ms *MessageStore) persistOnce() {
// fmt.Println("Starting persist")
// Snapshot so we can keep queueing persist ops
ms.persistLock.Lock()
defer ms.persistLock.Unlock()
if !FLAG_USE_ASYNC_PERSIST {
ms.persistOps = make([]*PersistOp, 0)
return
delOps := ms.delOps
addOps := ms.addOps
deliveredOps := ms.deliveredOps
ms.clearOps()
ms.persistLock.Unlock()

// We don't need to add or mark delivered anything we are going to delete
// We don't need to delete anything we haven't added yet
noDelete := make([]PersistKey, 0, len(addOps))
for id, _ := range delOps {
if _, ok := addOps[id]; ok {
delete(addOps, id)
noDelete = append(noDelete, id)
}
delete(deliveredOps, id)
}
for _, id := range noDelete {
delete(delOps, id)
}
// fmt.Printf("Persist: add:%d, del:%d, delivery:%d\n", len(addOps), len(delOps), len(deliveredOps))
err := ms.db.Update(func(tx *bolt.Tx) error {
for _, op := range ms.persistOps {
t := op.opType
switch {
case t == opAdd:
ms.addOp(tx, op)
case t == opDelete:
ms.deleteOp(tx, op)
case t == opIncrDelivered:
ms.incrDeliveredOp(tx, op)
// Add
msgsAdded := make(map[int64]bool)
for pk, qm := range addOps {
// Add -- Save messages to content/index stores
if _, ok := msgsAdded[pk.id]; !ok {
msg, okM := ms.GetNoChecks(pk.id)
im, okI := ms.GetIndex(pk.id)
if okM != okI {
panic("Message index integrity error")
}
if !okM {
// this message was deleted after we did a snapshot. We'll
// still call a useless delete on it later, but we don't
// need to add it now
continue
}
persistMessage(tx, msg)
persistIndexMessage(tx, im)
}
// Add -- Add messages to queues
persistQueueMessage(tx, pk.queueName, qm)
}

// Update Delivered
for pk, qm := range deliveredOps {
persistQueueMessage(tx, pk.queueName, qm)
}

// Delete
// Delete -- Remove from queue
for pk, qm := range delOps {
var err = depersistQueueMessage(tx, pk.queueName, qm.Id)
if err != nil {
return err
}
remaining, err := decrIndexMessage(tx, qm.Id, ms)
if err != nil {
return err
}
// Delete -- Delete message all together if there are no references left
if remaining == 0 {
return depersistMessage(tx, qm.Id)
}
}
return nil
})
if err == nil {
panic("Failed to persist")
// TODO: this should probably just print a critical log message rather than
// killing the server
if err != nil {
panic("Failed to persist: " + err.Error())
}
ms.persistOps = make([]*PersistOp, 0)
}

func (ms *MessageStore) addOp(tx *bolt.Tx, op *PersistOp) {

}

func (ms *MessageStore) deleteOp(tx *bolt.Tx, op *PersistOp) {

}

func (ms *MessageStore) incrDeliveredOp(tx *bolt.Tx, op *PersistOp) {

}

func (ms *MessageStore) enqueueOp(op *PersistOp) {
ms.persistLock.Lock()
ms.persistOps = append(ms.persistOps, op)
ms.persistLock.Unlock()
}

func (ms *MessageStore) LoadMessages() error {
Expand Down Expand Up @@ -288,33 +329,13 @@ func (ms *MessageStore) AddTxMessages(msgs []*amqp.TxMessage) (map[string][]*amq
}
// if any are durable, persist those ones
if anyDurable {
ms.persistLock.Lock()
for q, qms := range queueMessages {
for _, qm := range qms {
ms.enqueueOp(&PersistOp{
id: qm.Id,
queueName: q,
opType: opAdd,
})
ms.addOps[PersistKey{qm.Id, q}] = qm
}
}
err := ms.db.Update(func(tx *bolt.Tx) error {
// Save messages to content/index stores
for _, msg := range msgs {
persistMessage(tx, msg.Msg)
// fmt.Printf("Persisting: %d\n", msg.Msg.Id)
persistIndexMessage(tx, indexMessages[msg.Msg.Id])
}
// Add messages to queues
for q, qms := range queueMessages {
for _, qm := range qms {
persistQueueMessage(tx, q, qm)
}
}
return nil
})
if err != nil {
return nil, err
}
ms.persistLock.Unlock()
}
// Add to memory message store
ms.msgLock.Lock()
Expand All @@ -332,11 +353,9 @@ func (ms *MessageStore) AddTxMessages(msgs []*amqp.TxMessage) (map[string][]*amq
func (ms *MessageStore) IncrDeliveryCount(queueName string, qm *amqp.QueueMessage) (err error) {
qm.DeliveryCount += 1
if qm.Durable {
ms.enqueueOp(&PersistOp{id: qm.Id, opType: opIncrDelivered, queueName: queueName})
err = ms.db.Update(func(tx *bolt.Tx) error {
persistQueueMessage(tx, queueName, qm)
return nil
})
ms.persistLock.Lock()
ms.deliveredOps[PersistKey{qm.Id, queueName}] = qm
ms.persistLock.Unlock()
}
return
}
Expand All @@ -363,42 +382,25 @@ func (ms *MessageStore) RemoveRef(qm *amqp.QueueMessage, queueName string, rhs [
}
// Update disk
if im.Durable {
ms.enqueueOp(&PersistOp{id: im.Id, opType: opDelete, queueName: queueName})
err := ms.db.Update(func(tx *bolt.Tx) error {
// fmt.Printf("Remove from queue: %d '%s'\n", qm.Id, queueName)
var err = depersistQueueMessage(tx, queueName, qm.Id)
if err != nil {
return err
}
remaining, err := decrIndexMessage(tx, qm.Id)
// fmt.Printf("Remaining: %d\n", remaining)
if err != nil {
return err
}
if remaining == 0 {
return depersistMessage(tx, qm.Id)
}
return nil
})
if err != nil {
return err
ms.persistLock.Lock()
ms.delOps[PersistKey{im.Id, queueName}] = qm
ms.persistLock.Unlock()
} else {
// Update if only memory
im.Refs -= 1
if im.Refs == 0 {
ms.msgLock.Lock()
delete(ms.index, qm.Id)
ms.msgLock.Unlock()

ms.indexLock.Lock()
delete(ms.messages, qm.Id)
ms.indexLock.Unlock()
}
}
// Update memory
im.Refs -= 1
if im.Refs == 0 {

ms.msgLock.Lock()
delete(ms.index, qm.Id)
ms.msgLock.Unlock()

ms.indexLock.Lock()
delete(ms.messages, qm.Id)
ms.indexLock.Unlock()

for _, rh := range rhs {
rh.ReleaseResources(qm)
}
for _, rh := range rhs {
rh.ReleaseResources(qm)
}
return nil
}
Expand All @@ -411,7 +413,7 @@ func depersistMessage(tx *bolt.Tx, id int64) error {
return content_bucket.Delete(binaryId(id))
}

func decrIndexMessage(tx *bolt.Tx, id int64) (int32, error) {
func decrIndexMessage(tx *bolt.Tx, id int64, ms *MessageStore) (int32, error) {
// bucket
index_bucket, err := tx.CreateBucketIfNotExists(MESSAGE_INDEX_BUCKET)
if err != nil {
Expand All @@ -435,7 +437,13 @@ func decrIndexMessage(tx *bolt.Tx, id int64) (int32, error) {
im.Refs -= 1
// TODO: panic on <0
if im.Refs == 0 {
// fmt.Printf("Delete from Index: %d\n", id)
ms.msgLock.Lock()
delete(ms.index, id)
ms.msgLock.Unlock()

ms.indexLock.Lock()
delete(ms.messages, id)
ms.indexLock.Unlock()
return 0, index_bucket.Delete(bId)
}
newBytes, err := proto.Marshal(im)
Expand All @@ -453,10 +461,6 @@ func depersistQueueMessage(tx *bolt.Tx, queueName string, id int64) error {
return err
}
var key = binaryId(id)
var got = content_bucket.Get(key)
if got == nil {
return fmt.Errorf("Could not find '%d' in queue '%s'", id, queueName)
}
return content_bucket.Delete(key)
}

Expand Down

0 comments on commit 96b92b5

Please sign in to comment.