Swift Reactive Programming.
Latest commit 82320b6 Sep 23, 2015

ReactKit

Swift Reactive Programming.

How to install

See the Wiki page.

Example

For a UI demo, see ReactKit/ReactKitCatalog.

Key-Value Observing

// create stream via KVO
self.obj1Stream = KVO.stream(obj1, "value")

// bind stream via KVC (`<~` as binding operator)
(obj2, "value") <~ self.obj1Stream

XCTAssertEqual(obj1.value, "initial")
XCTAssertEqual(obj2.value, "initial")

obj1.value = "REACT"

XCTAssertEqual(obj1.value, "REACT")
XCTAssertEqual(obj2.value, "REACT")

To remove stream bindings, release the stream itself (or call stream.cancel()).

self.obj1Stream = nil   // release stream & its bindings

obj1.value = "Done"

XCTAssertEqual(obj1.value, "Done")
XCTAssertEqual(obj2.value, "REACT")
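The release-to-unbind behaviour above can be pictured with a small pure-Swift sketch. `Observable` and `Binding` are hypothetical helper types written for this illustration; they are not ReactKit's API, which relies on KVO and KVC instead.

```swift
// Hypothetical stand-ins for illustration; ReactKit itself uses KVO/KVC.
final class Observable<T> {
    private var observers: [Int: (T) -> Void] = [:]
    private var nextID = 0
    var value: T {
        didSet { for (_, observer) in observers { observer(value) } }
    }
    init(_ value: T) { self.value = value }

    func addObserver(_ observer: @escaping (T) -> Void) -> Int {
        let id = nextID
        nextID += 1
        observers[id] = observer
        return id
    }
    func removeObserver(_ id: Int) { observers[id] = nil }
}

/// Forwards changes from `source` to `target` until it is deallocated.
final class Binding<T> {
    private let source: Observable<T>
    private let id: Int
    init(from source: Observable<T>, to target: Observable<T>) {
        self.source = source
        self.id = source.addObserver { [weak target] newValue in
            target?.value = newValue
        }
    }
    deinit { source.removeObserver(id) }
}

let obj1 = Observable("initial")
let obj2 = Observable("initial")
var binding: Binding<String>? = Binding(from: obj1, to: obj2)

obj1.value = "REACT"          // obj2 follows while the binding is alive
let whileBound = obj2.value

binding = nil                 // releasing the binding stops forwarding
obj1.value = "Done"
let afterRelease = obj2.value // still the last forwarded value
```

Tying the observer's lifetime to an object that the caller owns is what makes "set it to nil" a sufficient way to unbind.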

If you want to observe changes in a Swift.Array or NSMutableArray, use the DynamicArray feature from Pull Request #23.

NSNotification

self.stream = Notification.stream("MyNotification", obj1)
    |> map { notification -> NSString? in
        return "hello" // convert NSNotification? to NSString?
    }

(obj2, "value") <~ self.stream

Normally, an NSNotification is not itself a useful value to bind to other objects, so use a Stream Operation such as map(f: T -> U) to convert it.

To learn more about the |> pipelining operator, see Stream Pipelining.

Target-Action

// UIButton
self.buttonStream = self.button.buttonStream("OK")

// UITextField
self.textFieldStream = self.textField.textChangedStream()

^{ println($0) } <~ self.buttonStream     // prints "OK" on tap

// NOTE: ^{ ... } = closure-first operator, same as `stream ~> { ... }`
^{ println($0) } <~ self.textFieldStream  // prints textField.text on change

Complex example

The example below is taken from

where it describes four UITextFields that enable or disable a UIButton under a certain condition (demo available in ReactKit/ReactKitCatalog):

let usernameTextStream = self.usernameTextField.textChangedStream()
let emailTextStream = self.emailTextField.textChangedStream()
let passwordTextStream = self.passwordTextField.textChangedStream()
let password2TextStream = self.password2TextField.textChangedStream()

let allTextStreams = [usernameTextStream, emailTextStream, passwordTextStream, password2TextStream]

let combinedTextStream = allTextStreams |> merge2All

// create button-enabling stream via any textField change
self.buttonEnablingStream = combinedTextStream
    |> map { (values, changedValue) -> NSNumber? in

        let username: NSString? = values[0] ?? nil
        let email: NSString? = values[1] ?? nil
        let password: NSString? = values[2] ?? nil
        let password2: NSString? = values[3] ?? nil

        // validation
        let buttonEnabled = username?.length > 0 && email?.length > 0 && password?.length >= MIN_PASSWORD_LENGTH && password == password2

        // NOTE: use NSNumber because KVO does not understand Bool
        return NSNumber(bool: buttonEnabled)
    }

// REACT: enable/disable okButton
(self.okButton, "enabled") <~ self.buttonEnablingStream!
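The validation rule inside the map closure above can also be read as a pure function, which makes it easy to check in isolation. This is only a sketch: `isFormValid` is a hypothetical name, it uses Swift `String?` rather than `NSString?`, and `minPasswordLength = 4` is an assumed value because the source does not state what MIN_PASSWORD_LENGTH is.

```swift
// Assumed value; the source does not state MIN_PASSWORD_LENGTH.
let minPasswordLength = 4

/// Mirrors the validation rule above using `String?` instead of `NSString?`.
func isFormValid(username: String?, email: String?,
                 password: String?, password2: String?) -> Bool {
    guard let username = username, let email = email,
          let password = password, let password2 = password2 else {
        return false   // a field that has not produced a value yet is invalid
    }
    return !username.isEmpty
        && !email.isEmpty
        && password.count >= minPasswordLength
        && password == password2
}

let incomplete = isFormValid(username: "alice", email: nil,
                             password: "secret", password2: "secret")
let mismatch = isFormValid(username: "alice", email: "a@example.com",
                           password: "secret", password2: "secre")
let valid = isFormValid(username: "alice", email: "a@example.com",
                        password: "secret", password2: "secret")
```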

For more examples, please see XCTestCases.

How it works

ReactKit is based on SwiftTask, a JavaScript Promise-like library, which lets it start and deliver multiple events (KVO, NSNotification, Target-Action, etc.) continuously over time using SwiftTask's resume and progress features (exposed in ReactKit as react() or the <~ operator).

Unlike Reactive Extensions (Rx) libraries, which are built on the separate concepts of "hot" and "cold" observables, ReactKit integrates both into a single hot + paused (lazy) Stream<T> class. Lazy streams are auto-resumed by react() and the <~ operator.
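The "hot + paused" idea can be sketched as a stream that stores its start logic, runs it only on the first `react`, and then broadcasts to every observer. `LazyStream` below is a hypothetical illustration and says nothing about how ReactKit's Stream<T> is actually implemented.

```swift
// Hypothetical illustration only; not ReactKit's Stream<T> implementation.
final class LazyStream<T> {
    private let start: (@escaping (T) -> Void) -> Void
    private var handlers: [(T) -> Void] = []
    private(set) var isStarted = false

    /// `start` is stored but not run until the first `react` call.
    init(start: @escaping (@escaping (T) -> Void) -> Void) {
        self.start = start
    }

    func react(_ handler: @escaping (T) -> Void) {
        handlers.append(handler)
        guard !isStarted else { return }
        isStarted = true
        // Auto-resume: begin producing and broadcast to every handler.
        start { [weak self] value in
            self?.handlers.forEach { $0(value) }
        }
    }
}

var startCount = 0
var received: [Int] = []

let stream = LazyStream<Int> { send in
    startCount += 1
    [1, 2, 3].forEach(send)
}

let startedBeforeReact = stream.isStarted   // still paused at this point
stream.react { received.append($0) }        // first observer resumes the stream
stream.react { _ in }                       // second observer does not restart it
```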

Here are some differences in architecture:

| | Reactive Extensions (Rx) | ReactKit |
|---|---|---|
| Basic Classes | Hot Observable (broadcasting), Cold Observable (laziness) | Stream<T> |
| Generating | Cold Observable (cloneability) | Void -> Stream<T> (= Stream<T>.Producer) |
| Subscribing | observable.subscribe(onNext, onError, onComplete) | stream.react {...}.then {...} (method-chainable) |
| Pausing | pausableObservable.pause() | stream.pause() |
| Disposing | disposable.dispose() | stream.cancel() |

Stream Pipelining

Streams can be composed using the |> stream-pipelining operator and Stream Operations.

For example, a very common incremental search technique using searchTextStream will look like this:

let searchResultsStream: Stream<[Result]> = searchTextStream
    |> debounce(0.3)
    |> distinctUntilChanged
    |> map { text -> Stream<[Result]> in
        return API.getSearchResultsStream(text)
    }
    |> switchLatestInner
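One way to read `|>` is as plain left-to-right function application, where each stage is a function of the upstream value. The sketch below defines such an operator over arrays standing in for streams. The precedence group, `mapping`, and this `distinctUntilChanged` are local helpers assumed for the illustration, not ReactKit's definitions.

```swift
precedencegroup PipelinePrecedence {
    associativity: left
    higherThan: AssignmentPrecedence
}
infix operator |> : PipelinePrecedence

/// Feeds the left-hand value into the right-hand function.
func |> <A, B>(value: A, transform: (A) -> B) -> B {
    return transform(value)
}

// Curried helper: configure the stage first, apply it to the upstream later.
func mapping<T, U>(_ f: @escaping (T) -> U) -> ([T]) -> [U] {
    return { $0.map(f) }
}

// Drops any value equal to its immediate predecessor.
func distinctUntilChanged<T: Equatable>(_ values: [T]) -> [T] {
    var result: [T] = []
    for value in values where result.last != value {
        result.append(value)
    }
    return result
}

let typed = ["s", "s", "sw", "sw", "swift"]
let queries = typed
    |> distinctUntilChanged
    |> mapping { "search:" + $0 }
```

Because the operator is left-associative, the stages run in the order they are written, which is what keeps long pipelines readable.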

In some scenarios (e.g. repeat()) you need a cloneable Stream<T>.Producer (Void -> Stream<T>) rather than a plain Stream<T>. In that case, use the |>> streamProducer-pipelining operator instead.

// first, wrap stream with closure
let timerProducer: Void -> Stream<Int> = {
    return createTimerStream(interval: 1)
        |> map { ... }
        |> filter { ... }
}

// then, use `|>>`  (streamProducer-pipelining operator)
let repeatTimerProducer = timerProducer |>> repeat(3)

Wrapping every stream in a closure quickly becomes cumbersome, so the |>> operator also accepts a Stream and Stream Operations directly (thanks to @autoclosure).

let repeatTimerProducer = createTimerStream(interval: 1)
    |>> map { ... }
    |>> filter { ... }
    |>> repeat(3)
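The reason repetition needs a producer rather than a stream can be sketched with arrays standing in for streams: a closure can build a fresh source on every call, while an already-built source cannot be rewound. `Producer` and `repeated` below are hypothetical names, and the sketch assumes that repeating three times means three runs in total.

```swift
// A "producer" is a closure that builds a fresh value source on every call.
typealias Producer<T> = () -> [T]

var created = 0
let timerProducer: Producer<Int> = {
    created += 1              // count how many fresh sources were built
    return [1, 2, 3]
}

/// Hypothetical helper: call the producer `count` times and concatenate the runs.
func repeated<T>(_ count: Int, _ producer: @escaping Producer<T>) -> Producer<T> {
    return { (0..<count).flatMap { _ in producer() } }
}

let repeatTimerProducer = repeated(3, timerProducer)
let createdBeforeRun = created       // nothing is built until the producer is called
let values = repeatTimerProducer()
```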

Functions

Stream Operations

  • For Single Stream

    • Transforming
      • asStream(ValueType)
      • map(f: T -> U)
      • flatMap(f: T -> Stream<U>)
      • map2(f: (old: T?, new: T) -> U)
      • mapAccumulate(initialValue, accumulator) (alias: scan)
      • buffer(count)
      • bufferBy(stream)
      • groupBy(classifier: T -> Key)
    • Filtering
      • filter(f: T -> Bool)
      • filter2(f: (old: T?, new: T) -> Bool)
      • take(count)
      • takeUntil(stream)
      • skip(count)
      • skipUntil(stream)
      • sample(stream)
      • distinct()
      • distinctUntilChanged()
    • Combining
      • merge(stream)
      • concat(stream)
      • startWith(initialValue)
      • combineLatest(stream)
      • zip(stream)
      • recover(stream)
    • Timing
      • delay(timeInterval)
      • interval(timeInterval)
      • throttle(timeInterval)
      • debounce(timeInterval)
    • Collecting
      • reduce(initialValue, accumulator)
    • Other Utilities
      • peek(f: T -> Void) (for injecting side effects e.g. debug-logging)
      • customize(...)
  • For Array Streams

    • mergeAll(streams)
    • merge2All(streams) (generalized method for mergeAll & combineLatestAll)
    • combineLatestAll(streams)
    • zipAll(streams)
  • For Nested Stream (Stream<Stream<T>>)

    • mergeInner(nestedStream)
    • concatInner(nestedStream)
    • switchLatestInner(nestedStream)
  • For Stream Producer (Void -> Stream<T>)

    • prestart(bufferCapacity) (alias: replay)
    • times(count)
    • retry(count)
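To give a feel for a few of the single-stream operations listed above, here is a sketch over a plain array. The semantics are assumed from the operation names and their usual Rx counterparts, so treat it as an approximation rather than a description of ReactKit's exact behaviour.

```swift
let input = [1, 1, 2, 3, 3, 3, 4]

// distinctUntilChanged: drop a value equal to its immediate predecessor.
var distinct: [Int] = []
for value in input where distinct.last != value {
    distinct.append(value)
}

// mapAccumulate (scan): emit every intermediate accumulation.
var runningSums: [Int] = []
var sum = 0
for value in distinct {
    sum += value
    runningSums.append(sum)
}

// reduce: emit only the final accumulation.
let total = distinct.reduce(0, +)

// map2: transform using the previous value (nil for the first element).
var deltas: [Int] = []
var old: Int? = nil
for new in distinct {
    deltas.append(new - (old ?? 0))
    old = new
}
```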

Helpers

  • Creating

    • Stream.once(value) (alias: just)
    • Stream.never()
    • Stream.fulfilled() (alias: empty)
    • Stream.rejected() (alias: error)
    • Stream.sequence(values) (a.k.a Rx.fromArray)
    • Stream.infiniteSequence(initialValue, iterator) (a.k.a Rx.iterate)
  • Other Utilities

    • ownedBy(owner: NSObject) (easy strong referencing to keep streams alive)
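For comparison, the iterate-style helper has a close analogue in the Swift standard library, `sequence(first:next:)`. The sketch below uses only the standard library and does not call ReactKit.

```swift
// Start from an initial value and repeatedly apply the iterator.
let powersOfTwo = sequence(first: 1) { $0 * 2 }

// The sequence is lazy and unbounded, so take a finite prefix before collecting.
let firstFive = Array(powersOfTwo.prefix(5))
```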

Dependencies

References

Licence

MIT