From 1812c94fd2d3cd4a8ffad6c2983601e57d0e6c8a Mon Sep 17 00:00:00 2001 From: Sahil Kumar Date: Wed, 3 Sep 2025 14:54:29 +0200 Subject: [PATCH] feat(llc): Add eagerError to uploadBatch This commit introduces an `eagerError` parameter to the `uploadBatch` method in `AttachmentUploader`. When `eagerError` is true, the upload stream will throw an exception and close immediately upon the first failed upload. If `eagerError` is false (the default behavior), failed uploads will be emitted as `Result.failure` and the upload process will continue for the remaining attachments. Additionally, the `attachments` parameter in `uploadBatch` has been changed from `List` to `Iterable` for better flexibility. A similar change was made to the `merge` extension method. --- .../uploader/attachment_uploader.dart | 26 ++++++++++++++++--- .../lib/src/utils/list_extensions.dart | 2 +- 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/packages/stream_core/lib/src/attachment/uploader/attachment_uploader.dart b/packages/stream_core/lib/src/attachment/uploader/attachment_uploader.dart index 4c5fa27..73eeaf9 100644 --- a/packages/stream_core/lib/src/attachment/uploader/attachment_uploader.dart +++ b/packages/stream_core/lib/src/attachment/uploader/attachment_uploader.dart @@ -129,13 +129,22 @@ extension StreamAttachmentUploaderBatch on StreamAttachmentUploader { /// [Result] objects as each upload completes. Progress updates are provided /// through the optional [onProgress] callback. /// + /// When [eagerError] is true, the stream throws an exception and closes + /// immediately on the first upload failure. When false (default), failed + /// uploads are emitted as [Result.failure] and processing continues. + /// /// Returns a [Stream] of [Result] objects in completion order, not input order. Stream> uploadBatch( - List attachments, { + Iterable attachments, { OnBatchUploadProgress? onProgress, int maxConcurrent = 5, - }) { - return Stream.fromIterable(attachments).flatMap( + bool eagerError = false, + }) async* { + // Early return for empty list + if (attachments.isEmpty) return; + + // Create a stream that uploads attachments with controlled concurrency + final uploadStream = Stream.fromIterable(attachments).flatMap( maxConcurrent: maxConcurrent, (attachment) => Stream.fromFuture( upload( @@ -146,5 +155,16 @@ extension StreamAttachmentUploaderBatch on StreamAttachmentUploader { ), ), ); + + // Yield results as they complete + await for (final result in uploadStream) { + // If eagerError is enabled, throw on first failure + if (result.exceptionOrNull() case final error? when eagerError) { + final stackTrace = result.stackTraceOrNull(); + Error.throwWithStackTrace(error, stackTrace ?? StackTrace.current); + } + + yield result; + } } } diff --git a/packages/stream_core/lib/src/utils/list_extensions.dart b/packages/stream_core/lib/src/utils/list_extensions.dart index 8f8e5e4..e4ff308 100644 --- a/packages/stream_core/lib/src/utils/list_extensions.dart +++ b/packages/stream_core/lib/src/utils/list_extensions.dart @@ -222,7 +222,7 @@ extension SortedListExtensions on List { /// // Result: [Score(userId: '1', points: 150), Score(userId: '3', points: 120), Score(userId: '2', points: 80)] /// ``` List merge( - List other, { + Iterable other, { required K Function(T item) key, T Function(T original, T updated)? update, Comparator? compare,