From aa74e02d2e7b0fd4e54c18ed81f56f31723dad88 Mon Sep 17 00:00:00 2001 From: jchrys Date: Tue, 30 May 2023 22:34:58 +0900 Subject: [PATCH] Fix request queue delaying (#119) Motivation: The request queue could contain cancelled tasks, which caused delays and stalls. Modification: Implemented Filtered out cancelled tasks from the queue. Result: The queue now functions better. Resolves issue #114 --- .../r2dbc/mysql/client/RequestQueue.java | 32 +++++++++---------- .../r2dbc/mysql/client/RequestTask.java | 31 ++++++++++++++---- .../mysql/ConnectionIntegrationTest.java | 10 ++++++ .../r2dbc/mysql/IntegrationTestSupport.java | 4 +++ 4 files changed, 55 insertions(+), 22 deletions(-) diff --git a/src/main/java/io/asyncer/r2dbc/mysql/client/RequestQueue.java b/src/main/java/io/asyncer/r2dbc/mysql/client/RequestQueue.java index c60df947e..ff27d44a3 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/client/RequestQueue.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/client/RequestQueue.java @@ -65,22 +65,29 @@ final class RequestQueue extends ActiveStatus implements Runnable { */ @Override public void run() { - RequestTask task = queue.poll(); - - if (task == null) { - // Queue was empty, set it to idle if it is not disposed. - STATUS_UPDATER.compareAndSet(this, ACTIVE, IDLE); - } else { - int status = this.status; - - if (status == DISPOSE) { + for (;;) { + RequestTask task = queue.poll(); + final int status = this.status; + + if (task == null) { + // Queue was empty, set it to idle if it is not disposed. + if (status != ACTIVE || STATUS_UPDATER.compareAndSet(this, ACTIVE, IDLE) && queue.isEmpty()) { + return; + } + } else if (status == DISPOSE) { // Cancel and no need clear queue because it should be cleared by other one. task.cancel(requireDisposed()); + return; } else { task.run(); + // The execution of a canceled task would result in a stall of the request queue. + // refer: https://github.com/asyncer-io/r2dbc-mysql/issues/114 + if (!task.isCancelled()) { + return; } } } +} /** * Submit an exchange task. If the queue is inactive, it will execute directly instead of queuing. @@ -90,13 +97,6 @@ public void run() { * @param the type argument of {@link RequestTask}. */ void submit(RequestTask task) { - if (STATUS_UPDATER.compareAndSet(this, IDLE, ACTIVE)) { - // Fast path for general way. - task.run(); - return; - } - - // Check dispose after fast path failed. int status = this.status; if (status == DISPOSE) { diff --git a/src/main/java/io/asyncer/r2dbc/mysql/client/RequestTask.java b/src/main/java/io/asyncer/r2dbc/mysql/client/RequestTask.java index 09235e05f..e92e808c3 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/client/RequestTask.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/client/RequestTask.java @@ -38,6 +38,8 @@ final class RequestTask { private final T supplier; + private volatile boolean isCancelled; + private RequestTask(@Nullable Disposable disposable, MonoSink sink, T supplier) { this.disposable = disposable; this.sink = sink; @@ -54,26 +56,43 @@ void run() { * @param e cancelled by which error */ void cancel(Throwable e) { + cancel0(); + sink.error(e); + } + + boolean isCancelled() { + return isCancelled; + } + + private void cancel0() { if (disposable != null) { disposable.dispose(); } - sink.error(e); + isCancelled = true; } static RequestTask wrap(ClientMessage message, MonoSink sink, T supplier) { + final RequestTask task; if (message instanceof Disposable) { - return new RequestTask<>((Disposable) message, sink, supplier); - } + task = new RequestTask<>((Disposable) message, sink, supplier); + } else { + task = new RequestTask<>(null, sink, supplier); - return new RequestTask<>(null, sink, supplier); + } + sink.onCancel(() -> task.cancel0()); + return task; } static RequestTask wrap(Flux messages, MonoSink sink, T supplier) { - return new RequestTask<>(new DisposableFlux(messages), sink, supplier); + final RequestTask task = new RequestTask<>(new DisposableFlux(messages), sink, supplier); + sink.onCancel(() -> task.cancel0()); + return task; } static RequestTask wrap(MonoSink sink, T supplier) { - return new RequestTask<>(null, sink, supplier); + final RequestTask task = new RequestTask<>(null, sink, supplier); + sink.onCancel(() -> task.cancel0()); + return task; } private static final class DisposableFlux implements Disposable { diff --git a/src/test/java/io/asyncer/r2dbc/mysql/ConnectionIntegrationTest.java b/src/test/java/io/asyncer/r2dbc/mysql/ConnectionIntegrationTest.java index ee6f93116..3e4ca117a 100644 --- a/src/test/java/io/asyncer/r2dbc/mysql/ConnectionIntegrationTest.java +++ b/src/test/java/io/asyncer/r2dbc/mysql/ConnectionIntegrationTest.java @@ -233,6 +233,16 @@ void setTransactionIsolationLevel() { .doOnNext(a -> a.isEqualTo(connection.getTransactionIsolationLevel())))); } + @Test + void errorPropagteRequestQueue() { + illegalArgument(connection -> Flux.merge( + connection.createStatement("SELECT 'Result 1', SLEEP(1)").execute(), + connection.createStatement("SELECT 'Result 2'").execute(), + connection.createStatement("SELECT 'Result 3'").execute() + ).flatMap(result -> result.map((row, meta) -> row.get(0, Integer.class))) + ); + } + @Test void batchCrud() { // TODO: spilt it to multiple test cases and move it to BatchIntegrationTest diff --git a/src/test/java/io/asyncer/r2dbc/mysql/IntegrationTestSupport.java b/src/test/java/io/asyncer/r2dbc/mysql/IntegrationTestSupport.java index 68110c065..521796c4c 100644 --- a/src/test/java/io/asyncer/r2dbc/mysql/IntegrationTestSupport.java +++ b/src/test/java/io/asyncer/r2dbc/mysql/IntegrationTestSupport.java @@ -50,6 +50,10 @@ void badGrammar(Function> runner) { process(runner).verifyError(R2dbcBadGrammarException.class); } + void illegalArgument(Function> runner) { + process(runner).expectError(IllegalArgumentException.class).verify(Duration.ofSeconds(3)); + } + Mono create() { return connectionFactory.create(); }