-
Notifications
You must be signed in to change notification settings - Fork 24
/
ThrottleFirstOperatorActivity.kt
93 lines (81 loc) · 3.22 KB
/
ThrottleFirstOperatorActivity.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package com.freeankit.rxkotlinoperators.ui.RxOperators
import android.os.Bundle
import android.support.v7.app.AppCompatActivity
import android.util.Log
import com.freeankit.rxkotlinoperators.R
import com.freeankit.rxkotlinoperators.utils.Constant
import io.reactivex.Observable
import io.reactivex.Observer
import io.reactivex.android.schedulers.AndroidSchedulers
import io.reactivex.disposables.CompositeDisposable
import io.reactivex.disposables.Disposable
import io.reactivex.schedulers.Schedulers
import kotlinx.android.synthetic.main.activity_example_operator.*
import java.util.concurrent.TimeUnit
/**
 * Demo screen for the RxJava `throttleFirst()` operator.
 *
 * Pressing the button subscribes to a timed source of integers and appends every
 * value that passes the throttle to the on-screen text view and to Logcat.
 *
 * @author Ankit Kumar (ankitdroiddeveloper@gmail.com) on 15/02/2018 (DD/MM/YYYY)
 */
class ThrottleFirstOperatorActivity : AppCompatActivity() {

    /** Source of the log tag and line separator; created once instead of on every use. */
    private val constants = Constant()

    /** Tracks every subscription started by this screen so they can be cancelled in [onDestroy]. */
    private val disposables = CompositeDisposable()

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_example_operator)
        btn.setOnClickListener { executeThrottleFirstOperator() }
    }

    override fun onDestroy() {
        // Cancel running streams so the observers (which reference this Activity's
        // views) are released and no callback touches the UI after destruction.
        disposables.dispose()
        super.onDestroy()
    }

    /*
     * Using throttleFirst() -> the resulting Observable emits only the first item
     * the source emits within each time window (500 ms here). The window opens when
     * an item is let through; everything else arriving before it closes is dropped.
     */
    private fun executeThrottleFirstOperator() {
        getObservable()
                .throttleFirst(500, TimeUnit.MILLISECONDS)
                // Run on a background thread
                .subscribeOn(Schedulers.io())
                // Be notified on the main thread
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(getObserver())
    }

    /**
     * Builds the demo source: integers 1..7 emitted with blocking pauses in between,
     * followed by completion. The trailing comments mark which values are expected
     * to pass the 500 ms throttle applied by the caller.
     */
    private fun getObservable(): Observable<Int> {
        return Observable.create<Int> { emitter ->
            try {
                // send events with simulated time wait
                Thread.sleep(0)
                emitter.onNext(1) // deliver
                emitter.onNext(2) // skip
                Thread.sleep(505)
                emitter.onNext(3) // deliver
                Thread.sleep(99)
                emitter.onNext(4) // skip
                Thread.sleep(100)
                emitter.onNext(5) // skip
                emitter.onNext(6) // skip
                Thread.sleep(305)
                emitter.onNext(7) // deliver
                Thread.sleep(510)
                emitter.onComplete()
            } catch (e: InterruptedException) {
                // Disposing the subscription may interrupt this thread while it sleeps.
                // Nobody is listening at that point, so the interruption is swallowed;
                // letting it escape would be reported as an undeliverable error.
                if (!emitter.isDisposed) {
                    emitter.onError(e)
                }
            }
        }
    }

    /**
     * Creates an observer that mirrors each stream event to the text view and to
     * Logcat, and registers its subscription for cancellation in [onDestroy].
     */
    private fun getObserver(): Observer<Int> {
        return object : Observer<Int> {
            override fun onSubscribe(d: Disposable) {
                Log.d(constants.TAG, " onSubscribe : ${d.isDisposed}")
                disposables.add(d)
            }

            override fun onNext(value: Int) {
                textView.append(" onNext : ")
                textView.append(constants.LINE_SEPARATOR)
                textView.append(" value : $value")
                textView.append(constants.LINE_SEPARATOR)
                Log.d(constants.TAG, " onNext ")
                Log.d(constants.TAG, " value : $value")
            }

            override fun onError(e: Throwable) {
                textView.append(" onError : ${e.message}")
                textView.append(constants.LINE_SEPARATOR)
                Log.d(constants.TAG, " onError : ${e.message}")
            }

            override fun onComplete() {
                textView.append(" onComplete")
                textView.append(constants.LINE_SEPARATOR)
                Log.d(constants.TAG, " onComplete")
            }
        }
    }
}