-
Notifications
You must be signed in to change notification settings - Fork 180
/
ReactiveStreamsAdapters.java
185 lines (154 loc) · 6.32 KB
/
ReactiveStreamsAdapters.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
/*
* Copyright © 2019 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.concurrent.reactivestreams;
import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.api.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import static io.servicetalk.concurrent.api.SourceAdapters.toSource;
import static java.util.Objects.requireNonNull;
/**
* A set of adapter methods for converting to and from
* <a href="https://github.com/reactive-streams/reactive-streams-jvm">Reactive Streams</a> APIs and ServiceTalk APIs.
*/
public final class ReactiveStreamsAdapters {
private ReactiveStreamsAdapters() {
// No instances: this class only exposes static adapter methods.
}
/**
* Converts the passed
* <a href="https://github.com/reactive-streams/reactive-streams-jvm">Reactive Streams</a>
* {@link org.reactivestreams.Publisher} to a ServiceTalk {@link Publisher}.
* <p>
* The returned {@link Publisher} subscribes to {@code source} for each subscriber it handles; item, error and
* completion signals, as well as {@code request}/{@code cancel} calls, are forwarded unchanged between the two APIs.
*
* @param source {@link org.reactivestreams.Publisher} to convert to a {@link Publisher}.
* @param <T> Type of items emitted from the {@code source} and the returned {@link Publisher}.
* @return A {@link Publisher} representation of the passed {@link org.reactivestreams.Publisher}.
* @throws NullPointerException if {@code source} is {@code null}.
*/
public static <T> Publisher<T> fromReactiveStreamsPublisher(org.reactivestreams.Publisher<T> source) {
return new RsToStPublisher<>(source);
}
/**
* Converts the passed {@link Publisher} to a
* <a href="https://github.com/reactive-streams/reactive-streams-jvm">Reactive Streams</a>
* {@link org.reactivestreams.Publisher}.
* <p>
* The {@code publisher} is first adapted to a {@link PublisherSource} via
* {@link io.servicetalk.concurrent.api.SourceAdapters#toSource(Publisher)} and that source is then wrapped.
*
* @param publisher {@link Publisher} to convert to a {@link org.reactivestreams.Publisher}.
* @param <T> Type of items emitted from the {@code publisher} and the returned
* {@link org.reactivestreams.Publisher}.
* @return A {@link org.reactivestreams.Publisher} representation of the passed {@link Publisher}.
*/
public static <T> org.reactivestreams.Publisher<T> toReactiveStreamsPublisher(Publisher<T> publisher) {
return new StToRsPublisher<>(toSource(publisher));
}
/**
* Converts the passed {@link PublisherSource} to a
* <a href="https://github.com/reactive-streams/reactive-streams-jvm">Reactive Streams</a>
* {@link org.reactivestreams.Publisher}.
* <p>
* Every {@code subscribe} call on the returned {@link org.reactivestreams.Publisher} subscribes to {@code source}
* with a wrapper around the supplied {@link Subscriber}.
*
* @param source {@link PublisherSource} to convert to a {@link org.reactivestreams.Publisher}.
* @param <T> Type of items emitted from the {@code source} and the returned {@link org.reactivestreams.Publisher}.
* @return A {@link org.reactivestreams.Publisher} representation of the passed {@link PublisherSource}.
* @throws NullPointerException if {@code source} is {@code null}.
*/
public static <T> org.reactivestreams.Publisher<T> toReactiveStreamsPublisher(PublisherSource<T> source) {
return new StToRsPublisher<>(source);
}
/**
 * Exposes a ServiceTalk {@link PublisherSource} through the Reactive Streams
 * {@link org.reactivestreams.Publisher} interface.
 *
 * @param <T> Type of items emitted by the wrapped source.
 */
private static final class StToRsPublisher<T> implements org.reactivestreams.Publisher<T> {
    private final PublisherSource<T> delegate;

    /**
     * @param source the ServiceTalk source to expose; must not be {@code null}.
     */
    StToRsPublisher(final PublisherSource<T> source) {
        delegate = requireNonNull(source);
    }

    /**
     * Subscribes to the wrapped source on behalf of a Reactive Streams subscriber.
     *
     * @param subscriber the Reactive Streams subscriber to signal.
     */
    @Override
    public void subscribe(final Subscriber<? super T> subscriber) {
        // The wrapper translates ServiceTalk signals into Reactive Streams signals.
        final RsToStSubscriber<T> adapted = new RsToStSubscriber<>(subscriber);
        delegate.subscribe(adapted);
    }
}
/**
 * Presents a Reactive Streams {@link Subscriber} as a {@link PublisherSource.Subscriber} so that it can be
 * signalled by a ServiceTalk source.
 *
 * @param <T> Type of items delivered to the wrapped subscriber.
 */
private static final class RsToStSubscriber<T> implements PublisherSource.Subscriber<T> {
    private final Subscriber<? super T> target;

    /**
     * @param subscriber the Reactive Streams subscriber to forward signals to; must not be {@code null}.
     */
    RsToStSubscriber(final Subscriber<? super T> subscriber) {
        target = requireNonNull(subscriber);
    }

    @Override
    public void onSubscribe(final PublisherSource.Subscription subscription) {
        // Hand the target a Reactive Streams view of the ServiceTalk subscription.
        target.onSubscribe(new StToRsSubscription(subscription));
    }

    @Override
    public void onNext(final T item) {
        target.onNext(item);
    }

    @Override
    public void onError(final Throwable cause) {
        target.onError(cause);
    }

    @Override
    public void onComplete() {
        target.onComplete();
    }

    /**
     * Reactive Streams {@link Subscription} that forwards {@code request} and {@code cancel} to a
     * {@link PublisherSource.Subscription}.
     */
    private static final class StToRsSubscription implements Subscription {
        private final PublisherSource.Subscription upstream;

        StToRsSubscription(final PublisherSource.Subscription upstream) {
            this.upstream = upstream;
        }

        @Override
        public void request(final long n) {
            upstream.request(n);
        }

        @Override
        public void cancel() {
            upstream.cancel();
        }
    }
}
/**
 * ServiceTalk {@link Publisher} backed by a Reactive Streams {@link org.reactivestreams.Publisher}.
 *
 * @param <T> Type of items emitted by the wrapped publisher.
 */
private static final class RsToStPublisher<T> extends Publisher<T> {
    private final org.reactivestreams.Publisher<T> delegate;

    /**
     * @param source the Reactive Streams publisher to wrap; must not be {@code null}.
     */
    RsToStPublisher(final org.reactivestreams.Publisher<T> source) {
        delegate = requireNonNull(source);
    }

    /**
     * Subscribes to the wrapped Reactive Streams publisher on behalf of a ServiceTalk subscriber.
     *
     * @param subscriber the ServiceTalk subscriber to signal.
     */
    @Override
    protected void handleSubscribe(final PublisherSource.Subscriber<? super T> subscriber) {
        // The wrapper translates Reactive Streams signals into ServiceTalk signals.
        final StToRsSubscriber<T> adapted = new StToRsSubscriber<>(subscriber);
        delegate.subscribe(adapted);
    }
}
/**
 * {@link PublisherSource.Subscription} that forwards {@code request} and {@code cancel} to a Reactive Streams
 * {@link Subscription}.
 */
private static final class RsToStSubscription implements PublisherSource.Subscription {
    private final Subscription s;

    /**
     * @param s the Reactive Streams subscription to delegate to.
     * @throws NullPointerException if {@code s} is {@code null}.
     */
    RsToStSubscription(final Subscription s) {
        // Fail fast: a null subscription would otherwise only surface on the first request/cancel call.
        this.s = requireNonNull(s);
    }

    @Override
    public void request(final long n) {
        s.request(n);
    }

    @Override
    public void cancel() {
        s.cancel();
    }
}
/**
 * Presents a {@link PublisherSource.Subscriber} as a Reactive Streams {@link Subscriber} so that it can be
 * signalled by a Reactive Streams publisher.
 *
 * @param <T> Type of items received from the Reactive Streams publisher.
 */
private static final class StToRsSubscriber<T> implements Subscriber<T> {
    private final PublisherSource.Subscriber<? super T> subscriber;

    /**
     * @param subscriber the ServiceTalk subscriber to forward signals to.
     * @throws NullPointerException if {@code subscriber} is {@code null}.
     */
    StToRsSubscriber(final PublisherSource.Subscriber<? super T> subscriber) {
        // Validate eagerly, as RsToStSubscriber does, rather than failing on the first signal.
        this.subscriber = requireNonNull(subscriber);
    }

    @Override
    public void onSubscribe(final Subscription s) {
        // Hand the delegate a ServiceTalk view of the Reactive Streams subscription.
        subscriber.onSubscribe(new RsToStSubscription(s));
    }

    @Override
    public void onNext(final T t) {
        subscriber.onNext(t);
    }

    @Override
    public void onError(final Throwable t) {
        subscriber.onError(t);
    }

    @Override
    public void onComplete() {
        subscriber.onComplete();
    }
}
}