-
-
Notifications
You must be signed in to change notification settings - Fork 3.4k
/
bloc.dart
179 lines (160 loc) · 6.4 KB
/
bloc.dart
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
import 'dart:async';
import 'package:bloc/bloc.dart';
import 'package:meta/meta.dart';
import 'package:rxdart/rxdart.dart';
/// {@template bloc}
/// Takes a `Stream` of `Events` as input
/// and transforms them into a `Stream` of `States` as output.
/// {@endtemplate}
abstract class Bloc<Event, State> extends Stream<State> implements Sink<Event> {
  /// Receives every `event` passed to [add]; it is the input of the pipeline
  /// assembled in [_bindStateSubject].
  final PublishSubject<Event> _eventSubject = PublishSubject<Event>();

  /// Holds the current [state]. Created in the constructor and seeded with
  /// [initialState].
  BehaviorSubject<State> _stateSubject;

  /// Returns the current [state] of the [bloc].
  State get state => _stateSubject.value;

  /// Returns the [state] before any `events` have been [add]ed.
  State get initialState;

  /// Returns whether the `Stream<State>` is a broadcast stream.
  @override
  bool get isBroadcast => _stateSubject.isBroadcast;

  /// {@macro bloc}
  Bloc() {
    // `initialState` is an instance getter, so the subject can only be
    // created once `this` is available, i.e. in the constructor body.
    _stateSubject = BehaviorSubject<State>.seeded(initialState);
    _bindStateSubject();
  }

  /// Adds a subscription to the `Stream<State>`.
  ///
  /// Returns a [StreamSubscription] which handles events from the
  /// `Stream<State>` using the provided [onData], [onError] and [onDone]
  /// handlers. All arguments are forwarded unchanged to the underlying
  /// state subject.
  @override
  StreamSubscription<State> listen(
    void onData(State value), {
    Function onError,
    void onDone(),
    bool cancelOnError,
  }) {
    return _stateSubject.listen(
      onData,
      onError: onError,
      onDone: onDone,
      cancelOnError: cancelOnError,
    );
  }

  /// Called whenever an [event] is [add]ed to the [bloc].
  ///
  /// A great spot to add logging/analytics at the individual [bloc] level.
  /// The default implementation does nothing.
  void onEvent(Event event) {}

  /// Called whenever a [transition] occurs with the given [transition].
  ///
  /// A [transition] occurs when a new `event` is [add]ed and
  /// [mapEventToState] executed.
  /// [onTransition] is called before a [bloc]'s [state] has been updated.
  /// A great spot to add logging/analytics at the individual [bloc] level.
  /// The default implementation does nothing.
  void onTransition(Transition<Event, State> transition) {}

  /// Called whenever an [error] is thrown within [mapEventToState], or
  /// while an `event` is being [add]ed or a [transition] is being applied.
  ///
  /// By default all [error]s will be ignored and [bloc] functionality will
  /// be unaffected.
  /// The [stacktrace] argument may be `null` if the [state] stream received
  /// an error without a [stacktrace].
  /// A great spot to handle errors at the individual [Bloc] level.
  void onError(Object error, StackTrace stacktrace) {}

  /// Notifies the [bloc] of a new [event] which triggers [mapEventToState].
  ///
  /// If [close] has already been called, any subsequent calls to [add] will
  /// be delegated to the [onError] method which can be overridden at the
  /// [bloc] as well as the [BlocDelegate] level.
  ///
  /// Anything thrown by the delegate, by [onEvent] or by the event sink is
  /// caught and reported, together with its stack trace, through [onError];
  /// [add] itself never throws.
  @override
  void add(Event event) {
    try {
      BlocSupervisor.delegate.onEvent(this, event);
      onEvent(event);
      _eventSubject.sink.add(event);
    } on Object catch (error, stacktrace) {
      // Forward the stack trace as well so error handlers can see where the
      // failure originated instead of receiving `null`.
      _handleError(error, stacktrace);
    }
  }

  /// Closes the `event` and `state` `Streams`.
  ///
  /// This method should be called when a [bloc] is no longer needed.
  /// Once [close] is called, `events` that are [add]ed will not be
  /// processed and will result in an error being passed to [onError].
  /// In addition, if [close] is called while `events` are still being
  /// processed, the [bloc] will continue to process the pending `events`
  /// to completion.
  @override
  @mustCallSuper
  Future<void> close() async {
    // The event subject is closed first, then the state subject.
    await _eventSubject.close();
    await _stateSubject.close();
  }

  /// Transforms the [events] stream along with a [next] function into a
  /// `Stream<State>`.
  ///
  /// Events that should be processed by [mapEventToState] need to be passed
  /// to [next]. By default `asyncExpand` is used to ensure all [events] are
  /// processed in the order in which they are received. You can override
  /// [transformEvents] for advanced usage in order to manipulate the
  /// frequency and specificity with which [mapEventToState] is called as
  /// well as which [events] are processed.
  ///
  /// For example, if you only want [mapEventToState] to be called on the
  /// most recent [event] you can use `switchMap` instead of `asyncExpand`.
  ///
  /// ```dart
  /// @override
  /// Stream<State> transformEvents(events, next) {
  ///   return (events as Observable<Event>).switchMap(next);
  /// }
  /// ```
  ///
  /// Alternatively, if you only want [mapEventToState] to be called for
  /// distinct [events]:
  ///
  /// ```dart
  /// @override
  /// Stream<State> transformEvents(events, next) {
  ///   return super.transformEvents(
  ///     (events as Observable<Event>).distinct(),
  ///     next,
  ///   );
  /// }
  /// ```
  Stream<State> transformEvents(
    Stream<Event> events,
    Stream<State> next(Event event),
  ) {
    return events.asyncExpand(next);
  }

  /// Must be implemented when a class extends [bloc].
  ///
  /// Takes the incoming [event] as the argument.
  /// [mapEventToState] is called whenever an [event] is [add]ed.
  /// [mapEventToState] must convert that [event] into a new [state]
  /// and return the new [state] in the form of a `Stream<State>`.
  Stream<State> mapEventToState(Event event);

  /// Transforms the `Stream<State>` into a new `Stream<State>`.
  ///
  /// By default [transformStates] returns the incoming `Stream<State>`.
  /// You can override [transformStates] for advanced usage in order to
  /// manipulate the frequency and specificity at which `transitions`
  /// (state changes) occur.
  ///
  /// For example, if you want to debounce outgoing [states]:
  ///
  /// ```dart
  /// @override
  /// Stream<State> transformStates(Stream<State> states) {
  ///   return (states as Observable<State>).debounceTime(
  ///     Duration(seconds: 1),
  ///   );
  /// }
  /// ```
  Stream<State> transformStates(Stream<State> states) => states;

  /// Wires the event subject through [transformEvents], [mapEventToState]
  /// and [transformStates], and pushes every resulting state that differs
  /// from the current one into the state subject.
  void _bindStateSubject() {
    // Remembers the event most recently handed to `mapEventToState` so it
    // can be attached to the `Transition` built below.
    // NOTE(review): this is one variable shared by the whole pipeline; if a
    // `transformEvents` override keeps several `mapEventToState` streams
    // active at once, the recorded event may not be the one that produced
    // `nextState` - confirm.
    Event currentEvent;

    final states = transformStates(
      transformEvents(_eventSubject, (Event event) {
        currentEvent = event;
        // Errors emitted by `mapEventToState` are reported and removed from
        // the stream so they do not reach the `forEach` below.
        return mapEventToState(currentEvent).handleError(_handleError);
      }),
    );

    // NOTE(review): the Future returned by `forEach` is not awaited, so an
    // error raised by a `transformEvents`/`transformStates` override itself
    // is not routed to `_handleError` - confirm whether that is intended.
    states.forEach((State nextState) {
      // Skip states equal to the current one and anything arriving after
      // the state subject has been closed.
      if (state == nextState || _stateSubject.isClosed) return;

      final transition = Transition(
        currentState: state,
        event: currentEvent,
        nextState: nextState,
      );

      try {
        // Observers are notified before the state is published; if either
        // of them throws, `nextState` is not added.
        BlocSupervisor.delegate.onTransition(this, transition);
        onTransition(transition);
        _stateSubject.add(nextState);
      } on Object catch (error, stacktrace) {
        // Keep the stack trace so it reaches `onError` and the delegate.
        _handleError(error, stacktrace);
      }
    });
  }

  /// Reports [error] and its optional [stacktrace] first to the
  /// [BlocSupervisor] delegate and then to this bloc's own [onError].
  void _handleError(Object error, [StackTrace stacktrace]) {
    BlocSupervisor.delegate.onError(this, error, stacktrace);
    onError(error, stacktrace);
  }
}