Skip to content

Commit

Permalink
Merge branch 'v2-buffer' of github.com:reactivex/rxgo into issue_118
Browse files Browse the repository at this point in the history
* 'v2-buffer' of github.com:reactivex/rxgo:
  goimports
  Race condition fix
  BufferWithTimeOrCount implementation improvement
  BufferWithTimeOrCount operator
  BufferWithTime operator
  BufferWithCount operator
  • Loading branch information
avelino committed Dec 4, 2018
2 parents 7a86d36 + dbb48a5 commit a259341
Show file tree
Hide file tree
Showing 6 changed files with 483 additions and 30 deletions.
4 changes: 2 additions & 2 deletions duration.go
Expand Up @@ -29,8 +29,8 @@ func (d *duration) duration() time.Duration {
return d.d
}

func WithFrequency(frequency time.Duration) Duration {
func WithDuration(d time.Duration) Duration {
return &duration{
d: frequency,
d: d,
}
}
2 changes: 1 addition & 1 deletion duration_test.go
Expand Up @@ -8,6 +8,6 @@ import (
)

func TestWithFrequency(t *testing.T) {
frequency := WithFrequency(100 * time.Millisecond)
frequency := WithDuration(100 * time.Millisecond)
assert.Equal(t, 100*time.Millisecond, frequency.duration())
}
256 changes: 245 additions & 11 deletions observable.go
Expand Up @@ -23,6 +23,9 @@ type Observable interface {
AverageInt16() Single
AverageInt32() Single
AverageInt64() Single
BufferWithCount(count, skip int) Observable
BufferWithTime(timespan, timeshift Duration) Observable
BufferWithTimeOrCount(timespan Duration, count int) Observable
Contains(equal Predicate) Single
Count() Single
DefaultIfEmpty(defaultValue interface{}) Observable
Expand Down Expand Up @@ -158,24 +161,24 @@ func (o *observable) Subscribe(handler handlers.EventHandler, opts ...options.Op
}
}()
} else {
wg := sync.WaitGroup{}

var e error
results := make([]chan error, 0)
for i := 0; i < observableOptions.Parallelism(); i++ {
wg.Add(1)

ch := make(chan error)
go func() {
e = iterate(o, ob)
wg.Done()
ch <- iterate(o, ob)
}()
results = append(results, ch)
}

go func() {
wg.Wait()
// OnDone only gets executed if there's no error.
if e == nil {
ob.OnDone()
for _, ch := range results {
err := <-ch
if err != nil {
return
}
}

ob.OnDone()
}()
}

Expand Down Expand Up @@ -947,3 +950,234 @@ func (o *observable) Min(comparator Comparator) OptionalSingle {
}()
return &optionalSingle{ch: out}
}

// BufferWithCount returns an Observable that emits buffers of items it collects
// from the source Observable.
// The resulting Observable emits buffers every skip items, each containing a slice of count items.
// When the source Observable completes or encounters an error,
// the resulting Observable emits the current buffer and propagates
// the notification from the source Observable.
func (o *observable) BufferWithCount(count, skip int) Observable {
out := make(chan interface{})
go func() {
if count <= 0 {
out <- errors.New(errors.IllegalInputError, "count must be positive")
close(out)
return
}

if skip <= 0 {
out <- errors.New(errors.IllegalInputError, "skip must be positive")
close(out)
return
}

buffer := make([]interface{}, count, count)
iCount := 0
iSkip := 0
for item := range o.ch {
switch item := item.(type) {
case error:
if iCount != 0 {
out <- buffer[:iCount]
}
out <- item
close(out)
return
default:
if iCount >= count { // Skip
iSkip++
} else { // Add to buffer
buffer[iCount] = item
iCount++
iSkip++
}

if iSkip == skip { // Send current buffer
out <- buffer
buffer = make([]interface{}, count, count)
iCount = 0
iSkip = 0
}
}
}

if iCount != 0 {
out <- buffer[:iCount]
}

close(out)
}()
return &observable{ch: out}
}

// BufferWithTime returns an Observable that emits buffers of items it collects from the source
// Observable. The resulting Observable starts a new buffer periodically, as determined by the
// timeshift argument. It emits each buffer after a fixed timespan, specified by the timespan argument.
// When the source Observable completes or encounters an error, the resulting Observable emits
// the current buffer and propagates the notification from the source Observable.
func (o *observable) BufferWithTime(timespan, timeshift Duration) Observable {
out := make(chan interface{})
go func() {
if timespan == nil || timespan.duration() == 0 {
out <- errors.New(errors.IllegalInputError, "timespan must not be nil")
close(out)
return
}

if timeshift == nil {
timeshift = WithDuration(0)
}

var mux sync.Mutex
var listenMutex sync.Mutex
buffer := make([]interface{}, 0)
stop := false
listen := true

// First goroutine in charge to check the timespan
go func() {
for {
time.Sleep(timespan.duration())
mux.Lock()
if !stop {
out <- buffer
buffer = make([]interface{}, 0)
mux.Unlock()

if timeshift.duration() != 0 {
listenMutex.Lock()
listen = false
listenMutex.Unlock()
time.Sleep(timeshift.duration())
listenMutex.Lock()
listen = true
listenMutex.Unlock()
}
} else {
mux.Unlock()
return
}
}
}()

// Second goroutine in charge to retrieve the items from the source observable
go func() {
for item := range o.ch {
switch item := item.(type) {
case error:
mux.Lock()
if len(buffer) > 0 {
out <- buffer
}
out <- item
close(out)
stop = true
mux.Unlock()
return
default:
listenMutex.Lock()
l := listen
listenMutex.Unlock()

mux.Lock()
if l {
buffer = append(buffer, item)
}
mux.Unlock()
}
}
mux.Lock()
if len(buffer) > 0 {
out <- buffer
}
close(out)
stop = true
mux.Unlock()
}()

}()
return &observable{ch: out}
}

// BufferWithTimeOrCount returns an Observable that emits buffers of items it collects
// from the source Observable. The resulting Observable emits connected,
// non-overlapping buffers, each of a fixed duration specified by the timespan argument
// or a maximum size specified by the count argument (whichever is reached first).
// When the source Observable completes or encounters an error, the resulting Observable
// emits the current buffer and propagates the notification from the source Observable.
func (o *observable) BufferWithTimeOrCount(timespan Duration, count int) Observable {
out := make(chan interface{})
go func() {
if timespan == nil || timespan.duration() == 0 {
out <- errors.New(errors.IllegalInputError, "timespan must not be nil")
close(out)
return
}

if count <= 0 {
out <- errors.New(errors.IllegalInputError, "count must be positive")
close(out)
return
}

sendCh := make(chan []interface{})
errCh := make(chan error)
buffer := make([]interface{}, 0)
var bufferMutex sync.Mutex

// First sender goroutine
go func() {
for {
select {
case currentBuffer := <-sendCh:
out <- currentBuffer
case error := <-errCh:
if len(buffer) > 0 {
out <- buffer
}
if error != nil {
out <- error
}
close(out)
return
case <-time.After(timespan.duration()): // Send on timer
bufferMutex.Lock()
b := make([]interface{}, len(buffer))
copy(b, buffer)
buffer = make([]interface{}, 0)
bufferMutex.Unlock()

out <- b
}
}
}()

// Second goroutine in charge to retrieve the items from the source observable
go func() {
for item := range o.ch {
switch item := item.(type) {
case error:
errCh <- item
return
default:
bufferMutex.Lock()
buffer = append(buffer, item)
if len(buffer) >= count {
b := make([]interface{}, len(buffer))
copy(b, buffer)
buffer = make([]interface{}, 0)
bufferMutex.Unlock()

sendCh <- b
} else {
bufferMutex.Unlock()
}
}
}
errCh <- nil
}()

}()
return &observable{ch: out}
}

0 comments on commit a259341

Please sign in to comment.