From a2dd674b1fa8d5d8d7e50131cc90f3418f3ee674 Mon Sep 17 00:00:00 2001 From: Teiva Harsanyi Date: Sun, 4 Oct 2020 22:00:38 +0200 Subject: [PATCH] Find operator --- README.md | 1 + doc/find.md | 35 +++++++++++++++++++++++++++++++++++ observable.go | 1 + observable_operator.go | 30 ++++++++++++++++++++++++++++++ observable_operator_test.go | 20 ++++++++++++++++++++ 5 files changed, 87 insertions(+) create mode 100644 doc/find.md diff --git a/README.md b/README.md index d3079279..024c1785 100644 --- a/README.md +++ b/README.md @@ -450,6 +450,7 @@ How to use the [assert API](doc/assert.md) to write unit tests while using RxGo. * [Distinct](doc/distinct.md)/[DistinctUntilChanged](doc/distinctuntilchanged.md) — suppress duplicate items emitted by an Observable * [ElementAt](doc/elementat.md) — emit only item n emitted by an Observable * [Filter](doc/filter.md) — emit only those items from an Observable that pass a predicate test +* [Find](doc/find.md) — emit the first item passing a predicate then complete * [First](doc/first.md)/[FirstOrDefault](doc/firstordefault.md) — emit only the first item or the first item that meets a condition, from an Observable * [IgnoreElements](doc/ignoreelements.md) — do not emit any items from an Observable but mirror its termination notification * [Last](doc/last.md)/[LastOrDefault](doc/lastordefault.md) — emit only the last item emitted by an Observable diff --git a/doc/find.md b/doc/find.md new file mode 100644 index 00000000..d12cefc8 --- /dev/null +++ b/doc/find.md @@ -0,0 +1,35 @@ +# Find Operator + +## Overview + +Emit the first item passing a predicate then complete. + +## Example + +```go +observable := rxgo.Just(1, 2, 3)().Find(func(i interface{}) bool { + return i == 2 +}) +``` + +Output: + +``` +2 +``` + +## Options + +* [WithBufferedChannel](options.md#withbufferedchannel) + +* [WithContext](options.md#withcontext) + +* [WithPool](options.md#withpool) + +* [WithCPUPool](options.md#withcpupool) + +* [WithObservationStrategy](options.md#withobservationstrategy) + +* [WithErrorStrategy](options.md#witherrorstrategy) + +* [WithPublishStrategy](options.md#withpublishstrategy) \ No newline at end of file diff --git a/observable.go b/observable.go index 8f628319..892e5690 100644 --- a/observable.go +++ b/observable.go @@ -40,6 +40,7 @@ type Observable interface { Error(opts ...Option) error Errors(opts ...Option) []error Filter(apply Predicate, opts ...Option) Observable + Find(find Predicate, opts ...Option) OptionalSingle First(opts ...Option) OptionalSingle FirstOrDefault(defaultValue interface{}, opts ...Option) Single FlatMap(apply ItemToObservable, opts ...Option) Observable diff --git a/observable_operator.go b/observable_operator.go index b9f5c6d9..777bcfa8 100644 --- a/observable_operator.go +++ b/observable_operator.go @@ -1020,6 +1020,36 @@ func (op *filterOperator) end(_ context.Context, _ chan<- Item) { func (op *filterOperator) gatherNext(_ context.Context, _ Item, _ chan<- Item, _ operatorOptions) { } +// Find emits the first item passing a predicate then complete. +func (o *ObservableImpl) Find(find Predicate, opts ...Option) OptionalSingle { + return optionalSingle(o, func() operator { + return &findOperator{ + find: find, + } + }, true, true, opts...) +} + +type findOperator struct { + find Predicate +} + +func (op *findOperator) next(ctx context.Context, item Item, dst chan<- Item, operatorOptions operatorOptions) { + if op.find(item.V) { + item.SendContext(ctx, dst) + operatorOptions.stop() + } +} + +func (op *findOperator) err(ctx context.Context, item Item, dst chan<- Item, operatorOptions operatorOptions) { + defaultErrorFuncOperator(ctx, item, dst, operatorOptions) +} + +func (op *findOperator) end(_ context.Context, _ chan<- Item) { +} + +func (op *findOperator) gatherNext(_ context.Context, _ Item, _ chan<- Item, _ operatorOptions) { +} + // First returns new Observable which emit only first item. // Cannot be run in parallel. func (o *ObservableImpl) First(opts ...Option) OptionalSingle { diff --git a/observable_operator_test.go b/observable_operator_test.go index 8ff5bc96..6b11c83f 100644 --- a/observable_operator_test.go +++ b/observable_operator_test.go @@ -676,6 +676,26 @@ func Test_Observable_Filter_Parallel(t *testing.T) { Assert(ctx, t, obs, HasItemsNoOrder(2, 4), HasNoError()) } +func Test_Observable_Find_NotEmpty(t *testing.T) { + defer goleak.VerifyNone(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + obs := testObservable(ctx, 1, 2, 3).Find(func(i interface{}) bool { + return i == 2 + }) + Assert(ctx, t, obs, HasItem(2)) +} + +func Test_Observable_Find_Empty(t *testing.T) { + defer goleak.VerifyNone(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + obs := Empty().Find(func(_ interface{}) bool { + return true + }) + Assert(ctx, t, obs, IsEmpty()) +} + func Test_Observable_First_NotEmpty(t *testing.T) { defer goleak.VerifyNone(t) ctx, cancel := context.WithCancel(context.Background())