Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Observable.buffer skip logic #571

Merged
merged 1 commit into from
Dec 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,18 @@ import com.badoo.reaktive.base.ErrorCallback
import com.badoo.reaktive.disposable.Disposable
import com.badoo.reaktive.utils.SharedList
import com.badoo.reaktive.utils.atomic.AtomicInt
import com.badoo.reaktive.utils.queue.SharedQueue

fun <T> Observable<T>.buffer(count: Int, skip: Int = 0): Observable<List<T>> {
/**
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html#buffer-int-int-)
*/
fun <T> Observable<T>.buffer(count: Int, skip: Int = count): Observable<List<T>> {
require(count > 0) { "Count value must be positive" }
require(skip >= 0) { "Skip value must not be negative" }
require(skip > 0) { "Skip value must be positive" }

return observable { emitter ->
val list = SharedList<T>(count)
val skipCounter = AtomicInt(0)
val listQueue = SharedQueue<SharedList<T>>()
val skipCounter = AtomicInt(1)

subscribe(
object : ObservableObserver<T>, ErrorCallback by emitter {
Expand All @@ -20,26 +24,29 @@ fun <T> Observable<T>.buffer(count: Int, skip: Int = 0): Observable<List<T>> {
}

override fun onNext(value: T) {
if (skipCounter.value > 0) {
skipCounter.addAndGet(-1)
} else {
list += value
if (list.size == count) {
emitter.onNext(list.toList())
list.clear()
if (skip > 0) {
skipCounter.value = skip
}
}
if (skipCounter.addAndGet(-1) == 0) {
skipCounter.value = skip
listQueue.offer(SharedList())
}

listQueue.forEach { it += value }

if (listQueue.peek?.size == count) {
pollAndEmit()
}
}

override fun onComplete() {
if (list.isNotEmpty()) {
emitter.onNext(list)
while (!listQueue.isEmpty) {
pollAndEmit()
}
emitter.onComplete()
}

private fun pollAndEmit() {
val list = listQueue.poll()!!
emitter.onNext(list)
}
}
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class BufferCountSkipTest : ObservableToObservableTests by ObservableToObservabl

@Test
fun emits_first_batch_WHEN_skip_set_and_count_reached() {
val observer = upstream.buffer(count = 3, skip = 3).test()
val observer = upstream.buffer(count = 3, skip = 4).test()

upstream.onNext(0, null, 1)

Expand All @@ -74,7 +74,7 @@ class BufferCountSkipTest : ObservableToObservableTests by ObservableToObservabl

@Test
fun skips_values_WHEN_skip_set() {
val observer = upstream.buffer(count = 3, skip = 2).test()
val observer = upstream.buffer(count = 3, skip = 5).test()

upstream.onNext(0, null, 1, null, 2, null, 3, null, 4, null, 5, null, 6)

Expand All @@ -83,7 +83,7 @@ class BufferCountSkipTest : ObservableToObservableTests by ObservableToObservabl

@Test
fun emits_last_buffered_values_WHEN_skip_not_set_and_upstream_completed() {
val observer = upstream.buffer(count = 3, skip = 0).test()
val observer = upstream.buffer(count = 3).test()

upstream.onNext(0, null, 1)
observer.reset()
Expand All @@ -95,7 +95,7 @@ class BufferCountSkipTest : ObservableToObservableTests by ObservableToObservabl

@Test
fun completes_WHEN_skip_not_set_and_buffered_values_and_upstream_completed() {
val observer = upstream.buffer(count = 3, skip = 0).test()
val observer = upstream.buffer(count = 3).test()

upstream.onNext(0, null, 1)
observer.reset()
Expand All @@ -107,7 +107,7 @@ class BufferCountSkipTest : ObservableToObservableTests by ObservableToObservabl

@Test
fun emits_last_buffered_values_WHEN_skip_set_and_upstream_completed() {
val observer = upstream.buffer(count = 3, skip = 2).test()
val observer = upstream.buffer(count = 3, skip = 5).test()

upstream.onNext(0, null, 1)
observer.reset()
Expand All @@ -119,7 +119,7 @@ class BufferCountSkipTest : ObservableToObservableTests by ObservableToObservabl

@Test
fun completes_WHEN_skip_set_and_buffered_values_and_upstream_completed() {
val observer = upstream.buffer(count = 3, skip = 2).test()
val observer = upstream.buffer(count = 3, skip = 5).test()

upstream.onNext(0, null, 1)
observer.reset()
Expand Down Expand Up @@ -156,7 +156,7 @@ class BufferCountSkipTest : ObservableToObservableTests by ObservableToObservabl

@Test
fun does_not_emit_anything_WHEN_completed_while_skipping() {
val observer = upstream.buffer(count = 3, skip = 2).test()
val observer = upstream.buffer(count = 3, skip = 5).test()

upstream.onNext(0, null, 1)
observer.reset()
Expand All @@ -166,10 +166,9 @@ class BufferCountSkipTest : ObservableToObservableTests by ObservableToObservabl
observer.assertNoValues()
}


@Test
fun completes_WHEN_completed_while_skipping() {
val observer = upstream.buffer(count = 3, skip = 2).test()
val observer = upstream.buffer(count = 3, skip = 5).test()

upstream.onNext(0, null, 1)
observer.reset()
Expand All @@ -178,4 +177,44 @@ class BufferCountSkipTest : ObservableToObservableTests by ObservableToObservabl

observer.assertComplete()
}

@Test
fun emits_overlapping_values_WHEN_skip_is_less_than_count() {
val observer = upstream.buffer(count = 4, skip = 2).test()

upstream.onNext(0, 1, 2, 3, 4, 5, 6, 7)
upstream.onComplete()

observer.assertValues(listOf(0, 1, 2, 3), listOf(2, 3, 4, 5), listOf(4, 5, 6, 7), listOf(6, 7))
}

@Test
fun emits_not_overlapping_values_WHEN_skip_is_equal_to_count() {
val observer = upstream.buffer(count = 4, skip = 4).test()

upstream.onNext(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
upstream.onComplete()

observer.assertValues(listOf(0, 1, 2, 3), listOf(4, 5, 6, 7), listOf(8, 9))
}

@Test
fun emits_values_with_gaps_WHEN_skip_is_more_than_count() {
val observer = upstream.buffer(count = 4, skip = 6).test()

upstream.onNext(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13)
upstream.onComplete()

observer.assertValues(listOf(0, 1, 2, 3), listOf(6, 7, 8, 9), listOf(12, 13))
}

@Test
fun emits_all_pending_buffers_WHEN_upstream_completed() {
val observer = upstream.buffer(count = 7, skip = 2).test()

upstream.onNext(0, 1, 2, 3, 4, 5, 6)
upstream.onComplete()

observer.assertValues(listOf(0, 1, 2, 3, 4, 5, 6), listOf(2, 3, 4, 5, 6), listOf(4, 5, 6), listOf(6))
}
}