Skip to content

Commit

Permalink
Minor websocket fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
mushketyk committed May 8, 2018
1 parent 11dffb4 commit e574576
Show file tree
Hide file tree
Showing 14 changed files with 180 additions and 86 deletions.
2 changes: 1 addition & 1 deletion core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ dependencies {
"org.java-websocket:Java-WebSocket:$javaWebSocketVersion"
testCompile project(path: ':crypto', configuration: 'testArtifacts'),
"nl.jqno.equalsverifier:equalsverifier:$equalsverifierVersion",
group: 'ch.qos.logback', name: 'logback-classic', version: '1.2.3'
"ch.qos.logback:logback-classic:$logbackVersion"
}

task createProperties(dependsOn: processResources) doLast {
Expand Down
28 changes: 0 additions & 28 deletions core/src/main/java/org/web3j/protocol/core/JsonRpc2_0Web3j.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,6 @@
import org.web3j.protocol.rx.JsonRpc2_0Rx;
import org.web3j.protocol.websocket.events.LogNotification;
import org.web3j.protocol.websocket.events.NewHeadsNotification;
import org.web3j.protocol.websocket.events.PendingTransactionNotification;
import org.web3j.protocol.websocket.events.SyncingNotfication;
import org.web3j.utils.Async;
import org.web3j.utils.Numeric;

Expand Down Expand Up @@ -736,32 +734,6 @@ private Map<String, Object> createLogsParams(List<String> addresses, List<String
return params;
}

@Override
public Observable<PendingTransactionNotification> newPendingTransactionsNotifications() {
return web3jService.subscribe(
new Request<>(
"eth_subscribe",
Arrays.asList("newPendingTransactions"),
web3jService,
EthSubscribe.class),
"eth_unsubscribe",
PendingTransactionNotification.class
);
}

@Override
public Observable<SyncingNotfication> syncingStatusNotifications() {
return web3jService.subscribe(
new Request<>(
"eth_subscribe",
Arrays.asList("syncing"),
web3jService,
EthSubscribe.class),
"eth_unsubscribe",
SyncingNotfication.class
);
}

@Override
public Observable<String> ethBlockHashObservable() {
return web3jRx.ethBlockHashObservable(blockTime);
Expand Down
23 changes: 15 additions & 8 deletions core/src/main/java/org/web3j/protocol/ipc/IpcService.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,7 @@ protected IOFacade getIO() {

@Override
protected InputStream performIO(String payload) throws IOException {
IOFacade io;
if (ioFacade != null) {
io = ioFacade;
} else {
io = getIO();
}
IOFacade io = getIoFacade();
io.write(payload);
log.debug(">> " + payload);

Expand All @@ -64,10 +59,22 @@ protected InputStream performIO(String payload) throws IOException {
return new ByteArrayInputStream(result.getBytes("UTF-8"));
}

private IOFacade getIoFacade() {
IOFacade io;
if (ioFacade != null) {
io = ioFacade;
} else {
io = getIO();
}
return io;
}

@Override
public void close() throws IOException {
if (ioFacade != null) {
ioFacade.close();
IOFacade io = getIoFacade();

if (io != null) {
io.close();
}
}
}
15 changes: 0 additions & 15 deletions core/src/main/java/org/web3j/protocol/rx/Web3jRx.java
Original file line number Diff line number Diff line change
Expand Up @@ -192,19 +192,4 @@ Observable<Transaction> catchUpToLatestAndSubscribeToNewTransactionsObservable(
* @return Observable that emits logs included in new blocks
*/
Observable<LogNotification> logsNotifications(List<String> addresses, List<String> topics);

/**
* Creates an observable that emits a notification when a new transaction is added
* to the pending state and is signed with a key that is available in the node.
*
* @return Observable that emits a notification when a new transaction is added
* to the pending state
*/
Observable<PendingTransactionNotification> newPendingTransactionsNotifications();

/**
* Creates an observable that emits a notification when a node starts or stops syncing.
* @return Observalbe that emits changes to syncing status
*/
Observable<SyncingNotfication> syncingStatusNotifications();
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ public void onMessage(String s) {
try {
log.debug("Received message {} from server {}", s, uri);
listener.onMessage(s);
log.debug("Processed message {} from server {}", s, uri);
} catch (Exception e) {
log.error("Failed to process message '{}' from server {}", s, uri);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,16 @@
* @param <T> type of a data item that should be returned by the sent request
*/
class WebSocketRequest<T> {
private CompletableFuture<T> completableFuture;
private CompletableFuture<T> onReply;
private Class<T> responseType;

public WebSocketRequest(CompletableFuture<T> completableFuture, Class<T> responseType) {
this.completableFuture = completableFuture;
public WebSocketRequest(CompletableFuture<T> onReply, Class<T> responseType) {
this.onReply = onReply;
this.responseType = responseType;
}

public CompletableFuture<T> getCompletableFuture() {
return completableFuture;
public CompletableFuture<T> getOnReply() {
return onReply;
}

public Class<T> getResponseType() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public void onMessage(String message) throws IOException {

@Override
public void onError(Exception e) {

log.error("Received error from a WebSocket connection", e);
}

@Override
Expand Down Expand Up @@ -177,7 +177,7 @@ private void setRequestTimeout(long requestId) {
}

void closeRequest(long requestId, Exception e) {
CompletableFuture result = requestForId.get(requestId).getCompletableFuture();
CompletableFuture result = requestForId.get(requestId).getOnReply();
requestForId.remove(requestId);
result.completeExceptionally(e);
}
Expand Down Expand Up @@ -261,14 +261,14 @@ private <T extends Notification<?>> void reportSubscriptionError(
}

private void sendReplyToListener(WebSocketRequest request, Object reply) {
request.getCompletableFuture().complete(reply);
request.getOnReply().complete(reply);
}

private void sendExceptionToListener(
String replyStr,
WebSocketRequest request,
IllegalArgumentException e) {
request.getCompletableFuture().completeExceptionally(
request.getOnReply().completeExceptionally(
new IOException(
String.format(
"Failed to parse '%s' as type %s",
Expand Down Expand Up @@ -431,7 +431,7 @@ void onWebSocketClose() {

private void closeOutstandingRequests() {
requestForId.values().forEach(request -> {
request.getCompletableFuture()
request.getOnReply()
.completeExceptionally(new IOException("Connection was closed"));
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,21 @@

import rx.subjects.BehaviorSubject;

/**
* Objects necessary to process a new item received via a WebSocket subscription.
*
* @param <T> type of a data item that should be returned by a WebSocket subscription.
*/
public class WebSocketSubscription<T> {
private BehaviorSubject<T> subject;
private Class<T> responseType;

/**
* Creates WebSocketSubscription.
*
* @param subject used to send new data items to listeners
* @param responseType type of a data item returned by a WebSocket subscription
*/
public WebSocketSubscription(BehaviorSubject<T> subject, Class<T> responseType) {
this.subject = subject;
this.responseType = responseType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,9 @@

import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class JsonRpc2_0WebSocketClientJTest {
public class JsonRpc2_0Web3jTest {

private ScheduledExecutorService scheduledExecutorService
= mock(ScheduledExecutorService.class);
Expand Down
18 changes: 0 additions & 18 deletions core/src/test/java/org/web3j/protocol/core/WebSocketEventTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,24 +88,6 @@ public void testLogsNotificationsWithArguments() {
+ "\"topics\":\\[\"0x2\"]}],\"id\":[0-9]{1,}}"));
}

@Test
public void testPendingTransactionsNotifications() {
web3j.newPendingTransactionsNotifications();

verify(webSocketClient).send(matches(
"\\{\"jsonrpc\":\"2.0\",\"method\":\"eth_subscribe\",\"params\":"
+ "\\[\"newPendingTransactions\"],\"id\":[0-9]{1,}}"));
}

@Test
public void testSyncingStatusNotifications() {
web3j.syncingStatusNotifications();

verify(webSocketClient).send(matches(
"\\{\"jsonrpc\":\"2.0\",\"method\":\"eth_subscribe\","
+ "\"params\":\\[\"syncing\"],\"id\":[0-9]{1,}}"));
}

private int getRequestId(String message) throws IOException {
JsonNode messageJson = objectMapper.readTree(message);
return messageJson.get("id").asInt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ public void testSyncRequest() throws Exception {
requestSent.await(2, TimeUnit.SECONDS);
sendGethVersionReply();
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
}
});

Expand Down
21 changes: 20 additions & 1 deletion geth/src/main/java/org/web3j/protocol/geth/Geth.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.web3j.protocol.geth;

import rx.Observable;

import org.web3j.protocol.Web3jService;
import org.web3j.protocol.admin.Admin;
import org.web3j.protocol.admin.methods.response.BooleanResponse;
Expand All @@ -8,6 +10,8 @@
import org.web3j.protocol.core.methods.response.MinerStartResponse;
import org.web3j.protocol.geth.response.PersonalEcRecover;
import org.web3j.protocol.geth.response.PersonalImportRawKey;
import org.web3j.protocol.websocket.events.PendingTransactionNotification;
import org.web3j.protocol.websocket.events.SyncingNotfication;

/**
* JSON-RPC Request object building factory for Geth.
Expand All @@ -18,7 +22,7 @@ static Geth build(Web3jService web3jService) {
}

Request<?, PersonalImportRawKey> personalImportRawKey(String keydata, String password);

Request<?, BooleanResponse> personalLockAccount(String accountId);

Request<?, PersonalSign> personalSign(String message, String accountId, String password);
Expand All @@ -29,4 +33,19 @@ static Geth build(Web3jService web3jService) {

Request<?, BooleanResponse> minerStop();

/**
* Creates an observable that emits a notification when a new transaction is added
* to the pending state and is signed with a key that is available in the node.
*
* @return Observable that emits a notification when a new transaction is added
* to the pending state
*/
Observable<PendingTransactionNotification> newPendingTransactionsNotifications();

/**
* Creates an observable that emits a notification when a node starts or stops syncing.
* @return Observalbe that emits changes to syncing status
*/
Observable<SyncingNotfication> syncingStatusNotifications();

}
29 changes: 29 additions & 0 deletions geth/src/main/java/org/web3j/protocol/geth/JsonRpc2_0Geth.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,19 @@
import java.util.Arrays;
import java.util.Collections;

import rx.Observable;

import org.web3j.protocol.Web3jService;
import org.web3j.protocol.admin.JsonRpc2_0Admin;
import org.web3j.protocol.admin.methods.response.BooleanResponse;
import org.web3j.protocol.admin.methods.response.PersonalSign;
import org.web3j.protocol.core.Request;
import org.web3j.protocol.core.methods.response.EthSubscribe;
import org.web3j.protocol.core.methods.response.MinerStartResponse;
import org.web3j.protocol.geth.response.PersonalEcRecover;
import org.web3j.protocol.geth.response.PersonalImportRawKey;
import org.web3j.protocol.websocket.events.PendingTransactionNotification;
import org.web3j.protocol.websocket.events.SyncingNotfication;

/**
* JSON-RPC 2.0 factory implementation for Geth.
Expand Down Expand Up @@ -78,4 +83,28 @@ public Request<?, BooleanResponse> minerStop() {
BooleanResponse.class);
}

public Observable<PendingTransactionNotification> newPendingTransactionsNotifications() {
return web3jService.subscribe(
new Request<>(
"eth_subscribe",
Arrays.asList("newPendingTransactions"),
web3jService,
EthSubscribe.class),
"eth_unsubscribe",
PendingTransactionNotification.class
);
}

@Override
public Observable<SyncingNotfication> syncingStatusNotifications() {
return web3jService.subscribe(
new Request<>(
"eth_subscribe",
Arrays.asList("syncing"),
web3jService,
EthSubscribe.class),
"eth_unsubscribe",
SyncingNotfication.class
);
}
}
Loading

0 comments on commit e574576

Please sign in to comment.