Skip to content

Commit

Permalink
Merge pull request #63 from canjs/major
Browse files Browse the repository at this point in the history
4.0
  • Loading branch information
matthewp authored Jan 29, 2018
2 parents 172d760 + aeeade3 commit ef0c1e4
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 25 deletions.
44 changes: 32 additions & 12 deletions can-stream-kefir.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
var Kefir = require('kefir');
var Kefir = require('can-kefir');
var compute = require('can-compute');
var canStream = require('can-stream');
var canSymbol = require('can-symbol');
var namespace = require('can-namespace');

var getValueDependenciesSymbol = canSymbol.for('can.getValueDependencies');
var getKeyDependenciesSymbol = canSymbol.for('can.getKeyDependencies');

var canStreamKefir = {};

/*
Expand All @@ -10,7 +15,7 @@ var canStreamKefir = {};
* argument is optionally a function.
*/
canStreamKefir.toStream = function (compute) {
return Kefir.stream(function (emitter) {
var stream = Kefir.stream(function (emitter) {
var changeHandler = function (ev, newVal) {
emitter.emit(newVal);
};
Expand All @@ -22,34 +27,36 @@ canStreamKefir.toStream = function (compute) {
emitter.emit(currentValue);
}


return function () {
compute.off('change', changeHandler);
};
});
};

stream[getValueDependenciesSymbol] = function getValueDependencies() {
return {
valueDependencies: new Set([compute])
};
};

canStreamKefir.toCompute = function(makeStream, context){
return stream;
};

canStreamKefir.toCompute = function(makeStream, context){
var emitter,
lastValue,
streamHandler,
lastSetValue;

var setterStream = Kefir.stream(function (e) {
emitter = e;
if(lastSetValue !== undefined) {
if (lastSetValue !== undefined) {
emitter.emit(lastSetValue);
}

}),
valueStream = makeStream.call(context, setterStream);

});
var valueStream = makeStream.call(context, setterStream);

// Create a compute that will bind to the resolved stream when bound
return compute(undefined, {

var streamCompute = compute(undefined, {
// When the compute is read, use that last value
get: function () {
return lastValue;
Expand Down Expand Up @@ -80,6 +87,19 @@ canStreamKefir.toCompute = function(makeStream, context){
valueStream.offValue(streamHandler);
}
});

// the symbol should ideally be implemented in the compute wrapper instead of
// the internal instance, this should be fixed once can-compute is removed
var _compute = streamCompute.computeInstance;
_compute[getKeyDependenciesSymbol] = function getKeyDependencies(key) {
if (key === 'change') {
return {
valueDependencies: new Set([valueStream])
};
}
};

return streamCompute;
};

if (!namespace.streamKefir) {
Expand Down
39 changes: 34 additions & 5 deletions can-stream-kefir_test.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
var QUnit = require('steal-qunit');
var canStream = require('can-stream-kefir');
var compute = require('can-compute');
var canReflect = require('can-reflect');
var DefineList = require('can-define/list/list');

QUnit.module('can-stream-kefir');
Expand All @@ -26,14 +27,14 @@ test('Compute changes can be streamed', function () {
QUnit.equal(computeVal, 3);
});

test('Compute streams do not bind to the compute unless activated', function () {
QUnit.test('Compute streams do not bind to the compute unless activated', function(assert) {
var c = compute(0);
var stream = canStream.toStream(c);
QUnit.equal(c.computeInstance.__bindEvents, undefined);
stream.onValue(function () {});
QUnit.equal(c.computeInstance.__bindEvents._lifecycleBindings, 1);
});
assert.notOk(c.computeInstance.bound, "should not be bound");

stream.onValue(function() {});
assert.ok(c.computeInstance.bound, "should be bound");
});

test('Compute stream values can be piped into a compute', function () {
var expected = 0;
Expand Down Expand Up @@ -194,3 +195,31 @@ test('Computes with an initial value of undefined do not emit', function() {
expectedLength = 2;
people.pop();
});

QUnit.test('getValueDependencies - stream from compute', function(assert) {
var c = compute(0);
var stream = canStream.toStream(c);

assert.deepEqual(canReflect.getValueDependencies(stream), {
valueDependencies: new Set([c])
});
});

QUnit.test('getValueDependencies - streamedCompute', function(assert) {
var mergeStream;
var c = compute("a");
var letterStream = canStream.toStream(c);

var makeStream = function makeStream(setStream){
return (mergeStream = setStream.merge(letterStream));
};

var streamedCompute = canStream.toCompute(makeStream);

assert.deepEqual(
canReflect.getKeyDependencies(streamedCompute.computeInstance, "change"),
{
valueDependencies: new Set([ mergeStream ])
}
);
});
17 changes: 9 additions & 8 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "can-stream-kefir",
"version": "0.3.3",
"version": "1.0.0-pre.1",
"description": "Stream values into and out of computes",
"homepage": "http://canjs.com",
"author": {
Expand Down Expand Up @@ -35,16 +35,17 @@
]
},
"dependencies": {
"can-compute": "^3.1.0",
"can-event": "^3.5.0",
"can-compute": "^4.0.0",
"can-kefir": "^1.0.0",
"can-namespace": "^1.0.0",
"can-observation": "^3.2.0",
"can-stream": "^0.3.0",
"can-util": "^3.9.0",
"kefir": "^3.5.1"
"can-observation": "^4.0.0",
"can-stream": "^1.0.0",
"can-symbol": "^1.5.0",
"can-util": "^3.9.0"
},
"devDependencies": {
"can-define": "^1.2.3",
"can-define": "^2.0.0",
"can-reflect": "^1.11.1",
"detect-cyclic-packages": "^1.1.0",
"done-serve": "^1.3.0",
"donejs-cli": "^1.0.1",
Expand Down

0 comments on commit ef0c1e4

Please sign in to comment.