Skip to content

Commit

Permalink
[Closes #3] added key2loc o2mjoule instead of "key to worker"
Browse files Browse the repository at this point in the history
Required a small refactor of ed-server and thus the ed-server tests.
  • Loading branch information
doctorrustynelson committed Dec 21, 2014
1 parent 02d915e commit ff632c5
Show file tree
Hide file tree
Showing 6 changed files with 417 additions and 258 deletions.
8 changes: 7 additions & 1 deletion lib/banzai-server.js
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,13 @@ function Banzai( ){
} );

socket.on( 'getLocationsOf', function( key, callback ){
callback( data_director.getLocations( key ) );
callback(
data_director.getLocations( key )
.map( function( banzai_loc ){
console.log( connection_manager.getEd( banzai_loc ) );
return connection_manager.getEd( banzai_loc ).shenzi_ip;
} )
);
} );

socket.on( 'getKeysUnder', function( key, callback ){
Expand Down
15 changes: 9 additions & 6 deletions lib/connection-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ function ConnectionManager( ){
platform: data.platform,
socket: socket,
ip_address: ip_address,
state: 'connected'
state: 'connected',
shenzi_ip: data.shenzi_ip
};

LOGGER.info( 'Registered new ' + data.type + ' @ ' + ip_address + ' : ' + JSON.stringify(
Expand Down Expand Up @@ -79,14 +80,16 @@ function ConnectionManager( ){
return undefined;
};

this.getEd = function( ){
this.getEd = function( ip_address ){
// TODO: make smarter then random

var keys = Object.keys( nodes.ed ).filter( function( ip_address ){
return ( nodes.ed[ ip_address ].state === 'connected' );
} );
if( ip_address === undefined ){
var keys = Object.keys( nodes.ed ).filter( function( ip_address ){
return ( nodes.ed[ ip_address ].state === 'connected' );
} );

var ip_address = keys[ Math.floor( Math.random() * keys.length ) ];
ip_address = keys[ Math.floor( Math.random() * keys.length ) ];
}

return nodes.ed[ ip_address ];
};
Expand Down
162 changes: 86 additions & 76 deletions lib/ed-server.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ var config = require( './utils/config' );
function Ed( ){

var shenzi_socket = require( 'socket.io-client' )( 'http://' + config.get( 'shenzi.host' ) + ':' + config.get( 'shenzi.port' ), config.get( 'ed.connection.options' ) );
var banzai_socket = require( 'socket.io-client' )( 'http://' + config.get( 'banzai.host' ) + ':' + config.get( 'banzai.port' ), config.get( 'ed.connection.options' ) );
var ed_shenzi_socket_address = '';
var banzai_socket = null;
var data_manager = new DataManager( );
var file_system = new FileSystem( data_manager, banzai_socket );
var file_system = null;

/*********************************************************************************************************************/
/* Shenzi Communication */
Expand All @@ -28,6 +29,11 @@ function Ed( ){
platform: process.platform,
memory: os.totalmem(),
cpus: os.cpus()
}, function( location ){
ed_shenzi_socket_address = location;
banzai_socket = require( 'socket.io-client' )( 'http://' + config.get( 'banzai.host' ) + ':' + config.get( 'banzai.port' ), config.get( 'ed.connection.options' ) );
file_system = new FileSystem( data_manager, banzai_socket );
setupBanzai();
} );
} );

Expand All @@ -53,6 +59,10 @@ function Ed( ){
LOGGER.debug( JSON.stringify( step, null, '\t' ) );

switch( joule.deploy ){
case 'named':
// TODO
case 'saved-function':
//TODO
case 'anonymous':
LOGGER.debug( 'Running Anonymous Joule.' );
LOGGER.debug( joule.func );
Expand All @@ -70,9 +80,6 @@ function Ed( ){

vm.runInContext( '(' + joule.func + ')()', context, joule.name + '.vm' );
break;
case 'named':
// TODO
break;
default:
shenzi_socket.emit( 'joule:complete', { success: false } );
shenzi_socket.emit( 'err', { 'type': 'joule', 'msg': 'Can not run Joule of deployment type ' + joule.deploy + '.' } );
Expand All @@ -85,81 +92,84 @@ function Ed( ){
/* Banzai Communication */
/*********************************************************************************************************************/

banzai_socket.on( 'connect', function( ){
LOGGER.debug( 'Established Connection to Banzai.' );

banzai_socket.emit( 'register', {
type: 'ed',
engine: 'nodejs',
datakeys: data_manager.listKeys( )
function setupBanzai(){
banzai_socket.on( 'connect', function( ){
LOGGER.debug( 'Established Connection to Banzai.' );

banzai_socket.emit( 'register', {
type: 'ed',
engine: 'nodejs',
datakeys: data_manager.listKeys( ),
shenzi_ip: ed_shenzi_socket_address
} );
} );
} );

//Banzai Error Handler
banzai_socket.on( 'err', function( msg ){
LOGGER.error( msg );
} );

// Banzai Disconnect Handler
banzai_socket.on( 'disconnect', function( ){
LOGGER.warn( 'Disconnection' );
} );

banzai_socket.on( 'hold-data', function( metadata ){
LOGGER.debug( 'Recieved Hold Data Request: ' + metadata.key );
switch( metadata.type ){
case 'text':
data_manager.store( metadata.key, metadata.value, function( error ){
if( error ){
LOGGER.error( 'Failed to hold data for key ' + metadata.key + ' with type ' + metadata.type + '.' );
banzai_socket.emit( 'hold-confirmation', { success: false, key: metadata.key, message: error } );
} else {
banzai_socket.emit( 'hold-confirmation', { success: true, key: metadata.key, } );
}
} );
break;
default:
LOGGER.error( 'Unrecognized data type: ' + metadata.type );
banzai_socket.emit( 'hold-confirmation', { success: false, key: metadata.key, message: 'Unrecognized data type: ' + metadata.type } );
banzai_socket.emit( 'err', { message: 'Unrecognized data type: ' + metadata.type } );
break;
}
} );

banzai_socket.on( 'lose-data', function( metadata ){
LOGGER.debug( 'Recieved Lose Data Request: ' + metadata.key );
data_manager.lose( metadata.key, function( error ){
if( error ){
LOGGER.error( 'Failed to lose data for key ' + metadata.key + '.' );
banzai_socket.emit( 'lose-confirmation', { success: false, key: metadata.key, message: error } );
} else {
banzai_socket.emit( 'lose-confirmation', { success: true, key: metadata.key } );
}

//Banzai Error Handler
banzai_socket.on( 'err', function( msg ){
LOGGER.error( msg );
} );
} );

banzai_socket.on( 'get-data', function( metadata ){
LOGGER.debug( 'Recieved Get Data Request: ' + metadata.key );
data_manager.get( metadata.key, function( error, data ){
if( error ){
LOGGER.error( 'Failed to get data for key ' + metadata.key + '.' );
banzai_socket.emit( 'get-confirmation', { success: false, key: metadata.key, message: error } );
} else {
banzai_socket.emit( 'get-confirmation', { success: true, key: metadata.key, data: data.toString() } );

// Banzai Disconnect Handler
banzai_socket.on( 'disconnect', function( ){
LOGGER.warn( 'Disconnection' );
} );

banzai_socket.on( 'hold-data', function( metadata ){
LOGGER.debug( 'Recieved Hold Data Request: ' + metadata.key );
switch( metadata.type ){
case 'text':
data_manager.store( metadata.key, metadata.value, function( error ){
if( error ){
LOGGER.error( 'Failed to hold data for key ' + metadata.key + ' with type ' + metadata.type + '.' );
banzai_socket.emit( 'hold-confirmation', { success: false, key: metadata.key, message: error } );
} else {
banzai_socket.emit( 'hold-confirmation', { success: true, key: metadata.key, } );
}
} );
break;
default:
LOGGER.error( 'Unrecognized data type: ' + metadata.type );
banzai_socket.emit( 'hold-confirmation', { success: false, key: metadata.key, message: 'Unrecognized data type: ' + metadata.type } );
banzai_socket.emit( 'err', { message: 'Unrecognized data type: ' + metadata.type } );
break;
}
} );
} );

banzai_socket.on( 'list-data', function( ){
LOGGER.debug( 'Recieved List Data Request' );
banzai_socket.emit( 'list-confirmation', { success: true, keys: data_manager.listKeys( ) } );
} );

// Banzai Spawned New Shenzi
banzai_socket.on( 'shenzi moved', function( ){
LOGGER.debug( 'Shenzi has moved' );
//TODO: handle when shenzi comes up somewhere else
} );

banzai_socket.on( 'lose-data', function( metadata ){
LOGGER.debug( 'Recieved Lose Data Request: ' + metadata.key );
data_manager.lose( metadata.key, function( error ){
if( error ){
LOGGER.error( 'Failed to lose data for key ' + metadata.key + '.' );
banzai_socket.emit( 'lose-confirmation', { success: false, key: metadata.key, message: error } );
} else {
banzai_socket.emit( 'lose-confirmation', { success: true, key: metadata.key } );
}
} );
} );

banzai_socket.on( 'get-data', function( metadata ){
LOGGER.debug( 'Recieved Get Data Request: ' + metadata.key );
data_manager.get( metadata.key, function( error, data ){
if( error ){
LOGGER.error( 'Failed to get data for key ' + metadata.key + '.' );
banzai_socket.emit( 'get-confirmation', { success: false, key: metadata.key, message: error } );
} else {
banzai_socket.emit( 'get-confirmation', { success: true, key: metadata.key, data: data.toString() } );
}
} );
} );

banzai_socket.on( 'list-data', function( ){
LOGGER.debug( 'Recieved List Data Request' );
banzai_socket.emit( 'list-confirmation', { success: true, keys: data_manager.listKeys( ) } );
} );

// Banzai Spawned New Shenzi
banzai_socket.on( 'shenzi moved', function( ){
LOGGER.debug( 'Shenzi has moved' );
//TODO: handle when shenzi comes up somewhere else
} );
}

/*********************************************************************************************************************/
/* Utility */
Expand Down
43 changes: 35 additions & 8 deletions lib/shenzi-server.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ function Shenzi( ){
if( next_index < job.joules.length ){
LOGGER.debug( 'Stepping Job ' + job.name + ' at joule #' + joule_index + '.' );

var worker = connection_manager.getEd( );
var prev_joule_type = ( joule_index === -1 ? 'default' : job.joules[ joule_index ].type );
var prev_joule_name = ( joule_index === -1 ? '' : job.joules[ joule_index ].name );
var next_joule_type = job.joules[ next_index ].type;
var next_joule_name = job.joules[ next_index ].name;
var joule = job.joules[ next_index ];

// TODO: move out of here we shouldn't be doing it every time we step.
Expand All @@ -35,6 +36,12 @@ function Shenzi( ){
throw new Error( 'Can not have an o2m joule immediatly followed by an m2o joule.' );
}

if( next_joule_name === 'key2loc' ){
LOGGER.debug( 'key2loc Joule being called' );
stepJob( job_id, next_index, result, input_key );
return;
}

if( prev_joule_type === 'o2m' ){
LOGGER.debug( 'Just Completed an o2m joule.' );

Expand All @@ -46,21 +53,39 @@ function Shenzi( ){
return result.hasOwnProperty( key );
} ).forEach( function( key ){

job.parallel.push( key );
var worker = null;
var input = null;
var input_key = null;

if( prev_joule_name === 'key2loc' ){
console.log( result );
var ip = result[ key ][ Math.floor( Math.random() * result[ key ].length ) ];
worker = connection_manager.getEd( ip );
input = key;
input_key = key;
} else {
worker = connection_manager.getEd( );
input = result[ key ];
input_key = key;
}

job.parallel.push( input_key );

worker.socket.emit(
'joule',
{
joule: joule,
job_id: job_id,
input: result[ key ],
input_key: ( input_key === '' ? key : key /* TODO: input_key + ':' + key */ )
input: input,
input_key: input_key
}
);
} );
return;
}

var worker = connection_manager.getEd( );

if( next_joule_type === 'm2o' ){
LOGGER.debug( 'About to Start an m2o joule.' );

Expand Down Expand Up @@ -98,10 +123,12 @@ function Shenzi( ){

LOGGER.debug( ip_address + ' # ' + 'Recieved connection.' );

socket.on( 'register', function( data ){
socket.on( 'register', function( data, callback ){
LOGGER.info( ip_address + ' # ' + 'Registration request' );
try{
connection_manager.register( ip_address, data, socket );
if( callback )
callback( ip_address );
} catch ( exception ){
LOGGER.warn( ip_address + ' # ' + exception.message );
socket.emit( 'err', exception.message );
Expand All @@ -120,10 +147,10 @@ function Shenzi( ){
compiled_job.push( joule );
break;
case 'saved-function':
compiled_job.push( {} /* TODO */ );
compiled_job.push( joule /* TODO */ );
break;
case 'named':
compiled_job.push( {} /* TODO */ );
compiled_job.push( joule /* TODO */ );
break;
default:
LOGGER.error( 'Unrecognized Joule deployment type ' + joule.deploy + '.' );
Expand All @@ -139,7 +166,7 @@ function Shenzi( ){
job.id = job_counter++;

LOGGER.info( 'Starting job "' + job.name + '" with id ' + job.id + '.' );
console.log( job );
console.log( JSON.stringify( job, null, '\t' ) );

job.start = new Date();
job.client = socket;
Expand Down

0 comments on commit ff632c5

Please sign in to comment.