Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package ua.naiksoftware.stompclientexample;

import io.reactivex.Flowable;
import io.reactivex.Completable;
import retrofit2.http.POST;
import retrofit2.http.Query;

Expand All @@ -10,5 +10,5 @@
public interface ExampleRepository {

@POST("hello-convert-and-send")
Flowable<Void> sendRestEcho(@Query("msg") String message);
Completable sendRestEcho(@Query("msg") String message);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import java.util.List;
import java.util.Locale;

import io.reactivex.FlowableTransformer;
import io.reactivex.CompletableTransformer;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
Expand Down Expand Up @@ -91,7 +91,7 @@ public void connectStomp(View view) {
public void sendEchoViaStomp(View v) {
mStompClient.send("/topic/hello-msg-mapping", "Echo STOMP " + mTimeFormat.format(new Date()))
.compose(applySchedulers())
.subscribe(aVoid -> {
.subscribe(() -> {
Log.d(TAG, "STOMP echo send successfully");
}, throwable -> {
Log.e(TAG, "Error send STOMP echo", throwable);
Expand All @@ -103,7 +103,7 @@ public void sendEchoViaRest(View v) {
mRestPingDisposable = RestClient.getInstance().getExampleRepository()
.sendRestEcho("Echo REST " + mTimeFormat.format(new Date()))
.compose(applySchedulers())
.subscribe(aVoid -> {
.subscribe(() -> {
Log.d(TAG, "REST echo send successfully");
}, throwable -> {
Log.e(TAG, "Error send REST echo", throwable);
Expand All @@ -122,8 +122,8 @@ private void toast(String text) {
Toast.makeText(this, text, Toast.LENGTH_SHORT).show();
}

protected <T> FlowableTransformer<T, T> applySchedulers() {
return tFlowable -> tFlowable
protected CompletableTransformer applySchedulers() {
return upstream -> upstream
.unsubscribeOn(Schedulers.newThread())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package ua.naiksoftware.stomp;

import io.reactivex.Completable;
import io.reactivex.Flowable;

/**
Expand All @@ -17,7 +18,7 @@ public interface ConnectionProvider {
* onError if not connected or error detected will be called, or onCompleted id sending started
* TODO: send messages with ACK
*/
Flowable<Void> send(String stompMessage);
Completable send(String stompMessage);

/**
* Subscribe this for receive #LifecycleEvent events
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,14 @@

import android.util.Log;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import io.reactivex.BackpressureStrategy;
import io.reactivex.Completable;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.subjects.PublishSubject;
import okhttp3.Headers;
import okhttp3.OkHttpClient;
import okhttp3.Request;
Expand All @@ -29,37 +26,50 @@
private final Map<String, String> mConnectHttpHeaders;
private final OkHttpClient mOkHttpClient;

private final List<FlowableEmitter<? super LifecycleEvent>> mLifecycleEmitters;
private final List<FlowableEmitter<? super String>> mMessagesEmitters;
private final PublishSubject<LifecycleEvent> mLifecycleEmitter = PublishSubject.create();
private final PublishSubject<String> mMessagesEmitter = PublishSubject.create();
private final Flowable<String> mMessagesFlowable;

private WebSocket openedSocked;


/* package */ OkHttpConnectionProvider(String uri, Map<String, String> connectHttpHeaders, OkHttpClient okHttpClient) {
mUri = uri;
mConnectHttpHeaders = connectHttpHeaders != null ? connectHttpHeaders : new HashMap<>();
mLifecycleEmitters = Collections.synchronizedList(new ArrayList<>());
mMessagesEmitters = new ArrayList<>();
mOkHttpClient = okHttpClient;

mMessagesFlowable = Flowable.defer(() ->
mMessagesEmitter.toFlowable(BackpressureStrategy.BUFFER)
.doOnSubscribe(s -> createWebSocketConnection())
.doFinally(() -> {
Log.d(TAG, "Close web socket connection now in thread " + Thread.currentThread());
if (openedSocked != null) openedSocked.close(1000, "");
openedSocked = null;
})
).share();
}

@Override
public Completable send(String stompMessage) {
return Completable.create(subscriber -> {
if (openedSocked == null) {
subscriber.onError(new IllegalStateException("Not connected yet"));
} else {
Log.d(TAG, "Send STOMP message: " + stompMessage);
openedSocked.send(stompMessage);
subscriber.onComplete();
}
});
}

@Override
public Flowable<String> messages() {
Flowable<String> flowable = Flowable.<String>create(mMessagesEmitters::add, BackpressureStrategy.BUFFER)
.doFinally(() -> {
Iterator<FlowableEmitter<? super String>> iterator = mMessagesEmitters.iterator();
while (iterator.hasNext()) {
if (iterator.next().isCancelled()) iterator.remove();
}
return mMessagesFlowable;
}

if (mMessagesEmitters.size() < 1) {
Log.d(TAG, "Close web socket connection now in thread " + Thread.currentThread());
openedSocked.close(1000, "");
openedSocked = null;
}
});
createWebSocketConnection();
return flowable;
@Override
public Flowable<LifecycleEvent> getLifecycleReceiver() {
return mLifecycleEmitter.toFlowable(BackpressureStrategy.BUFFER);
}

private void createWebSocketConnection() {
Expand Down Expand Up @@ -105,7 +115,7 @@ public void onClosed(WebSocket webSocket, int code, String reason) {
public void onFailure(WebSocket webSocket, Throwable t, Response response) {
emitLifecycleEvent(new LifecycleEvent(LifecycleEvent.Type.ERROR, new Exception(t)));
}

@Override
public void onClosing(final WebSocket webSocket, final int code, final String reason) {
webSocket.close(code, reason);
Expand All @@ -115,32 +125,6 @@ public void onClosing(final WebSocket webSocket, final int code, final String re
);
}

@Override
public Flowable<Void> send(String stompMessage) {
return Flowable.create(subscriber -> {
if (openedSocked == null) {
subscriber.onError(new IllegalStateException("Not connected yet"));
} else {
Log.d(TAG, "Send STOMP message: " + stompMessage);
openedSocked.send(stompMessage);
subscriber.onComplete();
}
}, BackpressureStrategy.BUFFER);
}

@Override
public Flowable<LifecycleEvent> getLifecycleReceiver() {
return Flowable.<LifecycleEvent>create(mLifecycleEmitters::add, BackpressureStrategy.BUFFER)
.doFinally(() -> {
synchronized (mLifecycleEmitters) {
Iterator<FlowableEmitter<? super LifecycleEvent>> iterator = mLifecycleEmitters.iterator();
while (iterator.hasNext()) {
if (iterator.next().isCancelled()) iterator.remove();
}
}
});
}

private TreeMap<String, String> headersAsMap(Response response) {
TreeMap<String, String> headersAsMap = new TreeMap<>();
Headers headers = response.headers();
Expand All @@ -157,18 +141,12 @@ private void addConnectionHeadersToBuilder(Request.Builder requestBuilder, Map<S
}

private void emitLifecycleEvent(LifecycleEvent lifecycleEvent) {
synchronized (mLifecycleEmitters) {
Log.d(TAG, "Emit lifecycle event: " + lifecycleEvent.getType().name());
for (FlowableEmitter<? super LifecycleEvent> subscriber : mLifecycleEmitters) {
subscriber.onNext(lifecycleEvent);
}
}
Log.d(TAG, "Emit lifecycle event: " + lifecycleEvent.getType().name());
mLifecycleEmitter.onNext(lifecycleEvent);
}

private void emitMessage(String stompMessage) {
Log.d(TAG, "Emit STOMP message: " + stompMessage);
for (FlowableEmitter<? super String> subscriber : mMessagesEmitters) {
subscriber.onNext(stompMessage);
}
mMessagesEmitter.onNext(stompMessage);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,18 @@
import org.java_websocket.handshake.ServerHandshake;

import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;

import io.reactivex.BackpressureStrategy;
import io.reactivex.Completable;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.subjects.PublishSubject;

/**
* Created by naik on 05.05.16.
Expand All @@ -34,15 +33,13 @@
private final String mUri;
private final Map<String, String> mConnectHttpHeaders;

private final List<FlowableEmitter<? super LifecycleEvent>> mLifecycleEmitters;
private final List<FlowableEmitter<? super String>> mMessagesEmitters;

private final PublishSubject<LifecycleEvent> mLifecycleEmitter = PublishSubject.create();
private final PublishSubject<String> mMessagesEmitter = PublishSubject.create();
private final Flowable<String> mMessagesFlowable;
private WebSocketClient mWebSocketClient;
private boolean haveConnection;
private TreeMap<String, String> mServerHandshakeHeaders;

private final Object mLifecycleLock = new Object();

/**
* Support UIR scheme ws://host:port/path
*
Expand All @@ -51,26 +48,20 @@
/* package */ WebSocketsConnectionProvider(String uri, Map<String, String> connectHttpHeaders) {
mUri = uri;
mConnectHttpHeaders = connectHttpHeaders != null ? connectHttpHeaders : new HashMap<>();
mLifecycleEmitters = new ArrayList<>();
mMessagesEmitters = new ArrayList<>();

mMessagesFlowable = Flowable.defer(() ->
mMessagesEmitter.toFlowable(BackpressureStrategy.BUFFER)
.doOnSubscribe(s -> createWebSocketConnection())
.doFinally(() -> {
Log.d(TAG, "Close web socket connection now in thread " + Thread.currentThread());
mWebSocketClient.close();
})
).share();
}

@Override
public Flowable<String> messages() {
Flowable<String> flowable = Flowable.<String>create(mMessagesEmitters::add, BackpressureStrategy.BUFFER)
.doFinally(() -> {
Iterator<FlowableEmitter<? super String>> iterator = mMessagesEmitters.iterator();
while (iterator.hasNext()) {
if (iterator.next().isCancelled()) iterator.remove();
}

if (mMessagesEmitters.size() < 1) {
Log.d(TAG, "Close web socket connection now in thread " + Thread.currentThread());
mWebSocketClient.close();
}
});
createWebSocketConnection();
return flowable;
return mMessagesFlowable;
}

private void createWebSocketConnection() {
Expand Down Expand Up @@ -134,44 +125,30 @@ public void onError(Exception ex) {
}

@Override
public Flowable<Void> send(String stompMessage) {
return Flowable.create(emitter -> {
public Completable send(String stompMessage) {
return Completable.create(emitter -> {
if (mWebSocketClient == null) {
emitter.onError(new IllegalStateException("Not connected yet"));
} else {
Log.d(TAG, "Send STOMP message: " + stompMessage);
mWebSocketClient.send(stompMessage);
emitter.onComplete();
}
}, BackpressureStrategy.BUFFER);
});
}

private void emitLifecycleEvent(LifecycleEvent lifecycleEvent) {
synchronized (mLifecycleLock) {
Log.d(TAG, "Emit lifecycle event: " + lifecycleEvent.getType().name());
for (FlowableEmitter<? super LifecycleEvent> emitter : mLifecycleEmitters) {
emitter.onNext(lifecycleEvent);
}
}
Log.d(TAG, "Emit lifecycle event: " + lifecycleEvent.getType().name());
mLifecycleEmitter.onNext(lifecycleEvent);
}

private void emitMessage(String stompMessage) {
Log.d(TAG, "Emit STOMP message: " + stompMessage);
for (FlowableEmitter<? super String> emitter : mMessagesEmitters) {
emitter.onNext(stompMessage);
}
mMessagesEmitter.onNext(stompMessage);
}

@Override
public Flowable<LifecycleEvent> getLifecycleReceiver() {
return Flowable.<LifecycleEvent>create(mLifecycleEmitters::add, BackpressureStrategy.BUFFER)
.doFinally(() -> {
synchronized (mLifecycleLock) {
Iterator<FlowableEmitter<? super LifecycleEvent>> iterator = mLifecycleEmitters.iterator();
while (iterator.hasNext()) {
if (iterator.next().isCancelled()) iterator.remove();
}
}
});
return mLifecycleEmitter.toFlowable(BackpressureStrategy.BUFFER);
}
}
Loading