Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions native/swift/Sources/wordpress-api/SafeRequestExecutor.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@ public protocol SafeRequestExecutor: RequestExecutor, Sendable {
func upload(request: WpMultipartFormRequest) async -> Result<WpNetworkResponse, RequestExecutionError>

#if PROGRESS_REPORTING_ENABLED
/// Returns a publisher that emits zero or one `Progress` instance representing the overall progress of the task
/// for the given `requestId`.
func progress(forRequestWithId requestId: String) -> AnyPublisher<Progress, Never>
func progresses(for context: RequestContext) -> AnyPublisher<Progress, Never>
#endif
}

Expand Down Expand Up @@ -124,10 +122,14 @@ public final class WpRequestExecutor: SafeRequestExecutor {
}

#if PROGRESS_REPORTING_ENABLED
public func progress(forRequestWithId requestId: String) -> AnyPublisher<Progress, Never> {
public func progresses(for context: RequestContext) -> AnyPublisher<Progress, Never> {
NotificationCenter.default.publisher(for: RequestExecutorDelegate.didCreateTaskNotification)
.compactMap { $0.object as? URLSessionTask }
.first { $0.originalRequest?.requestId == requestId }
.filter {
guard let requestId = $0.originalRequest?.requestId else { return false }

return context.requestIds().contains(requestId)
}
.map { $0.progress }
.eraseToAnyPublisher()
}
Expand Down
67 changes: 32 additions & 35 deletions native/swift/Sources/wordpress-api/WordPressAPI.swift
Original file line number Diff line number Diff line change
Expand Up @@ -170,64 +170,61 @@ public actor WordPressAPI {
}

#if PROGRESS_REPORTING_ENABLED
public func uploadMedia(
params: MediaCreateParams,
fulfilling progress: Progress
) async throws -> MediaRequestCreateResponse {
/// Track the progress of the given HTTP API calls in the `apiCall` closure.
///
/// Note: pass the `RequestContext` parameter in `apiCall` to one and only one HTTP API call.
public func fulfill<R: Sendable>(
progress: Progress,
withApiCall apiCall: sending @escaping (RequestContext) async throws -> R
) async throws -> R {
precondition(progress.completedUnitCount == 0 && progress.totalUnitCount > 0)
precondition(progress.cancellationHandler == nil)

let context = RequestContext()

let uploadTask = Task {
try await media.createCancellation(params: params, context: context)
try await withTaskCancellationHandler {
try await apiCall(context)
} onCancel: {
requestExecutor.cancel(context: context)
}
}

let progressObserver = Task {
// A request id will be put into the `RequestContext` during the execution of the `media.create` above.
// This loop waits for the request id becomes available
let requestId: String
while true {
try await Task.sleep(nanoseconds: 100_000)
try Task.checkCancellation()

guard let id = context.requestIds().first else {
continue
}

requestId = id
break
for await task in requestExecutor.progresses(for: context).values {
// For one single request call, the Rust layer should send HTTP requests sequentially.
// For example, the retry mechanism in the Rust layer only send the retry call when the initial
// call fails.
//
// Since we can't know how many HTTP requests will be sent, the best we can do is make the `progress`
// starts from zero to complete for each HTTP request.
progress.completedUnitCount = 0
progress.addChild(task, withPendingUnitCount: progress.totalUnitCount)
}

// Get the progress of the `URLSessionTask` of the given request id.
guard let task = await requestExecutor
.progress(forRequestWithId: requestId)
.values
.first(where: { _ in true }) else { return }

try Task.checkCancellation()

progress.addChild(task, withPendingUnitCount: progress.totalUnitCount - progress.completedUnitCount)
}

progress.cancellationHandler = {
uploadTask.cancel()
progressObserver.cancel()
}

defer { progressObserver.cancel() }

return try await withTaskCancellationHandler {
try await uploadTask.value
} onCancel: {
// Please note: the async functions exported by uniffi-rs _do not_ support cancellation.
// That means cancelling an API call like `Task { try await api.users.retrieveMe() }.cancel()`
// does not cancel the underlying HTTP request sent by URLSession.
//
// The `progress.cancel()` in this particular function can cancel the HTTP request, because the
// `progress` instance is the parent progress of `URLSessionTask.progress`, and cancelling a parent
// progress automatically cancels their child progress, which is the `URLSessionTask` in this case.
progress.cancel()
}
}

public func uploadMedia(
params: MediaCreateParams,
fulfilling progress: Progress
) async throws -> MediaRequestCreateResponse {
try await fulfill(progress: progress) { [media] in
try await media.createCancellation(params: params, context: $0)
}
}
#endif

enum ParseError: Error {
Expand Down
2 changes: 1 addition & 1 deletion native/swift/Tests/wordpress-api/Support/HTTPStubs.swift
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ final class HTTPStubs: SafeRequestExecutor {
}

#if PROGRESS_REPORTING_ENABLED
func progress(forRequestWithId requestId: String) -> AnyPublisher<Progress, Never> {
func progresses(for context: RequestContext) -> AnyPublisher<Progress, Never> {
Record(output: [], completion: .finished).eraseToAnyPublisher()
}
#endif
Expand Down