forked from cgrand/xforms
-
Notifications
You must be signed in to change notification settings - Fork 0
/
stream.cljs
31 lines (29 loc) · 1.23 KB
/
stream.cljs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
(ns net.cgrand.xforms.nodejs.stream)
(def ^:private Transform (.-Transform (js/require "stream")))
(defn transformer
"Returns a stream.Transform object that performs the specified transduction.
options is a js object as per stream.Transform -- however :readableObjectMode and :writableObjectMode are set to true by default."
([xform] (transformer #js {} xform))
([options xform]
(let [xrf (xform (fn
([transform] (doto transform .end))
([transform x]
(when-not (.push transform x)
(throw (js/Error. "Transformer's internal buffer is full, try passing a larger :highWaterMark option.")))
transform)))]
(specify! (Transform. (.assign js/Object #js {:readableObjectMode true
:writableObjectMode true} options))
Object
(_transform [this x _ cb]
(try
(when (reduced? (xrf this x))
(.push this nil))
(cb)
(catch :default err
(cb err))))
(_flush [this cb]
(try
(xrf this)
(cb)
(catch :default err
(cb err))))))))