Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes to 2 issues #3

Open
gavinklfong opened this issue Apr 6, 2019 · 0 comments
Open

Fixes to 2 issues #3

gavinklfong opened this issue Apr 6, 2019 · 0 comments

Comments

@gavinklfong
Copy link

2 issues:

  1. Duplicated message sent to SQS as buffer is not cleaned up
  2. Message is not sent to SQS when buffer size equals to batch size

Please find amended version of _write() and _final() below

async _write(obj, enc, cb) {
this.emit('msgReceived', obj);
try {
if (!this.queueUrl) {
await this.getQueueUrl();
}

  // construct message
  const sqsMsg = _.clone(obj);
  if (obj.Id && obj.MessageBody) {
    this.buffer.push(sqsMsg);
  } else {
    const msg = { MessageBody: JSON.stringify(obj), Id: uuidv4() };
    if (this.options.MessageGroupId) {
      msg.MessageGroupId = this.options.MessageGroupId;
    }
    this.buffer.push(msg);
  }

  // send message by batch
  if (this.buffer.length >= this.options.batchSize) {
    console.log("_write() - sending sqs message: " + JSON.stringify(this.buffer));
    await this.sqs.sendMessageBatch({ Entries: this.buffer, QueueUrl: this.queueUrl }).promise();
    this.buffer = [];
  } 

  this.emit('msgProcessed', obj);
  return cb();
} catch (err) {
  this.emit('msgProcessingErr', obj, err);
  return cb(err);
}

}

async _final(cb) {
try {
if (this.buffer.length > 0) {
console.log("_final() - sending sqs message: " + JSON.stringify(this.buffer));
await this.sqs.sendMessageBatch({ Entries: this.buffer, QueueUrl: this.queueUrl }).promise();
this.buffer = [];
}

  return cb();
} catch (err) {
  this.emit('streamFinishingErr', err);
  return cb(err);
}

}

lawrence-pressinnov added a commit to lawrence-pressinnov/sqs-write-stream that referenced this issue Apr 17, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant