Skip to content

Rxjava clojure bindings final #925

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
1b13e56
Initial import of indigena-rx
daveray Feb 18, 2014
80ba1b3
Replace chain with operators. General core cleanup
daveray Feb 18, 2014
da88202
Got rid of base. It was weird.
daveray Feb 18, 2014
236cbdb
Clean up future stuff and docs
daveray Feb 19, 2014
bfc03d6
Some more core operators
daveray Feb 19, 2014
4276368
Update blocking stuff
daveray Feb 19, 2014
a2d2314
Experimental warnings
daveray Feb 19, 2014
0da1e88
Update Clojure README
daveray Feb 19, 2014
cf5a20c
fix gradle spec in README
mlc Feb 19, 2014
df5503f
Cleaned up naming of observable operator definition fns
daveray Feb 20, 2014
f1ff778
Implement catch-error-value
daveray Feb 20, 2014
feb16d0
more README
daveray Feb 20, 2014
963cfd5
Implement group-by
daveray Feb 20, 2014
3714db9
Implement interleave and interleave*
daveray Feb 20, 2014
02ff779
Implemented partition-all
daveray Feb 20, 2014
1ffe5fb
Avoid intermediate toList for blocking/into
daveray Feb 20, 2014
7f4f07e
Update merge/merge-delay-error impls.
daveray Feb 20, 2014
67cf57e
Updates from @mbossenbroek's feedback.
daveray Feb 20, 2014
2b597fc
interpose docstring
daveray Feb 20, 2014
c573cfd
Reimplement into without toList
daveray Feb 20, 2014
c51054e
Update generator docstring
daveray Feb 20, 2014
ea22e7a
Make catch and finally ->> friendly
daveray Feb 21, 2014
ee45b44
Eliminate sorted-list variants and cleanup sort tests
daveray Feb 21, 2014
9269d4e
Eliminated macro version of future stuff based on feedback.
daveray Feb 21, 2014
9bc5a85
Implemented multi-sequence mapcat
daveray Feb 21, 2014
ea2fd75
Remove val-fn version of group-by
daveray Feb 21, 2014
6bc098d
Implement iterate
daveray Feb 21, 2014
7919c04
Implement range
daveray Feb 21, 2014
d19bc42
seq->o should seq-ify arg
daveray Feb 22, 2014
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 93 additions & 32 deletions language-adaptors/rxjava-clojure/README.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,96 @@
# Clojure Adaptor for RxJava
Clojure bindings for RxJava.

# Binaries

Binaries and dependency information for Maven, Ivy, Gradle and others can be found at [http://search.maven.org](http://search.maven.org/#search%7Cga%7C1%7Ca%3A%22rxjava-clojure%22).

Example for Leiningen:

```clojure
[com.netflix.rxjava/rxjava-clojure "x.y.z"]
```

and for Gradle:

```groovy
compile 'com.netflix.rxjava:rxjava-clojure:x.y.z'
```

and for Maven:

```xml
<dependency>
<groupId>com.netflix.rxjava</groupId>
<artifactId>rxjava-clojure</artifactId>
<version>x.y.z</version>
</dependency>
```

and for Ivy:

```xml
<dependency org="com.netflix.rxjava" name="rxjava-clojure" rev="x.y.z" />
```

# Clojure Bindings
This library provides convenient, idiomatic Clojure bindings for RxJava.

The bindings try to present an API that will be comfortable and familiar to a Clojure programmer that's familiar with the sequence operations in `clojure.core`. It "fixes" several issues with using RxJava with raw Java interop, for example:

* Argument lists are in the "right" order. So in RxJava, the function applied in `Observable.map` is the second argument, while here it's the first argument with one or more Observables as trailing arguments
* Operators take normal Clojure functions as arguments, bypassing need for the interop described below
* Predicates accomodate Clojure's notion of truth
* Operators are generally names as they would be in `clojure.core` rather than the Rx names

There is no object wrapping going on. That is, all functions return normal `rx.Observable` objects, so you can always drop back to Java interop for anything that's missing in this wrapper.

## Basic Usage
Most functionality resides in the `rx.lang.clojure.core` namespace and for the most part looks like normal Clojure sequence manipulation:

```clojure
(require '[rx.lang.clojure.core :as rx])

(->> my-observable
(rx/map (comp clojure.string/lower-case :first-name))
(rx/map clojure.string/lower-case)
(rx/filter #{"bob"})
(rx/distinct)
(rx/into []))
;=> An Observable that emits a single vector of names
```

Blocking operators, which are useful for testing, but should otherwise be avoided, reside in `rx.lang.clojure.blocking`. For example:

```clojure
(require '[rx.lang.clojure.blocking :as rxb])

(rxb/doseq [{:keys [first-name]} users-observable]
(println "Hey," first-name))
;=> nil
```

## Open Issues

* The missing stuff mentioned below
* `group-by` val-fn variant isn't implemented in RxJava
* There are some functions for defining customer Observables and Operators (`subscriber`, `operator*`, `observable*`). I don't think these are really enough for serious operator implementation, but I'm hesitant to guess at an abstraction at this point. These will probably change dramatically.

## What's Missing
This library is an ongoing work in progress driven primarily by the needs of one team at Netflix. As such some things are currently missing:

* Highly-specific operators that we felt cluttered the API and were easily composed from existing operators, especially since we're in not-Java land. For example, `Observable.sumLong()`.
* Most everything involving schedulers
* Most everything involving time
* `Observable.window` and `Observable.buffer`. Who knows which parts of these beasts to wrap?

Of course, contributions that cover these cases are welcome.

# Low-level Interop
This adaptor provides functions and macros to ease Clojure/RxJava interop. In particular, there are functions and macros for turning Clojure functions and code into RxJava `Func*` and `Action*` interfaces without the tedium of manually reifying the interfaces.

# Basic Usage
## Basic Usage

## Requiring the interop namespace
### Requiring the interop namespace
The first thing to do is to require the namespace:

```clojure
Expand All @@ -19,7 +105,7 @@ or, at the REPL:
(require '[rx.lang.clojure.interop :as rx])
```

## Using rx/fn
### Using rx/fn
Once the namespace is required, you can use the `rx/fn` macro anywhere RxJava wants a `rx.util.functions.Func` object. The syntax is exactly the same as `clojure.core/fn`:

```clojure
Expand All @@ -34,7 +120,7 @@ If you already have a plain old Clojure function you'd like to use, you can pass
(.reduce (rx/fn* +)))
```

## Using rx/action
### Using rx/action
The `rx/action` macro is identical to `rx/fn` except that the object returned implements `rx.util.functions.Action` interfaces. It's used in `subscribe` and other side-effect-y contexts:

```clojure
Expand All @@ -46,7 +132,7 @@ The `rx/action` macro is identical to `rx/fn` except that the object returned im
(rx/action [] (println "Sequence complete"))))
```

## Using Observable/create
### Using Observable/create
As of 0.17, `rx.Observable/create` takes an implementation of `rx.Observable$OnSubscribe` which is basically an alias for `rx.util.functions.Action1` that takes an `rx.Subscriber` as its argument. Thus, you can just use `rx/action` when creating new observables:

```clojure
Expand All @@ -59,35 +145,10 @@ As of 0.17, `rx.Observable/create` takes an implementation of `rx.Observable$OnS
(.onCompleted s)))
```

# Gotchas
## Gotchas
Here are a few things to keep in mind when using this interop:

* Keep in mind the (mostly empty) distinction between `Func` and `Action` and which is used in which contexts
* If there are multiple Java methods overloaded by `Func` arity, you'll need to use a type hint to let the compiler know which one to choose.
* Methods that take a predicate (like filter) expect the predicate to return a boolean value. A function that returns a non-boolean value will result in a `ClassCastException`.

# Binaries

Binaries and dependency information for Maven, Ivy, Gradle and others can be found at [http://search.maven.org](http://search.maven.org/#search%7Cga%7C1%7Ca%3A%22rxjava-clojure%22).

Example for Maven:

```xml
<dependency>
<groupId>com.netflix.rxjava</groupId>
<artifactId>rxjava-clojure</artifactId>
<version>x.y.z</version>
</dependency>
```

and for Ivy:

```xml
<dependency org="com.netflix.rxjava" name="rxjava-clojure" rev="x.y.z" />
```

and for Leiningen:

```clojure
[com.netflix.rxjava/rxjava-clojure "x.y.z"]
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
(ns rx.lang.clojure.blocking
"Blocking operators and functions. These should never be used in
production code except at the end of an async chain to convert from
rx land back to sync land. For example, to produce a servlet response.

If you use these, you're a bad person.
"
(:refer-clojure :exclude [first into doseq last])
(:require [rx.lang.clojure.interop :as iop] [rx.lang.clojure.core :as rx])
(:import [rx Observable]
[rx.observables BlockingObservable]))

(def ^:private -ns- *ns*)
(set! *warn-on-reflection* true)

(defmacro ^:private with-ex-unwrap
"The blocking ops wrap errors stuff in RuntimeException because of stupid Java.
This tries to unwrap them so callers get the exceptions they expect."
[& body]
`(try
~@body
(catch RuntimeException e#
(throw (or
(and (identical? RuntimeException (class e#))
(.getCause e#))
e#)))))

(defn ^BlockingObservable ->blocking
"Convert an Observable to a BlockingObservable.

If o is already a BlockingObservable it's returned unchanged.
"
[o]
(if (instance? BlockingObservable o)
o
(.toBlockingObservable ^Observable o)))

(defn o->seq
"Returns a lazy sequence of the items emitted by o

See:
rx.observables.BlockingObservable/getIterator
rx.lang.clojure.core/seq->o
"
[o]
(-> (->blocking o)
(.getIterator)
(iterator-seq)))

(defn first
"*Blocks* and waits for the first value emitted by the given observable.

If the Observable is empty, returns nil

If an error is produced it is thrown.

See:
clojure.core/first
rx/first
rx.observables.BlockingObservable/first
"
[observable]
(with-ex-unwrap
(.firstOrDefault (->blocking observable) nil)))

(defn last
"*Blocks* and waits for the last value emitted by the given observable.

If the Observable is empty, returns nil

If an error is produced it is thrown.

See:
clojure.core/last
rx/last
rx.observable.BlockingObservable/last
"
[observable]
(with-ex-unwrap
(.lastOrDefault (->blocking observable) nil)))

(defn single
"*Blocks* and waits for the first value emitted by the given observable.

An error is thrown if zero or more then one value is produced.
"
[observable]
(with-ex-unwrap
(.single (->blocking observable))))

(defn into
"*Blocks* and pours the elements emitted by the given observables into
to.

If an error is produced it is thrown.

See:
clojure.core/into
rx/into
"
[to from-observable]
(with-ex-unwrap
(clojure.core/into to (o->seq from-observable))))

(defn doseq*
"*Blocks* and executes (f x) for each x emitted by xs

Returns nil.

See:
doseq
clojure.core/doseq
"
[xs f]
(with-ex-unwrap
(-> (->blocking xs)
(.forEach (rx.lang.clojure.interop/action* f)))))

(defmacro doseq
"Like clojure.core/doseq except iterates over an observable in a blocking manner.

Unlike clojure.core/doseq, only supports a single binding

Returns nil.

Example:

(rx-blocking/doseq [{:keys [name]} users-observable]
(println \"User:\" name))

See:
doseq*
clojure.core/doseq
"
[bindings & body]
(when (not= (count bindings) 2)
(throw (IllegalArgumentException. (str "sorry, rx/doseq only supports one binding"))))
(let [[k v] bindings]
`(doseq* ~v (fn [~k] ~@body))))

Loading