From 011f6afcb319eb9ce477e0dd2155de8096e0c540 Mon Sep 17 00:00:00 2001 From: Jeffrey-Wang Date: Sat, 10 Jun 2023 12:09:22 +0800 Subject: [PATCH 1/5] feat: support zep memory --- .../nodes/memory/ZepMemory/ZepMemory.ts | 136 ++++++++++++++++++ .../nodes/memory/ZepMemory/memory.svg | 8 ++ packages/components/package.json | 1 + 3 files changed, 145 insertions(+) create mode 100644 packages/components/nodes/memory/ZepMemory/ZepMemory.ts create mode 100644 packages/components/nodes/memory/ZepMemory/memory.svg diff --git a/packages/components/nodes/memory/ZepMemory/ZepMemory.ts b/packages/components/nodes/memory/ZepMemory/ZepMemory.ts new file mode 100644 index 00000000000..f833b4e3a39 --- /dev/null +++ b/packages/components/nodes/memory/ZepMemory/ZepMemory.ts @@ -0,0 +1,136 @@ +import { SystemChatMessage } from 'langchain/schema' +import { INode, INodeData, INodeParams } from '../../../src/Interface' +import { getBaseClasses } from '../../../src/utils' +import { ZepMemory, ZepMemoryInput } from 'langchain/memory/zep' + +class ZepMemory_Memory implements INode { + label: string + name: string + description: string + type: string + icon: string + category: string + baseClasses: string[] + inputs: INodeParams[] + + constructor() { + this.label = 'Zep Memory' + this.name = 'ZepMemory' + this.type = 'ZepMemory' + this.icon = 'memory.svg' + this.category = 'Memory' + this.description = 'Summarizes the conversation and stores the memory in zep server' + this.baseClasses = [this.type, ...getBaseClasses(ZepMemory)] + this.inputs = [ + { + label: 'Base URL', + name: 'baseURL', + type: 'string', + default: 'http://127.0.0.1:8000' + }, + { + label: 'Session Id', + name: 'sessionId', + type: 'string', + placeholder: 'unique and manually change in every conversion!', + default: '' + }, + { + label: 'Auto Summary', + name: 'autoSummary', + type: 'boolean', + default: true + }, + { + label: 'Auto Summary Template', + name: 'autoSummaryTemplate', + type: 'string', + default: 'This is the summary of the following conversation:\n{summary}', + additionalParams: true + }, + { + label: 'AI Prefix', + name: 'aiPrefix', + type: 'string', + default: 'ai', + additionalParams: true + }, + { + label: 'Human Prefix', + name: 'humanPrefix', + type: 'string', + default: 'human', + additionalParams: true + }, + { + label: 'Memory Key', + name: 'memoryKey', + type: 'string', + default: 'chat_history', + additionalParams: true + }, + { + label: 'Input Key', + name: 'inputKey', + type: 'string', + default: 'input', + additionalParams: true + }, + { + label: 'Output Key', + name: 'outputKey', + type: 'string', + default: 'text', + additionalParams: true + } + ] + } + + async init(nodeData: INodeData): Promise { + const baseURL = nodeData.inputs?.baseURL as string + const sessionId = nodeData.inputs?.sessionId as string + const aiPrefix = nodeData.inputs?.aiPrefix as string + const humanPrefix = nodeData.inputs?.humanPrefix as string + const memoryKey = nodeData.inputs?.memoryKey as string + const inputKey = nodeData.inputs?.inputKey as string + const autoSummaryTemplate = nodeData.inputs?.autoSummaryTemplate as string + const autoSummary = nodeData.inputs?.autoSummary as boolean + + const obj: ZepMemoryInput = { + baseURL, + sessionId, + aiPrefix, + humanPrefix, + returnMessages: true, + memoryKey, + inputKey + } + + let zep = new ZepMemory(obj) + + // hack to support summary + let tmpFunc = zep.loadMemoryVariables + zep.loadMemoryVariables = async (values) => { + let data = await tmpFunc.bind(zep, values)() + if (autoSummary && zep.returnMessages && data[zep.memoryKey] && data[zep.memoryKey].length) { + const memory = await zep.zepClient.getMemory(zep.sessionId, 10) + if (memory?.summary) { + let summary = autoSummaryTemplate.replace(/{summary}/g, memory.summary.content) + // eslint-disable-next-line no-console + console.log('[ZepMemory] auto summary:', summary) + data[zep.memoryKey].unshift(new SystemChatMessage(summary)) + } + } + // for langchain zep memory compatibility, or we will get "Missing value for input variable chat_history" + if (data instanceof Array) { + data = { + [zep.memoryKey]: data + } + } + return data + } + return zep + } +} + +module.exports = { nodeClass: ZepMemory_Memory } diff --git a/packages/components/nodes/memory/ZepMemory/memory.svg b/packages/components/nodes/memory/ZepMemory/memory.svg new file mode 100644 index 00000000000..ca8e17da1c8 --- /dev/null +++ b/packages/components/nodes/memory/ZepMemory/memory.svg @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/packages/components/package.json b/packages/components/package.json index 3e3d58b6ceb..a778ea8fdb3 100644 --- a/packages/components/package.json +++ b/packages/components/package.json @@ -17,6 +17,7 @@ "license": "SEE LICENSE IN LICENSE.md", "dependencies": { "@dqbd/tiktoken": "^1.0.7", + "@getzep/zep-js": "^0.3.1", "@huggingface/inference": "1", "@pinecone-database/pinecone": "^0.0.12", "@supabase/supabase-js": "^2.21.0", From 023967e0c29039ecc3491ee51f7a4a7a5a35e4c2 Mon Sep 17 00:00:00 2001 From: Jeffrey-Wang Date: Sun, 11 Jun 2023 13:14:53 +0800 Subject: [PATCH 2/5] feat: unique chat id --- .../components/nodes/memory/ZepMemory/ZepMemory.ts | 12 +++--------- packages/server/src/ChildProcess.ts | 1 + packages/server/src/index.ts | 11 +++++++++++ packages/server/src/utils/index.ts | 3 ++- packages/ui/src/views/chatmessage/ChatMessage.js | 3 ++- 5 files changed, 19 insertions(+), 11 deletions(-) diff --git a/packages/components/nodes/memory/ZepMemory/ZepMemory.ts b/packages/components/nodes/memory/ZepMemory/ZepMemory.ts index f833b4e3a39..95b83bbb82f 100644 --- a/packages/components/nodes/memory/ZepMemory/ZepMemory.ts +++ b/packages/components/nodes/memory/ZepMemory/ZepMemory.ts @@ -2,6 +2,7 @@ import { SystemChatMessage } from 'langchain/schema' import { INode, INodeData, INodeParams } from '../../../src/Interface' import { getBaseClasses } from '../../../src/utils' import { ZepMemory, ZepMemoryInput } from 'langchain/memory/zep' +import { ICommonObject } from '../../../src' class ZepMemory_Memory implements INode { label: string @@ -28,13 +29,6 @@ class ZepMemory_Memory implements INode { type: 'string', default: 'http://127.0.0.1:8000' }, - { - label: 'Session Id', - name: 'sessionId', - type: 'string', - placeholder: 'unique and manually change in every conversion!', - default: '' - }, { label: 'Auto Summary', name: 'autoSummary', @@ -86,15 +80,15 @@ class ZepMemory_Memory implements INode { ] } - async init(nodeData: INodeData): Promise { + async init(nodeData: INodeData, _: string, options: ICommonObject): Promise { const baseURL = nodeData.inputs?.baseURL as string - const sessionId = nodeData.inputs?.sessionId as string const aiPrefix = nodeData.inputs?.aiPrefix as string const humanPrefix = nodeData.inputs?.humanPrefix as string const memoryKey = nodeData.inputs?.memoryKey as string const inputKey = nodeData.inputs?.inputKey as string const autoSummaryTemplate = nodeData.inputs?.autoSummaryTemplate as string const autoSummary = nodeData.inputs?.autoSummary as boolean + const sessionId = options?.chatId as string const obj: ZepMemoryInput = { baseURL, diff --git a/packages/server/src/ChildProcess.ts b/packages/server/src/ChildProcess.ts index 483379d0886..e5f951700ec 100644 --- a/packages/server/src/ChildProcess.ts +++ b/packages/server/src/ChildProcess.ts @@ -83,6 +83,7 @@ export class ChildProcess { depthQueue, componentNodes, incomingInput.question, + '', incomingInput?.overrideConfig ) diff --git a/packages/server/src/index.ts b/packages/server/src/index.ts index 3a0c64cdac9..5e91a30ab7a 100644 --- a/packages/server/src/index.ts +++ b/packages/server/src/index.ts @@ -506,6 +506,16 @@ export class App { }) if (!chatflow) return res.status(404).send(`Chatflow ${chatflowid} not found`) + // first chatmessage id as the unique chat id + const firstChatMessage = await this.AppDataSource.getRepository(ChatMessage) + .createQueryBuilder('cm') + .select('cm.id') + .where('chatflowid = :chatflowid', { chatflowid }) + .orderBy('cm.createdDate', 'ASC') + .getOne() + if (!firstChatMessage) return res.status(500).send(`Chatflow ${chatflowid} first message not found`) + const chatId = firstChatMessage.id + if (!isInternal) { await this.validateKey(req, res, chatflow) } @@ -618,6 +628,7 @@ export class App { depthQueue, this.nodesPool.componentNodes, incomingInput.question, + chatId, incomingInput?.overrideConfig ) diff --git a/packages/server/src/utils/index.ts b/packages/server/src/utils/index.ts index 18473c51140..b993b9586f7 100644 --- a/packages/server/src/utils/index.ts +++ b/packages/server/src/utils/index.ts @@ -182,6 +182,7 @@ export const buildLangchain = async ( depthQueue: IDepthQueue, componentNodes: IComponentNodes, question: string, + chatId: string, overrideConfig?: ICommonObject ) => { const flowNodes = cloneDeep(reactFlowNodes) @@ -214,7 +215,7 @@ export const buildLangchain = async ( if (overrideConfig) flowNodeData = replaceInputsWithConfig(flowNodeData, overrideConfig) const reactFlowNodeData: INodeData = resolveVariables(flowNodeData, flowNodes, question) - flowNodes[nodeIndex].data.instance = await newNodeInstance.init(reactFlowNodeData, question) + flowNodes[nodeIndex].data.instance = await newNodeInstance.init(reactFlowNodeData, question, { chatId }) } catch (e: any) { console.error(e) throw new Error(e) diff --git a/packages/ui/src/views/chatmessage/ChatMessage.js b/packages/ui/src/views/chatmessage/ChatMessage.js index 077419f1b78..2f5415d0ef9 100644 --- a/packages/ui/src/views/chatmessage/ChatMessage.js +++ b/packages/ui/src/views/chatmessage/ChatMessage.js @@ -118,7 +118,8 @@ export const ChatMessage = ({ open, chatflowid, isDialog }) => { setLoading(true) setMessages((prevMessages) => [...prevMessages, { message: userInput, type: 'userMessage' }]) - addChatMessage(userInput, 'userMessage') + // waiting for first chatmessage uploaded, the first chatmessage id will be chatId for every components + await addChatMessage(userInput, 'userMessage') // Send user question and history to API try { From fd9d6fcb0356b2a93df4874e910afce6a7482f0b Mon Sep 17 00:00:00 2001 From: Jeffrey-Wang Date: Sun, 11 Jun 2023 23:30:26 +0800 Subject: [PATCH 3/5] fix: zep memory --- .../nodes/memory/ZepMemory/ZepMemory.ts | 14 +++++++++-- packages/server/src/ChildProcess.ts | 4 +++- packages/server/src/index.ts | 23 +++++++++++-------- .../ui/src/views/chatmessage/ChatMessage.js | 2 +- 4 files changed, 30 insertions(+), 13 deletions(-) diff --git a/packages/components/nodes/memory/ZepMemory/ZepMemory.ts b/packages/components/nodes/memory/ZepMemory/ZepMemory.ts index 95b83bbb82f..cf1d8e58e24 100644 --- a/packages/components/nodes/memory/ZepMemory/ZepMemory.ts +++ b/packages/components/nodes/memory/ZepMemory/ZepMemory.ts @@ -35,6 +35,14 @@ class ZepMemory_Memory implements INode { type: 'boolean', default: true }, + { + label: 'Session Id', + name: 'sessionId', + type: 'string', + placeholder: 'if empty, chatId will be used automatically', + default: '', + additionalParams: true + }, { label: 'Auto Summary Template', name: 'autoSummaryTemplate', @@ -88,11 +96,13 @@ class ZepMemory_Memory implements INode { const inputKey = nodeData.inputs?.inputKey as string const autoSummaryTemplate = nodeData.inputs?.autoSummaryTemplate as string const autoSummary = nodeData.inputs?.autoSummary as boolean - const sessionId = options?.chatId as string + const sessionId = nodeData.inputs?.sessionId as string + + const chatId = options?.chatId as string const obj: ZepMemoryInput = { baseURL, - sessionId, + sessionId: sessionId ? sessionId : chatId, aiPrefix, humanPrefix, returnMessages: true, diff --git a/packages/server/src/ChildProcess.ts b/packages/server/src/ChildProcess.ts index e5f951700ec..07b52909a24 100644 --- a/packages/server/src/ChildProcess.ts +++ b/packages/server/src/ChildProcess.ts @@ -1,5 +1,6 @@ import { IChildProcessMessage, IReactFlowNode, IReactFlowObject, IRunChatflowMessageValue, INodeData } from './Interface' import { buildLangchain, constructGraphs, getEndingNode, getStartingNodes, resolveVariables } from './utils' +import { getChatId } from './index' export class ChildProcess { /** @@ -76,6 +77,7 @@ export class ChildProcess { const { startingNodeIds, depthQueue } = getStartingNodes(nonDirectedGraph, endingNodeId) /*** BFS to traverse from Starting Nodes to Ending Node ***/ + const chatId = await getChatId(chatflow.id) const reactFlowNodes = await buildLangchain( startingNodeIds, nodes, @@ -83,7 +85,7 @@ export class ChildProcess { depthQueue, componentNodes, incomingInput.question, - '', + chatId, incomingInput?.overrideConfig ) diff --git a/packages/server/src/index.ts b/packages/server/src/index.ts index 5e91a30ab7a..78be231aa35 100644 --- a/packages/server/src/index.ts +++ b/packages/server/src/index.ts @@ -506,15 +506,8 @@ export class App { }) if (!chatflow) return res.status(404).send(`Chatflow ${chatflowid} not found`) - // first chatmessage id as the unique chat id - const firstChatMessage = await this.AppDataSource.getRepository(ChatMessage) - .createQueryBuilder('cm') - .select('cm.id') - .where('chatflowid = :chatflowid', { chatflowid }) - .orderBy('cm.createdDate', 'ASC') - .getOne() - if (!firstChatMessage) return res.status(500).send(`Chatflow ${chatflowid} first message not found`) - const chatId = firstChatMessage.id + const chatId = await getChatId(chatflow.id) + if (!chatId) return res.status(500).send(`Chatflow ${chatflowid} first message not found`) if (!isInternal) { await this.validateKey(req, res, chatflow) @@ -672,6 +665,18 @@ export class App { } } +export async function getChatId(chatflowid: string) { + // first chatmessage id as the unique chat id + const firstChatMessage = await getDataSource() + .getRepository(ChatMessage) + .createQueryBuilder('cm') + .select('cm.id') + .where('chatflowid = :chatflowid', { chatflowid }) + .orderBy('cm.createdDate', 'ASC') + .getOne() + return firstChatMessage ? firstChatMessage.id : '' +} + let serverApp: App | undefined export async function start(): Promise { diff --git a/packages/ui/src/views/chatmessage/ChatMessage.js b/packages/ui/src/views/chatmessage/ChatMessage.js index 2f5415d0ef9..5021cd9b335 100644 --- a/packages/ui/src/views/chatmessage/ChatMessage.js +++ b/packages/ui/src/views/chatmessage/ChatMessage.js @@ -118,7 +118,7 @@ export const ChatMessage = ({ open, chatflowid, isDialog }) => { setLoading(true) setMessages((prevMessages) => [...prevMessages, { message: userInput, type: 'userMessage' }]) - // waiting for first chatmessage uploaded, the first chatmessage id will be chatId for every components + // waiting for first chatmessage saved, the first chatmessage will be used in sendMessageAndGetPrediction await addChatMessage(userInput, 'userMessage') // Send user question and history to API From fe6737a6cb4f7118c8d26f7d57001a85f17a36f0 Mon Sep 17 00:00:00 2001 From: Jeffrey-Wang Date: Mon, 12 Jun 2023 21:52:46 +0800 Subject: [PATCH 4/5] fix: childprocess chatId. --- packages/components/nodes/memory/ZepMemory/ZepMemory.ts | 2 +- packages/server/src/ChildProcess.ts | 4 +--- packages/server/src/Interface.ts | 1 + packages/server/src/index.ts | 7 +++++++ 4 files changed, 10 insertions(+), 4 deletions(-) diff --git a/packages/components/nodes/memory/ZepMemory/ZepMemory.ts b/packages/components/nodes/memory/ZepMemory/ZepMemory.ts index cf1d8e58e24..1fb6d9ff68e 100644 --- a/packages/components/nodes/memory/ZepMemory/ZepMemory.ts +++ b/packages/components/nodes/memory/ZepMemory/ZepMemory.ts @@ -39,7 +39,7 @@ class ZepMemory_Memory implements INode { label: 'Session Id', name: 'sessionId', type: 'string', - placeholder: 'if empty, chatId will be used automatically', + description: 'if empty, chatId will be used automatically', default: '', additionalParams: true }, diff --git a/packages/server/src/ChildProcess.ts b/packages/server/src/ChildProcess.ts index 07b52909a24..08847a52123 100644 --- a/packages/server/src/ChildProcess.ts +++ b/packages/server/src/ChildProcess.ts @@ -1,6 +1,5 @@ import { IChildProcessMessage, IReactFlowNode, IReactFlowObject, IRunChatflowMessageValue, INodeData } from './Interface' import { buildLangchain, constructGraphs, getEndingNode, getStartingNodes, resolveVariables } from './utils' -import { getChatId } from './index' export class ChildProcess { /** @@ -24,7 +23,7 @@ export class ChildProcess { await sendToParentProcess('start', '_') // Create a Queue and add our initial node in it - const { endingNodeData, chatflow, incomingInput, componentNodes } = messageValue + const { endingNodeData, chatflow, chatId, incomingInput, componentNodes } = messageValue let nodeToExecuteData: INodeData let addToChatFlowPool: any = {} @@ -77,7 +76,6 @@ export class ChildProcess { const { startingNodeIds, depthQueue } = getStartingNodes(nonDirectedGraph, endingNodeId) /*** BFS to traverse from Starting Nodes to Ending Node ***/ - const chatId = await getChatId(chatflow.id) const reactFlowNodes = await buildLangchain( startingNodeIds, nodes, diff --git a/packages/server/src/Interface.ts b/packages/server/src/Interface.ts index b6876df3f16..2c1fe406c90 100644 --- a/packages/server/src/Interface.ts +++ b/packages/server/src/Interface.ts @@ -143,6 +143,7 @@ export interface IDatabaseExport { export interface IRunChatflowMessageValue { chatflow: IChatFlow + chatId: string incomingInput: IncomingInput componentNodes: IComponentNodes endingNodeData?: INodeData diff --git a/packages/server/src/index.ts b/packages/server/src/index.ts index 78be231aa35..9379a9221a6 100644 --- a/packages/server/src/index.ts +++ b/packages/server/src/index.ts @@ -441,9 +441,11 @@ export class App { if (!fs.existsSync(childpath)) childpath = 'ChildProcess.ts' const childProcess = fork(childpath, [], { signal }) + const chatId = await getChatId(chatflow.id) const value = { chatflow, + chatId, incomingInput, componentNodes: cloneDeep(this.nodesPool.componentNodes), endingNodeData @@ -665,6 +667,11 @@ export class App { } } +/** + * Get first chat message id + * @param {string} chatflowid + * @returns {string} + */ export async function getChatId(chatflowid: string) { // first chatmessage id as the unique chat id const firstChatMessage = await getDataSource() From a5c408dbe8d58cd1fa262177a489367c639ae469 Mon Sep 17 00:00:00 2001 From: Jeffrey-Wang Date: Mon, 12 Jun 2023 23:45:41 +0800 Subject: [PATCH 5/5] fix: remove useless query --- packages/server/src/index.ts | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/packages/server/src/index.ts b/packages/server/src/index.ts index 9379a9221a6..b248f22ccf1 100644 --- a/packages/server/src/index.ts +++ b/packages/server/src/index.ts @@ -432,7 +432,7 @@ export class App { * @param {IncomingInput} incomingInput * @param {INodeData} endingNodeData */ - async startChildProcess(chatflow: ChatFlow, incomingInput: IncomingInput, endingNodeData?: INodeData) { + async startChildProcess(chatflow: ChatFlow, chatId: string, incomingInput: IncomingInput, endingNodeData?: INodeData) { try { const controller = new AbortController() const { signal } = controller @@ -441,7 +441,6 @@ export class App { if (!fs.existsSync(childpath)) childpath = 'ChildProcess.ts' const childProcess = fork(childpath, [], { signal }) - const chatId = await getChatId(chatflow.id) const value = { chatflow, @@ -562,7 +561,7 @@ export class App { if (isRebuildNeeded()) { nodeToExecuteData = this.chatflowPool.activeChatflows[chatflowid].endingNodeData try { - const result = await this.startChildProcess(chatflow, incomingInput, nodeToExecuteData) + const result = await this.startChildProcess(chatflow, chatId, incomingInput, nodeToExecuteData) return res.json(result) } catch (error) { @@ -570,7 +569,7 @@ export class App { } } else { try { - const result = await this.startChildProcess(chatflow, incomingInput) + const result = await this.startChildProcess(chatflow, chatId, incomingInput) return res.json(result) } catch (error) { return res.status(500).send(error)