Skip to content
This repository has been archived by the owner on Jul 2, 2021. It is now read-only.

add remove a single subscriber #38

Merged
merged 2 commits into from
Jul 23, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 4 additions & 2 deletions snail-kotlin/src/main/java/com/compass/snail/Fail.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ package com.compass.snail
import kotlinx.coroutines.CoroutineDispatcher

open class Fail<T>(private val _error: Throwable) : Observable<T>() {
override fun subscribe(dispatcher: CoroutineDispatcher?, next: ((T) -> Unit)?, error: ((Throwable) -> Unit)?, done: (() -> Unit)?) {
notify(Subscriber(dispatcher, createHandler(next, error, done)), Event(error = _error))
override fun subscribe(dispatcher: CoroutineDispatcher?, next: ((T) -> Unit)?, error: ((Throwable) -> Unit)?, done: (() -> Unit)?): Subscriber<T> {
val subscriber = Subscriber(dispatcher, createHandler(next, error, done))
notify(subscriber, Event(error = _error))
return subscriber
}
}
3 changes: 2 additions & 1 deletion snail-kotlin/src/main/java/com/compass/snail/IObservable.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ package com.compass.snail
import kotlinx.coroutines.CoroutineDispatcher

interface IObservable<T> {
fun subscribe(dispatcher: CoroutineDispatcher? = null, next: ((T) -> Unit)? = null, error: ((Throwable) -> Unit)? = null, done: (() -> Unit)? = null)
fun subscribe(dispatcher: CoroutineDispatcher? = null, next: ((T) -> Unit)? = null, error: ((Throwable) -> Unit)? = null, done: (() -> Unit)? = null): Subscriber<T>
fun on(dispatcher: CoroutineDispatcher): Observable<T>
fun next(value: T)
fun error(error: Throwable)
fun done()
fun removeSubscribers()
fun removeSubscriber(subscriber: Subscriber<T>)
suspend fun block(): BlockResult<T>
fun throttle(delayMs: Long): Observable<T>
fun debounce(delayMs: Long): Observable<T>
Expand Down
8 changes: 5 additions & 3 deletions snail-kotlin/src/main/java/com/compass/snail/Just.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ package com.compass.snail
import kotlinx.coroutines.CoroutineDispatcher

open class Just<T>(private val value: T) : Observable<T>() {
override fun subscribe(dispatcher: CoroutineDispatcher?, next: ((T) -> Unit)?, error: ((Throwable) -> Unit)?, done: (() -> Unit)?) {
notify(Subscriber(dispatcher, createHandler(next, error, done)), Event(next = Next(value)))
notify(Subscriber(dispatcher, createHandler(next, error, done)), Event(done = true))
override fun subscribe(dispatcher: CoroutineDispatcher?, next: ((T) -> Unit)?, error: ((Throwable) -> Unit)?, done: (() -> Unit)?): Subscriber<T> {
val subscriber = Subscriber(dispatcher, createHandler(next, error, done))
notify(subscriber, Event(next = Next(value)))
notify(subscriber, Event(done = true))
return subscriber
}
}
14 changes: 10 additions & 4 deletions snail-kotlin/src/main/java/com/compass/snail/Observable.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@ open class Observable<T> : IObservable<T> {
private var stoppedEvent: Event<T>? = null
private var subscribers: MutableList<Subscriber<T>> = mutableListOf()

override fun subscribe(dispatcher: CoroutineDispatcher?, next: ((T) -> Unit)?, error: ((Throwable) -> Unit)?, done: (() -> Unit)?) {
override fun subscribe(dispatcher: CoroutineDispatcher?, next: ((T) -> Unit)?, error: ((Throwable) -> Unit)?, done: (() -> Unit)?) : Subscriber<T> {
val subscriber = Subscriber(dispatcher, createHandler(next, error, done))
stoppedEvent?.let {
notify(Subscriber(dispatcher, createHandler(next, error, done)), it)
return
notify(subscriber, it)
return subscriber
}
subscribers.add(Subscriber(dispatcher, createHandler(next, error, done)))
subscribers.add(subscriber)
return subscriber
}

override fun on(dispatcher: CoroutineDispatcher): Observable<T> {
Expand All @@ -46,6 +48,10 @@ open class Observable<T> : IObservable<T> {
subscribers.clear()
}

override fun removeSubscriber(subscriber: Subscriber<T>) {
subscribers.remove(subscriber)
}

private fun on(event: Event<T>) {
if (stoppedEvent != null) return

Expand Down
4 changes: 2 additions & 2 deletions snail-kotlin/src/main/java/com/compass/snail/Replay.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import kotlinx.coroutines.CoroutineDispatcher
open class Replay<T>(private val threshold: Int) : Observable<T>() {
private var values: MutableList<T> = mutableListOf()

override fun subscribe(dispatcher: CoroutineDispatcher?, next: ((T) -> Unit)?, error: ((Throwable) -> Unit)?, done: (() -> Unit)?) {
super.subscribe(dispatcher, next, error, done)
override fun subscribe(dispatcher: CoroutineDispatcher?, next: ((T) -> Unit)?, error: ((Throwable) -> Unit)?, done: (() -> Unit)?): Subscriber<T> {
replay(dispatcher, createHandler(next, error, done))
return super.subscribe(dispatcher, next, error, done)
}

override fun next(value: T) {
Expand Down
30 changes: 30 additions & 0 deletions snail-kotlin/src/test/java/com/compass/snail/ObservableTests.kt
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,36 @@ class ObservableTests {
assertEquals("2", strings?.get(1))
}

@Test
fun testRemoveSubscriber() {
val listOfString = mutableListOf<String>()
val listOfInt = mutableListOf<Int>()
val stringSubscriber = subject?.subscribe(next = { listOfString.add(it) })
val intSubscriber = subject?.subscribe(next = { listOfInt.add(it.toInt()) })
subject?.next("1")

assertEquals(1, listOfString.size)
assertEquals(1, listOfInt.size)
assertEquals("1", listOfString[0])
assertEquals(1, listOfInt[0])

intSubscriber?.let { subject?.removeSubscriber(it) }
subject?.next("2")
assertEquals(2, listOfString.size)
assertEquals(1, listOfInt.size)
assertEquals("1", listOfString[0])
assertEquals("2", listOfString[1])
assertEquals(1, listOfInt[0])

stringSubscriber?.let { subject?.removeSubscriber(it) }
subject?.next("3")
assertEquals(2, listOfString.size)
assertEquals(1, listOfInt.size)
assertEquals("1", listOfString[0])
assertEquals("2", listOfString[1])
assertEquals(1, listOfInt[0])
}

@Test
fun testMultipleSubscribers() {
val more = mutableListOf<String>()
Expand Down