Skip to content

Commit

Permalink
optimize test
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidCai1111 committed Apr 22, 2016
1 parent 7e9804f commit 6fc5b43
Show file tree
Hide file tree
Showing 12 changed files with 197 additions and 46 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,4 @@ node_modules
lib
jsconfig.json
typings
example.js
7 changes: 7 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
language: node_js
node_js:
- '5'
- '4'

script: npm test
after_script: "npm install coveralls@2 && cat ./coverage/lcov.info | coveralls"
61 changes: 60 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1 +1,60 @@
# billow
# billow
[![js-standard-style](https://img.shields.io/badge/code%20style-standard-brightgreen.svg)](http://standardjs.com/)
[![Build Status](https://travis-ci.org/DavidCai1993/billow.svg?branch=master)](https://travis-ci.org/DavidCai1993/billow)
[![Coverage Status](https://coveralls.io/repos/github/DavidCai1993/billow/badge.svg?branch=master)](https://coveralls.io/github/DavidCai1993/billow?branch=master)

A stream pipeline based message processing framework.

## Install

```
npm install billow
```

## Usage

```js
'use strict'
const { Billow, Flow, Droplet } = require('billow')

let billow = new Billow({ separator: '\r\n' })
let flowOne = new Flow({ events: ['error', 'dropletError'] })
let flowTwo = new Flow({ events: ['error', 'dropletError'] })

flowOne.on('error', console.error).on('dropletError', console.error)
flowTwo.on('error', console.error).on('dropletError', console.error)

flowOne.addDroplets([
new Droplet({
handler: async function (chunk, encoding) {
return await Promise.resolve(`${chunk.toString()}==`)
}
}),
new Droplet({
handler: function (chunk, encoding) {
console.log(chunk.toString())
}
})
])

flowTwo.addDroplets([
new Droplet({
handler: async function (chunk, encoding) {
return await Promise.resolve(`${chunk.toString()}~~`)
}
}),
new Droplet({
handler: function (chunk, encoding) {
console.log(chunk.toString())
}
})
])

billow.addFlow(flowOne).addFlow(flowTwo).write('billow!\r\nbillow!\r\n')
// billow!==
// billow!~~
// billow!==
// billow!~~
```

## How it works
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
{
"name": "billow",
"version": "1.0.0",
"description": "",
"main": "lib/ndex.js",
"description": "A stream pipeline based message processing framework",
"main": "lib/index.js",
"scripts": {
"test": "./node_modules/.bin/babel-node ./node_modules/.bin/istanbul cover ./node_modules/.bin/_mocha",
"compile": "rm -rf lib && ./node_modules/.bin/babel src --out-dir lib",
Expand Down
44 changes: 44 additions & 0 deletions src/billow.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
'use strict'
const { Writable } = require('stream')
const Flow = require('./flow')
const BlackHole = require('./black-hole')
const { last } = require('./utils')

class Billow extends Writable {
constructor ({ separator = '\r\n', highWaterMark = 16384 } = {}) {
super({ highWaterMark })
this.separator = separator
this.blackHole = new BlackHole()
this.flows = []
}

_write (chunk, encoding, next) {
if (!this.separator) for (let flow of this.flows) { flow.droplets[0].write(chunk) }
if (this._buffer) chunk = Buffer.concat([this._buffer, chunk])

let start = 0
let index = chunk.indexOf(this.separator)

while (~index) {
let buffer = chunk.slice(start, index)
for (let flow of this.flows) { flow.droplets[0].write(buffer) }

start = index + Buffer.byteLength(this.separator)
index = chunk.indexOf(this.separator, start)
}

this._buffer = chunk.slice(start)
next()
}

addFlow (flow) {
if (!(flow instanceof Flow)) throw new TypeError(`${flow} should be an instance of Flow`)

last(flow.droplets).pipe(this.blackHole)
this.flows.push(flow)

return this
}
}

module.exports = Billow
8 changes: 8 additions & 0 deletions src/black-hole.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
'use strict'
const { Writable } = require('stream')

class BlackHole extends Writable {
_write (chunk, encoding, next) { next() }
}

module.exports = BlackHole
4 changes: 2 additions & 2 deletions src/droplet.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ const { Transform } = require('stream')
const { ensureBuffer } = require('./utils')

class Droplet extends Transform {
constructor ({ handler, highWaterMark }) {
constructor ({ handler, highWaterMark = 16384 } = {}) {
if (!handler || typeof handler !== 'function') throw new TypeError(`${handler} should be a function`)

super({ highWaterMark })
Expand All @@ -12,7 +12,7 @@ class Droplet extends Transform {

async _transform (chunk, encoding, next) {
try {
next(null, ensureBuffer(await this.handler(chunk, encoding)))
next(null, ensureBuffer(await this.handler(chunk, encoding) || chunk))
} catch (error) {
this.emit('dropletError', error)
next(null, chunk)
Expand Down
9 changes: 5 additions & 4 deletions src/flow.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'use strict'
const EventEmitter = require('events')
const concatStream = require('concat-stream')
const concatStream = require('concat-streams')
const Droplet = require('./droplet')
const { last, proxyEvents } = require('./utils')

Expand All @@ -14,15 +14,16 @@ class Flow extends EventEmitter {
addDroplets (droplets) {
if (!Array.isArray(droplets)) droplets = [droplets]
for (let droplet of droplets) {
if (!(droplet instanceof Droplet)) throw new TypeError(`${droplet} should be an isntance of Droplet`)
if (!(droplet instanceof Droplet)) throw new TypeError(`${droplet} should be an instance of Droplet`)
}

let source
if (this.droplets.length === 0) source = concatStream(droplets, 'error')
else source = concatStream([last(this.droplets)].concat(droplets), 'error')
if (this.droplets.length === 0) source = concatStream(droplets, this.events)
else source = concatStream([last(this.droplets)].concat(droplets), this.events)

proxyEvents(source, this, this.events)
this.droplets = this.droplets.concat(droplets)

return this
}
}
Expand Down
37 changes: 3 additions & 34 deletions src/index.js
Original file line number Diff line number Diff line change
@@ -1,37 +1,6 @@
'use strict'
const { Writable } = require('stream')
const Billow = require('./billow')
const Flow = require('./flow')
const Droplet = require('./droplet')

class Billow extends Writable {
constructor ({ separator = '\r\n', highWaterMark = 16384 } = {}) {
super({ highWaterMark })
this.separator = separator
this.flows = []
}

_write (chunk, encoding, next) {
if (!this.separator) for (let flow of this.flows) { flow[0].write(chunk) }
if (this._buffer) chunk = Buffer.concat([this._buffer, chunk])

let start = 0
let index = chunk.indexOf(this.separator)

while (~index) {
let buffer = chunk.slice(start, index)
for (let flow of this.flows) { flow[0].write(buffer) }

start = index + Buffer.byteLength(this.separator)
index = chunk.indexOf(this.separator, start)
}

this._buffer = chunk.slice(start)
next()
}

addFlow (flow) {
if (!(flow instanceof Flow)) throw new TypeError(`${flow} should be an instance of Flow`)
this.flows.push(flow)
}
}

module.exports = Billow
module.exports = { Billow, Flow, Droplet }
39 changes: 39 additions & 0 deletions test/droplet.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/* global describe, it */
require('should')
const Flow = require('../src/flow')
const Droplet = require('../src/droplet')

describe('Droplet test', function () {
it('Should throw when pass wrong handler', function () {
(function () {
let _ = new Droplet()
_
}).should.throw(/should be a function/)
})

it('Should handle async function correctly', function (done) {
let droplet1 = new Droplet({
handler: async function (chunk) {
return await Promise.resolve(`${chunk.toString()}__`)
}
})

let droplet2 = new Droplet({
handler: async function (chunk) {
await Promise.resolve(null)
return `${chunk.toString()}__`
}
})

let droplet3 = new Droplet({
handler: async function (chunk) {
chunk.toString().should.eql('test____')
return done()
}
})

let flow = new Flow()
flow.addDroplets([droplet1, droplet2, droplet3])
flow.droplets[0].write('test')
})
})
25 changes: 24 additions & 1 deletion test/flow.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,33 @@
/* global describe, it */
require('should')
const Flow = require('../src/flow')
const Droplet = require('../src/droplet')

describe('Flow test', function () {
it('Should throw when add wrong droplets', function () {
let flow = new Flow();
(function () { flow.addDroplets('wrongDroplets') }).should.throw(/should be an instance of Droplets/)
(function () { flow.addDroplets('wrongDroplets') }).should.throw(/should be an instance of Droplet/)
})

it('Should pipe data through streams', function () {
let flow = new Flow()
let count = 0
flow.addDroplets(new Droplet({
handler: async function (chunk) {
chunk.toString().should.eql('test')
count.should.eql(0)
count++
}
}))

flow.addDroplets(new Droplet({
handler: async function (chunk) {
chunk.toString().should.eql('test')
count.should.eql(1)
count++
}
}))

flow.droplets[0].write('test')
})
})
4 changes: 2 additions & 2 deletions test/index.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/* global describe, it */
require('should')
const { Writable } = require('stream')
const Billow = require('../src')
const Billow = require('../src/billow')

describe('Billow test', function () {
it('Should init with right members', function () {
Expand All @@ -24,7 +24,7 @@ describe('Billow test', function () {
}
})

billow.flows = [[writeable]]
billow.flows = [{ droplets: [writeable] }]
billow.write(`${testString}${testString}`)
})

Expand Down

0 comments on commit 6fc5b43

Please sign in to comment.