# Langchain4j LLM streaming

In [1]:
// Local Maven repository, exposed as a file:// URL for the dependency magics
String userHomeDir = System.getProperty("user.home");
String localRespoUrl = String.format("file://%s/.m2/repository/", userHomeDir);

// Library versions referenced by the %dependency commands
String langchain4jVersion = "1.0.0-beta1";
String langgraph4jVersion = "1.4-SNAPSHOT";

Remove the installed packages from the Jupyter cache

In [2]:
%%bash 
# Delete the cached org/bsc artifacts from the kernel's mima_cache directory.
# NOTE(review): the Library/Jupyter location looks macOS-specific - confirm for other platforms.
rm -rf \{userHomeDir}/Library/Jupyter/kernels/rapaio-jupyter-kernel/mima_cache/org/bsc/

In [3]:
// Register the local Maven repository under the name "local"
// (presumably "release|never snapshot|always" are the update policies - TODO confirm)
%dependency /add-repo local \{localRespoUrl} release|never snapshot|always
// %dependency /list-repos
// SLF4J binding for java.util.logging, then the langgraph4j and langchain4j artifacts
%dependency /add org.slf4j:slf4j-jdk14:2.0.9
%dependency /add org.bsc.langgraph4j:langgraph4j-core:\{langgraph4jVersion}
%dependency /add org.bsc.langgraph4j:langgraph4j-langchain4j:\{langgraph4jVersion}
%dependency /add dev.langchain4j:langchain4j:\{langchain4jVersion}
%dependency /add dev.langchain4j:langchain4j-open-ai:\{langchain4jVersion}

// Resolve everything declared above and add the artifacts to the classpath
%dependency /resolve

[0mRepository [1m[32mlocal[0m url: [1m[32mfile:///Users/bsorrentino/.m2/repository/[0m added.
[0mAdding dependency [0m[1m[32morg.slf4j:slf4j-jdk14:2.0.9
[0mAdding dependency [0m[1m[32morg.bsc.langgraph4j:langgraph4j-core:1.3-SNAPSHOT
[0mAdding dependency [0m[1m[32morg.bsc.langgraph4j:langgraph4j-langchain4j:1.3-SNAPSHOT
[0mAdding dependency [0m[1m[32mdev.langchain4j:langchain4j:0.36.2
[0mAdding dependency [0m[1m[32mdev.langchain4j:langchain4j-open-ai:0.36.2
[0mSolving dependencies
Resolved artifacts count: 26
Add to classpath: [0m[32m/Users/bsorrentino/Library/Jupyter/kernels/rapaio-jupyter-kernel/mima_cache/org/slf4j/slf4j-jdk14/2.0.9/slf4j-jdk14-2.0.9.jar[0m
[0mAdd to classpath: [0m[32m/Users/bsorrentino/Library/Jupyter/kernels/rapaio-jupyter-kernel/mima_cache/org/slf4j/slf4j-api/2.0.9/slf4j-api-2.0.9.jar[0m
[0mAdd to classpath: [0m[32m/Users/bsorrentino/Library/Jupyter/kernels/rapaio-jupyter-kernel/mima_cache/org/bsc/langgraph4j/langgraph4j-core

**Initialize Logger**

In [4]:
// Configure java.util.logging from ./logging.properties.
// try-with-resources closes the stream even when reading the configuration fails;
// if the file cannot be opened the exception propagates and the cell fails.
try( var file = new java.io.FileInputStream("./logging.properties")) {
    // The former explicit LogManager.checkAccess() call was dropped: the method is
    // deprecated for removal since Java 17 and is redundant before readConfiguration.
    java.util.logging.LogManager.getLogManager().readConfiguration( file );
}

// SLF4J logger used by the cells below
var log = org.slf4j.LoggerFactory.getLogger("llm-streaming");


## How to use LLMStreamingGenerator

In [5]:
import dev.langchain4j.model.StreamingResponseHandler;
import dev.langchain4j.model.chat.StreamingChatLanguageModel;
import dev.langchain4j.model.openai.OpenAiStreamingChatModel;
import dev.langchain4j.data.message.AiMessage;
import dev.langchain4j.model.output.Response;
import static dev.langchain4j.model.openai.OpenAiChatModelName.GPT_4_O_MINI;
import org.bsc.langgraph4j.langchain4j.generators.LLMStreamingGenerator;
import org.bsc.langgraph4j.state.AgentState;
import org.bsc.langgraph4j.streaming.StreamingOutput;

// Streaming generator for AiMessage responses; mapResult wraps the content of the
// final response in a map under the "content" key
var generator = LLMStreamingGenerator.<AiMessage,AgentState>builder()
                        .mapResult( r -> Map.of( "content", r.content() ) )
                        .build();

// Streaming OpenAI chat model; the API key is read from the OPENAI_API_KEY environment variable
StreamingChatLanguageModel model = OpenAiStreamingChatModel.builder()
    .apiKey(System.getenv("OPENAI_API_KEY"))
    .modelName(GPT_4_O_MINI)
    .build();

String userMessage = "Tell me a joke";

// Send the request, passing the generator's handler to the model
model.generate(userMessage, generator.handler() );

// Iterate over the generator and log every element it yields
for( var r : generator ) {
    log.info( "{}", r);
}
  
// Log the generator's result value (null when none is present)
log.info( "RESULT: {}", generator.resultValue().orElse(null) );
  
//Thread.sleep( 1000 );

StreamingOutput{chunk=} 
StreamingOutput{chunk=Why} 
StreamingOutput{chunk= did} 
StreamingOutput{chunk= the} 
StreamingOutput{chunk= scare} 
StreamingOutput{chunk=crow} 
StreamingOutput{chunk= win} 
StreamingOutput{chunk= an} 
StreamingOutput{chunk= award} 
StreamingOutput{chunk=?

} 
StreamingOutput{chunk=Because} 
StreamingOutput{chunk= he} 
StreamingOutput{chunk= was} 
StreamingOutput{chunk= outstanding} 
StreamingOutput{chunk= in} 
StreamingOutput{chunk= his} 
StreamingOutput{chunk= field} 
StreamingOutput{chunk=!} 
RESULT: {content=AiMessage { text = "Why did the scarecrow win an award?

Because he was outstanding in his field!" toolExecutionRequests = null }} 


## Use LLMStreamingGenerator in an Agent

### Define Serializers

In [6]:
import dev.langchain4j.data.message.ChatMessage;
import dev.langchain4j.data.message.AiMessage;
import dev.langchain4j.data.message.SystemMessage;
import dev.langchain4j.data.message.UserMessage;
import dev.langchain4j.data.message.ToolExecutionResultMessage;
import dev.langchain4j.agent.tool.ToolExecutionRequest;
import org.bsc.langgraph4j.serializer.std.ObjectStreamStateSerializer;
import org.bsc.langgraph4j.langchain4j.serializer.std.ChatMesssageSerializer;
import org.bsc.langgraph4j.langchain4j.serializer.std.ToolExecutionRequestSerializer;
import org.bsc.langgraph4j.state.AgentStateFactory;
import org.bsc.langgraph4j.prebuilt.MessagesState;

// Serializer for the MessagesState<ChatMessage> graph state, constructed with MessagesState::new
var stateSerializer = new ObjectStreamStateSerializer<MessagesState<ChatMessage>>( MessagesState::new );
stateSerializer.mapper()
    // Register the custom serializer for Langchain4j ToolExecutionRequest
    .register(ToolExecutionRequest.class, new ToolExecutionRequestSerializer() )
    // Register the custom serializer for Langchain4j ChatMessage
    .register(ChatMessage.class, new ChatMesssageSerializer() );


SerializerMapper: 
java.util.Map
java.util.Collection
dev.langchain4j.agent.tool.ToolExecutionRequest
dev.langchain4j.data.message.ChatMessage

## Set up the tools

Using [langchain4j], we will first define the tools we want to use. For this simple example, we will
create a placeholder search engine. However, it is really easy to create
your own tools - see the documentation
[here][tools] on how to do
that.

[langchain4j]: https://docs.langchain4j.dev
[tools]: https://docs.langchain4j.dev/tutorials/tools

In [7]:
import dev.langchain4j.agent.tool.P;
import dev.langchain4j.agent.tool.Tool;

import java.util.Optional;

import static java.lang.String.format;

/**
 * Stand-in search tool made available to the model.
 * No real lookup is performed: every query receives the same fixed answer.
 */
public class SearchTool {

    // Fixed reply handed back for every query
    private static final String CANNED_ANSWER = "Cold, with a low of 13 degrees";

    /**
     * Placeholder for a real search implementation.
     *
     * @param query the search text (currently not used)
     * @return the fixed placeholder answer
     */
    @Tool("Use to surf the web, fetch current information, check the weather, and retrieve other information.")
    String execQuery(@P("The query to use in your search.") String query) {
        return CANNED_ANSWER;
    }
}

In [8]:
import static org.bsc.langgraph4j.StateGraph.START;
import static org.bsc.langgraph4j.StateGraph.END;
import org.bsc.langgraph4j.prebuilt.MessagesStateGraph;
import org.bsc.langgraph4j.action.EdgeAction;
import static org.bsc.langgraph4j.action.AsyncEdgeAction.edge_async;
import org.bsc.langgraph4j.action.NodeAction;
import static org.bsc.langgraph4j.action.AsyncNodeAction.node_async;
import dev.langchain4j.service.tool.DefaultToolExecutor;
import org.bsc.langgraph4j.langchain4j.tool.ToolNode;
import dev.langchain4j.agent.tool.ToolSpecification;
import dev.langchain4j.model.openai.OpenAiStreamingChatModel;


// Set up the streaming model: API key from the OPENAI_API_KEY environment variable,
// response logging enabled, temperature 0.0 and at most 2000 tokens
var model = OpenAiStreamingChatModel.builder()
  .apiKey( System.getenv("OPENAI_API_KEY") )
  .modelName( "gpt-4o-mini" )
  .logResponses(true)
  .temperature(0.0)
  .maxTokens(2000)
  .build();

// Set up the tools: a ToolNode built from a SearchTool instance
var tools = ToolNode.builder()
              .specification( new SearchTool() ) 
              .build(); 

// Agent node: passes the current messages and the tool specifications to the
// streaming model and returns the streaming generator under the "messages" key
NodeAction<MessagesState<ChatMessage>> callModel = state -> {
    log.info("CallModel:\n{}", state.messages());

    // Generator started at node "agent" with the current state; mapResult turns
    // the completed response into the "messages" update
    var streamingGenerator = LLMStreamingGenerator.<AiMessage, MessagesState<ChatMessage>>builder()
            .mapResult(llmResponse -> {
                log.info("MapResult: {}", llmResponse);
                return Map.of("messages", llmResponse.content());
            })
            .startingNode("agent")
            .startingState(state)
            .build();

    final var history = state.messages();
    final var toolSpecs = tools.toolSpecifications();
    model.generate(history, toolSpecs, streamingGenerator.handler());

    return Map.of("messages", streamingGenerator);
};
            
// Route Message
// Returns "next" when the last message is an AiMessage carrying tool execution
// requests, otherwise "exit" (nothing to call, so the run can finish).
EdgeAction<MessagesState<ChatMessage>> routeMessage = state -> {
    log.info("routeMessage:\n{}", state.messages());

    final var last = state.lastMessage()
            .orElseThrow(() -> new IllegalStateException("last message not found!"));

    final boolean toolsRequested =
            last instanceof AiMessage aiMessage && aiMessage.hasToolExecutionRequests();

    return toolsRequested ? "next" : "exit";
};
            
// Invoke Tool
// Executes the tool requests carried by the last AiMessage and returns the
// outcome under the "messages" key.
NodeAction<MessagesState<ChatMessage>> invokeTool = state -> {
    log.info("invokeTool:\n{}", state.messages());

    final var last = state.lastMessage()
            .orElseThrow(() -> new IllegalStateException("last message not found!"));

    // Guard: only an AiMessage can carry tool execution requests
    if (!(last instanceof AiMessage aiMessage)) {
        throw new IllegalStateException("invalid last message");
    }

    final var toolOutcome = tools.execute(aiMessage.toolExecutionRequests(), null)
            .orElseThrow(() -> new IllegalStateException("no tool found!"));

    return Map.of("messages", toolOutcome);
};
            
// Define Graph
// Nodes: "agent" runs callModel and "tools" runs invokeTool (both wrapped with node_async).
// Flow: START -> "agent"; after "agent", routeMessage picks "next" (go to "tools")
// or "exit" (go to END); "tools" always leads back to "agent".
var workflow = new MessagesStateGraph<ChatMessage>(stateSerializer)
        .addNode("agent", node_async(callModel))
        .addNode("tools", node_async(invokeTool))
        .addEdge(START, "agent")
        .addConditionalEdges("agent",
                edge_async(routeMessage),
                Map.of("next", "tools", "exit", END))
        .addEdge("tools", "agent");
            


In [9]:
import org.bsc.langgraph4j.streaming.StreamingOutput;

// Compile the graph defined above
var app = workflow.compile();

// Stream a run seeded with a single user message: StreamingOutput items are logged
// with their node and chunk, every other item is logged as-is.
// NOTE(review): "whether" in the prompt looks like a typo for "weather"; it is kept
// unchanged so the prompt stays identical to the one in the recorded output.
for( var event : app.stream( Map.of( "messages", UserMessage.from( "what is the whether today?")) ) ) {
    if( !(event instanceof StreamingOutput chunkEvent) ) {
        log.info( "{}", event );
        continue;
    }
    log.info( "StreamingOutput{node={}, chunk={} }", chunkEvent.node(), chunkEvent.chunk() );
}


START 
CallModel:
[UserMessage { name = null contents = [TextContent { text = "what is the whether today?" }] }] 
MapResult: Response { content = AiMessage { text = null toolExecutionRequests = [ToolExecutionRequest { id = "call_VSMGzPIUc51ZkdtXHsNK3Ekp", name = "execQuery", arguments = "{"query":"current weather"}" }] }, tokenUsage = TokenUsage { inputTokenCount = 71, outputTokenCount = 16, totalTokenCount = 87 }, finishReason = TOOL_EXECUTION, metadata = {} } 
routeMessage:
[AiMessage { text = null toolExecutionRequests = [ToolExecutionRequest { id = "call_VSMGzPIUc51ZkdtXHsNK3Ekp", name = "execQuery", arguments = "{"query":"current weather"}" }] }] 
NodeOutput{node=__START__, state={messages=[UserMessage { name = null contents = [TextContent { text = "what is the whether today?" }] }]}} 
invokeTool:
[AiMessage { text = null toolExecutionRequests = [ToolExecutionRequest { id = "call_VSMGzPIUc51ZkdtXHsNK3Ekp", name = "execQuery", arguments = "{"query":"current weather"}" }] }] 
execut