Skip to content
This repository has been archived by the owner on May 22, 2019. It is now read-only.

Commit

Permalink
Improved, remove duplicate code
Browse files Browse the repository at this point in the history
  • Loading branch information
Zhao Jin committed Jul 15, 2016
1 parent 9b0e8a2 commit 7f221bb
Showing 1 changed file with 25 additions and 29 deletions.
54 changes: 25 additions & 29 deletions jawampa-core/src/main/java/ws/wamp/jawampa/WampClient.java
Expand Up @@ -281,34 +281,7 @@ public void run() {
* @return An observable that can be used to provide a procedure.
*/
public Observable<Request> registerProcedure(final String topic) {
return Observable.create(new OnSubscribe<Request>() {
@Override
public void call(final Subscriber<? super Request> subscriber) {
try {
UriValidator.validate(topic, clientConfig.useStrictUriValidation());
}
catch (WampError e) {
subscriber.onError(e);
return;
}

stateController.scheduler().execute(new Runnable() {
@Override
public void run() {
// If the Subscriber unsubscribed in the meantime we return early
if (subscriber.isUnsubscribed()) return;
// Set subscription to completed if we are not connected
if (!(stateController.currentState() instanceof SessionEstablishedState)) {
subscriber.onCompleted();
return;
}
// Forward publish into the session
SessionEstablishedState curState = (SessionEstablishedState) stateController.currentState();
curState.performRegisterProcedure(topic, EnumSet.noneOf(RegisterFlags.class), subscriber);
}
});
}
});
return this.registerProcedure(topic, EnumSet.noneOf(RegisterFlags.class));
}

/**
Expand All @@ -330,6 +303,28 @@ public void run() {
* @return An observable that can be used to provide a procedure.
*/
public Observable<Request> registerProcedure(final String topic, final RegisterFlags... flags) {
return this.registerProcedure(topic, EnumSet.copyOf(Arrays.asList(flags)));
}

/**
* Registers a procedure at the router which will afterwards be available
* for remote procedure calls from other clients.<br>
* The actual registration will only happen after the user subscribes on
* the returned Observable. This guarantees that no RPC requests get lost.
* Incoming RPC requests will be pushed to the Subscriber via it's
* onNext method. The Subscriber can send responses through the methods on
* the {@link Request}.<br>
* If the client no longer wants to provide the method it can call
* unsubscribe() on the Subscription to unregister the procedure.<br>
* If the connection closes onCompleted will be called.<br>
* In case of errors during subscription onError will be called.
* @param topic The name of the procedure which this client wants to
* provide.<br>
* Must be valid WAMP URI.
* @param flags procedure flags
* @return An observable that can be used to provide a procedure.
*/
public Observable<Request> registerProcedure(final String topic, final EnumSet<RegisterFlags> flags) {
return Observable.create(new OnSubscribe<Request>() {
@Override
public void call(final Subscriber<? super Request> subscriber) {
Expand All @@ -353,12 +348,13 @@ public void run() {
}
// Forward publish into the session
SessionEstablishedState curState = (SessionEstablishedState) stateController.currentState();
curState.performRegisterProcedure(topic, EnumSet.copyOf(Arrays.asList(flags)), subscriber);
curState.performRegisterProcedure(topic, flags, subscriber);
}
});
}
});
}

/**
* Returns an observable that allows to subscribe on the given topic.<br>
* The actual subscription will only be made after subscribe() was called
Expand Down

0 comments on commit 7f221bb

Please sign in to comment.