Skip to content

Commit

Permalink
Update public libs
Browse files Browse the repository at this point in the history
  • Loading branch information
antlai committed Apr 23, 2014
1 parent 8499e4d commit ea3558d
Show file tree
Hide file tree
Showing 2 changed files with 188 additions and 38 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "caf_core",
"description": "Cloud Assistant Framework Core",
"version": "0.0.6",
"version": "0.0.7",
"author": "Antonio Lain <antlai@cafjs.com>",
"dependencies": {
"express" : "2.X",
Expand Down
224 changes: 187 additions & 37 deletions public/conduit/conduit.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ limitations under the License.
* a map in a graph traversal that respects the graph topological order.
* Map entries are labelled with the originating task, enabling communication
* between tasks. Unique task labels can be chosen by the application or
* assigned by the library.
* assigned by the library. Tasks can explicitly declare these dependencies and
* 'conduit' will check for data races or dangling references at build time.
* Labels can be made unique by scoping with a prefix, and 'conduit' will
* rewrite the dependencies for you.
*
* An error in any of the tasks aborts the traversal, returning in a callback
* this error and previous results already in the map.
Expand All @@ -37,25 +40,31 @@ limitations under the License.
* two operators __seq__(n) and __par__(n). For example:
*
* c = newInstance(['foo','bar'])
* c = c.foo({'arg':1}, 'fx0')
* .foo({'arg':2, 'prev': 'fx0'}, 'fx1')
* .foo({'arg':4, 'prev': 'fx1'}, 'fx3')
* c = c.foo({'arg':1}, null, 'fx0')
* .foo({'arg':2}, {'prev': 'fx0'}, 'fx1')
* .foo({'arg':4}, {'prev': 'fx1'}, 'fx3')
* .__seq__(3)
* .bar({'arg':2}, 'bx')
* .bar({'arg':4}, 'b1x')
* .bar({'arg':2}, null, 'bx')
* .bar({'arg':4}, null, 'b1x')
* .__seq__()
* .__par__()
*
* will execute in parallel two sequences, one with three foos and the other
* one with two bars. Note how custom labels allow a task in a sequence to know
* about its predecessor.
*
* And this can be composed with another conduit as follows:
*
* b = newInstance(['foo','bar'])
* b = b.foo({'arg':1}, 'ffx')
* b = b.foo({'arg':1}, null, 'ffx')
* .__push__(c)
* .__seq__()
*
* Or we can use it as a template, and prefix labels to avoid collisions:
*
* cClone = c.__scope__('myspace/')
* // and now labels are of the form 'myspace/fx0', 'myspace/fx1'...
*
* And serialized with:
*
* var st = b.__stringify__()
Expand All @@ -64,13 +73,13 @@ limitations under the License.
* It is easy to add meaning to `foo` and `bar`:
*
* var actions = {
* 'foo' : function(acc, args, cb) {
* var whatever = (args.prev && acc[args.prev] &&
* acc[args.prev].data &&
* acc[args.prev].data.whatever) || 0;
* 'foo' : function(acc, args, deps, label, cb) {
* var whatever = (deps.prev && acc[deps.prev] &&
* acc[deps.prev].data &&
* acc[deps.prev].data.whatever) || 0;
* cb(null, {'whatever': whatever + 1})
* },
* 'bar' : function(acc, args, cb) {
* 'bar' : function(acc, args, deps, label, cb) {
* ...
* }
* }
Expand All @@ -86,7 +95,7 @@ limitations under the License.
*
* var acc = {}
* b.__fold__(acc, function(err, data) {
* // data refers to `acc` with a new entry for each
* // data refers to `acc` with an entry for each
* // task (key is the given label or a unique string,
* // value is {err: <err>, data: <whatever>})
* ...
Expand Down Expand Up @@ -219,14 +228,14 @@ limitations under the License.
}
};
var isOK =
(typeof token.type === 'string' || token.type === null) &&
(typeof token.type === 'string') &&
(typeof token.name === 'string' || token.name === null) &&
// token.args is any JSON serializable type (no functions)
(typeof token.args === 'string' ||
typeof token.args === 'number' ||
typeof token.args === 'object' ||
typeof token.args === 'boolean'||
token.args === null) &&
typeof token.args === 'boolean') &&
(typeof token.deps === 'object') && // null is 'object'...
(typeof token.label === 'string' || token.label === null) &&
((Array.isArray(token.children) &&
childrenOK(token.children)) || token.children === null);
Expand All @@ -236,27 +245,109 @@ limitations under the License.
}
return token;
};

return deepFreeze(deepClone(deepType(inToken)));
};

var traverseToken = function(acc, token, f) {
switch(token.type) {
case SEQ:
case PAR:
Array.isArray(token.children) &&
token.children.forEach(function(x) {
traverseToken(acc, x, f);
});
break;
case METHOD:
f(acc, token);
break;
default:
throw new Error('Invalid type in token' + JSON.stringify(token));
}
};

var getDefs = function(acc, token) {
var f = function(acc, tk) {
tk.label && (acc[tk.label] = true);
};
acc = acc || {};
traverseToken(acc, token, f);
return acc;
};

var getUses = function(acc, token) {
var f = function(acc, tk) {
if (tk.deps && (typeof tk.deps === 'object')) {
var keys = Object.keys(tk.deps);
keys.forEach(function(key) {
var value = tk.deps[key];
if (typeof value === 'string') {
acc[value] = true;
}
});
}
};
acc = acc || {};
traverseToken(acc, token, f);
return acc;
};

var setIntersection = function(s1, s2) {
var result = {};
Object.keys(s1).forEach(function(key) {
if (s1[key] && s2[key]) {
result[key] = true;
}
});
return result;
};

var newToken = function(type, name, args, label, children) {
var setUnion = function(s1, s2) {
var result = {};
Object.keys(s1).forEach(function(key) {
if (s1[key]) {
result[key] = true;
}
});
Object.keys(s2).forEach(function(key) {
if (s2[key]) {
result[key] = true;
}
});
return result;
};

// s1 - s2
var setDiff = function(s1, s2) {
var result = {};
Object.keys(s1).forEach(function(key) { result[key] = s1[key];});
Object.keys(s2).forEach(function(key) {
if (s2[key]) {
delete result[key];
}
});
return result;
};

var isSetEmpty = function(s) {
return s && typeof s === 'object' && Object.keys(s).length === 0;
};

var newToken = function(type, name, args, deps, label, children) {
var that = { 'type': type, 'name' : name, 'args' : args,
'label' : label, 'children' : children};
'label' : label, 'deps': deps, 'children' : children};
return that;
};

var newMethod = function(name, args, label) {
return newToken(METHOD, name, args, label, null);
var newMethod = function(name, args, deps, label) {
return newToken(METHOD, name, args, deps, label, null);
};

var newSeq = function(mArray) {
return newToken(SEQ, null, null, null, mArray);
return newToken(SEQ, null, null, null, null, mArray);
};

var newPar = function(mArray) {
return newToken(PAR, null, null, null, mArray);
return newToken(PAR, null, null, null, null, mArray);
};


Expand All @@ -274,8 +365,9 @@ limitations under the License.
var that = initStack || newStack();

methodNames.forEach(function (name) {
that[name] = function(args, label) {
var m = newMethod(name, args, label);
that[name] = function(args, deps, label) {
label = label || nextId();
var m = newMethod(name, args, deps, label);
return that.__push__(m);
};
});
Expand All @@ -298,29 +390,85 @@ limitations under the License.
}
};

that.__defs__ = function() {
var result = {};
that.__forEach__(function(x) { getDefs(result, x);});
return result;
};

that.__uses__ = function() {
var result = {};
that.__forEach__(function(x) { getUses(result, x);});
return result;
};


that.__seq__ = function(n) {
return seqPar(n, function(mArray) { return newSeq(mArray); });
};

that.__par__ = function(n) {
return seqPar(n, function(mArray) { return newPar(mArray); });
var checkForRaces = function(mArray) {
// This is very inefficient, use for just a few tasks.
var allDefs = [];
mArray.forEach(function(x) {
allDefs.push(getDefs(null, x));
});
var checkOneF = function(x, i) {
var uses = getUses(null, x);
var f = function(def, j) {
if (i !== j) {
var races = setIntersection(uses, def);
if (!isSetEmpty(races)) {
throw new Error('Adding a data race' +
JSON.stringify(races));
}
}
};
allDefs.forEach(f);
};
mArray.forEach(checkOneF);
};
return seqPar(n, function(mArray) {
checkForRaces(mArray);
return newPar(mArray);
});
};

that.__push__ = function(data) {
var checkConflicts = function(cleanData) {
var newDefs = getDefs(null, cleanData);
var oldDefs = that.__defs__();
var conflicts = setIntersection(newDefs, oldDefs);
if (!isSetEmpty(conflicts)) {
throw new Error('Label already defined:' +
JSON.stringify(conflicts));
}
var newUses = getUses(null, cleanData);
var allDefs = setUnion(newDefs, oldDefs);
var freeVars = setDiff(newUses, allDefs);
if (!isSetEmpty(freeVars)) {
throw new Error('Dangling dependencies:' +
JSON.stringify(freeVars));
}
};
var inData;
if ((typeof data === 'object') && data.__peek__) {
// assume `data` is a conduit.
if (data.__length__() == 1) {
var inData = sanitize(data.__peek__());
return newConduit(methodNames, behavior,
that.__push_internal__(inData));
inData = sanitize(data.__peek__());
} else {
throw new Error('Trying to push a not fully resolved '+
'conduit:' + data.__stringify__());
}
} else {
return newConduit(methodNames, behavior,
that.__push_internal__(sanitize(data)));
inData = sanitize(data);
}
checkConflicts(inData);
return newConduit(methodNames, behavior,
that.__push_internal__(inData));


};

that.__pop__ = function() {
Expand All @@ -338,6 +486,9 @@ limitations under the License.
return newConduit(methodNames, newActions, that);
};

that.__scope__ = function(prefix) {
throw new Error('Not implemented');
};

/**
* Serializes the structure and configuration of this conduit
Expand Down Expand Up @@ -371,17 +522,16 @@ limitations under the License.
throw new Error('behaviour not defined for method ' +
data.name);
}
var id = data.label || nextId();
if (acc[id]) {
throw new Error('Duplicate label in the task graph: ' +
id);
if (typeof data.label !== 'string') {
throw new Error('undefined label' + JSON.stringify(data));
}
var cb1 = function(err, res) {
acc[id] = {'err': err, 'data' : res};
acc[data.label] = {'err': err, 'data' : res};
// res propagated with acc
cb(err);
};
behavior[data.name](acc, data.args, cb1);
behavior[data.name](acc, data.args, data.deps, data.label,
cb1);
break;
default:
throw new Error('Unknown type ' + data.type);
Expand Down

0 comments on commit ea3558d

Please sign in to comment.