Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
Browse files

add filter and rewrite flatmap

  • Loading branch information...
北川
北川 committed Sep 16, 2018
1 parent 5999958 commit 908fee5c65d4eb02809a84a1ebc3e5df1f935cd1
Showing with 45 additions and 4 deletions.
  1. +26 −2 index.js
  2. +17 −0 test/filter.asynct.js
  3. +2 −2 test/flatmap.asynct.js
@@ -8,7 +8,6 @@ var Stream = require('stream').Stream
, es = exports
, through = require('through')
, from = require('from')
, flatmap = require('flatmap-stream')
, duplex = require('duplexer')
, map = require('map-stream')
, pause = require('pause-stream')
@@ -19,7 +18,6 @@ var Stream = require('stream').Stream
es.Stream = Stream //re-export Stream from core
es.through = through
es.from = from
es.flatmap = flatmap
es.duplex = duplex
es.map = map
es.pause = pause
@@ -213,6 +211,32 @@ es.mapSync = function (sync) {
})
}

//
// filterSync
//

es.filterSync = function (test) {
return es.through(function(data){
var s = this
if (test(data)) {
s.queue(data)
}
});
}

//
// flatmapSync
//

es.flatmapSync = function (mapper) {
return es.through(function(data) {
var s = this
data.forEach(function(e) {
s.queue(mapper(e))
})
})
}

//
// log just print out what is coming through the stream, for debugging
//
@@ -0,0 +1,17 @@
'use strict';

var es = require('../')
, it = require('it-is')

exports ['filter'] = function (test) {
es.readArray([1, 2, 3, 4])
.pipe(es.filterSync(function(e) {
return e > 2
}))
.pipe(es.writeArray(function(error, array) {
test.deepEqual([3, 4], array)
test.end()
}))
}

require('./helper')(module)
@@ -5,8 +5,8 @@ var es = require('../')

exports ['flatmap'] = function (test) {
es.readArray([[1], [1, 2], [1, 2, 3]])
.pipe(es.flatmap(function(e, cb) {
cb(null, e + 1)
.pipe(es.flatmapSync(function(e) {
return e + 1
}))
.pipe(es.writeArray(function(error, array) {
test.deepEqual([2, 2, 3, 2, 3, 4], array)

0 comments on commit 908fee5

Please sign in to comment.
You can’t perform that action at this time.