Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/middlewares/passport.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ function fetchOrCreateUser (accessToken, refreshToken, profile, done) {
github.user.getEmails({
user: profile.id
}, function (err, emails) {
if (err) { cb(err); }
if (err) { return cb(err); }

var primaryEmail = find(emails, hasProps({ primary: true }));
if (!primaryEmail) {
Expand Down
3 changes: 2 additions & 1 deletion lib/models/apis/docker.js
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,8 @@ Docker.prototype.startImageBuilderAndWait = function (sessionUser, version, cont
dockerTag: dockerTag,
buildLog: buildLogData,
dockerHost: self.dockerHost,
versionId: version._id
versionId: version._id,
completed: new Date()
});
var split = dockerTag.split(':');
var imageName = split[0];
Expand Down
153 changes: 135 additions & 18 deletions lib/models/mongo/context-version.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ var keypather = require('keypather')();
var find = require('101/find');
var equals = require('101/equals');
var noop = require('101/noop');
// var error = require('error');
var createCount = require('callback-count');
var dogstatsd = require('models/datadog');

/**
* d1 >= d2
* @param {Date} d1 date1
Expand Down Expand Up @@ -387,7 +388,7 @@ ContextVersionSchema.methods.setBuildCompleted = function (dockerInfo, cb) {
cb(Boom.badRequest('ContextVersion requires dockerImage'));
}
else {
var now = Date.now();
var now = dockerInfo.completed;
ContextVersion.findOneAndUpdate({
_id: contextVersion._id,
'build.started': {
Expand Down Expand Up @@ -476,22 +477,7 @@ ContextVersionSchema.methods.dedupe = function (callback) {
'build.started': { $exists: true },
infraCodeVersion: contextVersion.infraCodeVersion
};
if (contextVersion.appCodeVersions.length) {
query.$and = contextVersion.appCodeVersions.map(function (acv) {
return {
appCodeVersions: {
$elemMatch: {
lowerRepo: acv.lowerRepo,
commit: acv.commit
}
}
};
});
query.$and.push({appCodeVersions: { $size: contextVersion.appCodeVersions.length }});
}
else {
query.appCodeVersions = { $size: 0 };
}
query = addAppCodeVersionQuery(contextVersion, query);
opts = {
sort : '-build.started',
limit: 1
Expand Down Expand Up @@ -704,4 +690,135 @@ ContextVersionSchema.statics.modifyAppCodeVersionByRepo =
}, cb);
};

/**
 * Looks for a build on another contextVersion that has the same infra-code
 * hash and the same app-code versions. When one is found, its build is copied
 * onto this contextVersion (see copyBuildFromContextVersion).
 *
 * Steps, run in series:
 *  1. compute the hash of this version's infraCodeVersion
 *  2. persist that hash on this contextVersion as 'build.hash'
 *  3. look for the oldest matching build that has not completed
 *  4. if none, look for the most recently started matching completed build
 *  5. copy the build from the dupe, if one was found
 *
 * @param {Function} callback callback(err, contextVersion) - contextVersion is self
 */
ContextVersionSchema.methods.dedupeBuild = function (callback) {
  var self = this;
  var icvId = self.infraCodeVersion;
  async.waterfall([
    getHash,
    setHash,
    findPendingDupes,
    findCompletedDupes, // must be done after pending due to race
    replaceIfDupe
  ], callback);

  function getHash (cb) {
    InfraCodeVersion.findById(icvId, function (err, icv) {
      if (err) { return cb(err); }
      // a missing document is reported as null, not as an error
      if (!icv) {
        return cb(Boom.notFound('InfraCodeVersion not found: ' + icvId));
      }
      icv.getHash(cb);
    });
  }

  // hash should be set here so dedupe will catch 2 builds coming in at the same time
  function setHash (hash, cb) {
    self.update({
      $set: {
        'build.hash' : hash
      }
    }, function (err) {
      if (err) { return cb(err); }
      self.build.hash = hash;
      cb();
    });
  }

  // Shared lookup: first contextVersion (per `sort`) whose build matches this
  // one's hash and app-code, excluding this build itself (by build._id).
  // Calls back with the match, or null when there is none.
  function findDupe (completed, sort, cb) {
    var query = {
      'build.completed': { $exists: completed },
      'build.hash': self.build.hash,
      'build._id': { $ne: self.build._id }
    };
    query = addAppCodeVersionQuery(self, query);
    var opts = {
      sort : sort,
      limit: 1
    };
    ContextVersion.find(query, null, opts, function (err, duplicates) {
      if (err) { return cb(err); }
      cb(null, duplicates.length === 0 ? null : duplicates[0]);
    });
  }

  // find the oldest pending build (ascending build.started) that matches
  function findPendingDupes (cb) {
    findDupe(false, 'build.started', cb);
  }

  // find the youngest completed build (descending build.started) that matches
  function findCompletedDupes (pending, cb) {
    // always use oldest pending if exists
    // else use youngest completed if exists
    // else no dupe
    if (pending) {
      return cb(null, pending);
    }
    findDupe(true, '-build.started', cb);
  }

  function replaceIfDupe (dupe, cb) {
    if (!dupe) {
      dogstatsd.increment('api.contextVersion.build.noDupe');
      return cb(null, self);
    }
    dogstatsd.increment('api.contextVersion.build.deduped');
    // hand back self in both branches rather than the raw update results
    self.copyBuildFromContextVersion(dupe, function (err) {
      cb(err, self);
    });
  }
};

/**
 * Adds appCodeVersions constraints to a mongo query so that it only matches
 * documents whose appCodeVersions hold exactly the same repo/commit pairs as
 * the given contextVersion. The query object is modified in place.
 * @param  {Object} contextVersion object with an appCodeVersions array
 * @param  {Object} query          mongo query to extend
 * @return {Object} the same query object that was passed in
 */
function addAppCodeVersionQuery(contextVersion, query) {
  // no app code: only match documents that also have none
  if (!contextVersion.appCodeVersions.length) {
    query.appCodeVersions = { $size: 0 };
    return query;
  }

  // one $elemMatch clause per app code version (repo + commit must both match)
  function toElemMatch (appCodeVersion) {
    var criteria = {
      lowerRepo: appCodeVersion.lowerRepo,
      commit: appCodeVersion.commit
    };
    return { appCodeVersions: { $elemMatch: criteria } };
  }

  var clauses = contextVersion.appCodeVersions.map(toElemMatch);
  // the size clause rules out documents that carry additional app code versions
  clauses.push({ appCodeVersions: { $size: contextVersion.appCodeVersions.length } });
  query.$and = clauses;
  return query;
}

/**
 * Replaces this contextVersion's stored build and containerId with those of
 * another contextVersion. In memory, this document gets build.dupeFound set
 * to true and its containerId replaced.
 * @param {Object}   contextVersion contextVersion to copy build and containerId from
 * @param {Function} cb             passed straight to this.update
 */
ContextVersionSchema.methods.copyBuildFromContextVersion = function (contextVersion, cb) {
  var target = this;
  // flag the in-memory document so holders of it can tell a dupe was found
  target.build.dupeFound = true;
  target.containerId = contextVersion.containerId;

  var changes = {
    $set: {
      'build': contextVersion.build,
      'containerId': contextVersion.containerId
    }
  };
  target.update(changes, cb);
};

var ContextVersion = module.exports = mongoose.model('ContextVersions', ContextVersionSchema);
127 changes: 99 additions & 28 deletions lib/models/mongo/infra-code-version.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ var last = require('101/last');
var isFunction = require('101/is-function');
var debug = require('debug')('runnable-api:infra-code-version:model');
var regexpQuote = require('regexp-quote');
var crypto = require('crypto');
var bcrypt = require('bcrypt');
var jsonHash = require('json-stable-stringify');
var dogstatsd = require('models/datadog');
var uuid = require('uuid');

var path = require('path');
var join = path.join;
Expand Down Expand Up @@ -216,38 +219,46 @@ InfraCodeVersionSchema.methods.createFs = function (data, cb) {
infraCodeVersion.files.push(s3Data);
var fileData = infraCodeVersion.files.pop().toJSON();
var fileKey, dirKey;
var attrs = {
edited: true
};

if (last(fileData.Key) === '/') {
fileKey = fileData.Key.slice(0, -1);
dirKey = fileData.Key;
attrs.hash = hashString(data.body.toString());
update();
}
else {
fileKey = fileData.Key;
dirKey = join(fileData.Key, '/');
hashString(data.body, function (err, hash) {
if (err) { return cb(err); }
fileData.hash = hash;
update();
});
}

// atomic update
InfraCodeVersion.update({
_id: infraCodeVersion._id,
'files.Key': { $nin: [ fileKey, dirKey ] }
}, {
$push: {
files: fileData
},
$set: attrs
}, function (err, numUpdated) {
if (err) {
cb(err);
}
else if (numUpdated === 0) {
cb(Boom.conflict('Fs at path already exists: '+fullpath));
}
else {
cb(null, fileData);
}
});
function update () {
InfraCodeVersion.update({
_id: infraCodeVersion._id,
'files.Key': { $nin: [ fileKey, dirKey ] }
}, {
$push: {
files: fileData
},
$set: {
edited: true
}
}, function (err, numUpdated) {
if (err) {
cb(err);
}
else if (numUpdated === 0) {
cb(Boom.conflict('Fs at path already exists: '+fullpath));
}
else {
cb(null, fileData);
}
});
}
}
};

Expand All @@ -262,6 +273,7 @@ InfraCodeVersionSchema.methods.updateFile = function (fullpath, body, cb) {
async.waterfall([
findFile,
updateFile,
calcHash,
updateModel
], cb);
function findFile (cb) {
Expand All @@ -282,6 +294,13 @@ InfraCodeVersionSchema.methods.updateFile = function (fullpath, body, cb) {
cb(err, file, fileData);
});
}
function calcHash (file, fileData, cb) {
hashString(body, function(err, hash) {
if (err) { return cb(err); }
fileData.hash = hash;
cb(null, file, fileData);
});
}
function updateModel (file, fileData, cb) {
file.set(fileData);
InfraCodeVersion.update({
Expand All @@ -290,7 +309,6 @@ InfraCodeVersionSchema.methods.updateFile = function (fullpath, body, cb) {
}, {
$set: {
'files.$': file.toJSON(),
'hash': hashString(body.toString()),
edited: true
}
}, function (err) {
Expand Down Expand Up @@ -544,13 +562,66 @@ InfraCodeVersionSchema.methods.copyFilesFromSource = function (sourceInfraCodeVe
sourceVersion.files,
function (file, cb) {
// this protects the scope of bucket
bucket.copyFileFrom(file, cb);
bucket.copyFileFrom(file, function(err, newFile) {
if (err) { return cb(err); }
newFile.hash = file.hash;
cb(null, newFile);
});
},
callback);
}
};

function hashString(data) {
return crypto.createHash('md5').update(data.toString().trim()).digest('hex');
/**
 * Computes a single hash for the files of this infraCodeVersion.
 * The document is re-fetched by _id, a map of { filepath: fileHash } is built
 * (directories get the placeholder value '1'), and the stable JSON
 * serialization of that map is passed to hashString.
 * If any non-directory file has no hash, a uuid is yielded instead so the
 * result will never match another version (dedupe is skipped).
 * @param {Function} cb callback(err, hash)
 */
InfraCodeVersionSchema.methods.getHash = function (cb) {
  var id = this._id;
  InfraCodeVersion.findOne({
    _id: id
  }, function (err, infraCodeVersion) {
    if (err) { return cb(err); }
    // a missing document is reported as null, not as an error
    if (!infraCodeVersion) {
      return cb(Boom.notFound('InfraCodeVersion not found: ' + id));
    }
    var hashMap = {};
    var invalidate = false;
    infraCodeVersion.files.forEach(function (item) {
      // keep the part of the key from its first '/' onward
      var filePath = item.Key.substr(item.Key.indexOf('/'));
      if (item.isDir) {
        // ensure dirs have some hash
        hashMap[filePath] = '1';
      } else if (item.hash) {
        hashMap[filePath] = item.hash;
      } else {
        // file without hash. this should not happen.
        // skip dedupe by returning something that will never match
        invalidate = true;
      }
    });

    if (invalidate) {
      cb(null, uuid());
    } else {
      hashString(jsonHash(hashMap), cb);
    }
  });
};


/**
 * Hashes text with bcrypt using a fixed salt, after normalizing whitespace so
 * that inputs differing only in trailing whitespace or blank lines produce the
 * same hash. Also reports the time taken to datadog.
 * @param {String|Buffer} data content to hash (converted with toString())
 * @param {Function}      cb   callback(err, hash)
 */
function hashString(data, cb) {
  // salt from require('bcrypt').genSaltSync(1);
  // fixed so that equal content always produces an equal hash
  var salt = '$2a$04$fLg/VU5eeDAUARmPVfyUo.';
  var start = new Date();
  // bodies are not guaranteed to be strings (e.g. Buffers have no .replace)
  var text = data.toString();
  var normalized = text
    .replace(/[\s\uFEFF\xA0]+\n/g, '\n') // trim whitespace after line
    .replace(/\n[\s\uFEFF\xA0]*\n/g, '\n') // remove blank lines
    .replace(/^[\s\uFEFF\xA0]*\n/g, '') // remove start of file blank lines
    .replace(/[\s\uFEFF\xA0]+$/g, '\n'); // collapse trailing whitespace to one newline
  // NOTE(review): bcrypt implementations typically only use the first 72 bytes
  // of the input - confirm this is acceptable for content longer than that
  bcrypt.hash(normalized, salt, function (err, hash) {
    if (err) { return cb(err); }
    dogstatsd.timing('api.infraCodeVersion.hashTime', new Date()-start, 1,
      ['length:'+text.length]);
    cb(null, hash);
  });
}

var InfraCodeVersion = module.exports = mongoose.model('InfraCodeVersion', InfraCodeVersionSchema);


Loading