Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor - prefer source over observable to name input observable #177

Merged
merged 1 commit into from
Mar 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
36 changes: 18 additions & 18 deletions lib/src/dart_observable/observables/observable.dart
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,10 @@ abstract class Observable<T> {
///
/// Modified from: https://reactivex.io/documentation/operators/combinelatest.html
static Observable<R> combine<T, R>({
required List<Observable<T>> observables,
required List<Observable<T>> sources,
required R Function(List<T> items) combiner,
}) => ObservableCombine(
observables: observables,
sources: sources,
combiner: combiner,
);

Expand All @@ -129,12 +129,12 @@ abstract class Observable<T> {
///
/// Modified from: https://reactivex.io/documentation/operators/combinelatest.html
static Observable<R> combine2<T1, T2, R>({
required Observable<T1> observable1,
required Observable<T2> observable2,
required Observable<T1> source1,
required Observable<T2> source2,
required R Function(T1, T2) combiner,
}) => CombineObservable2(
observable1: observable1,
observable2: observable2,
source1: source1,
source2: source2,
combiner: combiner,
);

Expand All @@ -146,14 +146,14 @@ abstract class Observable<T> {
///
/// Modified from: https://reactivex.io/documentation/operators/combinelatest.html
static Observable<R> combine3<T1, T2, T3, R>({
required Observable<T1> observable1,
required Observable<T2> observable2,
required Observable<T3> observable3,
required Observable<T1> source1,
required Observable<T2> source2,
required Observable<T3> source3,
required R Function(T1, T2, T3) combiner,
}) => CombineObservable3(
observable1: observable1,
observable2: observable2,
observable3: observable3,
source1: source1,
source2: source2,
source3: source3,
combiner: combiner,
);
}
Expand All @@ -174,7 +174,7 @@ extension ObservableX<T> on Observable<T> {
Observable<R> map<R>(R Function(T) convert) {
return ObservableMap<T, R>(
convert: convert,
observable: this,
source: this,
);
}

Expand All @@ -188,15 +188,15 @@ extension ObservableX<T> on Observable<T> {
Observable<T> where(bool Function(T) test) {
return ObservableWhere<T>(
test: test,
observable: this,
source: this,
);
}

/// Mapping source `Observable<T>` to `Observable<R>`
/// by casting each item as `R`.
Observable<R> cast<R>() {
return ObservableCast<T, R>(
observable: this,
source: this,
);
}

Expand All @@ -207,7 +207,7 @@ extension ObservableX<T> on Observable<T> {
Observable<T> distinct([Equals<T>? equals]) {
return ObservableDistinct<T>(
equals: equals,
observable: this,
source: this,
);
}

Expand All @@ -232,7 +232,7 @@ extension ObservableX<T> on Observable<T> {
Observable<T> skip(int n) {
return ObservableSkip(
n: n,
observable: this,
source: this,
);
}

Expand All @@ -249,7 +249,7 @@ extension ObservableX<T> on Observable<T> {
}) {
return ObservableMulticast<T>(
createSubject: createSubject,
observable: this,
source: this,
);
}

Expand Down
8 changes: 4 additions & 4 deletions lib/src/dart_observable/observables/observable_cast.dart
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ import 'observable.dart';
class ObservableCast<T, R> implements Observable<R> {

const ObservableCast({
required Observable<T> observable,
}): _observable = observable;
required Observable<T> source,
}): _source = source;

final Observable<T> _observable;
final Observable<T> _source;

@override
Disposable observe(OnData<R> onData) {
return _observable.observe((data) {
return _source.observe((data) {
onData(data as R);
});
}
Expand Down
52 changes: 26 additions & 26 deletions lib/src/dart_observable/observables/observable_combine.dart
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,21 @@ import 'observation.dart';
class ObservableCombine<T, R> implements Observable<R> {

const ObservableCombine({
required List<Observable<T>> observables,
required List<Observable<T>> sources,
required R Function(List<T> items) combiner,
}): _observables = observables,
}): _sources = sources,
_combiner = combiner;

final List<Observable<T>> _observables;
final List<Observable<T>> _sources;
final R Function(List<T> items) _combiner;

@override
Disposable observe(OnData<R> onData) {
if (_observables.isEmpty) {
if (_sources.isEmpty) {
return Disposable.empty;
}
return _Observation<T, R>(
observables: _observables,
sources: _sources,
combiner: _combiner,
emit: onData,
);
Expand All @@ -35,13 +35,13 @@ class ObservableCombine<T, R> implements Observable<R> {
class CombineObservable2<T1, T2, R> extends ObservableCombine<Object?, R> {

CombineObservable2({
required Observable<T1> observable1,
required Observable<T2> observable2,
required Observable<T1> source1,
required Observable<T2> source2,
required R Function(T1, T2) combiner,
}): super(
observables: [
observable1.cast<Object?>(),
observable2.cast<Object?>(),
sources: [
source1.cast<Object?>(),
source2.cast<Object?>(),
],
combiner: (items) => combiner(
items[0] as T1,
Expand All @@ -54,15 +54,15 @@ class CombineObservable2<T1, T2, R> extends ObservableCombine<Object?, R> {
class CombineObservable3<T1, T2, T3, R> extends ObservableCombine<Object?, R> {

CombineObservable3({
required Observable<T1> observable1,
required Observable<T2> observable2,
required Observable<T3> observable3,
required Observable<T1> source1,
required Observable<T2> source2,
required Observable<T3> source3,
required R Function(T1, T2, T3) combiner,
}): super(
observables: [
observable1.cast<Object?>(),
observable2.cast<Object?>(),
observable3.cast<Object?>(),
sources: [
source1.cast<Object?>(),
source2.cast<Object?>(),
source3.cast<Object?>(),
],
combiner: (items) => combiner(
items[0] as T1,
Expand All @@ -75,32 +75,32 @@ class CombineObservable3<T1, T2, T3, R> extends ObservableCombine<Object?, R> {
class _Observation<T, R> extends Observation<R> {

_Observation({
required List<Observable<T>> observables,
required List<Observable<T>> sources,
required R Function(List<T> items) combiner,
required super.emit,
}): _observables = observables,
}): _sources = sources,
_combiner = combiner;

final List<Observable<T>> _observables;
final List<Observable<T>> _sources;
final R Function(List<T> items) _combiner;

late final int _observablesLength;
late final int _sourcesLength;
late final Set<int> _emitted;
late final List<T?> _latestItems;
late final List<Disposable> _sourceObservations;

@override
void init() {
_observablesLength = _observables.length;
_sourcesLength = _sources.length;
_emitted = <int>{};
_latestItems = List<T?>.filled(_observablesLength, null);
_latestItems = List<T?>.filled(_sourcesLength, null);
_sourceObservations = Iterable<Disposable>
.generate(_observablesLength, _observeIndexed)
.generate(_sourcesLength, _observeIndexed)
.toList();
}

Disposable _observeIndexed(int index) {
return _observables[index]
return _sources[index]
.observe(_onDataIndexed(index));
}

Expand All @@ -110,7 +110,7 @@ class _Observation<T, R> extends Observation<R> {
_emitted.add(index);
}
_latestItems[index] = data;
if (_emitted.length == _observablesLength) {
if (_emitted.length == _sourcesLength) {
final items = List<T>.from(_latestItems, growable: false);
final combinedItem = _combiner(items);
emit(combinedItem);
Expand Down
16 changes: 8 additions & 8 deletions lib/src/dart_observable/observables/observable_distinct.dart
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,18 @@ import 'observation.dart';
class ObservableDistinct<T> implements Observable<T> {
const ObservableDistinct({
required Equals<T>? equals,
required Observable<T> observable,
required Observable<T> source,
}): _equals = equals ?? defaultEquals,
_observable = observable;
_source = source;

final Equals<T> _equals;
final Observable<T> _observable;
final Observable<T> _source;

@override
Disposable observe(OnData<T> onData) {
return _Observation<T>(
equals: _equals,
observable: _observable,
source: _source,
emit: onData,
);
}
Expand All @@ -33,20 +33,20 @@ class _Observation<T> extends Observation<T> implements Observer<T> {

_Observation({
required Equals<T> equals,
required Observable<T> observable,
required Observable<T> source,
required super.emit,
}): _equals = equals,
_observable = observable;
_source = source;

final Equals<T> _equals;
final Observable<T> _observable;
final Observable<T> _source;

Value<T>? _oldData;
late final Disposable _sourceObservation;

@override
void init() {
_sourceObservation = _observable.observe(onData);
_sourceObservation = _source.observe(onData);
}

@override
Expand Down
8 changes: 4 additions & 4 deletions lib/src/dart_observable/observables/observable_map.dart
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,16 @@ class ObservableMap<T, R> implements Observable<R> {

const ObservableMap({
required R Function(T) convert,
required Observable<T> observable,
required Observable<T> source,
}): _convert = convert,
_observable = observable;
_source = source;

final R Function(T) _convert;
final Observable<T> _observable;
final Observable<T> _source;

@override
Disposable observe(OnData<R> onData) {
return _observable.observe((data) {
return _source.observe((data) {
final result = _convert(data);
onData(result);
});
Expand Down
16 changes: 8 additions & 8 deletions lib/src/dart_observable/observables/observable_multicast.dart
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,21 @@ class ObservableMulticast<T> implements Observable<T> {

ObservableMulticast({
Subject<T> Function()? createSubject,
required Observable<T> observable,
required Observable<T> source,
}): _createSubject = createSubject ?? _defaultCreateSubject,
_observable = observable,
_source = source,
_shared = _SharedBetweenObservations<T>();

final Subject<T> Function() _createSubject;
final Observable<T> _observable;
final Observable<T> _source;

final _SharedBetweenObservations<T> _shared;

@override
Disposable observe(OnData<T> onData) {
return _Observation<T>(
createSubject: _createSubject,
observable: _observable,
source: _source,
shared: _shared,
emit: onData,
);
Expand All @@ -48,15 +48,15 @@ class _Observation<T> extends Observation<T> {

_Observation({
required Subject<T> Function() createSubject,
required Observable<T> observable,
required Observable<T> source,
required _SharedBetweenObservations<T> shared,
required super.emit,
}): _createSubject = createSubject,
_observable = observable,
_source = source,
_shared = shared;

final Subject<T> Function() _createSubject;
final Observable<T> _observable;
final Observable<T> _source;

final _SharedBetweenObservations<T> _shared;

Expand All @@ -69,7 +69,7 @@ class _Observation<T> extends Observation<T> {
_subjectObservation = subject.observe(emit);
_shared.observersCount += 1;
if (_shared.observersCount == 1) {
_shared.observation = _observable.observe(subject.onData);
_shared.observation = _source.observe(subject.onData);
}
}

Expand Down