Skip to content

Commit

Permalink
Added ErrDBClosed, reset length Close call, fixed stack error comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Conner Hewitt committed Jun 14, 2016
1 parent 800c439 commit 39d78f4
Show file tree
Hide file tree
Showing 7 changed files with 243 additions and 20 deletions.
5 changes: 5 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,9 @@ var (
// ErrOutOfBounds is returned when the ID used to lookup an item
// in the queue is outside the current range of the queue.
ErrOutOfBounds = errors.New("goque: ID used is out of the range of the queue")

// ErrDBClosed is returned when the Close function has already
// been called, causing the stack, queue, or priority queue to
// close, as well as its underlying database.
ErrDBClosed = errors.New("goque: The database is closed")
)
50 changes: 50 additions & 0 deletions pqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ func (pq *PriorityQueue) Enqueue(item *PriorityItem) error {
pq.Lock()
defer pq.Unlock()

// If queue is already closed.
if !pq.isOpen {
return ErrDBClosed
}

// Get the priorityLevel.
level := pq.levels[item.Priority]

Expand All @@ -109,6 +114,11 @@ func (pq *PriorityQueue) Dequeue() (*PriorityItem, error) {
pq.Lock()
defer pq.Unlock()

// If queue is already closed.
if !pq.isOpen {
return nil, ErrDBClosed
}

// Try to get the next item in the current priority level.
item, err := pq.getNextItem()
if err != nil {
Expand All @@ -132,6 +142,11 @@ func (pq *PriorityQueue) DequeueByPriority(priority uint8) (*PriorityItem, error
pq.Lock()
defer pq.Unlock()

// If queue is already closed.
if !pq.isOpen {
return nil, ErrDBClosed
}

// Try to get the next item in the given priority level.
item, err := pq.getItemByPriorityID(priority, pq.levels[priority].head+1)
if err != nil {
Expand All @@ -153,6 +168,12 @@ func (pq *PriorityQueue) DequeueByPriority(priority uint8) (*PriorityItem, error
func (pq *PriorityQueue) Peek() (*PriorityItem, error) {
pq.RLock()
defer pq.RUnlock()

// If queue is already closed.
if !pq.isOpen {
return nil, ErrDBClosed
}

return pq.getNextItem()
}

Expand All @@ -162,6 +183,11 @@ func (pq *PriorityQueue) PeekByOffset(offset uint64) (*PriorityItem, error) {
pq.RLock()
defer pq.RUnlock()

// If queue is already closed.
if !pq.isOpen {
return nil, ErrDBClosed
}

// Check if queue is empty.
if pq.Length() == 0 {
return nil, ErrEmpty
Expand All @@ -186,6 +212,12 @@ func (pq *PriorityQueue) PeekByOffset(offset uint64) (*PriorityItem, error) {
func (pq *PriorityQueue) PeekByPriorityID(priority uint8, id uint64) (*PriorityItem, error) {
pq.RLock()
defer pq.RUnlock()

// If queue is already closed.
if !pq.isOpen {
return nil, ErrDBClosed
}

return pq.getItemByPriorityID(priority, id)
}

Expand All @@ -194,6 +226,12 @@ func (pq *PriorityQueue) PeekByPriorityID(priority uint8, id uint64) (*PriorityI
func (pq *PriorityQueue) Update(item *PriorityItem, newValue []byte) error {
pq.Lock()
defer pq.Unlock()

// If queue is already closed.
if !pq.isOpen {
return ErrDBClosed
}

item.Value = newValue
return pq.db.Put(item.Key, item.Value, nil)
}
Expand All @@ -206,6 +244,9 @@ func (pq *PriorityQueue) UpdateString(item *PriorityItem, newValue string) error

// Length returns the total number of items in the priority queue.
func (pq *PriorityQueue) Length() uint64 {
pq.RLock()
defer pq.RUnlock()

var length uint64
for _, v := range pq.levels {
length += v.length()
Expand All @@ -216,11 +257,20 @@ func (pq *PriorityQueue) Length() uint64 {

// Close closes the LevelDB database of the priority queue.
func (pq *PriorityQueue) Close() {
pq.Lock()
defer pq.Unlock()

// If queue is already closed.
if !pq.isOpen {
return
}

// Reset head and tail of each priority level.
for i := 0; i <= 255; i++ {
pq.levels[uint8(i)].head = 0
pq.levels[uint8(i)].tail = 0
}

pq.db.Close()
pq.isOpen = false
}
Expand Down
46 changes: 39 additions & 7 deletions pqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,38 @@ import (
"time"
)

func TestPriorityQueueClose(t *testing.T) {
file := fmt.Sprintf("test_db_%d", time.Now().UnixNano())
pq, err := OpenPriorityQueue(file, ASC)
if err != nil {
t.Error(err)
}
defer pq.Drop()

for p := 0; p <= 4; p++ {
for i := 1; i <= 10; i++ {
item := NewPriorityItemString(fmt.Sprintf("value for item %d", i), uint8(p))
if err = pq.Enqueue(item); err != nil {
t.Error(err)
}
}
}

if pq.Length() != 50 {
t.Errorf("Expected queue length of 1, got %d", pq.Length())
}

pq.Close()

if _, err = pq.Dequeue(); err != ErrDBClosed {
t.Errorf("Expected to get database closed error, got %s", err.Error())
}

if pq.Length() != 0 {
t.Errorf("Expected queue length of 0, got %d", pq.Length())
}
}

func TestPriorityQueueDrop(t *testing.T) {
file := fmt.Sprintf("test_db_%d", time.Now().UnixNano())
pq, err := OpenPriorityQueue(file, ASC)
Expand Down Expand Up @@ -232,7 +264,7 @@ func TestPriorityQueuePeekByOffsetEmptyAsc(t *testing.T) {

_, err = pq.PeekByOffset(0)
if err != ErrEmpty {
t.Errorf("Expected to get queue empty error, got %s", err.Error())
t.Errorf("Expected to get empty error, got %s", err.Error())
}

if err = pq.Enqueue(NewPriorityItemString("value", 0)); err != nil {
Expand All @@ -250,7 +282,7 @@ func TestPriorityQueuePeekByOffsetEmptyAsc(t *testing.T) {

_, err = pq.PeekByOffset(0)
if err != ErrEmpty {
t.Errorf("Expected to get queue empty error, got %s", err.Error())
t.Errorf("Expected to get empty error, got %s", err.Error())
}
}

Expand All @@ -264,7 +296,7 @@ func TestPriorityQueuePeekByOffsetEmptyDesc(t *testing.T) {

_, err = pq.PeekByOffset(0)
if err != ErrEmpty {
t.Errorf("Expected to get queue empty error, got %s", err.Error())
t.Errorf("Expected to get empty error, got %s", err.Error())
}

if err = pq.Enqueue(NewPriorityItemString("value", 0)); err != nil {
Expand All @@ -282,7 +314,7 @@ func TestPriorityQueuePeekByOffsetEmptyDesc(t *testing.T) {

_, err = pq.PeekByOffset(0)
if err != ErrEmpty {
t.Errorf("Expected to get queue empty error, got %s", err.Error())
t.Errorf("Expected to get empty error, got %s", err.Error())
}
}

Expand All @@ -296,7 +328,7 @@ func TestPriorityQueuePeekByOffsetBoundsAsc(t *testing.T) {

_, err = pq.PeekByOffset(0)
if err != ErrEmpty {
t.Errorf("Expected to get queue empty error, got %s", err.Error())
t.Errorf("Expected to get empty error, got %s", err.Error())
}

if err = pq.Enqueue(NewPriorityItemString("value", 0)); err != nil {
Expand Down Expand Up @@ -343,7 +375,7 @@ func TestPriorityQueuePeekByOffsetBoundsDesc(t *testing.T) {

_, err = pq.PeekByOffset(0)
if err != ErrEmpty {
t.Errorf("Expected to get queue empty error, got %s", err.Error())
t.Errorf("Expected to get empty error, got %s", err.Error())
}

if err = pq.Enqueue(NewPriorityItemString("value", 0)); err != nil {
Expand Down Expand Up @@ -759,7 +791,7 @@ func TestPriorityQueueEmpty(t *testing.T) {

_, err = pq.Dequeue()
if err != ErrEmpty {
t.Errorf("Expected to get queue empty error, got %s", err.Error())
t.Errorf("Expected to get empty error, got %s", err.Error())
}
}

Expand Down
41 changes: 41 additions & 0 deletions queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ func (q *Queue) Enqueue(item *Item) error {
q.Lock()
defer q.Unlock()

// If queue is already closed.
if !q.isOpen {
return ErrDBClosed
}

// Set item ID and key.
item.ID = q.tail + 1
item.Key = idToKey(item.ID)
Expand All @@ -74,6 +79,11 @@ func (q *Queue) Dequeue() (*Item, error) {
q.Lock()
defer q.Unlock()

// If queue is already closed.
if !q.isOpen {
return nil, ErrDBClosed
}

// Try to get the next item in the queue.
item, err := q.getItemByID(q.head + 1)
if err != nil {
Expand All @@ -95,6 +105,12 @@ func (q *Queue) Dequeue() (*Item, error) {
func (q *Queue) Peek() (*Item, error) {
q.RLock()
defer q.RUnlock()

// If queue is already closed.
if !q.isOpen {
return nil, ErrDBClosed
}

return q.getItemByID(q.head + 1)
}

Expand All @@ -103,20 +119,38 @@ func (q *Queue) Peek() (*Item, error) {
func (q *Queue) PeekByOffset(offset uint64) (*Item, error) {
q.RLock()
defer q.RUnlock()

// If queue is already closed.
if !q.isOpen {
return nil, ErrDBClosed
}

return q.getItemByID(q.head + offset + 1)
}

// PeekByID returns the item with the given ID without removing it.
func (q *Queue) PeekByID(id uint64) (*Item, error) {
q.RLock()
defer q.RUnlock()

// If queue is already closed.
if !q.isOpen {
return nil, ErrDBClosed
}

return q.getItemByID(id)
}

// Update updates an item in the queue without changing its position.
func (q *Queue) Update(item *Item, newValue []byte) error {
q.Lock()
defer q.Unlock()

// If queue is already closed.
if !q.isOpen {
return ErrDBClosed
}

item.Value = newValue
return q.db.Put(item.Key, item.Value, nil)
}
Expand All @@ -134,11 +168,18 @@ func (q *Queue) Length() uint64 {

// Close closes the LevelDB database of the queue.
func (q *Queue) Close() {
q.Lock()
defer q.Unlock()

// If queue is already closed.
if !q.isOpen {
return
}

// Reset queue head and tail.
q.head = 0
q.tail = 0

q.db.Close()
q.isOpen = false
}
Expand Down
35 changes: 31 additions & 4 deletions queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,33 @@ import (
"time"
)

func TestQueueClose(t *testing.T) {
file := fmt.Sprintf("test_db_%d", time.Now().UnixNano())
q, err := OpenQueue(file)
if err != nil {
t.Error(err)
}
defer q.Drop()

if err = q.Enqueue(NewItemString("value")); err != nil {
t.Error(err)
}

if q.Length() != 1 {
t.Errorf("Expected queue length of 1, got %d", q.Length())
}

q.Close()

if _, err = q.Dequeue(); err != ErrDBClosed {
t.Errorf("Expected to get database closed error, got %s", err.Error())
}

if q.Length() != 0 {
t.Errorf("Expected queue length of 0, got %d", q.Length())
}
}

func TestQueueDrop(t *testing.T) {
file := fmt.Sprintf("test_db_%d", time.Now().UnixNano())
q, err := OpenQueue(file)
Expand Down Expand Up @@ -35,7 +62,7 @@ func TestQueueIncompatibleType(t *testing.T) {
pq.Close()

if _, err = OpenQueue(file); err != ErrIncompatibleType {
t.Error("Expected priority queue to return ErrIncompatibleTypes when opening Queue")
t.Error("Expected priority queue to return ErrIncompatibleTypes when opening goquePriorityQueue")
}
}

Expand Down Expand Up @@ -75,7 +102,7 @@ func TestQueueDequeue(t *testing.T) {
}

if q.Length() != 10 {
t.Errorf("Expected queue length of 1, got %d", q.Length())
t.Errorf("Expected queue length of 10, got %d", q.Length())
}

deqItem, err := q.Dequeue()
Expand All @@ -84,7 +111,7 @@ func TestQueueDequeue(t *testing.T) {
}

if q.Length() != 9 {
t.Errorf("Expected queue length of 0, got %d", q.Length())
t.Errorf("Expected queue length of 9, got %d", q.Length())
}

compStr := "value for item 1"
Expand Down Expand Up @@ -314,7 +341,7 @@ func TestQueueEmpty(t *testing.T) {

_, err = q.Dequeue()
if err != ErrEmpty {
t.Errorf("Expected to get queue empty error, got %s", err.Error())
t.Errorf("Expected to get empty error, got %s", err.Error())
}
}

Expand Down
Loading

0 comments on commit 39d78f4

Please sign in to comment.