Skip to content

Commit

Permalink
Apply Feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
felangel committed Feb 9, 2020
1 parent 92206d1 commit bff6ffc
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 14 deletions.
33 changes: 19 additions & 14 deletions packages/bloc/lib/src/bloc.dart
Expand Up @@ -18,6 +18,7 @@ abstract class Bloc<Event, State> extends Stream<State> implements Sink<Event> {
final _stateController = StreamController<State>.broadcast();

State _state;
StreamSubscription<Transition<Event, State>> _transitionSubscription;

/// Returns the current [state] of the [bloc].
State get state => _state;
Expand Down Expand Up @@ -46,15 +47,15 @@ abstract class Bloc<Event, State> extends Stream<State> implements Sink<Event> {
void Function() onDone,
bool cancelOnError,
}) {
return _createStateStream().listen(
return _stateStream.listen(
onData,
onError: onError,
onDone: onDone,
cancelOnError: cancelOnError,
);
}

Stream<State> _createStateStream() async* {
Stream<State> get _stateStream async* {
yield state;
yield* _stateController.stream;
}
Expand Down Expand Up @@ -105,6 +106,7 @@ abstract class Bloc<Event, State> extends Stream<State> implements Sink<Event> {
Future<void> close() async {
await _eventController.close();
await _stateController.close();
await _transitionSubscription?.cancel();
}

/// Transforms the [events] stream along with a [transitionFn] function into
Expand Down Expand Up @@ -177,7 +179,7 @@ abstract class Bloc<Event, State> extends Stream<State> implements Sink<Event> {
}

void _bindEventsToStates() {
transformTransitions(transformEvents(
_transitionSubscription = transformTransitions(transformEvents(
_eventController.stream,
(event) {
return mapEventToState(event).map((nextState) {
Expand All @@ -188,18 +190,21 @@ abstract class Bloc<Event, State> extends Stream<State> implements Sink<Event> {
);
}).skipWhile((transition) {
return state == transition.nextState || _stateController.isClosed;
}).handleError(_handleError);
});
},
)).forEach((transition) {
try {
BlocSupervisor.delegate.onTransition(this, transition);
onTransition(transition);
_state = transition.nextState;
_stateController.add(transition.nextState);
} on dynamic catch (error) {
_handleError(error);
}
});
)).listen(
(transition) {
try {
BlocSupervisor.delegate.onTransition(this, transition);
onTransition(transition);
_state = transition.nextState;
_stateController.add(transition.nextState);
} on dynamic catch (error) {
_handleError(error);
}
},
onError: _handleError,
);
}

void _handleError(Object error, [StackTrace stacktrace]) {
Expand Down
27 changes: 27 additions & 0 deletions packages/bloc/test/bloc_test.dart
Expand Up @@ -518,6 +518,33 @@ void main() {
});
});

group('flatMap', () {
test('maintains correct transitions', () {
final expectedTransitions = <Transition<CounterEvent, int>>[
Transition(
currentState: 0,
event: CounterEvent.decrement,
nextState: -1,
),
Transition(
currentState: -1,
event: CounterEvent.increment,
nextState: 0,
),
];
final expectedStates = [0, -1, 0];
final transitions = <Transition<CounterEvent, int>>[];

final bloc = FlatMapBloc(onTransitionCallback: transitions.add);

expectLater(bloc, emitsInOrder(expectedStates)).then((value) {
expect(transitions, expectedTransitions);
});
bloc.add(CounterEvent.decrement);
bloc.add(CounterEvent.increment);
});
});

group('Exception', () {
test('does not break stream', () {
final expected = [0, -1];
Expand Down
1 change: 1 addition & 0 deletions packages/bloc/test/helpers/counter/counter.dart
@@ -1,6 +1,7 @@
export './counter_bloc.dart';
export './counter_error_bloc.dart';
export './counter_exception_bloc.dart';
export './flat_map_bloc.dart';
export './on_error_bloc.dart';
export './on_exception_bloc.dart';
export './on_transition_error_bloc.dart';
41 changes: 41 additions & 0 deletions packages/bloc/test/helpers/counter/flat_map_bloc.dart
@@ -0,0 +1,41 @@
import 'package:bloc/bloc.dart';
import 'package:rxdart/rxdart.dart';

import '../helpers.dart';

class FlatMapBloc extends Bloc<CounterEvent, int> {
final void Function(Transition<CounterEvent, int>) onTransitionCallback;

FlatMapBloc({this.onTransitionCallback});

@override
int get initialState => 0;

@override
void onTransition(Transition<CounterEvent, int> transition) {
super.onTransition(transition);
onTransitionCallback?.call(transition);
}

@override
Stream<Transition<CounterEvent, int>> transformEvents(
Stream<CounterEvent> events,
TransitionFunction<CounterEvent, int> transitionFn,
) {
return events.flatMap(transitionFn);
}

@override
Stream<int> mapEventToState(
CounterEvent event,
) async* {
switch (event) {
case CounterEvent.decrement:
yield state - 1;
break;
case CounterEvent.increment:
yield state + 1;
break;
}
}
}

0 comments on commit bff6ffc

Please sign in to comment.