Reading from RabbitMQ queue #28

Closed
ardian-c opened this issue Oct 29, 2020 · 4 comments

Comments

@ardian-c

Hi,

Thank you for your work. I'm planning to use this package to read messages from a RabbitMQ queue and run another Node program with each message as an argument. From the documentation, I'm struggling to understand the following:

  • How do I connect to a specific queue and read the payloads from it?
  • How do I generate a task that runs a worker with that message as an argument?

Thank you.

@actumn
Owner

actumn commented Oct 30, 2020

Hi.
I'm so glad that you're planning to use it :)

If you want to use it with RabbitMQ, try the following.

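  • Worker-side
const celery = require("celery-node");

// Arguments: broker URL, result backend URL, queue name.
const worker = celery.createWorker("amqp://", "amqp://", "queue_name");
// Register a handler under the task name; the task's args arrive as its parameters.
worker.register("tasks.add", (a, b) => a + b);
worker.start();
  • Client-side
const celery = require("celery-node");

// Use the same queue name as the worker.
const client = celery.createClient("amqp://", "amqp://", "queue_name");

const task = client.createTask("tasks.add");
// Send the task with the positional args [1, 2].
const result = task.applyAsync([1, 2]);
// Wait for the worker's return value, then close the connection.
result.get().then(data => {
  console.log(data);
  client.disconnect();
});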

I hope this helps.
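
For readers wondering where the message payload goes: in the snippets above, the arguments given to applyAsync travel inside the message and arrive as the parameters of the registered handler, so user code has no separate read step. The sketch below imitates that dispatch in memory, with no broker and without celery-node. TinyWorker and deliver are hypothetical names used only for illustration, and the message shape (task name in the headers, body as [args, kwargs, embed]) follows Celery's message protocol version 2; the library's internals may differ.

```javascript
// In-memory imitation of task dispatch; no broker and no celery-node involved.
class TinyWorker {
  constructor() {
    this.handlers = new Map();
  }

  // Same idea as worker.register(name, handler) in the example above.
  register(name, handler) {
    this.handlers.set(name, handler);
  }

  // Hypothetical entry point: called once per message taken from the queue.
  deliver(message) {
    const handler = this.handlers.get(message.headers.task);
    if (!handler) {
      throw new Error(`No handler registered for ${message.headers.task}`);
    }
    // Celery protocol v2 body: [args, kwargs, embed]; kwargs are ignored in this sketch.
    const [args] = JSON.parse(message.body);
    return handler(...args);
  }
}

const worker = new TinyWorker();
worker.register("tasks.add", (a, b) => a + b);

// Roughly what applyAsync([1, 2]) would place on the queue (simplified).
const message = {
  headers: { task: "tasks.add", id: "example-id" },
  body: JSON.stringify([[1, 2], {}, {}]),
};

console.log(worker.deliver(message)); // 3
```

The point of the sketch is that the task name selects the handler and the args array is spread into its parameters, which is why a queue payload such as a file URL can simply be sent as the single element of the args array.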

@ardian-c
Author

Thanks, I can now connect to a particular queue. However, I don't understand how to read its payload and pass it as a parameter to the task.

In your example you create a task, tasks.add, and send it the params [1, 2]. I want to get the payload from a queue instead (in my case a file URL in S3) and pass that URL to the worker, ideally like this: node worker.js --url some_file.csv.

The official documentation shows an implementation that uses channel.consume to read messages from the queue. Is there an equivalent in this library?
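
The node worker.js --url some_file.csv form mentioned above implies a script that reads its URL from the command line. A minimal sketch of that receiving side follows, using only Node's process.argv. The helper name parseUrlArg and the printed messages are hypothetical, and the actual file processing is left out.

```javascript
// Hypothetical helper: returns the value following --url, or null if it is absent.
function parseUrlArg(argv) {
  const i = argv.indexOf("--url");
  if (i === -1 || i + 1 >= argv.length) return null;
  return argv[i + 1];
}

// process.argv is [nodePath, scriptPath, ...userArgs], so skip the first two entries.
const url = parseUrlArg(process.argv.slice(2));
if (url === null) {
  console.log("usage: node worker.js --url <file-url>");
} else {
  console.log("would process:", url);
}
```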

@actumn
Owner

actumn commented Nov 7, 2020

Sorry for the late reply.

Here is some sample code that I think covers what you need.

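  • client (client.js)
// Path relative to the celery.node repository; with the npm package,
// use require("celery-node") as in the earlier example.
const celery = require("../../dist");

const client = celery.createClient("amqp://", "amqp://");

// The payload to hand to the worker, here a placeholder S3 URL.
const url = `s3://s3url_here`;
const task = client.createTask('upload-task');
// The URL is sent as the task's only positional argument.
const result = task.applyAsync([url]);
result.get().then(data => {
  console.log(data);
  client.disconnect();
});
  • worker (worker.js)
const celery = require("../../dist");

const worker = celery.createWorker("amqp://", "amqp://");
// The handler receives the URL that the client passed to applyAsync.
worker.register('upload-task', (url) => {
  console.log('s3upload url: ', url);
  return url;
});
worker.start();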

Start node worker.js first, then run node client.js.
The console output should look like this:

  • node worker.js
celery.node worker start...
registed task: upload-task
celery.node Received task: upload-task[a3ac8ec8-7924-4db1-a61a-e2a906efe797], args: s3://s3url_here, kwargs: {}
s3upload url:  s3://s3url_here
celery.node Task upload-task[a3ac8ec8-7924-4db1-a61a-e2a906efe797] succeeded in 0.000260853s: s3://s3url_here
  • node client.js
s3://s3url_here

If you have any questions, I'll answer as soon as possible.
Thank you.
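
To go one step further toward running a separate Node program per message, the handler could start a child process and pass the URL on its command line. The following is a sketch under assumptions, not part of the original answer: childArgs, runScriptWithUrl, and process_file.js are hypothetical names, and the commented-out wiring assumes the worker waits on a promise returned from a handler.

```javascript
const { execFile } = require("child_process");

// Hypothetical helper: builds the argument list for `node <script> --url <url>`.
function childArgs(script, url) {
  return [script, "--url", url];
}

// Runs the script in a child Node process and resolves with its trimmed stdout.
// execFile passes arguments without a shell, so the URL is not shell-interpreted.
function runScriptWithUrl(script, url) {
  return new Promise((resolve, reject) => {
    execFile(process.execPath, childArgs(script, url), (err, stdout) => {
      if (err) reject(err);
      else resolve(stdout.trim());
    });
  });
}

// Assumed wiring with celery-node (not executed here; process_file.js is a placeholder):
// worker.register("upload-task", (url) => runScriptWithUrl("process_file.js", url));
```

Using execFile rather than exec avoids building a shell command string from message content, which matters when the URL comes from a queue.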

@ardian-c
Author

Thank you, that's similar to what I did.

@actumn actumn closed this as completed Nov 12, 2020