Skip to content

Commit

Permalink
feat: add exception handling
Browse files Browse the repository at this point in the history
  • Loading branch information
Angular2Guy committed Apr 1, 2024
1 parent a8deb9e commit 7ced395
Showing 1 changed file with 95 additions and 75 deletions.
170 changes: 95 additions & 75 deletions backend/src/main/java/ch/xxx/trader/adapter/cron/ScheduledTask.java
Expand Up @@ -104,26 +104,31 @@ private void insertBsQuote(String currPair) {
this.disposeClient(currPair);
LocalTime start = LocalTime.now();
final AtomicBoolean exceptionLogged = new AtomicBoolean(false);
Mono<QuoteBs> request = this.webClient.get()
.uri(String.format("%s/v2/ticker/%s/", ScheduledTask.URLBS, currPair))
.accept(MediaType.APPLICATION_JSON).exchangeToMono(response -> response.bodyToMono(QuoteBs.class))
.map(res -> {
res.setPair(currPair);
Disposable subscribe = null;
try {
Mono<QuoteBs> request = this.webClient.get()
.uri(String.format("%s/v2/ticker/%s/", ScheduledTask.URLBS, currPair))
.accept(MediaType.APPLICATION_JSON).exchangeToMono(response -> response.bodyToMono(QuoteBs.class))
.map(res -> {
res.setPair(currPair);
// log.info(res.toString());
return res;
}).timeout(Duration.ofSeconds(5L)).onErrorResume(ex -> {
exceptionLogged.set(this.logRequestFailed("Bitstamp", currPair, start, ex));
return Mono.empty();
}).subscribeOn(this.mongoImportScheduler);
Disposable subscribe = request.flatMap(myQuote -> this.bitstampService.insertQuote(Mono.just(myQuote))
.timeout(Duration.ofSeconds(6L)).subscribeOn(this.mongoImportScheduler).onErrorResume(ex -> {
if (!exceptionLogged.get()) {
LOG.warn(String.format("Bitstamp data store failed for: %s", currPair), ex);
}
return Mono.empty();
})).subscribeOn(this.mongoImportScheduler).subscribe(x -> this.logDuration("Bitstamp", currPair, start),
err -> LOG.warn(String.format("Bitstamp data import failed for: %s", currPair), err));
this.disposables.put(currPair, Optional.of(subscribe));
return res;
}).timeout(Duration.ofSeconds(5L)).onErrorResume(ex -> {
exceptionLogged.set(this.logRequestFailed("Bitstamp", currPair, start, ex));
return Mono.empty();
}).subscribeOn(this.mongoImportScheduler);
subscribe = request.flatMap(myQuote -> this.bitstampService.insertQuote(Mono.just(myQuote))
.timeout(Duration.ofSeconds(6L)).subscribeOn(this.mongoImportScheduler).onErrorResume(ex -> {
if (!exceptionLogged.get()) {
LOG.warn(String.format("Bitstamp data store failed for: %s", currPair), ex);
}
return Mono.empty();
})).subscribeOn(this.mongoImportScheduler)
.subscribe(x -> this.logDuration("Bitstamp", currPair, start),
err -> LOG.warn(String.format("Bitstamp data import failed for: %s", currPair), err));
} finally {
this.disposables.put(currPair, Optional.ofNullable(subscribe));
}
}

private void logDuration(String source, String currPair, LocalTime start) {
Expand Down Expand Up @@ -171,29 +176,34 @@ public void insertCoinbaseQuote() {
this.disposeClient(currPair);
LocalTime start = LocalTime.now();
final AtomicBoolean exceptionLogged = new AtomicBoolean(false);
Mono<QuoteCb> request = this.webClient.get().uri(ScheduledTask.URLCB + "/exchange-rates?currency=BTC")
.accept(MediaType.APPLICATION_JSON).exchangeToMono(response -> {
return response.bodyToMono(WrapperCb.class);
Disposable subscribe = null;
try {
Mono<QuoteCb> request = this.webClient.get().uri(ScheduledTask.URLCB + "/exchange-rates?currency=BTC")
.accept(MediaType.APPLICATION_JSON).exchangeToMono(response -> {
return response.bodyToMono(WrapperCb.class);
// return response.bodyToMono(String.class);
// }).flatMap(value -> {
// // log.info(value);
// return Mono.just(this.messageMapper.mapJsonToObject(value, WrapperCb.class));
}).flatMap(resp -> Mono.just(resp.getData())).flatMap(resp2 -> {
}).flatMap(resp -> Mono.just(resp.getData())).flatMap(resp2 -> {
// log.info(resp2.getRates().toString());
return Mono.just(resp2.getRates());
}).timeout(Duration.ofSeconds(5L)).onErrorResume(ex -> {
exceptionLogged.set(this.logRequestFailed("Coinbase", currPair, start, ex));
return Mono.empty();
}).subscribeOn(this.mongoImportScheduler);
Disposable subscribe = request.flatMap(myQuote -> this.coinbaseService.insertQuote(Mono.just(myQuote))
.timeout(Duration.ofSeconds(6L)).subscribeOn(this.mongoImportScheduler).onErrorResume(ex -> {
if (!exceptionLogged.get()) {
LOG.warn("Coinbase data store failed", ex);
}
return Mono.empty();
})).subscribeOn(this.mongoImportScheduler).subscribe(x -> this.logDuration("Coinbase", currPair, start),
err -> LOG.warn("Coinbase data import failed.", err));
this.disposables.put(currPair, Optional.of(subscribe));
return Mono.just(resp2.getRates());
}).timeout(Duration.ofSeconds(5L)).onErrorResume(ex -> {
exceptionLogged.set(this.logRequestFailed("Coinbase", currPair, start, ex));
return Mono.empty();
}).subscribeOn(this.mongoImportScheduler);
subscribe = request.flatMap(myQuote -> this.coinbaseService.insertQuote(Mono.just(myQuote))
.timeout(Duration.ofSeconds(6L)).subscribeOn(this.mongoImportScheduler).onErrorResume(ex -> {
if (!exceptionLogged.get()) {
LOG.warn("Coinbase data store failed", ex);
}
return Mono.empty();
})).subscribeOn(this.mongoImportScheduler)
.subscribe(x -> this.logDuration("Coinbase", currPair, start),
err -> LOG.warn("Coinbase data import failed.", err));
} finally {
this.disposables.put(currPair, Optional.ofNullable(subscribe));
}
}

@Async("clientTaskExecutor")
Expand All @@ -205,25 +215,30 @@ public void insertItbitUsdQuote() {
this.disposeClient(currPair);
LocalTime start = LocalTime.now();
final AtomicBoolean exceptionLogged = new AtomicBoolean(false);
Mono<QuoteIb> request = this.webClient.get()
.uri(String.format("%s/markets/%s/ticker", ScheduledTask.URLPA, currPair))
.accept(MediaType.APPLICATION_JSON).exchangeToMono(response -> response.bodyToMono(PaxosQuote.class))
.map(res -> {
Disposable subscribe = null;
try {
Mono<QuoteIb> request = this.webClient.get()
.uri(String.format("%s/markets/%s/ticker", ScheduledTask.URLPA, currPair))
.accept(MediaType.APPLICATION_JSON)
.exchangeToMono(response -> response.bodyToMono(PaxosQuote.class)).map(res -> {
// log.info(res.toString());
return res;
}).map(paxosQuote -> this.convert(paxosQuote)).timeout(Duration.ofSeconds(5L)).onErrorResume(ex -> {
exceptionLogged.set(this.logRequestFailed("Ibit", currPair, start, ex));
return Mono.empty();
}).subscribeOn(this.mongoImportScheduler);
Disposable subscribe = request.flatMap(myQuote -> this.itbitService.insertQuote(Mono.just(myQuote))
.timeout(Duration.ofSeconds(6L)).subscribeOn(this.mongoImportScheduler).onErrorResume(ex -> {
if (!exceptionLogged.get()) {
LOG.warn(String.format("Itbit data store failed for: %s", currPair), ex);
}
return Mono.empty();
})).subscribeOn(this.mongoImportScheduler).subscribe(x -> this.logDuration("Itbit", currPair, start),
err -> LOG.warn(String.format("Itbit data import failed for: %s", currPair), err));
this.disposables.put(currPair, Optional.of(subscribe));
return res;
}).map(paxosQuote -> this.convert(paxosQuote)).timeout(Duration.ofSeconds(5L)).onErrorResume(ex -> {
exceptionLogged.set(this.logRequestFailed("Ibit", currPair, start, ex));
return Mono.empty();
}).subscribeOn(this.mongoImportScheduler);
subscribe = request.flatMap(myQuote -> this.itbitService.insertQuote(Mono.just(myQuote))
.timeout(Duration.ofSeconds(6L)).subscribeOn(this.mongoImportScheduler).onErrorResume(ex -> {
if (!exceptionLogged.get()) {
LOG.warn(String.format("Itbit data store failed for: %s", currPair), ex);
}
return Mono.empty();
})).subscribeOn(this.mongoImportScheduler)
.subscribe(x -> this.logDuration("Itbit", currPair, start),
err -> LOG.warn(String.format("Itbit data import failed for: %s", currPair), err));
} finally {
this.disposables.put(currPair, Optional.ofNullable(subscribe));
}
}

QuoteIb convert(PaxosQuote paxosQuote) {
Expand Down Expand Up @@ -292,27 +307,32 @@ private void insertBfQuote(String currPair) {
this.disposeClient(currPair);
LocalTime start = LocalTime.now();
final AtomicBoolean exceptionLogged = new AtomicBoolean(false);
Mono<QuoteBf> request = this.webClient.get()
.uri(String.format("%s/v1/pubticker/%s", ScheduledTask.URLBF, currPair))
.accept(MediaType.APPLICATION_JSON).exchangeToMono(response -> response.bodyToMono(QuoteBf.class))
.map(res -> {
res.setPair(currPair);
QuoteBf result = checkBfTimestamp(res);
Disposable subscribe = null;
try {
Mono<QuoteBf> request = this.webClient.get()
.uri(String.format("%s/v1/pubticker/%s", ScheduledTask.URLBF, currPair))
.accept(MediaType.APPLICATION_JSON).exchangeToMono(response -> response.bodyToMono(QuoteBf.class))
.map(res -> {
res.setPair(currPair);
QuoteBf result = checkBfTimestamp(res);
// log.info(res.toString());
return result;
}).timeout(Duration.ofSeconds(5L)).onErrorResume(ex -> {
exceptionLogged.set(this.logRequestFailed("Bitfinex", currPair, start, ex));
return Mono.empty();
}).subscribeOn(this.mongoImportScheduler);
Disposable subscribe = request.flatMap(myQuote -> this.bitfinexService.insertQuote(Mono.just(myQuote))
.timeout(Duration.ofSeconds(6L)).subscribeOn(this.mongoImportScheduler).onErrorResume(ex -> {
if (!exceptionLogged.get()) {
LOG.warn(String.format("Bitfinex data store failed for: %s", currPair), ex);
}
return Mono.empty();
})).subscribeOn(this.mongoImportScheduler).subscribe(x -> this.logDuration("Bitfinex", currPair, start),
err -> LOG.warn(String.format("Bitfinex data import failed for: %s", currPair), err));
this.disposables.put(currPair, Optional.of(subscribe));
return result;
}).timeout(Duration.ofSeconds(5L)).onErrorResume(ex -> {
exceptionLogged.set(this.logRequestFailed("Bitfinex", currPair, start, ex));
return Mono.empty();
}).subscribeOn(this.mongoImportScheduler);
subscribe = request.flatMap(myQuote -> this.bitfinexService.insertQuote(Mono.just(myQuote))
.timeout(Duration.ofSeconds(6L)).subscribeOn(this.mongoImportScheduler).onErrorResume(ex -> {
if (!exceptionLogged.get()) {
LOG.warn(String.format("Bitfinex data store failed for: %s", currPair), ex);
}
return Mono.empty();
})).subscribeOn(this.mongoImportScheduler)
.subscribe(x -> this.logDuration("Bitfinex", currPair, start),
err -> LOG.warn(String.format("Bitfinex data import failed for: %s", currPair), err));
} finally {
this.disposables.put(currPair, Optional.ofNullable(subscribe));
}
}

private QuoteBf checkBfTimestamp(QuoteBf res) {
Expand Down

0 comments on commit 7ced395

Please sign in to comment.