[BUG] [Azure OpenAI] block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-2 #40898

Open
nspyke opened this issue Jun 27, 2024 · 4 comments
Labels: customer-reported, OpenAI, question

nspyke commented Jun 27, 2024

Describe the bug
In a Spring Boot WebFlux project, calling the stream method of Spring AI, which in turn calls the Azure OpenAI SDK, throws the exception:
block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-2

Exception or Stack Trace

java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-2
	at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:87) ~[reactor-core-3.6.7.jar:3.6.7]
	Suppressed: java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-2
		at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:87) ~[reactor-core-3.6.7.jar:3.6.7]
		at reactor.core.publisher.Mono.block(Mono.java:1779) ~[reactor-core-3.6.7.jar:3.6.7]
		at com.azure.core.http.netty.NettyAsyncHttpClient.sendSync(NettyAsyncHttpClient.java:198) ~[azure-core-http-netty-1.15.0.jar:1.15.0]
		at com.azure.core.http.HttpPipelineNextSyncPolicy.processSync(HttpPipelineNextSyncPolicy.java:51) ~[azure-core-1.49.0.jar:1.49.0]
		at com.azure.core.http.policy.HttpLoggingPolicy.processSync(HttpLoggingPolicy.java:175) ~[azure-core-1.49.0.jar:1.49.0]
		at com.azure.core.http.HttpPipelineNextSyncPolicy.processSync(HttpPipelineNextSyncPolicy.java:53) ~[azure-core-1.49.0.jar:1.49.0]
		at com.azure.core.implementation.http.policy.InstrumentationPolicy.processSync(InstrumentationPolicy.java:101) ~[azure-core-1.49.0.jar:1.49.0]
		at com.azure.core.http.HttpPipelineNextSyncPolicy.processSync(HttpPipelineNextSyncPolicy.java:53) ~[azure-core-1.49.0.jar:1.49.0]
		at com.azure.core.http.policy.KeyCredentialPolicy.processSync(KeyCredentialPolicy.java:115) ~[azure-core-1.49.0.jar:1.49.0]
		at com.azure.core.http.HttpPipelineNextSyncPolicy.processSync(HttpPipelineNextSyncPolicy.java:53) ~[azure-core-1.49.0.jar:1.49.0]
		at com.azure.core.http.policy.CookiePolicy.processSync(CookiePolicy.java:73) ~[azure-core-1.49.0.jar:1.49.0]
		at com.azure.core.http.HttpPipelineNextSyncPolicy.processSync(HttpPipelineNextSyncPolicy.java:53) ~[azure-core-1.49.0.jar:1.49.0]
		at com.azure.core.http.policy.AddDatePolicy.processSync(AddDatePolicy.java:50) ~[azure-core-1.49.0.jar:1.49.0]
		at com.azure.core.http.HttpPipelineNextSyncPolicy.processSync(HttpPipelineNextSyncPolicy.java:53) ~[azure-core-1.49.0.jar:1.49.0]
		at com.azure.core.http.policy.RetryPolicy.attemptSync(RetryPolicy.java:211) ~[azure-core-1.49.0.jar:1.49.0]
		at com.azure.core.http.policy.RetryPolicy.processSync(RetryPolicy.java:161) ~[azure-core-1.49.0.jar:1.49.0]
		at com.azure.core.http.HttpPipelineNextSyncPolicy.processSync(HttpPipelineNextSyncPolicy.java:53) ~[azure-core-1.49.0.jar:1.49.0]
		at com.azure.core.http.policy.AddHeadersFromContextPolicy.processSync(AddHeadersFromContextPolicy.java:67) ~[azure-core-1.49.0.jar:1.49.0]
		at com.azure.core.http.HttpPipelineNextSyncPolicy.processSync(HttpPipelineNextSyncPolicy.java:53) ~[azure-core-1.49.0.jar:1.49.0]
		at com.azure.core.http.policy.RequestIdPolicy.processSync(RequestIdPolicy.java:77) ~[azure-core-1.49.0.jar:1.49.0]
		at com.azure.core.http.HttpPipelineNextSyncPolicy.processSync(HttpPipelineNextSyncPolicy.java:53) ~[azure-core-1.49.0.jar:1.49.0]
		at com.azure.core.http.policy.HttpPipelineSyncPolicy.processSync(HttpPipelineSyncPolicy.java:51) ~[azure-core-1.49.0.jar:1.49.0]
		at com.azure.core.http.policy.UserAgentPolicy.processSync(UserAgentPolicy.java:174) ~[azure-core-1.49.0.jar:1.49.0]
		at com.azure.core.http.HttpPipelineNextSyncPolicy.processSync(HttpPipelineNextSyncPolicy.java:53) ~[azure-core-1.49.0.jar:1.49.0]
		at com.azure.core.http.HttpPipeline.sendSync(HttpPipeline.java:138) ~[azure-core-1.49.0.jar:1.49.0]
		at com.azure.core.implementation.http.rest.SyncRestProxy.send(SyncRestProxy.java:62) ~[azure-core-1.49.0.jar:1.49.0]
		at com.azure.core.implementation.http.rest.SyncRestProxy.invoke(SyncRestProxy.java:83) ~[azure-core-1.49.0.jar:1.49.0]
		at com.azure.core.implementation.http.rest.RestProxyBase.invoke(RestProxyBase.java:125) ~[azure-core-1.49.0.jar:1.49.0]
		at com.azure.core.http.rest.RestProxy.invoke(RestProxy.java:97) ~[azure-core-1.49.0.jar:1.49.0]
		at jdk.proxy2/jdk.proxy2.$Proxy55.getChatCompletionsSync(Unknown Source) ~[na:na]
		at com.azure.ai.openai.implementation.OpenAIClientImpl.getChatCompletionsWithResponse(OpenAIClientImpl.java:1444) ~[azure-ai-openai-1.0.0-beta.8.jar:1.0.0-beta.8]
		at com.azure.ai.openai.OpenAIClient.getChatCompletionsWithResponse(OpenAIClient.java:318) ~[azure-ai-openai-1.0.0-beta.8.jar:1.0.0-beta.8]
		at com.azure.ai.openai.OpenAIClient.getChatCompletionsStream(OpenAIClient.java:732) ~[azure-ai-openai-1.0.0-beta.8.jar:1.0.0-beta.8]
		at org.springframework.ai.azure.openai.AzureOpenAiChatModel.stream(AzureOpenAiChatModel.java:165) ~[spring-ai-azure-openai-1.0.0-M1.jar:1.0.0-M1]
		at org.springframework.ai.chat.client.ChatClient$ChatClientRequest$StreamResponseSpec.doGetFluxChatResponse(ChatClient.java:692) ~[spring-ai-core-1.0.0-M1.jar:1.0.0-M1]
		at org.springframework.ai.chat.client.ChatClient$ChatClientRequest$StreamResponseSpec.chatResponse(ChatClient.java:707) ~[spring-ai-core-1.0.0-M1.jar:1.0.0-M1]
		at nz.co.airnz.aiplatform.conversations.api.resource.ChatResource.stream(ChatResource.java:30) ~[main/:na]
		at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) ~[na:na]
		at java.base/java.lang.reflect.Method.invoke(Method.java:580) ~[na:na]
		at org.springframework.web.reactive.result.method.InvocableHandlerMethod.lambda$invoke$0(InvocableHandlerMethod.java:198) ~[spring-webflux-6.1.10.jar:6.1.10]
		at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:132) ~[reactor-core-3.6.7.jar:3.6.7]
		at reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:297) ~[reactor-core-3.6.7.jar:3.6.7]
		at reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:478) ~[reactor-core-3.6.7.jar:3.6.7]
		at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180) ~[reactor-core-3.6.7.jar:3.6.7]
		at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:122) ~[reactor-core-3.6.7.jar:3.6.7]
		at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74) ~[reactor-core-3.6.7.jar:3.6.7]
		at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79) ~[reactor-core-3.6.7.jar:3.6.7]
		at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129) ~[reactor-core-3.6.7.jar:3.6.7]
		at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107) ~[reactor-core-3.6.7.jar:3.6.7]
		at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:299) ~[reactor-core-3.6.7.jar:3.6.7]
		at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337) ~[reactor-core-3.6.7.jar:3.6.7]
		at reactor.core.publisher.Operators$BaseFluxToMonoOperator.completePossiblyEmpty(Operators.java:2097) ~[reactor-core-3.6.7.jar:3.6.7]
		at reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:145) ~[reactor-core-3.6.7.jar:3.6.7]
		at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144) ~[reactor-core-3.6.7.jar:3.6.7]
		at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260) ~[reactor-core-3.6.7.jar:3.6.7]
		at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144) ~[reactor-core-3.6.7.jar:3.6.7]
		at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:415) ~[reactor-netty-core-1.1.20.jar:1.1.20]
		at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:446) ~[reactor-netty-core-1.1.20.jar:1.1.20]
		at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:818) ~[reactor-netty-http-1.1.20.jar:1.1.20]
		at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:114) ~[reactor-netty-core-1.1.20.jar:1.1.20]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
		at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
		at reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:305) ~[reactor-netty-http-1.1.20.jar:1.1.20]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
		at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
		at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
		at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) ~[netty-codec-4.1.111.Final.jar:4.1.111.Final]
		at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318) ~[netty-codec-4.1.111.Final.jar:4.1.111.Final]
		at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
		at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
		at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1407) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
		at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:918) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
		at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
		at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
		at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
		at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
		at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
		at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:994) ~[netty-common-4.1.111.Final.jar:4.1.111.Final]
		at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.111.Final.jar:4.1.111.Final]
		at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.111.Final.jar:4.1.111.Final]
		at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]
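
For context: Reactor raises this error when block() is called on a thread it has marked as non-blocking, which includes Reactor Netty event-loop threads such as reactor-http-nio-2. In the trace above, Mono.block is reached from NettyAsyncHttpClient.sendSync while running on that server thread. The sketch below is a simplified model of such a guard using only JDK types. NonBlockingMarker, EventLoopThread, block and messageFromEventLoop are hypothetical names chosen for illustration and are not Reactor's actual classes.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Simplified model of a "no blocking on event-loop threads" guard (not Reactor code). */
final class BlockingGuardSketch {

    /** Hypothetical marker for threads that must never block. */
    interface NonBlockingMarker {}

    /** Hypothetical stand-in for an event-loop thread such as reactor-http-nio-2. */
    static final class EventLoopThread extends Thread implements NonBlockingMarker {
        EventLoopThread(Runnable task, String name) {
            super(task, name);
        }
    }

    /** Runs a blocking call, but refuses to do so when the current thread is marked. */
    static <T> T block(Supplier<T> blockingCall) {
        Thread current = Thread.currentThread();
        if (current instanceof NonBlockingMarker) {
            throw new IllegalStateException(
                "block()/blockFirst()/blockLast() are blocking, which is not supported in thread "
                    + current.getName());
        }
        return blockingCall.get();
    }

    /** Calls block() from a marked thread and returns the error message, or null if none. */
    static String messageFromEventLoop(String threadName) {
        AtomicReference<String> message = new AtomicReference<>();
        Thread loop = new EventLoopThread(() -> {
            try {
                block(() -> "response");
            } catch (IllegalStateException e) {
                message.set(e.getMessage());
            }
        }, threadName);
        loop.start();
        try {
            loop.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for " + threadName, e);
        }
        return message.get();
    }

    public static void main(String[] args) {
        // Allowed: the main thread carries no marker.
        String allowed = block(() -> "response");
        System.out.println(allowed);
        // Rejected: the same call from a marked thread fails fast instead of stalling the loop.
        System.out.println(messageFromEventLoop("reactor-http-nio-2"));
    }
}
```

The point of the model is that the failure depends on which thread invokes the blocking call, not on the call itself, which is why the same SDK method can work in one setup and fail in another.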

To Reproduce
Create a new Spring Boot 3.3.1 project through initializer, add Spring AI and Azure Open AI dependencies and configure them in application.properties for autoconfiguration.

  • Create a RestController
  • Create a ChatClient bean
@Bean
public ChatClient chatClient(ChatClient.Builder builder) {
    return builder.build();
}
  • Call your API endpoint

Code Snippet

@RestController
public class ChatResource {
    private final ChatClient chatClient;

    public ChatResource(ChatClient chatClient) {
        this.chatClient = chatClient;
    }
    @PostMapping(path = "stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ChatResponse> stream(@RequestBody String input) {
        return chatClient.prompt().user(input).stream().chatResponse();
    }
}

Expected behavior
Azure OpenAI chat streams a response

Setup (please complete the following information):

  • OS: Windows
  • IDE: IntelliJ
  • Library/Libraries:
    • 'org.springframework.boot:spring-boot-starter-webflux'
    • 'org.springframework.ai:spring-ai-azure-openai-spring-boot-starter'
  • Java version: 21
  • App Server/Environment: Netty
  • Frameworks: Spring Boot 3

Information Checklist
Kindly make sure that you have added all of the following information above and checked off the required fields; otherwise we will treat the issue as an incomplete report.

  • [x] Bug Description Added
  • [x] Repro Steps Added
  • [x] Setup information Added
github-actions bot added the customer-reported, needs-triage, and question labels Jun 27, 2024
github-actions bot removed the needs-triage label Jun 27, 2024
joshfree (Member) commented

@mssfang could you please assist @nspyke

nspyke commented Jun 29, 2024

Hi @joshfree, thanks for picking this up so quickly.

I have done a bit more investigation, this time using the Azure OpenAI SDK directly and removing the Spring AI library.
I've found that streaming works with this setup, even though the stack trace makes it look like the issue is coming from deep inside the Azure library.

@Bean
public OpenAIAsyncClient chatClient() {
    return new OpenAIClientBuilder()
            .credential(new AzureKeyCredential("my-az-open-ai-key"))
            .endpoint("https://my-endpoint.openai.azure.com")
            .buildAsyncClient();
}

And this code in the controller:

@PostMapping(path = "stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ChatCompletions> stream(@RequestBody String input) {
    List<ChatRequestMessage> chatMessages = new ArrayList<>();
    chatMessages.add(new ChatRequestSystemMessage("You are a helpful assistant. You will talk like a pirate."));
    chatMessages.add(new ChatRequestUserMessage(input));

    return chatClient.getChatCompletionsStream(
            "gpt-4",
            new ChatCompletionsOptions(chatMessages));
}

nspyke commented Jun 29, 2024

Back in Spring AI, stepping through with the debugger shows that it calls
com.azure.ai.openai.implementation.OpenAIClientImpl#getChatCompletionsWithResponse, which in turn calls the interface stub
com.azure.ai.openai.implementation.OpenAIClientImpl.OpenAIClientService#getChatCompletionsSync:

Response<BinaryData> getChatCompletionsSync(
        @HostParam("endpoint") String var1,
        @QueryParam("api-version") String var2,
        @PathParam("deploymentId") String var3,
        @HeaderParam("accept") String var4,
        @BodyParam("application/json") BinaryData var5,
        RequestOptions var6,
        Context var7);

This looks like a potential cause of the blocking issue.

I would have expected it to call com.azure.ai.openai.implementation.OpenAIClientImpl#getChatCompletionsWithResponseAsync and com.azure.ai.openai.implementation.OpenAIClientImpl.OpenAIClientService#getChatCompletions with the Mono<> return type.
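
Until the synchronous path is avoided, one general way to keep a blocking call off an event-loop thread is to hand it to a separate worker pool and return a handle to the pending result. In Reactor this is usually done by wrapping the call (for example with Mono.fromCallable or Flux.defer) and applying subscribeOn(Schedulers.boundedElastic()); whether that is sufficient for the Spring AI call in this report is untested. The sketch below models the same idea with JDK types only. OffloadSketch, blockingCall, offload and demo are hypothetical names, and blockingCall merely stands in for a synchronous SDK request.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

/** Illustrative only: moves a blocking call off an "event loop" thread using JDK types. */
final class OffloadSketch {

    /** Hypothetical stand-in for a synchronous SDK call; reports the thread it ran on. */
    static String blockingCall() {
        return "ran on " + Thread.currentThread().getName();
    }

    /** Schedules the blocking call on the worker pool and returns immediately with a future. */
    static <T> CompletableFuture<T> offload(Supplier<T> call, ExecutorService workers) {
        return CompletableFuture.supplyAsync(call, workers);
    }

    /** An "event loop" thread hands the call to a worker and never waits for the result itself. */
    static String demo() {
        ExecutorService eventLoop =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "event-loop-1"));
        ExecutorService workers =
            Executors.newFixedThreadPool(2, r -> new Thread(r, "worker"));
        try {
            // The handler runs on the event loop but only schedules the work.
            Supplier<CompletableFuture<String>> handler =
                () -> offload(OffloadSketch::blockingCall, workers);
            CompletableFuture<String> result =
                CompletableFuture.supplyAsync(handler, eventLoop).thenCompose(f -> f);
            // Waiting here is acceptable because the caller is not an event-loop thread.
            return result.join();
        } finally {
            eventLoop.shutdown();
            workers.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The trade-off is an extra thread hop per request, and the worker pool has to be sized for the number of concurrent blocking calls. Calling the asynchronous variant directly, as expected above, avoids both.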

nspyke commented Jun 29, 2024

Duplicated this issue in Spring AI
