diff --git a/.codeclimate.yml b/.codeclimate.yml
index 0e443ca..c889eb8 100644
--- a/.codeclimate.yml
+++ b/.codeclimate.yml
@@ -1,10 +1,10 @@
engines:
eslint:
enabled: true
- channel: "eslint-8"
+ channel: 'eslint-8'
config:
- config: ".eslintrc.yaml"
+ config: '.eslintrc.yaml'
ratings:
- paths:
- - "**.js"
+ paths:
+ - '**.js'
diff --git a/.eslintrc.yaml b/.eslintrc.yaml
index fe947ea..b749c3e 100644
--- a/.eslintrc.yaml
+++ b/.eslintrc.yaml
@@ -2,24 +2,9 @@ env:
node: true
es6: true
mocha: true
- es2020: true
+ es2022: true
-plugins:
- - haraka
-
-extends:
- - eslint:recommended
- - plugin:haraka/recommended
+extends: ['@haraka']
rules:
- indent: [2, 2, {"SwitchCase": 1}]
-
-root: true
-
-globals:
- OK: true
- CONT: true
- DENY: true
- DENYSOFT: true
- DENYDISCONNECT: true
- DENYSOFTDISCONNECT: true
+ no-unused-vars: ['warn']
diff --git a/.github/dependabot.yml b/.github/dependabot.yml
index 0449e4a..d450132 100644
--- a/.github/dependabot.yml
+++ b/.github/dependabot.yml
@@ -2,9 +2,9 @@
version: 2
updates:
- - package-ecosystem: "npm"
- directory: "/"
+ - package-ecosystem: 'npm'
+ directory: '/'
schedule:
- interval: "weekly"
+ interval: 'weekly'
allow:
- dependency-type: production
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 452832a..3393b1d 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -1,12 +1,11 @@
name: CI
-on: [ push ]
+on: [push]
env:
CI: true
jobs:
-
lint:
uses: haraka/.github/.github/workflows/lint.yml@master
@@ -14,28 +13,10 @@ jobs:
uses: haraka/.github/.github/workflows/coverage.yml@master
secrets: inherit
- test:
- needs: [ lint, get-lts ]
- runs-on: ${{ matrix.os }}
- strategy:
- matrix:
- os: [ ubuntu-latest, windows-latest ]
- node-version: ${{ fromJson(needs.get-lts.outputs.active) }}
- fail-fast: false
- steps:
- - uses: actions/checkout@v3
- - uses: actions/setup-node@v3
- name: Node ${{ matrix.node-version }} on ${{ matrix.os }}
- with:
- node-version: ${{ matrix.node-version }}
- - run: npm install
- - run: npm test
+ ubuntu:
+ needs: [lint]
+ uses: haraka/.github/.github/workflows/ubuntu.yml@master
- get-lts:
- runs-on: ubuntu-latest
- steps:
- - id: get
- uses: msimerson/node-lts-versions@v1
- outputs:
- active: ${{ steps.get.outputs.active }}
- lts: ${{ steps.get.outputs.lts }}
+ windows:
+ needs: [lint]
+ uses: haraka/.github/.github/workflows/windows.yml@master
diff --git a/.github/workflows/codeql.yml b/.github/workflows/codeql.yml
index 383aca2..816e8c3 100644
--- a/.github/workflows/codeql.yml
+++ b/.github/workflows/codeql.yml
@@ -1,10 +1,10 @@
-name: "CodeQL"
+name: 'CodeQL'
on:
push:
- branches: [ master ]
+ branches: [master]
pull_request:
- branches: [ master ]
+ branches: [master]
schedule:
- cron: '18 7 * * 4'
diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml
index 42a9bb9..d97a994 100644
--- a/.github/workflows/publish.yml
+++ b/.github/workflows/publish.yml
@@ -11,4 +11,4 @@ env:
jobs:
publish:
uses: haraka/.github/.github/workflows/publish.yml@master
- secrets: inherit
\ No newline at end of file
+ secrets: inherit
diff --git a/.npmignore b/.npmignore
deleted file mode 100644
index ccd3d82..0000000
--- a/.npmignore
+++ /dev/null
@@ -1,15 +0,0 @@
-.github
-.DS_Store
-.editorconfig
-.gitignore
-.gitmodules
-.lgtm.yml
-appveyor.yml
-.travis.yml
-.eslintrc.yaml
-.eslintrc.json
-.nyc_output
-coverage
-.codeclimate.yml
-codecov.yml
-.release
\ No newline at end of file
diff --git a/.prettierrc.yml b/.prettierrc.yml
new file mode 100644
index 0000000..8ded5e0
--- /dev/null
+++ b/.prettierrc.yml
@@ -0,0 +1,2 @@
+singleQuote: true
+semi: false
diff --git a/.release b/.release
index 954197d..afb1db8 160000
--- a/.release
+++ b/.release
@@ -1 +1 @@
-Subproject commit 954197dae07b32c4476ff87ec9ae7371311ec97d
+Subproject commit afb1db801607dda5e859f39b600f0dd0111e4651
diff --git a/Changes.md b/CHANGELOG.md
similarity index 62%
rename from Changes.md
rename to CHANGELOG.md
index ec72488..42fc052 100644
--- a/Changes.md
+++ b/CHANGELOG.md
@@ -1,30 +1,34 @@
-
### Unreleased
+### [1.2.2] - 2024-08-05
+
+- chore: populate [files] in package.json. Delete .npmignore.
+- chore: automated code formatting
+- ci: updated to shared configs
+- dep: eslint-plugin-haraka -> @haraka/eslint-config
+- doc: added CONTRIBUTORS.md
+- doc: consistent naming of "special" files like CHANGELOG.md.
### [1.2.1] - 2024-04-03
- es6: use optional chaining (?.), for safety
- es6: use default function params
-
### [1.2.0] - 2022-06-24
- merged in ChunkEmitter, only used here
- copied in indexOfLF, removed haraka-utils dependency
-
### [1.1.0] - 2022-06-23
- fix: boundary marker corruption issue haraka/Haraka#3068
-
## 1.0.0 - 2022-06-23
- Import from Haraka
- convert tests to mocha
-
[1.1.0]: https://github.com/haraka/message-stream/releases/tag/1.1.0
[1.2.0]: https://github.com/haraka/message-stream/releases/tag/1.2.0
[1.2.1]: https://github.com/haraka/message-stream/releases/tag/1.2.1
+[1.2.2]: https://github.com/haraka/message-stream/releases/tag/1.2.2
diff --git a/CONTRIBUTORS.md b/CONTRIBUTORS.md
new file mode 100644
index 0000000..ed8887d
--- /dev/null
+++ b/CONTRIBUTORS.md
@@ -0,0 +1,8 @@
+# Contributors
+
+This handcrafted artisanal software is brought to you by:
+
+| 
msimerson (5) |
+| :--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------: |
+
+this file is maintained by [.release](https://github.com/msimerson/.release)
diff --git a/README.md b/README.md
index 48f83cf..5ec113f 100644
--- a/README.md
+++ b/README.md
@@ -15,8 +15,8 @@
new MessageStream(cfg, uuid, header_list)
```
-
+
[ci-img]: https://github.com/haraka/message-stream/actions/workflows/ci.yml/badge.svg
[ci-url]: https://github.com/haraka/message-stream/actions/workflows/ci.yml
[clim-img]: https://codeclimate.com/github/haraka/message-stream/badges/gpa.svg
diff --git a/index.js b/index.js
index 173b3af..6937576 100644
--- a/index.js
+++ b/index.js
@@ -1,210 +1,219 @@
-'use strict';
+'use strict'
-const EventEmitter = require('events').EventEmitter;
-const fs = require('fs');
-const Stream = require('stream').Stream;
+const EventEmitter = require('events').EventEmitter
+const fs = require('fs')
+const Stream = require('stream').Stream
const STATE = {
HEADERS: 1,
- BODY : 2,
-};
+ BODY: 2,
+}
class MessageStream extends Stream {
- constructor (cfg = {}, id, headers) {
- super();
- if (!id) throw new Error('id required');
- this.uuid = id;
- this.write_ce = null;
- this.read_ce = null;
- this.bytes_read = 0;
- this.state = STATE.HEADERS;
- this.idx = {};
- this.end_called = false;
- this.end_callback = null;
- this.buffered = 0;
- this.total_buffered = 0;
- this._queue = [];
- this.max_data_inflight = 0;
- this.buffer_max = (!isNaN(cfg?.main?.spool_after) ? Number(cfg.main.spool_after) : -1);
- this.spooling = false;
- this.fd = null;
- this.open_pending = false;
- this.spool_dir = cfg?.main?.spool_dir || '/tmp';
- this.filename = `${this.spool_dir}/${id}.eml`;
- this.write_pending = false;
-
- this.readable = true;
- this.paused = false;
- this.headers = headers || [];
- this.headers_done = false;
- this.headers_found_eoh = false;
- this.line_endings = "\r\n";
- this.dot_stuffing = false;
- this.ending_dot = false;
- this.buffer_size = (1024 * 64);
- this.start = 0;
- this.write_complete = false;
- this.ws = null;
- this.rs = null;
- this.in_pipe = false;
+ constructor(cfg = {}, id, headers) {
+ super()
+ if (!id) throw new Error('id required')
+ this.uuid = id
+ this.write_ce = null
+ this.read_ce = null
+ this.bytes_read = 0
+ this.state = STATE.HEADERS
+ this.idx = {}
+ this.end_called = false
+ this.end_callback = null
+ this.buffered = 0
+ this.total_buffered = 0
+ this._queue = []
+ this.max_data_inflight = 0
+ this.buffer_max = !isNaN(cfg?.main?.spool_after)
+ ? Number(cfg.main.spool_after)
+ : -1
+ this.spooling = false
+ this.fd = null
+ this.open_pending = false
+ this.spool_dir = cfg?.main?.spool_dir || '/tmp'
+ this.filename = `${this.spool_dir}/${id}.eml`
+ this.write_pending = false
+
+ this.readable = true
+ this.paused = false
+ this.headers = headers || []
+ this.headers_done = false
+ this.headers_found_eoh = false
+ this.line_endings = '\r\n'
+ this.dot_stuffing = false
+ this.ending_dot = false
+ this.buffer_size = 1024 * 64
+ this.start = 0
+ this.write_complete = false
+ this.ws = null
+ this.rs = null
+ this.in_pipe = false
}
- add_line (line) {
- const self = this;
+ add_line(line) {
+ const self = this
if (typeof line === 'string') {
- line = Buffer.from(line);
+ line = Buffer.from(line)
}
// create a ChunkEmitter
if (!this.write_ce) {
- this.write_ce = new ChunkEmitter();
- this.write_ce.on('data', chunk => {
- self._write(chunk);
- });
+ this.write_ce = new ChunkEmitter()
+ this.write_ce.on('data', (chunk) => {
+ self._write(chunk)
+ })
}
- this.bytes_read += line.length;
+ this.bytes_read += line.length
// Build up an index of 'interesting' data on the fly
if (this.state === STATE.HEADERS) {
// Look for end of headers line
if (line.length === 2 && line[0] === 0x0d && line[1] === 0x0a) {
- this.idx.headers = { start: 0, end: this.bytes_read - line.length };
- this.state = STATE.BODY;
- this.idx.body = { start: this.bytes_read };
+ this.idx.headers = { start: 0, end: this.bytes_read - line.length }
+ this.state = STATE.BODY
+ this.idx.body = { start: this.bytes_read }
}
}
if (this.state === STATE.BODY) {
// Look for MIME boundaries
if (line.length > 4 && line[0] === 0x2d && line[1] == 0x2d) {
- let boundary = line.slice(2).toString().replace(/\s*$/,'');
+ let boundary = line.slice(2).toString().replace(/\s*$/, '')
if (/--\s*$/.test(line)) {
// End of boundary?
- boundary = boundary.slice(0, -2);
+ boundary = boundary.slice(0, -2)
if (this.idx[boundary]) {
- this.idx[boundary].end = this.bytes_read;
+ this.idx[boundary].end = this.bytes_read
}
- }
- else {
+ } else {
// Start of boundary?
if (!this.idx[boundary]) {
- this.idx[boundary] = { start: this.bytes_read - line.length };
+ this.idx[boundary] = { start: this.bytes_read - line.length }
}
}
}
}
- this.write_ce.fill(line);
+ this.write_ce.fill(line)
}
- add_line_end (cb) {
+ add_line_end(cb) {
// Record body end position
if (this.idx.body) {
- this.idx.body.end = this.bytes_read;
+ this.idx.body.end = this.bytes_read
}
- this.end_called = true;
+ this.end_called = true
if (cb && typeof cb === 'function') {
- this.end_callback = cb;
+ this.end_callback = cb
}
// Call _write() only if no new data was emitted
// This might happen if the message size matches
// the size of the chunk buffer.
if (!this.write_ce.end()) {
- this._write();
+ this._write()
}
}
- _write (data) {
- const self = this;
+ _write(data) {
+ const self = this
if (data) {
- this.buffered += data.length;
- this.total_buffered += data.length;
- this._queue.push(data);
+ this.buffered += data.length
+ this.total_buffered += data.length
+ this._queue.push(data)
}
// Stats
if (this.buffered > this.max_data_inflight) {
- this.max_data_inflight = this.buffered;
+ this.max_data_inflight = this.buffered
}
// Abort if we have pending disk operations
- if (this.open_pending || this.write_pending) return false;
+ if (this.open_pending || this.write_pending) return false
// Do we need to spool to disk?
if (this.buffer_max !== -1 && this.total_buffered > this.buffer_max) {
- this.spooling = true;
+ this.spooling = true
}
// Have we completely finished writing all data?
- if (this.end_called && (!this.spooling || (this.spooling && !this._queue.length))) {
- if (this.end_callback) this.end_callback();
+ if (
+ this.end_called &&
+ (!this.spooling || (this.spooling && !this._queue.length))
+ ) {
+ if (this.end_callback) this.end_callback()
// Do we have any waiting readers?
if (this.listeners('data').length && !this.write_complete) {
- this.write_complete = true;
+ this.write_complete = true
setImmediate(() => {
- if (self.readable && !self.paused)
- self._read();
- });
- }
- else {
- this.write_complete = true;
+ if (self.readable && !self.paused) self._read()
+ })
+ } else {
+ this.write_complete = true
}
- return true;
- }
- if (this.buffer_max === -1 || (this.buffered < this.buffer_max && !this.spooling)) {
- return true;
- }
- else {
+ return true
+ }
+ if (
+ this.buffer_max === -1 ||
+ (this.buffered < this.buffer_max && !this.spooling)
+ ) {
+ return true
+ } else {
// We're spooling to disk
if (!this._queue.length) {
- return false;
+ return false
}
}
// Open file descriptor if needed
if (!this.fd && !this.open_pending) {
- this.open_pending = true;
- this.ws = fs.createWriteStream(this.filename, { flags: 'wx+', end: false })
- this.ws.on('open', fd => {
- self.fd = fd;
- self.open_pending = false;
+ this.open_pending = true
+ this.ws = fs.createWriteStream(this.filename, {
+ flags: 'wx+',
+ end: false,
+ })
+ this.ws.on('open', (fd) => {
+ self.fd = fd
+ self.open_pending = false
setImmediate(() => {
- self._write();
- });
- });
- this.ws.on('error', error => {
- self.emit('error', error);
- });
+ self._write()
+ })
+ })
+ this.ws.on('error', (error) => {
+ self.emit('error', error)
+ })
}
- if (!this.fd) return false;
- const to_send = this._queue.shift();
- this.buffered -= to_send.length;
+ if (!this.fd) return false
+ const to_send = this._queue.shift()
+ this.buffered -= to_send.length
// TODO: try and implement backpressure
if (!this.ws.write(to_send)) {
- this.write_pending = true;
+ this.write_pending = true
this.ws.once('drain', () => {
- self.write_pending = false;
+ self.write_pending = false
setImmediate(() => {
- self._write();
- });
- });
- return false;
- }
- else {
- if (this.end_called && (!this.spooling || (this.spooling && !this._queue.length))) {
- return self._write();
+ self._write()
+ })
+ })
+ return false
+ } else {
+ if (
+ this.end_called &&
+ (!this.spooling || (this.spooling && !this._queue.length))
+ ) {
+ return self._write()
}
- return true;
+ return true
}
}
// READABLE STREAM
- _read () {
- const self = this;
+ _read() {
+ const self = this
if (!this.end_called) {
- throw new Error('end not called!');
+ throw new Error('end not called!')
}
- if (!this.readable || this.paused || !this.write_complete) return;
+ if (!this.readable || this.paused || !this.write_complete) return
// Buffer and send headers first.
//
@@ -216,299 +225,297 @@ class MessageStream extends Stream {
// so we do all of them in one operation before we
// loop around again (and check for pause).
if (this.headers.length && !this.headers_done) {
- this.headers_done = true;
- for (let i=0; i {
- if (self.readable && !self.paused) self._read();
- });
- }
- else {
+ if (self.readable && !self.paused) self._read()
+ })
+ } else {
// Read the message body by line. If we have queued entries, then
// we didn't create a queue file, so read from memory.
if (this._queue.length > 0) {
// TODO: implement start/end offsets
- for (let i=0; i {};
- this.rs.on('error', error => {
- self.emit('error', error);
- });
- this.rs.on('data', chunk => {
- self.process_buf(chunk);
- });
+ this.rs.destroy = () => {}
+ this.rs.on('error', (error) => {
+ self.emit('error', error)
+ })
+ this.rs.on('data', (chunk) => {
+ self.process_buf(chunk)
+ })
this.rs.on('end', () => {
- self._read_finish();
- });
+ self._read_finish()
+ })
}
}
}
- process_buf (buf) {
- let offset = 0;
+ process_buf(buf) {
+ let offset = 0
while ((offset = indexOfLF(buf)) !== -1) {
- let line = buf.slice(0, offset+1);
- buf = buf.slice(line.length);
+ let line = buf.slice(0, offset + 1)
+ buf = buf.slice(line.length)
// Don't output headers if they where sent already
if (this.headers_done && !this.headers_found_eoh) {
// Allow \r\n or \n here...
if (
(line.length === 2 && line[0] === 0x0d && line[1] === 0x0a) ||
- (line.length === 1 && line[0] === 0x0a)
+ (line.length === 1 && line[0] === 0x0a)
) {
- this.headers_found_eoh = true;
+ this.headers_found_eoh = true
}
- continue;
+ continue
}
// Remove dot-stuffing if required
- if (!this.dot_stuffing && line.length >= 4 &&
- line[0] === 0x2e && line[1] === 0x2e
+ if (
+ !this.dot_stuffing &&
+ line.length >= 4 &&
+ line[0] === 0x2e &&
+ line[1] === 0x2e
) {
- line = line.slice(1);
+ line = line.slice(1)
}
// We store lines in native CRLF format; so strip CR if requested
- if (this.line_endings === '\n' && line.length >= 2 &&
- line[line.length-1] === 0x0a && line[line.length-2] === 0x0d
+ if (
+ this.line_endings === '\n' &&
+ line.length >= 2 &&
+ line[line.length - 1] === 0x0a &&
+ line[line.length - 2] === 0x0d
) {
// copy the line to a new buffer before modifying the copy
- line = Buffer.from(line);
- line[line.length-2] = 0x0a;
- line = line.slice(0, line.length-1);
+ line = Buffer.from(line)
+ line[line.length - 2] = 0x0a
+ line = line.slice(0, line.length - 1)
}
- this.read_ce.fill(line);
+ this.read_ce.fill(line)
}
// Check for data left in the buffer
if (buf.length > 0 && this.headers_found_eoh) {
- this.read_ce.fill(buf);
+ this.read_ce.fill(buf)
}
}
- _read_finish () {
- const self = this;
+ _read_finish() {
+ const self = this
// End dot required?
if (this.ending_dot) {
- this.read_ce.fill(`.${this.line_endings}`);
+ this.read_ce.fill(`.${this.line_endings}`)
}
// Tell the chunk emitter to send whatever is left
// We don't close the fd here so we can re-use it later.
this.read_ce.end(() => {
if (self.clamd_style) {
// Add 0 length to notify end
- const buf = Buffer.alloc(4);
- buf.writeUInt32BE(0, 0);
- self.emit('data', buf);
+ const buf = Buffer.alloc(4)
+ buf.writeUInt32BE(0, 0)
+ self.emit('data', buf)
}
- self.in_pipe = false;
- self.emit('end');
- });
+ self.in_pipe = false
+ self.emit('end')
+ })
}
- pipe (destination, options = {}) {
- const self = this;
+ pipe(destination, options = {}) {
+ const self = this
if (this.in_pipe) {
- throw new Error('Cannot pipe while currently piping');
+ throw new Error('Cannot pipe while currently piping')
}
- Stream.prototype.pipe.call(this, destination, options);
+ Stream.prototype.pipe.call(this, destination, options)
// Options
- this.line_endings = options?.line_endings ?? '\r\n';
- this.dot_stuffing = options?.dot_stuffing ?? false;
- this.ending_dot = options?.ending_dot ?? false;
- this.clamd_style = !!options?.clamd_style;
- this.buffer_size = options?.buffer_size ?? 1024 * 64;
- this.start = (parseInt(options?.start) ? parseInt(options.start) : 0);
+ this.line_endings = options?.line_endings ?? '\r\n'
+ this.dot_stuffing = options?.dot_stuffing ?? false
+ this.ending_dot = options?.ending_dot ?? false
+ this.clamd_style = !!options?.clamd_style
+ this.buffer_size = options?.buffer_size ?? 1024 * 64
+ this.start = parseInt(options?.start) ? parseInt(options.start) : 0
// Reset
- this.in_pipe = true;
- this.readable = true;
- this.paused = false;
- this.headers_done = (options && options.skip_headers);
- this.headers_found_eoh = false;
- this.rs = null;
- this.read_ce = new ChunkEmitter(this.buffer_size);
- this.read_ce.on('data', chunk => {
+ this.in_pipe = true
+ this.readable = true
+ this.paused = false
+ this.headers_done = options && options.skip_headers
+ this.headers_found_eoh = false
+ this.rs = null
+ this.read_ce = new ChunkEmitter(this.buffer_size)
+ this.read_ce.on('data', (chunk) => {
if (self.clamd_style) {
// Prefix data length to the beginning of line
- const buf = Buffer.alloc(chunk.length+4);
- buf.writeUInt32BE(chunk.length, 0);
- chunk.copy(buf, 4);
- self.emit('data', buf);
+ const buf = Buffer.alloc(chunk.length + 4)
+ buf.writeUInt32BE(chunk.length, 0)
+ chunk.copy(buf, 4)
+ self.emit('data', buf)
+ } else {
+ self.emit('data', chunk)
}
- else {
- self.emit('data', chunk);
- }
- });
+ })
// Stream won't be readable until we've finished writing and add_line_end() has been called.
// As we've registered for events above, the _write() function can now detect that we
// are waiting for the data and will call _read() automatically when it is finished.
- if (!this.write_complete) return destination;
+ if (!this.write_complete) return destination
// Create this.fd only if it doesn't already exist
// This is so we can re-use the already open descriptor
if (!this.fd && !(this._queue.length > 0)) {
fs.open(this.filename, 'r', null, (err, fd) => {
- if (err) throw err;
- self.fd = fd;
- self._read();
- });
- }
- else {
- self._read();
- }
- return destination;
+ if (err) throw err
+ self.fd = fd
+ self._read()
+ })
+ } else {
+ self._read()
+ }
+ return destination
}
- pause () {
- this.paused = true;
- if (this.rs) this.rs.pause();
+ pause() {
+ this.paused = true
+ if (this.rs) this.rs.pause()
}
- resume () {
- this.paused = false;
+ resume() {
+ this.paused = false
if (this.rs) {
- this.rs.resume();
- }
- else {
- this._read();
+ this.rs.resume()
+ } else {
+ this._read()
}
}
- destroy () {
- const self = this;
+ destroy() {
+ const self = this
try {
if (this.fd) {
- fs.close(this.fd, err => {
- fs.unlink(self.filename, () => {});
- });
+ fs.close(this.fd, (err) => {
+ fs.unlink(self.filename, () => {})
+ })
+ } else {
+ fs.unlink(this.filename, () => {})
}
- else {
- fs.unlink(this.filename, () => {});
- }
- }
- catch (err) {
+ } catch (err) {
// Ignore any errors
}
}
- get_data (options, cb) { // Or: (cb)
+ get_data(options, cb) {
+ // Or: (cb)
if (arguments.length === 1) {
- cb = arguments[0];
- options = {};
+ cb = arguments[0]
+ options = {}
}
- const ws = new GetDataStream(cb);
- this.pipe(ws, options);
+ const ws = new GetDataStream(cb)
+ this.pipe(ws, options)
}
}
-function indexOfLF (buf) {
- for (let i=0; i this.buffer_size) {
- this.buf = Buffer.alloc(this.buffer_size);
- const in_new = Buffer.concat(this.bufs, this.bufs_size);
- input = in_new;
+ this.bufs.push(input)
+ this.bufs_size += input.length
+ if (input.length + this.bufs_size > this.buffer_size) {
+ this.buf = Buffer.alloc(this.buffer_size)
+ const in_new = Buffer.concat(this.bufs, this.bufs_size)
+ input = in_new
// Reset
- this.bufs = [];
- this.bufs_size = 0;
- }
- else {
- return;
+ this.bufs = []
+ this.bufs_size = 0
+ } else {
+ return
}
}
while (input.length > 0) {
- let remaining = this.buffer_size - this.pos;
+ let remaining = this.buffer_size - this.pos
if (remaining === 0) {
- this.emit('data', this.buf); //.slice(0));
- this.buf = Buffer.alloc(this.buffer_size);
- this.pos = 0;
- remaining = this.buffer_size;
+ this.emit('data', this.buf) //.slice(0));
+ this.buf = Buffer.alloc(this.buffer_size)
+ this.pos = 0
+ remaining = this.buffer_size
}
- const to_write = ((remaining > input.length) ? input.length : remaining);
- input.copy(this.buf, this.pos, 0, to_write);
- this.pos += to_write;
- input = input.slice(to_write);
+ const to_write = remaining > input.length ? input.length : remaining
+ input.copy(this.buf, this.pos, 0, to_write)
+ this.pos += to_write
+ input = input.slice(to_write)
}
}
- end (cb) {
- let emitted = false;
+ end(cb) {
+ let emitted = false
if (this.bufs_size > 0) {
- this.emit('data', Buffer.concat(this.bufs, this.bufs_size));
- emitted = true;
- }
- else if (this.pos > 0) {
- this.emit('data', this.buf.slice(0, this.pos));
- emitted = true;
+ this.emit('data', Buffer.concat(this.bufs, this.bufs_size))
+ emitted = true
+ } else if (this.pos > 0) {
+ this.emit('data', this.buf.slice(0, this.pos))
+ emitted = true
}
// Reset
- this.buf = null;
- this.pos = 0;
- this.bufs = [];
- this.bufs_size = 0;
- if (cb && typeof cb === 'function') cb();
- return emitted;
+ this.buf = null
+ this.pos = 0
+ this.bufs = []
+ this.bufs_size = 0
+ if (cb && typeof cb === 'function') cb()
+ return emitted
}
}
diff --git a/package.json b/package.json
index 319821d..1cd9ac8 100644
--- a/package.json
+++ b/package.json
@@ -1,13 +1,20 @@
{
"name": "haraka-message-stream",
- "version": "1.2.1",
+ "version": "1.2.2",
"description": "Haraka email message stream",
"main": "index.js",
+ "files": [
+ "CHANGELOG.md"
+ ],
"scripts": {
+ "format": "npm run prettier:fix && npm run lint:fix",
"lint": "npx eslint@^8 *.js test",
"lintfix": "npx eslint@^8 --fix *.js test",
+ "prettier": "npx prettier . --check",
+ "prettier:fix": "npx prettier . --write --log-level=warn",
+ "test": "npx mocha@^10",
"versions": "npx dependency-version-checker check",
- "test": "npx mocha@^10"
+ "versions:fix": "npx dependency-version-checker update && npm run prettier:fix"
},
"repository": {
"type": "git",
@@ -25,7 +32,7 @@
},
"homepage": "https://github.com/haraka/message-stream#readme",
"devDependencies": {
- "eslint-plugin-haraka": "^1.0.15",
+ "@haraka/eslint-config": "^1.1.3",
"haraka-test-fixtures": "^1.3.3"
},
"dependencies": {}
diff --git a/test/chunk-emitter.js b/test/chunk-emitter.js
index f709e50..705e728 100644
--- a/test/chunk-emitter.js
+++ b/test/chunk-emitter.js
@@ -1,12 +1,10 @@
-
-const assert = require('assert')
-const fs = require('fs')
-const path = require('path')
+const assert = require('assert')
+const fs = require('fs')
+const path = require('path')
const ChunkEmitter = require('../index').ChunkEmitter
describe('chunk-emitter', function () {
-
beforeEach(function () {
this.ce = new ChunkEmitter()
this._written = 0
@@ -18,7 +16,7 @@ describe('chunk-emitter', function () {
it('emits all unbuffered bytes', function (done) {
const msgPath = path.join(__dirname, 'fixtures', 'haraka-icon-attach.eml')
- const eml = fs.readFileSync(msgPath, 'utf8');
+ const eml = fs.readFileSync(msgPath, 'utf8')
this._write = (data) => {
this._written = (this._written || 0) + data.length
@@ -28,8 +26,8 @@ describe('chunk-emitter', function () {
}
}
- this.ce.on('data', chunk => {
- this._write(chunk);
+ this.ce.on('data', (chunk) => {
+ this._write(chunk)
})
this.ce.fill(eml)
@@ -37,9 +35,13 @@ describe('chunk-emitter', function () {
})
it('emits all bigger than buffer bytes', function (done) {
- const msgPath = path.join(__dirname, 'fixtures', 'haraka-tarball-attach.eml')
+ const msgPath = path.join(
+ __dirname,
+ 'fixtures',
+ 'haraka-tarball-attach.eml',
+ )
// console.log(`msgPath: ${msgPath}`)
- const eml = fs.readFileSync(msgPath, 'utf8');
+ const eml = fs.readFileSync(msgPath, 'utf8')
// console.log(`length: ${eml.length}`)
this._write = (data) => {
@@ -53,9 +55,9 @@ describe('chunk-emitter', function () {
}
}
- this.ce.on('data', chunk => {
+ this.ce.on('data', (chunk) => {
// console.log(`ce.on.data: ${chunk.length} bytes`)
- this._write(chunk);
+ this._write(chunk)
})
this.ce.fill(eml)
diff --git a/test/message-stream.js b/test/message-stream.js
index 1bef4d1..de11d14 100644
--- a/test/message-stream.js
+++ b/test/message-stream.js
@@ -1,28 +1,26 @@
-
const assert = require('assert')
const stream = require('stream')
const MessageStream = require('../index')
-function _set_up () {
- this.ms = new MessageStream({ main: { } }, 'msg', []);
+function _set_up() {
+ this.ms = new MessageStream({ main: {} }, 'msg', [])
}
describe('message-stream', function () {
-
beforeEach(_set_up)
it('is a Stream', function (done) {
- assert.ok(this.ms instanceof MessageStream);
- assert.ok(this.ms instanceof stream.Stream);
+ assert.ok(this.ms instanceof MessageStream)
+ assert.ok(this.ms instanceof stream.Stream)
done()
})
it('gets message data', function (done) {
- this.ms.add_line('Header: test\r\n');
- this.ms.add_line('\r\n');
- this.ms.add_line('I am body text\r\n');
- this.ms.add_line_end();
+ this.ms.add_line('Header: test\r\n')
+ this.ms.add_line('\r\n')
+ this.ms.add_line('I am body text\r\n')
+ this.ms.add_line_end()
this.ms.get_data((data) => {
assert.ok(/^[A-Za-z]+: /.test(data.toString()))
done()