Skip to content

Commit

Permalink
continue monitored item
Browse files Browse the repository at this point in the history
  • Loading branch information
erossignon committed Mar 30, 2014
1 parent 02db447 commit 5cd91bc
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 27 deletions.
20 changes: 17 additions & 3 deletions lib/server/monitored_item.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ function MonitoredItem(options){
assert(_.isFinite(options.samplingInterval));
assert(_.isBoolean(options.discardOldest));
assert(_.isFinite(options.queueSize));

assert(options.queueSize>0);

var self = this;
self.clientHandle = options.clientHandle;
self.samplingInterval = options.samplingInterval;
Expand Down Expand Up @@ -55,8 +56,7 @@ MonitoredItem.prototype._on_sampling_timer = function() {
self.emit("samplingEvent",self.oldValue);
};

MonitoredItem.prototype.recordValue = function (variant,sourceTimestamp,sourcePicoseconds)
{
MonitoredItem.prototype.recordValue = function (variant,sourceTimestamp,sourcePicoseconds) {
var self = this;
// store last value
self.oldValue = variant;
Expand Down Expand Up @@ -94,5 +94,19 @@ MonitoredItem.prototype.recordValue = function (variant,sourceTimestamp,sourcePi

};

MonitoredItem.prototype.extractMonitoredItemNotifications = function() {
// MonitoredItemNotification
var self = this;
var notifications = self.queue.map( function(dataValue){
return { clientHandle: self.clientHandle, value: dataValue}
});
// empty queue
self.queue = [];

return notifications;
};




exports.MonitoredItem = MonitoredItem;
4 changes: 2 additions & 2 deletions lib/server/server_publish_engine.js
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ ServerSidePublishEngine.prototype.send_keep_alive_response = function (subscript

};

ServerSidePublishEngine.prototype._send_publish_response = function(subscriptionId,sequenceNubmer,notificationData,availableSequenceNumbers){
ServerSidePublishEngine.prototype._send_publish_response = function(subscriptionId,sequenceNumber,notificationData,availableSequenceNumbers){

var self = this;

Expand All @@ -126,7 +126,7 @@ ServerSidePublishEngine.prototype._send_publish_response = function(subscription
availableSequenceNumbers: availableSequenceNumbers,
moreNotifications: false,
notificationMessage: {
sequenceNumber: sequenceNubmer,
sequenceNumber: sequenceNumber,
publishTime: new Date(),
notificationData: notificationData
},
Expand Down
54 changes: 51 additions & 3 deletions lib/server/subscription.js
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ Subscription.prototype._attempt_to_publish_notification = function () {
var self = this;
self.emit("notification");
// var notification = self.popNotificationToSend();
// self.resetKeepAliveCounter();
};

/**
Expand All @@ -125,6 +124,9 @@ Subscription.prototype._tick = function () {
// request a notification update
self.emit("perform_update");

// collect notification from monitored items
self.prepareNotification();

self.increaseLifeTimeCounter();
self.discardOldSentNotifications();

Expand Down Expand Up @@ -221,12 +223,12 @@ Subscription.prototype.addNotificationMessage = function(notificationData) {

var self = this;

assert(_.isArray(notificationData));
assert(_.isObject(notificationData));

var notification_message= new NotificationMessage({
sequenceNumber: self._get_next_sequence_number(),
publishTime: new Date(),
notificationData: notificationData
notificationData: [ notificationData ]
});

assert(notification_message.hasOwnProperty("sequenceNumber"));
Expand Down Expand Up @@ -366,8 +368,20 @@ Subscription.prototype.createMonitoredItem = function(timestampsToReturn,monitor
return monitoredItemCreateResult;
};

/**
*
* @param monitoredItemId
* @returns {MonitoredItem}
*/
Subscription.prototype.getMonitoredItem = function(monitoredItemId) {
assert(_.isFinite(monitoredItemId));
var self = this;
return self.monitoredItems[monitoredItemId];
};

Subscription.prototype.removeMonitoredItem = function(monitoredItemId) {

assert(_.isFinite(monitoredItemId));
var self = this;
if (!self.monitoredItems.hasOwnProperty(monitoredItemId)) {
return StatusCodes.Bad_MonitoredItemIdInvalid;
Expand All @@ -383,4 +397,38 @@ Subscription.prototype.removeMonitoredItem = function(monitoredItemId) {

};

// collect DataChangeNotification
Subscription.prototype.collectDataChangeNotification = function(){

var self = this;
var monitoredItems = [];

var keys = Object.keys(self.monitoredItems);

keys.forEach(function(key){
var monitoredItem = self.monitoredItems[key];
var notifications = monitoredItem.extractMonitoredItemNotifications();
monitoredItems = monitoredItems.concat(notifications);
});

if (monitoredItems.length === 0) {
return null;
}
var DataChangeNotification = subscription_service.DataChangeNotification;

return new DataChangeNotification({
monitoredItems: monitoredItems,
diagnosticInfos: []
});
};

Subscription.prototype.prepareNotification = function(){
var self = this;
var notifications = self.collectDataChangeNotification();
if (notifications) {
self.addNotificationMessage(notifications);
}
};


exports.Subscription = Subscription;
22 changes: 11 additions & 11 deletions test/server/test_subscription.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ describe("Subscriptions", function () {
maxKeepAliveCount: 20
});
subscription.on("perform_update", function () {
this.addNotificationMessage([{}]);
this.addNotificationMessage({});
});

var notification_event_spy = sinon.spy();
Expand Down Expand Up @@ -68,7 +68,7 @@ describe("Subscriptions", function () {
expire_event_spy.callCount.should.equal(0);

// a notification finally arrived !
subscription.addNotificationMessage([{}]);
subscription.addNotificationMessage({});

this.clock.tick(subscription.publishingInterval * 4);

Expand All @@ -77,7 +77,7 @@ describe("Subscriptions", function () {
expire_event_spy.callCount.should.equal(0);

// a other notification finally arrived !
subscription.addNotificationMessage([{}]);
subscription.addNotificationMessage({});

this.clock.tick(subscription.publishingInterval * 4);
notification_event_spy.callCount.should.equal(2);
Expand All @@ -95,7 +95,7 @@ describe("Subscriptions", function () {
maxKeepAliveCount: 20
});
subscription.on("perform_update", function () {
this.addNotificationMessage([{}]);
this.addNotificationMessage({});
});

var expire_event_spy = sinon.spy();
Expand All @@ -122,7 +122,7 @@ describe("Subscriptions", function () {
maxKeepAliveCount: 20
});
subscription.on("perform_update", function () {
this.addNotificationMessage([{}]);
this.addNotificationMessage({});
});

var expire_event_spy = sinon.spy();
Expand Down Expand Up @@ -199,7 +199,7 @@ describe("Subscriptions", function () {
});

subscription.pendingNotificationsCount.should.equal(0);
subscription.addNotificationMessage([{}]);
subscription.addNotificationMessage({});
subscription.pendingNotificationsCount.should.equal(1);

});
Expand All @@ -217,7 +217,7 @@ describe("Subscriptions", function () {
subscription.pendingNotificationsCount.should.equal(0);
subscription.sentNotificationsCount.should.equal(0);

subscription.addNotificationMessage([{}]);
subscription.addNotificationMessage({});
subscription.pendingNotificationsCount.should.equal(1);
subscription.sentNotificationsCount.should.equal(0);

Expand All @@ -237,8 +237,8 @@ describe("Subscriptions", function () {
maxKeepAliveCount: 20
});

subscription.addNotificationMessage([{}]);
subscription.addNotificationMessage([{}]);
subscription.addNotificationMessage({});
subscription.addNotificationMessage({});
subscription.pendingNotificationsCount.should.equal(2);
subscription.sentNotificationsCount.should.equal(0);

Expand Down Expand Up @@ -267,13 +267,13 @@ describe("Subscriptions", function () {
maxKeepAliveCount: 20 //
});
// create a notification at t=0
subscription.addNotificationMessage([{}]);
subscription.addNotificationMessage({});
subscription.popNotificationToSend();
subscription.sentNotificationsCount.should.equal(1);

this.clock.tick(1000*5);
// create a notification at t=1000*5
subscription.addNotificationMessage([{}]);
subscription.addNotificationMessage({});
subscription.popNotificationToSend();
subscription.sentNotificationsCount.should.equal(2);

Expand Down
91 changes: 85 additions & 6 deletions test/server/test_subscription_with_monitored_items.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ var MonitoredItemCreateRequest = subscription_service.MonitoredItemCreateRequest
var DataType = require("../../lib/variant").DataType;
var DataValue = require("../../lib/datavalue").DataValue;
var Variant = require("../../lib/variant").Variant;
var dump = require("../../lib/utils").dump;



Expand Down Expand Up @@ -41,6 +42,7 @@ describe("Subscriptions and MonitoredItems", function () {
monitoringMode: subscription_service.MonitoringMode.Reporting,

requestedParameters: {
queueSize: 10,
samplingInterval: 10
}
});
Expand All @@ -58,7 +60,6 @@ describe("Subscriptions and MonitoredItems", function () {
monitoredItemCreateResult.revisedSamplingInterval.should.eql(10);



subscription.on("terminated",function(){
done();
// monitored Item shall be deleted at this stage
Expand All @@ -69,7 +70,7 @@ describe("Subscriptions and MonitoredItems", function () {
});


it("a subscription should collect monitored item notification at publishing interval",function(done){
it("a subscription should collect monitored item notification with collectDataChangeNotification",function(done){

var subscription = new Subscription({
publishingInterval: 1000,
Expand All @@ -79,21 +80,99 @@ describe("Subscriptions and MonitoredItems", function () {

var monitoredItemCreateRequest = new MonitoredItemCreateRequest({
itemToMonitor: {

},
monitoringMode: subscription_service.MonitoringMode.Reporting,

requestedParameters: {
queueSize: 10,
samplingInterval: 100
}
});

var monitoredItemCreateResult = subscription.createMonitoredItem(TimestampsToReturn.Both,monitoredItemCreateRequest);

this.clock.tick(2000);
subscription.on("notifications",function(notifications){
var monitoredItem = subscription.getMonitoredItem(monitoredItemCreateResult.monitoredItemId);

// at first, collectDataChangeNotification should collect nothing
var notification = subscription.collectDataChangeNotification();
should(notification).equal(null);

// now simulate some data change
this.clock.tick(100);
monitoredItem.recordValue({ dataType: DataType.UInt32 , value: 1000});

this.clock.tick(100);
monitoredItem.recordValue({ dataType: DataType.UInt32 , value: 1001});
monitoredItem.queue.length.should.eql(2);

// then, collectDataChangeNotification should collect at least 2 values
notification = subscription.collectDataChangeNotification();
notification.monitoredItems.length.should.eql(2);
notification.monitoredItems[0].clientHandle.should.eql(monitoredItem.clientHandle);

subscription.on("terminated",function(){
done();
});
subscription.terminate();

});

it("a subscription should collect monitored item notification at publishing interval",function(done){

var subscription = new Subscription({
publishingInterval: 1000,
maxKeepAliveCount: 20
});

// let spy the notifications event handler
var spy_notification_event = new sinon.spy();
subscription.on("notification",spy_notification_event);

var monitoredItemCreateRequest = new MonitoredItemCreateRequest({
itemToMonitor: {
},
monitoringMode: subscription_service.MonitoringMode.Reporting,
requestedParameters: {
clientHandle: 123,
queueSize: 10,
samplingInterval: 100
}
});

var monitoredItemCreateResult = subscription.createMonitoredItem(TimestampsToReturn.Both,monitoredItemCreateRequest);

var monitoredItem = subscription.getMonitoredItem(monitoredItemCreateResult.monitoredItemId);

// now simulate some data change
this.clock.tick(100);
monitoredItem.recordValue({ dataType: DataType.UInt32 , value: 1000});
this.clock.tick(100);
monitoredItem.recordValue({ dataType: DataType.UInt32 , value: 1001});
this.clock.tick(100);
monitoredItem.recordValue({ dataType: DataType.UInt32 , value: 1002});
monitoredItem.queue.length.should.eql(3);

this.clock.tick(800);
// monitoredItem values should have been harvested by subscription timer by now
monitoredItem.queue.length.should.eql(0);

//xx dump(subscription._pending_notifications);

// now let the subscription send a PublishResponse to the client
this.clock.tick(100);
monitoredItem.recordValue({ dataType: DataType.UInt32 , value: 2000});
this.clock.tick(100);
monitoredItem.recordValue({ dataType: DataType.UInt32 , value: 2001});
this.clock.tick(100);
monitoredItem.recordValue({ dataType: DataType.UInt32 , value: 2002});
monitoredItem.queue.length.should.eql(3);
this.clock.tick(800);

// monitoredItem values should have been harvested by subscription timer by now
monitoredItem.queue.length.should.eql(0);

spy_notification_event.callCount.should.be.greaterThan(3);


subscription.on("terminated",function(){
done();
});
Expand Down
4 changes: 2 additions & 2 deletions test/test_opcua_ClientServer_SubscriptionUseCase.js
Original file line number Diff line number Diff line change
Expand Up @@ -125,14 +125,14 @@ describe("testing Client-Server subscription use case, on a fake server exposing
subscription.on("started",function(){
setTimeout(function() {
subscription.terminate(); }
, 500 );
, 1500 );
});
subscription.on("keepalive",function(){
nb_keep_alive_received +=1;
});
subscription.on("terminated",function(){
nb_keep_alive_received.should.be.greaterThan(0);
done();
nb_keep_alive_received.should.be.greaterThan(0);
});
},done);
});
Expand Down

0 comments on commit 5cd91bc

Please sign in to comment.