Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add parallel attachment uploading #3034

Merged
merged 4 commits into from
Feb 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).

# Upcoming

## StreamChat
### ✅ Added
- Add parallel attachment uploading [#3034](https://github.com/GetStream/stream-chat-swift/pull/3034)

## StreamChatUI
### 🐞 Fixed
- Fix composer link preview overridden by previous enrichment [#3025](https://github.com/GetStream/stream-chat-swift/pull/3025)
Expand Down
2 changes: 1 addition & 1 deletion DemoApp/StreamChat/Components/DemoChatChannelVC.swift
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,6 @@ final class DemoChatChannelVC: ChatChannelVC, UIGestureRecognizerDelegate {
}

@objc private func goBack() {
dismiss(animated: true)
navigationController?.popViewController(animated: true)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,46 +59,44 @@ class AttachmentQueueUploader: Worker {
private func handleChanges(changes: [ListChange<AttachmentDTO>]) {
guard !changes.isEmpty else { return }

// Only start uploading attachment when inserted and it is present in pendingAttachmentIds
database.backgroundReadOnlyContext.perform { [weak self] in
var wasEmpty: Bool = false
self?._pendingAttachmentIDs.mutate { pendingAttachmentIDs in
wasEmpty = pendingAttachmentIDs.isEmpty
changes.pendingUploadAttachmentIDs.forEach {
let newAttachmentIds = Set(changes.attachmentIDs).subtracting(pendingAttachmentIDs)
newAttachmentIds.forEach {
pendingAttachmentIDs.insert($0)
}
Comment on lines +66 to 68
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternative could be pendingAttachmentIDs.formUnion(newAttachmentIds). But it does not matter much.

}
if wasEmpty {
self?.uploadNextAttachment()
newAttachmentIds.forEach { id in
self?.uploadAttachment(with: id)
}
}
}
}

private func uploadNextAttachment() {
guard let attachmentID = pendingAttachmentIDs.first else { return }

prepareAttachmentForUpload(with: attachmentID) { [weak self] attachment in
private func uploadAttachment(with id: AttachmentId) {
prepareAttachmentForUpload(with: id) { [weak self] attachment in
guard let attachment = attachment else {
self?.removeAttachmentIDAndContinue(attachmentID)
self?.removePendingAttachment(with: id)
return
}

self?.apiClient.uploadAttachment(
attachment,
progress: {
self?.updateAttachmentIfNeeded(
attachmentId: attachmentID,
attachmentId: id,
uploadedAttachment: nil,
newState: .uploading(progress: $0),
completion: {}
)
},
completion: { result in
self?.updateAttachmentIfNeeded(
attachmentId: attachmentID,
attachmentId: id,
uploadedAttachment: result.value,
newState: result.error == nil ? .uploaded : .uploadingFailed,
completion: {
self?.removeAttachmentIDAndContinue(attachmentID)
self?.removePendingAttachment(with: id)
}
)
}
Expand Down Expand Up @@ -128,9 +126,8 @@ class AttachmentQueueUploader: Worker {
}
}

private func removeAttachmentIDAndContinue(_ id: AttachmentId) {
private func removePendingAttachment(with id: AttachmentId) {
_pendingAttachmentIDs.mutate { $0.remove(id) }
uploadNextAttachment()
}

private func updateAttachmentIfNeeded(
Expand Down Expand Up @@ -229,7 +226,7 @@ class AttachmentQueueUploader: Worker {
}

private extension Array where Element == ListChange<AttachmentDTO> {
var pendingUploadAttachmentIDs: [AttachmentId] {
var attachmentIDs: [AttachmentId] {
compactMap {
switch $0 {
case let .insert(dto, _), let .update(dto, _):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ final class APIClient_Spy: APIClient, Spy {
@Atomic var uploadFile_attachment: AnyChatMessageAttachment?
@Atomic var uploadFile_progress: ((Double) -> Void)?
@Atomic var uploadFile_completion: ((Result<UploadedAttachment, Error>) -> Void)?
@Atomic var uploadFile_callCount = 0

@Atomic var init_sessionConfiguration: URLSessionConfiguration
@Atomic var init_requestEncoder: RequestEncoder
Expand Down Expand Up @@ -143,9 +144,11 @@ final class APIClient_Spy: APIClient, Spy {
progress: ((Double) -> Void)?,
completion: @escaping (Result<UploadedAttachment, Error>) -> Void
) {

uploadFile_attachment = attachment
uploadFile_progress = progress
uploadFile_completion = completion
uploadFile_callCount += 1
uploadRequest_expectation.fulfill()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@
apiClient.uploadFile_completion?(.success(response))

AssertAsync {
Assert.willBeEqual(message?.localMessageState, .sendingFailed)

Check failure on line 312 in Tests/StreamChatTests/Workers/Background/AttachmentQueueUploader_Tests.swift

View workflow job for this annotation

GitHub Actions / Test LLC (Debug)

test_uploader_whenHasFailedAttachments_doNotMarkMessagePendingSend, failed - Found difference for
}
}

Expand Down Expand Up @@ -601,6 +601,44 @@
"messaging:dummy-fake-0.txt"
)
}

func test_uploadAttachmentsInParallel() throws {
let cid: ChannelId = .unique
let messageId: MessageId = .unique

// Create channel in the database.
try database.createChannel(cid: cid, withMessages: false)
// Create message in the database.
try database.createMessage(id: messageId, cid: cid, localState: .pendingSend)

let attachmentPayloads: [AnyAttachmentPayload] = [
.mockFile,
.mockImage,
.mockVideo,
.mockAudio,
.mockVoiceRecording
]

for (index, envelope) in attachmentPayloads.enumerated() {
let attachmentId = AttachmentId(cid: cid, messageId: messageId, index: index)
// Seed attachment in `.pendingUpload` state to the database.
try database.writeSynchronously { session in
try session.createNewAttachment(attachment: envelope, id: attachmentId)
}

// Load attachment from the database.
let attachment = try XCTUnwrap(database.viewContext.attachment(id: attachmentId))

// Assert attachment is in `.pendingUpload` state.
XCTAssertEqual(attachment.localState, .pendingUpload)
}

// Attachments start all uploading at the same time.
AssertAsync.willBeEqual(
apiClient.uploadFile_callCount,
attachmentPayloads.count
)
}
}

private extension URL {
Expand Down