Skip to content

Commit

Permalink
New Core
Browse files Browse the repository at this point in the history
  • Loading branch information
ar2rsawseen committed Feb 9, 2015
1 parent 4ce2e79 commit b657d76
Show file tree
Hide file tree
Showing 2,658 changed files with 60,921 additions and 380,638 deletions.
14 changes: 14 additions & 0 deletions CHANGELOG
@@ -1,3 +1,17 @@
15.02

* Plugins system

* New core and scalable data structure

* Using new MongoDB drive

* Path mounting

* Accepting bulk requests as post

* Lots of plugins as Data Populator, Event Logger, DBViewer, etc

14.08

* Added density reporting for Android
Expand Down
6 changes: 4 additions & 2 deletions Gruntfile.js
Expand Up @@ -14,7 +14,9 @@ module.exports = function(grunt) {
node:true,
"-W041": true,
"-W038": true,
"-W082": true
"-W082": true,
"-W058": true,
"-W030": true
},
all: ['Gruntfile.js', 'api/api.js', 'api/lib/*.js', 'api/parts/**/*.js', 'api/utils/common.js', 'frontend/express/app.js']
},
Expand All @@ -23,7 +25,7 @@ module.exports = function(grunt) {
test: {
options: {
reporter: 'spec',
timeout: 5000
timeout: 10000
},
src: ['test/**/*.js']
}
Expand Down
211 changes: 125 additions & 86 deletions api/api.js
Expand Up @@ -2,7 +2,10 @@ var http = require('http'),
cluster = require('cluster'),
os = require('os'),
url = require('url'),
querystring = require('querystring'),
common = require('./utils/common.js'),
plugins = require('../plugins/pluginManager.js'),
crypto = require('crypto'),
countlyApi = {
data:{
usage:require('./parts/data/usage.js'),
Expand All @@ -14,6 +17,9 @@ var http = require('http'),
apps:require('./parts/mgmt/apps.js')
}
};

plugins.init();
plugins.dispatch("/init", {common:common});

http.globalAgent.maxSockets = common.config.api.max_sockets || 1024;

Expand All @@ -31,12 +37,11 @@ function validateAppForWriteAPI(params) {

params.app_id = app['_id'];
params.app_cc = app['country'];
params.app_name = app['name'];
params.appTimezone = app['timezone'];
params.time = common.initTimeObj(params.appTimezone, params.qstring.timestamp);

var updateSessions = {};
common.fillTimeObject(params, updateSessions, common.dbMap['events']);
common.db.collection('sessions').update({'_id':params.app_id}, {'$inc':updateSessions}, {'upsert':true}, function(err, res){});

plugins.dispatch("/i", {params:params, app:app});

if (params.qstring.events) {
countlyApi.data.events.processEvents(params);
Expand All @@ -45,7 +50,7 @@ function validateAppForWriteAPI(params) {
}

if (params.qstring.begin_session) {
countlyApi.data.usage.beginUserSession(params);
countlyApi.data.usage.beginUserSession(params);
} else if (params.qstring.end_session) {
if (params.qstring.session_duration) {
countlyApi.data.usage.processSessionDuration(params, function () {
Expand All @@ -57,6 +62,13 @@ function validateAppForWriteAPI(params) {
} else if (params.qstring.session_duration) {
countlyApi.data.usage.processSessionDuration(params);
} else {
// begin_session, session_duration and end_session handle incrementing request count in usage.js
var dbDateIds = common.getDateIds(params),
updateUsers = {};

common.fillTimeObjectMonth(params, updateUsers, common.dbMap['events']);
common.db.collection('users').update({'_id': params.app_id + "_" + dbDateIds.month}, {'$inc': updateUsers}, {'upsert':true}, function(err, res){});

return false;
}
});
Expand All @@ -68,7 +80,6 @@ function validateUserForWriteAPI(callback, params) {
common.returnMessage(params, 401, 'User does not exist');
return false;
}

params.member = member;
callback(params);
});
Expand All @@ -95,6 +106,8 @@ function validateUserForDataReadAPI(params, callback, callbackParam) {
params.app_id = app['_id'];
params.appTimezone = app['timezone'];
params.time = common.initTimeObj(params.appTimezone, params.qstring.timestamp);

plugins.dispatch("/o/validate", {params:params, app:app});

if (callbackParam) {
callback(callbackParam, params);
Expand Down Expand Up @@ -135,19 +148,25 @@ if (cluster.isMaster) {
cluster.on('exit', function(worker) {
cluster.fork();
});

plugins.dispatch("/worker", {});

} else {

http.Server(function (req, res) {

var urlParts = url.parse(req.url, true),
queryString = urlParts.query,
paths = urlParts.pathname.split("/"),
apiPath = "",
params = {
'qstring':queryString,
'res':res
'res':res,
'req':req
};

//remove countly path
if(common.config.path == "/"+paths[1]){
paths.splice(1, 1);
}

if (queryString.app_id && queryString.app_id.length != 24) {
common.returnMessage(params, 400, 'Invalid parameter "app_id"');
Expand All @@ -166,73 +185,81 @@ if (cluster.isMaster) {

apiPath += "/" + paths[i];
}

plugins.dispatch("/", {params:params, apiPath:apiPath});

switch (apiPath) {
case '/i/bulk':
{

var requests = queryString.requests,
appKey = queryString.app_key;

if (requests) {
try {
requests = JSON.parse(requests);
} catch (SyntaxError) {
console.log('Parse bulk JSON failed');
}
} else {
common.returnMessage(params, 400, 'Missing parameter "requests"');
return false;
}

for (var i = 0; i < requests.length; i++) {

if (!requests[i].app_key && !appKey) {
continue;
}

var tmpParams = {
'app_id':'',
'app_cc':'',
'ip_address':requests[i].ip_address,
'user':{
'country':requests[i].country_code || 'Unknown',
'city':requests[i].city || 'Unknown'
},
'qstring':{
'app_key':requests[i].app_key || appKey,
'device_id':requests[i].device_id,
'metrics':requests[i].metrics,
'events':requests[i].events,
'session_duration':requests[i].session_duration,
'begin_session':requests[i].begin_session,
'end_session':requests[i].end_session,
'timestamp':requests[i].timestamp
}
};

if (!tmpParams.qstring.device_id) {
continue;
} else {
tmpParams.app_user_id = common.crypto.createHash('sha1').update(tmpParams.qstring.app_key + tmpParams.qstring.device_id + "").digest('hex');
}

if (tmpParams.qstring.metrics) {
if (tmpParams.qstring.metrics["_carrier"]) {
tmpParams.qstring.metrics["_carrier"] = tmpParams.qstring.metrics["_carrier"].replace(/\w\S*/g, function (txt) {
return txt.charAt(0).toUpperCase() + txt.substr(1).toLowerCase();
});
}

if (tmpParams.qstring.metrics["_os"] && tmpParams.qstring.metrics["_os_version"]) {
tmpParams.qstring.metrics["_os_version"] = tmpParams.qstring.metrics["_os"][0].toLowerCase() + tmpParams.qstring.metrics["_os_version"];
}
}

validateAppForWriteAPI(tmpParams);
}

common.returnMessage(params, 200, 'Success');
{
function processBulk(queryString){
var requests = queryString.requests,
appKey = queryString.app_key;

if (requests) {
try {
requests = JSON.parse(requests);
} catch (SyntaxError) {
console.log('Parse bulk JSON failed');
}
} else {
common.returnMessage(params, 400, 'Missing parameter "requests"');
return false;
}
for (var i = 0; i < requests.length; i++) {

if (!requests[i].app_key && !appKey) {
continue;
}

var tmpParams = {
'app_id':'',
'app_cc':'',
'ip_address':requests[i].ip_address || getIpAddress(req),
'user':{
'country':requests[i].country_code || 'Unknown',
'city':requests[i].city || 'Unknown'
},
'qstring':requests[i]
};

tmpParams["qstring"]['app_key'] = requests[i].app_key || appKey;

if (!tmpParams.qstring.device_id) {
continue;
} else {
tmpParams.app_user_id = common.crypto.createHash('sha1').update(tmpParams.qstring.app_key + tmpParams.qstring.device_id + "").digest('hex');
}

if (tmpParams.qstring.metrics) {
if (tmpParams.qstring.metrics["_carrier"]) {
tmpParams.qstring.metrics["_carrier"] = tmpParams.qstring.metrics["_carrier"].replace(/\w\S*/g, function (txt) {
return txt.charAt(0).toUpperCase() + txt.substr(1).toLowerCase();
});
}

if (tmpParams.qstring.metrics["_os"] && tmpParams.qstring.metrics["_os_version"]) {
tmpParams.qstring.metrics["_os_version"] = tmpParams.qstring.metrics["_os"][0].toLowerCase() + tmpParams.qstring.metrics["_os_version"];
}
}
validateAppForWriteAPI(tmpParams);
}

common.returnMessage(params, 200, 'Success');
}
if(req.method.toLowerCase() == 'post'){
var body = "";
req.on('data', function(chunk) {
body += chunk.toString();
});

req.on('end', function() {
// parse the received body data
processBulk(querystring.parse(body));
});
}
else
//attempt process GET request
processBulk(queryString);
break;
}
case '/i/users':
Expand Down Expand Up @@ -334,8 +361,6 @@ if (cluster.isMaster) {

} catch (SyntaxError) {
console.log('Parse metrics JSON failed');
common.returnMessage(params, 400, 'metrics JSON is not properly formed');
return false
}
}

Expand All @@ -344,8 +369,6 @@ if (cluster.isMaster) {
params.qstring.events = JSON.parse(params.qstring.events);
} catch (SyntaxError) {
console.log('Parse events JSON failed');
common.returnMessage(params, 400, 'events JSON is not properly formed');
return false;
}
}

Expand Down Expand Up @@ -412,18 +435,25 @@ if (cluster.isMaster) {
}

switch (params.qstring.method) {
case 'get_period_obj':
validateUserForDataReadAPI(params, countlyApi.data.fetch.getPeriodObj, 'users');
break;
case 'locations':
case 'sessions':
case 'users':
case 'devices':
validateUserForDataReadAPI(params, countlyApi.data.fetch.fetchTimeObj, 'users');
break;
case 'app_versions':
case 'device_details':
validateUserForDataReadAPI(params, countlyApi.data.fetch.fetchTimeObj, 'device_details');
break;
case 'devices':
case 'carriers':
case 'app_versions':
validateUserForDataReadAPI(params, countlyApi.data.fetch.fetchTimeData, params.qstring.method);
validateUserForDataReadAPI(params, countlyApi.data.fetch.fetchTimeObj, params.qstring.method);
break;
case 'cities':
if (common.config.api.city_data !== false) {
validateUserForDataReadAPI(params, countlyApi.data.fetch.fetchTimeData, params.qstring.method);
validateUserForDataReadAPI(params, countlyApi.data.fetch.fetchTimeObj, params.qstring.method);
} else {
common.returnOutput(params, {});
}
Expand All @@ -434,8 +464,6 @@ if (cluster.isMaster) {
params.qstring.events = JSON.parse(params.qstring.events);
} catch (SyntaxError) {
console.log('Parse events array failed');
common.returnMessage(params, 400, 'events JSON is not properly formed');
break;
}

validateUserForDataReadAPI(params, countlyApi.data.fetch.fetchMergedEventData);
Expand All @@ -447,7 +475,8 @@ if (cluster.isMaster) {
validateUserForDataReadAPI(params, countlyApi.data.fetch.fetchCollection, 'events');
break;
default:
common.returnMessage(params, 400, 'Invalid method');
if(!plugins.dispatch(apiPath, {params:params, validateUserForDataReadAPI:validateUserForDataReadAPI, validateUserForMgmtReadAPI:validateUserForMgmtReadAPI}))
common.returnMessage(params, 400, 'Invalid method');
break;
}

Expand All @@ -471,15 +500,25 @@ if (cluster.isMaster) {
break;
case 'countries':
validateUserForDataReadAPI(params, countlyApi.data.fetch.fetchCountries);
break;
case 'sessions':
validateUserForDataReadAPI(params, countlyApi.data.fetch.fetchSessions);
break;
case 'metric':
validateUserForDataReadAPI(params, countlyApi.data.fetch.fetchMetric);
break;
default:
common.returnMessage(params, 400, 'Invalid path, must be one of /dashboard or /countries');
if(!plugins.dispatch(apiPath, {params:params, validateUserForDataReadAPI:validateUserForDataReadAPI, validateUserForMgmtReadAPI:validateUserForMgmtReadAPI, paths:paths}))
common.returnMessage(params, 400, 'Invalid path, must be one of /dashboard or /countries');
break;
}

break;
}
default:
if(!plugins.dispatch(apiPath, {params:params, validateUserForDataReadAPI:validateUserForDataReadAPI, validateUserForMgmtReadAPI:validateUserForMgmtReadAPI, validateUserForWriteAPI:validateUserForWriteAPI, paths:paths}))
common.returnMessage(params, 400, 'Invalid path');
}

}).listen(common.config.api.port, common.config.api.host || '');
}
}

0 comments on commit b657d76

Please sign in to comment.