Skip to content

Commit

Permalink
asyncified filters
Browse files Browse the repository at this point in the history
  • Loading branch information
frozeman committed Jun 5, 2015
1 parent 61f1ba6 commit ddafe00
Show file tree
Hide file tree
Showing 9 changed files with 477 additions and 207 deletions.
211 changes: 126 additions & 85 deletions dist/web3-light.js

Large diffs are not rendered by default.

85 changes: 85 additions & 0 deletions dist/web3-light.js.map

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions dist/web3-light.min.js

Large diffs are not rendered by default.

211 changes: 126 additions & 85 deletions dist/web3.js

Large diffs are not rendered by default.

85 changes: 85 additions & 0 deletions dist/web3.js.map

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions dist/web3.min.js

Large diffs are not rendered by default.

3 changes: 1 addition & 2 deletions lib/web3.js
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,7 @@ web3.eth.filter = function (fil, eventParams, options, formatter) {
return fil(eventParams, options);
}

// what outputLogFormatter? that's wrong
//return new Filter(fil, watches.eth(), formatters.outputLogFormatter);
// output logs works for blockFilter and pendingTransaction filters?
return new Filter(fil, watches.eth(), formatter || formatters.outputLogFormatter);
};
/*jshint maxparams:3 */
Expand Down
77 changes: 48 additions & 29 deletions lib/web3/filter.js
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ var getOptions = function (options) {
};

var Filter = function (options, methods, formatter) {
var self = this;
var implementation = {};
methods.forEach(function (method) {
method.attachToObject(implementation);
Expand All @@ -83,51 +84,69 @@ var Filter = function (options, methods, formatter) {
this.implementation = implementation;
this.callbacks = [];
this.formatter = formatter;
this.filterId = this.implementation.newFilter(this.options);
this.implementation.newFilter(this.options, function(error, id){
if(error)
self.filterError = error;
else
self.filterId = id;
});
};

Filter.prototype.watch = function (callback) {
this.callbacks.push(callback);
var self = this;

var onMessage = function (error, messages) {
if (error) {
return self.callbacks.forEach(function (callback) {
callback(error);
});
}
// check inf an interval of 10ms if the filter id has arrived
var intervalId = setInterval(function(){

messages.forEach(function (message) {
message = self.formatter ? self.formatter(message) : message;
self.callbacks.forEach(function (callback) {
callback(null, message);
});
});
};
if(self.filterId || self.filterError)
clearInterval(intervalId);

// call getFilterLogs on start
if (!utils.isString(this.options)) {
this.get(function (err, messages) {
// don't send all the responses to all the watches again... just to this one
if (err) {
callback(err);
if(self.filterError || !self.filterId)
return;

self.callbacks.push(callback);

var onMessage = function (error, messages) {
if (error) {
return self.callbacks.forEach(function (callback) {
callback(error);
});
}

messages.forEach(function (message) {
callback(null, message);
message = self.formatter ? self.formatter(message) : message;
self.callbacks.forEach(function (callback) {
callback(null, message);
});
});
});
}
};

// call getFilterLogs on start
if (!utils.isString(self.options)) {
self.get(function (err, messages) {
// don't send all the responses to all the watches again... just to self one
if (err) {
callback(err);
}

messages.forEach(function (message) {
callback(null, message);
});
});
}

RequestManager.getInstance().startPolling({
method: this.implementation.poll.call,
params: [this.filterId],
}, this.filterId, onMessage, this.stopWatching.bind(this));
RequestManager.getInstance().startPolling({
method: self.implementation.poll.call,
params: [self.filterId],
}, self.filterId, onMessage, self.stopWatching.bind(self));

}, 10);
};

Filter.prototype.stopWatching = function () {
RequestManager.getInstance().stopPolling(this.filterId);
this.implementation.uninstallFilter(this.filterId);
// remove async
this.implementation.uninstallFilter(this.filterId, function(){});
this.callbacks = [];
};

Expand Down
4 changes: 2 additions & 2 deletions lib/web3/watches.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ var eth = function () {

switch(type) {
case 'latest':
args.pop();
args.shift();
this.params = 0;
return 'eth_newBlockFilter';
case 'pending':
args.pop();
args.shift();
this.params = 0;
return 'eth_newPendingTransactionFilter';
default:
Expand Down

0 comments on commit ddafe00

Please sign in to comment.