-
-
Notifications
You must be signed in to change notification settings - Fork 1.1k
/
PublishSubjectExampleActivity.java
131 lines (103 loc) · 3.98 KB
/
PublishSubjectExampleActivity.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package com.rxjava2.android.samples.ui.operators;
import android.os.Bundle;
import android.util.Log;
import android.view.View;
import android.widget.Button;
import android.widget.TextView;
import com.rxjava2.android.samples.R;
import com.rxjava2.android.samples.utils.AppConstant;
import androidx.appcompat.app.AppCompatActivity;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.subjects.PublishSubject;
/**
 * Created by amitshekhar on 17/12/16.
 *
 * <p>Sample screen demonstrating {@link PublishSubject}. Pressing the button runs
 * {@code doSomeWork()}, which subscribes two observers to the same subject at
 * different moments and writes every callback they receive to the on-screen
 * {@link TextView} and to logcat.
 */
public class PublishSubjectExampleActivity extends AppCompatActivity {

    private static final String TAG = PublishSubjectExampleActivity.class.getSimpleName();

    Button btn;
    TextView textView;

    /**
     * Inflates {@code R.layout.activity_example}, looks up the button and the output
     * text view, and starts the demonstration each time the button is clicked.
     *
     * @param savedInstanceState forwarded unchanged to the superclass
     */
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_example);
        btn = findViewById(R.id.btn);
        textView = findViewById(R.id.textView);

        btn.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View view) {
                doSomeWork();
            }
        });
    }

    /*
     * PublishSubject emits to an observer only those items that are emitted
     * by the source Observable, subsequent to the time of the subscription.
     */
    private void doSomeWork() {
        PublishSubject<Integer> source = PublishSubject.create();

        // Subscribed before anything is emitted: receives 1, 2, 3, 4 and onComplete.
        source.subscribe(getFirstObserver());

        source.onNext(1);
        source.onNext(2);
        source.onNext(3);

        // Subscribed after 1, 2 and 3 were emitted: receives only 4 and onComplete.
        source.subscribe(getSecondObserver());

        source.onNext(4);
        source.onComplete();
    }

    /**
     * Creates the observer that is subscribed before any item is emitted.
     * Its {@code onSubscribe} callback is written to logcat only.
     *
     * @return an observer reporting under the label "First"
     */
    private Observer<Integer> getFirstObserver() {
        return createObserver("First", false);
    }

    /**
     * Creates the observer that is subscribed after the first three items.
     * Its {@code onSubscribe} callback is shown on screen as well as logged.
     *
     * @return an observer reporting under the label "Second"
     */
    private Observer<Integer> getSecondObserver() {
        return createObserver("Second", true);
    }

    /**
     * Builds an observer that reports every callback to the text view and to logcat,
     * prefixing each message with {@code label}.
     *
     * @param label              name used in every message, e.g. "First"
     * @param showSubscribeEvent whether {@code onSubscribe} is also appended to the
     *                           text view; it is always written to logcat
     * @return the reporting observer
     */
    private Observer<Integer> createObserver(final String label,
                                             final boolean showSubscribeEvent) {
        return new Observer<Integer>() {

            @Override
            public void onSubscribe(Disposable d) {
                if (showSubscribeEvent) {
                    appendLine(" " + label + " onSubscribe : isDisposed :" + d.isDisposed());
                }
                Log.d(TAG, " " + label + " onSubscribe : " + d.isDisposed());
            }

            @Override
            public void onNext(Integer value) {
                appendLine(" " + label + " onNext : value : " + value);
                Log.d(TAG, " " + label + " onNext value : " + value);
            }

            @Override
            public void onError(Throwable e) {
                appendLine(" " + label + " onError : " + e.getMessage());
                // Hand the Throwable itself to the logger so the stack trace is kept;
                // the message alone may be null and says nothing about the origin.
                Log.d(TAG, " " + label + " onError : " + e.getMessage(), e);
            }

            @Override
            public void onComplete() {
                appendLine(" " + label + " onComplete");
                Log.d(TAG, " " + label + " onComplete");
            }
        };
    }

    /** Appends {@code text} followed by {@link AppConstant#LINE_SEPARATOR} to the text view. */
    private void appendLine(String text) {
        textView.append(text);
        textView.append(AppConstant.LINE_SEPARATOR);
    }
}