Skip to content

Commit

Permalink
add PublishSubject
Browse files Browse the repository at this point in the history
  • Loading branch information
beeth0ven committed Jun 6, 2022
1 parent 8bf76db commit a1fc94c
Show file tree
Hide file tree
Showing 5 changed files with 244 additions and 0 deletions.
1 change: 1 addition & 0 deletions lib/src/observables/observables.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ export 'package:disposal/disposal.dart' show Disposable;
export 'observables/observable.dart';
export 'observables/proxy_observable.dart';
export 'observers/observer.dart';
export 'subjects/publish_subject.dart';
export 'subjects/subject.dart';
47 changes: 47 additions & 0 deletions lib/src/observables/subjects/basic_subject.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@

import 'package:meta/meta.dart';
import 'package:disposal/disposal.dart';

import '../observers/observer.dart';
import 'subject.dart';

@internal
class BasicSubject<T> implements Subject<T> {

final _idGenerator = _SerialNumberGenerator();
final Map<int, OnData<T>> _map = {};

@override
Disposable observe(OnData<T> onData) {
final id = _idGenerator.generate();
_map[id] = onData;
return Disposable(() {
_map.remove(id);
});
}

@override
void onData(T data) {
if (_map.isEmpty) {
return;
}
for (final onData in _map.values) {
onData(data);
}
}

@override
void dispose() {
_map.clear();
}
}

class _SerialNumberGenerator {
int? _last;

int generate() {
final it = _last == null ? 0 : _last! + 1;
_last = it;
return it;
}
}
37 changes: 37 additions & 0 deletions lib/src/observables/subjects/publish_subject.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@

import 'package:disposal/disposal.dart';

import '../observers/observer.dart';
import 'basic_subject.dart';
import 'subject.dart';

class PublishSubject<T> implements Subject<T> {

bool _disposed = false;
final _subject = BasicSubject<T>();

@override
Disposable observe(OnData<T> onData) {
if (_disposed) {
throw StateError("Subject is disposed and can't be observed.");
}
return _subject.observe(onData);
}

@override
void onData(T data) {
if (_disposed) {
return;
}
_subject.onData(data);
}

@override
void dispose() {
if (_disposed) {
return;
}
_disposed = true;
_subject.dispose();
}
}
2 changes: 2 additions & 0 deletions test/observables/observables_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import 'observables/proxy_observable_test.dart' as proxy_observable_test;
import 'observables/skip_observable_test.dart' as skip_observable_test;
import 'observables/where_observable_test.dart' as where_observable_test;
import 'observers/observer_test.dart' as observer_test;
import 'subjects/publish_subject_test.dart' as publish_subject_test;
import 'subjects/subject_test.dart' as subject_test;


Expand All @@ -23,5 +24,6 @@ void main() {
skip_observable_test.main();
where_observable_test.main();
observer_test.main();
publish_subject_test.main();
subject_test.main();
}
157 changes: 157 additions & 0 deletions test/observables/subjects/publish_subject_test.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@

import 'package:test/test.dart';
import 'package:scopes/scopes.dart';

import '../../toolbox/observable_tester.dart';

void main() {

test('publish subject forward data to single observer', () {

final subject = PublishSubject<String>();

final tester = ObservableTester(
subject,
);

tester.startObserve();

expect(tester.recorded, []);
subject.onData('a');
expect(tester.recorded, [
'a',
]);

tester.stopObserve();
subject.dispose();

});

test('publish subject forward data to multiple observers', () {

final subject = PublishSubject<String>();

final tester1 = ObservableTester(
subject,
);

final tester2 = ObservableTester(
subject,
);

tester1.startObserve();
tester2.startObserve();

expect(tester1.recorded, []);
expect(tester2.recorded, []);
subject.onData('a');
expect(tester1.recorded, [
'a',
]);
expect(tester2.recorded, [
'a',
]);

tester1.stopObserve();
tester2.stopObserve();
subject.dispose();

});

test("publish subject only forward data after observed", () {

final subject = PublishSubject<String>();

final tester = ObservableTester(
subject,
);

subject.onData('a');

tester.startObserve();

expect(tester.recorded, []);
subject.onData('b');
expect(tester.recorded, [
'b',
]);

tester.stopObserve();
subject.dispose();

});

test("publish subject won't forward data after observation disposed", () {

final subject = PublishSubject<String>();

final tester = ObservableTester(
subject,
);

tester.startObserve();

subject.onData('a');

tester.stopObserve();

expect(tester.recorded, [
'a',
]);
subject.onData('b');
expect(tester.recorded, [
'a',
]);

subject.dispose();

});

test("publish subject won't forward data after subject disposed", () {

final subject = PublishSubject<String>();

final tester = ObservableTester(
subject,
);

tester.startObserve();

subject.onData('a');

subject.dispose();

expect(tester.recorded, [
'a',
]);
subject.onData('b');
expect(tester.recorded, [
'a',
]);

tester.stopObserve();

});

test('publish subject throws error when been observed after it is disposed', () {

final subject = PublishSubject<String>();

subject.dispose();

expect(
() {
subject.observe((data) {});
},
throwsA(
isA<StateError>()
.having(
(error) => error.toString(),
'description',
contains("Subject is disposed and can't be observed."),
),
),
);
});

}

0 comments on commit a1fc94c

Please sign in to comment.