Skip to content

Commit

Permalink
fix: Propagate Stream Errors through the same Future (#1732)
Browse files Browse the repository at this point in the history
# Description

Wait simultaneously for async calls (Futures and Streams) to ensure all
errors are propagated through one common future. Previously a stream
could throw an error before it was even listened to, as the process
still awaited an async call (here `setSource`).

## Checklist

<!-- Before you create this PR confirm that it meets all requirements
listed below by checking the
relevant checkboxes (`[x]`). This will ensure a smooth and quick review
process. -->

- [x] The title of my PR starts with a [Conventional Commit] prefix
(`fix:`, `feat:`, `refactor:`,
      `docs:`, `chore:`, `test:`, `ci:` etc).
- [x] I have read the [Contributor Guide] and followed the process
outlined for submitting PRs.
- [x] I have updated/added tests for ALL new/updated/fixed
functionality.
- [x] I have updated/added relevant documentation and added dartdoc
comments with `///`, where necessary.
- [ ] I have updated/added relevant examples in [example].

The tests are adapted, but not explicitly written to avoid that error.
But the error would occur since Flutter 3.16.x, so they are tested
against in #1715 without further ado.

## Breaking Change

- [ ] Yes, this is a breaking change.
- [x] No, this is *not* a breaking change.

## Related Issues

#1715

<!-- Links -->
[issue database]: https://github.com/bluefireteam/audioplayers/issues
[Contributor Guide]:
https://github.com/bluefireteam/audioplayers/blob/main/contributing.md#feature-requests--prs
[Conventional Commit]: https://conventionalcommits.org
[example]:
https://github.com/bluefireteam/audioplayers/tree/main/packages/audioplayers/example
  • Loading branch information
Gustl22 authored Jan 19, 2024
1 parent 3a5c6dc commit 00d041d
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 62 deletions.
60 changes: 33 additions & 27 deletions packages/audioplayers/example/integration_test/platform_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ void main() async {
IntegrationTestWidgetsFlutterBinding.ensureInitialized();
final features = PlatformFeatures.instance();
final isLinux = !kIsWeb && defaultTargetPlatform == TargetPlatform.linux;
final isAndroid = !kIsWeb && defaultTargetPlatform == TargetPlatform.android;
final audioTestDataList = await getAudioTestDataList();

group('Platform method channel', () {
Expand Down Expand Up @@ -67,6 +68,12 @@ void main() async {
}
await tester.pumpLinux();
},
// FIXME(Gustl22): for some reason, the error propagated back from the
// Android MediaPlayer is only triggered, when the timeout has reached,
// although the error is emitted immediately.
// Further, the other future is not fulfilled and then mysteriously
// failing in later tests.
skip: isAndroid,
);

testWidgets('#create and #dispose', (tester) async {
Expand Down Expand Up @@ -488,33 +495,32 @@ extension on WidgetTester {
required LibSourceTestData testData,
}) async {
final eventStream = platform.getEventStream(playerId);
final preparedCompleter = Completer<void>();
final onPreparedSub = eventStream
.where((event) => event.eventType == AudioEventType.prepared)
.map((event) => event.isPrepared!)
.listen(
(isPrepared) {
if (isPrepared) {
preparedCompleter.complete();
}
},
onError: (Object e, [StackTrace? st]) {
if (!preparedCompleter.isCompleted) {
preparedCompleter.completeError(e, st);
}
},
);
await pumpLinux();
final source = testData.source;
if (source is UrlSource) {
await platform.setSourceUrl(playerId, source.url);
} else if (source is AssetSource) {
final cachePath = await AudioCache.instance.loadPath(source.path);
await platform.setSourceUrl(playerId, cachePath, isLocal: true);
} else if (source is BytesSource) {
await platform.setSourceBytes(playerId, source.bytes);
final futurePrepared = eventStream
.firstWhere(
(event) =>
event.eventType == AudioEventType.prepared &&
(event.isPrepared ?? false),
)
.timeout(const Duration(seconds: 30));

Future<void> setSource(Source source) async {
if (source is UrlSource) {
return platform.setSourceUrl(playerId, source.url);
} else if (source is AssetSource) {
final cachePath = await AudioCache.instance.loadPath(source.path);
return platform.setSourceUrl(playerId, cachePath, isLocal: true);
} else if (source is BytesSource) {
return platform.setSourceBytes(playerId, source.bytes);
} else {
throw 'Unknown source type: ${source.runtimeType}';
}
}
await preparedCompleter.future.timeout(const Duration(seconds: 30));
await onPreparedSub.cancel();

// Need to await the setting the source to propagate immediate errors.
final futureSetSource = setSource(testData.source);

// Wait simultaneously to ensure all errors are propagated through the same
// future.
await Future.wait([futureSetSource, futurePrepared]);
}
}
57 changes: 22 additions & 35 deletions packages/audioplayers/lib/src/audioplayer.dart
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ class AudioPlayer {
Future<void> _create() async {
try {
await _platform.create(playerId);
// Assign the event stream, now that the platform registered this player.
_eventStreamSubscription = _platform.getEventStream(playerId).listen(
_eventStreamController.add,
onError: _eventStreamController.addError,
Expand Down Expand Up @@ -286,22 +287,13 @@ class AudioPlayer {
Future<void> seek(Duration position) async {
await creatingCompleter.future;

final seekCompleter = Completer<void>();
late StreamSubscription<void> onSeekCompleteSubscription;
onSeekCompleteSubscription = onSeekComplete.listen(
(_) async {
seekCompleter.complete();
await onSeekCompleteSubscription.cancel();
},
onError: (Object e, [StackTrace? stackTrace]) async {
if (!seekCompleter.isCompleted) {
seekCompleter.completeError(e, stackTrace);
await onSeekCompleteSubscription.cancel();
}
},
);
await _platform.seek(playerId, position);
await seekCompleter.future.timeout(const Duration(seconds: 30));
final futureSeekComplete =
onSeekComplete.first.timeout(const Duration(seconds: 30));
final futureSeek = _platform.seek(playerId, position);
// Wait simultaneously to ensure all errors are propagated through the same
// future.
await Future.wait([futureSeek, futureSeekComplete]);

await _positionUpdater?.update();
}

Expand Down Expand Up @@ -354,26 +346,21 @@ class AudioPlayer {
await source.setOnPlayer(this);
}

Future<void> _completePrepared(Future<void> Function() fun) async {
/// This method helps waiting for a source to be set until it's prepared.
/// This can happen immediately after [setSource] has finished or it needs to
/// wait for the [AudioEvent] [AudioEventType.prepared] to arrive.
Future<void> _completePrepared(Future<void> Function() setSource) async {
await creatingCompleter.future;
final preparedCompleter = Completer<void>();
late StreamSubscription<bool> onPreparedSubscription;
onPreparedSubscription = _onPrepared.listen(
(isPrepared) async {
if (isPrepared) {
preparedCompleter.complete();
await onPreparedSubscription.cancel();
}
},
onError: (Object e, [StackTrace? stackTrace]) async {
if (!preparedCompleter.isCompleted) {
preparedCompleter.completeError(e, stackTrace);
await onPreparedSubscription.cancel();
}
},
);
await fun();
await preparedCompleter.future.timeout(const Duration(seconds: 30));

final futurePrepared = _onPrepared
.firstWhere((isPrepared) => isPrepared)
.timeout(const Duration(seconds: 30));
// Need to await the setting the source to propagate immediate errors.
final futureSetSource = setSource();

// Wait simultaneously to ensure all errors are propagated through the same
// future.
await Future.wait([futureSetSource, futurePrepared]);

// Share position once after finished loading
await _positionUpdater?.update();
Expand Down

0 comments on commit 00d041d

Please sign in to comment.