diff --git a/lib/banzai-server.js b/lib/banzai-server.js index eff3126..0d23c6b 100644 --- a/lib/banzai-server.js +++ b/lib/banzai-server.js @@ -94,7 +94,13 @@ function Banzai( ){ } ); socket.on( 'getLocationsOf', function( key, callback ){ - callback( data_director.getLocations( key ) ); + callback( + data_director.getLocations( key ) + .map( function( banzai_loc ){ + console.log( connection_manager.getEd( banzai_loc ) ); + return connection_manager.getEd( banzai_loc ).shenzi_ip; + } ) + ); } ); socket.on( 'getKeysUnder', function( key, callback ){ diff --git a/lib/connection-manager.js b/lib/connection-manager.js index fd6a550..8a5e28f 100644 --- a/lib/connection-manager.js +++ b/lib/connection-manager.js @@ -50,7 +50,8 @@ function ConnectionManager( ){ platform: data.platform, socket: socket, ip_address: ip_address, - state: 'connected' + state: 'connected', + shenzi_ip: data.shenzi_ip }; LOGGER.info( 'Registered new ' + data.type + ' @ ' + ip_address + ' : ' + JSON.stringify( @@ -79,14 +80,16 @@ function ConnectionManager( ){ return undefined; }; - this.getEd = function( ){ + this.getEd = function( ip_address ){ // TODO: make smarter then random - var keys = Object.keys( nodes.ed ).filter( function( ip_address ){ - return ( nodes.ed[ ip_address ].state === 'connected' ); - } ); + if( ip_address === undefined ){ + var keys = Object.keys( nodes.ed ).filter( function( ip_address ){ + return ( nodes.ed[ ip_address ].state === 'connected' ); + } ); - var ip_address = keys[ Math.floor( Math.random() * keys.length ) ]; + ip_address = keys[ Math.floor( Math.random() * keys.length ) ]; + } return nodes.ed[ ip_address ]; }; diff --git a/lib/ed-server.js b/lib/ed-server.js index 8a50915..4f8a12a 100644 --- a/lib/ed-server.js +++ b/lib/ed-server.js @@ -9,9 +9,10 @@ var config = require( './utils/config' ); function Ed( ){ var shenzi_socket = require( 'socket.io-client' )( 'http://' + config.get( 'shenzi.host' ) + ':' + config.get( 'shenzi.port' ), config.get( 'ed.connection.options' ) ); - var banzai_socket = require( 'socket.io-client' )( 'http://' + config.get( 'banzai.host' ) + ':' + config.get( 'banzai.port' ), config.get( 'ed.connection.options' ) ); + var ed_shenzi_socket_address = ''; + var banzai_socket = null; var data_manager = new DataManager( ); - var file_system = new FileSystem( data_manager, banzai_socket ); + var file_system = null; /*********************************************************************************************************************/ /* Shenzi Communication */ @@ -28,6 +29,11 @@ function Ed( ){ platform: process.platform, memory: os.totalmem(), cpus: os.cpus() + }, function( location ){ + ed_shenzi_socket_address = location; + banzai_socket = require( 'socket.io-client' )( 'http://' + config.get( 'banzai.host' ) + ':' + config.get( 'banzai.port' ), config.get( 'ed.connection.options' ) ); + file_system = new FileSystem( data_manager, banzai_socket ); + setupBanzai(); } ); } ); @@ -53,6 +59,10 @@ function Ed( ){ LOGGER.debug( JSON.stringify( step, null, '\t' ) ); switch( joule.deploy ){ + case 'named': + // TODO + case 'saved-function': + //TODO case 'anonymous': LOGGER.debug( 'Running Anonymous Joule.' ); LOGGER.debug( joule.func ); @@ -70,9 +80,6 @@ function Ed( ){ vm.runInContext( '(' + joule.func + ')()', context, joule.name + '.vm' ); break; - case 'named': - // TODO - break; default: shenzi_socket.emit( 'joule:complete', { success: false } ); shenzi_socket.emit( 'err', { 'type': 'joule', 'msg': 'Can not run Joule of deployment type ' + joule.deploy + '.' } ); @@ -85,81 +92,84 @@ function Ed( ){ /* Banzai Communication */ /*********************************************************************************************************************/ - banzai_socket.on( 'connect', function( ){ - LOGGER.debug( 'Established Connection to Banzai.' ); - - banzai_socket.emit( 'register', { - type: 'ed', - engine: 'nodejs', - datakeys: data_manager.listKeys( ) + function setupBanzai(){ + banzai_socket.on( 'connect', function( ){ + LOGGER.debug( 'Established Connection to Banzai.' ); + + banzai_socket.emit( 'register', { + type: 'ed', + engine: 'nodejs', + datakeys: data_manager.listKeys( ), + shenzi_ip: ed_shenzi_socket_address + } ); } ); - } ); - - //Banzai Error Handler - banzai_socket.on( 'err', function( msg ){ - LOGGER.error( msg ); - } ); - - // Banzai Disconnect Handler - banzai_socket.on( 'disconnect', function( ){ - LOGGER.warn( 'Disconnection' ); - } ); - - banzai_socket.on( 'hold-data', function( metadata ){ - LOGGER.debug( 'Recieved Hold Data Request: ' + metadata.key ); - switch( metadata.type ){ - case 'text': - data_manager.store( metadata.key, metadata.value, function( error ){ - if( error ){ - LOGGER.error( 'Failed to hold data for key ' + metadata.key + ' with type ' + metadata.type + '.' ); - banzai_socket.emit( 'hold-confirmation', { success: false, key: metadata.key, message: error } ); - } else { - banzai_socket.emit( 'hold-confirmation', { success: true, key: metadata.key, } ); - } - } ); - break; - default: - LOGGER.error( 'Unrecognized data type: ' + metadata.type ); - banzai_socket.emit( 'hold-confirmation', { success: false, key: metadata.key, message: 'Unrecognized data type: ' + metadata.type } ); - banzai_socket.emit( 'err', { message: 'Unrecognized data type: ' + metadata.type } ); - break; - } - } ); - - banzai_socket.on( 'lose-data', function( metadata ){ - LOGGER.debug( 'Recieved Lose Data Request: ' + metadata.key ); - data_manager.lose( metadata.key, function( error ){ - if( error ){ - LOGGER.error( 'Failed to lose data for key ' + metadata.key + '.' ); - banzai_socket.emit( 'lose-confirmation', { success: false, key: metadata.key, message: error } ); - } else { - banzai_socket.emit( 'lose-confirmation', { success: true, key: metadata.key } ); - } + + //Banzai Error Handler + banzai_socket.on( 'err', function( msg ){ + LOGGER.error( msg ); } ); - } ); - - banzai_socket.on( 'get-data', function( metadata ){ - LOGGER.debug( 'Recieved Get Data Request: ' + metadata.key ); - data_manager.get( metadata.key, function( error, data ){ - if( error ){ - LOGGER.error( 'Failed to get data for key ' + metadata.key + '.' ); - banzai_socket.emit( 'get-confirmation', { success: false, key: metadata.key, message: error } ); - } else { - banzai_socket.emit( 'get-confirmation', { success: true, key: metadata.key, data: data.toString() } ); + + // Banzai Disconnect Handler + banzai_socket.on( 'disconnect', function( ){ + LOGGER.warn( 'Disconnection' ); + } ); + + banzai_socket.on( 'hold-data', function( metadata ){ + LOGGER.debug( 'Recieved Hold Data Request: ' + metadata.key ); + switch( metadata.type ){ + case 'text': + data_manager.store( metadata.key, metadata.value, function( error ){ + if( error ){ + LOGGER.error( 'Failed to hold data for key ' + metadata.key + ' with type ' + metadata.type + '.' ); + banzai_socket.emit( 'hold-confirmation', { success: false, key: metadata.key, message: error } ); + } else { + banzai_socket.emit( 'hold-confirmation', { success: true, key: metadata.key, } ); + } + } ); + break; + default: + LOGGER.error( 'Unrecognized data type: ' + metadata.type ); + banzai_socket.emit( 'hold-confirmation', { success: false, key: metadata.key, message: 'Unrecognized data type: ' + metadata.type } ); + banzai_socket.emit( 'err', { message: 'Unrecognized data type: ' + metadata.type } ); + break; } } ); - } ); - - banzai_socket.on( 'list-data', function( ){ - LOGGER.debug( 'Recieved List Data Request' ); - banzai_socket.emit( 'list-confirmation', { success: true, keys: data_manager.listKeys( ) } ); - } ); - - // Banzai Spawned New Shenzi - banzai_socket.on( 'shenzi moved', function( ){ - LOGGER.debug( 'Shenzi has moved' ); - //TODO: handle when shenzi comes up somewhere else - } ); + + banzai_socket.on( 'lose-data', function( metadata ){ + LOGGER.debug( 'Recieved Lose Data Request: ' + metadata.key ); + data_manager.lose( metadata.key, function( error ){ + if( error ){ + LOGGER.error( 'Failed to lose data for key ' + metadata.key + '.' ); + banzai_socket.emit( 'lose-confirmation', { success: false, key: metadata.key, message: error } ); + } else { + banzai_socket.emit( 'lose-confirmation', { success: true, key: metadata.key } ); + } + } ); + } ); + + banzai_socket.on( 'get-data', function( metadata ){ + LOGGER.debug( 'Recieved Get Data Request: ' + metadata.key ); + data_manager.get( metadata.key, function( error, data ){ + if( error ){ + LOGGER.error( 'Failed to get data for key ' + metadata.key + '.' ); + banzai_socket.emit( 'get-confirmation', { success: false, key: metadata.key, message: error } ); + } else { + banzai_socket.emit( 'get-confirmation', { success: true, key: metadata.key, data: data.toString() } ); + } + } ); + } ); + + banzai_socket.on( 'list-data', function( ){ + LOGGER.debug( 'Recieved List Data Request' ); + banzai_socket.emit( 'list-confirmation', { success: true, keys: data_manager.listKeys( ) } ); + } ); + + // Banzai Spawned New Shenzi + banzai_socket.on( 'shenzi moved', function( ){ + LOGGER.debug( 'Shenzi has moved' ); + //TODO: handle when shenzi comes up somewhere else + } ); + } /*********************************************************************************************************************/ /* Utility */ diff --git a/lib/shenzi-server.js b/lib/shenzi-server.js index 637fa05..d76176b 100644 --- a/lib/shenzi-server.js +++ b/lib/shenzi-server.js @@ -22,9 +22,10 @@ function Shenzi( ){ if( next_index < job.joules.length ){ LOGGER.debug( 'Stepping Job ' + job.name + ' at joule #' + joule_index + '.' ); - var worker = connection_manager.getEd( ); var prev_joule_type = ( joule_index === -1 ? 'default' : job.joules[ joule_index ].type ); + var prev_joule_name = ( joule_index === -1 ? '' : job.joules[ joule_index ].name ); var next_joule_type = job.joules[ next_index ].type; + var next_joule_name = job.joules[ next_index ].name; var joule = job.joules[ next_index ]; // TODO: move out of here we shouldn't be doing it every time we step. @@ -35,6 +36,12 @@ function Shenzi( ){ throw new Error( 'Can not have an o2m joule immediatly followed by an m2o joule.' ); } + if( next_joule_name === 'key2loc' ){ + LOGGER.debug( 'key2loc Joule being called' ); + stepJob( job_id, next_index, result, input_key ); + return; + } + if( prev_joule_type === 'o2m' ){ LOGGER.debug( 'Just Completed an o2m joule.' ); @@ -46,21 +53,39 @@ function Shenzi( ){ return result.hasOwnProperty( key ); } ).forEach( function( key ){ - job.parallel.push( key ); + var worker = null; + var input = null; + var input_key = null; + + if( prev_joule_name === 'key2loc' ){ + console.log( result ); + var ip = result[ key ][ Math.floor( Math.random() * result[ key ].length ) ]; + worker = connection_manager.getEd( ip ); + input = key; + input_key = key; + } else { + worker = connection_manager.getEd( ); + input = result[ key ]; + input_key = key; + } + + job.parallel.push( input_key ); worker.socket.emit( 'joule', { joule: joule, job_id: job_id, - input: result[ key ], - input_key: ( input_key === '' ? key : key /* TODO: input_key + ':' + key */ ) + input: input, + input_key: input_key } ); } ); return; } + var worker = connection_manager.getEd( ); + if( next_joule_type === 'm2o' ){ LOGGER.debug( 'About to Start an m2o joule.' ); @@ -98,10 +123,12 @@ function Shenzi( ){ LOGGER.debug( ip_address + ' # ' + 'Recieved connection.' ); - socket.on( 'register', function( data ){ + socket.on( 'register', function( data, callback ){ LOGGER.info( ip_address + ' # ' + 'Registration request' ); try{ connection_manager.register( ip_address, data, socket ); + if( callback ) + callback( ip_address ); } catch ( exception ){ LOGGER.warn( ip_address + ' # ' + exception.message ); socket.emit( 'err', exception.message ); @@ -120,10 +147,10 @@ function Shenzi( ){ compiled_job.push( joule ); break; case 'saved-function': - compiled_job.push( {} /* TODO */ ); + compiled_job.push( joule /* TODO */ ); break; case 'named': - compiled_job.push( {} /* TODO */ ); + compiled_job.push( joule /* TODO */ ); break; default: LOGGER.error( 'Unrecognized Joule deployment type ' + joule.deploy + '.' ); @@ -139,7 +166,7 @@ function Shenzi( ){ job.id = job_counter++; LOGGER.info( 'Starting job "' + job.name + '" with id ' + job.id + '.' ); - console.log( job ); + console.log( JSON.stringify( job, null, '\t' ) ); job.start = new Date(); job.client = socket; diff --git a/tests/ed-server-tests.js b/tests/ed-server-tests.js index c1c6945..fa220d5 100644 --- a/tests/ed-server-tests.js +++ b/tests/ed-server-tests.js @@ -83,8 +83,14 @@ module.exports.initialConnectionTests = { } - test_shenzi.on( 'connection', function( /* socket */ ){ + test_shenzi.on( 'connection', function( socket ){ unit.ok( true, 'Connected to Shenzi.' ); + + socket.on( 'register', function( data, callback ){ + data = data; + callback( '' ); + } ); + stageCompleted( ); } ); @@ -130,7 +136,9 @@ module.exports.initialConnectionTests = { unit.ok( true, 'Connected to Shenzi.' ); stageCompleted( ); - socket.on( 'register', function( /* data */ ){ + socket.on( 'register', function( data, callback ){ + data = data; + callback( '' ); unit.ok( true, 'Registered with Shenzi.' ); stageCompleted( ); } ); @@ -187,7 +195,9 @@ module.exports.initialConnectionTests = { unit.ok( true, 'Connected to Shenzi.' ); stageCompleted( ); - socket.on( 'register', function( /* data */ ){ + socket.on( 'register', function( data, callback ){ + data = data; + callback( '' ); unit.ok( true, 'Registered with Shenzi.' ); stageCompleted( ); } ); @@ -219,8 +229,13 @@ module.exports.stopTests = { var test_banzai = new Server( ); var ed_server = null; - test_shenzi.on( 'connection', function( /* socket */ ){ + test_shenzi.on( 'connection', function( socket ){ unit.ok( true, 'Connected to Shenzi.' ); + + socket.on( 'register', function( data, callback ){ + data = data; + callback( '' ); + } ); } ); test_banzai.on( 'connection', function( /* socket */ ){ @@ -260,7 +275,9 @@ module.exports.processJouleTests = { test_shenzi.on( 'connection', function( socket ){ unit.ok( true, 'Connected to Shenzi.' ); - socket.on( 'register', function( /* data */ ){ + socket.on( 'register', function( data, callback ){ + data = data; + callback( '' ); unit.ok( true, 'Registered with Shenzi.' ); socket.emit( 'joule', { @@ -313,7 +330,9 @@ module.exports.processJouleTests = { test_shenzi.on( 'connection', function( socket ){ unit.ok( true, 'Connected to Shenzi.' ); - socket.on( 'register', function( /* data */ ){ + socket.on( 'register', function( data, callback ){ + data = data; + callback( '' ); unit.ok( true, 'Registered with Shenzi.' ); socket.emit( 'joule', { @@ -357,7 +376,9 @@ module.exports.processJouleTests = { test_shenzi.on( 'connection', function( socket ){ unit.ok( true, 'Connected to Shenzi.' ); - socket.on( 'register', function( /* data */ ){ + socket.on( 'register', function( data, callback ){ + data = data; + callback( '' ); unit.ok( true, 'Registered with Shenzi.' ); socket.emit( 'joule', { @@ -415,14 +436,14 @@ module.exports.listDataTests = { listDataWhenUnInitialized: function( unit ){ unit.expect(5); - //var test_shenzi = new Server( ); + var test_shenzi = new Server( ); var test_banzai = new Server( ); var ed_server = null; var timeout = setTimeout( function( ){ unit.ok( 'false' ); ed_server.stop( ); - //test_shenzi.close( ); + test_shenzi.close( ); test_banzai.close( ); unit.done( ); }, 5000 ); @@ -432,21 +453,19 @@ module.exports.listDataTests = { if( --expected_number_of_completions <= 0 ){ clearTimeout( timeout ); ed_server.stop( ); - //test_shenzi.close( ); + test_shenzi.close( ); test_banzai.close( ); unit.done(); } } -// test_shenzi.on( 'connection', function( socket ){ -// unit.ok( true, 'Connected to Shenzi.' ); -// stageCompleted( ); -// -// socket.on( 'register', function( /* data */ ){ -// unit.ok( true, 'Registered with Shenzi.' ); -// stageCompleted( ); -// } ); -// } ); + test_shenzi.on( 'connection', function( socket ){ + + socket.on( 'register', function( data, callback ){ + data = data; + callback( '' ); + } ); + } ); var banzai_connected = false; test_banzai.on( 'connection', function( socket ){ @@ -472,7 +491,7 @@ module.exports.listDataTests = { } ); } ); - //test_shenzi.listen( 2102 ); + test_shenzi.listen( 2102 ); test_banzai.listen( 2103 ); ed_server = new Ed( ); @@ -485,14 +504,14 @@ module.exports.listDataTests = { fs.writeFileSync( path.join( data_root, 'json' ), '{ "something": false, "else": 1 }' ); fs.writeFileSync( path.join( data_root, 'nested.file' ), 'Cool' ); - //var test_shenzi = new Server( ); + var test_shenzi = new Server( ); var test_banzai = new Server( ); var ed_server = null; var timeout = setTimeout( function( ){ unit.ok( 'false' ); ed_server.stop( ); - //test_shenzi.close( ); + test_shenzi.close( ); test_banzai.close( ); unit.done( ); }, 5000 ); @@ -502,21 +521,19 @@ module.exports.listDataTests = { if( --expected_number_of_completions <= 0 ){ clearTimeout( timeout ); ed_server.stop( ); - //test_shenzi.close( ); + test_shenzi.close( ); test_banzai.close( ); unit.done(); } } -// test_shenzi.on( 'connection', function( socket ){ -// unit.ok( true, 'Connected to Shenzi.' ); -// stageCompleted( ); -// -// socket.on( 'register', function( /* data */ ){ -// unit.ok( true, 'Registered with Shenzi.' ); -// stageCompleted( ); -// } ); -// } ); + test_shenzi.on( 'connection', function( socket ){ + + socket.on( 'register', function( data, callback ){ + data = data; + callback( '' ); + } ); + } ); var banzai_connected = false; test_banzai.on( 'connection', function( socket ){ @@ -542,7 +559,7 @@ module.exports.listDataTests = { } ); } ); - //test_shenzi.listen( 2102 ); + test_shenzi.listen( 2102 ); test_banzai.listen( 2103 ); ed_server = new Ed( ); @@ -557,12 +574,14 @@ module.exports.getDataTests = { fs.writeFileSync( path.join( data_root, 'json' ), '{ "something": false, "else": 1 }' ); fs.writeFileSync( path.join( data_root, 'nested.file' ), 'Cool' ); + var test_shenzi = new Server( ); var test_banzai = new Server( ); var ed_server = null; var timeout = setTimeout( function( ){ unit.ok( 'false' ); - ed_server.stop( ); + ed_server.stop( ); + test_shenzi.close( ); test_banzai.close( ); unit.done( ); }, 10000 ); @@ -571,12 +590,21 @@ module.exports.getDataTests = { function stageCompleted( ){ if( --expected_number_of_completions <= 0 ){ clearTimeout( timeout ); - ed_server.stop( ); + ed_server.stop( ); + test_shenzi.close( ); test_banzai.close( ); unit.done(); } } + test_shenzi.on( 'connection', function( socket ){ + + socket.on( 'register', function( data, callback ){ + data = data; + callback( '' ); + } ); + } ); + var banzai_connected = false; test_banzai.on( 'connection', function( socket ){ if( !banzai_connected ){ @@ -602,6 +630,7 @@ module.exports.getDataTests = { } ); } ); + test_shenzi.listen( 2102 ); test_banzai.listen( 2103 ); ed_server = new Ed( ); @@ -614,12 +643,14 @@ module.exports.getDataTests = { fs.writeFileSync( path.join( data_root, 'json' ), '{ "something": false, "else": 1 }' ); fs.writeFileSync( path.join( data_root, 'nested.file' ), 'Cool' ); + var test_shenzi = new Server( ); var test_banzai = new Server( ); var ed_server = null; var timeout = setTimeout( function( ){ unit.ok( 'false' ); - ed_server.stop( ); + ed_server.stop( ); + test_shenzi.close( ); test_banzai.close( ); unit.done( ); }, 10000 ); @@ -628,12 +659,21 @@ module.exports.getDataTests = { function stageCompleted( ){ if( --expected_number_of_completions <= 0 ){ clearTimeout( timeout ); - ed_server.stop( ); + ed_server.stop( ); + test_shenzi.close( ); test_banzai.close( ); unit.done(); } } + test_shenzi.on( 'connection', function( socket ){ + + socket.on( 'register', function( data, callback ){ + data = data; + callback( '' ); + } ); + } ); + var banzai_connected = false; test_banzai.on( 'connection', function( socket ){ if( !banzai_connected ){ @@ -675,6 +715,7 @@ module.exports.getDataTests = { } ); } ); + test_shenzi.listen( 2102 ); test_banzai.listen( 2103 ); ed_server = new Ed( ); @@ -687,12 +728,14 @@ module.exports.getDataTests = { fs.writeFileSync( path.join( data_root, 'json' ), '{ "something": false, "else": 1 }' ); fs.writeFileSync( path.join( data_root, 'nested.file' ), 'Cool' ); + var test_shenzi = new Server( ); var test_banzai = new Server( ); var ed_server = null; var timeout = setTimeout( function( ){ unit.ok( 'false' ); - ed_server.stop( ); + ed_server.stop( ); + test_shenzi.close( ); test_banzai.close( ); unit.done( ); }, 10000 ); @@ -701,12 +744,21 @@ module.exports.getDataTests = { function stageCompleted( ){ if( --expected_number_of_completions <= 0 ){ clearTimeout( timeout ); - ed_server.stop( ); + ed_server.stop( ); + test_shenzi.close( ); test_banzai.close( ); unit.done(); } } + test_shenzi.on( 'connection', function( socket ){ + + socket.on( 'register', function( data, callback ){ + data = data; + callback( '' ); + } ); + } ); + var banzai_connected = false; test_banzai.on( 'connection', function( socket ){ if( !banzai_connected ){ @@ -731,6 +783,7 @@ module.exports.getDataTests = { } ); } ); + test_shenzi.listen( 2102 ); test_banzai.listen( 2103 ); ed_server = new Ed( ); @@ -745,12 +798,14 @@ module.exports.loseDataTests = { fs.writeFileSync( path.join( data_root, 'json' ), '{ "something": false, "else": 1 }' ); fs.writeFileSync( path.join( data_root, 'nested.file' ), 'Cool' ); + var test_shenzi = new Server( ); var test_banzai = new Server( ); var ed_server = null; var timeout = setTimeout( function( ){ unit.ok( 'false' ); - ed_server.stop( ); + ed_server.stop( ); + test_shenzi.close( ); test_banzai.close( ); unit.done( ); }, 10000 ); @@ -759,12 +814,21 @@ module.exports.loseDataTests = { function stageCompleted( ){ if( --expected_number_of_completions <= 0 ){ clearTimeout( timeout ); - ed_server.stop( ); + ed_server.stop( ); + test_shenzi.close( ); test_banzai.close( ); unit.done(); } } + test_shenzi.on( 'connection', function( socket ){ + + socket.on( 'register', function( data, callback ){ + data = data; + callback( '' ); + } ); + } ); + var banzai_connected = false; test_banzai.on( 'connection', function( socket ){ if( !banzai_connected ){ @@ -798,6 +862,7 @@ module.exports.loseDataTests = { } ); } ); + test_shenzi.listen( 2102 ); test_banzai.listen( 2103 ); ed_server = new Ed( ); @@ -810,12 +875,14 @@ module.exports.loseDataTests = { fs.writeFileSync( path.join( data_root, 'json' ), '{ "something": false, "else": 1 }' ); fs.writeFileSync( path.join( data_root, 'nested.file' ), 'Cool' ); + var test_shenzi = new Server( ); var test_banzai = new Server( ); var ed_server = null; var timeout = setTimeout( function( ){ unit.ok( 'false' ); - ed_server.stop( ); + ed_server.stop( ); + test_shenzi.close( ); test_banzai.close( ); unit.done( ); }, 10000 ); @@ -824,12 +891,21 @@ module.exports.loseDataTests = { function stageCompleted( ){ if( --expected_number_of_completions <= 0 ){ clearTimeout( timeout ); - ed_server.stop( ); + ed_server.stop( ); + test_shenzi.close( ); test_banzai.close( ); unit.done(); } } + test_shenzi.on( 'connection', function( socket ){ + + socket.on( 'register', function( data, callback ){ + data = data; + callback( '' ); + } ); + } ); + var banzai_connected = false; test_banzai.on( 'connection', function( socket ){ if( !banzai_connected ){ @@ -863,6 +939,7 @@ module.exports.loseDataTests = { } ); } ); + test_shenzi.listen( 2102 ); test_banzai.listen( 2103 ); ed_server = new Ed( ); @@ -877,12 +954,14 @@ module.exports.holdDataTests = { fs.writeFileSync( path.join( data_root, 'json' ), '{ "something": false, "else": 1 }' ); fs.writeFileSync( path.join( data_root, 'nested.file' ), 'Cool' ); + var test_shenzi = new Server( ); var test_banzai = new Server( ); var ed_server = null; var timeout = setTimeout( function( ){ unit.ok( 'false' ); - ed_server.stop( ); + ed_server.stop( ); + test_shenzi.close( ); test_banzai.close( ); unit.done( ); }, 10000 ); @@ -891,12 +970,21 @@ module.exports.holdDataTests = { function stageCompleted( ){ if( --expected_number_of_completions <= 0 ){ clearTimeout( timeout ); - ed_server.stop( ); + ed_server.stop( ); + test_shenzi.close( ); test_banzai.close( ); unit.done(); } } + test_shenzi.on( 'connection', function( socket ){ + + socket.on( 'register', function( data, callback ){ + data = data; + callback( '' ); + } ); + } ); + var banzai_connected = false; test_banzai.on( 'connection', function( socket ){ if( !banzai_connected ){ @@ -930,6 +1018,7 @@ module.exports.holdDataTests = { } ); } ); + test_shenzi.listen( 2102 ); test_banzai.listen( 2103 ); ed_server = new Ed( ); @@ -942,12 +1031,14 @@ module.exports.holdDataTests = { fs.writeFileSync( path.join( data_root, 'json' ), '{ "something": false, "else": 1 }' ); fs.writeFileSync( path.join( data_root, 'nested.file' ), 'Cool' ); + var test_shenzi = new Server( ); var test_banzai = new Server( ); var ed_server = null; var timeout = setTimeout( function( ){ unit.ok( 'false' ); - ed_server.stop( ); + ed_server.stop( ); + test_shenzi.close( ); test_banzai.close( ); unit.done( ); }, 10000 ); @@ -956,12 +1047,21 @@ module.exports.holdDataTests = { function stageCompleted( ){ if( --expected_number_of_completions <= 0 ){ clearTimeout( timeout ); - ed_server.stop( ); + ed_server.stop( ); + test_shenzi.close( ); test_banzai.close( ); unit.done(); } } + test_shenzi.on( 'connection', function( socket ){ + + socket.on( 'register', function( data, callback ){ + data = data; + callback( '' ); + } ); + } ); + var banzai_connected = false; test_banzai.on( 'connection', function( socket ){ if( !banzai_connected ){ @@ -1001,6 +1101,7 @@ module.exports.holdDataTests = { } ); } ); + test_shenzi.listen( 2102 ); test_banzai.listen( 2103 ); ed_server = new Ed( ); diff --git a/tests/local-integration-tests.js b/tests/local-integration-tests.js index ea22389..38389c7 100644 --- a/tests/local-integration-tests.js +++ b/tests/local-integration-tests.js @@ -27,6 +27,7 @@ * - Finish the current test function, and move on to the next. ALL tests should call this! */ +var fs = require( 'fs' ); var path = require( 'path' ); var config = require( '../lib/utils/config' ); config.reinitialize( path.resolve( __dirname, './config/temptest.config.json' ) ); @@ -378,124 +379,135 @@ module.exports.fileSystemTests = { } }; -//module.exports.wordCountExampleTest = function( unit ){ -// -// var Crocuta = require( '../lib/client' ); -// var crocuta = new Crocuta( ); -// -// // Timeout is not part of example. -// var timeout = setTimeout( function( ){ -// unit.ok( false, 'Test Timed Out.' ); -// crocuta.stop(); -// unit.done(); -// }, 10000 ); -// // End timeout -// -// //crocuta.uploadFile( /*key*/, /*file_location*/, /*force*/, /*callback*/ ); -// //crocuta.uploadDir( /*key*/, /*dir_location*/, /*recursive*/, /*force*/, /*callback*/ ); -// //crocuta.upload( /*key*/, /*contents*/, /*force*/, /*callback*/ ); -// //crocuta.download( /*key*/, /*callback*/ ); -// //crocuta.delete( /*key*/, /*force*/, /*callback*/ ); -// // + sync versions of all the previous -// -// var job = crocuta.createJob( 'word count' ) -// .joule( function( input /*, output, reporter */ ){ -// console.log( arguments ); -// return fileSystem.getKeysUnder( input ); -// }, 'directory contents' ) -// .joule( function( input /*, output, reporter */ ){ -// console.log( arguments ); -// var mapping = {}; -// input.forEach( function( key ){ -// mapping[ key ] = fileSystem.getLocationsOf( key ); -// } ); -// return mapping; -// }, 'file mapper' ) -// .o2mjoule( 'direct key to server' ) // Proprietary Joule. -// .joule( function( input /*, output, reporter */ ){ -// console.log( arguments ); -// return fileSystem.readSync( input ).split( ' ' ); -// }, 'content splitter' ) -// .m2ojoule( function( inputs /*, output, reporter */ ){ -// console.log( arguments ); -// var result = { }; -// var keys = Object.keys( inputs ); -// keys.forEach( function( key ){ -// inputs[ key ].forEach( function( input ){ -// result[ input ] = ( result[ input ] === undefined ? 1 : result[ input ] + 1 ); -// } ); -// } ); -// return result; -// }, 'word counter' ); -// -// var files_to_upload = 2; -// function file_uploaded( ){ -// if( --files_to_upload <= 0 ){ -// unit.ok( true, 'Finished uploading all files.' ); -// -// job.send( function( err, job ){ -// unit.ok( !err, 'No error on send' ); -// -// job.start( -// 'input', -// function( err, result ){ -// unit.deepEqual( -// result, -// { -// 'Hello': 1, -// 'World': 2, -// 'Bye': 1, -// 'Crocuta': 2, -// 'Goodbye': 1 -// }, -// 'Job returned correct result.' -// ); -// clearTimeout( timeout ); -// crocuta.stop(); -// unit.done( ); -// } -// ); -// } ); -// -// } -// } -// -// crocuta.onReady( function( ){ -// unit.ok( true, 'onReady fired.' ); -// -// crocuta.upload( -// 'input.file0', -// fs.readFileSync( path.resolve( __dirname, '../examples/word-count/input/input-000.txt' ) ), -// function( err ){ -// if( err ){ -// unit.ok( false, 'Upload failed' ); -// clearTimeout( timeout ); -// console.log( err ); -// unit.done(); -// } else { -// unit.ok( true, 'Finished uploading file input-000.txt.' ); -// file_uploaded(); -// } -// -// } -// ); -// -// crocuta.upload( -// 'input.file1', -// fs.readFileSync( path.resolve( __dirname, '../examples/word-count/input/input-001.txt' ) ), -// function( err ){ -// if( err ){ -// unit.ok( false, 'Upload failed' ); -// clearTimeout( timeout ); -// console.log( err ); -// unit.done(); -// } else { -// unit.ok( true, 'Finished uploading file input-001.txt.' ); -// file_uploaded(); -// } -// -// } -// ); -// } ); -//}; +module.exports.wordCountExampleTest = function( unit ){ + + var Crocuta = require( '../lib/client' ); + var crocuta = new Crocuta( ); + + // Timeout is not part of example. + var timeout = setTimeout( function( ){ + unit.ok( false, 'Test Timed Out.' ); + crocuta.stop(); + unit.done(); + }, 10000 ); + // End timeout + + //crocuta.uploadFile( /*key*/, /*file_location*/, /*force*/, /*callback*/ ); + //crocuta.uploadDir( /*key*/, /*dir_location*/, /*recursive*/, /*force*/, /*callback*/ ); + //crocuta.upload( /*key*/, /*contents*/, /*force*/, /*callback*/ ); + //crocuta.download( /*key*/, /*callback*/ ); + //crocuta.delete( /*key*/, /*force*/, /*callback*/ ); + // + sync versions of all the previous + + var job = crocuta.createJob( 'word count' ) + .joule( function( ){ + fileSystem.getKeysUnder( input, function( keys ){ + done( keys.map( function( key ){ + return input + '.' + key; + } ) ); + } ); + }, 'directory contents' ) + .joule( function( ){ + var mapping = {}; + var count = input.length; + + function callback( key, locations ){ + mapping[ key ] = locations; + if( --count <= 0 ){ + done( mapping ); + } + } + + input.forEach( function( key ){ + fileSystem.getLocationsOf( key, function( locations ){ + callback( key, locations ); + } ); + } ); + }, 'file mapper' ) + .o2mjoule( 'key2loc' ) // Proprietary Joule. + .joule( function( ){ + done( fileSystem.readSync( input ).toString().split( ' ' ) ); + }, 'content splitter' ) + .m2ojoule( function( ){ + var result = { }; + var keys = Object.keys( input ); + keys.forEach( function( key ){ + input[ key ].forEach( function( input ){ + result[ input ] = ( result[ input ] === undefined ? 1 : result[ input ] + 1 ); + } ); + } ); + done( result ); + }, 'word counter' ); + + + var files_to_upload = 2; + function file_uploaded( ){ + if( --files_to_upload <= 0 ){ + unit.ok( true, 'Finished uploading all files.' ); + + job.send( function( err, job ){ + unit.ok( !err, 'No error on send' ); + + job.start( + 'input', + function( err, result ){ + unit.deepEqual( + result, + { + 'Hello': 2, + 'World': 2, + 'Bye': 1, + 'Crocuta': 2, + 'Goodbye': 1 + }, + 'Job returned correct result.' + ); + clearTimeout( timeout ); + crocuta.stop(); + unit.done( ); + } + ); + } ); + + } + } + + crocuta.onReady( function( ){ + unit.ok( true, 'onReady fired.' ); + + crocuta.upload( + 'input.file0', + fs.readFileSync( path.resolve( __dirname, '../examples/word-count/input/input-000.txt' ) ), + function( err ){ + if( err ){ + unit.ok( false, 'Upload failed' ); + clearTimeout( timeout ); + console.log( err ); + unit.done(); + } else { + unit.ok( true, 'Finished uploading file input-000.txt.' ); + file_uploaded(); + } + + } + ); + + crocuta.upload( + 'input.file1', + fs.readFileSync( path.resolve( __dirname, '../examples/word-count/input/input-001.txt' ) ), + function( err ){ + if( err ){ + unit.ok( false, 'Upload failed' ); + clearTimeout( timeout ); + console.log( err ); + unit.done(); + } else { + unit.ok( true, 'Finished uploading file input-001.txt.' ); + file_uploaded(); + } + + } + ); + } ); +};