diff --git a/src/main/java/com/uid2/shared/vertx/CloudSyncVerticle.java b/src/main/java/com/uid2/shared/vertx/CloudSyncVerticle.java index 25815d00..a06f4213 100644 --- a/src/main/java/com/uid2/shared/vertx/CloudSyncVerticle.java +++ b/src/main/java/com/uid2/shared/vertx/CloudSyncVerticle.java @@ -232,10 +232,10 @@ private Future cloudRefresh() { Promise refreshPromise = Promise.promise(); this.isRefreshing = true; - vertx.executeBlocking(blockBromise -> { + vertx.executeBlocking(() -> { this.cloudRefreshEnsureInSync(refreshPromise, 0); - blockBromise.complete(); - }, ar -> {}); + return null; + }); return refreshPromise.future() .onComplete(v -> { @@ -320,12 +320,13 @@ private void handleUpload(Message msg) { this.pendingUpload.add(fileToUpload); } - this.uploadExecutor.executeBlocking( - promise -> this.cloudUploadBlocking(promise, msg.body()), - ar -> { - this.pendingUpload.remove(fileToUpload); - this.handleAsyncResult(ar); - msg.reply(ar.succeeded()); + this.uploadExecutor.executeBlocking(() -> { + this.cloudUploadBlocking(msg.body()); + return null; + }).onComplete(ar -> { + this.pendingUpload.remove(fileToUpload); + this.handleAsyncResult(ar); + msg.reply(ar.succeeded()); // increase counter if (ar.succeeded()) { @@ -338,16 +339,10 @@ private void handleUpload(Message msg) { }); } - private void cloudUploadBlocking(Promise promise, String fileToUpload) { - try { - String cloudPath = this.cloudSync.toCloudPath(fileToUpload); - try (InputStream localInput = this.localStorage.download(fileToUpload)) { - this.cloudStorage.upload(localInput, cloudPath); - } - promise.complete(); - } catch (Exception ex) { - LOGGER.error(ex.getMessage(), ex); - promise.fail(new Throwable(ex)); + private void cloudUploadBlocking(String fileToUpload) throws Exception { + String cloudPath = this.cloudSync.toCloudPath(fileToUpload); + try (InputStream localInput = this.localStorage.download(fileToUpload)) { + this.cloudStorage.upload(localInput, cloudPath); } } @@ -364,9 +359,10 @@ private Future cloudDownloadFile(String s3Path) { } Promise promise = Promise.promise(); - this.downloadExecutor.executeBlocking( - blockingPromise -> this.cloudDownloadBlocking(blockingPromise, s3Path), - ar -> { + this.downloadExecutor.executeBlocking(() -> { + this.cloudDownloadBlocking(s3Path); + return null; + }, false).onComplete(ar -> { this.pendingDownload.remove(s3Path); this.handleAsyncResult(ar); promise.complete(); @@ -385,7 +381,7 @@ private Future cloudDownloadFile(String s3Path) { return promise.future(); } - private void cloudDownloadBlocking(Promise promise, String s3Path) { + private void cloudDownloadBlocking(String s3Path) throws Exception { final long cloudDownloadStart = System.nanoTime(); try { String localPath = this.cloudSync.toLocalPath(s3Path); @@ -398,7 +394,6 @@ private void cloudDownloadBlocking(Promise promise, String s3Path) { downloadSuccessTimer.record(java.time.Duration.ofMillis(cloudDownloadTimeMs)); LOGGER.info("S3 download completed: {} in {} ms", cloudStorage.mask(s3Path), cloudDownloadTimeMs); } - promise.complete(); } catch (Exception ex) { final long cloudDownloadEnd = System.nanoTime(); final long cloudDownloadTimeMs = (cloudDownloadEnd - cloudDownloadStart) / 1_000_000; @@ -406,7 +401,7 @@ private void cloudDownloadBlocking(Promise promise, String s3Path) { downloadFailureTimer.record(java.time.Duration.ofMillis(cloudDownloadTimeMs)); // Be careful as the s3Path may contain the pre-signed S3 token, so do not log the whole path LOGGER.error("download error: " + ex.getClass().getSimpleName()); - promise.fail(new Throwable(ex)); + throw new CloudStorageException("Download failed"); } } diff --git a/src/main/java/com/uid2/shared/vertx/RotatingStoreVerticle.java b/src/main/java/com/uid2/shared/vertx/RotatingStoreVerticle.java index 23ce22f6..b71ad9f6 100644 --- a/src/main/java/com/uid2/shared/vertx/RotatingStoreVerticle.java +++ b/src/main/java/com/uid2/shared/vertx/RotatingStoreVerticle.java @@ -83,14 +83,10 @@ public void start(Promise startPromise) throws Exception { private void startRefresh(Promise promise) { LOGGER.info("Starting " + this.storeName + " loading"); final long startupRefreshStart = System.nanoTime(); - vertx.executeBlocking(p -> { - try { - this.refresh(); - p.complete(); - } catch (Exception e) { - p.fail(e); - } - }, ar -> { + vertx.executeBlocking(() -> { + this.refresh(); + return null; + }).onComplete(ar -> { final long startupRefreshEnd = System.nanoTime(); final long startupRefreshTimeMs = (startupRefreshEnd - startupRefreshStart) / 1000000; @@ -112,15 +108,10 @@ private void startBackgroundRefresh() { vertx.setPeriodic(this.refreshIntervalMs, (id) -> { final long start = System.nanoTime(); - vertx.executeBlocking(promise -> { - try { - this.refresh(); - promise.complete(); - } catch (Exception e) { - promise.fail(e); - } - }, - asyncResult -> { + vertx.executeBlocking(() -> { + this.refresh(); + return null; + }).onComplete(asyncResult -> { final long end = System.nanoTime(); final long elapsed = ((end - start) / 1000000); this.counterStoreRefreshTimeMs.increment(elapsed);