diff --git a/can-kefir-test.js b/can-kefir-test.js index 54aeb3e..b651e27 100644 --- a/can-kefir-test.js +++ b/can-kefir-test.js @@ -1,23 +1,23 @@ -var QUnit = require("steal-qunit"); var Kefir = require("can-kefir"); +var QUnit = require("steal-qunit"); +var queues = require("can-queues"); var canReflect = require("can-reflect"); -var batch = require("can-event/batch/batch"); -QUnit.module("can-kefir", { +QUnit.module("can-kefir"); -}); - -QUnit.test("basics", 5, function() { +QUnit.test("basics", function() { var EMITTER; - var stream = Kefir.stream(function(emitter){ + + QUnit.expect(5); + + var stream = Kefir.stream(function(emitter) { EMITTER = emitter; }); - var valueEventCount = 0; - function valueHandler(value){ + function valueHandler(value) { valueEventCount++; - if(valueEventCount === 1) { + if (valueEventCount === 1) { QUnit.equal(value, 1, "produced a value"); } else if (valueEventCount === 2) { QUnit.equal(value, 2, "produced a value"); @@ -25,78 +25,107 @@ QUnit.test("basics", 5, function() { QUnit.ok(false, "should not be called"); } } - canReflect.onKeyValue(stream,"value", valueHandler); + canReflect.onKeyValue(stream, "value", valueHandler); EMITTER.value(1); - - QUnit.equal( canReflect.getKeyValue(stream,"value"), 1, "got initial value"); + QUnit.equal(canReflect.getKeyValue(stream, "value"), 1, "got initial value"); EMITTER.value(2); - canReflect.offKeyValue(stream,"value", valueHandler); - EMITTER.value(3); + canReflect.offKeyValue(stream, "value", valueHandler); + EMITTER.value(3); var errorEventCount = 0; - function errorHandler(value){ + function errorHandler(value) { errorEventCount++; - if(errorEventCount === 1) { + if (errorEventCount === 1) { QUnit.equal(value, "a", "produced an error"); } else { QUnit.ok(false, "no more errors"); } } - canReflect.onKeyValue(stream,"error", errorHandler); + canReflect.onKeyValue(stream, "error", errorHandler); EMITTER.error("a"); + QUnit.equal( + canReflect.getKeyValue(stream, "error"), + "a", + "got initial value" + ); - QUnit.equal( canReflect.getKeyValue(stream,"error"), "a", "got initial value"); - - canReflect.offKeyValue(stream,"error", errorHandler); + canReflect.offKeyValue(stream, "error", errorHandler); EMITTER.error("b"); - -}); - -QUnit.test("streams have a cid", function(){ - - var stream = Kefir.stream(function(){}); - QUnit.ok(stream._cid, "streams have a cid"); }); -QUnit.test("callbacks are within a batch", function(){ +QUnit.test("properties can be read without binding", function() { var EMITTER; - var stream = Kefir.stream(function(emitter){ + + var property = Kefir.stream(function(emitter) { EMITTER = emitter; - }); + }).toProperty(); - function valueHandler(){ - QUnit.ok(batch.batchNum, "batchNum exists"); - } - canReflect.onKeyValue(stream,"value", valueHandler); + property.onValue(function() {}); + EMITTER.value(10); - EMITTER.value(1); + QUnit.equal( + canReflect.getKeyValue(property, "value"), + 10, + "got property value" + ); }); -QUnit.test("properties can be read without binding", function(){ - var EMITTER; - var property = Kefir.stream(function(emitter){ - EMITTER = emitter; +QUnit.test("properties caches value/error correctly when unbound", function(assert) { + var emitter; + + var stream = Kefir.stream(function(e) { + emitter = e; }).toProperty(); - property.onValue(function(){}); - EMITTER.value(10); + var handler = function noop() {}; + canReflect.onKeyValue(stream, "value", handler); + emitter.value(10); + canReflect.offKeyValue(stream, "value", handler); - QUnit.equal( canReflect.getKeyValue(property,"value"), 10, "got property value" ); + assert.equal(canReflect.getKeyValue(stream, "value"), 10); + assert.equal(canReflect.getKeyValue(stream, "error"), undefined); + canReflect.onKeyValue(stream, "value", handler); + assert.equal(canReflect.getKeyValue(stream, "value"), 10, "should be cached"); + canReflect.offKeyValue(stream, "value", handler); }); -QUnit.test("Kefir.emitterProperty", function(){ - var stream = new Kefir.emitterProperty(); +QUnit.test("callbacks are within a batch", function(assert) { + var emitter; + assert.expect(2); + + var stream = Kefir.stream(function(e) { + emitter = e; + }); + var valueChangeCounter = 0; + canReflect.onKeyValue(stream, "value", function onValueChange() { + valueChangeCounter += 1; + }); + + queues.batch.start(); + emitter.value(1); + assert.equal( + valueChangeCounter, + 0, + "handler should not be called while flushing is prevented" + ); + queues.batch.stop(); + + assert.equal(valueChangeCounter, 1); +}); + +QUnit.test("Kefir.emitterProperty", function() { + var stream = new Kefir.emitterProperty(); var valueEventCount = 0; - function valueHandler(value){ + function valueHandler(value) { valueEventCount++; - if(valueEventCount === 1) { + if (valueEventCount === 1) { QUnit.equal(value, 1, "produced a value"); } else if (valueEventCount === 2) { QUnit.equal(value, 2, "produced a value"); @@ -104,49 +133,54 @@ QUnit.test("Kefir.emitterProperty", function(){ QUnit.ok(false, "should not be called"); } } - canReflect.onKeyValue(stream,"value", valueHandler); - + canReflect.onKeyValue(stream, "value", valueHandler); stream.emitter.emit(1); - QUnit.equal( canReflect.getKeyValue(stream,"value"), 1, "got initial value"); + QUnit.equal(canReflect.getKeyValue(stream, "value"), 1, "got initial value"); - canReflect.setKeyValue( stream, "value", 2); - canReflect.offKeyValue(stream,"value", valueHandler); + canReflect.setKeyValue(stream, "value", 2); + canReflect.offKeyValue(stream, "value", valueHandler); stream.emitter.value(3); - var errorEventCount = 0; - function errorHandler(value){ + function errorHandler(value) { errorEventCount++; - if(errorEventCount === 1) { + if (errorEventCount === 1) { QUnit.equal(value, "a", "produced an error"); } else { QUnit.ok(false, "no more errors"); } } - canReflect.onKeyValue(stream,"error", errorHandler); + canReflect.onKeyValue(stream, "error", errorHandler); stream.emitter.error("a"); + QUnit.equal( + canReflect.getKeyValue(stream, "error"), + "a", + "got initial value" + ); - QUnit.equal( canReflect.getKeyValue(stream,"error"), "a", "got initial value"); - - canReflect.offKeyValue(stream,"error", errorHandler); + canReflect.offKeyValue(stream, "error", errorHandler); stream.emitter.error("b"); }); -QUnit.test("get behavior with constant stream", function(){ - var stream = Kefir.stream(function(emit){ +QUnit.test("get behavior with constant stream", function() { + var stream = Kefir.stream(function(emit) { emit.value(1); }); - canReflect.onKeyValue(stream, "value", function(newVal){ + canReflect.onKeyValue(stream, "value", function(newVal) { QUnit.equal(newVal, 1, "got new Value"); }); - QUnit.equal( canReflect.getKeyValue(stream,"value"), 1, "undefined"); + QUnit.equal(canReflect.getKeyValue(stream, "value"), 1, "undefined"); }); -QUnit.test("read emitter", function(){ +QUnit.test("read emitter", function() { var stream = new Kefir.emitterProperty(); - QUnit.equal( canReflect.getKeyValue(stream,"emitter"), stream.emitter, "got the emitter"); + QUnit.equal( + canReflect.getKeyValue(stream, "emitter"), + stream.emitter, + "got the emitter" + ); }); diff --git a/can-kefir.js b/can-kefir.js index 67e7c00..047489a 100644 --- a/can-kefir.js +++ b/can-kefir.js @@ -1,46 +1,50 @@ -var canReflect = require("can-reflect"); -var canSymbol = require("can-symbol"); -var dev = require("can-util/js/dev/dev"); var Kefir = require("kefir"); -var Observation = require("can-observation"); -var CID = require("can-cid"); -var canBatch = require("can-event/batch/batch"); -var defineLazyValue = require("can-define-lazy-value"); - -var observeDataSymbol = canSymbol.for("can.observeData"); - -function getObserveData(stream) { - var observeData = stream[observeDataSymbol]; - if(!observeData) { - observeData = Object.create(null); - observeData.onValueHandlers = []; - observeData.onErrorHandlers = []; - Object.defineProperty(stream, observeDataSymbol, { - enumerable: false, - configurable: false, - writable: false, - value: observeData - }); - } - return observeData; -} +var canSymbol = require("can-symbol"); +var canReflect = require("can-reflect"); +var mapEventsMixin = require("can-event-queue/map/map"); +var ObservationRecorder = require("can-observation-recorder"); + +var metaSymbol = canSymbol.for("can.meta"); +var onKeyValueSymbol = canSymbol.for("can.onKeyValue"); +var offKeyValueSymbol = canSymbol.for("can.offKeyValue"); var keyNames = { - "value": {on: "onValue", handlers: "onValueHandlers", off: "offValue", handler: "onValueHandler"}, - "error": {on: "onError", handlers: "onErrorHandlers", off: "offError", handler: "onErrorHandler"} + value: { + on: "onValue", + off: "offValue", + handler: "onValueHandler", + handlers: "onValueHandlers" + }, + error: { + on: "onError", + off: "offError", + handler: "onErrorHandler", + handlers: "onErrorHandlers" + } }; +function ensureMeta(obj) { + var meta = obj[metaSymbol]; + + if (!meta) { + meta = {}; + canReflect.setKeyValue(obj, metaSymbol, meta); + } + + return meta; +} + // get the current value from a stream function getCurrentValue(stream, key) { - if(stream._currentEvent && stream._currentEvent.type === key) { + if (stream._currentEvent && stream._currentEvent.type === key) { return stream._currentEvent.value; } else { var names = keyNames[key]; - if(!names) { + if (!names) { return stream[key]; } var VALUE, - valueHandler = function(value){ + valueHandler = function(value) { VALUE = value; }; stream[names.on](valueHandler); @@ -49,124 +53,134 @@ function getCurrentValue(stream, key) { } } -if (Kefir) { - // makes the CID property a virtual property whose value gets defined later. - defineLazyValue(Kefir.Observable.prototype,"_cid", function(){ - return CID({}); - }); +Kefir.Observable.prototype._eventSetup = function eventSetup() { + var stream = this; + var meta = ensureMeta(stream); - // Observable is parent of Kefir.Stream - canReflect.assignSymbols(Kefir.Observable.prototype, { - "can.onKeyValue": function(key, handler){ - var names = keyNames[key]; - //!steal-remove-start - if(!names) { - dev.warn("can-kefir: You can not listen to the "+key+" property on a Kefir stream."); - } - //!steal-remove-end - - var observeData = getObserveData(this); - var handlers = observeData[names.handlers]; - if( handlers.length === 0 ) { - var stream = this; - var onHandler = observeData[names.handler] = function(value){ - // only send events for a change - if(value !== observeData[key]) { - observeData[key] = value; - handlers.forEach(function(handler){ - canBatch.queue([handler, stream, [value]]); - }); - } - }; - // Must push handler so it can be called immediately - handlers.push(handler); - this[names.on](onHandler); - } else { - handlers.push(handler); - } + meta.bound = true; - }, - "can.offKeyValue": function(key, handler){ - var names = keyNames[key]; - - var observeData = getObserveData(this); - var handlers = observeData[names.handlers]; - - var index = handlers.indexOf(handler); - if(index !== -1) { - handlers.splice(index, 1); - if(handlers.length === 0) { - this[names.off](observeData[names.handler]); - delete this[observeDataSymbol]; - } - } - }, - "can.getKeyValue": function(key){ + meta.onValueHandler = function onValueHandler(newValue) { + var oldValue = meta.value; + meta.value = newValue; - if(!keyNames[key]) { - return this[key]; - } + // only send events for a change + if (newValue !== oldValue) { + mapEventsMixin.dispatch.call( + stream, + { type: "value" }, + [newValue, oldValue] + ); + } + }; + + meta.onErrorHandler = function onErrorHandler(error) { + var prevError = meta.error; + meta.error = error; + + mapEventsMixin.dispatch.call( + stream, + { type: "error" }, + [error, prevError] + ); + }; + + stream.onValue(meta.onValueHandler); + stream.onError(meta.onErrorHandler); +}; + +Kefir.Observable.prototype._eventTeardown = function eventTeardown() { + var stream = this; + var meta = ensureMeta(stream); - Observation.add(this, key); + meta.bound = false; + + stream.offValue(meta.onValueHandler); + stream.offError(meta.onErrorHandler); +}; + +// Observable is parent of Kefir.Stream +canReflect.assignSymbols(Kefir.Observable.prototype, { + "can.onKeyValue": function onKeyValue() { + return mapEventsMixin[onKeyValueSymbol].apply( + this, + arguments + ); + }, + "can.offKeyValue": function() { + return mapEventsMixin[offKeyValueSymbol].apply( + this, + arguments + ); + }, + "can.getKeyValue": function(key) { + var stream = this; + var meta = ensureMeta(stream); + + if (!keyNames[key]) { + return stream[key]; + } + + ObservationRecorder.add(stream, key); + + if (meta.bound) { + return meta[key]; + } else { // we haven't been bound ... see what we can get from the observable - if(!this[observeDataSymbol]) { - // using internals for performance ... - var observeData = getObserveData(this); - var currentValue = getCurrentValue(this, key); - // save current value so we won't through events if we provided a value - return observeData[key] = currentValue; - } - return getObserveData(this)[key]; + // using internals for performance ... + var currentValue = getCurrentValue(stream, key); + // save current value so we won't through events if we provided a value + meta[key] = currentValue; + + return currentValue; } - }); + } +}); - Kefir.emitterProperty = function(){ - var emitter; - var setLastValue = false; - var lastValue, - lastError; +Kefir.emitterProperty = function() { + var emitter; + var setLastValue = false; + var lastValue, lastError; - var stream = Kefir.stream(function(EMITTER){ - emitter = EMITTER; - if(setLastValue) { - emitter.value(lastValue); - } - return function(){ - emitter = undefined; - }; - }); - - var property = stream.toProperty(function(){ - return lastValue; - }); - property.emitter = { - value: function(newValue) { - if(emitter) { - return emitter.emit(newValue); - } else { - setLastValue = true; - lastValue = newValue; - } - }, - error: function(error) { - if(emitter) { - return emitter.error(error); - } else { - lastError = error; - } - } + var stream = Kefir.stream(function(EMITTER) { + emitter = EMITTER; + if (setLastValue) { + emitter.value(lastValue); + } + return function() { + emitter = undefined; }; - property.emitter.emit = property.emitter.value; + }); - canReflect.assignSymbols(property,{ - "can.setKeyValue": function(key, value){ - this.emitter[key](value); + var property = stream.toProperty(function() { + return lastValue; + }); + property.emitter = { + value: function(newValue) { + if (emitter) { + return emitter.emit(newValue); + } else { + setLastValue = true; + lastValue = newValue; } - }); - - return property; + }, + error: function(error) { + if (emitter) { + return emitter.error(error); + } else { + lastError = error; + } + } }; -} + property.emitter.emit = property.emitter.value; + + canReflect.assignSymbols(property, { + "can.setKeyValue": function setKeyValue(key, value) { + this.emitter[key](value); + } + }); + + return property; +}; module.exports = Kefir; diff --git a/package.json b/package.json index 7f6394d..5726770 100644 --- a/package.json +++ b/package.json @@ -43,16 +43,16 @@ "streams" ], "dependencies": { - "can-cid": "^1.0.3", - "can-define-lazy-value": "^1.0.0", "can-event": "^3.7.0", - "can-observation": "^3.3.0", - "can-reflect": "^1.2.0", + "can-event-queue": "^0.15.3", + "can-observation-recorder": "^0.1.3", + "can-reflect": "^1.11.1", "can-symbol": "^1.0.0", "can-util": "^3.9.0", "kefir": "^3.5.1" }, "devDependencies": { + "can-queues": "^0.3.1", "detect-cyclic-packages": "^1.1.0", "jshint": "^2.9.1", "steal": "^1.3.1",