Skip to content

Commit

Permalink
ToSlice and ToChannel added
Browse files Browse the repository at this point in the history
  • Loading branch information
DusanKasan committed Nov 16, 2017
1 parent 06b3958 commit 454a93b
Show file tree
Hide file tree
Showing 12 changed files with 568 additions and 83 deletions.
19 changes: 7 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

** This library is a work in progress **

[![Coverage Status](https://coveralls.io/repos/github/DusanKasan/cesium/badge.svg?branch=master)](https://coveralls.io/github/DusanKasan/cesium?branch=master) [![Go Report Card](https://goreportcard.com/badge/github.com/DusanKasan/cesium)](https://goreportcard.com/report/github.com/DusanKasan/cesium) [![CircleCI](https://circleci.com/gh/DusanKasan/cesium.svg?style=shield)](https://circleci.com/gh/DusanKasan/cesium)
[![Coverage Status](https://coveralls.io/repos/github/DusanKasan/cesium/badge.svg?branch=master)](https://coveralls.io/github/DusanKasan/cesium?branch=master) [![Go Report Card](https://goreportcard.com/badge/github.com/DusanKasan/cesium)](https://goreportcard.com/report/github.com/DusanKasan/cesium) [![CircleCI](https://circleci.com/gh/DusanKasan/cesium.svg?style=shield)](https://circleci.com/gh/DusanKasan/cesium) [![GoDoc](https://godoc.org/github.com/DusanKasan/cesium?status.svg)](https://godoc.org/github.com/DusanKasan/cesium)

This is a port of [Project Reactor](https://projectcesium.io/) into [Go](https://golang.org/). It provides reactive data streams with asynchronous pull backpressure and operator fusion. Its aim is to be as close to the proposed Java API as possible, altering it slightly where needed for it to make sense in Go.

Expand Down Expand Up @@ -107,24 +107,21 @@ Operators listed according to [Reactor docs](https://projectcesium.io/docs/core/
- [x] FlatMap
- [x] Handle(func(T, SynchronousSink))
- [ ] Flux.FlatMapSequential
- [ ] Mono.FlatMapMany
- [ ] Flux.ToSlice
- [x] Mono.FlatMapMany
- [x] Flux.ToSlice
- Maybe ToList (LinkedList would be better to handle large datasets)
- [ ] Flux.ToSortedSlice
- [ ] Flux.ToMap
- [ ] Flux.ToChannel
- [x] Flux.ToChannel
- [x] Flux.Count()
- [x] Flux.Reduce(func(T, T) T)
- [ ] Flux.ReduceWithInitial
- [x] Flux.Scan(func(T, T) T)
- [ ] Flux.ScanWithInitial
- [x] Flux.All(func(T) bool)
- [x] Flux.Any(func(T) bool)
- [x] Flux.HasElements()
- [x] Flux.HasElement(T) Flux
- [x] Flux.Concat(Publisher<Publisher>) Flux
- [x] ConcatWith(Publisher) Flux
- [ ] ConcatWith
- [ ] Flux.ConcatDelayError
- [ ] Flux.MergeSequential
- [ ] Flux.Merge
Expand All @@ -134,7 +131,7 @@ Operators listed according to [Reactor docs](https://projectcesium.io/docs/core/
- [ ] Mono.And
- [ ] Mono.When
- [ ] Flux.CombineLatest
- [ ] First
- [ ] First (implement before Or)
- [ ] Or
- [ ] SwitchMap
- [ ] SwitchOnNext
Expand Down Expand Up @@ -255,7 +252,5 @@ Operators listed according to [Reactor docs](https://projectcesium.io/docs/core/
- How to split up tests for normal and scalar flux/mono?
- Fix locking for flatMaps
- Move most docs to godoc, except some examples and "how to choose an operator"
- transform switch on signal type to accept(sub)
- NoneSignal()
- remove p.OnSubscribe(subscription2) from everywhere to avoid double subscribtion
- at least 1 godoc example per factory function
- NoneSignal() ?
- Performance benchmarks
15 changes: 12 additions & 3 deletions contract.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// Cesium is a general purpose 4th generation non-blocking reactive library
// that offers efficient demand management (in the form of managing
// Package cesium is a general purpose 4th generation non-blocking reactive
// library that offers efficient demand management (in the form of managing
// "backpressure"). It offers composable asynchronous sequence APIs Flux (for
// [N] elements) and Mono (for [0|1] elements), extensively implementing the
// Reactive Extensions specification where possible.
Expand Down Expand Up @@ -49,6 +49,8 @@ type Publisher interface {
Subscribe(Subscriber) Subscription
}

// Subscriber is an observer that can manage the rate of emissions of the
// Publisher it is subscribed to.
type Subscriber interface {
// OnNext is called by a Publisher when a it emits an item T.
OnNext(T)
Expand All @@ -64,6 +66,8 @@ type Subscriber interface {
OnSubscribe(Subscription)
}

// Subscription is a way to manage the rate of emission of a Publisher for a
// specific Subscriber.
type Subscription interface {
// Cancel the Subscription, effectively telling the Publisher to stop
// emitting on this subscription.
Expand All @@ -85,6 +89,7 @@ type Processor interface {
Publisher
}

// Scheduler serves as a means to introduce multi-threading to reactive operators.
// Observables/Publishers emit on the thread Subscribe was called on, so
// to introduce multi-threading we execute everything on schedulers. Some
// operators allow you to pass a specific scheduler, because they can not
Expand Down Expand Up @@ -130,6 +135,8 @@ type Flux interface {
Concat(Publisher /*<cesium.Publisher>*/) Flux
ConcatWith(...Publisher) Flux
FlatMap(func(T) Publisher, ...Scheduler) Flux
ToSlice() ([]T, error)
ToChannel() (<-chan T, <-chan error)

DoOnSubscribe(func(Subscription)) Flux
DoOnRequest(func(int64)) Flux
Expand Down Expand Up @@ -167,6 +174,7 @@ type Mono interface {
FlatMapMany(fn func(T) Publisher, scheduler ...Scheduler) Flux
Handle(func(T, SynchronousSink)) Mono
ConcatWith(...Publisher) Flux
ToChannel() (<-chan T, <-chan error)

Filter(func(T) bool) Mono

Expand Down Expand Up @@ -253,6 +261,7 @@ type MonoSink interface {
OnDispose(func())
}

// SignalType represents a type of a signal emitted by a Publisher.
type SignalType string

// Represents an onSubscribe signal type in Signal.
Expand All @@ -267,7 +276,7 @@ const SignalTypeOnComplete SignalType = "onComplete"
// Represents an onError signal type in Signal.
const SignalTypeOnError SignalType = "onError"

// Represents a reactive signal: OnSubscribe, OnNext, OnComplete or OnError.
// Signal represents a reactive signal: OnSubscribe, OnNext, OnComplete or OnError.
type Signal interface {
// Propagate the signal represented by this Signal to a given Subscriber.
Accept(Subscriber)
Expand Down
80 changes: 78 additions & 2 deletions flux/example_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package flux_test

import (
"errors"

"fmt"

"github.com/DusanKasan/cesium"
"github.com/DusanKasan/cesium/flux"
)
Expand All @@ -19,7 +23,8 @@ func ExampleFromSlice() {
flux.Just([]cesium.T{1, 2, 3}).Subscribe(subscriber)
}

// The subscriber will receive items int64(1), int64(2), int64(3) and then an onComplete signal
// The subscriber will receive items int64(1), int64(2), int64(3) and then an
// onComplete signal
func ExampleRange() {
var subscriber cesium.Subscriber

Expand All @@ -30,7 +35,7 @@ func ExampleRange() {
func ExampleEmpty() {
var subscriber cesium.Subscriber

flux.Just(1, 2, 3).Subscribe(subscriber)
flux.Empty().Subscribe(subscriber)
}

// The subscriber will receive no items or termination signal
Expand All @@ -40,6 +45,30 @@ func ExampleNever() {
flux.Never().Subscribe(subscriber)
}

// The subscriber will receive items 1, 2, 3 and an onComplete signal after which
// "Range flux finished, dispose of the resource (3)" is printed.
//
// Internally the return value of the first argument will be used as an input to
// the second, producing a flux of Range(1, 3). The subscriber is subscribed to
// this flux. Then when the subscribe reaches the termination signal, the third
// argument is executed and "Range flux finished, dispose of the resource (3)"
// is printed.
func ExampleUsing() {
var subscriber cesium.Subscriber

flux.Using(
func() cesium.T {
return 3
},
func(t cesium.T) cesium.Publisher {
return flux.Range(1, t.(int))
},
func(t cesium.T) {
fmt.Println("Range flux finished, dispose of the resource (3)")
},
).Subscribe(subscriber)
}

// The subscriber will receive items 1, 2, 3 and an onComplete signal. If the
// subscribe would not be able to keep up with how fast we are emitting items
// they will be buffered thanks to the flux.OverflowStrategyBuffer.
Expand All @@ -53,3 +82,50 @@ func ExampleCreate() {
sink.Complete()
}, flux.OverflowStrategyBuffer).Subscribe(subscriber)
}

// The subscriber will receive items 1, 2, 3 and an onComplete signal. You can
// also see that in the last pass when we signal completion the second signal is
// ignored.
func ExampleGenerate() {
var subscriber cesium.Subscriber

i := 1
flux.Generate(func(sink cesium.SynchronousSink) {
if i > 3 {
sink.Complete()
}

sink.Next(i)
i++
}).Subscribe(subscriber)
}

// The subscriber will receive items 1, 2, 3 and an onComplete signal.
func ExampleDefer() {
var subscriber cesium.Subscriber

flux.Defer(func() cesium.Publisher {
return flux.Just(1, 2, 3)
}).Subscribe(subscriber)
}

// The subscriber will receive no items and an onError signal containing the
// supplied error.
func ExampleError() {
var subscriber cesium.Subscriber

flux.Error(errors.New("error")).Subscribe(subscriber)
}

// The subscriber will receive items 1, 2, 3 and an onComplete signal.
func ExampleFromChannel() {
c := make(chan cesium.T)
var subscriber cesium.Subscriber

flux.FromChannel(c).Subscribe(subscriber)

c <- 1
c <- 2
c <- 3
close(c)
}
19 changes: 16 additions & 3 deletions flux/factories.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,22 @@ import (

type overflowStrategy internal.OverflowStrategy

// OverflowStrategyBuffer when passed to flux.Create instructs the operator to
// buffer the emissions the Subscriber can't process.
const OverflowStrategyBuffer = overflowStrategy(internal.OverflowStrategyBuffer)

// OverflowStrategyDrop when passed to flux.Create instructs the operator to
// discard the emissions the Subscriber can't process.
const OverflowStrategyDrop = overflowStrategy(internal.OverflowStrategyDrop)

// OverflowStrategyError when passed to flux.Create instructs the operator to
// emit the cesium.DownstreamUnableToKeepUpError error if the Subscriber
// can't process the items as fast as they are emitted.
const OverflowStrategyError = overflowStrategy(internal.OverflowStrategyError)

// OverflowStrategyDrop when passed to flux.Create instructs the operator to
// ignore backpressure and emit items as fast as possible, effectivelly turning
// it into an observable.
const OverflowStrategyIgnore = overflowStrategy(internal.OverflowStrategyIgnore)

// Just creates new cesium.Flux that emits the supplied items and completes.
Expand Down Expand Up @@ -56,8 +69,8 @@ func Create(f func(cesium.FluxSink), os overflowStrategy) cesium.Flux {
return internal.FluxCreate(f, internal.OverflowStrategy(os))
}

// Programmatically create a cesium.Flux by generating signals one-by-one when
// they are requested.
// Generate creates a cesium.Flux programmatically by generating signals
// one-by-one when they are requested.
func Generate(f func(cesium.SynchronousSink)) cesium.Flux {
return internal.FluxGenerate(f)
}
Expand All @@ -68,7 +81,7 @@ func Defer(f func() cesium.Publisher) cesium.Flux {
return internal.FluxDefer(f)
}

// Empty creates new cesium.Flux that emits no items and completes with error.
// Error creates new cesium.Flux that emits no items and completes with error.
func Error(err error) cesium.Flux {
return internal.FluxError(err)
}
Expand Down
104 changes: 104 additions & 0 deletions flux/tests/operator_toChannel_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package tests

import (
"testing"

"errors"

"github.com/DusanKasan/cesium"
"github.com/DusanKasan/cesium/flux"
)

func TestToChannel(t *testing.T) {
items, errs := flux.
Just(1, 2, 3).
ToChannel()

var is []cesium.T
var es []error

loop:
for {
select {
case i, ok := <-items:
if !ok {
break loop
}
is = append(is, i)
case err, ok := <-errs:
if !ok {
break loop
}
es = append(es, err)
}
}

if len(es) > 0 {
t.Errorf("error received: %v", es[0])
}

x := []cesium.T{1, 2, 3}
if len(is) != 3 {
t.Errorf("Invalid output. Expected: %v, Got: %v", x, is)
return
}

for i := range is {
if is[i] != x[i] {
t.Errorf("Invalid output. Expected: %v, Got: %v", x, is)
return
}
}
}

func TestToChanneWithError(t *testing.T) {
originalErr := errors.New("err")
items, errs := flux.
Create(func(sink cesium.FluxSink) {
sink.Next(1)
sink.Error(originalErr)
}, flux.OverflowStrategyBuffer).
ToChannel()

var is []cesium.T
var es []error

loop:
for {
select {
case i, ok := <-items:
if !ok {
break loop
}
is = append(is, i)
case err, ok := <-errs:
if !ok {
break loop
}
es = append(es, err)
}
}

if len(es) != 1 {
t.Errorf("wrong number of errors received: %v. Details %v", len(es), es)
return
}

if es[0] != originalErr {
t.Errorf("Wrong error received. Expected %v, Got: %v", originalErr, es[0])
return
}

x := []cesium.T{1}
if len(is) != 1 {
t.Errorf("Invalid output. Expected: %v, Got: %v", x, is)
return
}

for i := range is {
if is[i] != x[i] {
t.Errorf("Invalid output. Expected: %v, Got: %v", x, is)
return
}
}
}

0 comments on commit 454a93b

Please sign in to comment.