Skip to content

Latest commit

 

History

History
734 lines (514 loc) · 15.5 KB

content.adoc

File metadata and controls

734 lines (514 loc) · 15.5 KB

beicon - reactive streams for clojurescript

beicon is a small and concise library that provides reactive streams to clojurescript built on top of rxjs.

Warning
This documentation does not covers all api, so if you miss some function, contributions are very welcome. You can see the full API documentation here.

Since beicon is a young project there can be some API breakage.

The simplest way to use beicon in a clojure project, is by including it in the dependency vector on your project.clj file:

[funcool/beicon "0.4.0"]

This section will give you the available methods for create streams.

The most basic way to create a streamm is just take a collection and convert it in an observable sequence:

(require '[beicon.core :as s])

(def stream (s/from-coll [1 2 3]))

(s/on-value stream #(println "v:" %))
;; ==> v: 1
;; ==> v: 2
;; ==> v: 3

An other way to create potencially infinite stream is using repeat constructor function:

(def stream (s/repeat :a 3))

(s/on-value stream #(println "v:" %))
;; ==> v: :a
;; ==> v: :a
;; ==> v: :a

The second argument for repeat is optional, and omiting it will create an infinite stream.

It there a way for create an observable sequence executing one function:

(def stream (s/from-callback (constantly 1)))

(s/on-value stream #(println "v:" %))
;; ==> v: 1

The function will be executed once and the stream will contain one unique value.

This method allow create potentially infinite stream from executing a callback repeatedly:

(def stream (s/from-poll (constantly 1)))

(s/on-value stream #(println "v:" %))
;; ==> v: 1
;; ==> ...

If a callback returns nil the stream will be ended and if an exception is returned the stream will be ended with error.

Atoms in clojure are watchable, so you can listen its changes. This method convert that changes in an infinite observable sequence of atom changes:

(def a (atom 1))

(def stream (s/from-atom a))

(s/on-value stream #(println "v:" %))
(swap! a inc)
;; ==> v: 2

It creates a observable sequence of one unique value:

(def stream (s/just 10)

(s/on-value stream #(println "v:" %))
;; ==> v: 10

It there is a way for create a observable seequence from multiple values, using the of consturctor:

(def stream (s/of 1 2 3))

(s/on-value stream #(println "v:" %))
;; ==> v: 1
;; ==> v: 2
;; ==> v: 3
Note
the of constructor accept the maximum of 6 parameters.

Some times you also want just a terminated stream:

(def stream (s/empty))

This stream not yelds any value and just terminates.

This allow create an observable seequence of one unique value that will be emited after specified amount of time:

(def stream (s/timeout 1000 10))

(s/on-value stream #(println "v:" %))
;; After 1 sec...
;; ==> v: 10

This is the most advanced and flexible way to create a observable sequence. It allows to have control about termination and errors and intended to be used for build other kind of constructors.

(def stream
  (s/create (fn [sick]
              (sick 1)
              (sick 2)
              (sick nil) ;; ends
              )))

(s/on-value stream #(println "v:" %))
;; ==> v: 1
;; ==> v: 2

This is implemented using protocols for make it flexible and easy extensible by the user. By default any object except nil or instance of js/Error are considered valid values; nil is considered as end of stream and js/Error or any instance of it is considered error termination.

The observable sequence can be in 3 different kind of states: alive, errored or ended. I an error is emited the stream can be considered ended with an error. So error or end states can be considered termination states.

And is convenient you can subscribe to any of that states of an observable seequence.

A general purpose subscription is one that allows you create one subscription that watches all the different possible states of an observable seequence:

(def sub (s/subscribe stream
                      #(println "on-value:" %)
                      #(println "on-error:" %)
                      #(println "on-end:")))

The return value of subscribe function is a funcition that can be called for dispose the subscription.

But in most circumstances you only want consume values regardless of any error or termination. For this purposes is there the on-value function:

(def sub (s/on-value sub #(println "val:" %)))

Like with subscribe function, on-value function also return a callable that when is called will dispose the created subscription.

Note
take care that calling any one of that helper functions creates a separated subscription and it can behave unexpectly if you do not aware if you are using hot-vs-cold-observales.

With on-end function you can watch the successful termination of an observable sequence:

(def sub (s/on-end sub #(println "end!")))

With on-error function you can watch the error termination of an observable seequence:

(def sub (s/on-end sub #(println "error:" %)))

The main advantage of using reactive streams is that you may treat them like normal seequence, and in this case filter them with a predicate:

(def stream (->> (s/from-coll [1 2 3 4 5])
                 (s/filter #(> % 3))))

(s/subscribe stream
             #(println "on-value:" %)
             #(println "on-error:" %)
             #(println "on-end"))

;; ==> on-value: 4
;; ==> on-value: 5
;; ==> on-end

Also, you can apply a function over each value in the stream:

(def stream (->> (s/from-coll [1 2])
                 (s/map inc)))

(s/subscribe stream
             #(println "on-value:" %)
             #(println "on-error:" %)
             #(println "on-end"))

;; ==> on-value: 2
;; ==> on-value: 3
;; ==> on-end

Convets a observable seequence that can contain other observable seequences in an other observable seequences that emits just plain values.

The result is similar to concatenate all the underlying seequences.

(def stream (->> (s/from-coll [1 2])
                 (s/map #(sfrom-coll (range % (+ % 2))))
                 (s/flat-map))

(s/subscribe stream
             #(println "on-value:" %)
             #(println "on-error:" %)
             #(println "on-end"))

;; ==> on-value: 1
;; ==> on-value: 2
;; ==> on-value: 2
;; ==> on-value: 3
;; ==> on-end

Also, sometimes you just want to skip values from stream under different criteria.

You can skip the first N values:

(def stream (->> (s/from-coll [1 2 3 4 5 6])
                 (s/skip 4)))

(s/subscribe stream
             #(println "on-value:" %)
             #(println "on-error:" %)
             #(println "on-end"))

;; ==> on-value: 5
;; ==> on-value: 6
;; ==> on-end

Skip while some condition evalutates to true:

(def stream (->> (s/from-coll [1 1 1 1 2 3])
                 (s/skip-while odd?)))

(s/subscribe stream
             #(println "on-value:" %)
             #(println "on-error:" %)
             #(println "on-end"))

;; ==> on-value: 2
;; ==> on-value: 3
;; ==> on-end

Or skip until an other observable yelds a value using skip-until (no example at this moment).

You also can limit the observale sequence to an specified number of elements:

(def stream (->> (s/from-coll [1 1 1 1 2 3])
                 (s/take 2)))

(s/subscribe stream
             #(println "on-value:" %)
             #(println "on-error:" %)
             #(println "on-end"))

;; ==> on-value: 1
;; ==> on-value: 1
;; ==> on-end

Or an condition expression evaluates to true:

(def stream (->> (s/from-coll [1 1 1 1 2 3])
                 (s/take-while odd?)))

(s/subscribe stream
             #(println "on-value:" %)
             #(println "on-error:" %)
             #(println "on-end"))

;; ==> on-value: 1
;; ==> on-value: 1
;; ==> on-value: 1
;; ==> on-value: 1
;; ==> on-end

This is a some kind of combination of skip and take, and returns a observable seequence that represents a portion delimited by start and end of the source observable seequence.

(def stream (->> (s/from-coll [1 2 3 4])
                 (s/slice 1 3)))

(s/subscribe stream
             #(println "on-value:" %)
             #(println "on-error:" %)
             #(println "on-end"))

;; ==> on-value: 2
;; ==> on-value: 3
;; ==> on-end

Allows combine all results of an observable seequence using a combining function also called (reducing function):

(def stream (->> (s/from-coll [1 2 3 4])
                 (s/reduce + 0)))

(s/subscribe stream
             #(println "on-value:" %)
             #(println "on-error:" %)
             #(println "on-end"))

;; ==> on-value: 10
;; ==> on-end

Allows combine all results of an observable seequence using a combining function also called (reducing function). Returns a stream of each intermediate result instead of:

(def stream (->> (s/from-coll [1 2 3 4])
                 (s/scan + 0)))

(s/subscribe stream
             #(println "on-value:" %)
             #(println "on-error:" %)
             #(println "on-end"))

;; ==> on-value: 1
;; ==> on-value: 3
;; ==> on-value: 6
;; ==> on-value: 10
;; ==> on-end

This transformer functions allow accomulate N specified values in a buffer and then emits them as one value.

(def stream (->> (s/from-coll [1 2 3 4])
                 (s/buffer 2)))

(s/subscribe stream
             #(println "on-value:" %)
             #(println "on-error:" %)
             #(println "on-end"))

;; ==> on-value: [1 2]
;; ==> on-value: [3 4]
;; ==> on-end

Perform an arbitrary choice between two or more observable sequences and return the first value available from any provided observables.

This kind if combinator works very well with operations that can timeout:

(def stream (s/choice
              (s/timeout 1000 :timeout)
              (s/timeout 900 :value)))

(s/subscribe stream
             #(println "on-value:" %)
             #(println "on-error:" %)
             #(println "on-end"))

;; ==> on-value: :value
;; ==> on-end

This combinator combines two observable seequences in one.

(def stream (s/zip
              (s/from-coll [1 2 3])
              (s/from-coll [2 3 4])))

(s/subscribe stream
             #(println "on-value:" %)
             #(println "on-error:" %)
             #(println "on-end"))

;; ==> on-value: [1 2]
;; ==> on-value: [2 3]
;; ==> on-value: [3 4]
;; ==> on-end

This cobinator concatenates two or more observable seequences.

(def stream (s/concat
              (s/from-coll [1 2])
              (s/from-coll [3 4])))

(s/subscribe stream
             #(println "on-value:" %)
             #(println "on-error:" %)
             #(println "on-end"))

;; ==> on-value: 1
;; ==> on-value: 2
;; ==> on-value: 3
;; ==> on-value: 4
;; ==> on-end

This combinator merges two or more observable seequences.

(def stream (s/concat
              (s/from-coll [1 2])
              (s/from-coll [3 4])))

(s/subscribe stream
             #(println "on-value:" %)
             #(println "on-error:" %)
             #(println "on-end"))

;; ==> on-value: 1
;; ==> on-value: 3
;; ==> on-value: 2
;; ==> on-value: 4
;; ==> on-end
Note
the order is not required to be the same always.

This is an abstraction that combines observable seequence with the observer. So you can push values into it and transform and subscribe to it like any other seequence.

You can create a bus instance using bus constructor function. There is an example of using bus for the both operations: push values and subscribe to it.

(def bus (s/bus))
(def stream (-> bus
                (s/skip 1)
                (s/map inc)
                (s/take 2)))

(s/subscribe stream
             #(println "on-value:" %)
             #(println "on-error:" %)
             #(println "on-end"))

(s/push! bus 1)
(s/push! bus 2)
(s/push! bus 1)
(s/push! bus 2)

;; ==> on-value: 3
;; ==> on-value: 2
;; ==> on-end

You can end bus in any moment just executing end! function:

(def bus (s/bus))

(s/subscribe bus
             #(println "on-value:" %)
             #(println "on-error:" %)
             #(println "on-end"))

(s/end! bus)
;; ==> on-end

Five most important rules:

  • Beautiful is better than ugly.

  • Explicit is better than implicit.

  • Simple is better than complex.

  • Complex is better than complicated.

  • Readability counts.

All contributions to beicon should keep these important rules in mind.

Unlike Clojure and other Clojure contributed libraries beicon does not have many restrictions for contributions. Just open an issue or pull request.

beicon is open source and can be found on github.

You can clone the public repository with this command:

git clone https://github.com/funcool/beicon

For running tests just execute this:

./scrpts/build
node ./out/tests.js

beicon is under public domain:

This is free and unencumbered software released into the public domain.

Anyone is free to copy, modify, publish, use, compile, sell, or
distribute this software, either in source code form or as a compiled
binary, for any purpose, commercial or non-commercial, and by any
means.

In jurisdictions that recognize copyright laws, the author or authors
of this software dedicate any and all copyright interest in the
software to the public domain. We make this dedication for the benefit
of the public at large and to the detriment of our heirs and
successors. We intend this dedication to be an overt act of
relinquishment in perpetuity of all present and future rights to this
software under copyright law.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
OTHER DEALINGS IN THE SOFTWARE.

For more information, please refer to <http://unlicense.org/>