Skip to content

Commit

Permalink
Add an array data stream
Browse files Browse the repository at this point in the history
  • Loading branch information
Josh Graber committed Nov 18, 2015
1 parent eace1ac commit acdbd88
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 4 deletions.
15 changes: 15 additions & 0 deletions index.html
Expand Up @@ -44,6 +44,21 @@ <h3>Custom consumer</h3>
<button ng-click="controller.finishCustom()">Finish</button>
<button ng-click="controller.generateCustomError()">Error</button>
</div>
<div>
<h3>Array consumer</h3>
<div>
<label>Current value: </label>
<span>{{controller.arrayCurrentValue}}</span>
</div>
<div>
<label>Finished: </label>
<span>{{controller.arrayFinished}}</span>
</div>
<div>
<label>Error: </label>
<span>{{controller.arrayError}}</span>
</div>
</div>
</div>

<script src="https://cdnjs.cloudflare.com/ajax/libs/angular.js/1.4.7/angular.min.js"></script>
Expand Down
37 changes: 33 additions & 4 deletions js/controllers/eventConsumer/eventConsumer.controller.js
@@ -1,13 +1,16 @@
'use strict';

var angular = require('angular');
var Rx = require('rx');

var customStream = require('../../services/customStream/customStream.service');
var arraySource = require('../../services/arraySource/arraySource.service');

exports.moduleName = 'exp-ang.controllers.eventConsumer';
exports.controllerName = 'EventConsumerController';

EventConsumerController.$inject = ['$scope', customStream.serviceName];
function EventConsumerController($scope, customStream) {
EventConsumerController.$inject = ['$scope', customStream.serviceName, arraySource.serviceName];
function EventConsumerController($scope, customStream, arraySource) {
var self = this;

// --- public interface ---
Expand All @@ -25,9 +28,15 @@ function EventConsumerController($scope, customStream) {
self.finishCustom = customStream.end;
self.generateCustomError = customStream.error;

// Array
self.arrayCurrentValue = null;
self.arrayFinished = false;
self.arrayError = null;

// --- initialize ---
consumeEventStream();
consumeCustomStream();
consumeArrayStream();

function consumeEventStream() {
var oldSubscription;
Expand Down Expand Up @@ -56,7 +65,6 @@ function EventConsumerController($scope, customStream) {
}

function consumeCustomStream() {
// Use controlled() to cause stream to only supply values on request
var customSourceStream = customStream.get();

customSourceStream.subscribe(function(value) {
Expand All @@ -69,7 +77,28 @@ function EventConsumerController($scope, customStream) {
self.customFinished = true;
});
}

function consumeArrayStream() {
// Use controlled() to cause stream to only supply values on request
var controlledArraySource = arraySource.get().controlled();
var pullData;

// Display each value from the stream
controlledArraySource.subscribe(function(value) {
self.arrayCurrentValue = value;
}, function(error) {
self.arrayError = error;
}, function() {
self.arrayFinished = true;
pullData.dispose();
});

// Request values at regular intervals
pullData = Rx.Observable.interval(1000).subscribe(function() {
$scope.$apply(function() { controlledArraySource.request(1); });
});
}
}

angular.module(exports.moduleName, [customStream.moduleName])
angular.module(exports.moduleName, [customStream.moduleName, arraySource.moduleName])
.controller(exports.controllerName, EventConsumerController);
26 changes: 26 additions & 0 deletions js/services/arraySource/arraySource.service.js
@@ -0,0 +1,26 @@
'use strict';

var angular = require('angular');
var Rx = require('rx');

exports.moduleName = 'exp-ang.services.arraySource';
exports.serviceName = 'arraySource';

function arraySourceFactory() {
var data;

// public interface
var service = {
get: function() {
return Rx.Observable.from(data);
},
};

// initialization
data = [ 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, 65, 70, 75, 80, 85, 90, 95, 100 ];

return service;
}

angular.module(exports.moduleName, [])
.factory(exports.serviceName, arraySourceFactory);

0 comments on commit acdbd88

Please sign in to comment.