Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

I'm encountering an "Unexpected close" error & rabbitMq getting disconnected #59

Closed
kundan8239 opened this issue Feb 15, 2024 · 9 comments

Comments

@kundan8239
Copy link
Contributor

kundan8239 commented Feb 15, 2024

I'm am currently utilizing the rabbit-queue library for establishing a connection with RabbitMQ. However, I've encountered an issue marked by the error message "Unexpected close." I have a worker basically which task is to find different technology & dns info for domain with help of wappalyzer here is my code & env detail shared error log as well, it's quite difficult to to find from where this issues is coming it's comming from rabbit-queue, amqplib , mycode or wapplaizer

Library Version: 5.5.0
Environment: ubuntu 18
Node Version: v18.2.0

wappalyzer.js

const Wappalyzer = require('wappalyzer')
let runInit = true;
let closeBrowser = true;
const options = {
  debug: true,
  delay: 500,
  headers: {},
  maxDepth: 3,
  maxWait: 20000,
  recursive: false,
  probe: 'basic',//fix for SPM-150
  proxy: false,
  noScripts: false,
  noRedirect: false,
  };
const wappalyzer = new Wappalyzer(options);
async function wappalyzerBase(url) {
  try {
    //await wappalyzer.init()
    if ( runInit ) {
     await wappalyzer.init();
     runInit = false;
    }
    const site = await wappalyzer.open(url)
    console.log(`Wappalyzer running for: ${url}`);
    const results = await site.analyze();
    console.log(`Got result from wappalyzer: ${url}`);
    return results
  } catch (error) {
    console.log("debug: wappalyzer error: "+ error);
    throw new Error(error);
  }
      
}
async function closeWappBrowser(){
  if ( closeBrowser && !runInit ) {
    try{
      await wappalyzer.destroy();
      runInit = true;
      closeBrowser = true;
    }
    catch(error){
      console.log("debug: wappalyzer error: "+ error);
      throw error;
    }
  }
}
module.exports = { wappalyzerBase, closeWappBrowser }

baseworker.js

const { BaseQueueHandler } = require('rabbit-queue');
const {
  rabbit
} = require('../services');

class BaseWorkerClone extends BaseQueueHandler {
  constructor(queueName) {
    super(queueName, rabbit, {
      prefetch: 3,
      retries: 0,
      retryDelay: 1000,
      logEnabled: true,
      scope: 'SINGLETON',
      createAndSubscribeToQueue: true,
    });
  }

  async createQueues() {
    this.queue = await this.rabbit
      .createQueue(this.queueName, { ...this.getQueueOptions(), prefetch: this.prefetch, priority: 20 }, (msg, ack) => {
        this.tryHandle(0, msg, ack).catch((e) => this.logger.error(e));
      })
      .catch((error) => this.logger.error(error));

    this.dlq = await this.rabbit
      .createQueue(this.dlqName, this.getDlqOptions())
      .catch((error) => this.logger.error(error));
  }

  async handleError(err, msg) {
    console.log("********* err, msg **********");
    console.log(err, msg);
  }
}

module.exports = { BaseWorkerClone };

content.js

const { wappalyzerBase, closeWappBrowser } = require('./wappalyzer')
class ContentWorker extends BaseWorkerClone {
    constructor(queueName) {
    super(queueName);
    this.wappalyzerRefCollection = {};
  }
   async  handle({ event, correlationId, startTime, signal }) {
   try {
    const { target, fileId = null, optimiseTime, consumerTeam } = event;
      this.wappalyzerRefCollection[`${correlationId}`] = true;
      [wappalyzerResult] = await Promise.allSettled([wappalyzerBase(target)]);
      try {
       del this.wappalyzerRefCollection[`${correlationId}`] 
        await closeWappBrowser();
      } catch(err) {
        console.log(err)
      }
     return Promise.resolve("done")
  } catch(err) {
   console.log(err);
   return Promise.reject("error")
  }
 }

key heightlight [wappalyzerResult] = await Promise.allSettled([wappalyzerBase(target)]); is giving timeout error

outputlog log:

service:rabbit [warn] rabbit-queue.content [756f4ef3-51e3-47e5-953c-3c5a1c6caaae] Adding to dlq: content_dlq after 0 retries +0ms
service:rabbit [debug] rabbit-queue [4c834124-a471-4235-a2fb-2b6ba7a88a38] -> Publishing to queue sauron_content_dlq 73 bytes +0ms
service:rabbit [debug] rabbit-queue [81073eea-9638-4fa0-bf25-a4c76fa8e5a5] -> Publishing to queue sauron_content_dlq 73 bytes +0ms
service:rabbit [debug] rabbit-queue [756f4ef3-51e3-47e5-953c-3c5a1c6caaae] -> Publishing to queue sauron_content_dlq 73 bytes +0ms
service:rabbit [error] rabbit-queue.content IllegalOperationError: Channel closed
  at ConfirmChannel.<anonymous> (/Users/kundank/one/workspace/sauron/node_modules/amqplib/lib/channel.js:159:11)
  at ConfirmChannel.publish (/Users/kundank/one/workspace/sauron/node_modules/amqplib/lib/channel_model.js:152:17)
  at ConfirmChannel.publish (/Users/kundank/one/workspace/sauron/node_modules/amqplib/lib/channel_model.js:267:38)
  at ConfirmChannel.sendToQueue (/Users/kundank/one/workspace/sauron/node_modules/amqplib/lib/channel_model.js:271:17)
  at /Users/kundank/one/workspace/sauron/node_modules/rabbit-queue/js/queue.js:163:25
  at new Promise (<anonymous>)
  at Function.<anonymous> (/Users/kundank/one/workspace/sauron/node_modules/rabbit-queue/js/queue.js:158:20)
  at Generator.next (<anonymous>)
  at fulfilled (/Users/kundank/one/workspace/sauron/node_modules/rabbit-queue/js/queue.js:5:58) {
stackAtStateChange: 'Stack capture: Socket error\n' +
  '    at C.onSocketError (/Users/kundank/one/workspace/sauron/node_modules/amqplib/lib/connection.js:362:13)\n' +
  '    at Socket.emit (node:events:539:35)\n' +
  '    at endReadableNT (node:internal/streams/readable:1344:12)\n' +
  '    at process.processTicksAndRejections (node:internal/process/task_queues:82:21)'
@nikostoulas
Copy link
Collaborator

Hi,

The issue exists when the library is trying to publish to dlq. One possible reason could be that you overwrite handleError without calling super.handleError. Can you try calling

 super.handleError(err, msg);

and see if the error persists?

@kundan8239
Copy link
Contributor Author

Hi @nikostoulas

Implement as you suggested

 async handleError(err, msg) {
    super.handleError(err, msg);
  }

but still getting same issues

Error: Unexpected close
    at succeed (/Users/kundank/one/workspace/sauron/node_modules/amqplib/lib/connection.js:280:13)
    at onOpenOk (/Users/kundank/one/workspace/sauron/node_modules/amqplib/lib/connection.js:262:5)
    at /Users/kundank/one/workspace/sauron/node_modules/amqplib/lib/connection.js:165:32
    at /Users/kundank/one/workspace/sauron/node_modules/amqplib/lib/connection.js:159:12
    at Socket.recv (/Users/kundank/one/workspace/sauron/node_modules/amqplib/lib/connection.js:507:12)
    at Object.onceWrapper (node:events:641:28)
    at Socket.emit (node:events:527:28)
    at emitReadable_ (node:internal/streams/readable:590:12)
    at process.processTicksAndRejections (node:internal/process/task_queues:81:21)
  service:rabbit Trying to reconnect to amqp://localhost +1ms
  service:rabbit RabbitMQ disconnected +0ms
Error: RabbitMQ disconnected
    at Rabbit.<anonymous> (/Users/kundank/one/workspace/sauron/src/services/rabbit.js:25:38)
    at Rabbit.emit (node:events:527:28)
    at Rabbit.emitDisconnected (/Users/kundank/one/workspace/sauron/node_modules/rabbit-queue/js/rabbit.js:62:14)
    at ConfirmChannel.<anonymous> (/Users/kundank/one/workspace/sauron/node_modules/rabbit-queue/js/rabbit.js:74:49)
    at ConfirmChannel.emit (node:events:539:35)
    at C.toClosed (/Users/kundank/one/workspace/sauron/node_modules/amqplib/lib/channel.js:174:8)
    at C._closeChannels (/Users/kundank/one/workspace/sauron/node_modules/amqplib/lib/connection.js:402:18)
    at C.toClosed (/Users/kundank/one/workspace/sauron/node_modules/amqplib/lib/connection.js:409:8)
    at C.onSocketError (/Users/kundank/one/workspace/sauron/node_modules/amqplib/lib/connection.js:363:10)
    at Socket.emit (node:events:539:35)
  service:rabbit Trying to reconnect to amqp://localhost +0ms
  service:rabbit Unexpected close +1ms
Error: Unexpected close
    at succeed (/Users/kundank/one/workspace/sauron/node_modules/amqplib/lib/connection.js:280:13)
    at onOpenOk (/Users/kundank/one/workspace/sauron/node_modules/amqplib/lib/connection.js:262:5)
    at /Users/kundank/one/workspace/sauron/node_modules/amqplib/lib/connection.js:165:32
    at /Users/kundank/one/workspace/sauron/node_modules/amqplib/lib/connection.js:159:12
    at Socket.recv (/Users/kundank/one/workspace/sauron/node_modules/amqplib/lib/connection.js:507:12)
    at Object.onceWrapper (node:events:641:28)
    at Socket.emit (node:events:527:28)
    at emitReadable_ (node:internal/streams/readable:590:12)
    at process.processTicksAndRejections (node:internal/process/task_queues:81:21)
  service:rabbit Trying to reconnect to amqp://localhost +0ms
  service:rabbit Unexpected close +0ms

@nikostoulas
Copy link
Collaborator

Hi @kundan8239 ,
That's weird.
You could overwrite addToDlq to see where the issue exactly is:

  async addToDLQ(retries, msg: amqp.Message, ack) {
    try {
      const correlationId = this.getCorrelationId(msg);
      const event = decode(msg);
      this.logger.warn(`[${correlationId}] Adding to dlq: ${this.dlqName} after ${retries} retries`);
      await this.rabbit.publish(this.dlqName, event, msg.properties);
      const response = await this.afterDlq({ msg, event });
      ack(msg.properties.headers.errors.message, response);
    } catch (err) {
      this.logger.error(err);
      await this.rabbit.publish(this.dlqName, msg.content.toString(), msg.properties);
      ack(err.message, null);
    }
  }

Try commenting out lines to see which one produces the error. Most probably something that is published with this.rabbit.publish or an ack is causing your issue.

@kundan8239
Copy link
Contributor Author

Hi @nikostoulas

Which ### decode function you are referring here it's coming from some library or we have support for decode function.
const event = decode(msg);

Even after attempting without the decode function, I couldn't capture any information relevant to the error. The error is being caught only after RabbitMQ gets disconnected. Anything related to heartbeat I have to takecare as suggest by amqlib here

@nikostoulas
Copy link
Collaborator

Hi @kundan8239,

Decode is this https://github.com/Workable/rabbit-queue/blob/master/ts/encode-decode.ts#L8.
Did you manage to find out which line causes the error however? Did you try commenting out publish or ack to find out?

@kundan8239
Copy link
Contributor Author

kundan8239 commented Feb 21, 2024

Hi @nikostoulas

tried with commenting public and ack but still facing same problem by looking log & library code also not find any thing relevant which help to fix this. Can you check once how we can pass heartbeat & heartbeat interval.
Plz have a look, How we handling this issues here amqp-node/amqplib#151
Screenshot 2024-02-21 at 11 49 05 AM

  async addToDLQ(retries, msg, ack) {
    try {
      const correlationId = this.getCorrelationId(msg);
      const event = decode(msg);
      console.log(`[${correlationId}] Adding to dlq: ${this.dlqName} after ${retries} retries`);
       await this.rabbit.publish(this.dlqName, event, msg.properties);
       const response = await this.afterDlq({ msg, event });
      console.error("**************** afterdlq response", response);
      ack(msg.properties.headers.errors.message, response);
    } catch (err) {
       console.error("********Error*********", err);
      await this.rabbit.publish(this.dlqName, msg.content.toString(), msg.properties);
      ack(err.message, null);
    }
  }
  async afterDlq({ msg, event }) {
    console.log("IN DLQ start ..........");
    logger.debug(`In DLQ for ${event ? event.target : ''} - CorrId:${msg && msg.properties ? msg.properties.correlationId : ''} -
    Error: ${msg && msg.properties && msg.properties.headers && msg.properties.headers.errors ? msg.properties.headers.errors.message : ''}`);
    let correlationId = msg.properties.correlationId;
    //await this.garbageUnusedRef(correlationId, event.target, event.fileId || false, event.consumerTeam);
    metrices.recordInDLQueueCount({
      operation: "dlqueue",
      source: "content"
    });
    console.log("IN DLQ end ..........");
  }

@nikostoulas
Copy link
Collaborator

Hi @kundan8239 ,

The error you posted doesn't seem the same as before.
Failing to respond to heartbeat happens because you probably do a heavy CPU bound task that blocks the event loop for a lot of time.
You should consider either increasing the heartbeat interval a lot so that you always respond to it (this can be done through the connection url to the rabbitMQ cluster or through a cluster configuration)
You could also consider breaking your cpu intensive task into smaller chunks so that event loop is not blocked.

@kundan2403
Copy link

kundan2403 commented Mar 12, 2024

Hi @nikostoulas ,

The issue remains the same - 'unexpected close.' Max execution time problems are custom issues that we manually trigger. You can view the complete code [here]

@nikostoulas
Copy link
Collaborator

Hi @kundan2403 ,

I don't believe this is the library's issue but your implementation's. Have you tried the suggestions regarding heartbeat from my previous message?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants