Skip to content

Commit

Permalink
perf(pull): Read atvise server nodes in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
LukasHechenberger committed Mar 18, 2017
1 parent b0aef0c commit 5a34ef0
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 93 deletions.
66 changes: 29 additions & 37 deletions src/lib/server/ReadStream.js
@@ -1,55 +1,47 @@
import { StatusCodes } from 'node-opcua';
import Stream from './Stream';
import QueueStream from './QueueStream';

/**
* An object transform stream that reads the written {@link NodeId}s.
* A stream that reads atvise server nodes for the {@link node-opcua~ReferenceDescription}s passed.
*/
export default class ReadStream extends Stream {
export default class ReadStream extends QueueStream {

/**
* Reads the given node.
* @param {node-opcua~ReferenceDescription} referenceDescription The reference description of the
* node to read from.
* @param {function(err: ?Error, data: ?ReadStream.ReadResult)} callback Called with the error
* that occurred, or the read results the read results otherwise.
* Returns an error message specifically for the given reference description.
* @param {node-opcua~ReferenceDescription} referenceDescription The reference description to get
* the error message for.
* @return {String} The specific error message.
*/
readNode(referenceDescription, callback) {
processErrorMessage(referenceDescription) {
return `Error reading node ${referenceDescription.nodeId.toString()}`;
}

/**
* Returns a {ReadStream.ReadResult} for the given reference description.
* @param {node-opcua~ReferenceDescription} referenceDescription The reference description to read
* the atvise server node for.
* @param {function(err: Error, statusCode: node-opcua~StatusCodes, onSuccess: function)}
* handleErrors The error handler to call. See {@link QueueStream#processChunk} for details.
*/
processChunk(referenceDescription, handleErrors) {
const nodeId = referenceDescription.nodeId;

this.session.read([{ nodeId }], (err, nodesToRead, results) => {
if (err) {
callback(new Error(`Reading ${nodeId.toString()} failed: ${err.message}`));
} else if (!results || results.length === 0) {
callback(new Error(`Reading ${nodeId.toString()} failed: No results`));
} else if (results[0].statusCode !== StatusCodes.Good) {
callback(new Error(`Reading ${nodeId.toString()} failed: Status ${results[0].statusCode}`));
if (!err && (!results || results.length === 0)) {
handleErrors(new Error('No results'));
} else {
callback(null, {
nodeId,
value: results[0].value,
referenceDescription,
mtime: results[0].sourceTimestamp,
handleErrors(err, results && results.length > 0 ? results[0].statusCode : null, done => {
this.push({
nodeId,
value: results[0].value,
referenceDescription,
mtime: results[0].sourceTimestamp,
});
done();
});
}
});
}

/**
* Calls {@link ReadStream#readNode} once the session is open for the passed node.
* @param {NodeId} nodeId The node to read.
* @param {String} enc The encoding used.
* @param {function(err: ?Error, data: ?Object)} callback Called by {@link ReadStream#readNode}
* once reading ended.
* @listens {Session} Listens to the `session-open`-event if the session is not open yet.
*/
_transform(nodeId, enc, callback) {
if (this.session) {
this.readNode(nodeId, callback);
} else {
this.once('session-open', () => this.readNode(nodeId, callback));
}
}

}

/**
Expand Down
101 changes: 45 additions & 56 deletions test/src/lib/server/ReadStream.spec.js
@@ -1,90 +1,79 @@
import { spy, stub } from 'sinon';
import { StatusCodes } from 'node-opcua';
import expect from '../../../expect';

import NodeId from '../../../../src/lib/server/NodeId';
import ReadStream from '../../../../src/lib/server/ReadStream';

/** @test {ReadStream} */
describe('ReadStream', function() {
const validNodeId = new NodeId('ns=0;i=2262'); // ProductUri
/** @test {ReadStream#processErrorMessage} */
describe('#processErrorMessage', function() {
it('should include node id', function() {
const nodeId = new NodeId('ns=1;s=AGENT.DISPLAYS.Main');
expect(ReadStream.prototype.processErrorMessage({ nodeId }), 'to contain', nodeId.value);
});
});

/** @test {ReadStream#readNode} */
describe('#readNode', function() {
it('should fail if an error occurs', function() {
/** @test {ReadStream#processChunk} */
describe('#processChunk', function() {
it('should error without results', function() {
const stream = new ReadStream();

stream.once('session-open', () => {
stream.session.read = (node, cb) => cb(new Error('Failed'));
stream.session.read = (node, cb) => cb(null, [node], undefined);
});

return expect([{ nodeId: validNodeId }], 'when piped through', stream,
'to error with', `Reading ${validNodeId.toString()} failed: Failed`);
return expect([{ nodeId: new NodeId('ns=1;s=AGENT.DISPLAYS.Main') }],
'when piped through', stream,
'to error with', /No results/);
});

it('should with no or empty results', function() {
it('should error with empty results', function() {
const stream = new ReadStream();

stream.once('session-open', () => {
stream.session.read = (node, cb) => cb(null, []);
stream.session.read = (node, cb) => cb(null, [node], []);
});

return expect([{ nodeId: validNodeId }], 'when piped through', stream,
'to error with', `Reading ${validNodeId.toString()} failed: No results`);
return expect([{ nodeId: new NodeId('ns=1;s=AGENT.DISPLAYS.Main') }],
'when piped through', stream,
'to error with', /No results/);
});

it('should fail with non-good status code', function() {
it('should error when reading fails', function() {
const stream = new ReadStream();
const nodeId = new NodeId('ns=123;i=2262'); // This node does not exist

return expect([{ nodeId }], 'when piped through', stream,
'to error with', /Reading ns=123;i=2262 failed: Status BadNodeIdUnknown/);
});

it('should read variables', function() {
const stream = new ReadStream();

return expect([{ nodeId: validNodeId }], 'when piped through', stream,
'to yield objects satisfying', [{
nodeId: validNodeId,
value: { value: 'http://www.atvise.com' },
}]);
});
});

/** @test {ReadStream#_transform} */
describe('#_transform', function() {
it('should wait for session to open', function(done) {
const stream = new ReadStream();
stub(stream, 'readNode', (node, cb) => cb(null));
spy(stream, '_transform');

stream.on('data', () => {}); // Unpause readable stream
stream.write(new NodeId('ns=1;s=AGENT.DISPLAYS'));

expect(stream._transform.calledOnce, 'to be', true);
expect(stream.readNode.callCount, 'to equal', 0);

stream.once('end', () => {
expect(stream._transform.calledOnce, 'to be', true);
expect(stream.readNode.calledOnce, 'to be', true);
done();
stream.once('session-open', () => {
stream.session.read = (node, cb) => cb(new Error('Test'), [node], []);
});
stream.end();

return expect([{ nodeId: new NodeId('ns=1;s=AGENT.DISPLAYS.Main') }],
'when piped through', stream,
'to error with', /Test/);
});

it('should read immediate if session is open', function(done) {
it('should push result when reading succeeds', function() {
const stream = new ReadStream();
stub(stream, 'readNode', (node, cb) => cb(null));

stream.on('data', () => {}); // Unpause readable stream
const result = {
value: 'test',
sourceTimestamp: new Date(),
statusCode: StatusCodes.Good,
};

stream.once('session-open', () => {
stream.write(new NodeId('ns=1;s=AGENT.DISPLAYS'));
expect(stream.readNode.calledOnce, 'to be true');
stream.end();
stream.session.read = (node, cb) => cb(null, [node], [result]);
});

stream.on('end', done);
const nodeId = new NodeId('ns=1;s=AGENT.DISPLAYS.Main');
return expect([{ nodeId }],
'when piped through', stream,
'to yield objects satisfying', [
expect.it('to equal', {
nodeId,
value: result.value,
referenceDescription: { nodeId },
mtime: result.sourceTimestamp,
}),
]);
});
});
});

0 comments on commit 5a34ef0

Please sign in to comment.