In [1]:
fun iterPub(iter: Iterable<Int>) = Publisher<Int> { subscriber ->
    subscriber.onSubscribe(object : Subscription {
        override fun request(n: Long) {
            iter.forEach { subscriber.onNext(it) }
            subscriber.onComplete()
        }

        override fun cancel() {
        }
    })
}

fun logSub() = object : Subscriber<Int> {
    override fun onSubscribe(subscription: Subscription) {
        println("onSubscribe")
        subscription.request(Long.MAX_VALUE)
    }

    override fun onNext(item: Int) {
        println("onNext : $item")
    }

    override fun onComplete() = println("onComplete")
    override fun onError(throwable: Throwable) = println("onError : $throwable")
}

// iterPub -> data1 -> logSub
val iterPub = iterPub(generateSequence(1) { it + 1 }.take(10).toList())
val logSub = logSub()

iterPub.subscribe(logSub)

org.jetbrains.kotlinx.jupyter.exceptions.ReplCompilerException: at Cell In[1], line 1, column 36: Unresolved reference: Publisher
at Cell In[1], line 1, column 53: Cannot infer a type for this parameter. Please specify it explicitly.
at Cell In[1], line 2, column 37: Unresolved reference: Subscription
at Cell In[1], line 3, column 9: 'request' overrides nothing
at Cell In[1], line 8, column 9: 'cancel' overrides nothing
at Cell In[1], line 13, column 25: Unresolved reference: Subscriber
at Cell In[1], line 14, column 5: 'onSubscribe' overrides nothing
at Cell In[1], line 14, column 44: Unresolved reference: Subscription
at Cell In[1], line 19, column 5: 'onNext' overrides nothing
at Cell In[1], line 23, column 5: 'onComplete' overrides nothing
at Cell In[1], line 24, column 5: 'onError' overrides nothing

In [67]:
fun mapPub(pub: Publisher<Int>, op: (Int) -> Int) = Publisher<Int> { sub ->  
    pub.subscribe(
        object : Subscriber<Int> {
            override fun onSubscribe(subscription: Subscription) = sub.onSubscribe(subscription)
            override fun onNext(item: Int) = sub.onNext(op(item))
            override fun onComplete() = sub.onComplete()
            override fun onError(throwable: Throwable) = sub.onError(throwable)
        }
    )
 }
 
// iterPub -> data1 -> mapPub -> data2 -> logSub
// mapPub 은 iterPub 입장에서는 Subscriber, logSub 입장에선 mapPub.
// 따라서 mapPub 은 subscribe 하는 기능도 필요하다.
val iterPub = iterPub(generateSequence(1) { it + 1 }.take(10).toList())
val mapMultiPub = mapPub(iterPub) { it * 10 }
val mapNegativePub = mapPub(mapMultiPub) { -it }
val logSub = logSub()

mapNegativePub.subscribe(logSub)

onSubscribe
onNext : -10
onNext : -20
onNext : -30
onNext : -40
onNext : -50
onNext : -60
onNext : -70
onNext : -80
onNext : -90
onNext : -100
onComplete


In [70]:
// adaptor 패턴. 원하는 것만 오버라이드 할 수 있는 Delegate 클래스.
open class DelegateSub(
    private val sub: Subscriber<in Int>
) : Subscriber<Int> {
    override fun onSubscribe(subscription: Subscription) {
        sub.onSubscribe(subscription)
    }

    override fun onNext(item: Int) {
        sub.onNext(item)
    }
    
    override fun onComplete() {
        sub.onComplete()
    }
    
    override fun onError(throwable: Throwable) {
        sub.onError(throwable)
    }
}

In [71]:
fun mapPubRefactoring(upstreamPub: Publisher<Int>, op: (Int) -> Int) = Publisher<Int> { sub ->  
    upstreamPub.subscribe(
        object : DelegateSub(sub) { override fun onNext(item: Int) = sub.onNext(op(item)) }
    )
 }
 
val plusOnePub = mapPubRefactoring(iterPub) { it + 1 }
val logSub = logSub()

plusOnePub.subscribe(logSub)

onSubscribe
onNext : 2
onNext : 3
onNext : 4
onNext : 5
onNext : 6
onNext : 7
onNext : 8
onNext : 9
onNext : 10
onNext : 11
onComplete


In [72]:
// 합계를 구하는 operator Publisher
fun sumPub(upstreamPub: Publisher<Int>) = object : Publisher<Int> {
    
    // 이 sumPub 에 subscriber 를 subscribe 하면
    override fun subscribe(subscriber: Subscriber<in Int>) {
        // 인자로 받은 upstreamPub 의 subscribe() 에 downStream 의 subscriber 를 연결시키면서
        // 중간에 연산을 추가한다.
        upstreamPub.subscribe(object : DelegateSub(sub) {
            private var sum = 0
            
            override fun onNext(item: Int) {
                sum += item // iterPub 의 onNext 가 불렸을 때 
            }

            override fun onComplete() {
                sub.onNext(sum)
                sub.onComplete()
            }
        })
    }
}

val sumPub = sumPub(iterPub)
val logSub = logSub()

sumPub.subscribe(logSub)

onSubscribe
onNext : 55
onComplete


In [79]:
// reduce operator Publisher
fun reducePub(upstreamPub: Publisher<Int>, init: Int, op: (x: Int, y: Int) -> Int) = object : Publisher<Int> {

    override fun subscribe(subscriber: Subscriber<in Int>) {
        upstreamPub.subscribe(object : DelegateSub(subscriber) {
            private var result = init
            
            override fun onNext(item: Int) {
                result = op(result, item)
            }

            override fun onComplete() {
                subscriber.onNext(result)
                subscriber.onComplete()
            }
        })
    }
}

val reducePub = reducePub(iterPub, 10) { x, y -> x * y }
val logSub = logSub()

reducePub.subscribe(logSub)

onSubscribe
onNext : 36288000
onComplete


In [88]:
// 범용적 Generic mapPub
open class GenericDelegateSub<T>(
    private val sub: Subscriber<in T>
) : Subscriber<T> {
    override fun onSubscribe(subscription: Subscription) {
        sub.onSubscribe(subscription)
    }

    override fun onNext(item: T) {
        sub.onNext(item)
    }
    
    override fun onComplete() {
        sub.onComplete()
    }
    
    override fun onError(throwable: Throwable) {
        sub.onError(throwable)
    }
}

fun <T> iterPubGeneric(iterable: Iterable<T>) = object : Publisher<T> {
    override fun subscribe(subscriber: Subscriber<in T>) {
        subscriber.onSubscribe(object : Subscription {
            override fun request(n: Long) {
                iterable.forEach { subscriber.onNext(it) }
                subscriber.onComplete()
            }

            override fun cancel() {}
        })
    }
}

fun <T> mapPubRefactoringGeneric(upstreamPub: Publisher<T>, op: (T) -> T) = Publisher<T> { sub ->  
    upstreamPub.subscribe(object : GenericDelegateSub<T>(sub) { override fun onNext(item: T) = sub.onNext(op(item)) })
}

fun <T> logSubGeneric() = object : Subscriber<T> {
    override fun onSubscribe(subscription: Subscription) {
        println("onSubscribe")
        subscription.request(Long.MAX_VALUE)
    }

    override fun onNext(item: T) = println("onNext : $item")
    override fun onError(throwable: Throwable) = println("onError")
    override fun onComplete() = println("onComplete")
}

val stringIterPub = iterPubGeneric(listOf("a", "b", "c", "d"))
val stringMapPub = mapPubRefactoringGeneric(stringIterPub) { it + "append" }
val logSubGeneric = logSubGeneric<String>()

stringMapPub.subscribe(logSubGeneric)

onSubscribe
onNext : aappend
onNext : bappend
onNext : cappend
onNext : dappend
onComplete


In [98]:
// 범용적 T -> R 타입변환 Generic mapPub
open class GenericTwoDelegateSub<T,R>(
    private val sub: Subscriber<in R>
) : Subscriber<T> {
    override fun onSubscribe(subscription: Subscription) {
        sub.onSubscribe(subscription)
    }

    override fun onNext(item: T) {
        sub.onNext(item as R)
    }
    
    override fun onComplete() {
        sub.onComplete()
    }
    
    override fun onError(throwable: Throwable) {
        sub.onError(throwable)
    }
}

fun <T,R> mapPubRefactoringGenericTypeChange(upstreamPub: Publisher<T>, op: (T) -> R) = Publisher<R> { sub ->  
    upstreamPub.subscribe(object : GenericTwoDelegateSub<T,R>(sub) { override fun onNext(item: T) = sub.onNext(op(item)) })
}

val stringMapPub = mapPubRefactoringGenericTypeChange(iterPub) { "[$it]" }
val logSubGeneric = logSubGeneric<String>()

stringMapPub.subscribe(logSubGeneric)

onSubscribe
onNext : [1]
onNext : [2]
onNext : [3]
onNext : [4]
onNext : [5]
onNext : [6]
onNext : [7]
onNext : [8]
onNext : [9]
onNext : [10]
onComplete


In [None]:
/**
* Flux : Publisher 인터페이스를 구현한 퍼블리셔
* emitter
* subscribe: nextOn() 한번에 대한 subscribe 메서드
*/

import reactor.core.publisher.Flux

Flux.create { t ->
    t.next(1)
    t.next(2)
    t.next(3)
    t.complete()
}.log()
    .map { it * 10 }
    .log()
    .reduce(0) { t, u -> t + u }
    .log()
    .subscribe { println(it) }