Skip to content

Commit

Permalink
Fix issue where return value was not always sent to Zorro, causing it…
Browse files Browse the repository at this point in the history
… to freeze
  • Loading branch information
Daniel Lindberg committed Jun 21, 2017
1 parent 002f6d6 commit 856b1f2
Show file tree
Hide file tree
Showing 13 changed files with 89 additions and 39 deletions.
12 changes: 1 addition & 11 deletions .idea/misc.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.0.6</version>
<version>2.1.0</version>
</dependency>

<dependency>
Expand Down
2 changes: 0 additions & 2 deletions src/main/java/com/danlind/igz/ZorroBridge.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import com.danlind.igz.config.ZorroReturnValues;
import com.danlind.igz.domain.types.Epic;
import com.danlind.igz.handler.*;
import com.danlind.igz.misc.MarketDataProvider;
import com.danlind.igz.misc.TimeConvert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -153,5 +152,4 @@ public int doSetOrderText(final String orderText) {
// Zorro.logError("doSetOrderText for " + orderText + " called but not yet supported!");
return ZorroReturnValues.BROKER_COMMAND_OK.getValue();
}

}
2 changes: 2 additions & 0 deletions src/main/java/com/danlind/igz/adapter/RestApiAdapter.java
Original file line number Diff line number Diff line change
Expand Up @@ -248,9 +248,11 @@ public Observable<GetDealConfirmationV1Response> getDealConfirmationObservable(S
});
} catch (HttpClientErrorException e) {
LOG.error("Exception when getting deal confirmation for deal reference {}, error was {}", dealReference, e.getResponseBodyAsString(), e);
Zorro.indicateError();
return Observable.error(e);
} catch (Exception e) {
LOG.error("Exception when getting deal confirmation for deal reference {}", dealReference, e);
Zorro.indicateError();
return Observable.error(e);
}
});
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/com/danlind/igz/brokerapi/BrokerAsset.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,13 @@ public int subscribeToLighstreamerTickUpdates(Epic epic) {
.subscribe(
priceDetails -> priceDataMap.put(priceDetails.getEpic(), priceDetails),
e -> {
LOG.error("Error subscribing to tick observable", e);
LOG.error("Error subscribing to tick observable for {}",epic.getName(), e);
Zorro.indicateError();
},
() -> {
//TODO: How to handle close of stream on weekends?
//TODO: How to handle close of stream on weekends? (Weekend = 7 is the obvious option for now)
LOG.info("Received complete signal from TickObservable for epic {}", epic.getName());
marketDataProvider.cancelSubscription();
marketDataProvider.cancelSubscription(epic);
historyHandler.cancelSubscription();
}
);
Expand All @@ -76,7 +76,7 @@ public int subscribeToLighstreamerTickUpdates(Epic epic) {
.subscribe(
volume -> volumeProvider.updateRollingVolume(epic, volume),
e -> {
LOG.error("Error subscribing to volume observable", e);
LOG.error("Error subscribing to volume observable for {}", epic.getName(), e);
Zorro.indicateError();
},
() -> LOG.info("Received complete signal from VolumeObservable for epic {}", epic.getName())
Expand Down
27 changes: 22 additions & 5 deletions src/main/java/com/danlind/igz/brokerapi/BrokerBuy.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
import com.danlind.igz.ig.api.client.rest.dto.positions.otc.createOTCPositionV2.Direction;
import com.danlind.igz.ig.api.client.rest.dto.positions.otc.createOTCPositionV2.OrderType;
import com.danlind.igz.misc.RetryWithDelay;
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
import net.openhft.chronicle.map.ChronicleMap;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
Expand Down Expand Up @@ -45,14 +48,16 @@ public int createPosition(final Epic epic,
double stopDistance = tradeParams[1];
CreateOTCPositionV2Request createPositionRequest = createPositionRequest(epic, numberOfContracts, stopDistance);

//Disposable progress = indicateProgress();

return restApiAdapter.createPosition(createPositionRequest)
.subscribeOn(Schedulers.io())
.doOnNext(dealReference -> LOG.debug("Got dealReference {} when attempting to open position", dealReference.getValue()))
.delay(1500, TimeUnit.MILLISECONDS)
.delay(500, TimeUnit.MILLISECONDS)
.flatMap(dealReference -> restApiAdapter.getDealConfirmationObservable(dealReference.getValue())
.retryWhen(new RetryWithDelay(3, 1500))
.map(dealConfirmationResponse -> buyConfirmationHandler(dealConfirmationResponse, createPositionRequest.getDirection(), tradeParams))
)
.doOnError(e -> Zorro.indicateError())
.onErrorReturn(e -> ZorroReturnValues.BROKER_BUY_FAIL.getValue())
.blockingSingle();
}
Expand All @@ -73,7 +78,14 @@ private CreateOTCPositionV2Request createPositionRequest(Epic epic, double numbe
createPositionRequest.setExpiry(contractDetails.getExpiry());
createPositionRequest.setOrderType(OrderType.MARKET);
createPositionRequest.setCurrencyCode(contractDetails.getCurrencyCode());
createPositionRequest.setSize(BigDecimal.valueOf(Math.abs(numberOfContracts/contractDetails.getLotAmount())));

if (contractDetails.getLotAmount() >= 1) {
createPositionRequest.setSize(BigDecimal.valueOf(Math.abs(numberOfContracts/contractDetails.getLotAmount())));
} else {
createPositionRequest.setSize(BigDecimal.valueOf(Math.abs(numberOfContracts)));
}


createPositionRequest.setGuaranteedStop(false);
createPositionRequest.setForceOpen(true);

Expand All @@ -83,14 +95,19 @@ private CreateOTCPositionV2Request createPositionRequest(Epic epic, double numbe
createPositionRequest.setDirection(Direction.SELL);
}

//TODO: Check if we should really use scalingFactor here
if (stopDistance != 0) {
createPositionRequest.setStopDistance(BigDecimal.valueOf(stopDistance));
createPositionRequest.setStopDistance(BigDecimal.valueOf(stopDistance * contractDetails.getScalingFactor()));
}

LOG.info(">>> Creating long position epic={}, \ndirection={}, \nexpiry={}, \nsize={}, \norderType={}, \ncurrency={}, \nstop loss distance={}",
epic.getName(), createPositionRequest.getDirection(), createPositionRequest.getExpiry(),
createPositionRequest.getSize(), createPositionRequest.getOrderType(), createPositionRequest.getCurrencyCode(), stopDistance);
return createPositionRequest;
}

private Disposable indicateProgress() {
return Observable.interval(250, TimeUnit.MILLISECONDS, Schedulers.io())
.subscribe(x -> Zorro.callProgress(1));
}

}
9 changes: 7 additions & 2 deletions src/main/java/com/danlind/igz/brokerapi/BrokerHistory.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

import static com.danlind.igz.domain.types.Resolution.*;

Expand Down Expand Up @@ -53,16 +54,20 @@ public int getPriceHistory(final Epic epic,
}

public void getTimeZoneOffsetObservable() {
if (Objects.nonNull(timeZoneOffsetSubscription)) {
logger.debug("Disposing of existing time offset subscription");
timeZoneOffsetSubscription.dispose();
}
timeZoneOffsetSubscription = restApiAdapter.getTimeZoneOffset()
.subscribe(
timeZone -> {
logger.debug("Updating contract details");
logger.debug("Updating time zone offset");
accountZoneOffset = timeZone;
}
);
}

public void cancelSubscription() {
public void cancelTimeOffsetSubscription() {
timeZoneOffsetSubscription.dispose();
}

Expand Down
6 changes: 5 additions & 1 deletion src/main/java/com/danlind/igz/brokerapi/BrokerLogin.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Objects;
import java.util.concurrent.TimeUnit;

@Component
Expand Down Expand Up @@ -73,7 +74,6 @@ public int disconnect() {
return ZorroReturnValues.LOGOUT_OK.getValue();
}

//TODO: Handle "errorCode":"error.security.oauth-token-invalid", disconnect (and reconnect or let Zorro handle that)
private void refreshAccessToken(final ConversationContextV3 contextV3) {
logger.debug("Refreshing access token");
try {
Expand All @@ -87,6 +87,10 @@ private void refreshAccessToken(final ConversationContextV3 contextV3) {

//TOOD: What happens if an exception is thrown when attepting to refresh token? Is the observable cancelled?
private void startRefreshAccessTokenScheduler() {
if (Objects.nonNull(tokenSubscription)) {
logger.debug("Disposing of existing access token subscription");
tokenSubscription.dispose();
}
tokenSubscription = Observable.interval(pluginProperties.getRefreshTokenInterval(), TimeUnit.MILLISECONDS, Schedulers.io())
.doOnError(e -> logger.debug("Error when refreshing session token, retrying"))
.retryWhen(new RetryWithDelay(60, 5000))
Expand Down
15 changes: 13 additions & 2 deletions src/main/java/com/danlind/igz/brokerapi/BrokerSell.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.danlind.igz.brokerapi;

import com.danlind.igz.Zorro;
import com.danlind.igz.adapter.RestApiAdapter;
import com.danlind.igz.config.ZorroReturnValues;
import com.danlind.igz.domain.ContractDetails;
Expand All @@ -12,6 +13,9 @@
import com.danlind.igz.ig.api.client.rest.dto.positions.otc.closeOTCPositionV1.OrderType;
import com.danlind.igz.misc.MarketDataProvider;
import com.danlind.igz.misc.RetryWithDelay;
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
import net.openhft.chronicle.map.ChronicleMap;
import org.apache.http.annotation.Contract;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -40,7 +44,6 @@ public BrokerSell(RestApiAdapter restApiAdapter, ChronicleMap<Integer, OrderDeta
this.marketDataProvider = marketDataProvider;
}

//TODO: Update to handle correct contract size
public int closePosition(final int nOrderId,
final int nAmount) {
OrderDetails orderDetails = orderReferenceMap.get(nOrderId);
Expand All @@ -51,7 +54,10 @@ public int closePosition(final int nOrderId,

LOG.info(">>> Closing size {} for position with dealId {}", lotSize, dealId.getValue() );

//Disposable progress = indicateProgress();

return restApiAdapter.closePosition(request)
.subscribeOn(Schedulers.io())
.doOnNext(dealReference -> LOG.debug("Got dealReference {} when attempting to close position with dealId {}", dealReference.getValue(), dealId.getValue()))
.delay(500, TimeUnit.MILLISECONDS)
.flatMap(dealReference -> restApiAdapter.getDealConfirmationObservable(dealReference.getValue())
Expand All @@ -65,7 +71,6 @@ public int closePosition(final int nOrderId,
private int closeConfirmationHandler(GetDealConfirmationV1Response dealConfirmationResponse, int nOrderId, int lotSize ) {
OrderDetails sellOrderDetails = orderReferenceMap.get(nOrderId);

//TODO: Need to log PositionStatus. Some trades are marked as partially closed, when they are in fact fully closed
LOG.debug("Position status is {}", dealConfirmationResponse.getStatus());
if (dealConfirmationResponse.getStatus() == PositionStatus.CLOSED) {
LOG.debug("Position with deal id {} now fully closed", sellOrderDetails.getDealId().getValue());
Expand Down Expand Up @@ -97,4 +102,10 @@ private CloseOTCPositionV1Request createClosePositionRequest(int lotSize, DealId
request.setOrderType(OrderType.MARKET);
return request;
}

private Disposable indicateProgress() {
return Observable.interval(250, TimeUnit.MILLISECONDS, Schedulers.io())
.subscribe(x -> Zorro.callProgress(1));
}

}
4 changes: 1 addition & 3 deletions src/main/java/com/danlind/igz/handler/HistoryHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
import com.danlind.igz.brokerapi.BrokerHistory;
import com.danlind.igz.domain.types.Epic;
import com.danlind.igz.ig.api.client.rest.dto.prices.getPricesV3.PricesItem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

Expand Down Expand Up @@ -41,6 +39,6 @@ public void getTimeZoneOffsetObservable() {
}

public void cancelSubscription() {
brokerHistory.cancelSubscription();
brokerHistory.cancelTimeOffsetSubscription();
}
}
7 changes: 6 additions & 1 deletion src/main/java/com/danlind/igz/ig/api/client/RestAPI.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,18 @@
import com.danlind.igz.ig.api.client.rest.dto.workingorders.otc.deleteOTCWorkingOrderV2.DeleteOTCWorkingOrderV2Response;
import com.danlind.igz.ig.api.client.rest.dto.workingorders.otc.updateOTCWorkingOrderV2.UpdateOTCWorkingOrderV2Request;
import org.apache.http.HttpResponse;
import org.apache.http.StatusLine;
import org.apache.http.util.EntityUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import org.springframework.util.StreamUtils;
import org.springframework.web.client.HttpClientErrorException;

import java.nio.charset.Charset;
import java.util.Arrays;

@Service
Expand Down Expand Up @@ -263,7 +267,8 @@ public CloseOTCPositionV1Response closeOTCPositionV1(ConversationContext convers
if (HttpStatus.OK.value() == httpResponse.getStatusLine().getStatusCode()) {
return objectMapper.readValue(httpResponse.getEntity().getContent(), CloseOTCPositionV1Response.class);
}
throw new RuntimeException("Delete failed: " + httpResponse.getStatusLine());
//throw new RuntimeException("Delete failed: " + httpResponse.getStatusLine());
throw new HttpClientErrorException(HttpStatus.valueOf(httpResponse.getStatusLine().getStatusCode()), httpResponse.getStatusLine().getReasonPhrase(), StreamUtils.copyToByteArray(httpResponse.getEntity().getContent()), Charset.defaultCharset());
} finally {
if (httpResponse != null) {
EntityUtils.consumeQuietly(httpResponse.getEntity());
Expand Down
27 changes: 20 additions & 7 deletions src/main/java/com/danlind/igz/misc/MarketDataProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@
import com.danlind.igz.domain.types.Epic;
import com.danlind.igz.ig.api.client.rest.dto.markets.getMarketDetailsV3.MarketStatus;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Objects;
import java.util.Set;

@Component
Expand All @@ -20,7 +22,7 @@ public class MarketDataProvider {
private final static Logger logger = LoggerFactory.getLogger(MarketDataProvider.class);
private final RestApiAdapter restApiAdapter;
private final HashMap<Epic, ContractDetails> contractDetailsMap = new HashMap<>();
private Disposable marketDetailsSubscription;
private HashMap<Epic, Disposable> marketDetailsSubscriptions = new HashMap<>();

@Autowired
public MarketDataProvider(RestApiAdapter restApiAdapter) {
Expand All @@ -42,8 +44,9 @@ public int isAnySubscribedEpicTradable() {
return contractDetailsMap.keySet().stream().mapToInt(epic -> isEpicTradable(epic)).max().orElse(1);
}

public void cancelSubscription() {
marketDetailsSubscription.dispose();
public void cancelSubscription(Epic epic) {
marketDetailsSubscriptions.get(epic).dispose();
marketDetailsSubscriptions.remove(epic);
}

public Set<Epic> getAllSubscribedEpics() {
Expand All @@ -54,17 +57,27 @@ public ContractDetails getContractDetails(Epic epic) {
return contractDetailsMap.get(epic);
}


public void updateMarketDetails(Epic epic) {
ContractDetails contractDetails = restApiAdapter.getContractDetailsBlocking(epic).blockingSingle();
contractDetailsMap.put(epic, contractDetails);

marketDetailsSubscription = restApiAdapter.getContractDetailsObservable(epic)
if (Objects.nonNull(marketDetailsSubscriptions.get(epic))) {
logger.debug("Disposing of existing market data subscription for epic {}", epic.getName());
marketDetailsSubscriptions.get(epic).dispose();
marketDetailsSubscriptions.remove(epic);
}

marketDetailsSubscriptions.put(epic,restApiAdapter.getContractDetailsObservable(epic)
.subscribeOn(Schedulers.io())
.retryWhen(new RetryWithDelay(5, 2000))
.subscribe(
updatedContractDetails -> {
logger.debug("Updating contract details");
logger.debug("Updating contract details for {}",updatedContractDetails.getEpic().getName());
contractDetailsMap.put(epic, updatedContractDetails);
}
);
},
e -> logger.error("Unexpected error when updating market details", e)
));


}
Expand Down
Loading

0 comments on commit 856b1f2

Please sign in to comment.