Skip to content

Commit

Permalink
Changed API
Browse files Browse the repository at this point in the history
  • Loading branch information
beeker1121 committed Jul 30, 2016
1 parent 05f8d5f commit 6263a9f
Show file tree
Hide file tree
Showing 8 changed files with 288 additions and 330 deletions.
46 changes: 0 additions & 46 deletions item.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,29 +13,6 @@ type Item struct {
Value []byte
}

// NewItem creates a new item for use with a stack or queue.
func NewItem(value []byte) *Item {
return &Item{Value: value}
}

// NewItemString is a helper function for NewItem that accepts a
// value as a string rather than a byte slice.
func NewItemString(value string) *Item {
return NewItem([]byte(value))
}

// NewItemObject is a helper function for NewItem that accepts any
// value type, which is then encoded into a byte slice using
// encoding/gob.
func NewItemObject(value interface{}) (*Item, error) {
var buffer bytes.Buffer
enc := gob.NewEncoder(&buffer)
if err := enc.Encode(value); err != nil {
return nil, err
}
return NewItem(buffer.Bytes()), nil
}

// ToString returns the item value as a string.
func (i *Item) ToString() string {
return string(i.Value)
Expand All @@ -61,29 +38,6 @@ type PriorityItem struct {
Value []byte
}

// NewPriorityItem creates a new item for use with a priority queue.
func NewPriorityItem(value []byte, priority uint8) *PriorityItem {
return &PriorityItem{Priority: priority, Value: value}
}

// NewPriorityItemString is a helper function for NewPriorityItem
// that accepts a value as a string rather than a byte slice.
func NewPriorityItemString(value string, priority uint8) *PriorityItem {
return NewPriorityItem([]byte(value), priority)
}

// NewPriorityItemObject is a helper function for NewPriorityItem
// that accepts any value type, which is then encoded into a byte
// slice using encoding/gob.
func NewPriorityItemObject(value interface{}, priority uint8) (*PriorityItem, error) {
var buffer bytes.Buffer
enc := gob.NewEncoder(&buffer)
if err := enc.Encode(value); err != nil {
return nil, err
}
return NewPriorityItem(buffer.Bytes(), priority), nil
}

// ToString returns the priority item value as a string.
func (pi *PriorityItem) ToString() string {
return string(pi.Value)
Expand Down
51 changes: 0 additions & 51 deletions item_test.go

This file was deleted.

108 changes: 72 additions & 36 deletions pqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,34 +81,58 @@ func OpenPriorityQueue(dataDir string, order order) (*PriorityQueue, error) {
}

// Enqueue adds an item to the priority queue.
func (pq *PriorityQueue) Enqueue(item *PriorityItem) error {
func (pq *PriorityQueue) Enqueue(priority uint8, value []byte) (*PriorityItem, error) {
pq.Lock()
defer pq.Unlock()

// Check if queue is closed.
if !pq.isOpen {
return ErrDBClosed
return nil, ErrDBClosed
}

// Get the priorityLevel.
level := pq.levels[item.Priority]
level := pq.levels[priority]

// Set item ID and key.
item.ID = level.tail + 1
item.Key = pq.generateKey(item.Priority, item.ID)
// Create new PriorityItem.
item := &PriorityItem{
ID: level.tail + 1,
Priority: priority,
Key: pq.generateKey(priority, level.tail+1),
Value: value,
}

// Add it to the priority queue.
err := pq.db.Put(item.Key, item.Value, nil)
if err == nil {
level.tail++
if err := pq.db.Put(item.Key, item.Value, nil); err != nil {
return nil, err
}

// If this priority level is more important than the curLevel.
if pq.cmpAsc(item.Priority) || pq.cmpDesc(item.Priority) {
pq.curLevel = item.Priority
}
// Increment tail position.
level.tail++

// If this priority level is more important than the curLevel.
if pq.cmpAsc(priority) || pq.cmpDesc(priority) {
pq.curLevel = priority
}

return err
return item, nil
}

// EnqueueString is a helper function for Enqueue that accepts a
// value as a string rather than a byte slice.
func (pq *PriorityQueue) EnqueueString(priority uint8, value string) (*PriorityItem, error) {
return pq.Enqueue(priority, []byte(value))
}

// EnqueueObject is a helper function for Enqueue that accepts any
// value type, which is then encoded into a byte slice using
// encoding/gob.
func (pq *PriorityQueue) EnqueueObject(priority uint8, value interface{}) (*PriorityItem, error) {
var buffer bytes.Buffer
enc := gob.NewEncoder(&buffer)
if err := enc.Encode(value); err != nil {
return nil, err
}
return pq.Enqueue(priority, buffer.Bytes())
}

// Dequeue removes the next item in the priority queue and returns it.
Expand All @@ -124,15 +148,15 @@ func (pq *PriorityQueue) Dequeue() (*PriorityItem, error) {
// Try to get the next item.
item, err := pq.getNextItem()
if err != nil {
return item, err
return nil, err
}

// Remove this item from the priority queue.
if err = pq.db.Delete(item.Key, nil); err != nil {
return item, err
return nil, err
}

// Increment position.
// Increment head position.
pq.levels[pq.curLevel].head++

return item, nil
Expand All @@ -152,15 +176,15 @@ func (pq *PriorityQueue) DequeueByPriority(priority uint8) (*PriorityItem, error
// Try to get the next item in the given priority level.
item, err := pq.getItemByPriorityID(priority, pq.levels[priority].head+1)
if err != nil {
return item, err
return nil, err
}

// Remove this item from the priority queue.
if err = pq.db.Delete(item.Key, nil); err != nil {
return item, err
return nil, err
}

// Increment position.
// Increment head position.
pq.levels[priority].head++

return item, nil
Expand Down Expand Up @@ -219,41 +243,52 @@ func (pq *PriorityQueue) PeekByPriorityID(priority uint8, id uint64) (*PriorityI

// Update updates an item in the priority queue without changing its
// position.
func (pq *PriorityQueue) Update(item *PriorityItem, newValue []byte) error {
func (pq *PriorityQueue) Update(priority uint8, id uint64, newValue []byte) (*PriorityItem, error) {
pq.Lock()
defer pq.Unlock()

// Check if queue is closed.
if !pq.isOpen {
return ErrDBClosed
return nil, ErrDBClosed
}

// Check if item exists in queue.
if item.ID <= pq.levels[item.Priority].head || item.ID > pq.levels[item.Priority].tail {
return ErrOutOfBounds
if id <= pq.levels[priority].head || id > pq.levels[priority].tail {
return nil, ErrOutOfBounds
}

// Create new PriorityItem.
item := &PriorityItem{
ID: id,
Priority: priority,
Key: pq.generateKey(priority, id),
Value: newValue,
}

item.Key = pq.generateKey(item.Priority, item.ID)
item.Value = newValue
return pq.db.Put(item.Key, item.Value, nil)
// Update this item in the queue.
if err := pq.db.Put(item.Key, item.Value, nil); err != nil {
return nil, err
}

return item, nil
}

// UpdateString is a helper function for Update that accepts a value
// as a string rather than a byte slice.
func (pq *PriorityQueue) UpdateString(item *PriorityItem, newValue string) error {
return pq.Update(item, []byte(newValue))
func (pq *PriorityQueue) UpdateString(priority uint8, id uint64, newValue string) (*PriorityItem, error) {
return pq.Update(priority, id, []byte(newValue))
}

// UpdateObject is a helper function for Update that accepts any
// value type, which is then encoded into a byte slice using
// encoding/gob.
func (pq *PriorityQueue) UpdateObject(item *PriorityItem, newValue interface{}) error {
func (pq *PriorityQueue) UpdateObject(priority uint8, id uint64, newValue interface{}) (*PriorityItem, error) {
var buffer bytes.Buffer
enc := gob.NewEncoder(&buffer)
if err := enc.Encode(newValue); err != nil {
return err
return nil, err
}
return pq.Update(item, buffer.Bytes())
return pq.Update(priority, id, buffer.Bytes())
}

// Length returns the total number of items in the priority queue.
Expand Down Expand Up @@ -407,13 +442,14 @@ func (pq *PriorityQueue) getItemByPriorityID(priority uint8, id uint64) (*Priori
return nil, ErrOutOfBounds
}

// Get item from database.
var err error

// Create a new PriorityItem.
item := &PriorityItem{ID: id, Priority: priority, Key: pq.generateKey(priority, id)}
item.Value, err = pq.db.Get(item.Key, nil)
if item.Value, err = pq.db.Get(item.Key, nil); err != nil {
return nil, err
}

return item, err
return item, nil
}

// generatePrefix creates the key prefix for the given priority level.
Expand Down
Loading

0 comments on commit 6263a9f

Please sign in to comment.