Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix/Add missing TTL implementation for Redis #2131

Merged
merged 1 commit into from
Apr 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,8 @@ const initalizeRedis = async (nodeData: INodeData, options: ICommonObject): Prom
chatHistory: redisChatMessageHistory,
sessionId,
windowSize,
redisClient: client
redisClient: client,
sessionTTL
})

return memory
Expand All @@ -172,18 +173,21 @@ interface BufferMemoryExtendedInput {
redisClient: Redis
sessionId: string
windowSize?: number
sessionTTL?: number
}

class BufferMemoryExtended extends FlowiseMemory implements MemoryMethods {
sessionId = ''
redisClient: Redis
windowSize?: number
sessionTTL?: number

constructor(fields: BufferMemoryInput & BufferMemoryExtendedInput) {
super(fields)
this.sessionId = fields.sessionId
this.redisClient = fields.redisClient
this.windowSize = fields.windowSize
this.sessionTTL = fields.sessionTTL
}

async getChatMessages(overrideSessionId = '', returnBaseMessages = false): Promise<IMessage[] | BaseMessage[]> {
Expand All @@ -207,12 +211,14 @@ class BufferMemoryExtended extends FlowiseMemory implements MemoryMethods {
const newInputMessage = new HumanMessage(input.text)
const messageToAdd = [newInputMessage].map((msg) => msg.toDict())
await this.redisClient.lpush(id, JSON.stringify(messageToAdd[0]))
if (this.sessionTTL) await this.redisClient.expire(id, this.sessionTTL)
}

if (output) {
const newOutputMessage = new AIMessage(output.text)
const messageToAdd = [newOutputMessage].map((msg) => msg.toDict())
await this.redisClient.lpush(id, JSON.stringify(messageToAdd[0]))
if (this.sessionTTL) await this.redisClient.expire(id, this.sessionTTL)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,10 @@ class UpstashRedisBackedChatMemory_Memory implements INode {

const initalizeUpstashRedis = async (nodeData: INodeData, options: ICommonObject): Promise<BufferMemory> => {
const baseURL = nodeData.inputs?.baseURL as string
const sessionTTL = nodeData.inputs?.sessionTTL as string
const sessionId = nodeData.inputs?.sessionId as string
const _sessionTTL = nodeData.inputs?.sessionTTL as string

const sessionTTL = _sessionTTL ? parseInt(_sessionTTL, 10) : undefined

const credentialData = await getCredentialData(nodeData.credential ?? '', options)
const upstashRestToken = getCredentialParam('upstashRestToken', credentialData, nodeData)
Expand All @@ -101,14 +103,15 @@ const initalizeUpstashRedis = async (nodeData: INodeData, options: ICommonObject

const redisChatMessageHistory = new UpstashRedisChatMessageHistory({
sessionId,
sessionTTL: sessionTTL ? parseInt(sessionTTL, 10) : undefined,
sessionTTL,
client
})

const memory = new BufferMemoryExtended({
memoryKey: 'chat_history',
chatHistory: redisChatMessageHistory,
sessionId,
sessionTTL,
redisClient: client
})

Expand All @@ -118,16 +121,19 @@ const initalizeUpstashRedis = async (nodeData: INodeData, options: ICommonObject
interface BufferMemoryExtendedInput {
redisClient: Redis
sessionId: string
sessionTTL?: number
}

class BufferMemoryExtended extends FlowiseMemory implements MemoryMethods {
sessionId = ''
redisClient: Redis
sessionTTL?: number

constructor(fields: BufferMemoryInput & BufferMemoryExtendedInput) {
super(fields)
this.sessionId = fields.sessionId
this.redisClient = fields.redisClient
this.sessionTTL = fields.sessionTTL
}

async getChatMessages(overrideSessionId = '', returnBaseMessages = false): Promise<IMessage[] | BaseMessage[]> {
Expand All @@ -152,12 +158,14 @@ class BufferMemoryExtended extends FlowiseMemory implements MemoryMethods {
const newInputMessage = new HumanMessage(input.text)
const messageToAdd = [newInputMessage].map((msg) => msg.toDict())
await this.redisClient.lpush(id, JSON.stringify(messageToAdd[0]))
if (this.sessionTTL) await this.redisClient.expire(id, this.sessionTTL)
}

if (output) {
const newOutputMessage = new AIMessage(output.text)
const messageToAdd = [newOutputMessage].map((msg) => msg.toDict())
await this.redisClient.lpush(id, JSON.stringify(messageToAdd[0]))
if (this.sessionTTL) await this.redisClient.expire(id, this.sessionTTL)
}
}

Expand Down
Loading