Quick file-based fifo to buffer newline terminated strings, designed to be a very low overhead, very fast local journal that can both quickly persist large volumes of data and efficiently feed the ingest process that consumes it.
- Simple `open` / `getline` / `putline` / `close` semantics.
- Node `readline` emulation.
- Synchronous calls, asynchronous file i/o.
- Efficient batched file reads and writes.
- Very high throughput, hundreds of megabytes of data per second.
- Universally supported, very easy, very fast newline terminated plaintext data format.
- Metadata is optional, and is kept alongside in a separate file. Only the consumer might need metadata.
- No external dependencies.
- Tested to work with node-v0.6 and up.
- QFifos are inherently single-reader; the consuming process owns the fifo.
- Single-writer; concurrent writes are not supported. (File locking is missing from nodejs, so this incarnation differs from the quicklib version it was based on.)
Create a fifo for reading or appending the named fifo file.

Fifos operate in read mode or append mode. In `'a'` append-only mode and `'a+'` read-write
mode fifos are always written to at the end, with missing fifos created; in `'r'` read mode it
is an error if the fifo does not already exist. The `'r+'` mode is accepted as a performance
optimization (see Options below), but should not be used for writing. For exclusive read-only
or append-only access, omit the `+` from the mode flag.

NOTE: Reading fifos is always safe; writing `'r+'` read-mode fifos is currently not supported.
For now (as of version 0.7.0) always write fifos in `'a'` or `'a+'` append mode to ensure that
data is not overwritten.
Creating a new QFifo is fast; it only allocates the object and initializes state. The filesystem
is only accessed on `open` (but note that `getline`, `putline` and `readlines` automatically
open the fifo).
Options may contain the following settings:

- `flag`: open mode flag for `fs.open()`, default `'r'`. The flag may be `'r'` for read-only, `'a'` for append-only, or `'a+'` for read-write. `'r+'` is allowed as an optimization for faster in-file header rsyncs, but should not be used for data writes.
- `readSize`: how many bytes to read at a time, default 64K.
- `writeSize`: how many pending chars trigger an immediate write instead of waiting `writeDelay` ms, default 16K. Set to `0` to always write immediately.
- `writeDelay`: write-combining optimization, how many ms to wait for more data to arrive before writing to file, default `2` ms.
- `updatePosition`: whether to update `fifo.position` with the byte offset in the file of the next line to be read. Default `true`. `position` is needed to checkpoint the read state, but omitting it is 25% faster (e.g. when the whole file is consumed without checkpointing).
- `reopenInterval`: how frequently to reopen the fifo file, `-1` never. Default every 20 ms, to ensure that writes will cease after a grab or an external file rename.
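The `writeSize` / `writeDelay` write-combining interplay can be sketched as follows. This is a minimal illustration of the idea, not qfifo's implementation; `makeWriteCombiner` is a made-up name:

```javascript
// Sketch of write-combining: buffered data is flushed immediately once
// writeSize chars are pending, otherwise after writeDelay ms.
// (Illustrative only; not part of the qfifo API.)
function makeWriteCombiner(flushFn, writeSize, writeDelay) {
    var pending = '';
    var timer = null;
    function flush() {
        if (timer) { clearTimeout(timer); timer = null; }
        if (pending) { var chunk = pending; pending = ''; flushFn(chunk); }
    }
    return {
        write: function (line) {
            pending += line;
            // size threshold reached: write now, do not wait for the timer
            if (pending.length >= writeSize) flush();
            // else wait up to writeDelay ms for more data to combine
            else if (!timer) timer = setTimeout(flush, writeDelay);
        },
        flush: flush,
    };
}
```

With `writeSize: 0` every `write` call meets the threshold, so data is written immediately and the delay timer never fires.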
Open the file for use by the fifo. Returns the file descriptor used or the open error.
The fifo has no lines yet after the open; reading is started by the first `getline()`.

It is safe to open the fifo more than once; the duplicate calls are harmless. Note that
`getline` and `putline` will open the file if it hasn't been already.
Close the file descriptor. The fifo will not be usable until it is reopened. Close is not
synchronized with reads/writes; closing the fifo while still in use will produce an error. Sync
data with `flush()` and/or read state with `rsync()` first. If a `callback` is provided it will
be invoked once the files are closed.
Return the next unread line from the file, or the empty string `''` if there are no new lines
available. Reading is asynchronous and done in batches in the background, so lines may
become available later. Always returns a utf8 string. The `fifo.eof` flag is set to indicate
that zero bytes were read from the file. Retrying the read may clear the eof flag. Read errors
are saved in `fifo.error` and stop the file being read.
```javascript
function readFile(filename, callback) {
    var fifo = new QFifo(filename, { flag: 'r', readSize: 256 * 1024 });
    var contents = '';
    (function readLines() {
        for (var i = 0; i < 100; i++) contents += fifo.getline();
        if (fifo.error || fifo.eof) callback(fifo.error, contents);
        // yield the cpu so the fifo can read more of the file
        else setTimeout(readLines);
    })();
}
```
Checkpoint the byte offset of the next unread line from `fifo.position` so that if the fifo is
reopened it can resume reading where it left off. The information is saved in JSON form to a
separate file named the same as the fifo `filename` but with `'.hd'` appended.
Append a line of text to the file. All lines must be newline terminated; putline will supply
the newline if it is missing. Putline is a non-blocking call that buffers the data and returns
immediately. Writing is done in batches asynchronously in the background. The application
must periodically yield the cpu for the writes to happen. Any write errors are saved in
`fifo.error` and no further writes will be performed. The optional `callback` function will be
called after the line has been written.
Internal fifo append method that does not mandate newlines. Use with caution.
Invoke callback() once the currently buffered writes have completed. New writes made after the flush call are not waited for (but may still have been included in the batch).
Loop `getline()` and call `visitor` with each line in the fifo. `fifo.eof` will be set once the
fifo is empty and has no more lines. If the fifo is appended to, `eof` is cleared but the
readlines loop is not restarted. Note that `readlines` and `getline` return lines that include
the terminating newline, which differs from node `readline`, which strips them.

NOTE: This call is not reentrant; only a single function may be reading the fifo at a time.
Calling readlines() with a new `visitor` displaces the first function.
```javascript
fifo.readlines(function(line) {
    // if (fifo.eof) then no more lines
});
```
Suspend the `readlines` loop; do not deliver any more lines until resumed.

Resume the `readlines` loop; start delivering lines again.
Copy the unread portion of the fifo to the start of the file. This call assumes that all lines read have been successfully handled, and checkpoints the current read position, i.e. it implicitly does an rsync. The fifo should not be read or written until this call completes.

Options:

- `minSize`: only compact if the file has grown to this many bytes, default 1 million.
- `minReadRatio`: only compact if this fraction of the file has been read, default 2/3.
- `readSize`: copy in chunks of this many bytes, default the fifo `options.readSize` (64K).
- `dstOffset`: how many bytes to leave without overwriting at the start of the file.
Copy the data bytes read from the file descriptor `srcFd` from between `srcOffset` and
`srcLimit` into the file descriptor `dstFd` starting at offset `dstOffset`. `Buff` must be an
appropriately sized Buffer to hold the data chunks as they are processed, typically between 8
and 64 KB. Calls callback with the number of bytes copied. Pass `srcLimit = Infinity` to
copy the whole file.
Remove the fifo file and its header. The fifo remains readable and writable until closed. Returns an error if the fifo file or header cannot be removed. It is not an error for the header to not exist.
Rename the fifo file. The fifo is not closed, it remains readable and writable. Returns an error if the rename fails or the fifo file does not exist. It is not an error for the header file to not exist.
Helper method to rename the `filename` to `filename.1`. If `filename.1` already exists, rename
it to `filename.2`, and so on for all older versions of `filename`. When done, calls `callback`
with the first error encountered, an array with all rename errors, and the list of successfully
renamed filenames.
Return the match results of all filenames in the directory that satisfy the regular expression.
The match results are produced by `filename.match(regex)` so that `match[0]` is always the
original filename. The matches are not sorted.
Helper method to process items in batches. Returns a function to be called with each item;
`processBatch` will be called once with each batch of items.
```javascript
var processItem = fifo.batchCalls(function processBatch(batch, cb) {
    // first call => [1, 2]
    // second call => [3]
    cb();
}, { maxBatchSize: 2 });
processItem(1);
processItem(2);
processItem(3);
```
Options:

- `maxWaitMs`: how long to wait for more items before processing the batch. Default 0, to process immediately.
- `maxBatchSize`: the cap on how large a batch may grow before it will be processed. Default 10.
- `startBatch`: function to call with `maxBatchSize` to obtain an empty batch. The empty batch is populated with `push(item)` or with `growBatch(item)`. Default empty batch is an empty array `[]`.
- `growBatch`: function to call to add an item to the batch, called with `batch` and `item`. Default is `batch.push`.
Byte offset of the next unseen line in the input file.
Read or write error that was encountered. Either stops the fifo. Errors also set `fifo.eof`, so
loops that check just `eof` still terminate once no more data is forthcoming.
Flag set when the fifo contains no more lines, i.e. when the end of the file has been reached and
no more lines are left in the buffer. Appending lines to the fifo and retrying the read clears
the `eof` flag.
- `preopen` option: make `r`-mode `open` run `_getmore` to prefetch and wait for results before returning
- allow for streaming Buffers straight to file
- maybe queue pending writes and have `_writesome()` write the queued parts
- optional `concurrent` true/false batchCalls config setting (default true)
- harvest idle file descriptors whose reopenTime has expired
- 0.7.0 - fixes, optional callback to `close`, experimental support for in-file headers, much faster rsync for `r+` mode fifos, fix `flush` callbacks, optional callback to `putline()`
- 0.6.0 - `rename` method, `remove` method, `compact` method, move batchCalls options to front
- 0.5.0 - `matchFiles` method, experimental `reopenInterval` option
- 0.4.2 - `rotateFiles` helper, fledgling `batchCalls` helper
- 0.3.0 - `readlines/pause/resume` methods, `updatePosition` option for faster reading, set `eof` only when no more lines available
- 0.2.2 - allow writing Buffers, space-pad header files
- 0.2.1 - constructor `options`, pass-through `options.flag`, auto-open on fifo read/write, faster rsync
- 0.1.0 - first version