From 71c9bfe912df1a22f6555d15394195d84012df18 Mon Sep 17 00:00:00 2001 From: mw Date: Thu, 28 Jun 2018 17:08:43 +0100 Subject: [PATCH 1/8] [bitfinex] add methods of authenticated API (squashed) --- .../bitfinex/BitfinexStreamingExchange.java | 30 +- .../bitfinex/BitfinexStreamingRawService.java | 303 ++++++++++++++++++ .../bitfinex/BitfinexStreamingService.java | 53 +-- .../dto/BitfinexAuthRequestStatus.java | 6 + .../bitfinex/dto/BitfinexWebSocketAuth.java | 52 +++ .../dto/BitfinexWebSocketAuthBalance.java | 70 ++++ .../dto/BitfinexWebSocketAuthOrder.java | 166 ++++++++++ .../dto/BitfinexWebSocketAuthPreTrade.java | 102 ++++++ .../dto/BitfinexWebSocketAuthTrade.java | 58 ++++ .../bitfinex/BitfinexManualAuthExample.java | 38 +++ .../BitfinexStreamingServiceTest.java | 154 +++++++++ .../src/test/resources/balance.json | 11 + .../src/test/resources/balances.json | 20 ++ .../src/test/resources/orders.json | 40 +++ .../src/test/resources/preTrade.json | 18 ++ .../src/test/resources/trade.json | 17 + .../BitstampStreamingMarketDataService.java | 3 +- .../bitstamp/dto/BitstampOrderBook.java | 8 +- .../bitstamp/BitstampManualExample.java | 7 +- 19 files changed, 1125 insertions(+), 31 deletions(-) create mode 100644 xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingRawService.java create mode 100644 xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/dto/BitfinexAuthRequestStatus.java create mode 100644 xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/dto/BitfinexWebSocketAuth.java create mode 100644 xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/dto/BitfinexWebSocketAuthBalance.java create mode 100644 xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/dto/BitfinexWebSocketAuthOrder.java create mode 100644 xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/dto/BitfinexWebSocketAuthPreTrade.java create mode 100644 xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/dto/BitfinexWebSocketAuthTrade.java create mode 100644 xchange-bitfinex/src/test/java/info/bitrich/xchangestream/bitfinex/BitfinexManualAuthExample.java create mode 100644 xchange-bitfinex/src/test/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingServiceTest.java create mode 100644 xchange-bitfinex/src/test/resources/balance.json create mode 100644 xchange-bitfinex/src/test/resources/balances.json create mode 100644 xchange-bitfinex/src/test/resources/orders.json create mode 100644 xchange-bitfinex/src/test/resources/preTrade.json create mode 100644 xchange-bitfinex/src/test/resources/trade.json diff --git a/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingExchange.java b/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingExchange.java index 0567a6b226d..c76c2bfd9a2 100644 --- a/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingExchange.java +++ b/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingExchange.java @@ -16,15 +16,17 @@ public class BitfinexStreamingExchange extends BitfinexExchange implements Strea private BitfinexStreamingService streamingService; private BitfinexStreamingMarketDataService streamingMarketDataService; + private BitfinexStreamingRawService streamingAuthenticatedDataService; public BitfinexStreamingExchange() { + this.streamingAuthenticatedDataService = new BitfinexStreamingRawService(API_URI); } @Override protected void initServices() { super.initServices(); - streamingService = createStreamingService(); - streamingMarketDataService = new BitfinexStreamingMarketDataService(streamingService); + this.streamingService = createStreamingService(); + this.streamingMarketDataService = new BitfinexStreamingMarketDataService(streamingService); } private BitfinexStreamingService createStreamingService() { @@ -74,4 +76,28 @@ public StreamingMarketDataService getStreamingMarketDataService() { @Override public void useCompressedMessages(boolean compressedMessages) { streamingService.useCompressedMessages(compressedMessages); } + public Completable connectToAuthenticated() { + return streamingAuthenticatedDataService.connect(); + } + + public void authenticate() { + streamingAuthenticatedDataService.auth(); + } + + public Completable disconnectToAuthenticated() { + return streamingAuthenticatedDataService.disconnect(); + } + + public boolean isAuthenticatedAlive() { + return streamingAuthenticatedDataService.isSocketOpen(); + } + + public void setCredentials(String apiKey, String apiSecret) { + streamingAuthenticatedDataService.setApiKey(apiKey); + streamingAuthenticatedDataService.setApiSecret(apiSecret); + } + + public BitfinexStreamingRawService getStreamingAuthenticatedDataService() { + return streamingAuthenticatedDataService; + } } diff --git a/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingRawService.java b/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingRawService.java new file mode 100644 index 00000000000..2b8ba04674a --- /dev/null +++ b/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingRawService.java @@ -0,0 +1,303 @@ +package info.bitrich.xchangestream.bitfinex; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import info.bitrich.xchangestream.bitfinex.dto.BitfinexAuthRequestStatus; +import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketAuth; +import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketAuthBalance; +import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketAuthOrder; +import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketAuthPreTrade; +import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketAuthTrade; +import info.bitrich.xchangestream.service.netty.JsonNettyStreamingService; +import io.reactivex.Observable; +import io.reactivex.subjects.PublishSubject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.crypto.Mac; +import javax.crypto.spec.SecretKeySpec; +import javax.xml.bind.DatatypeConverter; +import java.io.IOException; +import java.math.BigDecimal; +import java.nio.charset.StandardCharsets; +import java.security.InvalidKeyException; +import java.security.NoSuchAlgorithmException; + +import static org.knowm.xchange.service.BaseParamsDigest.HMAC_SHA_384; + +public class BitfinexStreamingRawService extends JsonNettyStreamingService { + private static final Logger LOG = LoggerFactory.getLogger(BitfinexStreamingRawService.class); + + private static final String AUTH = "auth"; + private static final String STATUS = "status"; + private static final String MESSAGE = "msg"; + private static final String EVENT = "event"; + + private String apiKey; + private String apiSecret; + + private PublishSubject subjectPreTrade = PublishSubject.create(); + private PublishSubject subjectTrade = PublishSubject.create(); + private PublishSubject subjectOrder = PublishSubject.create(); + private PublishSubject subjectBalance = PublishSubject.create(); + private final ObjectMapper objectMapper = new ObjectMapper(); + + public BitfinexStreamingRawService(String apiUrl) { + super(apiUrl, Integer.MAX_VALUE); + } + + @Override + protected String getChannelNameFromMessage(JsonNode message) throws IOException { + return null; + } + + @Override + public String getSubscribeMessage(String channelName, Object... args) throws IOException { + return null; + } + + @Override + public String getUnsubscribeMessage(String channelName) throws IOException { + return null; + } + + @Override + public void messageHandler(String message) { + JsonNode jsonNode; + try { + jsonNode = objectMapper.readTree(message); + } catch (IOException e) { + LOG.error("Error parsing incoming message to JSON: {}", message); + subjectOrder.onError(e); + return; + } + handleMessage(jsonNode); + } + + @Override + protected void handleMessage(JsonNode message) { + LOG.debug("Receiving message: {}", message); + + if (message.isArray()) { + String type = message.get(1).asText(); + if (type.equals("hb")) { + return; + } + } + + JsonNode event = message.get(EVENT); + if (event != null && event.textValue().equalsIgnoreCase("info")) + auth(); + + if (event != null && event.textValue().equals(AUTH) && message.get(STATUS).textValue().equals(BitfinexAuthRequestStatus.FAILED.name())) { + LOG.error("Authentication error: {}", message.get(MESSAGE)); + return; + } + + if (message.isArray() && message.size() == 3) { + String type = message.get(1).asText(); + JsonNode object = message.get(2); + switch (type) { + case "te": + addPreTrade(object); + break; + case "tu": + addTrade(object); + break; + case "os": + addOrder(object); + break; + case "on": + case "ou": + case "oc": + updateOrder(object); + break; + case "ws": + updateBalances(object); + break; + case "wu": + updateBalance(object); + break; + } + } + } + + private void addPreTrade(JsonNode preTrade) { + if (preTrade.size() < 12) { + LOG.error("addPreTrade unexpected record size={}, record={}", preTrade.size(), preTrade.toString()); + return; + } + long id = preTrade.get(0).longValue(); + String pair = preTrade.get(1).textValue(); + long mtsCreate = preTrade.get(2).longValue(); + long orderId = preTrade.get(3).longValue(); + BigDecimal execAmount = preTrade.get(4).decimalValue(); + BigDecimal execPrice = preTrade.get(5).decimalValue(); + String orderType = preTrade.get(6).textValue(); + BigDecimal orderPrice = preTrade.get(7).decimalValue(); + int maker = preTrade.get(8).intValue(); + BitfinexWebSocketAuthPreTrade preTradeObject = new BitfinexWebSocketAuthPreTrade(id, pair, mtsCreate, orderId, + execAmount, execPrice, orderType, orderPrice, maker); + LOG.debug("New pre trade: {}", preTradeObject); + subjectPreTrade.onNext(preTradeObject); + } + + private void addTrade(JsonNode trade) { + if (trade.size() < 11) { + LOG.error("addTrade unexpected record size={}, record={}", trade.size(), trade.toString()); + return; + } + long id = trade.get(0).longValue(); + String pair = trade.get(1).textValue(); + long mtsCreate = trade.get(2).longValue(); + long orderId = trade.get(3).longValue(); + BigDecimal execAmount = trade.get(4).decimalValue(); + BigDecimal execPrice = trade.get(5).decimalValue(); + String orderType = trade.get(6).textValue(); + BigDecimal orderPrice = trade.get(7).decimalValue(); + int maker = trade.get(8).intValue(); + BigDecimal fee = trade.get(9).decimalValue(); + String currency = trade.get(10).textValue(); + BitfinexWebSocketAuthTrade tradeObject = new BitfinexWebSocketAuthTrade( + id, pair, mtsCreate, orderId, execAmount, execPrice, orderType, orderPrice, maker, fee, currency + ); + LOG.debug("New trade: {}", tradeObject); + subjectTrade.onNext(tradeObject); + } + + private void addOrder(JsonNode orders) { + for (final JsonNode order : orders) { + BitfinexWebSocketAuthOrder orderObject = createOrderObject(order); + if (orderObject != null) { + LOG.debug("New order: {}", orderObject); + subjectOrder.onNext(orderObject); + } + } + } + + private void updateOrder(JsonNode order) { + BitfinexWebSocketAuthOrder orderObject = createOrderObject(order); + if (orderObject != null) { + LOG.debug("Updated order: {}", orderObject); + subjectOrder.onNext(orderObject); + } + } + + private void updateBalance(JsonNode balance) { + BitfinexWebSocketAuthBalance balanceObject = createBalanceObject(balance); + if (balanceObject != null) { + LOG.debug("Balance: {}", balanceObject); + subjectBalance.onNext(balanceObject); + } + } + + private void updateBalances(JsonNode balances) { + for (final JsonNode balance : balances) { + BitfinexWebSocketAuthBalance balanceObject = createBalanceObject(balance); + if (balanceObject != null) { + LOG.debug("Balance: {}", balanceObject); + subjectBalance.onNext(balanceObject); + } + } + } + + private BitfinexWebSocketAuthBalance createBalanceObject(JsonNode balance) { + if (balance.size() < 5) { + LOG.error("createBalanceObject unexpected record size={}, record={}", balance.size(), balance.toString()); + return null; + } + + String walletType = balance.get(0).textValue(); + String currency = balance.get(1).textValue(); + BigDecimal balanceValue = balance.get(2).decimalValue(); + BigDecimal unsettledInterest = balance.get(3).decimalValue(); + BigDecimal balanceAvailable = balance.get(4).asText().equals("null") ? null : balance.get(4).decimalValue(); + + return new BitfinexWebSocketAuthBalance(walletType, currency, balanceValue, unsettledInterest, balanceAvailable); + } + + private BitfinexWebSocketAuthOrder createOrderObject(JsonNode order) { + if (order.size() < 32) { + LOG.error("createOrderObject unexpected record size={}, record={}", order.size(), order.toString()); + return null; + } + + long id = order.get(0).longValue(); + long groupId = order.get(1).longValue(); + long cid = order.get(2).longValue(); + String symbol = order.get(3).textValue(); + long mtsCreate = order.get(4).longValue(); + long mtsUpdate = order.get(5).longValue(); + BigDecimal amount = order.get(6).decimalValue(); + BigDecimal amountOrig = order.get(7).decimalValue(); + String type = order.get(8).textValue(); + String typePrev = order.get(9).textValue(); + int flags = order.get(12).intValue(); + String orderStatus = order.get(13).textValue(); + BigDecimal price = order.get(16).decimalValue(); + BigDecimal priceAvg = order.get(17).decimalValue(); + BigDecimal priceTrailing = order.get(18).decimalValue(); + BigDecimal priceAuxLimit = order.get(19).decimalValue(); + long placedId = order.get(25).longValue(); + + return new BitfinexWebSocketAuthOrder( + id, groupId, cid, symbol, mtsCreate, mtsUpdate, amount, amountOrig, + type, typePrev, orderStatus, price, priceAvg, priceTrailing, + priceAuxLimit, placedId, flags + ); + } + + public void auth() { + long nonce = System.currentTimeMillis() * 1000; + String payload = "AUTH" + nonce; + String signature; + try { + Mac macEncoder = Mac.getInstance(HMAC_SHA_384); + SecretKeySpec secretKeySpec = new SecretKeySpec(apiSecret.getBytes(StandardCharsets.UTF_8), HMAC_SHA_384); + macEncoder.init(secretKeySpec); + byte[] result = macEncoder.doFinal(payload.getBytes(StandardCharsets.UTF_8)); + signature = DatatypeConverter.printHexBinary(result); + } catch (NoSuchAlgorithmException | InvalidKeyException e) { + LOG.error("auth. Sign failed error={}", e.getMessage()); + return; + } + BitfinexWebSocketAuth message = new BitfinexWebSocketAuth( + apiKey, payload, String.valueOf(nonce), signature.toLowerCase() + ); + sendMessage(message); + } + + + public void setApiKey(String apiKey) { + this.apiKey = apiKey; + } + + public void setApiSecret(String apiSecret) { + this.apiSecret = apiSecret; + } + + private void sendMessage(Object message) { + try { + sendMessage(objectMapper.writeValueAsString(message)); + } catch (JsonProcessingException e) { + LOG.error("Error creating json message: {}", e.getMessage()); + } + } + + public Observable getAuthenticatedOrders() { + return subjectOrder.share(); + } + + public Observable getAuthenticatedPreTrades() { + return subjectPreTrade.share(); + } + + public Observable getAuthenticatedTrades() { + return subjectTrade.share(); + } + + public Observable getAuthenticatedBalances() { + return subjectBalance.share(); + } +} diff --git a/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingService.java b/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingService.java index 2167438b101..6d99d0bafd2 100644 --- a/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingService.java +++ b/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingService.java @@ -2,6 +2,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import info.bitrich.xchangestream.bitfinex.dto.BitfinexAuthRequestStatus; import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketSubscriptionMessage; import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketUnSubscriptionMessage; import info.bitrich.xchangestream.service.netty.JsonNettyStreamingService; @@ -26,6 +27,7 @@ public class BitfinexStreamingService extends JsonNettyStreamingService { private static final String CHANNEL_ID = "chanId"; private static final String SUBSCRIBED = "subscribed"; private static final String UNSUBSCRIBED = "unsubscribed"; + private static final String ERROR_CODE = "code"; private static final int SUBSCRIPTION_FAILED = 10300; @@ -67,31 +69,38 @@ protected void handleMessage(JsonNode message) { JsonNode event = message.get("event"); if (event != null) { - if (event.textValue().equals(INFO)) { - JsonNode version = message.get("version"); - if (version != null) { - LOG.debug("Bitfinex websocket API version: {}.", version.intValue()); + switch (event.textValue()) { + case INFO: + JsonNode version = message.get("version"); + if (version != null) { + LOG.debug("Bitfinex websocket API version: {}.", version.intValue()); + } + break; + case SUBSCRIBED: { + String channel = message.get("channel").asText(); + String pair = message.get("pair").asText(); + String channelId = message.get(CHANNEL_ID).asText(); + try { + String subscriptionUniqueId = getSubscriptionUniqueId(channel, pair); + subscribedChannels.put(channelId, subscriptionUniqueId); + LOG.debug("Register channel {}: {}", subscriptionUniqueId, channelId); + } catch (Exception e) { + LOG.error(e.getMessage()); + } + break; } - } else if (event.textValue().equals(SUBSCRIBED)) { - String channel = message.get("channel").asText(); - String pair = message.get("pair").asText(); - String channelId = message.get(CHANNEL_ID).asText(); - try { - String subscriptionUniqueId = getSubscriptionUniqueId(channel, pair); - subscribedChannels.put(channelId, subscriptionUniqueId); - LOG.debug("Register channel {}: {}", subscriptionUniqueId, channelId); - } catch (Exception e) { - LOG.error(e.getMessage()); + case UNSUBSCRIBED: { + String channelId = message.get(CHANNEL_ID).asText(); + subscribedChannels.remove(channelId); + break; } - } else if (event.textValue().equals(UNSUBSCRIBED)) { - String channelId = message.get(CHANNEL_ID).asText(); - subscribedChannels.remove(channelId); - } else if (event.textValue().equals(ERROR)) { - if (message.get("code").asInt() == SUBSCRIPTION_FAILED) { + case ERROR: + if (message.get("code").asInt() == SUBSCRIPTION_FAILED) { LOG.error("Error with message: " + message.get("symbol") + " " + message.get("msg")); - return; - } - super.handleError(message, new ExchangeException("Error code: " + message.get("code").asText())); + return; + } + super.handleError(message, new ExchangeException("Error code: " + message.get(ERROR_CODE).asText())); + break; } } else super.handleMessage(message); } diff --git a/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/dto/BitfinexAuthRequestStatus.java b/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/dto/BitfinexAuthRequestStatus.java new file mode 100644 index 00000000000..f095ae0db49 --- /dev/null +++ b/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/dto/BitfinexAuthRequestStatus.java @@ -0,0 +1,6 @@ +package info.bitrich.xchangestream.bitfinex.dto; + +public enum BitfinexAuthRequestStatus { + OK, + FAILED +} diff --git a/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/dto/BitfinexWebSocketAuth.java b/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/dto/BitfinexWebSocketAuth.java new file mode 100644 index 00000000000..b0aba70dca0 --- /dev/null +++ b/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/dto/BitfinexWebSocketAuth.java @@ -0,0 +1,52 @@ +package info.bitrich.xchangestream.bitfinex.dto; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class BitfinexWebSocketAuth { + + private final String apiKey; + private final String authPayload; + private final String authNonce; + private final String authSig; + private final String event; + + public BitfinexWebSocketAuth(@JsonProperty("apiKey") String apiKey, @JsonProperty("authPayload") String authPayload, + @JsonProperty("authNonce") String authNonce, @JsonProperty("authSig") String authSig) { + this.apiKey = apiKey; + this.event = "auth"; + this.authPayload = authPayload; + this.authNonce = authNonce; + this.authSig = authSig; + } + + @Override + public String toString() { + return "BitfinexWebSocketAuth{" + + "apiKey='" + apiKey + '\'' + + ", authPayload='" + authPayload + '\'' + + ", authNonce='" + authNonce + '\'' + + ", authSig='" + authSig + '\'' + + ", event='" + event + '\'' + + '}'; + } + + public String getApiKey() { + return apiKey; + } + + public String getAuthPayload() { + return authPayload; + } + + public String getAuthNonce() { + return authNonce; + } + + public String getAuthSig() { + return authSig; + } + + public String getEvent() { + return event; + } +} diff --git a/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/dto/BitfinexWebSocketAuthBalance.java b/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/dto/BitfinexWebSocketAuthBalance.java new file mode 100644 index 00000000000..d079ff357dc --- /dev/null +++ b/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/dto/BitfinexWebSocketAuthBalance.java @@ -0,0 +1,70 @@ +package info.bitrich.xchangestream.bitfinex.dto; + +import java.math.BigDecimal; +import java.util.Objects; + +public class BitfinexWebSocketAuthBalance { + private String walletType; + private String currency; + private BigDecimal balance; + private BigDecimal unsettledInterest; + private BigDecimal balanceAvailable; + + public BitfinexWebSocketAuthBalance(String walletType, String currency, BigDecimal balance, + BigDecimal unsettledInterest, BigDecimal balanceAvailable) { + this.walletType = walletType; + this.currency = currency; + this.balance = balance; + this.unsettledInterest = unsettledInterest; + this.balanceAvailable = balanceAvailable; + } + + public String getWalletType() { + return walletType; + } + + public String getCurrency() { + return currency; + } + + public BigDecimal getBalance() { + return balance; + } + + public BigDecimal getUnsettledInterest() { + return unsettledInterest; + } + + public BigDecimal getBalanceAvailable() { + return balanceAvailable; + } + + @Override + public String toString() { + return "BitfinexWebSocketAuthBalance{" + + "walletType='" + walletType + '\'' + + ", currency='" + currency + '\'' + + ", balance=" + balance + + ", unsettledInterest=" + unsettledInterest + + ", balanceAvailable=" + balanceAvailable + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof BitfinexWebSocketAuthBalance)) return false; + BitfinexWebSocketAuthBalance that = (BitfinexWebSocketAuthBalance) o; + return Objects.equals(walletType, that.walletType) && + Objects.equals(currency, that.currency) && + Objects.equals(balance, that.balance) && + Objects.equals(unsettledInterest, that.unsettledInterest) && + Objects.equals(balanceAvailable, that.balanceAvailable); + } + + @Override + public int hashCode() { + + return Objects.hash(walletType, currency, balance, unsettledInterest, balanceAvailable); + } +} diff --git a/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/dto/BitfinexWebSocketAuthOrder.java b/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/dto/BitfinexWebSocketAuthOrder.java new file mode 100644 index 00000000000..e94a8f24cd8 --- /dev/null +++ b/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/dto/BitfinexWebSocketAuthOrder.java @@ -0,0 +1,166 @@ +package info.bitrich.xchangestream.bitfinex.dto; + +import java.math.BigDecimal; +import java.util.Objects; + +public class BitfinexWebSocketAuthOrder { + private long id; + private long groupId; + private long cid; + private String symbol; + private long mtsCreate; + private long mtsUpdate; + private BigDecimal amount; + private BigDecimal amountOrig; + private String type; + private String typePrev; + private String orderStatus; + private BigDecimal price; + private BigDecimal priceAvg; + private BigDecimal priceTrailing; + private BigDecimal priceAuxLimit; + private long placedId; + private int flags; + + public BitfinexWebSocketAuthOrder(long id, long groupId, long cid, String symbol, long mtsCreate, long mtsUpdate, BigDecimal amount, BigDecimal amountOrig, String type, String typePrev, + String orderStatus, BigDecimal price, BigDecimal priceAvg, BigDecimal priceTrailing, BigDecimal priceAuxLimit, long placedId, int flags) { + this.id = id; + this.groupId = groupId; + this.cid = cid; + this.symbol = symbol; + this.mtsCreate = mtsCreate; + this.mtsUpdate = mtsUpdate; + this.amount = amount; + this.amountOrig = amountOrig; + this.type = type; + this.typePrev = typePrev; + this.orderStatus = orderStatus; + this.price = price; + this.priceAvg = priceAvg; + this.priceTrailing = priceTrailing; + this.priceAuxLimit = priceAuxLimit; + this.placedId = placedId; + this.flags = flags; + } + + @Override + public String toString() { + return "BitfinexWebSocketAuthenticatedOrder{" + + "id=" + id + + ", groupId=" + groupId + + ", cid=" + cid + + ", symbol='" + symbol + '\'' + + ", mtsCreate=" + mtsCreate + + ", mtsUpdate=" + mtsUpdate + + ", amount=" + amount + + ", amountOrig=" + amountOrig + + ", type='" + type + '\'' + + ", typePrev='" + typePrev + '\'' + + ", orderStatus='" + orderStatus + '\'' + + ", price=" + price + + ", priceAvg=" + priceAvg + + ", priceTrailing=" + priceTrailing + + ", priceAuxLimit=" + priceAuxLimit + + ", placedId=" + placedId + + ", flags=" + flags + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof BitfinexWebSocketAuthOrder)) return false; + BitfinexWebSocketAuthOrder that = (BitfinexWebSocketAuthOrder) o; + return id == that.id && + groupId == that.groupId && + cid == that.cid && + mtsCreate == that.mtsCreate && + mtsUpdate == that.mtsUpdate && + placedId == that.placedId && + flags == that.flags && + Objects.equals(symbol, that.symbol) && + Objects.equals(amount, that.amount) && + Objects.equals(amountOrig, that.amountOrig) && + Objects.equals(type, that.type) && + Objects.equals(typePrev, that.typePrev) && + Objects.equals(orderStatus, that.orderStatus) && + Objects.equals(price, that.price) && + Objects.equals(priceAvg, that.priceAvg) && + Objects.equals(priceTrailing, that.priceTrailing) && + Objects.equals(priceAuxLimit, that.priceAuxLimit); + } + + @Override + public int hashCode() { + + return Objects.hash(id, groupId, cid, symbol, mtsCreate, mtsUpdate, amount, amountOrig, type, typePrev, orderStatus, price, priceAvg, priceTrailing, priceAuxLimit, placedId, flags); + } + + public long getId() { + return id; + } + + public long getGroupId() { + return groupId; + } + + public long getCid() { + return cid; + } + + public String getSymbol() { + return symbol; + } + + public long getMtsCreate() { + return mtsCreate; + } + + public long getMtsUpdate() { + return mtsUpdate; + } + + public BigDecimal getAmount() { + return amount; + } + + public BigDecimal getAmountOrig() { + return amountOrig; + } + + public String getType() { + return type; + } + + public String getTypePrev() { + return typePrev; + } + + public String getOrderStatus() { + return orderStatus; + } + + public BigDecimal getPrice() { + return price; + } + + public BigDecimal getPriceAvg() { + return priceAvg; + } + + public BigDecimal getPriceTrailing() { + return priceTrailing; + } + + public BigDecimal getPriceAuxLimit() { + return priceAuxLimit; + } + + public long getPlacedId() { + return placedId; + } + + public int getFlags() { + return flags; + } +} diff --git a/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/dto/BitfinexWebSocketAuthPreTrade.java b/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/dto/BitfinexWebSocketAuthPreTrade.java new file mode 100644 index 00000000000..c0bf1d41213 --- /dev/null +++ b/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/dto/BitfinexWebSocketAuthPreTrade.java @@ -0,0 +1,102 @@ +package info.bitrich.xchangestream.bitfinex.dto; + +import java.math.BigDecimal; +import java.util.Objects; + +public class BitfinexWebSocketAuthPreTrade { + private long id; + private String pair; + private long mtsCreate; + private long orderId; + private BigDecimal execAmount; + private BigDecimal execPrice; + private String orderType; + private BigDecimal orderPrice; + private int maker; + + public BitfinexWebSocketAuthPreTrade(long id, String pair, long mtsCreate, long orderId, + BigDecimal execAmount, BigDecimal execPrice, + String orderType, BigDecimal orderPrice, int maker) { + this.id = id; + this.pair = pair; + this.mtsCreate = mtsCreate; + this.orderId = orderId; + this.execAmount = execAmount; + this.execPrice = execPrice; + this.orderType = orderType; + this.orderPrice = orderPrice; + this.maker = maker; + } + + public long getId() { + return id; + } + + public String getPair() { + return pair; + } + + public long getMtsCreate() { + return mtsCreate; + } + + public long getOrderId() { + return orderId; + } + + public BigDecimal getExecAmount() { + return execAmount; + } + + public BigDecimal getExecPrice() { + return execPrice; + } + + public String getOrderType() { + return orderType; + } + + public BigDecimal getOrderPrice() { + return orderPrice; + } + + public int getMaker() { + return maker; + } + + @Override + public String toString() { + return "BitfinexWebSocketAuthPreTrade{" + + "id=" + id + + ", pair='" + pair + '\'' + + ", mtsCreate=" + mtsCreate + + ", orderId=" + orderId + + ", execAmount=" + execAmount + + ", execPrice=" + execPrice + + ", orderType='" + orderType + '\'' + + ", orderPrice=" + orderPrice + + ", maker=" + maker + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof BitfinexWebSocketAuthPreTrade)) return false; + BitfinexWebSocketAuthPreTrade that = (BitfinexWebSocketAuthPreTrade) o; + return getId() == that.getId() && + getMtsCreate() == that.getMtsCreate() && + getOrderId() == that.getOrderId() && + getMaker() == that.getMaker() && + Objects.equals(getPair(), that.getPair()) && + Objects.equals(getExecAmount(), that.getExecAmount()) && + Objects.equals(getExecPrice(), that.getExecPrice()) && + Objects.equals(getOrderType(), that.getOrderType()) && + Objects.equals(getOrderPrice(), that.getOrderPrice()); + } + + @Override + public int hashCode() { + return Objects.hash(getId(), getPair(), getMtsCreate(), getOrderId(), getExecAmount(), getExecPrice(), getOrderType(), getOrderPrice(), getMaker()); + } +} diff --git a/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/dto/BitfinexWebSocketAuthTrade.java b/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/dto/BitfinexWebSocketAuthTrade.java new file mode 100644 index 00000000000..3762b0609df --- /dev/null +++ b/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/dto/BitfinexWebSocketAuthTrade.java @@ -0,0 +1,58 @@ +package info.bitrich.xchangestream.bitfinex.dto; + +import java.math.BigDecimal; +import java.util.Objects; + +public class BitfinexWebSocketAuthTrade extends BitfinexWebSocketAuthPreTrade { + private BigDecimal fee; + private String feeCurrency; + + public BitfinexWebSocketAuthTrade(long id, String pair, long mtsCreate, + long orderId, BigDecimal execAmount, BigDecimal execPrice, + String orderType, BigDecimal orderPrice, + int maker, BigDecimal fee, String feeCurrency) { + super(id, pair, mtsCreate, orderId, execAmount, execPrice, orderType, orderPrice, maker); + this.fee = fee; + this.feeCurrency = feeCurrency; + } + + public BigDecimal getFee() { + return fee; + } + + public String getFeeCurrency() { + return feeCurrency; + } + + @Override + public String toString() { + return "BitfinexWebSocketAuthenticatedTrade{" + + "id=" + getId() + + ", pair='" + getPair() + '\'' + + ", mtsCreate=" + getMtsCreate() + + ", orderId=" + getOrderId() + + ", execAmount=" + getExecAmount() + + ", execPrice=" + getExecPrice() + + ", orderType='" + getOrderType() + '\'' + + ", orderPrice=" + getOrderPrice() + + ", maker=" + getMtsCreate() + + ", fee=" + fee + + ", feeCurrency='" + feeCurrency + '\'' + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof BitfinexWebSocketAuthTrade)) return false; + if (!super.equals(o)) return false; + BitfinexWebSocketAuthTrade that = (BitfinexWebSocketAuthTrade) o; + return Objects.equals(fee, that.fee) && + Objects.equals(feeCurrency, that.feeCurrency); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), fee, feeCurrency); + } +} diff --git a/xchange-bitfinex/src/test/java/info/bitrich/xchangestream/bitfinex/BitfinexManualAuthExample.java b/xchange-bitfinex/src/test/java/info/bitrich/xchangestream/bitfinex/BitfinexManualAuthExample.java new file mode 100644 index 00000000000..0b8764bdb47 --- /dev/null +++ b/xchange-bitfinex/src/test/java/info/bitrich/xchangestream/bitfinex/BitfinexManualAuthExample.java @@ -0,0 +1,38 @@ +package info.bitrich.xchangestream.bitfinex; + +import info.bitrich.xchangestream.core.StreamingExchangeFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BitfinexManualAuthExample { + private static final Logger LOG = LoggerFactory.getLogger(BitfinexManualAuthExample.class); + + public static void main(String[] args) { + BitfinexStreamingExchange exchange = (BitfinexStreamingExchange) StreamingExchangeFactory.INSTANCE.createExchange( + BitfinexStreamingExchange.class.getName()); + exchange.setCredentials("API-KEY", "API-SECRET"); + + exchange.connectToAuthenticated().blockingAwait(); + exchange.getStreamingAuthenticatedDataService().getAuthenticatedTrades().subscribe( + t -> LOG.info("AUTH TRADE: {}", t), + throwable -> LOG.error("ERROR: ", throwable) + ); + exchange.getStreamingAuthenticatedDataService().getAuthenticatedPreTrades().subscribe( + t -> LOG.info("AUTH PRE TRADE: {}", t), + throwable -> LOG.error("ERROR: ", throwable) + ); + exchange.getStreamingAuthenticatedDataService().getAuthenticatedOrders().subscribe( + t -> LOG.info("AUTH ORDER: {}", t), + throwable -> LOG.error("ERROR: ", throwable) + ); + exchange.getStreamingAuthenticatedDataService().getAuthenticatedBalances().subscribe( + t -> LOG.info("AUTH BALANCE: {}", t), + throwable -> LOG.error("ERROR: ", throwable) + ); + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } +} diff --git a/xchange-bitfinex/src/test/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingServiceTest.java b/xchange-bitfinex/src/test/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingServiceTest.java new file mode 100644 index 00000000000..e3b676f1e08 --- /dev/null +++ b/xchange-bitfinex/src/test/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingServiceTest.java @@ -0,0 +1,154 @@ +package info.bitrich.xchangestream.bitfinex; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketAuthBalance; +import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketAuthOrder; +import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketAuthPreTrade; +import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketAuthTrade; +import io.reactivex.observers.TestObserver; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.math.BigDecimal; + +import static org.assertj.core.api.Assertions.assertThat; + +public class BitfinexStreamingServiceTest { + + private BitfinexStreamingRawService service; + private final ObjectMapper objectMapper = new ObjectMapper(); + + @Before + public void setUp() { + BitfinexStreamingExchange bitfinexStreamingExchange = new BitfinexStreamingExchange(); + service = bitfinexStreamingExchange.getStreamingAuthenticatedDataService(); + } + + @Test + public void testGetOrders() throws Exception { + + JsonNode jsonNode = objectMapper.readTree(ClassLoader.getSystemClassLoader().getResourceAsStream("orders.json")); + + TestObserver test = service.getAuthenticatedOrders().test(); + + service.handleMessage(jsonNode); + BitfinexWebSocketAuthOrder expected = + new BitfinexWebSocketAuthOrder( + 13759731408L, + 0, + 50999677532L, + "tETHUSD", + 1530108599707L, + 1530108599726L, + new BigDecimal("-0.02"), + new BigDecimal("-0.02"), + "EXCHANGE LIMIT", + null, + "ACTIVE", + new BigDecimal("431.19"), + BigDecimal.ZERO, + BigDecimal.ZERO, + BigDecimal.ZERO, + 0L, + 0); + test.assertValue(expected); + } + + @Test + public void testGetPreTrades() throws Exception { + + JsonNode jsonNode = objectMapper.readTree(ClassLoader.getSystemClassLoader().getResourceAsStream("preTrade.json")); + + TestObserver test = service.getAuthenticatedPreTrades().test(); + + service.handleMessage(jsonNode); + + BitfinexWebSocketAuthPreTrade expected = + new BitfinexWebSocketAuthPreTrade( + 262861164L, + "tETHUSD", + 1530187145559L, + 13787457748L, + new BigDecimal("-0.04"), + new BigDecimal("435.8"), + "EXCHANGE LIMIT", + new BigDecimal("435.8"), + 1); + test.assertValue(expected); + } + + @Test + public void testGetTrades() throws Exception { + + JsonNode jsonNode = objectMapper.readTree(ClassLoader.getSystemClassLoader().getResourceAsStream("trade.json")); + + TestObserver test = service.getAuthenticatedTrades().test(); + + service.handleMessage(jsonNode); + + BitfinexWebSocketAuthTrade expected = + new BitfinexWebSocketAuthTrade( + 262861164L, + "tETHUSD", + 1530187145559L, + 13787457748L, + new BigDecimal("-0.04"), + new BigDecimal("435.8"), + "EXCHANGE LIMIT", + new BigDecimal("435.8"), + 1, + new BigDecimal("-0.0104592"), + "USD"); + test.assertValue(expected); + } + + @Test + public void testGetBalances() throws Exception { + JsonNode jsonNode = objectMapper.readTree(ClassLoader.getSystemClassLoader().getResourceAsStream("balances.json")); + + TestObserver test = service.getAuthenticatedBalances().test(); + + service.handleMessage(jsonNode); + + BitfinexWebSocketAuthBalance expected = + new BitfinexWebSocketAuthBalance( + "exchange", + "ETH", + new BigDecimal("0.38772"), + BigDecimal.ZERO, + null + ); + + BitfinexWebSocketAuthBalance expected1 = + new BitfinexWebSocketAuthBalance( + "exchange", + "USD", + new BigDecimal("69.4747619"), + BigDecimal.ZERO, + null + ); + test.assertNoErrors(); + test.assertValueCount(2); + assertThat(test.values().contains(expected)); + assertThat(test.values().contains(expected1)); + } + + @Test + public void testGetBalance() throws IOException { + JsonNode jsonNode = objectMapper.readTree(ClassLoader.getSystemClassLoader().getResourceAsStream("balance.json")); + TestObserver test = service.getAuthenticatedBalances().test(); + service.handleMessage(jsonNode); + + BitfinexWebSocketAuthBalance balance = new BitfinexWebSocketAuthBalance( + "exchange", + "USD", + new BigDecimal("78.5441867"), + BigDecimal.ZERO, + null + ); + + test.assertValue(balance); + } +} diff --git a/xchange-bitfinex/src/test/resources/balance.json b/xchange-bitfinex/src/test/resources/balance.json new file mode 100644 index 00000000000..bbc745b4f39 --- /dev/null +++ b/xchange-bitfinex/src/test/resources/balance.json @@ -0,0 +1,11 @@ +[ + 0, + "wu", + [ + "exchange", + "USD", + 78.5441867, + 0, + null + ] +] \ No newline at end of file diff --git a/xchange-bitfinex/src/test/resources/balances.json b/xchange-bitfinex/src/test/resources/balances.json new file mode 100644 index 00000000000..37b0a44a202 --- /dev/null +++ b/xchange-bitfinex/src/test/resources/balances.json @@ -0,0 +1,20 @@ +[ + 0, + "ws", + [ + [ + "exchange", + "ETH", + 0.38772, + 0, + null + ], + [ + "exchange", + "USD", + 69.4747619, + 0, + null + ] + ] +] \ No newline at end of file diff --git a/xchange-bitfinex/src/test/resources/orders.json b/xchange-bitfinex/src/test/resources/orders.json new file mode 100644 index 00000000000..e92fb949756 --- /dev/null +++ b/xchange-bitfinex/src/test/resources/orders.json @@ -0,0 +1,40 @@ +[ + 0, + "os", + [ + [ + 13759731408, + null, + 50999677532, + "tETHUSD", + 1530108599707, + 1530108599726, + -0.02, + -0.02, + "EXCHANGE LIMIT", + null, + null, + null, + 0, + "ACTIVE", + null, + null, + 431.19, + 0, + null, + null, + null, + null, + null, + 0, + 0, + 0, + null, + null, + "API>BFX", + null, + null, + null + ] + ] +] \ No newline at end of file diff --git a/xchange-bitfinex/src/test/resources/preTrade.json b/xchange-bitfinex/src/test/resources/preTrade.json new file mode 100644 index 00000000000..4d57190bd59 --- /dev/null +++ b/xchange-bitfinex/src/test/resources/preTrade.json @@ -0,0 +1,18 @@ +[ + 0, + "te", + [ + 262861164, + "tETHUSD", + 1530187145559, + 13787457748, + -0.04, + 435.8, + "EXCHANGE LIMIT", + 435.8, + 1, + null, + null, + null + ] +] \ No newline at end of file diff --git a/xchange-bitfinex/src/test/resources/trade.json b/xchange-bitfinex/src/test/resources/trade.json new file mode 100644 index 00000000000..4a9c9bcd4db --- /dev/null +++ b/xchange-bitfinex/src/test/resources/trade.json @@ -0,0 +1,17 @@ +[ + 0, + "tu", + [ + 262861164, + "tETHUSD", + 1530187145559, + 13787457748, + -0.04, + 435.8, + "EXCHANGE LIMIT", + 435.8, + 1, + -0.0104592, + "USD" + ] +] \ No newline at end of file diff --git a/xchange-bitstamp/src/main/java/info/bitrich/xchangestream/bitstamp/BitstampStreamingMarketDataService.java b/xchange-bitstamp/src/main/java/info/bitrich/xchangestream/bitstamp/BitstampStreamingMarketDataService.java index 67d10c0c22a..1805ceee487 100644 --- a/xchange-bitstamp/src/main/java/info/bitrich/xchangestream/bitstamp/BitstampStreamingMarketDataService.java +++ b/xchange-bitstamp/src/main/java/info/bitrich/xchangestream/bitstamp/BitstampStreamingMarketDataService.java @@ -42,10 +42,9 @@ private Observable getOrderBook(String channelPrefix, CurrencyPair cu BitstampOrderBook orderBook = mapper.readValue(s, BitstampOrderBook.class); org.knowm.xchange.bitstamp.dto.marketdata.BitstampOrderBook bitstampOrderBook = new org.knowm.xchange.bitstamp.dto.marketdata.BitstampOrderBook( - new Date().getTime() / 1000L, + orderBook.getTimestamp(), orderBook.getBids(), orderBook.getAsks()); - return BitstampAdapters.adaptOrderBook(bitstampOrderBook, currencyPair); }); } diff --git a/xchange-bitstamp/src/main/java/info/bitrich/xchangestream/bitstamp/dto/BitstampOrderBook.java b/xchange-bitstamp/src/main/java/info/bitrich/xchangestream/bitstamp/dto/BitstampOrderBook.java index c7e1f80d3f3..bdb7d112202 100644 --- a/xchange-bitstamp/src/main/java/info/bitrich/xchangestream/bitstamp/dto/BitstampOrderBook.java +++ b/xchange-bitstamp/src/main/java/info/bitrich/xchangestream/bitstamp/dto/BitstampOrderBook.java @@ -6,10 +6,12 @@ import java.util.List; public class BitstampOrderBook { + private final long timestamp; private final List> bids; private final List> asks; - public BitstampOrderBook(@JsonProperty("bids") List> bids, @JsonProperty("asks") List> asks) { + public BitstampOrderBook(@JsonProperty("timestamp") long timestamp, @JsonProperty("bids") List> bids, @JsonProperty("asks") List> asks) { + this.timestamp = timestamp; this.bids = bids; this.asks = asks; } @@ -21,4 +23,8 @@ public List> getBids() { public List> getAsks() { return asks; } + + public long getTimestamp() { + return timestamp; + } } diff --git a/xchange-bitstamp/src/test/java/info/bitrich/xchangestream/bitstamp/BitstampManualExample.java b/xchange-bitstamp/src/test/java/info/bitrich/xchangestream/bitstamp/BitstampManualExample.java index 894b2af99b5..c2941bc6810 100644 --- a/xchange-bitstamp/src/test/java/info/bitrich/xchangestream/bitstamp/BitstampManualExample.java +++ b/xchange-bitstamp/src/test/java/info/bitrich/xchangestream/bitstamp/BitstampManualExample.java @@ -24,14 +24,13 @@ public static void main(String[] args) { LOG.info("Trade {}", trade); }); - subscribe.dispose(); - - exchange.disconnect().subscribe(() -> LOG.info("Disconnected from the Exchange")); - try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } + + subscribe.dispose(); + exchange.disconnect().subscribe(() -> LOG.info("Disconnected from the Exchange")); } } From d08fb17ee2ee47bc4bfc0112660465599ca0e925 Mon Sep 17 00:00:00 2001 From: Graham Crockford Date: Sun, 27 Jan 2019 12:04:07 +0000 Subject: [PATCH 2/8] Get authentication credentials from ExchangeSpecification as per standard --- .../bitfinex/BitfinexStreamingExchange.java | 19 ++++++++++--------- .../bitfinex/BitfinexManualAuthExample.java | 19 ++++++++++++++++--- 2 files changed, 26 insertions(+), 12 deletions(-) diff --git a/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingExchange.java b/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingExchange.java index c76c2bfd9a2..1806c8742a9 100644 --- a/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingExchange.java +++ b/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingExchange.java @@ -3,8 +3,10 @@ import info.bitrich.xchangestream.core.ProductSubscription; import info.bitrich.xchangestream.core.StreamingExchange; import info.bitrich.xchangestream.core.StreamingMarketDataService; + import io.reactivex.Completable; import io.reactivex.Observable; + import org.knowm.xchange.ExchangeSpecification; import org.knowm.xchange.bitfinex.v1.BitfinexExchange; @@ -18,15 +20,19 @@ public class BitfinexStreamingExchange extends BitfinexExchange implements Strea private BitfinexStreamingMarketDataService streamingMarketDataService; private BitfinexStreamingRawService streamingAuthenticatedDataService; - public BitfinexStreamingExchange() { - this.streamingAuthenticatedDataService = new BitfinexStreamingRawService(API_URI); - } - @Override protected void initServices() { super.initServices(); this.streamingService = createStreamingService(); this.streamingMarketDataService = new BitfinexStreamingMarketDataService(streamingService); + this.streamingAuthenticatedDataService = createAuthenticatedStreamingService(); + } + + private BitfinexStreamingRawService createAuthenticatedStreamingService() { + BitfinexStreamingRawService result = new BitfinexStreamingRawService(API_URI); + result.setApiKey(exchangeSpecification.getApiKey()); + result.setApiSecret(exchangeSpecification.getSecretKey()); + return result; } private BitfinexStreamingService createStreamingService() { @@ -92,11 +98,6 @@ public boolean isAuthenticatedAlive() { return streamingAuthenticatedDataService.isSocketOpen(); } - public void setCredentials(String apiKey, String apiSecret) { - streamingAuthenticatedDataService.setApiKey(apiKey); - streamingAuthenticatedDataService.setApiSecret(apiSecret); - } - public BitfinexStreamingRawService getStreamingAuthenticatedDataService() { return streamingAuthenticatedDataService; } diff --git a/xchange-bitfinex/src/test/java/info/bitrich/xchangestream/bitfinex/BitfinexManualAuthExample.java b/xchange-bitfinex/src/test/java/info/bitrich/xchangestream/bitfinex/BitfinexManualAuthExample.java index 0b8764bdb47..cebfff10dd1 100644 --- a/xchange-bitfinex/src/test/java/info/bitrich/xchangestream/bitfinex/BitfinexManualAuthExample.java +++ b/xchange-bitfinex/src/test/java/info/bitrich/xchangestream/bitfinex/BitfinexManualAuthExample.java @@ -1,6 +1,9 @@ package info.bitrich.xchangestream.bitfinex; import info.bitrich.xchangestream.core.StreamingExchangeFactory; + +import org.apache.commons.lang3.StringUtils; +import org.knowm.xchange.ExchangeSpecification; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -8,9 +11,19 @@ public class BitfinexManualAuthExample { private static final Logger LOG = LoggerFactory.getLogger(BitfinexManualAuthExample.class); public static void main(String[] args) { - BitfinexStreamingExchange exchange = (BitfinexStreamingExchange) StreamingExchangeFactory.INSTANCE.createExchange( - BitfinexStreamingExchange.class.getName()); - exchange.setCredentials("API-KEY", "API-SECRET"); + + // Far safer than temporarily adding these to code that might get committed to VCS + String apiKey = System.getProperty("bitfinex-api-key"); + String apiSecret = System.getProperty("bitfinex-api-secret"); + if (StringUtils.isEmpty(apiKey) || StringUtils.isEmpty(apiSecret)) { + throw new IllegalArgumentException("Supply api details in VM args"); + } + + ExchangeSpecification spec = StreamingExchangeFactory.INSTANCE.createExchange( + BitfinexStreamingExchange.class.getName()).getDefaultExchangeSpecification(); + spec.setApiKey(apiKey); + spec.setSecretKey(apiSecret); + BitfinexStreamingExchange exchange = (BitfinexStreamingExchange) StreamingExchangeFactory.INSTANCE.createExchange(spec); exchange.connectToAuthenticated().blockingAwait(); exchange.getStreamingAuthenticatedDataService().getAuthenticatedTrades().subscribe( From 4fad1bbe5e832a8f9e3126986a3d6769549fc51e Mon Sep 17 00:00:00 2001 From: Graham Crockford Date: Sun, 27 Jan 2019 12:20:26 +0000 Subject: [PATCH 3/8] Connect to authenticated stream at the same time as connecting to the open stream, as per Binance/Coinbase Pro. Also don't require manual authentication. This should happen automatically on reconnection. --- .../bitfinex/BitfinexStreamingExchange.java | 47 +++++++++++-------- .../bitfinex/BitfinexStreamingRawService.java | 14 ++++-- .../bitfinex/BitfinexManualAuthExample.java | 44 +++++++++-------- 3 files changed, 60 insertions(+), 45 deletions(-) diff --git a/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingExchange.java b/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingExchange.java index 1806c8742a9..cb3704b468d 100644 --- a/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingExchange.java +++ b/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingExchange.java @@ -7,9 +7,12 @@ import io.reactivex.Completable; import io.reactivex.Observable; +import org.apache.commons.lang3.StringUtils; import org.knowm.xchange.ExchangeSpecification; import org.knowm.xchange.bitfinex.v1.BitfinexExchange; +import java.util.ArrayList; + /** * Created by Lukas Zaoralek on 7.11.17. */ @@ -24,15 +27,17 @@ public class BitfinexStreamingExchange extends BitfinexExchange implements Strea protected void initServices() { super.initServices(); this.streamingService = createStreamingService(); - this.streamingMarketDataService = new BitfinexStreamingMarketDataService(streamingService); this.streamingAuthenticatedDataService = createAuthenticatedStreamingService(); + this.streamingMarketDataService = new BitfinexStreamingMarketDataService(streamingService); } private BitfinexStreamingRawService createAuthenticatedStreamingService() { - BitfinexStreamingRawService result = new BitfinexStreamingRawService(API_URI); - result.setApiKey(exchangeSpecification.getApiKey()); - result.setApiSecret(exchangeSpecification.getSecretKey()); - return result; + if (StringUtils.isEmpty(exchangeSpecification.getApiKey())) + return null; + BitfinexStreamingRawService result = new BitfinexStreamingRawService(API_URI); + result.setApiKey(exchangeSpecification.getApiKey()); + result.setApiSecret(exchangeSpecification.getSecretKey()); + return result; } private BitfinexStreamingService createStreamingService() { @@ -43,12 +48,26 @@ private BitfinexStreamingService createStreamingService() { @Override public Completable connect(ProductSubscription... args) { - return streamingService.connect(); + if (streamingAuthenticatedDataService == null) { + return streamingService.connect(); + } else { + ArrayList result = new ArrayList<>(); + result.add(streamingService.connect()); + result.add(streamingAuthenticatedDataService.connect()); + return Completable.concat(result); + } } @Override public Completable disconnect() { - return streamingService.disconnect(); + if (streamingAuthenticatedDataService == null) { + return streamingService.disconnect(); + } else { + ArrayList result = new ArrayList<>(); + result.add(streamingService.disconnect()); + result.add(streamingAuthenticatedDataService.disconnect()); + return Completable.concat(result); + } } @Override @@ -82,20 +101,8 @@ public StreamingMarketDataService getStreamingMarketDataService() { @Override public void useCompressedMessages(boolean compressedMessages) { streamingService.useCompressedMessages(compressedMessages); } - public Completable connectToAuthenticated() { - return streamingAuthenticatedDataService.connect(); - } - - public void authenticate() { - streamingAuthenticatedDataService.auth(); - } - - public Completable disconnectToAuthenticated() { - return streamingAuthenticatedDataService.disconnect(); - } - public boolean isAuthenticatedAlive() { - return streamingAuthenticatedDataService.isSocketOpen(); + return streamingAuthenticatedDataService != null && streamingAuthenticatedDataService.isSocketOpen(); } public BitfinexStreamingRawService getStreamingAuthenticatedDataService() { diff --git a/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingRawService.java b/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingRawService.java index 2b8ba04674a..30f483be673 100644 --- a/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingRawService.java +++ b/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingRawService.java @@ -3,6 +3,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; + import info.bitrich.xchangestream.bitfinex.dto.BitfinexAuthRequestStatus; import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketAuth; import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketAuthBalance; @@ -10,14 +11,17 @@ import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketAuthPreTrade; import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketAuthTrade; import info.bitrich.xchangestream.service.netty.JsonNettyStreamingService; + import io.reactivex.Observable; import io.reactivex.subjects.PublishSubject; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.crypto.Mac; import javax.crypto.spec.SecretKeySpec; import javax.xml.bind.DatatypeConverter; + import java.io.IOException; import java.math.BigDecimal; import java.nio.charset.StandardCharsets; @@ -37,10 +41,10 @@ public class BitfinexStreamingRawService extends JsonNettyStreamingService { private String apiKey; private String apiSecret; - private PublishSubject subjectPreTrade = PublishSubject.create(); - private PublishSubject subjectTrade = PublishSubject.create(); - private PublishSubject subjectOrder = PublishSubject.create(); - private PublishSubject subjectBalance = PublishSubject.create(); + private final PublishSubject subjectPreTrade = PublishSubject.create(); + private final PublishSubject subjectTrade = PublishSubject.create(); + private final PublishSubject subjectOrder = PublishSubject.create(); + private final PublishSubject subjectBalance = PublishSubject.create(); private final ObjectMapper objectMapper = new ObjectMapper(); public BitfinexStreamingRawService(String apiUrl) { @@ -248,7 +252,7 @@ private BitfinexWebSocketAuthOrder createOrderObject(JsonNode order) { ); } - public void auth() { + private void auth() { long nonce = System.currentTimeMillis() * 1000; String payload = "AUTH" + nonce; String signature; diff --git a/xchange-bitfinex/src/test/java/info/bitrich/xchangestream/bitfinex/BitfinexManualAuthExample.java b/xchange-bitfinex/src/test/java/info/bitrich/xchangestream/bitfinex/BitfinexManualAuthExample.java index cebfff10dd1..68072b49419 100644 --- a/xchange-bitfinex/src/test/java/info/bitrich/xchangestream/bitfinex/BitfinexManualAuthExample.java +++ b/xchange-bitfinex/src/test/java/info/bitrich/xchangestream/bitfinex/BitfinexManualAuthExample.java @@ -25,27 +25,31 @@ public static void main(String[] args) { spec.setSecretKey(apiSecret); BitfinexStreamingExchange exchange = (BitfinexStreamingExchange) StreamingExchangeFactory.INSTANCE.createExchange(spec); - exchange.connectToAuthenticated().blockingAwait(); - exchange.getStreamingAuthenticatedDataService().getAuthenticatedTrades().subscribe( - t -> LOG.info("AUTH TRADE: {}", t), - throwable -> LOG.error("ERROR: ", throwable) - ); - exchange.getStreamingAuthenticatedDataService().getAuthenticatedPreTrades().subscribe( - t -> LOG.info("AUTH PRE TRADE: {}", t), - throwable -> LOG.error("ERROR: ", throwable) - ); - exchange.getStreamingAuthenticatedDataService().getAuthenticatedOrders().subscribe( - t -> LOG.info("AUTH ORDER: {}", t), - throwable -> LOG.error("ERROR: ", throwable) - ); - exchange.getStreamingAuthenticatedDataService().getAuthenticatedBalances().subscribe( - t -> LOG.info("AUTH BALANCE: {}", t), - throwable -> LOG.error("ERROR: ", throwable) - ); + exchange.connect().blockingAwait(); try { - Thread.sleep(10000); - } catch (InterruptedException e) { - e.printStackTrace(); + exchange.getStreamingAuthenticatedDataService().getAuthenticatedTrades().subscribe( + t -> LOG.info("AUTH TRADE: {}", t), + throwable -> LOG.error("ERROR: ", throwable) + ); + exchange.getStreamingAuthenticatedDataService().getAuthenticatedPreTrades().subscribe( + t -> LOG.info("AUTH PRE TRADE: {}", t), + throwable -> LOG.error("ERROR: ", throwable) + ); + exchange.getStreamingAuthenticatedDataService().getAuthenticatedOrders().subscribe( + t -> LOG.info("AUTH ORDER: {}", t), + throwable -> LOG.error("ERROR: ", throwable) + ); + exchange.getStreamingAuthenticatedDataService().getAuthenticatedBalances().subscribe( + t -> LOG.info("AUTH BALANCE: {}", t), + throwable -> LOG.error("ERROR: ", throwable) + ); + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } finally { + exchange.disconnect().blockingAwait(); } } } From 8723e53a7ad7d9eb14ba066dac93cf3b8d2a6944 Mon Sep 17 00:00:00 2001 From: Graham Crockford Date: Sun, 27 Jan 2019 12:22:01 +0000 Subject: [PATCH 4/8] Fix warnings --- .../BitfinexStreamingMarketDataService.java | 22 +++++++++++++------ .../bitfinex/BitfinexStreamingService.java | 4 +++- 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingMarketDataService.java b/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingMarketDataService.java index 6d3c9be9c35..8b0412a55a6 100644 --- a/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingMarketDataService.java +++ b/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingMarketDataService.java @@ -1,33 +1,41 @@ package info.bitrich.xchangestream.bitfinex; -import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; -import info.bitrich.xchangestream.bitfinex.dto.*; + +import info.bitrich.xchangestream.bitfinex.dto.BitfinexOrderbook; +import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketOrderbookTransaction; +import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketSnapshotOrderbook; +import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketSnapshotTrades; +import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketTickerTransaction; +import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketTradesTransaction; +import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketUpdateOrderbook; +import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebsocketUpdateTrade; import info.bitrich.xchangestream.core.StreamingMarketDataService; import info.bitrich.xchangestream.service.netty.StreamingObjectMapperHelper; + import io.reactivex.Observable; + import org.knowm.xchange.currency.CurrencyPair; import org.knowm.xchange.dto.marketdata.OrderBook; import org.knowm.xchange.dto.marketdata.Ticker; import org.knowm.xchange.dto.marketdata.Trade; import org.knowm.xchange.dto.marketdata.Trades; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.Map; -import static org.knowm.xchange.bitfinex.v1.BitfinexAdapters.*; +import static org.knowm.xchange.bitfinex.v1.BitfinexAdapters.adaptOrderBook; +import static org.knowm.xchange.bitfinex.v1.BitfinexAdapters.adaptTicker; +import static org.knowm.xchange.bitfinex.v1.BitfinexAdapters.adaptTrades; /** * Created by Lukas Zaoralek on 7.11.17. */ public class BitfinexStreamingMarketDataService implements StreamingMarketDataService { - private static final Logger LOG = LoggerFactory.getLogger(BitfinexStreamingMarketDataService.class); private final BitfinexStreamingService service; - private Map orderbooks = new HashMap<>(); + private final Map orderbooks = new HashMap<>(); public BitfinexStreamingMarketDataService(BitfinexStreamingService service) { this.service = service; diff --git a/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingService.java b/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingService.java index 6d99d0bafd2..4c4ec930499 100644 --- a/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingService.java +++ b/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingService.java @@ -2,12 +2,14 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; -import info.bitrich.xchangestream.bitfinex.dto.BitfinexAuthRequestStatus; + import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketSubscriptionMessage; import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketUnSubscriptionMessage; import info.bitrich.xchangestream.service.netty.JsonNettyStreamingService; import info.bitrich.xchangestream.service.netty.StreamingObjectMapperHelper; + import io.netty.handler.codec.http.websocketx.extensions.WebSocketClientExtensionHandler; + import org.knowm.xchange.exceptions.ExchangeException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; From 0cadf03562dad000c587187e571d93c35c1fc43d Mon Sep 17 00:00:00 2001 From: Graham Crockford Date: Sun, 27 Jan 2019 12:24:31 +0000 Subject: [PATCH 5/8] Use the object mapper from the superclass --- .../xchangestream/bitfinex/BitfinexStreamingRawService.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingRawService.java b/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingRawService.java index 30f483be673..c8a0274ad27 100644 --- a/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingRawService.java +++ b/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingRawService.java @@ -2,7 +2,6 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; import info.bitrich.xchangestream.bitfinex.dto.BitfinexAuthRequestStatus; import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketAuth; @@ -45,7 +44,6 @@ public class BitfinexStreamingRawService extends JsonNettyStreamingService { private final PublishSubject subjectTrade = PublishSubject.create(); private final PublishSubject subjectOrder = PublishSubject.create(); private final PublishSubject subjectBalance = PublishSubject.create(); - private final ObjectMapper objectMapper = new ObjectMapper(); public BitfinexStreamingRawService(String apiUrl) { super(apiUrl, Integer.MAX_VALUE); From fde38843f3a3bf52d57fbbacc055c41137593088 Mon Sep 17 00:00:00 2001 From: Graham Crockford Date: Sun, 27 Jan 2019 12:33:08 +0000 Subject: [PATCH 6/8] Move authenticated streams up to the market data service in line with Binance --- .../bitfinex/BitfinexStreamingExchange.java | 12 +++---- .../BitfinexStreamingMarketDataService.java | 32 ++++++++++++++++++- .../bitfinex/BitfinexStreamingRawService.java | 8 ++--- .../bitfinex/BitfinexManualAuthExample.java | 8 ++--- .../BitfinexStreamingServiceTest.java | 6 ++-- 5 files changed, 47 insertions(+), 19 deletions(-) diff --git a/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingExchange.java b/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingExchange.java index cb3704b468d..80d3fe35c60 100644 --- a/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingExchange.java +++ b/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingExchange.java @@ -2,7 +2,6 @@ import info.bitrich.xchangestream.core.ProductSubscription; import info.bitrich.xchangestream.core.StreamingExchange; -import info.bitrich.xchangestream.core.StreamingMarketDataService; import io.reactivex.Completable; import io.reactivex.Observable; @@ -17,7 +16,8 @@ * Created by Lukas Zaoralek on 7.11.17. */ public class BitfinexStreamingExchange extends BitfinexExchange implements StreamingExchange { - private static final String API_URI = "wss://api.bitfinex.com/ws/2"; + + static final String API_URI = "wss://api.bitfinex.com/ws/2"; private BitfinexStreamingService streamingService; private BitfinexStreamingMarketDataService streamingMarketDataService; @@ -28,7 +28,7 @@ protected void initServices() { super.initServices(); this.streamingService = createStreamingService(); this.streamingAuthenticatedDataService = createAuthenticatedStreamingService(); - this.streamingMarketDataService = new BitfinexStreamingMarketDataService(streamingService); + this.streamingMarketDataService = new BitfinexStreamingMarketDataService(streamingService, streamingAuthenticatedDataService); } private BitfinexStreamingRawService createAuthenticatedStreamingService() { @@ -94,7 +94,7 @@ public ExchangeSpecification getDefaultExchangeSpecification() { } @Override - public StreamingMarketDataService getStreamingMarketDataService() { + public BitfinexStreamingMarketDataService getStreamingMarketDataService() { return streamingMarketDataService; } @@ -104,8 +104,4 @@ public StreamingMarketDataService getStreamingMarketDataService() { public boolean isAuthenticatedAlive() { return streamingAuthenticatedDataService != null && streamingAuthenticatedDataService.isSocketOpen(); } - - public BitfinexStreamingRawService getStreamingAuthenticatedDataService() { - return streamingAuthenticatedDataService; - } } diff --git a/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingMarketDataService.java b/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingMarketDataService.java index 8b0412a55a6..45430b57331 100644 --- a/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingMarketDataService.java +++ b/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingMarketDataService.java @@ -3,6 +3,10 @@ import com.fasterxml.jackson.databind.ObjectMapper; import info.bitrich.xchangestream.bitfinex.dto.BitfinexOrderbook; +import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketAuthBalance; +import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketAuthOrder; +import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketAuthPreTrade; +import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketAuthTrade; import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketOrderbookTransaction; import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketSnapshotOrderbook; import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketSnapshotTrades; @@ -20,6 +24,7 @@ import org.knowm.xchange.dto.marketdata.Ticker; import org.knowm.xchange.dto.marketdata.Trade; import org.knowm.xchange.dto.marketdata.Trades; +import org.knowm.xchange.exceptions.ExchangeSecurityException; import java.util.HashMap; import java.util.Map; @@ -34,11 +39,13 @@ public class BitfinexStreamingMarketDataService implements StreamingMarketDataService { private final BitfinexStreamingService service; + private final BitfinexStreamingRawService authenticatedService; private final Map orderbooks = new HashMap<>(); - public BitfinexStreamingMarketDataService(BitfinexStreamingService service) { + public BitfinexStreamingMarketDataService(BitfinexStreamingService service, BitfinexStreamingRawService authenticatedService) { this.service = service; + this.authenticatedService = authenticatedService; } @Override @@ -103,4 +110,27 @@ public Observable getTrades(CurrencyPair currencyPair, Object... args) { return adaptedTrades.getTrades(); }); } + + public Observable getRawAuthenticatedOrders() { + return checkAuthenticated().getAuthenticatedOrders(); + } + + public Observable getRawAuthenticatedPreTrades() { + return checkAuthenticated().getAuthenticatedPreTrades(); + } + + public Observable getRawAuthenticatedTrades() { + return checkAuthenticated().getAuthenticatedTrades(); + } + + public Observable getRawAuthenticatedBalances() { + return checkAuthenticated().getAuthenticatedBalances(); + } + + private BitfinexStreamingRawService checkAuthenticated() { + if (authenticatedService == null) { + throw new ExchangeSecurityException("Cannot return authenticated orders. Not authenticated."); + } + return authenticatedService; + } } diff --git a/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingRawService.java b/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingRawService.java index c8a0274ad27..c59ce407076 100644 --- a/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingRawService.java +++ b/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingRawService.java @@ -287,19 +287,19 @@ private void sendMessage(Object message) { } } - public Observable getAuthenticatedOrders() { + Observable getAuthenticatedOrders() { return subjectOrder.share(); } - public Observable getAuthenticatedPreTrades() { + Observable getAuthenticatedPreTrades() { return subjectPreTrade.share(); } - public Observable getAuthenticatedTrades() { + Observable getAuthenticatedTrades() { return subjectTrade.share(); } - public Observable getAuthenticatedBalances() { + Observable getAuthenticatedBalances() { return subjectBalance.share(); } } diff --git a/xchange-bitfinex/src/test/java/info/bitrich/xchangestream/bitfinex/BitfinexManualAuthExample.java b/xchange-bitfinex/src/test/java/info/bitrich/xchangestream/bitfinex/BitfinexManualAuthExample.java index 68072b49419..9bf1213269f 100644 --- a/xchange-bitfinex/src/test/java/info/bitrich/xchangestream/bitfinex/BitfinexManualAuthExample.java +++ b/xchange-bitfinex/src/test/java/info/bitrich/xchangestream/bitfinex/BitfinexManualAuthExample.java @@ -27,19 +27,19 @@ public static void main(String[] args) { exchange.connect().blockingAwait(); try { - exchange.getStreamingAuthenticatedDataService().getAuthenticatedTrades().subscribe( + exchange.getStreamingMarketDataService().getRawAuthenticatedTrades().subscribe( t -> LOG.info("AUTH TRADE: {}", t), throwable -> LOG.error("ERROR: ", throwable) ); - exchange.getStreamingAuthenticatedDataService().getAuthenticatedPreTrades().subscribe( + exchange.getStreamingMarketDataService().getRawAuthenticatedPreTrades().subscribe( t -> LOG.info("AUTH PRE TRADE: {}", t), throwable -> LOG.error("ERROR: ", throwable) ); - exchange.getStreamingAuthenticatedDataService().getAuthenticatedOrders().subscribe( + exchange.getStreamingMarketDataService().getRawAuthenticatedOrders().subscribe( t -> LOG.info("AUTH ORDER: {}", t), throwable -> LOG.error("ERROR: ", throwable) ); - exchange.getStreamingAuthenticatedDataService().getAuthenticatedBalances().subscribe( + exchange.getStreamingMarketDataService().getRawAuthenticatedBalances().subscribe( t -> LOG.info("AUTH BALANCE: {}", t), throwable -> LOG.error("ERROR: ", throwable) ); diff --git a/xchange-bitfinex/src/test/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingServiceTest.java b/xchange-bitfinex/src/test/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingServiceTest.java index e3b676f1e08..be2d09150a8 100644 --- a/xchange-bitfinex/src/test/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingServiceTest.java +++ b/xchange-bitfinex/src/test/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingServiceTest.java @@ -2,11 +2,14 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; + import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketAuthBalance; import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketAuthOrder; import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketAuthPreTrade; import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketAuthTrade; + import io.reactivex.observers.TestObserver; + import org.junit.Before; import org.junit.Test; @@ -22,8 +25,7 @@ public class BitfinexStreamingServiceTest { @Before public void setUp() { - BitfinexStreamingExchange bitfinexStreamingExchange = new BitfinexStreamingExchange(); - service = bitfinexStreamingExchange.getStreamingAuthenticatedDataService(); + service = new BitfinexStreamingRawService(BitfinexStreamingExchange.API_URI); } @Test From c58d0eacf7cf81662657058d01af098ee6a7ce88 Mon Sep 17 00:00:00 2001 From: Graham Crockford Date: Sun, 27 Jan 2019 14:47:31 +0000 Subject: [PATCH 7/8] Moved authenticated data into the same socket connection as the rest of the --- .../netty/JsonNettyStreamingService.java | 10 + .../bitfinex/BitfinexStreamingAdapters.java | 144 +++++++++ .../bitfinex/BitfinexStreamingExchange.java | 41 +-- .../BitfinexStreamingMarketDataService.java | 20 +- .../bitfinex/BitfinexStreamingRawService.java | 305 ------------------ .../bitfinex/BitfinexStreamingService.java | 164 +++++++++- .../bitfinex/BitfinexManualAuthExample.java | 7 + .../BitfinexStreamingServiceTest.java | 11 +- 8 files changed, 341 insertions(+), 361 deletions(-) create mode 100644 xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingAdapters.java delete mode 100644 xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingRawService.java diff --git a/service-netty/src/main/java/info/bitrich/xchangestream/service/netty/JsonNettyStreamingService.java b/service-netty/src/main/java/info/bitrich/xchangestream/service/netty/JsonNettyStreamingService.java index fe73f87e279..df36b5cd409 100644 --- a/service-netty/src/main/java/info/bitrich/xchangestream/service/netty/JsonNettyStreamingService.java +++ b/service-netty/src/main/java/info/bitrich/xchangestream/service/netty/JsonNettyStreamingService.java @@ -1,8 +1,10 @@ package info.bitrich.xchangestream.service.netty; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.JsonNodeType; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,4 +44,12 @@ public void messageHandler(String message) { handleMessage(jsonNode); } } + + protected void sendObjectMessage(Object message) { + try { + sendMessage(objectMapper.writeValueAsString(message)); + } catch (JsonProcessingException e) { + LOG.error("Error creating json message: {}", e.getMessage()); + } + } } diff --git a/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingAdapters.java b/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingAdapters.java new file mode 100644 index 00000000000..f6a0085607a --- /dev/null +++ b/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingAdapters.java @@ -0,0 +1,144 @@ +package info.bitrich.xchangestream.bitfinex; + +import com.fasterxml.jackson.databind.JsonNode; + +import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketAuthBalance; +import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketAuthOrder; +import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketAuthPreTrade; +import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketAuthTrade; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.math.BigDecimal; +import java.util.stream.Stream; + +import static java.util.stream.StreamSupport.stream; + +class BitfinexStreamingAdapters { + + private static final Logger LOG = LoggerFactory.getLogger(BitfinexStreamingAdapters.class); + + static BitfinexWebSocketAuthPreTrade adaptPreTrade(JsonNode preTrade) { + if (preTrade.size() < 12) { + LOG.error("addPreTrade unexpected record size={}, record={}", preTrade.size(), preTrade.toString()); + return null; + } + long id = preTrade.get(0).longValue(); + String pair = preTrade.get(1).textValue(); + long mtsCreate = preTrade.get(2).longValue(); + long orderId = preTrade.get(3).longValue(); + BigDecimal execAmount = preTrade.get(4).decimalValue(); + BigDecimal execPrice = preTrade.get(5).decimalValue(); + String orderType = preTrade.get(6).textValue(); + BigDecimal orderPrice = preTrade.get(7).decimalValue(); + int maker = preTrade.get(8).intValue(); + BitfinexWebSocketAuthPreTrade preTradeObject = new BitfinexWebSocketAuthPreTrade(id, pair, mtsCreate, orderId, + execAmount, execPrice, orderType, orderPrice, maker); + LOG.debug("New pre trade: {}", preTradeObject); + return preTradeObject; + } + + static BitfinexWebSocketAuthTrade adaptTrade(JsonNode trade) { + if (trade.size() < 11) { + LOG.error("addTrade unexpected record size={}, record={}", trade.size(), trade.toString()); + return null; + } + long id = trade.get(0).longValue(); + String pair = trade.get(1).textValue(); + long mtsCreate = trade.get(2).longValue(); + long orderId = trade.get(3).longValue(); + BigDecimal execAmount = trade.get(4).decimalValue(); + BigDecimal execPrice = trade.get(5).decimalValue(); + String orderType = trade.get(6).textValue(); + BigDecimal orderPrice = trade.get(7).decimalValue(); + int maker = trade.get(8).intValue(); + BigDecimal fee = trade.get(9).decimalValue(); + String currency = trade.get(10).textValue(); + BitfinexWebSocketAuthTrade tradeObject = new BitfinexWebSocketAuthTrade( + id, pair, mtsCreate, orderId, execAmount, execPrice, orderType, orderPrice, maker, fee, currency + ); + LOG.debug("New trade: {}", tradeObject); + return tradeObject; + } + + static Stream adaptOrders(JsonNode orders) { + Iterable iterator = () -> orders.iterator(); + return stream(iterator.spliterator(), false) + .map(BitfinexStreamingAdapters::createOrderObject) + .filter(o -> o != null) + .peek(o -> LOG.debug("New order: {}", o)); + } + + static BitfinexWebSocketAuthOrder adaptOrder(JsonNode order) { + BitfinexWebSocketAuthOrder orderObject = createOrderObject(order); + if (orderObject == null) { + return null; + } + LOG.debug("Updated order: {}", orderObject); + return orderObject; + } + + static BitfinexWebSocketAuthBalance adaptBalance(JsonNode balance) { + BitfinexWebSocketAuthBalance balanceObject = createBalanceObject(balance); + if (balanceObject == null) { + return null; + } + LOG.debug("Balance: {}", balanceObject); + return balanceObject; + } + + static Stream adaptBalances(JsonNode balances) { + Iterable iterator = () -> balances.iterator(); + return stream(iterator.spliterator(), false) + .map(BitfinexStreamingAdapters::createBalanceObject) + .filter(o -> o != null) + .peek(o -> LOG.debug("Balance: {}", o)); + } + + static private BitfinexWebSocketAuthBalance createBalanceObject(JsonNode balance) { + if (balance.size() < 5) { + LOG.error("createBalanceObject unexpected record size={}, record={}", balance.size(), balance.toString()); + return null; + } + + String walletType = balance.get(0).textValue(); + String currency = balance.get(1).textValue(); + BigDecimal balanceValue = balance.get(2).decimalValue(); + BigDecimal unsettledInterest = balance.get(3).decimalValue(); + BigDecimal balanceAvailable = balance.get(4).asText().equals("null") ? null : balance.get(4).decimalValue(); + + return new BitfinexWebSocketAuthBalance(walletType, currency, balanceValue, unsettledInterest, balanceAvailable); + } + + static private BitfinexWebSocketAuthOrder createOrderObject(JsonNode order) { + if (order.size() < 32) { + LOG.error("createOrderObject unexpected record size={}, record={}", order.size(), order.toString()); + return null; + } + + long id = order.get(0).longValue(); + long groupId = order.get(1).longValue(); + long cid = order.get(2).longValue(); + String symbol = order.get(3).textValue(); + long mtsCreate = order.get(4).longValue(); + long mtsUpdate = order.get(5).longValue(); + BigDecimal amount = order.get(6).decimalValue(); + BigDecimal amountOrig = order.get(7).decimalValue(); + String type = order.get(8).textValue(); + String typePrev = order.get(9).textValue(); + int flags = order.get(12).intValue(); + String orderStatus = order.get(13).textValue(); + BigDecimal price = order.get(16).decimalValue(); + BigDecimal priceAvg = order.get(17).decimalValue(); + BigDecimal priceTrailing = order.get(18).decimalValue(); + BigDecimal priceAuxLimit = order.get(19).decimalValue(); + long placedId = order.get(25).longValue(); + + return new BitfinexWebSocketAuthOrder( + id, groupId, cid, symbol, mtsCreate, mtsUpdate, amount, amountOrig, + type, typePrev, orderStatus, price, priceAvg, priceTrailing, + priceAuxLimit, placedId, flags + ); + } +} \ No newline at end of file diff --git a/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingExchange.java b/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingExchange.java index 80d3fe35c60..1b836878066 100644 --- a/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingExchange.java +++ b/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingExchange.java @@ -10,8 +10,6 @@ import org.knowm.xchange.ExchangeSpecification; import org.knowm.xchange.bitfinex.v1.BitfinexExchange; -import java.util.ArrayList; - /** * Created by Lukas Zaoralek on 7.11.17. */ @@ -21,53 +19,32 @@ public class BitfinexStreamingExchange extends BitfinexExchange implements Strea private BitfinexStreamingService streamingService; private BitfinexStreamingMarketDataService streamingMarketDataService; - private BitfinexStreamingRawService streamingAuthenticatedDataService; @Override protected void initServices() { super.initServices(); this.streamingService = createStreamingService(); - this.streamingAuthenticatedDataService = createAuthenticatedStreamingService(); - this.streamingMarketDataService = new BitfinexStreamingMarketDataService(streamingService, streamingAuthenticatedDataService); - } - - private BitfinexStreamingRawService createAuthenticatedStreamingService() { - if (StringUtils.isEmpty(exchangeSpecification.getApiKey())) - return null; - BitfinexStreamingRawService result = new BitfinexStreamingRawService(API_URI); - result.setApiKey(exchangeSpecification.getApiKey()); - result.setApiSecret(exchangeSpecification.getSecretKey()); - return result; + this.streamingMarketDataService = new BitfinexStreamingMarketDataService(streamingService); } private BitfinexStreamingService createStreamingService() { - BitfinexStreamingService streamingService = new BitfinexStreamingService(API_URI); + BitfinexStreamingService streamingService = new BitfinexStreamingService(API_URI, getNonceFactory()); applyStreamingSpecification(getExchangeSpecification(), streamingService); + if (StringUtils.isNotEmpty(exchangeSpecification.getApiKey())) { + streamingService.setApiKey(exchangeSpecification.getApiKey()); + streamingService.setApiSecret(exchangeSpecification.getSecretKey()); + } return streamingService; } @Override public Completable connect(ProductSubscription... args) { - if (streamingAuthenticatedDataService == null) { - return streamingService.connect(); - } else { - ArrayList result = new ArrayList<>(); - result.add(streamingService.connect()); - result.add(streamingAuthenticatedDataService.connect()); - return Completable.concat(result); - } + return streamingService.connect(); } @Override public Completable disconnect() { - if (streamingAuthenticatedDataService == null) { - return streamingService.disconnect(); - } else { - ArrayList result = new ArrayList<>(); - result.add(streamingService.disconnect()); - result.add(streamingAuthenticatedDataService.disconnect()); - return Completable.concat(result); - } + return streamingService.disconnect(); } @Override @@ -102,6 +79,6 @@ public BitfinexStreamingMarketDataService getStreamingMarketDataService() { public void useCompressedMessages(boolean compressedMessages) { streamingService.useCompressedMessages(compressedMessages); } public boolean isAuthenticatedAlive() { - return streamingAuthenticatedDataService != null && streamingAuthenticatedDataService.isSocketOpen(); + return streamingService != null && streamingService.isAuthenticated(); } } diff --git a/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingMarketDataService.java b/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingMarketDataService.java index 45430b57331..25d37911fbb 100644 --- a/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingMarketDataService.java +++ b/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingMarketDataService.java @@ -24,7 +24,6 @@ import org.knowm.xchange.dto.marketdata.Ticker; import org.knowm.xchange.dto.marketdata.Trade; import org.knowm.xchange.dto.marketdata.Trades; -import org.knowm.xchange.exceptions.ExchangeSecurityException; import java.util.HashMap; import java.util.Map; @@ -39,13 +38,11 @@ public class BitfinexStreamingMarketDataService implements StreamingMarketDataService { private final BitfinexStreamingService service; - private final BitfinexStreamingRawService authenticatedService; private final Map orderbooks = new HashMap<>(); - public BitfinexStreamingMarketDataService(BitfinexStreamingService service, BitfinexStreamingRawService authenticatedService) { + public BitfinexStreamingMarketDataService(BitfinexStreamingService service) { this.service = service; - this.authenticatedService = authenticatedService; } @Override @@ -112,25 +109,18 @@ public Observable getTrades(CurrencyPair currencyPair, Object... args) { } public Observable getRawAuthenticatedOrders() { - return checkAuthenticated().getAuthenticatedOrders(); + return service.getAuthenticatedOrders(); } public Observable getRawAuthenticatedPreTrades() { - return checkAuthenticated().getAuthenticatedPreTrades(); + return service.getAuthenticatedPreTrades(); } public Observable getRawAuthenticatedTrades() { - return checkAuthenticated().getAuthenticatedTrades(); + return service.getAuthenticatedTrades(); } public Observable getRawAuthenticatedBalances() { - return checkAuthenticated().getAuthenticatedBalances(); - } - - private BitfinexStreamingRawService checkAuthenticated() { - if (authenticatedService == null) { - throw new ExchangeSecurityException("Cannot return authenticated orders. Not authenticated."); - } - return authenticatedService; + return service.getAuthenticatedBalances(); } } diff --git a/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingRawService.java b/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingRawService.java deleted file mode 100644 index c59ce407076..00000000000 --- a/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingRawService.java +++ /dev/null @@ -1,305 +0,0 @@ -package info.bitrich.xchangestream.bitfinex; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonNode; - -import info.bitrich.xchangestream.bitfinex.dto.BitfinexAuthRequestStatus; -import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketAuth; -import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketAuthBalance; -import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketAuthOrder; -import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketAuthPreTrade; -import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketAuthTrade; -import info.bitrich.xchangestream.service.netty.JsonNettyStreamingService; - -import io.reactivex.Observable; -import io.reactivex.subjects.PublishSubject; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.crypto.Mac; -import javax.crypto.spec.SecretKeySpec; -import javax.xml.bind.DatatypeConverter; - -import java.io.IOException; -import java.math.BigDecimal; -import java.nio.charset.StandardCharsets; -import java.security.InvalidKeyException; -import java.security.NoSuchAlgorithmException; - -import static org.knowm.xchange.service.BaseParamsDigest.HMAC_SHA_384; - -public class BitfinexStreamingRawService extends JsonNettyStreamingService { - private static final Logger LOG = LoggerFactory.getLogger(BitfinexStreamingRawService.class); - - private static final String AUTH = "auth"; - private static final String STATUS = "status"; - private static final String MESSAGE = "msg"; - private static final String EVENT = "event"; - - private String apiKey; - private String apiSecret; - - private final PublishSubject subjectPreTrade = PublishSubject.create(); - private final PublishSubject subjectTrade = PublishSubject.create(); - private final PublishSubject subjectOrder = PublishSubject.create(); - private final PublishSubject subjectBalance = PublishSubject.create(); - - public BitfinexStreamingRawService(String apiUrl) { - super(apiUrl, Integer.MAX_VALUE); - } - - @Override - protected String getChannelNameFromMessage(JsonNode message) throws IOException { - return null; - } - - @Override - public String getSubscribeMessage(String channelName, Object... args) throws IOException { - return null; - } - - @Override - public String getUnsubscribeMessage(String channelName) throws IOException { - return null; - } - - @Override - public void messageHandler(String message) { - JsonNode jsonNode; - try { - jsonNode = objectMapper.readTree(message); - } catch (IOException e) { - LOG.error("Error parsing incoming message to JSON: {}", message); - subjectOrder.onError(e); - return; - } - handleMessage(jsonNode); - } - - @Override - protected void handleMessage(JsonNode message) { - LOG.debug("Receiving message: {}", message); - - if (message.isArray()) { - String type = message.get(1).asText(); - if (type.equals("hb")) { - return; - } - } - - JsonNode event = message.get(EVENT); - if (event != null && event.textValue().equalsIgnoreCase("info")) - auth(); - - if (event != null && event.textValue().equals(AUTH) && message.get(STATUS).textValue().equals(BitfinexAuthRequestStatus.FAILED.name())) { - LOG.error("Authentication error: {}", message.get(MESSAGE)); - return; - } - - if (message.isArray() && message.size() == 3) { - String type = message.get(1).asText(); - JsonNode object = message.get(2); - switch (type) { - case "te": - addPreTrade(object); - break; - case "tu": - addTrade(object); - break; - case "os": - addOrder(object); - break; - case "on": - case "ou": - case "oc": - updateOrder(object); - break; - case "ws": - updateBalances(object); - break; - case "wu": - updateBalance(object); - break; - } - } - } - - private void addPreTrade(JsonNode preTrade) { - if (preTrade.size() < 12) { - LOG.error("addPreTrade unexpected record size={}, record={}", preTrade.size(), preTrade.toString()); - return; - } - long id = preTrade.get(0).longValue(); - String pair = preTrade.get(1).textValue(); - long mtsCreate = preTrade.get(2).longValue(); - long orderId = preTrade.get(3).longValue(); - BigDecimal execAmount = preTrade.get(4).decimalValue(); - BigDecimal execPrice = preTrade.get(5).decimalValue(); - String orderType = preTrade.get(6).textValue(); - BigDecimal orderPrice = preTrade.get(7).decimalValue(); - int maker = preTrade.get(8).intValue(); - BitfinexWebSocketAuthPreTrade preTradeObject = new BitfinexWebSocketAuthPreTrade(id, pair, mtsCreate, orderId, - execAmount, execPrice, orderType, orderPrice, maker); - LOG.debug("New pre trade: {}", preTradeObject); - subjectPreTrade.onNext(preTradeObject); - } - - private void addTrade(JsonNode trade) { - if (trade.size() < 11) { - LOG.error("addTrade unexpected record size={}, record={}", trade.size(), trade.toString()); - return; - } - long id = trade.get(0).longValue(); - String pair = trade.get(1).textValue(); - long mtsCreate = trade.get(2).longValue(); - long orderId = trade.get(3).longValue(); - BigDecimal execAmount = trade.get(4).decimalValue(); - BigDecimal execPrice = trade.get(5).decimalValue(); - String orderType = trade.get(6).textValue(); - BigDecimal orderPrice = trade.get(7).decimalValue(); - int maker = trade.get(8).intValue(); - BigDecimal fee = trade.get(9).decimalValue(); - String currency = trade.get(10).textValue(); - BitfinexWebSocketAuthTrade tradeObject = new BitfinexWebSocketAuthTrade( - id, pair, mtsCreate, orderId, execAmount, execPrice, orderType, orderPrice, maker, fee, currency - ); - LOG.debug("New trade: {}", tradeObject); - subjectTrade.onNext(tradeObject); - } - - private void addOrder(JsonNode orders) { - for (final JsonNode order : orders) { - BitfinexWebSocketAuthOrder orderObject = createOrderObject(order); - if (orderObject != null) { - LOG.debug("New order: {}", orderObject); - subjectOrder.onNext(orderObject); - } - } - } - - private void updateOrder(JsonNode order) { - BitfinexWebSocketAuthOrder orderObject = createOrderObject(order); - if (orderObject != null) { - LOG.debug("Updated order: {}", orderObject); - subjectOrder.onNext(orderObject); - } - } - - private void updateBalance(JsonNode balance) { - BitfinexWebSocketAuthBalance balanceObject = createBalanceObject(balance); - if (balanceObject != null) { - LOG.debug("Balance: {}", balanceObject); - subjectBalance.onNext(balanceObject); - } - } - - private void updateBalances(JsonNode balances) { - for (final JsonNode balance : balances) { - BitfinexWebSocketAuthBalance balanceObject = createBalanceObject(balance); - if (balanceObject != null) { - LOG.debug("Balance: {}", balanceObject); - subjectBalance.onNext(balanceObject); - } - } - } - - private BitfinexWebSocketAuthBalance createBalanceObject(JsonNode balance) { - if (balance.size() < 5) { - LOG.error("createBalanceObject unexpected record size={}, record={}", balance.size(), balance.toString()); - return null; - } - - String walletType = balance.get(0).textValue(); - String currency = balance.get(1).textValue(); - BigDecimal balanceValue = balance.get(2).decimalValue(); - BigDecimal unsettledInterest = balance.get(3).decimalValue(); - BigDecimal balanceAvailable = balance.get(4).asText().equals("null") ? null : balance.get(4).decimalValue(); - - return new BitfinexWebSocketAuthBalance(walletType, currency, balanceValue, unsettledInterest, balanceAvailable); - } - - private BitfinexWebSocketAuthOrder createOrderObject(JsonNode order) { - if (order.size() < 32) { - LOG.error("createOrderObject unexpected record size={}, record={}", order.size(), order.toString()); - return null; - } - - long id = order.get(0).longValue(); - long groupId = order.get(1).longValue(); - long cid = order.get(2).longValue(); - String symbol = order.get(3).textValue(); - long mtsCreate = order.get(4).longValue(); - long mtsUpdate = order.get(5).longValue(); - BigDecimal amount = order.get(6).decimalValue(); - BigDecimal amountOrig = order.get(7).decimalValue(); - String type = order.get(8).textValue(); - String typePrev = order.get(9).textValue(); - int flags = order.get(12).intValue(); - String orderStatus = order.get(13).textValue(); - BigDecimal price = order.get(16).decimalValue(); - BigDecimal priceAvg = order.get(17).decimalValue(); - BigDecimal priceTrailing = order.get(18).decimalValue(); - BigDecimal priceAuxLimit = order.get(19).decimalValue(); - long placedId = order.get(25).longValue(); - - return new BitfinexWebSocketAuthOrder( - id, groupId, cid, symbol, mtsCreate, mtsUpdate, amount, amountOrig, - type, typePrev, orderStatus, price, priceAvg, priceTrailing, - priceAuxLimit, placedId, flags - ); - } - - private void auth() { - long nonce = System.currentTimeMillis() * 1000; - String payload = "AUTH" + nonce; - String signature; - try { - Mac macEncoder = Mac.getInstance(HMAC_SHA_384); - SecretKeySpec secretKeySpec = new SecretKeySpec(apiSecret.getBytes(StandardCharsets.UTF_8), HMAC_SHA_384); - macEncoder.init(secretKeySpec); - byte[] result = macEncoder.doFinal(payload.getBytes(StandardCharsets.UTF_8)); - signature = DatatypeConverter.printHexBinary(result); - } catch (NoSuchAlgorithmException | InvalidKeyException e) { - LOG.error("auth. Sign failed error={}", e.getMessage()); - return; - } - BitfinexWebSocketAuth message = new BitfinexWebSocketAuth( - apiKey, payload, String.valueOf(nonce), signature.toLowerCase() - ); - sendMessage(message); - } - - - public void setApiKey(String apiKey) { - this.apiKey = apiKey; - } - - public void setApiSecret(String apiSecret) { - this.apiSecret = apiSecret; - } - - private void sendMessage(Object message) { - try { - sendMessage(objectMapper.writeValueAsString(message)); - } catch (JsonProcessingException e) { - LOG.error("Error creating json message: {}", e.getMessage()); - } - } - - Observable getAuthenticatedOrders() { - return subjectOrder.share(); - } - - Observable getAuthenticatedPreTrades() { - return subjectPreTrade.share(); - } - - Observable getAuthenticatedTrades() { - return subjectTrade.share(); - } - - Observable getAuthenticatedBalances() { - return subjectBalance.share(); - } -} diff --git a/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingService.java b/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingService.java index 4c4ec930499..a015a0fedd5 100644 --- a/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingService.java +++ b/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingService.java @@ -3,40 +3,86 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import info.bitrich.xchangestream.bitfinex.dto.BitfinexAuthRequestStatus; +import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketAuth; +import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketAuthBalance; +import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketAuthOrder; +import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketAuthPreTrade; +import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketAuthTrade; import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketSubscriptionMessage; import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketUnSubscriptionMessage; import info.bitrich.xchangestream.service.netty.JsonNettyStreamingService; import info.bitrich.xchangestream.service.netty.StreamingObjectMapperHelper; import io.netty.handler.codec.http.websocketx.extensions.WebSocketClientExtensionHandler; +import io.reactivex.Observable; +import io.reactivex.subjects.PublishSubject; +import org.apache.commons.lang3.StringUtils; import org.knowm.xchange.exceptions.ExchangeException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.crypto.Mac; +import javax.crypto.spec.SecretKeySpec; +import javax.xml.bind.DatatypeConverter; + import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.security.InvalidKeyException; +import java.security.NoSuchAlgorithmException; import java.util.HashMap; import java.util.Map; +import static org.knowm.xchange.service.BaseParamsDigest.HMAC_SHA_384; + +import si.mazi.rescu.SynchronizedValueFactory; + /** * Created by Lukas Zaoralek on 7.11.17. */ public class BitfinexStreamingService extends JsonNettyStreamingService { + private static final Logger LOG = LoggerFactory.getLogger(BitfinexStreamingService.class); + static final String CHANNEL_USER_POSITIONS = "userPositions"; + static final String CHANNEL_USER_BALANCE_UPDATES = "userBalanceUpdates"; + static final String CHANNEL_USER_BALANCES = "userBalances"; + static final String CHANNEL_USER_ORDER_UPDATES = "userOrderUpdates"; + static final String CHANNEL_USER_ORDERS = "userOrders"; + static final String CHANNEL_USER_TRADES = "userTrades"; + static final String CHANNEL_USER_PRE_TRADES = "userPreTrades"; + private static final String INFO = "info"; private static final String ERROR = "error"; private static final String CHANNEL_ID = "chanId"; private static final String SUBSCRIBED = "subscribed"; private static final String UNSUBSCRIBED = "unsubscribed"; private static final String ERROR_CODE = "code"; + private static final String AUTH = "auth"; + private static final String STATUS = "status"; + private static final String MESSAGE = "msg"; + private static final String EVENT = "event"; + private static final String VERSION = "version"; + + private final PublishSubject subjectPreTrade = PublishSubject.create(); + private final PublishSubject subjectTrade = PublishSubject.create(); + private final PublishSubject subjectOrder = PublishSubject.create(); + private final PublishSubject subjectBalance = PublishSubject.create(); private static final int SUBSCRIPTION_FAILED = 10300; + private String apiKey; + private String apiSecret; + private final Map subscribedChannels = new HashMap<>(); - public BitfinexStreamingService(String apiUrl) { + private final SynchronizedValueFactory nonceFactory; + + public BitfinexStreamingService(String apiUrl, + SynchronizedValueFactory nonceFactory) { super(apiUrl, Integer.MAX_VALUE); + this.nonceFactory = nonceFactory; } @Override @@ -62,6 +108,7 @@ public void messageHandler(String message) { @Override protected void handleMessage(JsonNode message) { + if (message.isArray()) { String type = message.get(1).asText(); if (type.equals("hb")) { @@ -69,14 +116,24 @@ protected void handleMessage(JsonNode message) { } } - JsonNode event = message.get("event"); + JsonNode event = message.get(EVENT); if (event != null) { switch (event.textValue()) { case INFO: - JsonNode version = message.get("version"); + JsonNode version = message.get(VERSION); if (version != null) { LOG.debug("Bitfinex websocket API version: {}.", version.intValue()); } + if (isAuthenticated()) + auth(); + break; + case AUTH: + if (message.get(STATUS).textValue().equals(BitfinexAuthRequestStatus.FAILED.name())) { + LOG.error("Authentication error: {}", message.get(MESSAGE)); + } + if (message.get(STATUS).textValue().equals(BitfinexAuthRequestStatus.OK.name())) { + LOG.info("Authenticated successfully"); + } break; case SUBSCRIBED: { String channel = message.get("channel").asText(); @@ -104,12 +161,55 @@ protected void handleMessage(JsonNode message) { super.handleError(message, new ExchangeException("Error code: " + message.get(ERROR_CODE).asText())); break; } - } else super.handleMessage(message); + } else { + try { + if ("0".equals(getChannelNameFromMessage(message)) && message.isArray() && message.size() == 3) { + processAuthenticatedMessage(message); + return; + } + } catch (IOException e) { + throw new RuntimeException("Failed to get channel name from message", e); + } + super.handleMessage(message); + } + } + + private void processAuthenticatedMessage(JsonNode message) { + String type = message.get(1).asText(); + JsonNode object = message.get(2); + switch (type) { + case "te": + subjectPreTrade.onNext(BitfinexStreamingAdapters.adaptPreTrade(object)); + break; + case "tu": + subjectTrade.onNext(BitfinexStreamingAdapters.adaptTrade(object)); + break; + case "os": + BitfinexStreamingAdapters.adaptOrders(object).forEach(subjectOrder::onNext); + break; + case "on": + case "ou": + case "oc": + subjectOrder.onNext(BitfinexStreamingAdapters.adaptOrder(object)); + break; + case "ws": + BitfinexStreamingAdapters.adaptBalances(object).forEach(subjectBalance::onNext); + break; + case "wu": + subjectBalance.onNext(BitfinexStreamingAdapters.adaptBalance(object)); + break; + default: + // In case bitfinex adds new channels, ignore + } } @Override public String getSubscriptionUniqueId(String channelName, Object... args) { - return channelName + "-" + args[0].toString(); + if (args.length > 0) { + return channelName + "-" + args[0].toString(); + } else { + return channelName; + } } @Override @@ -120,9 +220,11 @@ protected String getChannelNameFromMessage(JsonNode message) throws IOException } else { chanId = message.get(0).asText(); } - if (chanId == null) throw new IOException("Can't find CHANNEL_ID value"); - return subscribedChannels.get(chanId); + String subscribedChannel = subscribedChannels.get(chanId); + if (subscribedChannel != null) + return subscribedChannel; + return chanId; // In case bitfinex adds new channels, just fallback to the name in the message } @Override @@ -158,4 +260,52 @@ public String getUnsubscribeMessage(String channelName) throws IOException { ObjectMapper objectMapper = StreamingObjectMapperHelper.getObjectMapper(); return objectMapper.writeValueAsString(subscribeMessage); } + + void setApiKey(String apiKey) { + this.apiKey = apiKey; + } + + void setApiSecret(String apiSecret) { + this.apiSecret = apiSecret; + } + + boolean isAuthenticated() { + return StringUtils.isNotEmpty(apiKey); + } + + private void auth() { + long nonce = nonceFactory.createValue(); + String payload = "AUTH" + nonce; + String signature; + try { + Mac macEncoder = Mac.getInstance(HMAC_SHA_384); + SecretKeySpec secretKeySpec = new SecretKeySpec(apiSecret.getBytes(StandardCharsets.UTF_8), HMAC_SHA_384); + macEncoder.init(secretKeySpec); + byte[] result = macEncoder.doFinal(payload.getBytes(StandardCharsets.UTF_8)); + signature = DatatypeConverter.printHexBinary(result); + } catch (NoSuchAlgorithmException | InvalidKeyException e) { + LOG.error("auth. Sign failed error={}", e.getMessage()); + return; + } + BitfinexWebSocketAuth message = new BitfinexWebSocketAuth( + apiKey, payload, String.valueOf(nonce), signature.toLowerCase() + ); + sendObjectMessage(message); + } + + Observable getAuthenticatedOrders() { + return subjectOrder.share(); + } + + Observable getAuthenticatedPreTrades() { + return subjectPreTrade.share(); + } + + Observable getAuthenticatedTrades() { + return subjectTrade.share(); + } + + Observable getAuthenticatedBalances() { + return subjectBalance.share(); + } } diff --git a/xchange-bitfinex/src/test/java/info/bitrich/xchangestream/bitfinex/BitfinexManualAuthExample.java b/xchange-bitfinex/src/test/java/info/bitrich/xchangestream/bitfinex/BitfinexManualAuthExample.java index 9bf1213269f..d97c3ade4b0 100644 --- a/xchange-bitfinex/src/test/java/info/bitrich/xchangestream/bitfinex/BitfinexManualAuthExample.java +++ b/xchange-bitfinex/src/test/java/info/bitrich/xchangestream/bitfinex/BitfinexManualAuthExample.java @@ -4,6 +4,7 @@ import org.apache.commons.lang3.StringUtils; import org.knowm.xchange.ExchangeSpecification; +import org.knowm.xchange.currency.CurrencyPair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,6 +44,12 @@ public static void main(String[] args) { t -> LOG.info("AUTH BALANCE: {}", t), throwable -> LOG.error("ERROR: ", throwable) ); + + // Make sure we can still get unauthenticated data on the same socket + exchange.getStreamingMarketDataService().getTrades(CurrencyPair.BTC_USD).subscribe( + t -> LOG.info("PUBLIC TRADE: {}", t), + throwable -> LOG.error("ERROR: ", throwable) + ); try { Thread.sleep(10000); } catch (InterruptedException e) { diff --git a/xchange-bitfinex/src/test/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingServiceTest.java b/xchange-bitfinex/src/test/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingServiceTest.java index be2d09150a8..5c399fb1281 100644 --- a/xchange-bitfinex/src/test/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingServiceTest.java +++ b/xchange-bitfinex/src/test/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingServiceTest.java @@ -12,20 +12,27 @@ import org.junit.Before; import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; import java.io.IOException; import java.math.BigDecimal; import static org.assertj.core.api.Assertions.assertThat; +import si.mazi.rescu.SynchronizedValueFactory; + public class BitfinexStreamingServiceTest { - private BitfinexStreamingRawService service; + private BitfinexStreamingService service; private final ObjectMapper objectMapper = new ObjectMapper(); + @Mock SynchronizedValueFactory nonceFactory; + @Before public void setUp() { - service = new BitfinexStreamingRawService(BitfinexStreamingExchange.API_URI); + MockitoAnnotations.initMocks(this); + service = new BitfinexStreamingService(BitfinexStreamingExchange.API_URI, nonceFactory); } @Test From 419f1e9f76976dbb380a0966176bb07bfda3311c Mon Sep 17 00:00:00 2001 From: Graham Crockford Date: Sun, 27 Jan 2019 15:49:30 +0000 Subject: [PATCH 8/8] Improve null handling --- .../bitfinex/BitfinexStreamingAdapters.java | 12 ++++++++++-- .../bitfinex/BitfinexStreamingService.java | 16 ++++++++++++---- 2 files changed, 22 insertions(+), 6 deletions(-) diff --git a/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingAdapters.java b/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingAdapters.java index f6a0085607a..761d4db5746 100644 --- a/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingAdapters.java +++ b/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingAdapters.java @@ -7,6 +7,8 @@ import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketAuthPreTrade; import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketAuthTrade; +import io.reactivex.annotations.Nullable; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -19,6 +21,7 @@ class BitfinexStreamingAdapters { private static final Logger LOG = LoggerFactory.getLogger(BitfinexStreamingAdapters.class); + @Nullable static BitfinexWebSocketAuthPreTrade adaptPreTrade(JsonNode preTrade) { if (preTrade.size() < 12) { LOG.error("addPreTrade unexpected record size={}, record={}", preTrade.size(), preTrade.toString()); @@ -39,6 +42,7 @@ static BitfinexWebSocketAuthPreTrade adaptPreTrade(JsonNode preTrade) { return preTradeObject; } + @Nullable static BitfinexWebSocketAuthTrade adaptTrade(JsonNode trade) { if (trade.size() < 11) { LOG.error("addTrade unexpected record size={}, record={}", trade.size(), trade.toString()); @@ -65,11 +69,12 @@ static BitfinexWebSocketAuthTrade adaptTrade(JsonNode trade) { static Stream adaptOrders(JsonNode orders) { Iterable iterator = () -> orders.iterator(); return stream(iterator.spliterator(), false) + .filter(o -> o.size() >= 32) .map(BitfinexStreamingAdapters::createOrderObject) - .filter(o -> o != null) .peek(o -> LOG.debug("New order: {}", o)); } + @Nullable static BitfinexWebSocketAuthOrder adaptOrder(JsonNode order) { BitfinexWebSocketAuthOrder orderObject = createOrderObject(order); if (orderObject == null) { @@ -79,6 +84,7 @@ static BitfinexWebSocketAuthOrder adaptOrder(JsonNode order) { return orderObject; } + @Nullable static BitfinexWebSocketAuthBalance adaptBalance(JsonNode balance) { BitfinexWebSocketAuthBalance balanceObject = createBalanceObject(balance); if (balanceObject == null) { @@ -91,11 +97,12 @@ static BitfinexWebSocketAuthBalance adaptBalance(JsonNode balance) { static Stream adaptBalances(JsonNode balances) { Iterable iterator = () -> balances.iterator(); return stream(iterator.spliterator(), false) + .filter(o -> o.size() >= 5) .map(BitfinexStreamingAdapters::createBalanceObject) - .filter(o -> o != null) .peek(o -> LOG.debug("Balance: {}", o)); } + @Nullable static private BitfinexWebSocketAuthBalance createBalanceObject(JsonNode balance) { if (balance.size() < 5) { LOG.error("createBalanceObject unexpected record size={}, record={}", balance.size(), balance.toString()); @@ -111,6 +118,7 @@ static private BitfinexWebSocketAuthBalance createBalanceObject(JsonNode balance return new BitfinexWebSocketAuthBalance(walletType, currency, balanceValue, unsettledInterest, balanceAvailable); } + @Nullable static private BitfinexWebSocketAuthOrder createOrderObject(JsonNode order) { if (order.size() < 32) { LOG.error("createOrderObject unexpected record size={}, record={}", order.size(), order.toString()); diff --git a/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingService.java b/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingService.java index a015a0fedd5..6c820007f18 100644 --- a/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingService.java +++ b/xchange-bitfinex/src/main/java/info/bitrich/xchangestream/bitfinex/BitfinexStreamingService.java @@ -179,10 +179,14 @@ private void processAuthenticatedMessage(JsonNode message) { JsonNode object = message.get(2); switch (type) { case "te": - subjectPreTrade.onNext(BitfinexStreamingAdapters.adaptPreTrade(object)); + BitfinexWebSocketAuthPreTrade preTrade = BitfinexStreamingAdapters.adaptPreTrade(object); + if (preTrade != null) + subjectPreTrade.onNext(preTrade); break; case "tu": - subjectTrade.onNext(BitfinexStreamingAdapters.adaptTrade(object)); + BitfinexWebSocketAuthTrade trade = BitfinexStreamingAdapters.adaptTrade(object); + if (trade != null) + subjectTrade.onNext(trade); break; case "os": BitfinexStreamingAdapters.adaptOrders(object).forEach(subjectOrder::onNext); @@ -190,13 +194,17 @@ private void processAuthenticatedMessage(JsonNode message) { case "on": case "ou": case "oc": - subjectOrder.onNext(BitfinexStreamingAdapters.adaptOrder(object)); + BitfinexWebSocketAuthOrder order = BitfinexStreamingAdapters.adaptOrder(object); + if (order != null) + subjectOrder.onNext(order); break; case "ws": BitfinexStreamingAdapters.adaptBalances(object).forEach(subjectBalance::onNext); break; case "wu": - subjectBalance.onNext(BitfinexStreamingAdapters.adaptBalance(object)); + BitfinexWebSocketAuthBalance balance = BitfinexStreamingAdapters.adaptBalance(object); + if (balance != null) + subjectBalance.onNext(balance); break; default: // In case bitfinex adds new channels, ignore