Skip to content

Commit

Permalink
Merge pull request hyperledger#458 from mushketyk/websockets
Browse files Browse the repository at this point in the history
Websockets support
  • Loading branch information
conor10 committed May 10, 2018
2 parents ed1621c + 4458d10 commit 62757fe
Show file tree
Hide file tree
Showing 33 changed files with 1,806 additions and 21 deletions.
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ buildscript {
ext.okhttpVersion = '3.8.1'
ext.rxjavaVersion = '1.2.4'
ext.slf4jVersion = '1.7.25'
ext.javaWebSocketVersion = '1.3.8'

// test dependencies
ext.equalsverifierVersion = '2.1.7'
Expand Down
4 changes: 1 addition & 3 deletions config/checkstyle/checkstyle.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,7 @@
<property name="tokens" value="LITERAL_TRY, LITERAL_FINALLY, LITERAL_IF, LITERAL_ELSE, LITERAL_SWITCH"/>
</module>
<module name="NeedBraces"/>
<module name="LeftCurly">
<property name="maxLineLength" value="100"/>
</module>
<module name="LeftCurly"/>
<module name="RightCurly">
<property name="id" value="RightCurlySame"/>
<property name="tokens" value="LITERAL_TRY, LITERAL_CATCH, LITERAL_FINALLY, LITERAL_IF, LITERAL_ELSE, LITERAL_DO"/>
Expand Down
6 changes: 4 additions & 2 deletions core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ dependencies {
"com.github.jnr:jnr-unixsocket:$jnr_unixsocketVersion",
"com.squareup.okhttp3:okhttp:$okhttpVersion",
"com.squareup.okhttp3:logging-interceptor:$okhttpVersion",
"io.reactivex:rxjava:$rxjavaVersion"
"io.reactivex:rxjava:$rxjavaVersion",
"org.java-websocket:Java-WebSocket:$javaWebSocketVersion"
testCompile project(path: ':crypto', configuration: 'testArtifacts'),
"nl.jqno.equalsverifier:equalsverifier:$equalsverifierVersion"
"nl.jqno.equalsverifier:equalsverifier:$equalsverifierVersion",
"ch.qos.logback:logback-classic:$logbackVersion"
}

task createProperties(dependsOn: processResources) doLast {
Expand Down
14 changes: 14 additions & 0 deletions core/src/main/java/org/web3j/protocol/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@

import com.fasterxml.jackson.databind.ObjectMapper;

import rx.Observable;

import org.web3j.protocol.core.Request;
import org.web3j.protocol.core.Response;
import org.web3j.protocol.websocket.events.Notification;
import org.web3j.utils.Async;

/**
Expand Down Expand Up @@ -42,4 +45,15 @@ public <T extends Response> CompletableFuture<T> sendAsync(
Request jsonRpc20Request, Class<T> responseType) {
return Async.run(() -> send(jsonRpc20Request, responseType));
}

@Override
public <T extends Notification<?>> Observable<T> subscribe(
Request request,
String unsubscribeMethod,
Class<T> responseType) {
throw new UnsupportedOperationException(
String.format(
"Service %s does not support subscriptions",
this.getClass().getSimpleName()));
}
}
52 changes: 52 additions & 0 deletions core/src/main/java/org/web3j/protocol/Web3jService.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,68 @@
import java.io.IOException;
import java.util.concurrent.CompletableFuture;

import rx.Observable;

import org.web3j.protocol.core.Request;
import org.web3j.protocol.core.Response;
import org.web3j.protocol.websocket.events.Notification;

/**
* Services API.
*/
public interface Web3jService {

/**
* Perform a synchronous JSON-RPC request.
*
* @param request request to perform
* @param responseType class of a data item returned by the request
* @param <T> type of a data item returned by the request
* @return deserialized JSON-RPC response
* @throws IOException thrown if failed to perform a request
*/
<T extends Response> T send(
Request request, Class<T> responseType) throws IOException;

/**
* Performs an asynchronous JSON-RPC request.
*
* @param request request to perform
* @param responseType class of a data item returned by the request
* @param <T> type of a data item returned by the request
* @return CompletableFuture that will be completed when a result is returned or if a
* request has failed
*/
<T extends Response> CompletableFuture<T> sendAsync(
Request request, Class<T> responseType);

/**
* Subscribe to a stream of notifications. A stream of notifications is opened by
* by performing a specified JSON-RPC request and is closed by calling
* the unsubscribe method. Different WebSocket implementations use different pair of
* subscribe/unsubscribe methods.
*
* <p>This method creates an Observable that can be used to subscribe to new notifications.
* When a client unsubscribes from this Observable the service unsubscribes from
* the underlying stream of events.
*
* @param request JSON-RPC request that will be send to subscribe to a stream of
* events
* @param unsubscribeMethod method that will be called to unsubscribe from a
* stream of notifications
* @param responseType class of incoming events objects in a stream
* @param <T> type of incoming event objects
* @return Observable that emits incoming events
*/
<T extends Notification<?>> Observable<T> subscribe(
Request request,
String unsubscribeMethod,
Class<T> responseType);

/**
* Closes resources used by the service.
*
* @throws IOException thrown if a service failed to close all resources
*/
void close() throws IOException;
}
53 changes: 53 additions & 0 deletions core/src/main/java/org/web3j/protocol/core/JsonRpc2_0Web3j.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package org.web3j.protocol.core;

import java.io.IOException;
import java.math.BigInteger;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;

import rx.Observable;
Expand Down Expand Up @@ -44,6 +48,7 @@
import org.web3j.protocol.core.methods.response.EthSign;
import org.web3j.protocol.core.methods.response.EthSubmitHashrate;
import org.web3j.protocol.core.methods.response.EthSubmitWork;
import org.web3j.protocol.core.methods.response.EthSubscribe;
import org.web3j.protocol.core.methods.response.EthSyncing;
import org.web3j.protocol.core.methods.response.EthTransaction;
import org.web3j.protocol.core.methods.response.EthUninstallFilter;
Expand All @@ -62,6 +67,8 @@
import org.web3j.protocol.core.methods.response.Web3ClientVersion;
import org.web3j.protocol.core.methods.response.Web3Sha3;
import org.web3j.protocol.rx.JsonRpc2_0Rx;
import org.web3j.protocol.websocket.events.LogNotification;
import org.web3j.protocol.websocket.events.NewHeadsNotification;
import org.web3j.utils.Async;
import org.web3j.utils.Numeric;

Expand Down Expand Up @@ -686,6 +693,47 @@ public Request<?, ShhMessages> shhGetMessages(BigInteger filterId) {
ShhMessages.class);
}

@Override
public Observable<NewHeadsNotification> newHeadsNotifications() {
return web3jService.subscribe(
new Request<>(
"eth_subscribe",
Collections.singletonList("newHeads"),
web3jService,
EthSubscribe.class),
"eth_unsubscribe",
NewHeadsNotification.class
);
}

@Override
public Observable<LogNotification> logsNotifications(
List<String> addresses, List<String> topics) {

Map<String, Object> params = createLogsParams(addresses, topics);

return web3jService.subscribe(
new Request<>(
"eth_subscribe",
Arrays.asList("logs", params),
web3jService,
EthSubscribe.class),
"eth_unsubscribe",
LogNotification.class
);
}

private Map<String, Object> createLogsParams(List<String> addresses, List<String> topics) {
Map<String, Object> params = new HashMap<>();
if (!addresses.isEmpty()) {
params.put("address", addresses);
}
if (!topics.isEmpty()) {
params.put("topics", topics);
}
return params;
}

@Override
public Observable<String> ethBlockHashObservable() {
return web3jRx.ethBlockHashObservable(blockTime);
Expand Down Expand Up @@ -779,5 +827,10 @@ public Observable<EthBlock> catchUpToLatestAndSubscribeToNewBlocksObservable(
@Override
public void shutdown() {
scheduledExecutorService.shutdown();
try {
web3jService.close();
} catch (IOException e) {
throw new RuntimeException("Failed to close web3j service", e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package org.web3j.protocol.core.methods.response;

import org.web3j.protocol.core.Response;

public class EthSubscribe extends Response<String> {
public String getSubscriptionId() {
return getResult();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.web3j.protocol.core.methods.response;

import org.web3j.protocol.core.Response;

public class EthUnsubscribe extends Response<Boolean> {

}
5 changes: 5 additions & 0 deletions core/src/main/java/org/web3j/protocol/http/HttpService.java
Original file line number Diff line number Diff line change
Expand Up @@ -159,4 +159,9 @@ public void addHeaders(Map<String, String> headersToAdd) {
public HashMap<String, String> getHeaders() {
return headers;
}

@Override
public void close() throws IOException {

}
}
25 changes: 16 additions & 9 deletions core/src/main/java/org/web3j/protocol/ipc/IpcService.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,7 @@ protected IOFacade getIO() {

@Override
protected InputStream performIO(String payload) throws IOException {
IOFacade io;
if (ioFacade != null) {
io = ioFacade;
} else {
io = getIO();
}
IOFacade io = getIoFacade();
io.write(payload);
log.debug(">> " + payload);

Expand All @@ -64,10 +59,22 @@ protected InputStream performIO(String payload) throws IOException {
return new ByteArrayInputStream(result.getBytes("UTF-8"));
}

@Deprecated
public void close() throws IOException {
private IOFacade getIoFacade() {
IOFacade io;
if (ioFacade != null) {
ioFacade.close();
io = ioFacade;
} else {
io = getIO();
}
return io;
}

@Override
public void close() throws IOException {
IOFacade io = getIoFacade();

if (io != null) {
io.close();
}
}
}
25 changes: 25 additions & 0 deletions core/src/main/java/org/web3j/protocol/rx/Web3jRx.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
package org.web3j.protocol.rx;

import java.util.List;

import rx.Observable;

import org.web3j.protocol.core.DefaultBlockParameter;
import org.web3j.protocol.core.methods.request.EthFilter;
import org.web3j.protocol.core.methods.response.EthBlock;
import org.web3j.protocol.core.methods.response.Log;
import org.web3j.protocol.core.methods.response.Transaction;
import org.web3j.protocol.websocket.events.LogNotification;
import org.web3j.protocol.websocket.events.NewHeadsNotification;
import org.web3j.protocol.websocket.events.PendingTransactionNotification;
import org.web3j.protocol.websocket.events.SyncingNotfication;

/**
* The Observables JSON-RPC client event API.
Expand Down Expand Up @@ -167,4 +173,23 @@ Observable<EthBlock> catchUpToLatestAndSubscribeToNewBlocksObservable(
*/
Observable<Transaction> catchUpToLatestAndSubscribeToNewTransactionsObservable(
DefaultBlockParameter startBlock);

/**
* Creates an observable that emits a notification when a new header is appended to a chain,
* including chain reorganizations.
*
* @return Observable that emits a notification for every new header
*/
Observable<NewHeadsNotification> newHeadsNotifications();

/**
* Creates an observable that emits notifications for logs included in new imported blocks.
*
* @param addresses only return logs from this list of address. Return logs from all addresses
* if the list is empty
* @param topics only return logs that match specified topics. Returns logs for all topics if
* the list is empty
* @return Observable that emits logs included in new blocks
*/
Observable<LogNotification> logsNotifications(List<String> addresses, List<String> topics);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package org.web3j.protocol.websocket;

import java.net.URI;

import org.java_websocket.handshake.ServerHandshake;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Web socket client implementation that connects to a specify URI. Allows to provide a listener
* that will be called when a new message is received by the client.
*/
public class WebSocketClient extends org.java_websocket.client.WebSocketClient {

private static final Logger log = LoggerFactory.getLogger(WebSocketClient.class);

private WebSocketListener listener;

public WebSocketClient(URI serverUri) {
super(serverUri);
}

@Override
public void onOpen(ServerHandshake serverHandshake) {
log.info("Opened WebSocket connection to {}", uri);
}

@Override
public void onMessage(String s) {
try {
log.debug("Received message {} from server {}", s, uri);
listener.onMessage(s);
} catch (Exception e) {
log.error("Failed to process message '{}' from server {}", s, uri);
}
}

@Override
public void onClose(int code, String reason, boolean remote) {
log.info("Closed WebSocket connection to {}, because of reason: '{}'."
+ "Conection closed remotely: {}", uri, reason, remote);
listener.onClose();
}

@Override
public void onError(Exception e) {
log.error(String.format("WebSocket connection to {} failed with error", uri), e);
listener.onError(e);
}

/**
* Set a listener that will be called when a new message is received by the client.
*
* @param listener WebSocket listener
*/
public void setListener(WebSocketListener listener) {
this.listener = listener;
}
}
Loading

0 comments on commit 62757fe

Please sign in to comment.