Skip to content

Commit

Permalink
Support parse error (JSON.parse try/catch)
Browse files Browse the repository at this point in the history
  • Loading branch information
FGRibreau committed Sep 17, 2012
1 parent 809df5f commit 4e0eb8a
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 9 deletions.
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -9,3 +9,4 @@ dump.json
# mac
DS_STORE
todo.txt
.nide
1 change: 1 addition & 0 deletions .npmignore
@@ -0,0 +1 @@
.nide
33 changes: 26 additions & 7 deletions bin/amqp-tool
Expand Up @@ -23,8 +23,8 @@ var VERSION = '0.0.3'
.demand('queue')
.describe('passive', 'set it to true if the queue already exist')
.default('passive', true)
.describe('durable', 'if specified the queue will survive a borker restart')
.describe('autoDelete', 'if specified the queue will be deleted when there is no more subscriptions')
.describe('durable', 'if specified the queue will survive a broker restart')
.describe('autoDelete', 'if specified the queue will be deleted when there are no more subscriptions')
.describe('export', 'export [filename], export queue\'s content to filename')
.describe('import', 'import [filename], export file content into the queue')
.describe('count', 'limit the number of message to export/import')
Expand Down Expand Up @@ -54,10 +54,11 @@ if(!argv.import && !argv.export){
}

// Toolbox
Function.prototype.curry = Function.prototype.curry || function(arg){
var fn = this;
Function.prototype.curry = Function.prototype.curry || function(/* args */){
var args = Array.prototype.slice.call(arguments)
, fn = this;
return function(){
return fn.apply(this, [arg].concat(Array.prototype.slice.call(arguments)));
return fn.apply(this, args.concat(Array.prototype.slice.call(arguments)));
}
}

Expand All @@ -84,6 +85,13 @@ function importQueue(conn, exchange, streamController){
function startImport(_stream){
stream = _stream;


process.on('SIGINT', streamController.close.curry(stream, function(){
process.exit(0);
}));



// When no more data come from stdin or from the file
stream.once('end', stopImport);

Expand All @@ -96,7 +104,13 @@ function importQueue(conn, exchange, streamController){

function importMsg(msg){
// N33D ES6 : {message, header, deliveryInfo} = JSON.parse(msg);
var message = JSON.parse(msg);
var message;
try{
message = JSON.parse(msg);
} catch(err){
console.error("JSON PARSE ERROR", msg, err);
return;
}

exchange.publish(message[2].routingKey, message[0]);

Expand All @@ -119,6 +133,11 @@ function exportQueue(conn, queue, streamController){

function startExport(_stream){
stream = _stream;

process.on('SIGINT', streamController.close.curry(stream, function(){
process.exit(0);
}));

queue.subscribe({ack:true}, exportMsg);
}

Expand All @@ -145,7 +164,7 @@ function exportQueue(conn, queue, streamController){
// Build queue options
['passive','durable', 'autoDelete'].forEach(function(o){
if(argv[o]){
queueOptions[o]=argv[o];
queueOptions[o]= argv[o] == 'true' ? true : false;
}
});

Expand Down
5 changes: 3 additions & 2 deletions package.json
Expand Up @@ -2,7 +2,7 @@
"name" : "amqp-tool",
"description" : "Rabbitmq-tool - import/export data from a RabbitMQ broker",
"keywords": ["amqp","rabbitmq", "tool", "export", "import"],
"version": "0.0.3",
"version": "0.0.4",
"homepage": "https://github.com/FGRibreau/node-amqp-tool",
"repository": {
"type": "git",
Expand All @@ -12,7 +12,8 @@
"dependencies": {
"optimist":"0.3.0",
"amqp-dsl":"*",
"lazy":"1.0.7"
"lazy":"1.0.7",
"coffeescript":"*"
},

"bin":{
Expand Down

0 comments on commit 4e0eb8a

Please sign in to comment.