This is a package and tool providing Reactive eXtensions for Go.
The main package provides Rx operators for common Go builtin types, and the tool generates Go code for arbitrary types.
Yes, good question. Mostly as an exercise to see if it was feasible/possible. It is, largely, except for operators that produce sequence types (ie. arrays or observables of T).
That said, the Rx operators do provide some nice functionality that would otherwise have to be implemented by hand.
go get github.com/alecthomas/gorx github.com/alecthomas/gorx/cmd/gorx
To use the package:
import "github.com/alecthomas/gorx"
gorx.
FromTimeChannel(time.Tick(time.Second)).
Take(5).
Do(func(t time.Time) { fmt.Printf("%s\n", t) }).
Wait()
To generate Rx operators for custom types:
gorx --import=gopkg.in/alecthomas/kingpin.v2 kingpinrx '*kingpin.CmdClause' '*kingpin.FlagClause'
A very basic example creating an observable from a set of strings and printing them:
gorx.FromStrings("Ben", "George").Do(func(s string) { fmt.Println(s) }).Wait()
A more complex example. Try retrieving article from cache, otherwise fetch original from Wikipedia, all with a timeout.
func GetCached(url string) *ResponseStream {
fmt.Printf("No cache entry for %s\n", url)
return ThrowResponse(errors.New("not implemented"))
}
func SetCached(response *http.Response) {
fmt.Printf("Caching %s\n", response.Request.URL)
}
func Get(url string) *ResponseStream {
return StartResponse(func() (*http.Response, error) {
response, err := http.Get(url)
if err == nil && (response.StatusCode < 200 || response.StatusCode > 299) {
return nil, errors.New(http.StatusText(response.StatusCode))
}
return response, err
})
}
func URLForArticle(article string) string {
return "http://en.wikipedia.org/wiki/" + article
}
func LogError(err error) {
fmt.Printf("error: %s\n", err)
}
func GetWikipediaArticles(timeout time.Duration, articles ...string) *ResponseStream {
// Try cached URL first, then recover with remote URL and
// finally recover with an empty stream.
return FromStringArray(articles).
Map(URLForArticle).
FlatMapResponse(func(url string) ResponseObservable {
remote := Get(url).
Timeout(timeout).
Do(SetCached).
DoOnError(LogError).
Catch(EmptyResponse())
return GetCached(url).
Catch(remote)
})
}
Following are a list of the core operators as defined by reactivex.io that have been implemented and will (probably) be implemented soon:
- Create
- Empty
- Never
- Throw
- Just
- Range
- Repeat
- Start
Not implemented:
- Defer
- Timer
- Map
- Reduce
- Scan
- FlatMap
Not implemented:
- Buffer
- GroupBy
- Window
Note: These operators are currently not implemented because each distinct
observable type requires quite a lot of boilerplate code, and these operators
produce new types. eg. .Buffer(2)
would transform T
to a stream of []T
,
.FlatMap(f)
would transform T
to a TStreamStream
, etc. One "solution" is
to only generate these operators if the user explicitly requests these
resultant types.
- Distinct
- ElementAt
- Filter
- First
- Last
- Skip
- SkipLast
- Take
- TakeLast
- IgnoreElements
- Sample
- Debounce
- Merge
- MergeDelayError
Not implemented:
- CombineLatest
- And / Then / When
- Zip
- Join
- StartWith
- Switch
- Zip
Note: See note above for Transformations for why these are not implemented.
- Catch
- Retry
- Concat
- Average
- Count
- Min
- Max
- Reduce
- Sum
- Do
- Subscribe
Not implemented:
- Delay
- Timeout
- Timestamp
- Materialize / Dematerialize
- Serialize
- TimeInterval
Not implemented:
- All
- Amb
- Contains
- DefaultIfEmpty
- SequenceEqual
- SkipUntil
- SkipWhile
- TakeUntil
- TakeWhile
- To (one, array, channel)