Permalink
Browse files

expose needed disk queue stuff to other packages

  • Loading branch information...
Dieterbe committed Aug 4, 2014
1 parent 293ca31 commit 4d6ebb37451ce6e05b606ea4ba6221611d367f71
Showing with 25 additions and 25 deletions.
  1. +25 −25 nsqd/diskqueue.go
View
@@ -15,9 +15,9 @@ import (
"time"
)
// diskQueue implements the BackendQueue interface
// DiskQueue implements the BackendQueue interface
// providing a filesystem backed FIFO queue
type diskQueue struct {
type DiskQueue struct {
// 64bit atomic vars need to be first for proper alignment on 32bit platforms
// run-time state (also persisted to disk)
@@ -60,10 +60,10 @@ type diskQueue struct {
exitSyncChan chan int
}
// newDiskQueue instantiates a new instance of diskQueue, retrieving metadata
// NewDiskQueue instantiates a new instance of DiskQueue, retrieving metadata
// from the filesystem and starting the read ahead goroutine
func newDiskQueue(name string, dataPath string, maxBytesPerFile int64, syncEvery int64, syncTimeout time.Duration) BackendQueue {
d := diskQueue{
func NewDiskQueue(name string, dataPath string, maxBytesPerFile int64, syncEvery int64, syncTimeout time.Duration) BackendQueue {
d := DiskQueue{
name: name,
dataPath: dataPath,
maxBytesPerFile: maxBytesPerFile,
@@ -90,17 +90,17 @@ func newDiskQueue(name string, dataPath string, maxBytesPerFile int64, syncEvery
}
// Depth returns the depth of the queue
func (d *diskQueue) Depth() int64 {
func (d *DiskQueue) Depth() int64 {
return atomic.LoadInt64(&d.depth)
}
// ReadChan returns the []byte channel for reading data
func (d *diskQueue) ReadChan() chan []byte {
func (d *DiskQueue) ReadChan() chan []byte {
return d.readChan
}
// Put writes a []byte to the queue
func (d *diskQueue) Put(data []byte) error {
func (d *DiskQueue) Put(data []byte) error {
d.RLock()
defer d.RUnlock()
@@ -113,19 +113,19 @@ func (d *diskQueue) Put(data []byte) error {
}
// Close cleans up the queue and persists metadata
func (d *diskQueue) Close() error {
func (d *DiskQueue) Close() error {
err := d.exit(false)
if err != nil {
return err
}
return d.sync()
}
func (d *diskQueue) Delete() error {
func (d *DiskQueue) Delete() error {
return d.exit(true)
}
func (d *diskQueue) exit(deleted bool) error {
func (d *DiskQueue) exit(deleted bool) error {
d.Lock()
defer d.Unlock()
@@ -156,7 +156,7 @@ func (d *diskQueue) exit(deleted bool) error {
// Empty destructively clears out any pending data in the queue
// by fast forwarding read positions and removing intermediate files
func (d *diskQueue) Empty() error {
func (d *DiskQueue) Empty() error {
d.RLock()
defer d.RUnlock()
@@ -170,7 +170,7 @@ func (d *diskQueue) Empty() error {
return <-d.emptyResponseChan
}
func (d *diskQueue) deleteAllFiles() error {
func (d *DiskQueue) deleteAllFiles() error {
err := d.skipToNextRWFile()
innerErr := os.Remove(d.metaDataFileName())
@@ -182,7 +182,7 @@ func (d *diskQueue) deleteAllFiles() error {
return err
}
func (d *diskQueue) skipToNextRWFile() error {
func (d *DiskQueue) skipToNextRWFile() error {
var err error
if d.readFile != nil {
@@ -217,7 +217,7 @@ func (d *diskQueue) skipToNextRWFile() error {
// readOne performs a low level filesystem read for a single []byte
// while advancing read positions and rolling files, if necessary
func (d *diskQueue) readOne() ([]byte, error) {
func (d *DiskQueue) readOne() ([]byte, error) {
var err error
var msgSize int32
@@ -282,7 +282,7 @@ func (d *diskQueue) readOne() ([]byte, error) {
// writeOne performs a low level filesystem write for a single []byte
// while advancing write positions and rolling files, if necessary
func (d *diskQueue) writeOne(data []byte) error {
func (d *DiskQueue) writeOne(data []byte) error {
var err error
if d.writeFile == nil {
@@ -349,7 +349,7 @@ func (d *diskQueue) writeOne(data []byte) error {
}
// sync fsyncs the current writeFile and persists metadata
func (d *diskQueue) sync() error {
func (d *DiskQueue) sync() error {
if d.writeFile != nil {
err := d.writeFile.Sync()
if err != nil {
@@ -369,7 +369,7 @@ func (d *diskQueue) sync() error {
}
// retrieveMetaData initializes state from the filesystem
func (d *diskQueue) retrieveMetaData() error {
func (d *DiskQueue) retrieveMetaData() error {
var f *os.File
var err error
@@ -396,7 +396,7 @@ func (d *diskQueue) retrieveMetaData() error {
}
// persistMetaData atomically writes state to the filesystem
func (d *diskQueue) persistMetaData() error {
func (d *DiskQueue) persistMetaData() error {
var f *os.File
var err error
@@ -424,15 +424,15 @@ func (d *diskQueue) persistMetaData() error {
return atomic_rename(tmpFileName, fileName)
}
func (d *diskQueue) metaDataFileName() string {
func (d *DiskQueue) metaDataFileName() string {
return fmt.Sprintf(path.Join(d.dataPath, "%s.diskqueue.meta.dat"), d.name)
}
func (d *diskQueue) fileName(fileNum int64) string {
func (d *DiskQueue) fileName(fileNum int64) string {
return fmt.Sprintf(path.Join(d.dataPath, "%s.diskqueue.%06d.dat"), d.name, fileNum)
}
func (d *diskQueue) checkTailCorruption(depth int64) {
func (d *DiskQueue) checkTailCorruption(depth int64) {
if d.readFileNum < d.writeFileNum || d.readPos < d.writePos {
return
}
@@ -464,7 +464,7 @@ func (d *diskQueue) checkTailCorruption(depth int64) {
}
}
func (d *diskQueue) moveForward() {
func (d *DiskQueue) moveForward() {
oldReadFileNum := d.readFileNum
d.readFileNum = d.nextReadFileNum
d.readPos = d.nextReadPos
@@ -485,7 +485,7 @@ func (d *diskQueue) moveForward() {
d.checkTailCorruption(depth)
}
func (d *diskQueue) handleReadError() {
func (d *DiskQueue) handleReadError() {
// jump to the next read file and rename the current (bad) file
if d.readFileNum == d.writeFileNum {
// if you can't properly read from the current write file it's safe to
@@ -525,7 +525,7 @@ func (d *diskQueue) handleReadError() {
// go channels
//
// conveniently this also means that we're asynchronously reading from the filesystem
func (d *diskQueue) ioLoop() {
func (d *DiskQueue) ioLoop() {
var dataRead []byte
var err error
var count int64

0 comments on commit 4d6ebb3

Please sign in to comment.