Skip to content
Browse files

vows tests added + online mode support

  • Loading branch information...
1 parent 24fc896 commit 18111009df5a941c3f8f325964e7d18705a9431a @catz committed May 6, 2012
View
5 .gitignore
@@ -1,3 +1,6 @@
npm-debug.log
log/*
-node_modules
+config/settings.js
+config/settings.local.js
+node_modules
+*.log
View
36 active_queue_cluster.js → actual_queue_cluster.js
@@ -7,7 +7,8 @@ var http = require('http');
var numCPUs = require('os').cpus().length;
var Worker = require('./workers/core_worker');
var Sender = require('./url_sender');
-var settings = require('./config/settings');
+var settings = require('./config/settings'),
+ logger = require('./logger');;
var stat = {
requestReceived: 0,
@@ -32,7 +33,8 @@ function startMaster() {
function addWorker() {
var worker = cluster.fork();
workers.push(worker);
- console.log('worker ' + worker.pid + ' started');
+ logger.info('env: ' + process.env.APP_ENV);
+ logger.info('worker ' + worker.pid + ' started');
worker.on('message', function(msg) {
if (msg.cmd) {
@@ -47,7 +49,7 @@ function startMaster() {
stat.requestSentError++;
break;
default:
- console.log("Unprocessed notify: " + msg.cmd);
+ logger.error("Unprocessed notify: " + msg.cmd);
}
}
});
@@ -56,7 +58,7 @@ function startMaster() {
function addREPL() {
net.createServer(function (socket) {
- console.log('repl started on port' + socket.port);
+ logger.info('repl started on port' + socket.port);
stat.replConnections += 1;
var r = repl.start("node via TCP socket> ", socket);
r.context.stat = stat;
@@ -71,7 +73,7 @@ function startMaster() {
addREPL();
cluster.on('death', function(worker) {
- console.log('worker ' + worker.pid + ' died');
+ logger.info('worker ' + worker.pid + ' died');
var idx = workers.indexOf(worker);;
if (idx >= 0) {
@@ -83,24 +85,30 @@ function startMaster() {
// Setting process.title currently only works on Linux, FreeBSD and Windows.
process.title = settings.PS_TITLE;
- console.log( process.title + " started with pid " + process.pid);
+ logger.info(process.title + " started with pid " + process.pid);
setInterval(function() {
- settings.redis.hmget(settings.REDIS_PREFIX + "-stats", "events_received",
- "events_processed", "events_sent_error", function(err, reply) {
+ settings.redis.hmget(settings.REDIS_PREFIX + "-stats", "events_received", "online_events_received",
+ "events_processed", "events_sent_error", "event-recheck-sent", "event-recheck-sent-error", function(err, reply) {
stat.requestReceived = reply[0] || 0;
- stat.requestSent = reply[1] || 0;
- stat.requestSentError = reply[2] || 0;
-
- console.log("requestReceived: " + stat.requestReceived +
+ stat.requestReceivedOnline = reply[1] || 0;
+ stat.requestSent = reply[2] || 0;
+ stat.requestSentError = reply[3] || 0;
+ stat.requestRecheckSent = reply[4] || 0;
+ stat.requestRecheckSentError = reply[5] || 0;
+
+ logger.debug("requestReceived: " + stat.requestReceived +
+ " requestReceivedOnline: " + stat.requestReceivedOnline +
" requestSent: " + stat.requestSent +
" requestSentError: "+ stat.requestSentError +
+ " requestRecheckSent: "+ stat.requestRecheckSent +
+ " requestRecheckSentError: "+ stat.requestRecheckSentError +
" workers: " + workers.length);
});
}, 2000);
process.on('uncaughtException', function (err) {
- console.log("exception: " + err);
+ logger.error("exception: " + err.stack);
});
}
@@ -117,7 +125,7 @@ function startWorker() {
});
process.on('uncaughtException', function (err) {
- console.log("exception: " + err);
+ logger.error("exception: " + err.stack);
});
}
View
10 common.js
@@ -0,0 +1,10 @@
+exports.getHourMill = function (date) {
+ var now = date || new Date();
+ var a = new Date(now.getFullYear(), now.getMonth(), now.getDate(), now.getHours());
+ return a.getTime();
+}
+exports.getDayMill = function (date) {
+ var now = date || new Date();
+ var a = new Date(now.getFullYear(), now.getMonth(), now.getDate());
+ return a.getTime();
+}
View
9 config/settings.dev.js
@@ -0,0 +1,9 @@
+var redis = require('redis');
+exports.redis = redis.createClient(6379, '127.0.0.1');
+exports.REDIS_PREFIX = "actual_queue_dev"
+exports.EVENT_QUEUE_TTL = 60 * 60; // 1 hour (in seconds)
+exports.USER_POLL_THRESHOLD = 60 * 60 * 1000; // 30 min (in milliseconds)
+exports.USET_POLL_THRESHOLD_ONLINE = 10 * 1000; //10 sec (in milliseconds)
+exports.PS_TITLE = "actual_queue";
+exports.PORT = 8000;
+exports.PORT_UI = 8888;
View
9 config/settings.test.js
@@ -0,0 +1,9 @@
+var redis = require('redis');
+exports.redis = redis.createClient(6379, '127.0.0.1');
+exports.REDIS_PREFIX = "actual_queue_test"
+exports.EVENT_QUEUE_TTL = 60 * 60; // 1 hour
+exports.USER_POLL_THRESHOLD = 60 * 60 * 1000; // 30 min; x1000 fixed
+exports.USET_POLL_THRESHOLD_ONLINE = 10 * 1000; //10 sec
+exports.PS_TITLE = "actual_queue";
+exports.PORT = 8000;
+exports.PORT_UI = 8888;
View
7 logger.js
@@ -33,10 +33,15 @@ var Logger = function() {
Logger.prototype = new Log('debug', stream)
Logger.prototype.debug = function(msg) {
+ //do not print debug logs in production
+ if (!this.isProd())
+ Log.prototype.debug.call(this, msg);
+}
+
+Logger.prototype.spec = function(msg) {
if (this.isTest()) {
settings.redis.lpush(testKey, msg);
}
- Log.prototype.debug.call(this, msg);
}
module.exports = new Logger();
View
8 package.json
@@ -1,5 +1,5 @@
{
- "name": "active_queue",
+ "name": "actual_queue",
"version": "0.0.1",
"description": "Actual queue",
"keywords": [
@@ -16,7 +16,7 @@
"Karavashkin Andrey <akaravashkin@swdrom.com>"
],
"scripts": {
- "start": "node active_queue_cluster.js"
+ "start": "node actual_queue_cluster.js"
},
"engines": {
"node": ">= 0.6.0 < 0.7.0"
@@ -27,6 +27,8 @@
"express": "~2.5.9",
"log": "~1.3.0",
"jade": "~0.24.0",
- "stylus": "~0.25.0"
+ "stylus": "~0.25.0",
+ "vows": "~0.6.2",
+ "request": "~2.9.202"
}
}
View
18 public/javascripts/main.js
@@ -9,7 +9,7 @@ function init() {
setInterval(function() {
daysTab.show();
hoursTab.show();
- }, 5000);
+ }, 2000);
}
function getStatsCommon() {
@@ -28,8 +28,11 @@ function refreshStats(data) {
if (!data)
return;
$('#received').text(data.received);
+ $('#online_received').text(data.online_received);
$('#sent').text(data.sent);
$('#errors').text(data.errors);
+ $('#recheck_sent').text(data.recheck_sent);
+ $('#recheck_errors').text(data.recheck_errors);
}
function TypeTable(type) {
@@ -79,14 +82,14 @@ TypeTable.prototype.redraw = function(data) {
switch (this.type) {
case 0:
- $(this.tableId + ' tbody tr:nth-child(1) th:nth-child('+i+')').text(adjDay.getDate() +
- '.' + (adjDay.getMonth().length==2?adjDay.getMonth():'0'+adjDay.getMonth()) );
+ $(this.tableId + ' tbody tr:nth-child(1) th:nth-child('+i+')').text(to2Digit(adjDay.getDate()) +
+ '.' + to2Digit(adjDay.getMonth()+1) );
arrMil.push(adjDay.getTime());
adjDay.setDate(adjDay.getDate()+1);
break;
case 1:
- $(this.tableId + ' tbody tr:nth-child(1) th:nth-child('+i+')').text(adjHour.getHours() +
+ $(this.tableId + ' tbody tr:nth-child(1) th:nth-child('+i+')').text(to2Digit(adjHour.getHours()) +
':' + '00');
arrMil.push(adjHour.getTime());
@@ -103,11 +106,16 @@ TypeTable.prototype.redraw = function(data) {
var tdPos = arrMil.indexOf(Number(time));
if (tdPos != -1) {
- $(this.tableId + ' tbody tr:nth-child('+ (count+1) +') td:nth-child(' + (tdPos+2) + ')').text(data[type][time]);
+ $(this.tableId + ' tbody tr:nth-child('+ (count+1) +') td:nth-child(' + (tdPos+2) + ')').text(data[type][time]['received'] + "/" + (data[type][time]['sent'] || 0));
}
}
count++;
}
+
+ function to2Digit(s) {
+ s = String(s);
+ return (s.length==2?s:'0'+s)
+ }
}
TypeTable.prototype.addRow = function(id, col, isHeader) {
View
57 routes/json.js
@@ -1,14 +1,18 @@
-var settings = require('./../config/settings');
-var cachedStats = {}; // {daily\hourly_timestamp: {type: count}}
+var settings = require('./../config/settings'),
+ common = require('../common');
+var cachedStats = {}; // {daily\hourly_timestamp: {type: {received: count, sent: count}}}
exports.common = function(req, res){
var obj = {};
- settings.redis.hmget(settings.REDIS_PREFIX + "-stats", "events_received",
- "events_processed", "events_sent_error", function(err, reply) {
+ settings.redis.hmget(settings.REDIS_PREFIX + "-stats", "events_received", "online_events_received",
+ "events_processed", "events_sent_error", "event-recheck-sent", "event-recheck-sent-error", function(err, reply) {
obj.received = reply[0] || 0;
- obj.sent = reply[1] || 0;
- obj.errors = reply[2] || 0;
+ obj.online_received = reply[1] || 0;
+ obj.sent = reply[2] || 0;
+ obj.errors = reply[3] || 0;
+ obj.recheck_sent = reply[4] || 0;
+ obj.recheck_errors = reply[5] || 0;
res.send(obj);
});
};
@@ -19,21 +23,21 @@ exports.types = function(req, res){
var arrDate = [];
var dates = [];
- var rkey = req.params.key;
+ var rkey = req.params.key; //daily or hourly
var keyPrefix = settings.REDIS_PREFIX + '-types-' + rkey;
//create arrDate - array of dates(redis keys) we need
if (rkey == 'daily') {
for (var i=0; i < 7; i++) {
var adjDate = new Date();
adjDate.setDate(adjDate.getDate()-i);
- arrDate.push(getDayMill(adjDate));
+ arrDate.push(common.getDayMill(adjDate));
}
} else {
for (var i=0; i < 24; i++) {
var adjDate = new Date();
adjDate.setHours(adjDate.getHours()-i);
- arrDate.push(getHourMill(adjDate));
+ arrDate.push(common.getHourMill(adjDate));
}
}
@@ -49,7 +53,10 @@ exports.types = function(req, res){
} else {
for (var type in cachedStats[rkey + '_' + arrDate[i]]) {
obj[type] = obj[type] || {};
- obj[type][arrDate[i]] = cachedStats[rkey + '_' + arrDate[i]][type];
+ obj[type][arrDate[i]] = obj[type][arrDate[i]] || {};
+ for(var process in cachedStats[rkey + '_' + arrDate[i]][type]) {
+ obj[type][arrDate[i]][process] = cachedStats[rkey + '_' + arrDate[i]][type][process];
+ }
}
}
}
@@ -58,32 +65,20 @@ exports.types = function(req, res){
for (var i = 0; i < reply.length; i++) {
if (reply[i]) {
for(var type in reply[i]) {
- obj[type] = obj[type] || {};
- obj[type][dates[i]] = reply[i][type];
+ //type = process:type
+ var proc_type = type.split(':');
+
+ obj[proc_type[1]] = obj[proc_type[1]] || {};
+ obj[proc_type[1]][dates[i]] = obj[proc_type[1]][dates[i]] || {};
+ obj[proc_type[1]][dates[i]][proc_type[0]] = reply[i][type];
cachedStats[rkey + '_' + dates[i]] = cachedStats[rkey + '_' + dates[i]] || {};
- cachedStats[rkey + '_' + dates[i]][type] = reply[i][type];
+ cachedStats[rkey + '_' + dates[i]][proc_type[1]] = cachedStats[rkey + '_' + dates[i]][proc_type[1]] || {};
+ cachedStats[rkey + '_' + dates[i]][proc_type[1]][proc_type[0]] = reply[i][type];
}
}
}
-
+
res.send(obj);
});
-
- // version with sorted set which contains keys
- // settings.redis.zrevrangebyscore(settings.REDIS_PREFIX + '-types-' + rkey, now.getTime(), adjDate.getTime(),
- // function(err,reply) {
- // );
};
-
-
-function getHourMill(date) {
- var now = date || new Date();
- var a = new Date(now.getFullYear(), now.getMonth(), now.getDate(), now.getHours());
- return a.getTime();
-}
-function getDayMill(date) {
- var now = date || new Date();
- var a = new Date(now.getFullYear(), now.getMonth(), now.getDate());
- return a.getTime();
-}
View
19 server.js
@@ -22,10 +22,14 @@ appUI.configure(function(){
});
var event_tracker = require('./workers/event_tracker');
+var online_event_tracker = require('./workers/online_event_tracker');
app.get('/send_delayed', event_tracker);
app.post('/send_delayed', event_tracker);
+app.get('/user_online', online_event_tracker);
+app.post('/user_online', online_event_tracker);
+
app.get('/health', function(req, res) {
// console.log('health received');
res.send("ok");
@@ -39,4 +43,17 @@ appUI.get('/stats/types/:key',json.types);
appUI.get('/', routes.index);
module.exports = app;
-module.exports.ui = appUI;
+module.exports.ui = appUI;
+
+// fake routes for recheck_url tests
+app.get('/recheck_url_correct', function(req, res) {
+ res.send("ok");
+});
+
+app.get('/recheck_url_error', function(req, res) {
+ res.send("error");
+});
+
+app.get('/recheck_url_404', function(req, res) {
+ res.send('what???', 404);
+});
View
253 test/delayed-request-test.js
@@ -0,0 +1,253 @@
+/*
+ * INSTRUCTIONS
+ *
+ * export APP_ENV=test
+ *
+ * run npm start
+ *
+ * run vows --spec test/cluster-delayed-test.js
+ *
+ */
+
+var settings = require("./../config/settings.test");
+
+var request = require('request'),
+ vows = require('vows'),
+ assert = require('assert'),
+ apiUrl = "http://localhost:"+settings.PORT,
+ testKey = settings.REDIS_PREFIX + "-test-list",
+ healthApi = apiUrl +'/health',
+ jsonApiUrl = "http://localhost:"+settings.PORT_UI,
+ redis_sskey = settings.REDIS_PREFIX + "-types-hourly",
+ redis_sskey2 = settings.REDIS_PREFIX + "-types-daily",
+ test_uid = "test_spec_user_666",
+ cookie = null;
+
+var apiTest = {
+ general: function( method, url, data, cb ){
+ request(
+ {
+ method: method,
+ url: url, //apiUrl+(url||''),
+ json: data || {},
+ headers: {Cookie: cookie}
+ },
+ function(req, res){
+ cb( req, res )
+ }
+ )
+ },
+ get: function( url, data, cb ){ apiTest.general( 'GET', url, data, cb ) },
+ post: function( url, data, cb ){ apiTest.general( 'POST', url, data, cb ) },
+ put: function( url, data, cb ){ apiTest.general( 'PUT', url, data, cb ) },
+ del: function( url, data, cb ){ apiTest.general( 'DELETE', url, data, cb ) }
+}
+
+process.on('uncaughtException', function(err) {
+ console.log('spy an error: ' + err);
+});
+
+
+function getHourMill(date) {
+ var now = date || new Date();
+ var a = new Date(now.getFullYear(), now.getMonth(), now.getDate(), now.getHours());
+ return a.getTime();
+}
+
+
+var suite = vows.describe('Actual queue API delayed test')
+
+.addBatch({
+ "*Test delayed: sending '/send_delayed'": {
+ topic: function(){
+ settings.redis.flushdb();
+ // settings.redis.del(testKey);
+ apiTest.get(apiUrl + '/send_delayed?url='+ healthApi +'&delay=1', {} ,this.callback )
+ },
+ 'should be 200' : function(res) {
+ assert.ok(res.statusCode == 200)
+ },
+ "server should send delayed request": {
+ topic: function() {
+ var self = this;
+ setTimeout(function() {
+ self.callback();
+ }, 5000);
+ },
+ 'after waiting' : {
+ topic: function() {
+ settings.redis.lrange(testKey, 0, 10, this.callback)
+ },
+ 'delayed request should be sent by strict sequence of actions': function(err, reply) {
+ assert.ok(reply.join() == 'event-processed,event-fetched,event-received')
+ }
+ }
+ }
+ }
+})
+
+.addBatch({
+ "*Test type: sending test_type_spec": {
+ topic: function(){
+ apiTest.get(apiUrl + '/send_delayed?url='+ healthApi +'&delay=1&type=test_type_spec', {} ,this.callback )
+ },
+ 'should be 200' : function(res) {
+ assert.ok(res.statusCode == 200)
+ },
+ ".": {
+ topic: function(){
+ apiTest.get(apiUrl + '/send_delayed?url='+ healthApi +'&delay=1&type=test_type_spec', {} ,this.callback )
+ },
+ 'should be 200' : function(res) {
+ assert.ok(res.statusCode == 200)
+ }
+ }
+ },
+
+ "*Test JSON API: server track type stats": {
+ topic: function() {
+ var self = this;
+ setTimeout(function() {
+ self.callback();
+ }, 2000);
+ },
+ 'after waiting' : {
+ topic: function() {
+ apiTest.get(jsonApiUrl+'/stats/types/hourly', {} ,this.callback )
+ },
+ "count of received types is 2": function(res) {
+ // *** TODO after merge add prefix ***
+ assert.ok(res.body['test_type_spec'][getHourMill()]['received'] == 2);
+ },
+ '//clean database': {
+ topic: function(data) {
+ settings.redis.zrevrangebyscore(settings.REDIS_PREFIX + "-types-hourly", '+inf', '-inf', 'limit', 0, 1, this.callback );
+ },
+ 'OK': function(hashkey) {
+ settings.redis.del(hashkey);
+ settings.redis.zrem(redis_sskey, hashkey);
+ settings.redis.zrem(redis_sskey2, hashkey);
+ }
+ }
+ }
+ }
+})
+
+.addBatch({
+ "Test UID delay: sending uid": {
+ topic: function(){
+ apiTest.get(apiUrl + '/send_delayed?url='+ healthApi +'&delay=1&uid='+test_uid, {} ,this.callback )
+ },
+ 'should be 200' : function(res) {
+ assert.ok(res.statusCode == 200)
+ },
+ ".": {
+ topic: function() {
+ var self = this;
+ setTimeout(function() {
+ self.callback();
+ }, 5000);
+ },
+ ".": {
+ topic: function(){
+ apiTest.get(apiUrl + '/send_delayed?url='+ healthApi +'&delay=1&uid='+test_uid, {} ,this.callback )
+ },
+ 'should be 200' : function(res) {
+ assert.ok(res.statusCode == 200)
+ },
+ "Server should NOT send second request": {
+ topic: function() {
+ var self = this;
+ setTimeout(function() {
+ self.callback();
+ }, 5000);
+ },
+ 'after waiting' : {
+ topic: function() {
+ settings.redis.lrange(testKey, 0, 100, this.callback)
+ },
+ 'user skipped': function(err, reply) {
+ assert.ok(reply.join().indexOf('user-skipped') != -1)
+ }
+ }
+ }
+ }
+ }
+ }
+})
+
+.addBatch({
+ "*Test online : sending delayed with uid": {
+ topic: function() {
+ var self = this;
+ settings.redis.flushdb();
+ setTimeout(function() {
+ self.callback();
+ }, 1000);
+ },
+ ".": {
+ topic: function(){
+ apiTest.get(apiUrl + '/send_delayed?url='+ healthApi +'&uid='+ test_uid +'&delay=1&send_than_online=true', {} ,this.callback )
+ },
+ 'should be 200' : function(res) {
+ assert.ok(res.statusCode == 200)
+ },
+ ".": {
+ topic: function() {
+ var self = this;
+ settings.redis.flushdb();
+ setTimeout(function() {
+ self.callback();
+ }, 5000);
+ },
+ ".": {
+ topic: function(){
+ apiTest.get(apiUrl + '/user_online?uid='+ test_uid, {} ,this.callback )
+ },
+ 'should be 200' : function(res) {
+ assert.ok(res.statusCode == 200)
+ },
+
+ "online sent by /user_online": {
+ topic: function() {
+ var self = this;
+ setTimeout(function() {
+ self.callback();
+ }, 7000);
+ },
+ 'after waiting' : {
+ topic: function() {
+ settings.redis.lrange(testKey, 0, 10, this.callback)
+ },
+ 'online sent': function(err, reply) {
+ assert.ok(reply.join().indexOf('online-event-to-process-queue') != -1)
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+})
+
+.addBatch({
+ "*Test JSON API /common (depends on previous test, check online sending)": {
+ ".": {
+ topic: function() {
+ var self = this;
+ settings.redis.flushdb();
+ setTimeout(function() {
+ self.callback();
+ }, 2000);
+ },
+ topic: function(){
+ apiTest.get(jsonApiUrl + '/stats/common', {} ,this.callback )
+ },
+ 'stats should be correct' : function(res) {
+ settings.redis.flushdb();
+ assert.ok(res.body['sent']*1 == 1);
+ }
+ }
+ }
+})
+.export( module );
View
6 test/post_event.rb
@@ -3,9 +3,11 @@
api_url = "http://127.0.0.1:8000/send_delayed"
url = "http://127.0.0.1:8000/health"
+recheck_url = "http://127.0.0.1:8000/recheck_url_error"
uid=(rand()*1000).to_i
type="test"
-delay=10000
+delay=1
post_url = URI.parse(api_url)
-res = Net::HTTP.post_form(post_url,{"url" => url, "delay" => delay, "uid" => uid, "type" => type})
+res = Net::HTTP.post_form(post_url,{"url" => url, "delay" => delay,
+ "uid" => uid, "type" => type, "recheck_url" => recheck_url, "send_than_online" => true})
p res.inspect
View
73 url_sender.js
@@ -2,33 +2,74 @@ var settings = require('./config/settings'),
events = require('events'),
util = require('util'),
url = require('url'),
- http = require('http');
+ http = require('http'),
+ logger = require('./logger');
function UrlSender(options) {
}
util.inherits(UrlSender, events.EventEmitter)
-UrlSender.prototype.send = function(event_id, event_data) {
+UrlSender.prototype.send_remote = function(url_parts, event_id, uid, type) {
try {
- var self = this;
- var url_parts = url.parse(event_data.url, true);
- var uid = event_data.uid;
- var request = http.request(url_parts, function(response) {
- if (response.statusCode == 200) {
- self.emit("event-sent", event_id, uid);
- } else {
+ var self = this;
+
+ var request = http.request(url_parts, function(response) {
+ if (response.statusCode == 200) {
+ self.emit("event-sent", event_id, uid, type);
+ } else {
+ self.emit("event-sent-error", event_id);
+ };
+ });
+ request.on('error', function(err) {
+ logger.error(err);
self.emit("event-sent-error", event_id);
- };
- });
- request.on('error', function(err) {
- console.log(err);
+ })
+ request.end();
+ } catch (e) {
+ logger.error(e);
self.emit("event-sent-error", event_id);
- })
- request.end();
+ }
+}
+
+UrlSender.prototype.send = function(event_id, event_data) {
+ try {
+ var self = this;
+ var url_parts = url.parse(event_data.url, true);
+ var uid = event_data.uid;
+ var type = event_data.type;
+
+ if(event_data.recheck_url) {
+ var recheck_url_parts = url.parse(event_data.recheck_url, true);
+ logger.debug('recheck_url found: ' + event_data.recheck_url);
+
+ var recheck_request = http.request(recheck_url_parts, function(response) {
+ if (response.statusCode != 200) {
+ logger.error('recheck url '+ event_data.recheck_url + 'returns code ' + response.statusCode);
+ };
+
+ response.on('data', function (chunk) {
+ if(chunk && (chunk.toString().toLowerCase() == 'ok')) {
+ self.send_remote(url_parts, event_id, uid, type);
+ self.emit("event-recheck-sent", event_id, uid);
+ } else {
+ self.emit("event-recheck-sent-error", event_id);
+ }
+ });
+ });
+
+ recheck_request.on('error', function(err) {
+ logger.error(err);
+ self.emit("event-recheck-sent-error", event_id);
+ })
+ recheck_request.end();
+ } else {
+ self.send_remote(url_parts, event_id, uid, type);
+ }
+
} catch (e) {
- console.log(err);
+ logger.error(e);
self.emit("event-sent-error", event_id);
}
}
View
10 views/_common_stats.jade
@@ -1,7 +1,13 @@
ul#common
li received:
span#received 0
+ li online received:
+ span#online_received 0
li sent:
span#sent 0
- li erros:
- span#errors 0
+ li errors:
+ span#errors 0
+ li recheck_sent:
+ span#recheck_sent 0
+ li recheck_errors:
+ span#recheck_errors 0
View
11 workers/common_tracker.js
@@ -0,0 +1,11 @@
+module.exports.parse_params = function(req) {
+ function isEmpty(obj) {
+ for(var prop in obj) {
+ if(obj.hasOwnProperty(prop))
+ return false;
+ }
+ return true;
+ }
+
+ return isEmpty(req.query)?req.body:req.query;
+}
View
61 workers/core_worker.js
@@ -1,7 +1,8 @@
var settings = require('../config/settings'),
events = require('events'),
util = require('util'),
- logger = require('../logger');
+ logger = require('../logger'),
+ common = require('../common');
function Worker(sender, options) {
var self = this;
@@ -14,16 +15,30 @@ function Worker(sender, options) {
self.on('event-data', function(event_id, event_data) {
logger.debug('event-processed');
+ logger.spec('event-processed'); //only test env
self.sender.send(event_id, event_data);
})
- self.sender.on('event-sent', function(event_id, uid) {
+ self.sender.on('event-sent', function(event_id, uid, type) {
logger.info("drop event: " + event_id + " uid: " + uid);
var multi = self.client.multi();
+ type = type || 'none';
+
multi.del(event_id);
if (uid)
multi.set(settings.REDIS_PREFIX + "-uid-" + uid, Date.now());
+
multi.hincrby(settings.REDIS_PREFIX + "-stats", "events_processed", 1);
+
+ //sent types stats
+ var keyDaily = settings.REDIS_PREFIX + "-types-daily:" + common.getDayMill();
+ var keyHourly = settings.REDIS_PREFIX + "-types-hourly:" + common.getHourMill();
+ multi.hincrby(keyDaily, 'sent:'+type, 1);
+ multi.hincrby(keyHourly, 'sent:'+type, 1);
+
+ multi.zadd(settings.REDIS_PREFIX + "-types-daily", common.getDayMill(), keyDaily);
+ multi.zadd(settings.REDIS_PREFIX + "-types-hourly", common.getHourMill(), keyHourly);
+
multi.exec(function(err, reply) {
});
@@ -32,7 +47,15 @@ function Worker(sender, options) {
self.sender.on('event-sent-error', function(event_id) {
self.client.hincrby(settings.REDIS_PREFIX + "-stats", "events_sent_error", 1);
logger.info("event postponed: " + event_id);
- })
+ });
+
+ self.sender.on('event-recheck-sent', function(event_id) {
+ self.client.hincrby(settings.REDIS_PREFIX + "-stats", "event-recheck-sent", 1);
+ })
+
+ self.sender.on('event-recheck-sent-error', function(event_id) {
+ self.client.hincrby(settings.REDIS_PREFIX + "-stats", "event-recheck-sent-error", 1);
+ });
}
util.inherits(Worker, events.EventEmitter)
@@ -49,15 +72,24 @@ Worker.prototype.restore = function(fn) {
var restored_ids = reply;
logger.info("events to restore: " + restored_ids.length);
restored_ids.forEach(function(event_id, idx){
- // check event in events queue
- self.client.zrank(self.queue_key, event_id, function(err, reply) {
- if(!reply) {
- // get event firing time
- self.client.hmget(event_id, 'time', function(error, reply) {
- if(reply && !isNaN(reply)) {
- self.client.zadd(self.queue_key, reply, event_id);
- }
- });
+ //do not get online events
+ self.client.hmget(event_id, "data", function(err, reply) {
+ if (reply) {
+ if (JSON.parse(reply).send_than_online !== "true") {
+ // check event in events queue
+ self.client.zrank(self.queue_key, event_id, function(err, reply) {
+ if(!reply) {
+ // get event firing time
+ self.client.hmget(event_id, 'time', function(error, reply) {
+ if(reply && !isNaN(reply)) {
+ self.client.zadd(self.queue_key, reply, event_id);
+ }
+ });
+ }
+ });
+ } else {
+ logger.info("online event skipped within restore")
+ }
}
});
});
@@ -81,6 +113,7 @@ Worker.prototype.fetch = function(fn) {
multi.zrem(self.queue_key, event_id);
multi.exec(function(err, reply) {
logger.debug('event-fetched');
+ logger.spec('event-fetched'); //only test env
});
});
}
@@ -101,12 +134,14 @@ Worker.prototype.process = function() {
if(reply) {
var data = JSON.parse(reply);
var uid = data.uid;
- if(uid) {
+ //do not use 30 min delay for online
+ if(uid && data.send_than_online !== "true") {
self.client.get(settings.REDIS_PREFIX + "-uid-" + uid, function(err, last_poll) {
if(!reply || (Date.now() - last_poll) > settings.USER_POLL_THRESHOLD) {
self.emit('event-data', event_id, data);
} else {
logger.debug("skipping user: " + uid);
+ logger.spec('user-skipped'); //only test env
}
})
} else {
View
60 workers/event_tracker.js
@@ -1,5 +1,8 @@
var settings = require('../config/settings'),
- crypto = require('crypto');
+ crypto = require('crypto'),
+ common = require('../common'),
+ common_tracker = require('./common_tracker'),
+ logger = require('../logger');
module.exports = function(req, res) {
function random() {
@@ -8,58 +11,43 @@ module.exports = function(req, res) {
return crypto.createHash('sha1').update(current_date + random).digest('hex');
}
- function parse_params(hash) {
- function isEmpty(obj) {
- for(var prop in obj) {
- if(obj.hasOwnProperty(prop))
- return false;
- }
- return true;
- }
-
- return isEmpty(req.query)?req.body:req.query;
- }
-
function track_event(event_id, event_data) {
var unique_key = settings.REDIS_PREFIX + "-event-" + event_id;
- var delay = event_data.delay?parseInt(event_data.delay):0;
- var delayedTime = Date.now() + delay;
+ var delay = event_data.delay?parseInt(event_data.delay):0; //sec
+ var delayedTime = Date.now() + delay*1000; //millisec
var multi = settings.redis.multi();
var type = event_data.type || 'none';
delete event_data['delay'];
if (type) {
- var keyDaily = settings.REDIS_PREFIX + "-types-daily:" + getDayMill();
- var keyHourly = settings.REDIS_PREFIX + "-types-hourly:" + getHourMill();
- multi.hincrby(keyDaily, type, 1);
- multi.hincrby(keyHourly, type, 1);
+ var keyDaily = settings.REDIS_PREFIX + "-types-daily:" + common.getDayMill();
+ var keyHourly = settings.REDIS_PREFIX + "-types-hourly:" + common.getHourMill();
+ multi.hincrby(keyDaily, 'received:'+type, 1); // process:type
+ multi.hincrby(keyHourly, 'received:'+type, 1);
- multi.zadd(settings.REDIS_PREFIX + "-types-daily", getDayMill(), keyDaily);
- multi.zadd(settings.REDIS_PREFIX + "-types-hourly", getHourMill(), keyHourly);
+ multi.zadd(settings.REDIS_PREFIX + "-types-daily", common.getDayMill(), keyDaily);
+ multi.zadd(settings.REDIS_PREFIX + "-types-hourly", common.getHourMill(), keyHourly);
}
-
- multi.hincrby(settings.REDIS_PREFIX + "-stats", "events_received", 1);
+
multi.hmset (unique_key, 'data', JSON.stringify(event_data));
multi.hmset (unique_key, 'time', delayedTime);
- multi.zadd(settings.REDIS_PREFIX + "-queue", delayedTime, unique_key);
+
+ if (event_data.send_than_online) {
+ var uid = event_data.uid;
+ multi.zadd(settings.REDIS_PREFIX + "-online-queue:" + uid, delayedTime, unique_key);
+ multi.hincrby(settings.REDIS_PREFIX + "-stats", "online_events_received", 1);
+ } else {
+ multi.zadd(settings.REDIS_PREFIX + "-queue", delayedTime, unique_key);
+ multi.hincrby(settings.REDIS_PREFIX + "-stats", "events_received", 1);
+ }
multi.expire (unique_key, delay + settings.EVENT_QUEUE_TTL);
multi.exec(function(err, reply) {
// LOG
+ logger.spec('event-received'); //only test env
});
}
- track_event(random(), parse_params(req));
+ track_event(random(), common_tracker.parse_params(req));
res.send("ok");
};
-
-function getHourMill(date) {
- var now = date || new Date();
- var a = new Date(now.getFullYear(), now.getMonth(), now.getDate(), now.getHours());
- return a.getTime();
-}
-function getDayMill(date) {
- var now = date || new Date();
- var a = new Date(now.getFullYear(), now.getMonth(), now.getDate());
- return a.getTime();
-}
View
40 workers/online_event_tracker.js
@@ -0,0 +1,40 @@
+var settings = require('../config/settings'),
+ common_tracker = require('./common_tracker'),
+ logger = require('../logger');
+
+function track_event(params) {
+ // get online-queue for given user
+ var uid = params.uid;
+ var now = Date.now();
+
+ settings.redis.zrevrangebyscore(settings.REDIS_PREFIX + "-online-queue:" + uid, now,
+ "-inf", "limit", 0, 1, function(err, reply) {
+ if (!err && reply !== undefined && reply.length > 0) {
+ var event_id = reply[0];
+ //check time hash to prevent putting event to queue more than every 10 sec
+ settings.redis.hget(settings.REDIS_PREFIX + "-online-last_sent_time", uid, function(err, reply) {
+ if (!err) {
+ //last time not exist or less
+ if ((!reply || reply =="") || (now >= (settings.USET_POLL_THRESHOLD_ONLINE + reply*1))) {
+ var multi = settings.redis.multi();
+ //only one event at the action
+ multi.lpush(settings.REDIS_PREFIX + "-queue-in_process", event_id);
+ multi.zrem(settings.REDIS_PREFIX + "-online-queue:" + uid, event_id);
+ multi.hset(settings.REDIS_PREFIX + "-online-last_sent_time", uid, now);
+ multi.exec(function(err, reply) {
+ logger.debug('online-event-to-process-queue');
+ logger.spec('online-event-to-process-queue'); //only test env
+ });
+ } else {
+ logger.debug('online-event-posponed');
+ }
+ }
+ });
+ }
+ });
+}
+
+module.exports = function(req, res) {
+ track_event(common_tracker.parse_params(req));
+ res.send("ok");
+};

0 comments on commit 1811100

Please sign in to comment.
Something went wrong with that request. Please try again.