Skip to content

Commit

Permalink
Merge 34eebd4 into a259341
Browse files Browse the repository at this point in the history
  • Loading branch information
teivah committed Dec 4, 2018
2 parents a259341 + 34eebd4 commit f79afb4
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 0 deletions.
99 changes: 99 additions & 0 deletions observable.go
Expand Up @@ -54,6 +54,9 @@ type Observable interface {
SkipLast(nth uint) Observable
SkipWhile(apply Predicate) Observable
Subscribe(handler handlers.EventHandler, opts ...options.Option) Observer
SumFloat32() Single
SumFloat64() Single
SumInt64() Single
Take(nth uint) Observable
TakeLast(nth uint) Observable
TakeWhile(apply Predicate) Observable
Expand Down Expand Up @@ -1181,3 +1184,99 @@ func (o *observable) BufferWithTimeOrCount(timespan Duration, count int) Observa
}()
return &observable{ch: out}
}

// SumInt64 calculates the average of integers emitted by an Observable and emits an int64.
func (o *observable) SumInt64() Single {
out := make(chan interface{})
go func() {
var sum int64
for item := range o.ch {
switch item := item.(type) {
case int:
sum = sum + int64(item)
case int8:
sum = sum + int64(item)
case int16:
sum = sum + int64(item)
case int32:
sum = sum + int64(item)
case int64:
sum = sum + item
default:
out <- errors.New(errors.IllegalInputError,
fmt.Sprintf("expected type: int, int8, int16, int32 or int64, got %t", item))
close(out)
return
}
}
out <- sum
close(out)
}()
return NewSingleFromChannel(out)
}

// SumFloat32 calculates the average of float32 emitted by an Observable and emits a float32.
func (o *observable) SumFloat32() Single {
out := make(chan interface{})
go func() {
var sum float32
for item := range o.ch {
switch item := item.(type) {
case int:
sum = sum + float32(item)
case int8:
sum = sum + float32(item)
case int16:
sum = sum + float32(item)
case int32:
sum = sum + float32(item)
case int64:
sum = sum + float32(item)
case float32:
sum = sum + item
default:
out <- errors.New(errors.IllegalInputError,
fmt.Sprintf("expected type: float32, int, int8, int16, int32 or int64, got %t", item))
close(out)
return
}
}
out <- sum
close(out)
}()
return NewSingleFromChannel(out)
}

// SumFloat64 calculates the average of float64 emitted by an Observable and emits a float64.
func (o *observable) SumFloat64() Single {
out := make(chan interface{})
go func() {
var sum float64
for item := range o.ch {
switch item := item.(type) {
case int:
sum = sum + float64(item)
case int8:
sum = sum + float64(item)
case int16:
sum = sum + float64(item)
case int32:
sum = sum + float64(item)
case int64:
sum = sum + float64(item)
case float32:
sum = sum + float64(item)
case float64:
sum = sum + item
default:
out <- errors.New(errors.IllegalInputError,
fmt.Sprintf("expected type: float32, float64, int, int8, int16, int32 or int64, got %t", item))
close(out)
return
}
}
out <- sum
close(out)
}()
return NewSingleFromChannel(out)
}
26 changes: 26 additions & 0 deletions observable_test.go
Expand Up @@ -1779,3 +1779,29 @@ func TestBufferWithTimeOrCountWithError(t *testing.T) {
AssertThatObservable(t, obs, HasItems([]interface{}{1, 2}, []interface{}{3}),
HasRaisedAnError())
}

func TestSumInt64(t *testing.T) {
AssertThatSingle(t, Just(1, 2, 3).SumInt64(), HasValue(int64(6)))
AssertThatSingle(t, Just(int8(1), int(2), int16(3), int32(4), int64(5)).SumInt64(),
HasValue(int64(15)))
AssertThatSingle(t, Just(1.1, 2.2, 3.3).SumInt64(), HasRaisedAnError())
AssertThatSingle(t, Empty().SumInt64(), HasValue(int64(0)))
}

func TestSumFloat32(t *testing.T) {
AssertThatSingle(t, Just(float32(1.0), float32(2.0), float32(3.0)).SumFloat32(),
HasValue(float32(6.)))
AssertThatSingle(t, Just(float32(1.1), 2, int8(3), int16(1), int32(1), int64(1)).SumFloat32(),
HasValue(float32(9.1)))
AssertThatSingle(t, Just(1.1, 2.2, 3.3).SumFloat32(), HasRaisedAnError())
AssertThatSingle(t, Empty().SumFloat32(), HasValue(float32(0)))
}

func TestSumFloat64(t *testing.T) {
AssertThatSingle(t, Just(1.1, 2.2, 3.3).SumFloat64(),
HasValue(6.6))
AssertThatSingle(t, Just(float32(1.0), 2, int8(3), 4., int16(1), int32(1), int64(1)).SumFloat64(),
HasValue(float64(13.)))
AssertThatSingle(t, Just("x").SumFloat64(), HasRaisedAnError())
AssertThatSingle(t, Empty().SumFloat64(), HasValue(float64(0)))
}

0 comments on commit f79afb4

Please sign in to comment.