Skip to content

Observable from async function #2551

Open
@DarkSatyr

Description

@DarkSatyr

Short description of missing functionality:

Starting from RxSwift 6.5 there is a very helpful extension function to AsyncSequence asObservable() which helps to convert coroutine to Observable, but if I want to convert just async function to Observable I need to perform 2 steps: convert coroutine to Async[Throwing]Stream and then using asObservable() extension function convert sequence to Observable. While most of courotines are just 'singles' by nature I need to use take(1) operator after asObservable() or it won't complete at all (in case of unfolding constructor). The other option to use constructor with continuation, but I need to write a bunch of imperative-style code to produce terminative Async[Throwing]Stream. My proposal to add Observable.create factory method which accepts coroutine as an input and produce an Observable

Short code example of how you would like to use the API:

extension Observable {
 static func create(asyncFunc: @escaping () async throws -> Element) -> Observable<Element> {
   Observable.create { observer in
     let task = Task {
       do {
         let value = try await asyncFunc()
         observer.onNext(value)
         observer.onCompleted()
       } catch {
         observer.onError(error)
       }
     }
     return Disposables.create { task.cancel() }
   }
 }
}

The reason why I need this functionality:

This will simplify conversion and eliminate explicit use of Swift Streams

Code I have right now:

AsyncThrowingStream(unfolding: asyncFunc)
  .asObservable()
  .take(1)

or

 AsyncThrowingStream { continuation in
     Task {
         let result = try await provider.fetchSomeData(account: "")
         continuation.yield(result)
         continuation.finish()
     }   
    }
    .asObservable()

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions