Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Adding onEach, support for EventEmitters #111

Closed
wants to merge 1 commit into from

2 participants

evanderkoogh Caolan McMahon
evanderkoogh

I have added a method onEach that takes a couple of parameters to ease working with events.
I have to do a lot of parsing of cvs and xml files and most streaming libraries use events for this.

Internally is registers some events on the emitters and then uses a queue internally to keep track of work.
I have added some tests and some documentation.

Normally I do my programming in Coffeescript in Node, so I might not be aware of some Javascript conventions. This is also my first github pull request, so the same goes for that.

As for my code, there are 2 things I do not like.

  • I think like the number of arguments, but I can't think of a good way to make it less
  • Internally I use JSON.stringify and JSON.parse, because whenqueue.push sees an Array it assumes there are multiple tasks. Also here I can't find a good way to work around this.
Caolan McMahon
Owner

I don't think we should get into handling data from event emitters

Caolan McMahon caolan closed this March 28, 2014
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Showing 1 unique commit by 1 author.

Mar 08, 2012
evanderkoogh Added onEach method, test and documentation e2313b2
This page is out of date. Refresh to see the latest.
43  README.md
Source Rendered
@@ -88,6 +88,7 @@ So far its been tested in IE6, IE7, IE8, FF3.6 and Chrome 5. Usage:
88 88
 * [until](#until)
89 89
 * [waterfall](#waterfall)
90 90
 * [queue](#queue)
  91
+* [onEach](#onEach)
91 92
 * [auto](#auto)
92 93
 * [iterator](#iterator)
93 94
 * [apply](#apply)
@@ -743,6 +744,48 @@ __Example__
743 744
 
744 745
 ---------------------------------------
745 746
 
  747
+<a name="onEach" />
  748
+### onEach(eventemitter, concurrency, process_event_name, process_func, end_event_name, all_done
  749
+
  750
+Executes process_func on every process_event_name event that the eventemitter emits.
  751
+Process_func is called with every argument it would usually be called with an extra callback
  752
+argument.
  753
+After the end event is emitted and all outstanding process_funcs are done, the final all_done
  754
+callback will be called.
  755
+
  756
+__Arguments__
  757
+
  758
+* eventemitter - An EventEmitter that has an 'on' method to register listeners taking an event name
  759
+  and a callback.
  760
+* concurrency - An integer for determining how many worker functions should be
  761
+  run in parallel.
  762
+* process_event_name - The name of the event that we should register the working function for
  763
+* process_func(arguments..., callback) - The function that does the work. It accepts the
  764
+  same arguments as the original eventlistener, with an added callback for completion.
  765
+* end_event_name - The name of the event that signals the last processing event has been emitted.
  766
+* all_done() - Callback when the end event has been emitted and all outstanding processes have
  767
+  completed
  768
+
  769
+__Example__
  770
+
  771
+    var http = require('http');
  772
+
  773
+    http.createServer(function (req, res) {
  774
+
  775
+      var saveChunk = function(chunk, callback) {
  776
+        saveToDB(chunk, callback);
  777
+      }
  778
+
  779
+      async.onEach(req, 2, 'data', saveChunk, 'end', function() {
  780
+        console.log 'All done';
  781
+      });
  782
+
  783
+    }).listen(8080);
  784
+
  785
+    console.log("Server started on http://localhost:8080/");
  786
+
  787
+---------------------------------------
  788
+
746 789
 <a name="auto" />
747 790
 ### auto(tasks, [callback])
748 791
 
23  lib/async.js
@@ -625,6 +625,29 @@
625 625
         return q;
626 626
     };
627 627
 
  628
+    async.onEach = function(eventemitter, concurrency, process_event_name, process_func, end_event_name, all_done) {
  629
+        callProcessFunc = function(json, callback) {
  630
+            var args = JSON.parse(json);
  631
+            process_func.apply(null, Array.prototype.slice.call(args).concat([callback]));
  632
+        };
  633
+
  634
+        var queue = async.queue(callProcessFunc, concurrency);
  635
+        var events_done = false;
  636
+
  637
+        queue.drain = function() {
  638
+            if (events_done) return all_done();
  639
+        };
  640
+
  641
+        eventemitter.on(process_event_name, function() {
  642
+            var args = 1 <= arguments.length ? Array.prototype.slice.call(arguments, 0) : [];
  643
+            queue.push(JSON.stringify(args));
  644
+        });
  645
+
  646
+        eventemitter.on(end_event_name, function() {
  647
+            events_done = true;
  648
+        });
  649
+    };
  650
+
628 651
     var _console_fn = function (name) {
629 652
         return function (fn) {
630 653
             var args = Array.prototype.slice.call(arguments, 1);
55  test/test-async.js
@@ -1602,3 +1602,58 @@ exports['queue events'] = function(test) {
1602 1602
     q.push('poo', function () {calls.push('poo cb');});
1603 1603
     q.push('moo', function () {calls.push('moo cb');});
1604 1604
 };
  1605
+
  1606
+exports['onEach'] = function(test) {
  1607
+    var nextEvent = 0;
  1608
+    var texts = ['Zero', 'One', 'Two', 'Three'];
  1609
+
  1610
+    var eventemitter = {
  1611
+        processListener: null,
  1612
+        endListener: null,
  1613
+        on: function (event, fn) {
  1614
+            if (event === 'process') {
  1615
+                processListener = fn;
  1616
+            }
  1617
+            else if (event === 'end') {
  1618
+                endListener = fn;
  1619
+            }
  1620
+            else {
  1621
+                test.ok(false, 'Registering wrong listener');
  1622
+            }
  1623
+        },
  1624
+        start: function () {
  1625
+            setTimeout(function() {
  1626
+                processListener(0, 'Zero');
  1627
+            }, 40);
  1628
+            setTimeout(function() {
  1629
+                processListener(1, 'One');
  1630
+            }, 80);
  1631
+            setTimeout(function() {
  1632
+                processListener(2, 'Two');
  1633
+            }, 120);
  1634
+            setTimeout(function() {
  1635
+                processListener(3, 'Three');
  1636
+            }, 160);
  1637
+            setTimeout(function() {
  1638
+                endListener();
  1639
+            }, 200)
  1640
+        }
  1641
+    };
  1642
+
  1643
+    var processFunc = function(number, text, callback) {
  1644
+        test.equal(number, nextEvent);
  1645
+        test.equal(text, texts[number]);
  1646
+        nextEvent++;
  1647
+        setTimeout(function() {
  1648
+            callback();
  1649
+        }, number*20);
  1650
+    };
  1651
+
  1652
+    var done = function() {
  1653
+        test.ok(nextEvent, 4);
  1654
+        test.done();
  1655
+    };
  1656
+
  1657
+    async.onEach(eventemitter, 1, 'process', processFunc, 'end', done);
  1658
+    eventemitter.start();
  1659
+}
Commit_comment_tip

Tip: You can add notes to lines in a file. Hover to the left of a line to make a note

Something went wrong with that request. Please try again.