diff --git a/example-client/src/main/java/ua/naiksoftware/stompclientexample/ExampleRepository.java b/example-client/src/main/java/ua/naiksoftware/stompclientexample/ExampleRepository.java index b3beb47..5c0a768 100644 --- a/example-client/src/main/java/ua/naiksoftware/stompclientexample/ExampleRepository.java +++ b/example-client/src/main/java/ua/naiksoftware/stompclientexample/ExampleRepository.java @@ -1,6 +1,6 @@ package ua.naiksoftware.stompclientexample; -import io.reactivex.Flowable; +import io.reactivex.Completable; import retrofit2.http.POST; import retrofit2.http.Query; @@ -10,5 +10,5 @@ public interface ExampleRepository { @POST("hello-convert-and-send") - Flowable sendRestEcho(@Query("msg") String message); + Completable sendRestEcho(@Query("msg") String message); } diff --git a/example-client/src/main/java/ua/naiksoftware/stompclientexample/MainActivity.java b/example-client/src/main/java/ua/naiksoftware/stompclientexample/MainActivity.java index 55ae725..dcc9044 100644 --- a/example-client/src/main/java/ua/naiksoftware/stompclientexample/MainActivity.java +++ b/example-client/src/main/java/ua/naiksoftware/stompclientexample/MainActivity.java @@ -19,7 +19,7 @@ import java.util.List; import java.util.Locale; -import io.reactivex.FlowableTransformer; +import io.reactivex.CompletableTransformer; import io.reactivex.android.schedulers.AndroidSchedulers; import io.reactivex.disposables.Disposable; import io.reactivex.schedulers.Schedulers; @@ -91,7 +91,7 @@ public void connectStomp(View view) { public void sendEchoViaStomp(View v) { mStompClient.send("/topic/hello-msg-mapping", "Echo STOMP " + mTimeFormat.format(new Date())) .compose(applySchedulers()) - .subscribe(aVoid -> { + .subscribe(() -> { Log.d(TAG, "STOMP echo send successfully"); }, throwable -> { Log.e(TAG, "Error send STOMP echo", throwable); @@ -103,7 +103,7 @@ public void sendEchoViaRest(View v) { mRestPingDisposable = RestClient.getInstance().getExampleRepository() .sendRestEcho("Echo REST " + mTimeFormat.format(new Date())) .compose(applySchedulers()) - .subscribe(aVoid -> { + .subscribe(() -> { Log.d(TAG, "REST echo send successfully"); }, throwable -> { Log.e(TAG, "Error send REST echo", throwable); @@ -122,8 +122,8 @@ private void toast(String text) { Toast.makeText(this, text, Toast.LENGTH_SHORT).show(); } - protected FlowableTransformer applySchedulers() { - return tFlowable -> tFlowable + protected CompletableTransformer applySchedulers() { + return upstream -> upstream .unsubscribeOn(Schedulers.newThread()) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()); diff --git a/lib/src/main/java/ua/naiksoftware/stomp/ConnectionProvider.java b/lib/src/main/java/ua/naiksoftware/stomp/ConnectionProvider.java index 068963e..37a0722 100644 --- a/lib/src/main/java/ua/naiksoftware/stomp/ConnectionProvider.java +++ b/lib/src/main/java/ua/naiksoftware/stomp/ConnectionProvider.java @@ -1,5 +1,6 @@ package ua.naiksoftware.stomp; +import io.reactivex.Completable; import io.reactivex.Flowable; /** @@ -17,7 +18,7 @@ public interface ConnectionProvider { * onError if not connected or error detected will be called, or onCompleted id sending started * TODO: send messages with ACK */ - Flowable send(String stompMessage); + Completable send(String stompMessage); /** * Subscribe this for receive #LifecycleEvent events diff --git a/lib/src/main/java/ua/naiksoftware/stomp/OkHttpConnectionProvider.java b/lib/src/main/java/ua/naiksoftware/stomp/OkHttpConnectionProvider.java index 9190095..380036b 100644 --- a/lib/src/main/java/ua/naiksoftware/stomp/OkHttpConnectionProvider.java +++ b/lib/src/main/java/ua/naiksoftware/stomp/OkHttpConnectionProvider.java @@ -2,17 +2,14 @@ import android.util.Log; -import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; -import java.util.Iterator; -import java.util.List; import java.util.Map; import java.util.TreeMap; import io.reactivex.BackpressureStrategy; +import io.reactivex.Completable; import io.reactivex.Flowable; -import io.reactivex.FlowableEmitter; +import io.reactivex.subjects.PublishSubject; import okhttp3.Headers; import okhttp3.OkHttpClient; import okhttp3.Request; @@ -29,8 +26,9 @@ private final Map mConnectHttpHeaders; private final OkHttpClient mOkHttpClient; - private final List> mLifecycleEmitters; - private final List> mMessagesEmitters; + private final PublishSubject mLifecycleEmitter = PublishSubject.create(); + private final PublishSubject mMessagesEmitter = PublishSubject.create(); + private final Flowable mMessagesFlowable; private WebSocket openedSocked; @@ -38,28 +36,40 @@ /* package */ OkHttpConnectionProvider(String uri, Map connectHttpHeaders, OkHttpClient okHttpClient) { mUri = uri; mConnectHttpHeaders = connectHttpHeaders != null ? connectHttpHeaders : new HashMap<>(); - mLifecycleEmitters = Collections.synchronizedList(new ArrayList<>()); - mMessagesEmitters = new ArrayList<>(); mOkHttpClient = okHttpClient; + + mMessagesFlowable = Flowable.defer(() -> + mMessagesEmitter.toFlowable(BackpressureStrategy.BUFFER) + .doOnSubscribe(s -> createWebSocketConnection()) + .doFinally(() -> { + Log.d(TAG, "Close web socket connection now in thread " + Thread.currentThread()); + if (openedSocked != null) openedSocked.close(1000, ""); + openedSocked = null; + }) + ).share(); + } + + @Override + public Completable send(String stompMessage) { + return Completable.create(subscriber -> { + if (openedSocked == null) { + subscriber.onError(new IllegalStateException("Not connected yet")); + } else { + Log.d(TAG, "Send STOMP message: " + stompMessage); + openedSocked.send(stompMessage); + subscriber.onComplete(); + } + }); } @Override public Flowable messages() { - Flowable flowable = Flowable.create(mMessagesEmitters::add, BackpressureStrategy.BUFFER) - .doFinally(() -> { - Iterator> iterator = mMessagesEmitters.iterator(); - while (iterator.hasNext()) { - if (iterator.next().isCancelled()) iterator.remove(); - } + return mMessagesFlowable; + } - if (mMessagesEmitters.size() < 1) { - Log.d(TAG, "Close web socket connection now in thread " + Thread.currentThread()); - openedSocked.close(1000, ""); - openedSocked = null; - } - }); - createWebSocketConnection(); - return flowable; + @Override + public Flowable getLifecycleReceiver() { + return mLifecycleEmitter.toFlowable(BackpressureStrategy.BUFFER); } private void createWebSocketConnection() { @@ -105,7 +115,7 @@ public void onClosed(WebSocket webSocket, int code, String reason) { public void onFailure(WebSocket webSocket, Throwable t, Response response) { emitLifecycleEvent(new LifecycleEvent(LifecycleEvent.Type.ERROR, new Exception(t))); } - + @Override public void onClosing(final WebSocket webSocket, final int code, final String reason) { webSocket.close(code, reason); @@ -115,32 +125,6 @@ public void onClosing(final WebSocket webSocket, final int code, final String re ); } - @Override - public Flowable send(String stompMessage) { - return Flowable.create(subscriber -> { - if (openedSocked == null) { - subscriber.onError(new IllegalStateException("Not connected yet")); - } else { - Log.d(TAG, "Send STOMP message: " + stompMessage); - openedSocked.send(stompMessage); - subscriber.onComplete(); - } - }, BackpressureStrategy.BUFFER); - } - - @Override - public Flowable getLifecycleReceiver() { - return Flowable.create(mLifecycleEmitters::add, BackpressureStrategy.BUFFER) - .doFinally(() -> { - synchronized (mLifecycleEmitters) { - Iterator> iterator = mLifecycleEmitters.iterator(); - while (iterator.hasNext()) { - if (iterator.next().isCancelled()) iterator.remove(); - } - } - }); - } - private TreeMap headersAsMap(Response response) { TreeMap headersAsMap = new TreeMap<>(); Headers headers = response.headers(); @@ -157,18 +141,12 @@ private void addConnectionHeadersToBuilder(Request.Builder requestBuilder, Map subscriber : mLifecycleEmitters) { - subscriber.onNext(lifecycleEvent); - } - } + Log.d(TAG, "Emit lifecycle event: " + lifecycleEvent.getType().name()); + mLifecycleEmitter.onNext(lifecycleEvent); } private void emitMessage(String stompMessage) { Log.d(TAG, "Emit STOMP message: " + stompMessage); - for (FlowableEmitter subscriber : mMessagesEmitters) { - subscriber.onNext(stompMessage); - } + mMessagesEmitter.onNext(stompMessage); } } diff --git a/lib/src/main/java/ua/naiksoftware/stomp/WebSocketsConnectionProvider.java b/lib/src/main/java/ua/naiksoftware/stomp/WebSocketsConnectionProvider.java index 09316d8..f8f1bb6 100644 --- a/lib/src/main/java/ua/naiksoftware/stomp/WebSocketsConnectionProvider.java +++ b/lib/src/main/java/ua/naiksoftware/stomp/WebSocketsConnectionProvider.java @@ -10,10 +10,8 @@ import org.java_websocket.handshake.ServerHandshake; import java.net.URI; -import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; -import java.util.List; import java.util.Map; import java.util.TreeMap; @@ -21,8 +19,9 @@ import javax.net.ssl.SSLSocketFactory; import io.reactivex.BackpressureStrategy; +import io.reactivex.Completable; import io.reactivex.Flowable; -import io.reactivex.FlowableEmitter; +import io.reactivex.subjects.PublishSubject; /** * Created by naik on 05.05.16. @@ -34,15 +33,13 @@ private final String mUri; private final Map mConnectHttpHeaders; - private final List> mLifecycleEmitters; - private final List> mMessagesEmitters; - + private final PublishSubject mLifecycleEmitter = PublishSubject.create(); + private final PublishSubject mMessagesEmitter = PublishSubject.create(); + private final Flowable mMessagesFlowable; private WebSocketClient mWebSocketClient; private boolean haveConnection; private TreeMap mServerHandshakeHeaders; - private final Object mLifecycleLock = new Object(); - /** * Support UIR scheme ws://host:port/path * @@ -51,26 +48,20 @@ /* package */ WebSocketsConnectionProvider(String uri, Map connectHttpHeaders) { mUri = uri; mConnectHttpHeaders = connectHttpHeaders != null ? connectHttpHeaders : new HashMap<>(); - mLifecycleEmitters = new ArrayList<>(); - mMessagesEmitters = new ArrayList<>(); + + mMessagesFlowable = Flowable.defer(() -> + mMessagesEmitter.toFlowable(BackpressureStrategy.BUFFER) + .doOnSubscribe(s -> createWebSocketConnection()) + .doFinally(() -> { + Log.d(TAG, "Close web socket connection now in thread " + Thread.currentThread()); + mWebSocketClient.close(); + }) + ).share(); } @Override public Flowable messages() { - Flowable flowable = Flowable.create(mMessagesEmitters::add, BackpressureStrategy.BUFFER) - .doFinally(() -> { - Iterator> iterator = mMessagesEmitters.iterator(); - while (iterator.hasNext()) { - if (iterator.next().isCancelled()) iterator.remove(); - } - - if (mMessagesEmitters.size() < 1) { - Log.d(TAG, "Close web socket connection now in thread " + Thread.currentThread()); - mWebSocketClient.close(); - } - }); - createWebSocketConnection(); - return flowable; + return mMessagesFlowable; } private void createWebSocketConnection() { @@ -134,8 +125,8 @@ public void onError(Exception ex) { } @Override - public Flowable send(String stompMessage) { - return Flowable.create(emitter -> { + public Completable send(String stompMessage) { + return Completable.create(emitter -> { if (mWebSocketClient == null) { emitter.onError(new IllegalStateException("Not connected yet")); } else { @@ -143,35 +134,21 @@ public Flowable send(String stompMessage) { mWebSocketClient.send(stompMessage); emitter.onComplete(); } - }, BackpressureStrategy.BUFFER); + }); } private void emitLifecycleEvent(LifecycleEvent lifecycleEvent) { - synchronized (mLifecycleLock) { - Log.d(TAG, "Emit lifecycle event: " + lifecycleEvent.getType().name()); - for (FlowableEmitter emitter : mLifecycleEmitters) { - emitter.onNext(lifecycleEvent); - } - } + Log.d(TAG, "Emit lifecycle event: " + lifecycleEvent.getType().name()); + mLifecycleEmitter.onNext(lifecycleEvent); } private void emitMessage(String stompMessage) { Log.d(TAG, "Emit STOMP message: " + stompMessage); - for (FlowableEmitter emitter : mMessagesEmitters) { - emitter.onNext(stompMessage); - } + mMessagesEmitter.onNext(stompMessage); } @Override public Flowable getLifecycleReceiver() { - return Flowable.create(mLifecycleEmitters::add, BackpressureStrategy.BUFFER) - .doFinally(() -> { - synchronized (mLifecycleLock) { - Iterator> iterator = mLifecycleEmitters.iterator(); - while (iterator.hasNext()) { - if (iterator.next().isCancelled()) iterator.remove(); - } - } - }); + return mLifecycleEmitter.toFlowable(BackpressureStrategy.BUFFER); } } diff --git a/lib/src/main/java/ua/naiksoftware/stomp/client/StompClient.java b/lib/src/main/java/ua/naiksoftware/stomp/client/StompClient.java index 3c42156..f3d51d3 100644 --- a/lib/src/main/java/ua/naiksoftware/stomp/client/StompClient.java +++ b/lib/src/main/java/ua/naiksoftware/stomp/client/StompClient.java @@ -5,20 +5,18 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; import io.reactivex.BackpressureStrategy; +import io.reactivex.Completable; import io.reactivex.Flowable; -import io.reactivex.FlowableEmitter; +import io.reactivex.disposables.CompositeDisposable; import io.reactivex.disposables.Disposable; -import io.reactivex.flowables.ConnectableFlowable; +import io.reactivex.disposables.Disposables; +import io.reactivex.subjects.BehaviorSubject; +import io.reactivex.subjects.PublishSubject; import ua.naiksoftware.stomp.ConnectionProvider; import ua.naiksoftware.stomp.LifecycleEvent; import ua.naiksoftware.stomp.StompHeader; @@ -28,23 +26,27 @@ */ public class StompClient { - private static final String TAG = StompClient.class.getSimpleName(); - public static final String SUPPORTED_VERSIONS = "1.1,1.0"; public static final String DEFAULT_ACK = "auto"; - - private Disposable mMessagesDisposable; - private Disposable mLifecycleDisposable; - private final Map>> mEmitters = Collections.synchronizedMap(new HashMap<>()); - private List> mWaitConnectionFlowables; + private static final String TAG = StompClient.class.getSimpleName(); + private final CompositeDisposable mCompositeDisposable = new CompositeDisposable(); + private final Map mEmitters = Collections.synchronizedMap(new HashMap<>()); + private final BehaviorSubject mConnectionSignal = BehaviorSubject.create(); private final ConnectionProvider mConnectionProvider; - private HashMap mTopics; + private final Map mTopics = Collections.synchronizedMap(new HashMap<>()); private boolean mConnected; private boolean isConnecting; public StompClient(ConnectionProvider connectionProvider) { mConnectionProvider = connectionProvider; - mWaitConnectionFlowables = new CopyOnWriteArrayList<>(); + } + + public boolean isConnected() { + return mConnected; + } + + public boolean isConnecting() { + return isConnecting; } /** @@ -61,175 +63,166 @@ public void connect(boolean reconnect) { /** * Connect without reconnect if connected * - * @param _headers might be null + * @param headers might be null */ - public void connect(List _headers) { - connect(_headers, false); + public void connect(List headers) { + connect(headers, false); } /** * If already connected and reconnect=false - nope * - * @param _headers might be null + * @param headers might be null */ - public void connect(List _headers, boolean reconnect) { + public void connect(List headers, boolean reconnect) { if (reconnect) disconnect(); if (mConnected) return; - mLifecycleDisposable = mConnectionProvider.getLifecycleReceiver() - .subscribe(lifecycleEvent -> { - switch (lifecycleEvent.getType()) { - case OPENED: - List headers = new ArrayList<>(); - headers.add(new StompHeader(StompHeader.VERSION, SUPPORTED_VERSIONS)); - if (_headers != null) headers.addAll(_headers); - mConnectionProvider.send(new StompMessage(StompCommand.CONNECT, headers, null).compile()) - .subscribe(); - break; - - case CLOSED: - mConnected = false; - isConnecting = false; - break; - - case ERROR: - mConnected = false; - isConnecting = false; - break; - } - }); + manage( + mConnectionProvider.getLifecycleReceiver() + .subscribe(lifecycleEvent -> { + switch (lifecycleEvent.getType()) { + case OPENED: + List headerList = new ArrayList<>(); + headerList.add(new StompHeader(StompHeader.VERSION, SUPPORTED_VERSIONS)); + if (headers != null) headerList.addAll(headers); + mConnectionProvider.send( + new StompMessage(StompCommand.CONNECT, headerList, null).compile() + ).subscribe(); + break; + + case CLOSED: + setConnected(false); + isConnecting = false; + break; + + case ERROR: + setConnected(false); + isConnecting = false; + break; + } + }) + ); isConnecting = true; - mMessagesDisposable = mConnectionProvider.messages() - .map(StompMessage::from) - .subscribe(stompMessage -> { - if (stompMessage.getStompCommand().equals(StompCommand.CONNECTED)) { - mConnected = true; - isConnecting = false; - for (ConnectableFlowable flowable : mWaitConnectionFlowables) { - flowable.connect(); - } - mWaitConnectionFlowables.clear(); - } - callSubscribers(stompMessage); - }); + manage( + mConnectionProvider.messages() + .map(StompMessage::from) + .subscribe(stompMessage -> { + if (StompCommand.CONNECTED.equals(stompMessage.getStompCommand())) { + setConnected(true); + isConnecting = false; + } + callSubscribers(stompMessage); + }) + ); } - public Flowable send(String destination) { + public Completable send(String destination) { return send(new StompMessage( StompCommand.SEND, Collections.singletonList(new StompHeader(StompHeader.DESTINATION, destination)), null)); } - public Flowable send(String destination, String data) { + public Completable send(String destination, String data) { return send(new StompMessage( StompCommand.SEND, Collections.singletonList(new StompHeader(StompHeader.DESTINATION, destination)), data)); } - public Flowable send(StompMessage stompMessage) { - Flowable flowable = mConnectionProvider.send(stompMessage.compile()); - if (!mConnected) { - ConnectableFlowable deferred = flowable.publish(); - mWaitConnectionFlowables.add(deferred); - return deferred; - } else { - return flowable; - } - } - - private void callSubscribers(StompMessage stompMessage) { - String messageDestination = stompMessage.findHeader(StompHeader.DESTINATION); - synchronized (mEmitters) { - for (String dest : mEmitters.keySet()) { - if (dest.equals(messageDestination)) { - for (FlowableEmitter subscriber : mEmitters.get(dest)) { - subscriber.onNext(stompMessage); - } - return; - } - } - } + public Completable send(StompMessage stompMessage) { + return mConnectionSignal.filter(connected -> connected).firstOrError().toCompletable() + .concatWith(mConnectionProvider.send(stompMessage.compile())) + .doOnSubscribe(this::manage); } public Flowable lifecycle() { - return mConnectionProvider.getLifecycleReceiver(); + return mConnectionProvider.getLifecycleReceiver() + .doOnSubscribe(subscription -> manage(Disposables.fromSubscription(subscription))); } public void disconnect() { - if (mMessagesDisposable != null) mMessagesDisposable.dispose(); - if (mLifecycleDisposable != null) mLifecycleDisposable.dispose(); - mConnected = false; + mCompositeDisposable.clear(); + setConnected(false); } public Flowable topic(String destinationPath) { return topic(destinationPath, null); } - public Flowable topic(String destinationPath, List headerList) { - return Flowable.create(emitter -> { - synchronized (mEmitters) { - Set> emittersSet = mEmitters.get(destinationPath); - if (emittersSet == null) { - emittersSet = new HashSet<>(); - mEmitters.put(destinationPath, emittersSet); - subscribePath(destinationPath, headerList).subscribe(); - } - emittersSet.add(emitter); - } - }, BackpressureStrategy.BUFFER) - .doFinally(() -> { - synchronized (mEmitters) { - Iterator mapIterator = mEmitters.keySet().iterator(); - while (mapIterator.hasNext()) { - String destinationUrl = mapIterator.next(); - Set> set = mEmitters.get(destinationUrl); - Iterator> setIterator = set.iterator(); - while (setIterator.hasNext()) { - FlowableEmitter subscriber = setIterator.next(); - if (subscriber.isCancelled()) { - setIterator.remove(); - if (set.size() < 1) { - mapIterator.remove(); - unsubscribePath(destinationUrl).subscribe(); - } - } - } - } - } - }); - } - - private Flowable subscribePath(String destinationPath, List headerList) { - if (destinationPath == null) return Flowable.empty(); - String topicId = UUID.randomUUID().toString(); - - if (mTopics == null) mTopics = new HashMap<>(); - mTopics.put(destinationPath, topicId); - List headers = new ArrayList<>(); - headers.add(new StompHeader(StompHeader.ID, topicId)); - headers.add(new StompHeader(StompHeader.DESTINATION, destinationPath)); - headers.add(new StompHeader(StompHeader.ACK, DEFAULT_ACK)); - if (headerList != null) headers.addAll(headerList); - return send(new StompMessage(StompCommand.SUBSCRIBE, - headers, null)); - } - - - private Flowable unsubscribePath(String dest) { + private void callSubscribers(StompMessage stompMessage) { + String messageDestination = stompMessage.findHeader(StompHeader.DESTINATION); + SharedEmitter sharedEmitter = mEmitters.get(messageDestination); + if (sharedEmitter != null) sharedEmitter.emitter.onNext(stompMessage); + } + + public Flowable topic(final String destinationPath, final List headerList) { + return Flowable.defer(() -> { + synchronized (mEmitters) { + SharedEmitter sharedEmitter = mEmitters.get(destinationPath); + if (sharedEmitter == null) { + PublishSubject emitter = PublishSubject.create(); + Flowable sharedFlowable = emitter.toFlowable(BackpressureStrategy.BUFFER) + .doOnSubscribe(subscription -> { + manage(Disposables.fromSubscription(subscription)); + subscribePath(destinationPath, headerList).subscribe(); + }) + .doFinally(() -> { + synchronized (mEmitters) { + mEmitters.remove(destinationPath); + unsubscribePath(destinationPath).subscribe(); + } + }) + .share(); + sharedEmitter = new SharedEmitter(emitter, sharedFlowable); + mEmitters.put(destinationPath, sharedEmitter); + } + return sharedEmitter.sharedFlowable; + } + } + ); + + } + + private Completable subscribePath(String destinationPath, List headerList) { + if (destinationPath == null) return Completable.complete(); + String topicId = UUID.randomUUID().toString(); + mTopics.put(destinationPath, topicId); + List headers = new ArrayList<>(); + headers.add(new StompHeader(StompHeader.ID, topicId)); + headers.add(new StompHeader(StompHeader.DESTINATION, destinationPath)); + headers.add(new StompHeader(StompHeader.ACK, DEFAULT_ACK)); + if (headerList != null) headers.addAll(headerList); + return send(new StompMessage(StompCommand.SUBSCRIBE, headers, null)); + } + + private Completable unsubscribePath(String dest) { String topicId = mTopics.get(dest); Log.d(TAG, "Unsubscribe path: " + dest + " id: " + topicId); - return send(new StompMessage(StompCommand.UNSUBSCRIBE, - Collections.singletonList(new StompHeader(StompHeader.ID, topicId)), null)); + return send(new StompMessage( + StompCommand.UNSUBSCRIBE, + Collections.singletonList(new StompHeader(StompHeader.ID, topicId)), + null)); } - public boolean isConnected() { - return mConnected; + private void setConnected(boolean connected) { + mConnected = connected; + mConnectionSignal.onNext(mConnected); } - public boolean isConnecting() { - return isConnecting; + private void manage(Disposable disposable) { + mCompositeDisposable.add(disposable); + } + + private static class SharedEmitter { + public final PublishSubject emitter; + public final Flowable sharedFlowable; + + private SharedEmitter(PublishSubject emitter, Flowable sharedFlowable) { + this.emitter = emitter; + this.sharedFlowable = sharedFlowable; + } } }