Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 140 additions & 0 deletions packages/components/nodes/memory/ZepMemory/ZepMemory.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
import { SystemChatMessage } from 'langchain/schema'
import { INode, INodeData, INodeParams } from '../../../src/Interface'
import { getBaseClasses } from '../../../src/utils'
import { ZepMemory, ZepMemoryInput } from 'langchain/memory/zep'
import { ICommonObject } from '../../../src'

class ZepMemory_Memory implements INode {
label: string
name: string
description: string
type: string
icon: string
category: string
baseClasses: string[]
inputs: INodeParams[]

constructor() {
this.label = 'Zep Memory'
this.name = 'ZepMemory'
this.type = 'ZepMemory'
this.icon = 'memory.svg'
this.category = 'Memory'
this.description = 'Summarizes the conversation and stores the memory in zep server'
this.baseClasses = [this.type, ...getBaseClasses(ZepMemory)]
this.inputs = [
{
label: 'Base URL',
name: 'baseURL',
type: 'string',
default: 'http://127.0.0.1:8000'
},
{
label: 'Auto Summary',
name: 'autoSummary',
type: 'boolean',
default: true
},
{
label: 'Session Id',
name: 'sessionId',
type: 'string',
description: 'if empty, chatId will be used automatically',
default: '',
additionalParams: true
},
{
label: 'Auto Summary Template',
name: 'autoSummaryTemplate',
type: 'string',
default: 'This is the summary of the following conversation:\n{summary}',
additionalParams: true
},
{
label: 'AI Prefix',
name: 'aiPrefix',
type: 'string',
default: 'ai',
additionalParams: true
},
{
label: 'Human Prefix',
name: 'humanPrefix',
type: 'string',
default: 'human',
additionalParams: true
},
{
label: 'Memory Key',
name: 'memoryKey',
type: 'string',
default: 'chat_history',
additionalParams: true
},
{
label: 'Input Key',
name: 'inputKey',
type: 'string',
default: 'input',
additionalParams: true
},
{
label: 'Output Key',
name: 'outputKey',
type: 'string',
default: 'text',
additionalParams: true
}
]
}

async init(nodeData: INodeData, _: string, options: ICommonObject): Promise<any> {
const baseURL = nodeData.inputs?.baseURL as string
const aiPrefix = nodeData.inputs?.aiPrefix as string
const humanPrefix = nodeData.inputs?.humanPrefix as string
const memoryKey = nodeData.inputs?.memoryKey as string
const inputKey = nodeData.inputs?.inputKey as string
const autoSummaryTemplate = nodeData.inputs?.autoSummaryTemplate as string
const autoSummary = nodeData.inputs?.autoSummary as boolean
const sessionId = nodeData.inputs?.sessionId as string

const chatId = options?.chatId as string

const obj: ZepMemoryInput = {
baseURL,
sessionId: sessionId ? sessionId : chatId,
aiPrefix,
humanPrefix,
returnMessages: true,
memoryKey,
inputKey
}

let zep = new ZepMemory(obj)

// hack to support summary
let tmpFunc = zep.loadMemoryVariables
zep.loadMemoryVariables = async (values) => {
let data = await tmpFunc.bind(zep, values)()
if (autoSummary && zep.returnMessages && data[zep.memoryKey] && data[zep.memoryKey].length) {
const memory = await zep.zepClient.getMemory(zep.sessionId, 10)
if (memory?.summary) {
let summary = autoSummaryTemplate.replace(/{summary}/g, memory.summary.content)
// eslint-disable-next-line no-console
console.log('[ZepMemory] auto summary:', summary)
data[zep.memoryKey].unshift(new SystemChatMessage(summary))
}
}
// for langchain zep memory compatibility, or we will get "Missing value for input variable chat_history"
if (data instanceof Array) {
data = {
[zep.memoryKey]: data
}
}
return data
}
return zep
}
}

module.exports = { nodeClass: ZepMemory_Memory }
8 changes: 8 additions & 0 deletions packages/components/nodes/memory/ZepMemory/memory.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
1 change: 1 addition & 0 deletions packages/components/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
"license": "SEE LICENSE IN LICENSE.md",
"dependencies": {
"@dqbd/tiktoken": "^1.0.7",
"@getzep/zep-js": "^0.3.1",
"@huggingface/inference": "1",
"@pinecone-database/pinecone": "^0.0.12",
"@supabase/supabase-js": "^2.21.0",
Expand Down
3 changes: 2 additions & 1 deletion packages/server/src/ChildProcess.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export class ChildProcess {
await sendToParentProcess('start', '_')

// Create a Queue and add our initial node in it
const { endingNodeData, chatflow, incomingInput, componentNodes } = messageValue
const { endingNodeData, chatflow, chatId, incomingInput, componentNodes } = messageValue

let nodeToExecuteData: INodeData
let addToChatFlowPool: any = {}
Expand Down Expand Up @@ -83,6 +83,7 @@ export class ChildProcess {
depthQueue,
componentNodes,
incomingInput.question,
chatId,
incomingInput?.overrideConfig
)

Expand Down
1 change: 1 addition & 0 deletions packages/server/src/Interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ export interface IDatabaseExport {

export interface IRunChatflowMessageValue {
chatflow: IChatFlow
chatId: string
incomingInput: IncomingInput
componentNodes: IComponentNodes
endingNodeData?: INodeData
Expand Down
28 changes: 25 additions & 3 deletions packages/server/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ export class App {
* @param {IncomingInput} incomingInput
* @param {INodeData} endingNodeData
*/
async startChildProcess(chatflow: ChatFlow, incomingInput: IncomingInput, endingNodeData?: INodeData) {
async startChildProcess(chatflow: ChatFlow, chatId: string, incomingInput: IncomingInput, endingNodeData?: INodeData) {
try {
const controller = new AbortController()
const { signal } = controller
Expand All @@ -444,6 +444,7 @@ export class App {

const value = {
chatflow,
chatId,
incomingInput,
componentNodes: cloneDeep(this.nodesPool.componentNodes),
endingNodeData
Expand Down Expand Up @@ -506,6 +507,9 @@ export class App {
})
if (!chatflow) return res.status(404).send(`Chatflow ${chatflowid} not found`)

const chatId = await getChatId(chatflow.id)
if (!chatId) return res.status(500).send(`Chatflow ${chatflowid} first message not found`)

if (!isInternal) {
await this.validateKey(req, res, chatflow)
}
Expand Down Expand Up @@ -557,15 +561,15 @@ export class App {
if (isRebuildNeeded()) {
nodeToExecuteData = this.chatflowPool.activeChatflows[chatflowid].endingNodeData
try {
const result = await this.startChildProcess(chatflow, incomingInput, nodeToExecuteData)
const result = await this.startChildProcess(chatflow, chatId, incomingInput, nodeToExecuteData)

return res.json(result)
} catch (error) {
return res.status(500).send(error)
}
} else {
try {
const result = await this.startChildProcess(chatflow, incomingInput)
const result = await this.startChildProcess(chatflow, chatId, incomingInput)
return res.json(result)
} catch (error) {
return res.status(500).send(error)
Expand Down Expand Up @@ -618,6 +622,7 @@ export class App {
depthQueue,
this.nodesPool.componentNodes,
incomingInput.question,
chatId,
incomingInput?.overrideConfig
)

Expand Down Expand Up @@ -661,6 +666,23 @@ export class App {
}
}

/**
* Get first chat message id
* @param {string} chatflowid
* @returns {string}
*/
export async function getChatId(chatflowid: string) {
// first chatmessage id as the unique chat id
const firstChatMessage = await getDataSource()
.getRepository(ChatMessage)
.createQueryBuilder('cm')
.select('cm.id')
.where('chatflowid = :chatflowid', { chatflowid })
.orderBy('cm.createdDate', 'ASC')
.getOne()
return firstChatMessage ? firstChatMessage.id : ''
}

let serverApp: App | undefined

export async function start(): Promise<void> {
Expand Down
3 changes: 2 additions & 1 deletion packages/server/src/utils/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ export const buildLangchain = async (
depthQueue: IDepthQueue,
componentNodes: IComponentNodes,
question: string,
chatId: string,
overrideConfig?: ICommonObject
) => {
const flowNodes = cloneDeep(reactFlowNodes)
Expand Down Expand Up @@ -214,7 +215,7 @@ export const buildLangchain = async (
if (overrideConfig) flowNodeData = replaceInputsWithConfig(flowNodeData, overrideConfig)
const reactFlowNodeData: INodeData = resolveVariables(flowNodeData, flowNodes, question)

flowNodes[nodeIndex].data.instance = await newNodeInstance.init(reactFlowNodeData, question)
flowNodes[nodeIndex].data.instance = await newNodeInstance.init(reactFlowNodeData, question, { chatId })
} catch (e: any) {
console.error(e)
throw new Error(e)
Expand Down
3 changes: 2 additions & 1 deletion packages/ui/src/views/chatmessage/ChatMessage.js
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ export const ChatMessage = ({ open, chatflowid, isDialog }) => {

setLoading(true)
setMessages((prevMessages) => [...prevMessages, { message: userInput, type: 'userMessage' }])
addChatMessage(userInput, 'userMessage')
// waiting for first chatmessage saved, the first chatmessage will be used in sendMessageAndGetPrediction
await addChatMessage(userInput, 'userMessage')

// Send user question and history to API
try {
Expand Down