Skip to content

Commit

Permalink
Decouple Bloc from RxDart
Browse files Browse the repository at this point in the history
  • Loading branch information
felangel committed Feb 2, 2020
1 parent c475519 commit c908edf
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 58 deletions.
101 changes: 60 additions & 41 deletions packages/bloc/lib/src/bloc.dart
Original file line number Diff line number Diff line change
@@ -1,33 +1,38 @@
import 'dart:async';

import 'package:meta/meta.dart';
import 'package:rxdart/rxdart.dart';

import '../bloc.dart';

/// Signature for a mapper function which takes an [Event] as input
/// and outputs a [Stream] of [Transition] objects.
typedef TransitionFunction<Event, State> = Stream<Transition<Event, State>>
Function(Event);

/// {@template bloc}
/// Takes a `Stream` of `Events` as input
/// and transforms them into a `Stream` of `States` as output.
/// {@endtemplate}
abstract class Bloc<Event, State> extends Stream<State> implements Sink<Event> {
final PublishSubject<Event> _eventSubject = PublishSubject<Event>();
final _eventController = StreamController<Event>.broadcast();
final _stateController = StreamController<State>.broadcast();

BehaviorSubject<State> _stateSubject;
State _state;

/// Returns the current [state] of the [bloc].
State get state => _stateSubject.value;
State get state => _state;

/// Returns the [state] before any `events` have been [add]ed.
State get initialState;

/// Returns whether the `Stream<State>` is a broadcast stream.
@override
bool get isBroadcast => _stateSubject.isBroadcast;
bool get isBroadcast => _stateController.stream.isBroadcast;

/// {@macro bloc}
Bloc() {
_stateSubject = BehaviorSubject<State>.seeded(initialState);
_bindStateSubject();
_state = initialState;
_bindEventsToStates();
}

/// Adds a subscription to the `Stream<State>`.
Expand All @@ -41,14 +46,19 @@ abstract class Bloc<Event, State> extends Stream<State> implements Sink<Event> {
void Function() onDone,
bool cancelOnError,
}) {
return _stateSubject.listen(
return _createStateStream().listen(
onData,
onError: onError,
onDone: onDone,
cancelOnError: cancelOnError,
);
}

Stream<State> _createStateStream() async* {
yield state;
yield* _stateController.stream;
}

/// Called whenever an [event] is [add]ed to the [bloc].
/// A great spot to add logging/analytics at the individual [bloc] level.
void onEvent(Event event) => null;
Expand Down Expand Up @@ -77,7 +87,7 @@ abstract class Bloc<Event, State> extends Stream<State> implements Sink<Event> {
try {
BlocSupervisor.delegate.onEvent(this, event);
onEvent(event);
_eventSubject.sink.add(event);
_eventController.add(event);
} on dynamic catch (error) {
_handleError(error);
}
Expand All @@ -93,14 +103,14 @@ abstract class Bloc<Event, State> extends Stream<State> implements Sink<Event> {
@override
@mustCallSuper
Future<void> close() async {
await _eventSubject.close();
await _stateSubject.close();
await _eventController.close();
await _stateController.close();
}

/// Transforms the [events] stream along with a [next] function into
/// a `Stream<State>`.
/// Transforms the [events] stream along with a [transitionFn] function into
/// a `Stream<Transition>`.
/// Events that should be processed by [mapEventToState] need to be passed to
/// [next].
/// [transitionFn].
/// By default `asyncExpand` is used to ensure all [events] are processed in
/// the order in which they are received.
/// You can override [transformEvents] for advanced usage in order to
Expand All @@ -112,26 +122,28 @@ abstract class Bloc<Event, State> extends Stream<State> implements Sink<Event> {
///
/// ```dart
/// @override
/// Stream<State> transformEvents(events, next) => events.switchMap(next);
/// Stream<Transition<Event, State>> transformEvents(events, transitionFn) {
/// return events.switchMap(transitionFn);
/// }
/// ```
///
/// Alternatively, if you only want [mapEventToState] to be called for
/// distinct [events]:
///
/// ```dart
/// @override
/// Stream<State> transformEvents(events, next) {
/// Stream<Transition<Event, State>> transformEvents(events, transitionFn) {
/// return super.transformEvents(
/// events.distinct(),
/// next,
/// transitionFn,
/// );
/// }
/// ```
Stream<State> transformEvents(
Stream<Transition<Event, State>> transformEvents(
Stream<Event> events,
Stream<State> Function(Event) next,
TransitionFunction<Event, State> transitionFn,
) {
return events.asyncExpand(next);
return events.asyncExpand(transitionFn);
}

/// Must be implemented when a class extends [bloc].
Expand All @@ -142,39 +154,46 @@ abstract class Bloc<Event, State> extends Stream<State> implements Sink<Event> {
Stream<State> mapEventToState(Event event);

/// Transforms the `Stream<State>` into a new `Stream<State>`.
/// By default [transformStates] returns the incoming `Stream<State>`.
/// You can override [transformStates] for advanced usage in order to
/// By default [transformTransitions] returns the incoming `Stream<State>`.
/// You can override [transformTransitions] for advanced usage in order to
/// manipulate the frequency and specificity at which `transitions`
/// (state changes) occur.
///
/// For example, if you want to debounce outgoing [states]:
///
/// ```dart
/// @override
/// Stream<State> transformStates(Stream<State> states) {
/// return states.debounceTime(Duration(seconds: 1));
/// Stream<Transition<Event, State>> transformTransitions(
/// Stream<Transition<Event, State>> transitions,
/// ) {
/// return transitions.debounceTime(Duration(seconds: 1));
/// }
/// ```
Stream<State> transformStates(Stream<State> states) => states;

void _bindStateSubject() {
Event currentEvent;

transformStates(transformEvents(_eventSubject, (event) {
currentEvent = event;
return mapEventToState(currentEvent).handleError(_handleError);
})).forEach(
(nextState) {
if (state == nextState || _stateSubject.isClosed) return;
final transition = Transition(
currentState: state,
event: currentEvent,
nextState: nextState,
);
Stream<Transition<Event, State>> transformTransitions(
Stream<Transition<Event, State>> transitions,
) {
return transitions;
}

void _bindEventsToStates() {
transformTransitions(
transformEvents(_eventController.stream, (event) {
return mapEventToState(event).map((nextState) {
return Transition(
currentState: _state,
event: event,
nextState: nextState,
);
}).handleError(_handleError);
}),
).forEach(
(transition) {
if (state == transition.nextState || _stateController.isClosed) return;
try {
BlocSupervisor.delegate.onTransition(this, transition);
onTransition(transition);
_stateSubject.add(nextState);
_state = transition.nextState;
_stateController.add(transition.nextState);
} on dynamic catch (error) {
_handleError(error);
}
Expand Down
2 changes: 1 addition & 1 deletion packages/bloc/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ environment:

dependencies:
meta: ^1.1.6
rxdart: ^0.23.0

dev_dependencies:
test: ">=1.3.0 <2.0.0"
test_coverage: ^0.2.0
mockito: ^4.0.0
effective_dart: ^1.2.0
rxdart: ^0.23.0
43 changes: 32 additions & 11 deletions packages/bloc/test/bloc_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -95,16 +95,21 @@ void main() {
simpleBloc.add('event3');
});

test('isBroadcast returns true', () {
test('is a broadcast stream', () {
final expectedStates = ['', 'data'];

expect(simpleBloc.isBroadcast, isTrue);
expectLater(simpleBloc, emitsInOrder(expectedStates));
expectLater(simpleBloc, emitsInOrder(expectedStates));

simpleBloc.add('event');
});

test('multiple subscribers receive the latest state', () async {
simpleBloc.add('event');
await expectLater(simpleBloc, emitsInOrder(['', 'data']));
await expectLater(simpleBloc, emits('data'));
await expectLater(simpleBloc, emits('data'));
await expectLater(simpleBloc, emits('data'));
});
});

Expand Down Expand Up @@ -199,8 +204,14 @@ void main() {
complexBloc.add(ComplexEventC());
});

test('isBroadcast returns true', () {
test('is a broadcast stream', () {
final expectedStates = [ComplexStateA(), ComplexStateB()];

expect(complexBloc.isBroadcast, isTrue);
expectLater(complexBloc, emitsInOrder(expectedStates));
expectLater(complexBloc, emitsInOrder(expectedStates));

complexBloc.add(ComplexEventB());
});

test('multiple subscribers receive the latest state', () async {
Expand All @@ -211,7 +222,6 @@ void main() {
);
await expectLater(complexBloc, emits(ComplexStateB()));
await expectLater(complexBloc, emits(ComplexStateB()));
await expectLater(complexBloc, emits(ComplexStateB()));
});
});

Expand Down Expand Up @@ -340,16 +350,21 @@ void main() {
counterBloc.add(CounterEvent.increment);
});

test('isBroadcast returns true', () {
test('is a broadcast stream', () {
final expectedStates = [0, 1];

expect(counterBloc.isBroadcast, isTrue);
expectLater(counterBloc, emitsInOrder(expectedStates));
expectLater(counterBloc, emitsInOrder(expectedStates));

counterBloc.add(CounterEvent.increment);
});

test('multiple subscribers receive the latest state', () async {
counterBloc.add(CounterEvent.increment);
await expectLater(counterBloc, emitsInOrder([0, 1]));
await expectLater(counterBloc, emits(1));
await expectLater(counterBloc, emits(1));
await expectLater(counterBloc, emits(1));
});
});

Expand Down Expand Up @@ -468,8 +483,18 @@ void main() {
asyncBloc.add(AsyncEvent());
});

test('isBroadcast returns true', () {
test('is a broadcast stream', () {
final expectedStates = [
AsyncState(isLoading: false, hasError: false, isSuccess: false),
AsyncState(isLoading: true, hasError: false, isSuccess: false),
AsyncState(isLoading: false, hasError: false, isSuccess: true),
];

expect(asyncBloc.isBroadcast, isTrue);
expectLater(asyncBloc, emitsInOrder(expectedStates));
expectLater(asyncBloc, emitsInOrder(expectedStates));

asyncBloc.add(AsyncEvent());
});

test('multiple subscribers receive the latest state', () async {
Expand All @@ -490,10 +515,6 @@ void main() {
asyncBloc,
emits(AsyncState(isLoading: false, hasError: false, isSuccess: true)),
);
await expectLater(
asyncBloc,
emits(AsyncState(isLoading: false, hasError: false, isSuccess: true)),
);
});
});

Expand Down
12 changes: 7 additions & 5 deletions packages/bloc/test/helpers/complex/complex_bloc.dart
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ class ComplexBloc extends Bloc<ComplexEvent, ComplexState> {
ComplexState get initialState => ComplexStateA();

@override
Stream<ComplexState> transformEvents(
Stream<Transition<ComplexEvent, ComplexState>> transformEvents(
Stream<ComplexEvent> events,
Function(ComplexEvent) next,
TransitionFunction<ComplexEvent, ComplexState> transitionFn,
) {
return events.switchMap(next);
return events.switchMap(transitionFn);
}

@override
Expand All @@ -32,7 +32,9 @@ class ComplexBloc extends Bloc<ComplexEvent, ComplexState> {
}

@override
Stream<ComplexState> transformStates(Stream<ComplexState> states) {
return states.debounceTime(Duration(milliseconds: 50));
Stream<Transition<ComplexEvent, ComplexState>> transformTransitions(
Stream<Transition<ComplexEvent, ComplexState>> transitions,
) {
return transitions.debounceTime(Duration(milliseconds: 50));
}
}

0 comments on commit c908edf

Please sign in to comment.