Skip to content
This repository has been archived by the owner on Oct 30, 2018. It is now read-only.

Commit

Permalink
Merge 0b3edb2 into 13ad96d
Browse files Browse the repository at this point in the history
  • Loading branch information
braydonf committed Nov 20, 2017
2 parents 13ad96d + 0b3edb2 commit 5685b33
Show file tree
Hide file tree
Showing 15 changed files with 2,283 additions and 726 deletions.
27 changes: 27 additions & 0 deletions bin/storj-cron.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#!/usr/bin/env node

'use strict';

const async = require('async');
const program = require('commander');
const Config = require('../lib/config');
const StorageEventsCron = require('../lib/cron/storage-events');

program.version(require('../package').version);
program.option('-c, --config <path_to_config_file>', 'path to the config file');
program.option('-d, --datadir <path_to_datadir>', 'path to the data directory');
program.parse(process.argv);

var config = new Config(process.env.NODE_ENV || 'develop', program.config, program.datadir);

var jobs = [
new StorageEventsCron(config)
];

async.eachSeries(jobs, function(job, next) {
job.start(next);
}, function(err) {
if (err) {
throw err;
}
});
1 change: 1 addition & 0 deletions lib/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ const DEFAULTS = {
from: 'robot@storj.io'
},
application: {
unknownReportThreshold: 0.3, // one third unknown reports
activateSIP6: false,
powOpts: {
retargetPeriod: 10000, // milliseconds
Expand Down
223 changes: 223 additions & 0 deletions lib/cron/storage-events.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
'use strict';

const assert = require('assert');
const CronJob = require('cron').CronJob;
const Config = require('../config');
const Storage = require('storj-service-storage-models');
const log = require('../logger');

function StorageEventsCron(config) {
if (!(this instanceof StorageEventsCron)) {
return new StorageEventsCron(config);
}

assert(config instanceof Config, 'Invalid config supplied');

this._config = config;
}

StorageEventsCron.CRON_TIME = '* */10 * * * *'; // every ten minutes
StorageEventsCron.MAX_RUN_TIME = 600000; // 10 minutes
StorageEventsCron.FINALITY_TIME = 10800000; // 3 hours

StorageEventsCron.prototype.start = function(callback) {
log.info('starting the storage events cron');

this.storage = new Storage(
this._config.storage.mongoUrl,
this._config.storage.mongoOpts,
{ logger: log }
);

this.job = new CronJob({
cronTime: StorageEventsCron.CRON_TIME,
onTick: this.run.bind(this),
start: false,
timeZone: 'UTC'
});

this.job.start();

setImmediate(callback);
};

StorageEventsCron.prototype._resolveCodes = function(event, user) {
const threshold = this._config.application.unknownReportThreshold;

let success = event.success;
let successModified = false;
let unknown = success ? false : true;
if (unknown) {
resolveCodes();
}

function resolveCodes() {
const failureCode = 1100;

const clientCode = event.clientReport ?
event.clientReport.exchangeResultCode : undefined;
const farmerCode = event.farmerReport ?
event.farmerReport.exchangeResultCode : undefined;

/* jshint eqeqeq: false */
if (farmerCode == failureCode && !clientCode) {
success = false;
unknown = false;
return;
}

if (farmerCode == failureCode && clientCode == failureCode) {
success = false;
unknown = false;
return;
}

if (!farmerCode && clientCode == failureCode) {
success = false;
unknown = false;
return;
}

if (user.exceedsUnknownReportsThreshold(threshold)) {
successModified = true;
success = true;
unknown = true;
return;
}
}

return {
success: success,
successModified: successModified,
unknown: unknown
};

};

StorageEventsCron.prototype._resolveEvent = function(event, callback) {

this.storage.models.User.findOne({_id: event.user}, (err, user) => {
if (err) {
return callback(err);
}

const {success, successModified, unknown} = this._resolveCodes(event);

if (successModified) {
// TODO also give reputation points to farmer for successful
// transfer for the storage event
event.success = success;
event.save((err) => {
if (err) {
return callback(err);
}
finalize();
});
} else {
finalize();
}

function finalize() {
user.updateUnknownReports(unknown, event.timestamp, (err) => {
if (err) {
return callback(err);
}

callback(null, event.timestamp);
});
}
});
};

StorageEventsCron.prototype._run = function(lastTimestamp, callback) {

const StorageEvent = this.storage.models.StorageEvent;
const finalityTime = Date.now() - StorageEventsCron.FINALITY_TIME;

const cursor = StorageEvent.find({
timestamp: {
$lt: finalityTime,
$gt: lastTimestamp
},
user: {
$exists: true
}
}).sort({timestamp: 1}).cursor();

const timeout = setTimeout(() => {
finish(new Error('Job exceeded max duration'));
}, StorageEventsCron.MAX_RUN_TIME);

let callbackCalled = false;

function finish(err) {
clearTimeout(timeout);
cursor.close();
if (!callbackCalled) {
callbackCalled = true;
callback(err, lastTimestamp);
}
}

cursor.on('error', finish);

cursor.on('data', (event) => {
cursor.pause();
this._resolveEvent(event, (err, _lastTimestamp) => {
if (err) {
return finish(err);
}
lastTimestamp = _lastTimestamp;
cursor.resume();
});
});

cursor.on('end', finish);
};

StorageEventsCron.prototype.run = function() {
const name = 'StorageEventsFinalityCron';
const Cron = this.storage.models.CronJob;
Cron.lock(name, StorageEventsCron.MAX_RUN_TIME, (err, locked, res) => {
if (err) {
return log.error('%s lock failed, reason: %s', name, err.message);
}
if (!locked) {
return log.warn('%s already running', name);
}

log.info('Starting %s cron job', name);

let lastTimestamp = new Date(0);
if (res &&
res.value &&
res.value.rawData &&
res.value.rawData.lastTimestamp) {
lastTimestamp = new Date(res.value.rawData.lastTimestamp);
} else {
log.warn('%s cron has unknown lastTimestamp', name);
}

this._run(lastTimestamp, (err, _lastTimestamp) => {
if (err) {
let message = err.message ? err.message : 'unknown';
log.error('Error running %s, reason: %s', name, message);
}

log.info('Stopping %s cron', name);
const rawData = {};
if (_lastTimestamp) {
rawData.lastTimestamp = _lastTimestamp.getTime();
}

Cron.unlock(name, rawData, (err) => {
if (err) {
return log.error('%s unlock failed, reason: %s', name, err.message);
}
});

});
});
};

module.exports = StorageEventsCron;
70 changes: 68 additions & 2 deletions lib/monitor/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ const assert = require('assert');
const crypto = require('crypto');
const storj = require('storj-lib');
const MonitorConfig = require('./config');
const Storage = require('storj-service-storage-models');
const StorageModels = require('storj-service-storage-models');
const points = StorageModels.constants.POINTS;
const ComplexClient = require('storj-complex').createClient;
const MongoDBStorageAdapter = require('storj-mongodb-adapter');
const ms = require('ms');
Expand Down Expand Up @@ -44,7 +45,7 @@ Monitor.MAX_SIGINT_WAIT = 5000;
Monitor.prototype.start = function(callback) {
log.info('Farmer monitor service is starting');

this.storage = new Storage(
this.storage = new StorageModels(
this._config.storage.mongoUrl,
this._config.storage.mongoOpts,
{ logger: log }
Expand Down Expand Up @@ -152,6 +153,33 @@ Monitor.prototype._saveShard = function(shard, destination, callback) {
});
};

Monitor.prototype._createStorageEvent = function(token,
shardHash,
shardBytes,
client,
farmer) {
const StorageEvent = this.storage.models.StorageEvent;

const storageEvent = new StorageEvent({
token: token,
user: null,
client: client,
farmer: farmer, // farmer storing the new mirror
timestamp: Date.now(),
downloadBandwidth: 0,
storage: shardBytes,
shardHash: shardHash,
success: false
});

storageEvent.save((err) => {
if (err) {
log.warn('_createStorageEvent: Error saving event, ' +
'reason: %s', err.message);
}
});
};

Monitor.prototype._transferShard = function(shard, state, callback) {
const source = state.sources[0];
const destination = state.destinations[0];
Expand Down Expand Up @@ -187,6 +215,11 @@ Monitor.prototype._transferShard = function(shard, state, callback) {

const farmer = storj.Contact(destination.contact);

const token = pointer.token;
const shardHash = shard.hash;
const shardBytes = contract.get('data_size');
this._createStorageEvent(token, shardHash, shardBytes, source.nodeID, farmer.nodeID);

this.network.getMirrorNodes([pointer], [farmer], (err) => {
if (err) {
log.warn('Unable to mirror to farmer %s, reason: %s',
Expand Down Expand Up @@ -247,6 +280,36 @@ Monitor.prototype._replicateFarmer = function(contact) {
});
};

Monitor.prototype._giveOfflinePenalty = function(contact) {
contact.recordPoints(points.OFFLINE);
contact.save((err) => {
if (err) {
this._logger.error('_giveOfflinePenalty: Unable to save contact %s ' +
'to update reputation, reason: %s',
contact._id, err.message);
}
});
};

Monitor.prototype._markStorageEventsEnded = function(contact) {
const StorageEvent = this.storage.models.StorageEvent;
StorageEvent.update({
farmer: contact._id,
farmerEnd: { $exists: false }
}, {
$currentDate: {
farmerEnd: true
}
}, {
multi: true
}, (err) => {
if (err) {
log.error('Unable to update farmer %s storage events with farmerEnd, ' +
'reason: %s', contact._id, err.message);
}
});
};

Monitor.prototype.run = function() {
if (this._running) {
return this.wait();
Expand Down Expand Up @@ -317,7 +380,10 @@ Monitor.prototype.run = function() {
if (contactData.timeoutRate >= timeoutRateThreshold) {
log.warn('Shards need replication, farmer: %s, timeoutRate: %s',
contact.nodeID, contactData.timeoutRate);

this._replicateFarmer(contact);
this._giveOfflinePenalty(contactData);
this._markStorageEventsEnded(contactData);
}

} else {
Expand Down
2 changes: 2 additions & 0 deletions lib/server/middleware/farmer-auth.js
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ function authFarmer(req, res, next) {
return next(new errors.BadRequestError('Invalid signature header'));
}

req.farmerNodeID = nodeID;

next();
}

Expand Down

0 comments on commit 5685b33

Please sign in to comment.