Skip to content

Commit

Permalink
Got two simple job tests working.
Browse files Browse the repository at this point in the history
  • Loading branch information
doctorrustynelson committed Dec 17, 2014
1 parent 7249568 commit 9f72995
Show file tree
Hide file tree
Showing 7 changed files with 241 additions and 75 deletions.
43 changes: 41 additions & 2 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ var Job = require( './job' );
var config = require( './utils/config' );

function Client( ){
var socket = require( 'socket.io-client' )( 'http://' + config.get( 'shenzi.host' ) + ':' + config.get( 'shenzi.port' ) );
var socket = require( 'socket.io-client' )( 'http://' + config.get( 'shenzi.host' ) + ':' + config.get( 'shenzi.port' ), { multiplex: false } );
var ready = false;
var jobs = { };

socket.on( 'connect', function( ){
LOGGER.debug( 'Established connection.' );
Expand All @@ -19,6 +20,38 @@ function Client( ){
platform: process.platform
} );
} );

socket.on( 'job:new-confirmation', function( result ){
if( result.success ){
LOGGER.debug( 'New Job Registered ' + result.name + '.' );
jobs[ result.name ].compiled = result.joules;
} else {
LOGGER.error( 'Failed to register new Job ' + result.name + '.' );
}

if( jobs[ result.name ].new_callback ){
jobs[ result.name ].new_callback(
( result.success ? undefined : result.msg ),
jobs[ result.name ]
);
}
} );

socket.on( 'job:done', function( result ){
console.log( result );
if( result.success ){
LOGGER.debug( 'Job Complete ' + result.name + '.' );
} else {
LOGGER.error( 'Failed durring Job ' + result.name + '.' );
}

if( jobs[ result.name ].run_callback ){
jobs[ result.name ].run_callback(
( result.success ? undefined : result.msg ),
result.result
);
}
} );

socket.on( 'err', function( msg ){
LOGGER.error( msg );
Expand Down Expand Up @@ -48,7 +81,13 @@ function Client( ){
};

this.createJob = function( name ){
return new Job( this, name );
jobs[ name ] = new Job( this, name );
return jobs[ name ];
};

this.stop = function( ){
LOGGER.info( 'Closing Client.' );
socket.disconnect( true );
};

this._socket = socket;
Expand Down
15 changes: 14 additions & 1 deletion lib/connection-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ function ConnectionManager( ){
engine: data.engine,
engine_version: data.engine_version,
platform: data.platform,
socket: socket
socket: socket,
state: 'connected'
};

LOGGER.info( 'Registered new ' + data.type + ' @ ' + ip_address + ' : ' + JSON.stringify(
Expand Down Expand Up @@ -77,6 +78,18 @@ function ConnectionManager( ){
return undefined;
};

this.getEd = function( ){
// TODO: make smarter then random

var keys = Object.keys( nodes.ed ).filter( function( ip_address ){
return ( nodes.ed[ ip_address ].state === 'connected' );
} );

var ip_address = keys[ Math.floor( Math.random() * keys.length ) ];

return nodes.ed[ ip_address ];
};

this.disconnect = function( ip_address ){
var type = this.getConnectionType( ip_address );

Expand Down
19 changes: 10 additions & 9 deletions lib/ed-server.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,30 +38,31 @@ function Ed( ){

// Shenzi Spawned New Banzai
shenzi_socket.on( 'banzai moved', function( ){
//TODO: handle when banzai comes up somewhere else
// TODO: handle when banzai comes up somewhere else
} );

// Shenzi Joule request
shenzi_socket.on( 'joule', function( joule ){
shenzi_socket.on( 'joule', function( step ){
var joule = step.joule;
LOGGER.debug( 'Recieved Joule' );
LOGGER.debug( JSON.stringify( step, null, '\t' ) );

switch( joule.deploy ){
case 'anonymous':
LOGGER.debug( 'Running Anonymous Joule.' );
LOGGER.debug( joule.func );
var func = eval( '(' + joule.func + ')' );

func( joule.input, {
'return': function( result ){
shenzi_socket.emit( 'joule-result', { success: true, result: result } );
}
}, 'other' );
var result = func( step.input, { /* TODO */ }, 'other' );
step.success = true;
step.output = result;
shenzi_socket.emit( 'joule:complete', step );
break;
case 'named':
//TODO
// TODO
break;
default:
shenzi_socket.emit( 'joule-result', { success: false } );
shenzi_socket.emit( 'joule:complete', { success: false } );
shenzi_socket.emit( 'err', { 'type': 'joule', 'msg': 'Can not run Joule of deployment type ' + joule.deploy + '.' } );
break;
}
Expand Down
51 changes: 34 additions & 17 deletions lib/job.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,32 +8,50 @@ function Job( crocuta, name ){
//TODO: remove
sent = sent;

this.joule = function( unit ){
this.joule = function( unit, joule_name ){

if( typeof unit === 'function' ){
LOGGER.debug( 'Added raw joule to ' + name + ' job.' );
joules.push( {
func: unit.toString()
} );
switch( typeof unit ){
case 'function':
if( joule_name !== undefined ){
LOGGER.debug( 'Added saved-function joule ' + joule_name + ' to ' + name + ' job.' );
joules.push( {
deploy: 'saved-function',
name: joule_name,
func: unit.toString()
} );
} else {
LOGGER.debug( 'Added anonymous joule to ' + name + ' job.' );
joules.push( {
deploy: 'anonymous',
func: unit.toString()
} );
}
break;
case 'string':
LOGGER.debug( 'Added named joule ' + unit + ' to ' + name + ' job.' );
joules.push( {
deploy: 'named',
name: unit
} );
break;
default:
LOGGER.error( 'Can not determine type of joule to add to the job.' );
break;
}

return this;
};

this.send = function( input, callback ){
this.send = function( callback ){
if( !crocuta.isReady() ){
LOGGER.error( 'Can not send job to Crocuta. Server is not ready.' );
throw new Error( 'Can not send job to Crocuta. Server is not ready.' );
}

LOGGER.info( 'Sending ' + name + ' job to Crocuta.' );
crocuta._socket.emit( 'new job', JSON.stringify( { name: name, joules: joules } ) );

if( input !== undefined ){
this.start( input, callback );
}

return this;
crocuta._socket.emit( 'job:new', { name: name, joules: joules } );

this.new_callback = callback;
};

this.start = function( input, callback ){
Expand All @@ -44,9 +62,8 @@ function Job( crocuta, name ){

LOGGER.info( 'Starting ' + name + ' job.' );

input = input;
callback = callback;

this.run_callback = callback;
crocuta._socket.emit( 'job:start', { name: name, joules: this.compiled, input: input } );
};

this.getName = function( ){
Expand Down
83 changes: 66 additions & 17 deletions lib/shenzi-server.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,32 @@ function Shenzi( ){

var io = require( 'socket.io' )( );
var connection_manager = new ConnectionManager();
var jobs = {};
var job_counter = 0;

function stepJob( job_id, joule_index, result ){
var job = jobs[ job_id ];
var next_index = joule_index + 1;

if( next_index < job.joules.length ){
LOGGER.debug( 'Stepping Job ' + job.name + ' at joule #' + joule_index + '.' );

var worker = connection_manager.getEd( );
var joule = job.joules[ next_index ];

// TODO: move out of here we shouldn't be doing it every time we step.
joule.index = next_index;

worker.socket.emit( 'joule', { joule: joule, job_id: job_id, input: result } );
} else {
LOGGER.debug( 'Finished Job ' + job.name + '.' );

jobs[ job_id ] = undefined;
job.end = new Date();
job.duration = job.end - job.start;
job.client.emit( 'job:done', { success: true, name: job.name, id: job.name + ':' + job.id, result: result, duration: job.duration } );
}
}

io.on( 'connection', function( socket ){
var ip_address = socket.request.connection._peername.address + ':' + socket.request.connection._peername.port;
Expand All @@ -28,26 +54,49 @@ function Shenzi( ){
}
} );

socket.on( 'job:new', function( data ){
var job = JSON.parse( data );
socket.on( 'job:new', function( job ){
LOGGER.info( 'New job "' + job.name + '".' );
// TODO: Send Joules that need saving to Banzai
// job.joules.forEach( function( joule ){
// if( joule.func !== undefined ){
// LOGGER.debug( 'Raw joule.' );
// LOGGER.debug( joule.func );
// var func = eval( "(" + joule.func + ")" );
//
// func( "inputs", "outputs", "other" );
// }
// } );

var compiled_job = [ ];

job.joules.forEach( function( joule ){
switch( joule.deploy ){
case 'anonymous':
compiled_job.push( joule );
break;
case 'saved-function':
compiled_job.push( {} /* TODO */ );
break;
case 'named':
compiled_job.push( {} /* TODO */ );
break;
default:
LOGGER.error( 'Unrecognized Joule deployment type ' + joule.deploy + '.' );
break;
}
} );

socket.emit( 'job:new-confirmation', { success: true, name: job.name, joules: compiled_job } );
} );

socket.on( 'job:start', function( job ){

job.id = job_counter++;

LOGGER.info( 'Starting job "' + job.name + '" with id ' + job.id + '.' );
console.log( job );

job.start = new Date();
job.client = socket;
jobs[ job.id ] = job;

socket.emit( 'job:starting', { success: true, id: job.name + ':' + job.id } );
stepJob( job.id, -1, job.input );
} );

socket.on( 'job:start', function( data ){
var job = JSON.parse( data );
LOGGER.info( 'Start job "' + job.name + '".' );
// TODO: Schedule Job
// TODO: Run Job
socket.on( 'joule:complete', function( result ){
LOGGER.info( 'Completed Joule ' + result.joule.name + ' for job ' + result.job_id + '.' );
stepJob( result.job_id, result.joule.index, result.output );
} );

socket.on( 'disconnect', function( ){
Expand Down
46 changes: 27 additions & 19 deletions tests/ed-server-tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -263,15 +263,18 @@ module.exports.processJouleTests = {
socket.on( 'register', function( /* data */ ){
unit.ok( true, 'Registered with Shenzi.' );

socket.emit( 'joule', {
deploy: 'anonymous',
func: 'function( input, output, utils ){ output.return( 12345 ); }'
socket.emit( 'joule', {
input: undefined,
joule: {
deploy: 'anonymous',
func: 'function( input, output, utils ){ return 12345; }'
}
} );
} );

socket.on( 'joule-result', function( result ){
socket.on( 'joule:complete', function( result ){
unit.ok( result.success, 'Joule returned successfully.' );
unit.equal( result.result, 12345, 'Joule returned correct result.' );
unit.equal( result.output, 12345, 'Joule returned correct result.' );
clearTimeout( timeout );
ed_server.stop( );
test_shenzi.close( );
Expand Down Expand Up @@ -314,12 +317,15 @@ module.exports.processJouleTests = {
unit.ok( true, 'Registered with Shenzi.' );

socket.emit( 'joule', {
deploy: 'bad-deploy-type',
func: 'function( input, output, utils ){ output.return( 12345 ); }'
input: undefined,
joule: {
deploy: 'bad-deploy-type',
func: 'function( input, output, utils ){ return 12345; }'
}
} );
} );

socket.on( 'joule-result', function( result ){
socket.on( 'joule:complete', function( result ){
unit.ok( !result.success, 'Joule returned unsuccessfully.' );
stageComplete();
} );
Expand Down Expand Up @@ -355,15 +361,17 @@ module.exports.processJouleTests = {
unit.ok( true, 'Registered with Shenzi.' );

socket.emit( 'joule', {
deploy: 'anonymous',
func: [ 'function( input, output, utils ){',
'var result = {};',
'input.value.split( " " ).forEach( function( word ){',
'result[ word ] = ( result[ word ] === undefined ? 1 : result[ word ] + 1 );',
'} );',
'output.return( result );',
'}'
].join( '\n' ),
joule: {
deploy: 'anonymous',
func: [ 'function( input, output, utils ){',
'var result = {};',
'input.value.split( " " ).forEach( function( word ){',
'result[ word ] = ( result[ word ] === undefined ? 1 : result[ word ] + 1 );',
'} );',
'return result;',
'}'
].join( '\n' ),
},
input: {
value: 'Hello Crocuta . This is a Test .'
}
Expand All @@ -378,9 +386,9 @@ module.exports.processJouleTests = {
unit.done();
} );

socket.on( 'joule-result', function( result ){
socket.on( 'joule:complete', function( result ){
unit.ok( result.success, 'Joule returned successfully.' );
unit.deepEqual( result.result, {
unit.deepEqual( result.output, {
Hello: 1,
Crocuta: 1,
'.': 2,
Expand Down
Loading

0 comments on commit 9f72995

Please sign in to comment.