diff --git a/packages/components/nodes/memory/ZepMemory/ZepMemory.ts b/packages/components/nodes/memory/ZepMemory/ZepMemory.ts new file mode 100644 index 00000000000..1fb6d9ff68e --- /dev/null +++ b/packages/components/nodes/memory/ZepMemory/ZepMemory.ts @@ -0,0 +1,140 @@ +import { SystemChatMessage } from 'langchain/schema' +import { INode, INodeData, INodeParams } from '../../../src/Interface' +import { getBaseClasses } from '../../../src/utils' +import { ZepMemory, ZepMemoryInput } from 'langchain/memory/zep' +import { ICommonObject } from '../../../src' + +class ZepMemory_Memory implements INode { + label: string + name: string + description: string + type: string + icon: string + category: string + baseClasses: string[] + inputs: INodeParams[] + + constructor() { + this.label = 'Zep Memory' + this.name = 'ZepMemory' + this.type = 'ZepMemory' + this.icon = 'memory.svg' + this.category = 'Memory' + this.description = 'Summarizes the conversation and stores the memory in zep server' + this.baseClasses = [this.type, ...getBaseClasses(ZepMemory)] + this.inputs = [ + { + label: 'Base URL', + name: 'baseURL', + type: 'string', + default: 'http://127.0.0.1:8000' + }, + { + label: 'Auto Summary', + name: 'autoSummary', + type: 'boolean', + default: true + }, + { + label: 'Session Id', + name: 'sessionId', + type: 'string', + description: 'if empty, chatId will be used automatically', + default: '', + additionalParams: true + }, + { + label: 'Auto Summary Template', + name: 'autoSummaryTemplate', + type: 'string', + default: 'This is the summary of the following conversation:\n{summary}', + additionalParams: true + }, + { + label: 'AI Prefix', + name: 'aiPrefix', + type: 'string', + default: 'ai', + additionalParams: true + }, + { + label: 'Human Prefix', + name: 'humanPrefix', + type: 'string', + default: 'human', + additionalParams: true + }, + { + label: 'Memory Key', + name: 'memoryKey', + type: 'string', + default: 'chat_history', + additionalParams: true + }, + { + label: 'Input Key', + name: 'inputKey', + type: 'string', + default: 'input', + additionalParams: true + }, + { + label: 'Output Key', + name: 'outputKey', + type: 'string', + default: 'text', + additionalParams: true + } + ] + } + + async init(nodeData: INodeData, _: string, options: ICommonObject): Promise { + const baseURL = nodeData.inputs?.baseURL as string + const aiPrefix = nodeData.inputs?.aiPrefix as string + const humanPrefix = nodeData.inputs?.humanPrefix as string + const memoryKey = nodeData.inputs?.memoryKey as string + const inputKey = nodeData.inputs?.inputKey as string + const autoSummaryTemplate = nodeData.inputs?.autoSummaryTemplate as string + const autoSummary = nodeData.inputs?.autoSummary as boolean + const sessionId = nodeData.inputs?.sessionId as string + + const chatId = options?.chatId as string + + const obj: ZepMemoryInput = { + baseURL, + sessionId: sessionId ? sessionId : chatId, + aiPrefix, + humanPrefix, + returnMessages: true, + memoryKey, + inputKey + } + + let zep = new ZepMemory(obj) + + // hack to support summary + let tmpFunc = zep.loadMemoryVariables + zep.loadMemoryVariables = async (values) => { + let data = await tmpFunc.bind(zep, values)() + if (autoSummary && zep.returnMessages && data[zep.memoryKey] && data[zep.memoryKey].length) { + const memory = await zep.zepClient.getMemory(zep.sessionId, 10) + if (memory?.summary) { + let summary = autoSummaryTemplate.replace(/{summary}/g, memory.summary.content) + // eslint-disable-next-line no-console + console.log('[ZepMemory] auto summary:', summary) + data[zep.memoryKey].unshift(new SystemChatMessage(summary)) + } + } + // for langchain zep memory compatibility, or we will get "Missing value for input variable chat_history" + if (data instanceof Array) { + data = { + [zep.memoryKey]: data + } + } + return data + } + return zep + } +} + +module.exports = { nodeClass: ZepMemory_Memory } diff --git a/packages/components/nodes/memory/ZepMemory/memory.svg b/packages/components/nodes/memory/ZepMemory/memory.svg new file mode 100644 index 00000000000..ca8e17da1c8 --- /dev/null +++ b/packages/components/nodes/memory/ZepMemory/memory.svg @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/packages/components/package.json b/packages/components/package.json index 3e3d58b6ceb..a778ea8fdb3 100644 --- a/packages/components/package.json +++ b/packages/components/package.json @@ -17,6 +17,7 @@ "license": "SEE LICENSE IN LICENSE.md", "dependencies": { "@dqbd/tiktoken": "^1.0.7", + "@getzep/zep-js": "^0.3.1", "@huggingface/inference": "1", "@pinecone-database/pinecone": "^0.0.12", "@supabase/supabase-js": "^2.21.0", diff --git a/packages/server/src/ChildProcess.ts b/packages/server/src/ChildProcess.ts index 483379d0886..08847a52123 100644 --- a/packages/server/src/ChildProcess.ts +++ b/packages/server/src/ChildProcess.ts @@ -23,7 +23,7 @@ export class ChildProcess { await sendToParentProcess('start', '_') // Create a Queue and add our initial node in it - const { endingNodeData, chatflow, incomingInput, componentNodes } = messageValue + const { endingNodeData, chatflow, chatId, incomingInput, componentNodes } = messageValue let nodeToExecuteData: INodeData let addToChatFlowPool: any = {} @@ -83,6 +83,7 @@ export class ChildProcess { depthQueue, componentNodes, incomingInput.question, + chatId, incomingInput?.overrideConfig ) diff --git a/packages/server/src/Interface.ts b/packages/server/src/Interface.ts index b6876df3f16..2c1fe406c90 100644 --- a/packages/server/src/Interface.ts +++ b/packages/server/src/Interface.ts @@ -143,6 +143,7 @@ export interface IDatabaseExport { export interface IRunChatflowMessageValue { chatflow: IChatFlow + chatId: string incomingInput: IncomingInput componentNodes: IComponentNodes endingNodeData?: INodeData diff --git a/packages/server/src/index.ts b/packages/server/src/index.ts index 3a0c64cdac9..b248f22ccf1 100644 --- a/packages/server/src/index.ts +++ b/packages/server/src/index.ts @@ -432,7 +432,7 @@ export class App { * @param {IncomingInput} incomingInput * @param {INodeData} endingNodeData */ - async startChildProcess(chatflow: ChatFlow, incomingInput: IncomingInput, endingNodeData?: INodeData) { + async startChildProcess(chatflow: ChatFlow, chatId: string, incomingInput: IncomingInput, endingNodeData?: INodeData) { try { const controller = new AbortController() const { signal } = controller @@ -444,6 +444,7 @@ export class App { const value = { chatflow, + chatId, incomingInput, componentNodes: cloneDeep(this.nodesPool.componentNodes), endingNodeData @@ -506,6 +507,9 @@ export class App { }) if (!chatflow) return res.status(404).send(`Chatflow ${chatflowid} not found`) + const chatId = await getChatId(chatflow.id) + if (!chatId) return res.status(500).send(`Chatflow ${chatflowid} first message not found`) + if (!isInternal) { await this.validateKey(req, res, chatflow) } @@ -557,7 +561,7 @@ export class App { if (isRebuildNeeded()) { nodeToExecuteData = this.chatflowPool.activeChatflows[chatflowid].endingNodeData try { - const result = await this.startChildProcess(chatflow, incomingInput, nodeToExecuteData) + const result = await this.startChildProcess(chatflow, chatId, incomingInput, nodeToExecuteData) return res.json(result) } catch (error) { @@ -565,7 +569,7 @@ export class App { } } else { try { - const result = await this.startChildProcess(chatflow, incomingInput) + const result = await this.startChildProcess(chatflow, chatId, incomingInput) return res.json(result) } catch (error) { return res.status(500).send(error) @@ -618,6 +622,7 @@ export class App { depthQueue, this.nodesPool.componentNodes, incomingInput.question, + chatId, incomingInput?.overrideConfig ) @@ -661,6 +666,23 @@ export class App { } } +/** + * Get first chat message id + * @param {string} chatflowid + * @returns {string} + */ +export async function getChatId(chatflowid: string) { + // first chatmessage id as the unique chat id + const firstChatMessage = await getDataSource() + .getRepository(ChatMessage) + .createQueryBuilder('cm') + .select('cm.id') + .where('chatflowid = :chatflowid', { chatflowid }) + .orderBy('cm.createdDate', 'ASC') + .getOne() + return firstChatMessage ? firstChatMessage.id : '' +} + let serverApp: App | undefined export async function start(): Promise { diff --git a/packages/server/src/utils/index.ts b/packages/server/src/utils/index.ts index 18473c51140..b993b9586f7 100644 --- a/packages/server/src/utils/index.ts +++ b/packages/server/src/utils/index.ts @@ -182,6 +182,7 @@ export const buildLangchain = async ( depthQueue: IDepthQueue, componentNodes: IComponentNodes, question: string, + chatId: string, overrideConfig?: ICommonObject ) => { const flowNodes = cloneDeep(reactFlowNodes) @@ -214,7 +215,7 @@ export const buildLangchain = async ( if (overrideConfig) flowNodeData = replaceInputsWithConfig(flowNodeData, overrideConfig) const reactFlowNodeData: INodeData = resolveVariables(flowNodeData, flowNodes, question) - flowNodes[nodeIndex].data.instance = await newNodeInstance.init(reactFlowNodeData, question) + flowNodes[nodeIndex].data.instance = await newNodeInstance.init(reactFlowNodeData, question, { chatId }) } catch (e: any) { console.error(e) throw new Error(e) diff --git a/packages/ui/src/views/chatmessage/ChatMessage.js b/packages/ui/src/views/chatmessage/ChatMessage.js index 077419f1b78..5021cd9b335 100644 --- a/packages/ui/src/views/chatmessage/ChatMessage.js +++ b/packages/ui/src/views/chatmessage/ChatMessage.js @@ -118,7 +118,8 @@ export const ChatMessage = ({ open, chatflowid, isDialog }) => { setLoading(true) setMessages((prevMessages) => [...prevMessages, { message: userInput, type: 'userMessage' }]) - addChatMessage(userInput, 'userMessage') + // waiting for first chatmessage saved, the first chatmessage will be used in sendMessageAndGetPrediction + await addChatMessage(userInput, 'userMessage') // Send user question and history to API try {