Skip to content

Commit

Permalink
Merge pull request #571 from arkivanov/fix-buffer-skip-logic
Browse files Browse the repository at this point in the history
Fix Observable.buffer skip logic
  • Loading branch information
Arkadii Ivanov committed Dec 14, 2020
2 parents 27ac9b6 + 8b3e3ba commit 43a6520
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,18 @@ import com.badoo.reaktive.base.ErrorCallback
import com.badoo.reaktive.disposable.Disposable
import com.badoo.reaktive.utils.SharedList
import com.badoo.reaktive.utils.atomic.AtomicInt
import com.badoo.reaktive.utils.queue.SharedQueue

fun <T> Observable<T>.buffer(count: Int, skip: Int = 0): Observable<List<T>> {
/**
* Please refer to the corresponding RxJava [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html#buffer-int-int-)
*/
fun <T> Observable<T>.buffer(count: Int, skip: Int = count): Observable<List<T>> {
require(count > 0) { "Count value must be positive" }
require(skip >= 0) { "Skip value must not be negative" }
require(skip > 0) { "Skip value must be positive" }

return observable { emitter ->
val list = SharedList<T>(count)
val skipCounter = AtomicInt(0)
val listQueue = SharedQueue<SharedList<T>>()
val skipCounter = AtomicInt(1)

subscribe(
object : ObservableObserver<T>, ErrorCallback by emitter {
Expand All @@ -20,26 +24,29 @@ fun <T> Observable<T>.buffer(count: Int, skip: Int = 0): Observable<List<T>> {
}

override fun onNext(value: T) {
if (skipCounter.value > 0) {
skipCounter.addAndGet(-1)
} else {
list += value
if (list.size == count) {
emitter.onNext(list.toList())
list.clear()
if (skip > 0) {
skipCounter.value = skip
}
}
if (skipCounter.addAndGet(-1) == 0) {
skipCounter.value = skip
listQueue.offer(SharedList())
}

listQueue.forEach { it += value }

if (listQueue.peek?.size == count) {
pollAndEmit()
}
}

override fun onComplete() {
if (list.isNotEmpty()) {
emitter.onNext(list)
while (!listQueue.isEmpty) {
pollAndEmit()
}
emitter.onComplete()
}

private fun pollAndEmit() {
val list = listQueue.poll()!!
emitter.onNext(list)
}
}
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class BufferCountSkipTest : ObservableToObservableTests by ObservableToObservabl

@Test
fun emits_first_batch_WHEN_skip_set_and_count_reached() {
val observer = upstream.buffer(count = 3, skip = 3).test()
val observer = upstream.buffer(count = 3, skip = 4).test()

upstream.onNext(0, null, 1)

Expand All @@ -74,7 +74,7 @@ class BufferCountSkipTest : ObservableToObservableTests by ObservableToObservabl

@Test
fun skips_values_WHEN_skip_set() {
val observer = upstream.buffer(count = 3, skip = 2).test()
val observer = upstream.buffer(count = 3, skip = 5).test()

upstream.onNext(0, null, 1, null, 2, null, 3, null, 4, null, 5, null, 6)

Expand All @@ -83,7 +83,7 @@ class BufferCountSkipTest : ObservableToObservableTests by ObservableToObservabl

@Test
fun emits_last_buffered_values_WHEN_skip_not_set_and_upstream_completed() {
val observer = upstream.buffer(count = 3, skip = 0).test()
val observer = upstream.buffer(count = 3).test()

upstream.onNext(0, null, 1)
observer.reset()
Expand All @@ -95,7 +95,7 @@ class BufferCountSkipTest : ObservableToObservableTests by ObservableToObservabl

@Test
fun completes_WHEN_skip_not_set_and_buffered_values_and_upstream_completed() {
val observer = upstream.buffer(count = 3, skip = 0).test()
val observer = upstream.buffer(count = 3).test()

upstream.onNext(0, null, 1)
observer.reset()
Expand All @@ -107,7 +107,7 @@ class BufferCountSkipTest : ObservableToObservableTests by ObservableToObservabl

@Test
fun emits_last_buffered_values_WHEN_skip_set_and_upstream_completed() {
val observer = upstream.buffer(count = 3, skip = 2).test()
val observer = upstream.buffer(count = 3, skip = 5).test()

upstream.onNext(0, null, 1)
observer.reset()
Expand All @@ -119,7 +119,7 @@ class BufferCountSkipTest : ObservableToObservableTests by ObservableToObservabl

@Test
fun completes_WHEN_skip_set_and_buffered_values_and_upstream_completed() {
val observer = upstream.buffer(count = 3, skip = 2).test()
val observer = upstream.buffer(count = 3, skip = 5).test()

upstream.onNext(0, null, 1)
observer.reset()
Expand Down Expand Up @@ -156,7 +156,7 @@ class BufferCountSkipTest : ObservableToObservableTests by ObservableToObservabl

@Test
fun does_not_emit_anything_WHEN_completed_while_skipping() {
val observer = upstream.buffer(count = 3, skip = 2).test()
val observer = upstream.buffer(count = 3, skip = 5).test()

upstream.onNext(0, null, 1)
observer.reset()
Expand All @@ -166,10 +166,9 @@ class BufferCountSkipTest : ObservableToObservableTests by ObservableToObservabl
observer.assertNoValues()
}


@Test
fun completes_WHEN_completed_while_skipping() {
val observer = upstream.buffer(count = 3, skip = 2).test()
val observer = upstream.buffer(count = 3, skip = 5).test()

upstream.onNext(0, null, 1)
observer.reset()
Expand All @@ -178,4 +177,44 @@ class BufferCountSkipTest : ObservableToObservableTests by ObservableToObservabl

observer.assertComplete()
}

@Test
fun emits_overlapping_values_WHEN_skip_is_less_than_count() {
val observer = upstream.buffer(count = 4, skip = 2).test()

upstream.onNext(0, 1, 2, 3, 4, 5, 6, 7)
upstream.onComplete()

observer.assertValues(listOf(0, 1, 2, 3), listOf(2, 3, 4, 5), listOf(4, 5, 6, 7), listOf(6, 7))
}

@Test
fun emits_not_overlapping_values_WHEN_skip_is_equal_to_count() {
val observer = upstream.buffer(count = 4, skip = 4).test()

upstream.onNext(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
upstream.onComplete()

observer.assertValues(listOf(0, 1, 2, 3), listOf(4, 5, 6, 7), listOf(8, 9))
}

@Test
fun emits_values_with_gaps_WHEN_skip_is_more_than_count() {
val observer = upstream.buffer(count = 4, skip = 6).test()

upstream.onNext(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13)
upstream.onComplete()

observer.assertValues(listOf(0, 1, 2, 3), listOf(6, 7, 8, 9), listOf(12, 13))
}

@Test
fun emits_all_pending_buffers_WHEN_upstream_completed() {
val observer = upstream.buffer(count = 7, skip = 2).test()

upstream.onNext(0, 1, 2, 3, 4, 5, 6)
upstream.onComplete()

observer.assertValues(listOf(0, 1, 2, 3, 4, 5, 6), listOf(2, 3, 4, 5, 6), listOf(4, 5, 6), listOf(6))
}
}

0 comments on commit 43a6520

Please sign in to comment.