[Bug] Request queue is full #140

Closed
pkgonan opened this issue Jul 28, 2023 · 3 comments
Labels
question Further information is requested

Comments

pkgonan commented Jul 28, 2023

Describe the bug
I made an asynchronous call inside a Kotlin coroutine and got a "Request queue is full" error.

How can I solve this problem?

Should we increase the setting below?

-Dreactor.bufferSize.small=4096

Error Log
{"timestamp":"2023-07-28T08:01:14.679Z","level":"ERROR","thread":"reactor-tcp-nio-4","mdc":{"trace_id":"0f649e308d0dd6030daffe80bf7cd9fd","trace_flags":"01","span_id":"714f910c35cf72c5"},"logger":"com.hpcnt.paymentplatform.advice.ExceptionHandler$Companion","message":"Internal Server Error : java.lang.IllegalStateException: Request queue is full\n\tat io.asyncer.r2dbc.mysql.client.RequestQueue.submit(RequestQueue.java:110)\n\tSuppressed: The stacktrace has been enhanced by Reactor, refer to additional information below: \nError has been observed at the following site(s):\n\t*__checkpoint ⇢ SQL \"INSERT INTO gateway_identifier_payment_context (application_id, type, gateway_id, payment_id, created_at) VALUES (?, ?, ?, ?, ?)\" [DatabaseClient]\n\t*__checkpoint ⇢ Handler com.hpcnt.paymentplatform.gatewayevent.GooglePlayClientEventController#create(UUID, GooglePlayClientEventRequest, Continuation) [DispatcherHandler]\nOriginal Stack Trace:\n\t\tat io.asyncer.r2dbc.mysql.client.RequestQueue.submit(RequestQueue.java:110)\n\t\tat io.asyncer.r2dbc.mysql.client.ReactorNettyClient.lambda$exchange$9(ReactorNettyClient.java:189)\n\t\tat reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:58)\n\t\tat reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:62)\n\t\tat reactor.core.publisher.FluxDefer.subscribe(FluxDefer.java:54)\n\t\tat reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:62)\n\t\tat reactor.core.publisher.FluxDefer.subscribe(FluxDefer.java:54)\n\t\tat reactor.core.publisher.Flux.subscribe(Flux.java:8773)\n\t\tat reactor.core.publisher.FluxUsingWhen$ResourceSubscriber.onNext(FluxUsingWhen.java:195)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.lambda$onNext$1(TracingSubscriber.java:62)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.withActiveSpan(TracingSubscriber.java:83)\n\t\tat 
io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext(TracingSubscriber.java:62)\n\t\tat reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.lambda$onNext$1(TracingSubscriber.java:62)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.withActiveSpan(TracingSubscriber.java:83)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext(TracingSubscriber.java:62)\n\t\tat reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.lambda$onNext$1(TracingSubscriber.java:62)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.withActiveSpan(TracingSubscriber.java:83)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext(TracingSubscriber.java:62)\n\t\tat reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.lambda$onNext$1(TracingSubscriber.java:62)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.withActiveSpan(TracingSubscriber.java:83)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext(TracingSubscriber.java:62)\n\t\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.secondComplete(MonoFlatMap.java:245)\n\t\tat reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:305)\n\t\tat reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2545)\n\t\tat reactor.core.publisher.MonoFlatMap$FlatMapInner.onSubscribe(MonoFlatMap.java:291)\n\t\tat reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55)\n\t\tat 
reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)\n\t\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:165)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.lambda$onNext$1(TracingSubscriber.java:62)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.withActiveSpan(TracingSubscriber.java:83)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext(TracingSubscriber.java:62)\n\t\tat reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.lambda$onNext$1(TracingSubscriber.java:62)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.withActiveSpan(TracingSubscriber.java:83)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext(TracingSubscriber.java:62)\n\t\tat reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2545)\n\t\tat reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:164)\n\t\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.request(MonoFlatMap.java:194)\n\t\tat reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2341)\n\t\tat reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onSubscribe(FluxOnErrorResume.java:74)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.lambda$onSubscribe$0(TracingSubscriber.java:57)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.withActiveSpan(TracingSubscriber.java:83)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onSubscribe(TracingSubscriber.java:57)\n\t\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.onSubscribe(MonoFlatMap.java:117)\n\t\tat 
io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.lambda$onSubscribe$0(TracingSubscriber.java:57)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.withActiveSpan(TracingSubscriber.java:83)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onSubscribe(TracingSubscriber.java:57)\n\t\tat reactor.core.publisher.FluxMap$MapSubscriber.onSubscribe(FluxMap.java:92)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.lambda$onSubscribe$0(TracingSubscriber.java:57)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.withActiveSpan(TracingSubscriber.java:83)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onSubscribe(TracingSubscriber.java:57)\n\t\tat reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55)\n\t\tat reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)\n\t\tat reactor.core.publisher.MonoDeferContextual.subscribe(MonoDeferContextual.java:55)\n\t\tat reactor.core.publisher.Mono.subscribe(Mono.java:4495)\n\t\tat reactor.core.publisher.FluxUsingWhen.subscribe(FluxUsingWhen.java:104)\n\t\tat reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)\n\t\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:165)\n\t\tat reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2545)\n\t\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.request(MonoFlatMap.java:194)\n\t\tat reactor.core.publisher.MonoFlatMap$FlatMapInner.onSubscribe(MonoFlatMap.java:291)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.lambda$onSubscribe$0(TracingSubscriber.java:57)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.withActiveSpan(TracingSubscriber.java:83)\n\t\tat 
io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onSubscribe(TracingSubscriber.java:57)\n\t\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.onSubscribe(MonoFlatMap.java:117)\n\t\tat reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55)\n\t\tat reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)\n\t\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:165)\n\t\tat reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2545)\n\t\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.request(MonoFlatMap.java:194)\n\t\tat reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.request(FluxHide.java:152)\n\t\tat reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.request(FluxContextWrite.java:136)\n\t\tat reactor.core.publisher.StrictSubscriber.onSubscribe(StrictSubscriber.java:77)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.lambda$onSubscribe$0(TracingSubscriber.java:57)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.withActiveSpan(TracingSubscriber.java:83)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onSubscribe(TracingSubscriber.java:57)\n\t\tat reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onSubscribe(FluxContextWrite.java:101)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.lambda$onSubscribe$0(TracingSubscriber.java:57)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.withActiveSpan(TracingSubscriber.java:83)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onSubscribe(TracingSubscriber.java:57)\n\t\tat reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onSubscribe(FluxHide.java:122)\n\t\tat 
reactor.core.publisher.MonoFlatMap$FlatMapMain.onSubscribe(MonoFlatMap.java:117)\n\t\tat reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55)\n\t\tat reactor.core.publisher.Mono.subscribe(Mono.java:4495)\n\t\tat kotlinx.coroutines.reactive.AwaitKt.awaitOne(Await.kt:190)\n\t\tat kotlinx.coroutines.reactive.AwaitKt.awaitOne$default(Await.kt:183)\n\t\tat kotlinx.coroutines.reactive.AwaitKt.awaitSingle(Await.kt:81)\n\t\tat org.springframework.data.r2dbc.core.ReactiveInsertOperationExtensionsKt.usingAndAwait(ReactiveInsertOperationExtensions.kt:38)\n\t\tat com.hpcnt.paymentplatform.gatewayidentifierpaymentcontext.GatewayIdentifierPaymentContextRepositoryImpl$insert$2$1.invokeSuspend(GatewayIdentifierPaymentContextRepositoryImpl.kt:40)\n\t\tat com.hpcnt.paymentplatform.gatewayidentifierpaymentcontext.GatewayIdentifierPaymentContextRepositoryImpl$insert$2$1.invoke(GatewayIdentifierPaymentContextRepositoryImpl.kt)\n\t\tat com.hpcnt.paymentplatform.gatewayidentifierpaymentcontext.GatewayIdentifierPaymentContextRepositoryImpl$insert$2$1.invoke(GatewayIdentifierPaymentContextRepositoryImpl.kt)\n\t\tat com.hpcnt.paymentplatform.mysql.config.R2dbcExceptionHandler.handle(R2dbcExceptionHandler.kt:11)\n\t\tat com.hpcnt.paymentplatform.gatewayidentifierpaymentcontext.GatewayIdentifierPaymentContextRepositoryImpl$insert$2.invokeSuspend(GatewayIdentifierPaymentContextRepositoryImpl.kt:38)\n\t\tat com.hpcnt.paymentplatform.gatewayidentifierpaymentcontext.GatewayIdentifierPaymentContextRepositoryImpl$insert$2.invoke(GatewayIdentifierPaymentContextRepositoryImpl.kt)\n\t\tat com.hpcnt.paymentplatform.gatewayidentifierpaymentcontext.GatewayIdentifierPaymentContextRepositoryImpl$insert$2.invoke(GatewayIdentifierPaymentContextRepositoryImpl.kt)\n\t\tat kotlinx.coroutines.intrinsics.UndispatchedKt.startUndispatchedOrReturn(Undispatched.kt:89)\n\t\tat kotlinx.coroutines.BuildersKt__Builders_commonKt.withContext(Builders.common.kt:169)\n\t\tat 
kotlinx.coroutines.BuildersKt.withContext(Unknown Source)\n\t\tat com.hpcnt.paymentplatform.gatewayidentifierpaymentcontext.GatewayIdentifierPaymentContextRepositoryImpl.insert$suspendImpl(GatewayIdentifierPaymentContextRepositoryImpl.kt:37)\n\t\tat com.hpcnt.paymentplatform.gatewayidentifierpaymentcontext.GatewayIdentifierPaymentContextRepositoryImpl.insert(GatewayIdentifierPaymentContextRepositoryImpl.kt)\n\t\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\t\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)\n\t\tat java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\t\tat java.base/java.lang.reflect.Method.invoke(Method.java:568)\n\t\tat org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:343)\n\t\tat org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:196)\n\t\tat org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)\n\t\tat org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:756)\n\t\tat org.springframework.dao.support.PersistenceExceptionTranslationInterceptor.invoke(PersistenceExceptionTranslationInterceptor.java:137)\n\t\tat org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184)\n\t\tat org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:756)\n\t\tat org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:708)\n\t\tat com.hpcnt.paymentplatform.gatewayidentifierpaymentcontext.GatewayIdentifierPaymentContextRepositoryImpl$$SpringCGLIB$$0.insert(<generated>)\n\t\tat 
com.hpcnt.paymentplatform.gatewayidentifierpaymentcontext.GatewayIdentifierPaymentContext.create(GatewayIdentifierPaymentContext.kt:50)\n\t\tat com.hpcnt.paymentplatform.gatewayevent.AbstractClientEventHandler.create(AbstractClientEventHandler.kt:63)\n\t\tat com.hpcnt.paymentplatform.gatewayevent.GooglePlayClientEventHandler.createGatewayIdentifierPaymentContext(GooglePlayClientEventHandler.kt:171)\n\t\tat com.hpcnt.paymentplatform.gatewayevent.GooglePlayClientEventHandler.access$createGatewayIdentifierPaymentContext(GooglePlayClientEventHandler.kt:36)\n\t\tat com.hpcnt.paymentplatform.gatewayevent.GooglePlayClientEventHandler$handle$2$deferredCreateGatewayIdentifierPaymentContext$1.invokeSuspend(GooglePlayClientEventHandler.kt:70)\n\t\tat kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)\n\t\tat kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)\n\t\tat kotlinx.coroutines.internal.LimitedDispatcher.run(LimitedDispatcher.kt:42)\n\t\tat kotlinx.coroutines.scheduling.TaskImpl.run(Tasks.kt:95)\n\t\tat kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:570)\n\t\tat kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:750)\n\t\tat kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:677)\n\t\tat kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:664)\n","context":"default","exception":"java.lang.IllegalStateException: Request queue is full\n\tat io.asyncer.r2dbc.mysql.client.RequestQueue.submit(RequestQueue.java:110)\n\tSuppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: \nError has been observed at the following site(s):\n\t*__checkpoint ⇢ SQL \"INSERT INTO gateway_identifier_payment_context (application_id, type, gateway_id, payment_id, created_at) VALUES (?, ?, ?, ?, ?)\" [DatabaseClient]\n\t*__checkpoint ⇢ Handler 
com.hpcnt.paymentplatform.gatewayevent.GooglePlayClientEventController#create(UUID, GooglePlayClientEventRequest, Continuation) [DispatcherHandler]\nOriginal Stack Trace:\n\t\tat io.asyncer.r2dbc.mysql.client.RequestQueue.submit(RequestQueue.java:110)\n\t\tat io.asyncer.r2dbc.mysql.client.ReactorNettyClient.lambda$exchange$9(ReactorNettyClient.java:189)\n\t\tat reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:58)\n\t\tat reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:62)\n\t\tat reactor.core.publisher.FluxDefer.subscribe(FluxDefer.java:54)\n\t\tat reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:62)\n\t\tat reactor.core.publisher.FluxDefer.subscribe(FluxDefer.java:54)\n\t\tat reactor.core.publisher.Flux.subscribe(Flux.java:8773)\n\t\tat reactor.core.publisher.FluxUsingWhen$ResourceSubscriber.onNext(FluxUsingWhen.java:195)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.lambda$onNext$1(TracingSubscriber.java:62)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.withActiveSpan(TracingSubscriber.java:83)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext(TracingSubscriber.java:62)\n\t\tat reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.lambda$onNext$1(TracingSubscriber.java:62)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.withActiveSpan(TracingSubscriber.java:83)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext(TracingSubscriber.java:62)\n\t\tat reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.lambda$onNext$1(TracingSubscriber.java:62)\n\t\tat 
io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.withActiveSpan(TracingSubscriber.java:83)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext(TracingSubscriber.java:62)\n\t\tat reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.lambda$onNext$1(TracingSubscriber.java:62)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.withActiveSpan(TracingSubscriber.java:83)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext(TracingSubscriber.java:62)\n\t\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.secondComplete(MonoFlatMap.java:245)\n\t\tat reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:305)\n\t\tat reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2545)\n\t\tat reactor.core.publisher.MonoFlatMap$FlatMapInner.onSubscribe(MonoFlatMap.java:291)\n\t\tat reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55)\n\t\tat reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)\n\t\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:165)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.lambda$onNext$1(TracingSubscriber.java:62)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.withActiveSpan(TracingSubscriber.java:83)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext(TracingSubscriber.java:62)\n\t\tat reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.lambda$onNext$1(TracingSubscriber.java:62)\n\t\tat 
io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.withActiveSpan(TracingSubscriber.java:83)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext(TracingSubscriber.java:62)\n\t\tat reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2545)\n\t\tat reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:164)\n\t\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.request(MonoFlatMap.java:194)\n\t\tat reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2341)\n\t\tat reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onSubscribe(FluxOnErrorResume.java:74)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.lambda$onSubscribe$0(TracingSubscriber.java:57)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.withActiveSpan(TracingSubscriber.java:83)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onSubscribe(TracingSubscriber.java:57)\n\t\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.onSubscribe(MonoFlatMap.java:117)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.lambda$onSubscribe$0(TracingSubscriber.java:57)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.withActiveSpan(TracingSubscriber.java:83)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onSubscribe(TracingSubscriber.java:57)\n\t\tat reactor.core.publisher.FluxMap$MapSubscriber.onSubscribe(FluxMap.java:92)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.lambda$onSubscribe$0(TracingSubscriber.java:57)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.withActiveSpan(TracingSubscriber.java:83)\n\t\tat 
io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onSubscribe(TracingSubscriber.java:57)\n\t\tat reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55)\n\t\tat reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)\n\t\tat reactor.core.publisher.MonoDeferContextual.subscribe(MonoDeferContextual.java:55)\n\t\tat reactor.core.publisher.Mono.subscribe(Mono.java:4495)\n\t\tat reactor.core.publisher.FluxUsingWhen.subscribe(FluxUsingWhen.java:104)\n\t\tat reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)\n\t\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:165)\n\t\tat reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2545)\n\t\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.request(MonoFlatMap.java:194)\n\t\tat reactor.core.publisher.MonoFlatMap$FlatMapInner.onSubscribe(MonoFlatMap.java:291)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.lambda$onSubscribe$0(TracingSubscriber.java:57)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.withActiveSpan(TracingSubscriber.java:83)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onSubscribe(TracingSubscriber.java:57)\n\t\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.onSubscribe(MonoFlatMap.java:117)\n\t\tat reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55)\n\t\tat reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)\n\t\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:165)\n\t\tat reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2545)\n\t\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.request(MonoFlatMap.java:194)\n\t\tat reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.request(FluxHide.java:152)\n\t\tat 
reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.request(FluxContextWrite.java:136)\n\t\tat reactor.core.publisher.StrictSubscriber.onSubscribe(StrictSubscriber.java:77)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.lambda$onSubscribe$0(TracingSubscriber.java:57)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.withActiveSpan(TracingSubscriber.java:83)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onSubscribe(TracingSubscriber.java:57)\n\t\tat reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onSubscribe(FluxContextWrite.java:101)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.lambda$onSubscribe$0(TracingSubscriber.java:57)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.withActiveSpan(TracingSubscriber.java:83)\n\t\tat io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onSubscribe(TracingSubscriber.java:57)\n\t\tat reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onSubscribe(FluxHide.java:122)\n\t\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.onSubscribe(MonoFlatMap.java:117)\n\t\tat reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55)\n\t\tat reactor.core.publisher.Mono.subscribe(Mono.java:4495)\n\t\tat kotlinx.coroutines.reactive.AwaitKt.awaitOne(Await.kt:190)\n\t\tat kotlinx.coroutines.reactive.AwaitKt.awaitOne$default(Await.kt:183)\n\t\tat kotlinx.coroutines.reactive.AwaitKt.awaitSingle(Await.kt:81)\n\t\tat org.springframework.data.r2dbc.core.ReactiveInsertOperationExtensionsKt.usingAndAwait(ReactiveInsertOperationExtensions.kt:38)\n\t\tat com.hpcnt.paymentplatform.gatewayidentifierpaymentcontext.GatewayIdentifierPaymentContextRepositoryImpl$insert$2$1.invokeSuspend(GatewayIdentifierPaymentContextRepositoryImpl.kt:40)\n\t\tat 
com.hpcnt.paymentplatform.gatewayidentifierpaymentcontext.GatewayIdentifierPaymentContextRepositoryImpl$insert$2$1.invoke(GatewayIdentifierPaymentContextRepositoryImpl.kt)\n\t\tat com.hpcnt.paymentplatform.gatewayidentifierpaymentcontext.GatewayIdentifierPaymentContextRepositoryImpl$insert$2$1.invoke(GatewayIdentifierPaymentContextRepositoryImpl.kt)\n\t\tat com.hpcnt.paymentplatform.mysql.config.R2dbcExceptionHandler.handle(R2dbcExceptionHandler.kt:11)\n\t\tat com.hpcnt.paymentplatform.gatewayidentifierpaymentcontext.GatewayIdentifierPaymentContextRepositoryImpl$insert$2.invokeSuspend(GatewayIdentifierPaymentContextRepositoryImpl.kt:38)\n\t\tat com.hpcnt.paymentplatform.gatewayidentifierpaymentcontext.GatewayIdentifierPaymentContextRepositoryImpl$insert$2.invoke(GatewayIdentifierPaymentContextRepositoryImpl.kt)\n\t\tat com.hpcnt.paymentplatform.gatewayidentifierpaymentcontext.GatewayIdentifierPaymentContextRepositoryImpl$insert$2.invoke(GatewayIdentifierPaymentContextRepositoryImpl.kt)\n\t\tat kotlinx.coroutines.intrinsics.UndispatchedKt.startUndispatchedOrReturn(Undispatched.kt:89)\n\t\tat kotlinx.coroutines.BuildersKt__Builders_commonKt.withContext(Builders.common.kt:169)\n\t\tat kotlinx.coroutines.BuildersKt.withContext(Unknown Source)\n\t\tat com.hpcnt.paymentplatform.gatewayidentifierpaymentcontext.GatewayIdentifierPaymentContextRepositoryImpl.insert$suspendImpl(GatewayIdentifierPaymentContextRepositoryImpl.kt:37)\n\t\tat com.hpcnt.paymentplatform.gatewayidentifierpaymentcontext.GatewayIdentifierPaymentContextRepositoryImpl.insert(GatewayIdentifierPaymentContextRepositoryImpl.kt)\n\t\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\t\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)\n\t\tat java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\t\tat java.base/java.lang.reflect.Method.invoke(Method.java:568)\n\t\tat 
org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:343)\n\t\tat org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:196)\n\t\tat org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)\n\t\tat org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:756)\n\t\tat org.springframework.dao.support.PersistenceExceptionTranslationInterceptor.invoke(PersistenceExceptionTranslationInterceptor.java:137)\n\t\tat org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184)\n\t\tat org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:756)\n\t\tat org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:708)\n\t\tat com.hpcnt.paymentplatform.gatewayidentifierpaymentcontext.GatewayIdentifierPaymentContextRepositoryImpl$$SpringCGLIB$$0.insert(<generated>)\n\t\tat com.hpcnt.paymentplatform.gatewayidentifierpaymentcontext.GatewayIdentifierPaymentContext.create(GatewayIdentifierPaymentContext.kt:50)\n\t\tat com.hpcnt.paymentplatform.gatewayevent.AbstractClientEventHandler.create(AbstractClientEventHandler.kt:63)\n\t\tat com.hpcnt.paymentplatform.gatewayevent.GooglePlayClientEventHandler.createGatewayIdentifierPaymentContext(GooglePlayClientEventHandler.kt:171)\n\t\tat com.hpcnt.paymentplatform.gatewayevent.GooglePlayClientEventHandler.access$createGatewayIdentifierPaymentContext(GooglePlayClientEventHandler.kt:36)\n\t\tat com.hpcnt.paymentplatform.gatewayevent.GooglePlayClientEventHandler$handle$2$deferredCreateGatewayIdentifierPaymentContext$1.invokeSuspend(GooglePlayClientEventHandler.kt:70)\n\t\tat kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)\n\t\tat kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)\n\t\tat 
kotlinx.coroutines.internal.LimitedDispatcher.run(LimitedDispatcher.kt:42)\n\t\tat kotlinx.coroutines.scheduling.TaskImpl.run(Tasks.kt:95)\n\t\tat kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:570)\n\t\tat kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:750)\n\t\tat kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:677)\n\t\tat

Collaborator

jchrys commented Jul 28, 2023

Hello, @pkgonan.
I understand your concern about adjusting the queue size using the -Dreactor.bufferSize.small option. It will increase the queue size. However, I agree with you that merely changing the queue size might not be the most effective solution, especially in high-traffic environments.
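
For context, a minimal sketch of how that property turns into a buffer size. It assumes Reactor's usual rule for its "small" buffer (default 256, floor of 16, read once when Reactor's queue utilities are first loaded), which is why the value has to be supplied at JVM startup. effectiveSmallBufferSize is a hypothetical helper written for illustration, not a Reactor API.

```kotlin
import java.util.Properties

// Hypothetical helper: mirrors the assumed rule for Reactor's "small" buffer size
// (default 256, never below 16). It does not call into Reactor itself.
fun effectiveSmallBufferSize(props: Properties = System.getProperties()): Int =
    maxOf(16, props.getProperty("reactor.bufferSize.small", "256").toInt())

fun main() {
    // Run with: java -Dreactor.bufferSize.small=4096 ...
    println("effective small buffer size: ${effectiveSmallBufferSize()}")
}
```

Setting the property from application code after Reactor has already been loaded would most likely have no effect under this assumption, so the -D flag on the command line is the safer route.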

To address this problem, I recommend ensuring that you have properly configured r2dbc-pool. It plays a crucial role in managing the connection pool and can significantly impact performance. I suggest checking the r2dbc-pool related issue (r2dbc/r2dbc-pool#190 (comment)), as it might provide valuable insights into resolving the "Request queue is full" error. (Please ensure that you are utilizing LoopResource and setting initialSize to be different from maxSize.)
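
A minimal sketch of the sizing part of that suggestion, reusing the PoolingConnectionFactoryProvider option keys that appear in the configuration posted later in this thread. The values 5 and 20 are placeholders rather than recommendations, and withPoolSizing is a hypothetical helper introduced only for this example.

```kotlin
import io.r2dbc.pool.PoolingConnectionFactoryProvider
import io.r2dbc.spi.ConnectionFactoryOptions

// Placeholder values (assumptions): start small and let the pool grow to a larger cap,
// so that initialSize differs from maxSize.
private const val CONNECTION_POOL_INITIAL_SIZE = 5
private const val CONNECTION_POOL_MAX_SIZE = 20

// Hypothetical helper: applies only the two sizing options to an existing options builder.
fun withPoolSizing(builder: ConnectionFactoryOptions.Builder): ConnectionFactoryOptions.Builder =
    builder
        .option(PoolingConnectionFactoryProvider.INITIAL_SIZE, CONNECTION_POOL_INITIAL_SIZE)
        .option(PoolingConnectionFactoryProvider.MAX_SIZE, CONNECTION_POOL_MAX_SIZE)
```

The remaining options (host, credentials, validation) would stay as they are in the existing configuration; only the two size constants change.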

Efficiently managing connections, returning them to the connection pool when they are no longer in use, and acquiring new connections when necessary can enhance the overall performance. It becomes even more crucial since each connection object has its dedicated request queue.

Please don't hesitate to ask if you have any other concerns.
Thanks.

Author

pkgonan commented Jul 29, 2023

Hi, @jchrys .
Thank you for your reply.

I see that r2dbc-pool 1.0.1 changed to use Schedulers.single. Does upgrading r2dbc-pool from 1.0.0 to 1.0.1 solve the "Request queue is full" error, or does that change address a different problem?

Also, I did not set a custom LoopResource. Is it possible to solve the "Request queue is full" problem without setting a custom LoopResource?

My R2DBC Configuration

@Configuration
internal class R2dbcConfiguration internal constructor(
    @Value("\${payment-platform.r2dbc.host}")
    private val host: String,
    @Value("\${payment-platform.r2dbc.port}")
    private val port: Int,
    @Value("\${payment-platform.r2dbc.username}")
    private val username: String,
    @Value("\${payment-platform.r2dbc.password}")
    private val password: String,
    @Value("\${payment-platform.r2dbc.database}")
    private val database: String,
): AbstractR2dbcConfiguration() {

    companion object {
        private const val CONNECTION_POOL_INITIAL_SIZE = 20
        private const val CONNECTION_POOL_MAX_SIZE = CONNECTION_POOL_INITIAL_SIZE
        private val CONNECTION_POOL_MAX_LIFE_TIME = Duration.ofSeconds(10)
        private const val VALIDATION_QUERY = "SELECT 1"
    }

    @Bean
    override fun connectionFactory(): ConnectionFactory {
        return ConnectionFactories.get(
            ConnectionFactoryOptions.builder()
                .option(ConnectionFactoryOptions.SSL, false)
                .option(ConnectionFactoryOptions.HOST, host)
                .option(ConnectionFactoryOptions.PORT, port)
                .option(ConnectionFactoryOptions.USER, username)
                .option(ConnectionFactoryOptions.PASSWORD, password)
                .option(ConnectionFactoryOptions.DATABASE, database)
                .option(ConnectionFactoryOptions.DRIVER, PoolingConnectionFactoryProvider.POOLING_DRIVER)
                .option(ConnectionFactoryOptions.PROTOCOL, MySqlConnectionFactoryProvider.MYSQL_DRIVER)
                .option(PoolingConnectionFactoryProvider.INITIAL_SIZE, CONNECTION_POOL_INITIAL_SIZE)
                .option(PoolingConnectionFactoryProvider.MAX_SIZE, CONNECTION_POOL_MAX_SIZE)
                .option(PoolingConnectionFactoryProvider.MAX_LIFE_TIME, CONNECTION_POOL_MAX_LIFE_TIME)
                .option(PoolingConnectionFactoryProvider.VALIDATION_DEPTH, ValidationDepth.LOCAL)
                .option(PoolingConnectionFactoryProvider.VALIDATION_QUERY, VALIDATION_QUERY)
                .build()
        )
    }
}

Collaborator

jchrys commented Jul 30, 2023

Yes, it is possible to address the 'Request queue is full' problem without setting a Custom LoopResource.
(The request queue serves as a buffer to temporarily store incoming client requests while the server processes ongoing requests.)
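
To make that buffering behavior concrete, here is a toy model of a bounded queue that rejects new work once it is full. It is only an illustration: BoundedRequestQueue is a hypothetical class, not the driver's RequestQueue, and the capacity of 2 is chosen just to trigger the error quickly.

```kotlin
import java.util.concurrent.ArrayBlockingQueue

// Simplified stand-in for a per-connection request queue (hypothetical, not driver code).
class BoundedRequestQueue(capacity: Int) {
    private val pending = ArrayBlockingQueue<String>(capacity)

    // Rejects immediately instead of blocking when the buffer is at capacity.
    fun submit(request: String) {
        if (!pending.offer(request)) {
            throw IllegalStateException("Request queue is full")
        }
    }

    // Models the server finishing the oldest request, which frees one slot.
    fun complete(): String? = pending.poll()

    val size: Int get() = pending.size
}

fun main() {
    val queue = BoundedRequestQueue(capacity = 2)
    queue.submit("INSERT 1")
    queue.submit("INSERT 2")

    // A third request arrives before anything has completed, so it is rejected.
    val rejected = runCatching { queue.submit("INSERT 3") }
    println(rejected.exceptionOrNull()?.message) // prints "Request queue is full"

    // Once one request completes, there is room again.
    queue.complete()
    queue.submit("INSERT 3")
}
```

In this model the error appears whenever requests are submitted on one connection faster than they complete, which is why spreading work across more pooled connections or speeding up the queries both help.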

To optimize the request queue and keep it from filling up, several steps can be taken: optimizing the MySQL server configuration, fine-tuning tables and queries, increasing the connection pool size, and so on.
Additionally, investigating application performance using application performance monitoring (APM) tools and analyzing MySQL server performance can also be beneficial. The main objective here is to identify any bottlenecks and resolve them.

Since this process involves tuning, there isn't a definitive one-size-fits-all solution, but rather a collection of guidelines aimed at enhancing overall system performance.

jchrys added the question label (Further information is requested) Jul 31, 2023
jchrys closed this as completed Jul 31, 2023