diff --git a/lib/src/observables/observables.dart b/lib/src/observables/observables.dart index 5dfabde..fe25a06 100644 --- a/lib/src/observables/observables.dart +++ b/lib/src/observables/observables.dart @@ -3,4 +3,5 @@ export 'package:disposal/disposal.dart' show Disposable; export 'observables/observable.dart'; export 'observables/proxy_observable.dart'; export 'observers/observer.dart'; +export 'subjects/publish_subject.dart'; export 'subjects/subject.dart'; diff --git a/lib/src/observables/subjects/basic_subject.dart b/lib/src/observables/subjects/basic_subject.dart new file mode 100644 index 0000000..3d6cb37 --- /dev/null +++ b/lib/src/observables/subjects/basic_subject.dart @@ -0,0 +1,47 @@ + +import 'package:meta/meta.dart'; +import 'package:disposal/disposal.dart'; + +import '../observers/observer.dart'; +import 'subject.dart'; + +@internal +class BasicSubject implements Subject { + + final _idGenerator = _SerialNumberGenerator(); + final Map> _map = {}; + + @override + Disposable observe(OnData onData) { + final id = _idGenerator.generate(); + _map[id] = onData; + return Disposable(() { + _map.remove(id); + }); + } + + @override + void onData(T data) { + if (_map.isEmpty) { + return; + } + for (final onData in _map.values) { + onData(data); + } + } + + @override + void dispose() { + _map.clear(); + } +} + +class _SerialNumberGenerator { + int? _last; + + int generate() { + final it = _last == null ? 0 : _last! + 1; + _last = it; + return it; + } +} diff --git a/lib/src/observables/subjects/publish_subject.dart b/lib/src/observables/subjects/publish_subject.dart new file mode 100644 index 0000000..b543fae --- /dev/null +++ b/lib/src/observables/subjects/publish_subject.dart @@ -0,0 +1,37 @@ + +import 'package:disposal/disposal.dart'; + +import '../observers/observer.dart'; +import 'basic_subject.dart'; +import 'subject.dart'; + +class PublishSubject implements Subject { + + bool _disposed = false; + final _subject = BasicSubject(); + + @override + Disposable observe(OnData onData) { + if (_disposed) { + throw StateError("Subject is disposed and can't be observed."); + } + return _subject.observe(onData); + } + + @override + void onData(T data) { + if (_disposed) { + return; + } + _subject.onData(data); + } + + @override + void dispose() { + if (_disposed) { + return; + } + _disposed = true; + _subject.dispose(); + } +} diff --git a/test/observables/observables_test.dart b/test/observables/observables_test.dart index a4bd9d0..26d54e5 100644 --- a/test/observables/observables_test.dart +++ b/test/observables/observables_test.dart @@ -9,6 +9,7 @@ import 'observables/proxy_observable_test.dart' as proxy_observable_test; import 'observables/skip_observable_test.dart' as skip_observable_test; import 'observables/where_observable_test.dart' as where_observable_test; import 'observers/observer_test.dart' as observer_test; +import 'subjects/publish_subject_test.dart' as publish_subject_test; import 'subjects/subject_test.dart' as subject_test; @@ -23,5 +24,6 @@ void main() { skip_observable_test.main(); where_observable_test.main(); observer_test.main(); + publish_subject_test.main(); subject_test.main(); } diff --git a/test/observables/subjects/publish_subject_test.dart b/test/observables/subjects/publish_subject_test.dart new file mode 100644 index 0000000..a5941ef --- /dev/null +++ b/test/observables/subjects/publish_subject_test.dart @@ -0,0 +1,157 @@ + +import 'package:test/test.dart'; +import 'package:scopes/scopes.dart'; + +import '../../toolbox/observable_tester.dart'; + +void main() { + + test('publish subject forward data to single observer', () { + + final subject = PublishSubject(); + + final tester = ObservableTester( + subject, + ); + + tester.startObserve(); + + expect(tester.recorded, []); + subject.onData('a'); + expect(tester.recorded, [ + 'a', + ]); + + tester.stopObserve(); + subject.dispose(); + + }); + + test('publish subject forward data to multiple observers', () { + + final subject = PublishSubject(); + + final tester1 = ObservableTester( + subject, + ); + + final tester2 = ObservableTester( + subject, + ); + + tester1.startObserve(); + tester2.startObserve(); + + expect(tester1.recorded, []); + expect(tester2.recorded, []); + subject.onData('a'); + expect(tester1.recorded, [ + 'a', + ]); + expect(tester2.recorded, [ + 'a', + ]); + + tester1.stopObserve(); + tester2.stopObserve(); + subject.dispose(); + + }); + + test("publish subject only forward data after observed", () { + + final subject = PublishSubject(); + + final tester = ObservableTester( + subject, + ); + + subject.onData('a'); + + tester.startObserve(); + + expect(tester.recorded, []); + subject.onData('b'); + expect(tester.recorded, [ + 'b', + ]); + + tester.stopObserve(); + subject.dispose(); + + }); + + test("publish subject won't forward data after observation disposed", () { + + final subject = PublishSubject(); + + final tester = ObservableTester( + subject, + ); + + tester.startObserve(); + + subject.onData('a'); + + tester.stopObserve(); + + expect(tester.recorded, [ + 'a', + ]); + subject.onData('b'); + expect(tester.recorded, [ + 'a', + ]); + + subject.dispose(); + + }); + + test("publish subject won't forward data after subject disposed", () { + + final subject = PublishSubject(); + + final tester = ObservableTester( + subject, + ); + + tester.startObserve(); + + subject.onData('a'); + + subject.dispose(); + + expect(tester.recorded, [ + 'a', + ]); + subject.onData('b'); + expect(tester.recorded, [ + 'a', + ]); + + tester.stopObserve(); + + }); + + test('publish subject throws error when been observed after it is disposed', () { + + final subject = PublishSubject(); + + subject.dispose(); + + expect( + () { + subject.observe((data) {}); + }, + throwsA( + isA() + .having( + (error) => error.toString(), + 'description', + contains("Subject is disposed and can't be observed."), + ), + ), + ); + }); + +}