diff --git a/backend/src/main/java/ch/xxx/trader/adapter/cron/ScheduledTask.java b/backend/src/main/java/ch/xxx/trader/adapter/cron/ScheduledTask.java index 5c4e0f3..5a5cd68 100644 --- a/backend/src/main/java/ch/xxx/trader/adapter/cron/ScheduledTask.java +++ b/backend/src/main/java/ch/xxx/trader/adapter/cron/ScheduledTask.java @@ -104,26 +104,31 @@ private void insertBsQuote(String currPair) { this.disposeClient(currPair); LocalTime start = LocalTime.now(); final AtomicBoolean exceptionLogged = new AtomicBoolean(false); - Mono request = this.webClient.get() - .uri(String.format("%s/v2/ticker/%s/", ScheduledTask.URLBS, currPair)) - .accept(MediaType.APPLICATION_JSON).exchangeToMono(response -> response.bodyToMono(QuoteBs.class)) - .map(res -> { - res.setPair(currPair); + Disposable subscribe = null; + try { + Mono request = this.webClient.get() + .uri(String.format("%s/v2/ticker/%s/", ScheduledTask.URLBS, currPair)) + .accept(MediaType.APPLICATION_JSON).exchangeToMono(response -> response.bodyToMono(QuoteBs.class)) + .map(res -> { + res.setPair(currPair); // log.info(res.toString()); - return res; - }).timeout(Duration.ofSeconds(5L)).onErrorResume(ex -> { - exceptionLogged.set(this.logRequestFailed("Bitstamp", currPair, start, ex)); - return Mono.empty(); - }).subscribeOn(this.mongoImportScheduler); - Disposable subscribe = request.flatMap(myQuote -> this.bitstampService.insertQuote(Mono.just(myQuote)) - .timeout(Duration.ofSeconds(6L)).subscribeOn(this.mongoImportScheduler).onErrorResume(ex -> { - if (!exceptionLogged.get()) { - LOG.warn(String.format("Bitstamp data store failed for: %s", currPair), ex); - } - return Mono.empty(); - })).subscribeOn(this.mongoImportScheduler).subscribe(x -> this.logDuration("Bitstamp", currPair, start), - err -> LOG.warn(String.format("Bitstamp data import failed for: %s", currPair), err)); - this.disposables.put(currPair, Optional.of(subscribe)); + return res; + }).timeout(Duration.ofSeconds(5L)).onErrorResume(ex -> { + exceptionLogged.set(this.logRequestFailed("Bitstamp", currPair, start, ex)); + return Mono.empty(); + }).subscribeOn(this.mongoImportScheduler); + subscribe = request.flatMap(myQuote -> this.bitstampService.insertQuote(Mono.just(myQuote)) + .timeout(Duration.ofSeconds(6L)).subscribeOn(this.mongoImportScheduler).onErrorResume(ex -> { + if (!exceptionLogged.get()) { + LOG.warn(String.format("Bitstamp data store failed for: %s", currPair), ex); + } + return Mono.empty(); + })).subscribeOn(this.mongoImportScheduler) + .subscribe(x -> this.logDuration("Bitstamp", currPair, start), + err -> LOG.warn(String.format("Bitstamp data import failed for: %s", currPair), err)); + } finally { + this.disposables.put(currPair, Optional.ofNullable(subscribe)); + } } private void logDuration(String source, String currPair, LocalTime start) { @@ -171,29 +176,34 @@ public void insertCoinbaseQuote() { this.disposeClient(currPair); LocalTime start = LocalTime.now(); final AtomicBoolean exceptionLogged = new AtomicBoolean(false); - Mono request = this.webClient.get().uri(ScheduledTask.URLCB + "/exchange-rates?currency=BTC") - .accept(MediaType.APPLICATION_JSON).exchangeToMono(response -> { - return response.bodyToMono(WrapperCb.class); + Disposable subscribe = null; + try { + Mono request = this.webClient.get().uri(ScheduledTask.URLCB + "/exchange-rates?currency=BTC") + .accept(MediaType.APPLICATION_JSON).exchangeToMono(response -> { + return response.bodyToMono(WrapperCb.class); // return response.bodyToMono(String.class); // }).flatMap(value -> { // // log.info(value); // return Mono.just(this.messageMapper.mapJsonToObject(value, WrapperCb.class)); - }).flatMap(resp -> Mono.just(resp.getData())).flatMap(resp2 -> { + }).flatMap(resp -> Mono.just(resp.getData())).flatMap(resp2 -> { // log.info(resp2.getRates().toString()); - return Mono.just(resp2.getRates()); - }).timeout(Duration.ofSeconds(5L)).onErrorResume(ex -> { - exceptionLogged.set(this.logRequestFailed("Coinbase", currPair, start, ex)); - return Mono.empty(); - }).subscribeOn(this.mongoImportScheduler); - Disposable subscribe = request.flatMap(myQuote -> this.coinbaseService.insertQuote(Mono.just(myQuote)) - .timeout(Duration.ofSeconds(6L)).subscribeOn(this.mongoImportScheduler).onErrorResume(ex -> { - if (!exceptionLogged.get()) { - LOG.warn("Coinbase data store failed", ex); - } - return Mono.empty(); - })).subscribeOn(this.mongoImportScheduler).subscribe(x -> this.logDuration("Coinbase", currPair, start), - err -> LOG.warn("Coinbase data import failed.", err)); - this.disposables.put(currPair, Optional.of(subscribe)); + return Mono.just(resp2.getRates()); + }).timeout(Duration.ofSeconds(5L)).onErrorResume(ex -> { + exceptionLogged.set(this.logRequestFailed("Coinbase", currPair, start, ex)); + return Mono.empty(); + }).subscribeOn(this.mongoImportScheduler); + subscribe = request.flatMap(myQuote -> this.coinbaseService.insertQuote(Mono.just(myQuote)) + .timeout(Duration.ofSeconds(6L)).subscribeOn(this.mongoImportScheduler).onErrorResume(ex -> { + if (!exceptionLogged.get()) { + LOG.warn("Coinbase data store failed", ex); + } + return Mono.empty(); + })).subscribeOn(this.mongoImportScheduler) + .subscribe(x -> this.logDuration("Coinbase", currPair, start), + err -> LOG.warn("Coinbase data import failed.", err)); + } finally { + this.disposables.put(currPair, Optional.ofNullable(subscribe)); + } } @Async("clientTaskExecutor") @@ -205,25 +215,30 @@ public void insertItbitUsdQuote() { this.disposeClient(currPair); LocalTime start = LocalTime.now(); final AtomicBoolean exceptionLogged = new AtomicBoolean(false); - Mono request = this.webClient.get() - .uri(String.format("%s/markets/%s/ticker", ScheduledTask.URLPA, currPair)) - .accept(MediaType.APPLICATION_JSON).exchangeToMono(response -> response.bodyToMono(PaxosQuote.class)) - .map(res -> { + Disposable subscribe = null; + try { + Mono request = this.webClient.get() + .uri(String.format("%s/markets/%s/ticker", ScheduledTask.URLPA, currPair)) + .accept(MediaType.APPLICATION_JSON) + .exchangeToMono(response -> response.bodyToMono(PaxosQuote.class)).map(res -> { // log.info(res.toString()); - return res; - }).map(paxosQuote -> this.convert(paxosQuote)).timeout(Duration.ofSeconds(5L)).onErrorResume(ex -> { - exceptionLogged.set(this.logRequestFailed("Ibit", currPair, start, ex)); - return Mono.empty(); - }).subscribeOn(this.mongoImportScheduler); - Disposable subscribe = request.flatMap(myQuote -> this.itbitService.insertQuote(Mono.just(myQuote)) - .timeout(Duration.ofSeconds(6L)).subscribeOn(this.mongoImportScheduler).onErrorResume(ex -> { - if (!exceptionLogged.get()) { - LOG.warn(String.format("Itbit data store failed for: %s", currPair), ex); - } - return Mono.empty(); - })).subscribeOn(this.mongoImportScheduler).subscribe(x -> this.logDuration("Itbit", currPair, start), - err -> LOG.warn(String.format("Itbit data import failed for: %s", currPair), err)); - this.disposables.put(currPair, Optional.of(subscribe)); + return res; + }).map(paxosQuote -> this.convert(paxosQuote)).timeout(Duration.ofSeconds(5L)).onErrorResume(ex -> { + exceptionLogged.set(this.logRequestFailed("Ibit", currPair, start, ex)); + return Mono.empty(); + }).subscribeOn(this.mongoImportScheduler); + subscribe = request.flatMap(myQuote -> this.itbitService.insertQuote(Mono.just(myQuote)) + .timeout(Duration.ofSeconds(6L)).subscribeOn(this.mongoImportScheduler).onErrorResume(ex -> { + if (!exceptionLogged.get()) { + LOG.warn(String.format("Itbit data store failed for: %s", currPair), ex); + } + return Mono.empty(); + })).subscribeOn(this.mongoImportScheduler) + .subscribe(x -> this.logDuration("Itbit", currPair, start), + err -> LOG.warn(String.format("Itbit data import failed for: %s", currPair), err)); + } finally { + this.disposables.put(currPair, Optional.ofNullable(subscribe)); + } } QuoteIb convert(PaxosQuote paxosQuote) { @@ -292,27 +307,32 @@ private void insertBfQuote(String currPair) { this.disposeClient(currPair); LocalTime start = LocalTime.now(); final AtomicBoolean exceptionLogged = new AtomicBoolean(false); - Mono request = this.webClient.get() - .uri(String.format("%s/v1/pubticker/%s", ScheduledTask.URLBF, currPair)) - .accept(MediaType.APPLICATION_JSON).exchangeToMono(response -> response.bodyToMono(QuoteBf.class)) - .map(res -> { - res.setPair(currPair); - QuoteBf result = checkBfTimestamp(res); + Disposable subscribe = null; + try { + Mono request = this.webClient.get() + .uri(String.format("%s/v1/pubticker/%s", ScheduledTask.URLBF, currPair)) + .accept(MediaType.APPLICATION_JSON).exchangeToMono(response -> response.bodyToMono(QuoteBf.class)) + .map(res -> { + res.setPair(currPair); + QuoteBf result = checkBfTimestamp(res); // log.info(res.toString()); - return result; - }).timeout(Duration.ofSeconds(5L)).onErrorResume(ex -> { - exceptionLogged.set(this.logRequestFailed("Bitfinex", currPair, start, ex)); - return Mono.empty(); - }).subscribeOn(this.mongoImportScheduler); - Disposable subscribe = request.flatMap(myQuote -> this.bitfinexService.insertQuote(Mono.just(myQuote)) - .timeout(Duration.ofSeconds(6L)).subscribeOn(this.mongoImportScheduler).onErrorResume(ex -> { - if (!exceptionLogged.get()) { - LOG.warn(String.format("Bitfinex data store failed for: %s", currPair), ex); - } - return Mono.empty(); - })).subscribeOn(this.mongoImportScheduler).subscribe(x -> this.logDuration("Bitfinex", currPair, start), - err -> LOG.warn(String.format("Bitfinex data import failed for: %s", currPair), err)); - this.disposables.put(currPair, Optional.of(subscribe)); + return result; + }).timeout(Duration.ofSeconds(5L)).onErrorResume(ex -> { + exceptionLogged.set(this.logRequestFailed("Bitfinex", currPair, start, ex)); + return Mono.empty(); + }).subscribeOn(this.mongoImportScheduler); + subscribe = request.flatMap(myQuote -> this.bitfinexService.insertQuote(Mono.just(myQuote)) + .timeout(Duration.ofSeconds(6L)).subscribeOn(this.mongoImportScheduler).onErrorResume(ex -> { + if (!exceptionLogged.get()) { + LOG.warn(String.format("Bitfinex data store failed for: %s", currPair), ex); + } + return Mono.empty(); + })).subscribeOn(this.mongoImportScheduler) + .subscribe(x -> this.logDuration("Bitfinex", currPair, start), + err -> LOG.warn(String.format("Bitfinex data import failed for: %s", currPair), err)); + } finally { + this.disposables.put(currPair, Optional.ofNullable(subscribe)); + } } private QuoteBf checkBfTimestamp(QuoteBf res) {