Skip to content

Commit

Permalink
Upload attachments in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
nuno-vieira committed Feb 16, 2024
1 parent 1a20263 commit 927b897
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 17 deletions.
31 changes: 14 additions & 17 deletions Sources/StreamChat/Workers/Background/AttachmentQueueUploader.swift
Original file line number Diff line number Diff line change
Expand Up @@ -59,46 +59,44 @@ class AttachmentQueueUploader: Worker {
private func handleChanges(changes: [ListChange<AttachmentDTO>]) {
guard !changes.isEmpty else { return }

// Only start uploading attachment when inserted and it is present in pendingAttachmentIds
database.backgroundReadOnlyContext.perform { [weak self] in
var wasEmpty: Bool = false
self?._pendingAttachmentIDs.mutate { pendingAttachmentIDs in
wasEmpty = pendingAttachmentIDs.isEmpty
changes.pendingUploadAttachmentIDs.forEach {
let newAttachmentIds = Set(changes.attachmentIDs).subtracting(pendingAttachmentIDs)
newAttachmentIds.forEach {
pendingAttachmentIDs.insert($0)
}
}
if wasEmpty {
self?.uploadNextAttachment()
newAttachmentIds.forEach { id in
self?.uploadAttachment(with: id)
}
}
}
}

private func uploadNextAttachment() {
guard let attachmentID = pendingAttachmentIDs.first else { return }

prepareAttachmentForUpload(with: attachmentID) { [weak self] attachment in
private func uploadAttachment(with id: AttachmentId) {
prepareAttachmentForUpload(with: id) { [weak self] attachment in
guard let attachment = attachment else {
self?.removeAttachmentIDAndContinue(attachmentID)
self?.removePendingAttachment(with: id)
return
}

self?.apiClient.uploadAttachment(
attachment,
progress: {
self?.updateAttachmentIfNeeded(
attachmentId: attachmentID,
attachmentId: id,
uploadedAttachment: nil,
newState: .uploading(progress: $0),
completion: {}
)
},
completion: { result in
self?.updateAttachmentIfNeeded(
attachmentId: attachmentID,
attachmentId: id,
uploadedAttachment: result.value,
newState: result.error == nil ? .uploaded : .uploadingFailed,
completion: {
self?.removeAttachmentIDAndContinue(attachmentID)
self?.removePendingAttachment(with: id)
}
)
}
Expand Down Expand Up @@ -128,9 +126,8 @@ class AttachmentQueueUploader: Worker {
}
}

private func removeAttachmentIDAndContinue(_ id: AttachmentId) {
private func removePendingAttachment(with id: AttachmentId) {
_pendingAttachmentIDs.mutate { $0.remove(id) }
uploadNextAttachment()
}

private func updateAttachmentIfNeeded(
Expand Down Expand Up @@ -229,7 +226,7 @@ class AttachmentQueueUploader: Worker {
}

private extension Array where Element == ListChange<AttachmentDTO> {
var pendingUploadAttachmentIDs: [AttachmentId] {
var attachmentIDs: [AttachmentId] {
compactMap {
switch $0 {
case let .insert(dto, _), let .update(dto, _):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ final class APIClient_Spy: APIClient, Spy {
@Atomic var uploadFile_attachment: AnyChatMessageAttachment?
@Atomic var uploadFile_progress: ((Double) -> Void)?
@Atomic var uploadFile_completion: ((Result<UploadedAttachment, Error>) -> Void)?
@Atomic var uploadFile_callCount = 0

@Atomic var init_sessionConfiguration: URLSessionConfiguration
@Atomic var init_requestEncoder: RequestEncoder
Expand Down Expand Up @@ -143,9 +144,11 @@ final class APIClient_Spy: APIClient, Spy {
progress: ((Double) -> Void)?,
completion: @escaping (Result<UploadedAttachment, Error>) -> Void
) {

uploadFile_attachment = attachment
uploadFile_progress = progress
uploadFile_completion = completion
uploadFile_callCount += 1
uploadRequest_expectation.fulfill()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,44 @@ final class AttachmentQueueUploader_Tests: XCTestCase {
"messaging:dummy-fake-0.txt"
)
}

func test_uploadAttachmentsInParallel() throws {
let cid: ChannelId = .unique
let messageId: MessageId = .unique

// Create channel in the database.
try database.createChannel(cid: cid, withMessages: false)
// Create message in the database.
try database.createMessage(id: messageId, cid: cid, localState: .pendingSend)

let attachmentPayloads: [AnyAttachmentPayload] = [
.mockFile,
.mockImage,
.mockVideo,
.mockAudio,
.mockVoiceRecording
]

for (index, envelope) in attachmentPayloads.enumerated() {
let attachmentId = AttachmentId(cid: cid, messageId: messageId, index: index)
// Seed attachment in `.pendingUpload` state to the database.
try database.writeSynchronously { session in
try session.createNewAttachment(attachment: envelope, id: attachmentId)
}

// Load attachment from the database.
let attachment = try XCTUnwrap(database.viewContext.attachment(id: attachmentId))

// Assert attachment is in `.pendingUpload` state.
XCTAssertEqual(attachment.localState, .pendingUpload)
}

// Attachments start all uploading at the same time.
AssertAsync.willBeEqual(
apiClient.uploadFile_callCount,
attachmentPayloads.count
)
}
}

private extension URL {
Expand Down

0 comments on commit 927b897

Please sign in to comment.