Skip to content

Commit

Permalink
fix: BlocOverrides values should default to Bloc.observer/Bloc.transf…
Browse files Browse the repository at this point in the history
…ormer
  • Loading branch information
felangel committed Aug 5, 2022
1 parent fd87d52 commit 3aa9425
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 63 deletions.
67 changes: 58 additions & 9 deletions packages/bloc/lib/src/bloc.dart
Expand Up @@ -36,14 +36,6 @@ typedef EventTransformer<Event> = Stream<Event> Function(
EventMapper<Event> mapper,
);

class _Handler {
const _Handler({required this.isType, required this.type});
final bool Function(dynamic value) isType;
final Type type;
}

class _DefaultBlocObserver extends BlocObserver {}

/// {@template bloc}
/// Takes a `Stream` of `Events` as input
/// and transforms them into a `Stream` of `States` as output.
Expand All @@ -67,7 +59,11 @@ abstract class Bloc<Event, State> extends BlocBase<State>
/// * [package:bloc_concurrency](https://pub.dev/packages/bloc_concurrency) for an
/// opinionated set of event transformers.
///
static EventTransformer<dynamic> transformer = _defaultEventTransformer;
static EventTransformer<dynamic> transformer = (events, mapper) {
return events
.map(mapper)
.transform<dynamic>(const _FlatMapStreamTransformer<dynamic>());
};

final _eventController = StreamController<Event>.broadcast();
final _subscriptions = <StreamSubscription<dynamic>>[];
Expand Down Expand Up @@ -290,3 +286,56 @@ abstract class Bloc<Event, State> extends BlocBase<State>
return super.close();
}
}

class _Handler {
const _Handler({required this.isType, required this.type});
final bool Function(dynamic value) isType;
final Type type;
}

class _DefaultBlocObserver extends BlocObserver {}

class _FlatMapStreamTransformer<T> extends StreamTransformerBase<Stream<T>, T> {
const _FlatMapStreamTransformer();

@override
Stream<T> bind(Stream<Stream<T>> stream) {
final controller = StreamController<T>.broadcast(sync: true);

controller.onListen = () {
final subscriptions = <StreamSubscription<dynamic>>[];

final outerSubscription = stream.listen(
(inner) {
final subscription = inner.listen(
controller.add,
onError: controller.addError,
);

subscription.onDone(() {
subscriptions.remove(subscription);
if (subscriptions.isEmpty) controller.close();
});

subscriptions.add(subscription);
},
onError: controller.addError,
);

outerSubscription.onDone(() {
subscriptions.remove(outerSubscription);
if (subscriptions.isEmpty) controller.close();
});

subscriptions.add(outerSubscription);

controller.onCancel = () {
if (subscriptions.isEmpty) return null;
final cancels = [for (final s in subscriptions) s.cancel()];
return Future.wait(cancels).then((_) {});
};
};

return controller.stream;
}
}
56 changes: 2 additions & 54 deletions packages/bloc/lib/src/bloc_overrides.dart
Expand Up @@ -58,7 +58,7 @@ abstract class BlocOverrides {
@Deprecated(
'This will be removed in bloc v9.0.0. Please use Bloc.observer instead.',
)
BlocObserver get blocObserver => _defaultBlocObserver;
BlocObserver get blocObserver => Bloc.observer;

/// The [EventTransformer] that will be used within the current [Zone].
///
Expand All @@ -75,7 +75,7 @@ abstract class BlocOverrides {
@Deprecated(
'This will be removed in bloc v9.0.0. Please use Bloc.transformer instead.',
)
EventTransformer get eventTransformer => _defaultEventTransformer;
EventTransformer get eventTransformer => Bloc.transformer;
}

class _BlocOverridesScope extends BlocOverrides {
Expand Down Expand Up @@ -107,55 +107,3 @@ class _BlocOverridesScope extends BlocOverrides {
return super.eventTransformer;
}
}

late final _defaultBlocObserver = _DefaultBlocObserver();
late final _defaultEventTransformer = (Stream events, EventMapper mapper) {
return events
.map(mapper)
.transform<dynamic>(const _FlatMapStreamTransformer<dynamic>());
};

class _FlatMapStreamTransformer<T> extends StreamTransformerBase<Stream<T>, T> {
const _FlatMapStreamTransformer();

@override
Stream<T> bind(Stream<Stream<T>> stream) {
final controller = StreamController<T>.broadcast(sync: true);

controller.onListen = () {
final subscriptions = <StreamSubscription<dynamic>>[];

final outerSubscription = stream.listen(
(inner) {
final subscription = inner.listen(
controller.add,
onError: controller.addError,
);

subscription.onDone(() {
subscriptions.remove(subscription);
if (subscriptions.isEmpty) controller.close();
});

subscriptions.add(subscription);
},
onError: controller.addError,
);

outerSubscription.onDone(() {
subscriptions.remove(outerSubscription);
if (subscriptions.isEmpty) controller.close();
});

subscriptions.add(outerSubscription);

controller.onCancel = () {
if (subscriptions.isEmpty) return null;
final cancels = [for (final s in subscriptions) s.cancel()];
return Future.wait(cancels).then((_) {});
};
};

return controller.stream;
}
}

0 comments on commit 3aa9425

Please sign in to comment.