Skip to content

Commit

Permalink
Monitor file : use last data in file database instead of ion and ctime
Browse files Browse the repository at this point in the history
  • Loading branch information
Bertrand Paquet committed Oct 7, 2012
1 parent a32a945 commit 4525ddf
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 46 deletions.
53 changes: 29 additions & 24 deletions lib/lib/monitor_file.js
Expand Up @@ -37,10 +37,9 @@ MonitoredFile.prototype.close = function(callback) {
this.dir_watcher = undefined;
}
if (this.fdTailer) {
this.fdTailer.close(function() {
this.fdTailer.close(function(last_data) {
file_status[this.filename] = {
ino: this.ino,
ctime: this.ctime,
last_data: last_data,
index: this.fdTailer.current_index,
};
callback();
Expand Down Expand Up @@ -77,15 +76,8 @@ MonitoredFile.prototype.start = function(start_index) {
if (this.fdTailer) {
logger.debug('File', this.filename, 'changed');
this.fdTailer.read(function() {
fs.stat(this.filename, function(err, stats) {
if (err) {
return this.emit('error', err);
}
if (stats.ino != this.ino || stats.ctime.getTime() != this.ctime) {
logger.info('File', this.filename, 'has been rewritten, restart');
this.restart(0);
}
}.bind(this));
logger.info('File', this.filename, 'has changed, but no data read. Restart.');
this.restart(0);
}.bind(this));
}
});
Expand Down Expand Up @@ -122,27 +114,39 @@ MonitoredFile.prototype.restart = function(start_index) {
if (err) {
return this.emit('error', err);
}
this.ino = stats.ino;
this.ctime = stats.ctime.getTime();
if (file_status[this.filename]) {
if (file_status[this.filename].ino == this.ino && file_status[this.filename].ctime == this.ctime && file_status[this.filename].index <= stats.size) {
logger.info('Start from last read index', this.filename, 'at', file_status[this.filename].index, ', fd', fd);
this.fdTailer = new FdTailer(fd, file_status[this.filename].index, this.options, this);
this.fdTailer.read();
var last_data = file_status[this.filename].last_data;
if (file_status[this.filename].index >= last_data.length && stats.size > last_data.length) {
var buffer = new Buffer(last_data.length);
fs.read(fd, buffer, 0, last_data.length, file_status[this.filename].index - last_data.length, function(err, bytesRead, buffer) {
if (err) {
return this.emit('error', err);
}
if (bytesRead == last_data.length && last_data == buffer.toString(this.options.buffer_encoding || 'ascii', 0, last_data.length)) {
logger.info('Start from last read index', this.filename, 'at', file_status[this.filename].index, 'fd', fd);
this.fdTailer = new FdTailer(fd, file_status[this.filename].index, this.options, this);
this.fdTailer.read();
}
else {
logger.info('Have last read index, but last data are not correct.', 'Start reading', this.filename, 'at end fd', fd);
this.fdTailer = new FdTailer(fd, stats.size, this.options, this);
this.fdTailer.read();
}
}.bind(this));
}
else {
logger.info('Have last read index, but file has changed.', 'Start reading', this.filename, 'at 0, fd', fd);
this.fdTailer = new FdTailer(fd, 0, this.options, this);
logger.info('Have last read index, but file is too small.', 'Start reading', this.filename, 'at end fd', fd);
this.fdTailer = new FdTailer(fd, stats.size, this.options, this);
this.fdTailer.read();
}
}
else {
if (start_index === undefined) {
logger.info('Start reading', this.filename, 'at end', ', fd', fd);
logger.info('Start reading', this.filename, 'at end', 'fd', fd);
this.fdTailer = new FdTailer(fd, stats.size, this.options, this);
}
else {
logger.info('Start reading', this.filename, 'at', start_index, ', fd', fd);
logger.info('Start reading', this.filename, 'at', start_index, 'fd', fd);
this.fdTailer = new FdTailer(fd, start_index, this.options, this);
this.fdTailer.read();
}
Expand Down Expand Up @@ -206,7 +210,7 @@ FdTailer.prototype.close = function(callback) {
if (err) {
this.event_target.emit('error', err);
}
callback();
callback(this.last_data);
}.bind(this));
}

Expand Down Expand Up @@ -244,7 +248,8 @@ FdTailer.prototype.read = function(callback_nothing_read) {
}

FdTailer.prototype.handle = function(length) {
this.to_be_processed += this.buffer.toString(this.buffer_encoding, 0, length);
this.last_data = this.buffer.toString(this.buffer_encoding, 0, length);
this.to_be_processed += this.last_data;
while (true) {
index = this.to_be_processed.indexOf('\n');
if (index == -1) {
Expand Down
88 changes: 66 additions & 22 deletions test/test_30_monitor_file.js
Expand Up @@ -243,8 +243,8 @@ vows.describe('Monitor ').addBatch({
}, function check(m) {
no_error(m);
assert.deepEqual(m.lines, ['line1', 'line2', 'line3']);
}
),
},
undefined, {wait_delay_after_renaming: 100}),
}).addBatch({
'Incomplete line': create_test(function(m, callback) {
fs.writeFileSync(m.file, 'line1\nline2\nline3');
Expand Down Expand Up @@ -421,17 +421,19 @@ vows.describe('Monitor ').addBatch({
fs.appendFileSync(m1.file, 'line1\nline2\n');
setTimeout(function() {
m1.monitor.close(function() {
fs.appendFileSync(m1.file, 'line3\nline4\n');
var m2 = new TestMonitor(m1.file);
m2.monitor.start();
setTimeout(function() {
fs.appendFileSync(m1.file, 'line5\nline6\n');
fs.appendFileSync(m1.file, 'line3\nline4\n');
var m2 = new TestMonitor(m1.file);
m2.monitor.start();
setTimeout(function() {
m2.monitor.close(function() {
callback(undefined, m1, m2);
fs.appendFileSync(m1.file, 'line5\nline6\n');
setTimeout(function() {
m2.monitor.close(function() {
callback(undefined, m1, m2);
});
});
});
}, 200);
}, 200);
}, 500);
});
}, 200);
}, 200);
Expand All @@ -449,7 +451,7 @@ vows.describe('Monitor ').addBatch({
}
}
}).addBatch({
'Monitor restart with write while restart, inode change': {
'Monitor restart with write while restart, in a new file, too short': {
topic: function() {
var callback = this.callback;
var m1 = new TestMonitor(randomFile());
Expand All @@ -458,20 +460,22 @@ vows.describe('Monitor ').addBatch({
fs.appendFileSync(m1.file, 'line1\nline2\n');
setTimeout(function() {
m1.monitor.close(function() {
fs.unlinkSync(m1.file);
fs.appendFileSync(m1.file, 'line3\nline4\n');
var m2 = new TestMonitor(m1.file);
m2.monitor.start();
setTimeout(function() {
fs.appendFileSync(m1.file, 'line5\nline6\n');
fs.unlinkSync(m1.file);
fs.appendFileSync(m1.file, 'line3\n');
var m2 = new TestMonitor(m1.file);
m2.monitor.start();
setTimeout(function() {
m2.monitor.close(function() {
callback(undefined, m1, m2);
fs.appendFileSync(m1.file, 'line4\nline5\n');
setTimeout(function() {
m2.monitor.close(function() {
callback(undefined, m1, m2);
});
});
});
}, 200);
}, 200);
}, 500);
});
}, 1500);
}, 200);
}, 200);
},

Expand All @@ -482,7 +486,47 @@ vows.describe('Monitor ').addBatch({
no_error(m2);
assert.deepEqual(m1.lines, ['line1', 'line2']);
assert.equal(m1.changed_counter, 1);
assert.deepEqual(m2.lines, ['line3', 'line4', 'line5', 'line6']);
assert.deepEqual(m2.lines, ['line4', 'line5']);
assert.equal(m2.changed_counter, 1);
}
}
}).addBatch({
'Monitor restart with write while restart, in a new file, content not correct': {
topic: function() {
var callback = this.callback;
var m1 = new TestMonitor(randomFile());
m1.monitor.start();
setTimeout(function() {
fs.appendFileSync(m1.file, 'line1\nline2\n');
setTimeout(function() {
m1.monitor.close(function() {
setTimeout(function() {
fs.unlinkSync(m1.file);
fs.appendFileSync(m1.file, 'line3\nline4\nline5\n');
var m2 = new TestMonitor(m1.file);
m2.monitor.start();
setTimeout(function() {
fs.appendFileSync(m1.file, 'line6\nline7\n');
setTimeout(function() {
m2.monitor.close(function() {
callback(undefined, m1, m2);
});
});
}, 200);
}, 500);
});
}, 200);
}, 200);
},

check: function(err, m1, m2) {
assert.ifError(err);
fs.unlinkSync(m1.file);
no_error(m1);
no_error(m2);
assert.deepEqual(m1.lines, ['line1', 'line2']);
assert.equal(m1.changed_counter, 1);
assert.deepEqual(m2.lines, ['line6', 'line7']);
assert.equal(m2.changed_counter, 1);
}
}
Expand Down

0 comments on commit 4525ddf

Please sign in to comment.