diff --git a/packages/components/credentials/UpstashRedisApi.credential.ts b/packages/components/credentials/UpstashRedisApi.credential.ts index b6e62ff35ca..7e2b367f996 100644 --- a/packages/components/credentials/UpstashRedisApi.credential.ts +++ b/packages/components/credentials/UpstashRedisApi.credential.ts @@ -4,12 +4,15 @@ class UpstashRedisApi implements INodeCredential { label: string name: string version: number + description: string inputs: INodeParams[] constructor() { this.label = 'Upstash Redis API' this.name = 'upstashRedisApi' this.version = 1.0 + this.description = + 'Refer to official guide on how to create redis instance and get redis REST URL and Token' this.inputs = [ { label: 'Upstash Redis REST URL', diff --git a/packages/components/nodes/cache/InMemoryCache/InMemoryCache.ts b/packages/components/nodes/cache/InMemoryCache/InMemoryCache.ts new file mode 100644 index 00000000000..1ea035668b8 --- /dev/null +++ b/packages/components/nodes/cache/InMemoryCache/InMemoryCache.ts @@ -0,0 +1,65 @@ +import { getBaseClasses, ICommonObject, INode, INodeData, INodeParams } from '../../../src' +import { BaseCache } from 'langchain/schema' +import hash from 'object-hash' + +class InMemoryCache implements INode { + label: string + name: string + version: number + description: string + type: string + icon: string + category: string + baseClasses: string[] + inputs: INodeParams[] + credential: INodeParams + + constructor() { + this.label = 'InMemory Cache' + this.name = 'inMemoryCache' + this.version = 1.0 + this.type = 'InMemoryCache' + this.description = 'Cache LLM response in memory, will be cleared once app restarted' + this.icon = 'inmemorycache.png' + this.category = 'Cache' + this.baseClasses = [this.type, ...getBaseClasses(InMemoryCacheExtended)] + this.inputs = [] + } + + async init(nodeData: INodeData, _: string, options: ICommonObject): Promise { + const memoryMap = options.cachePool.getLLMCache(options.chatflowid) ?? new Map() + const inMemCache = new InMemoryCacheExtended(memoryMap) + + inMemCache.lookup = async (prompt: string, llmKey: string): Promise => { + const memory = options.cachePool.getLLMCache(options.chatflowid) ?? inMemCache.cache + return Promise.resolve(memory.get(getCacheKey(prompt, llmKey)) ?? null) + } + + inMemCache.update = async (prompt: string, llmKey: string, value: any): Promise => { + inMemCache.cache.set(getCacheKey(prompt, llmKey), value) + options.cachePool.addLLMCache(options.chatflowid, inMemCache.cache) + } + return inMemCache + } +} + +const getCacheKey = (...strings: string[]): string => hash(strings.join('_')) + +class InMemoryCacheExtended extends BaseCache { + cache: Map + + constructor(map: Map) { + super() + this.cache = map + } + + lookup(prompt: string, llmKey: string): Promise { + return Promise.resolve(this.cache.get(getCacheKey(prompt, llmKey)) ?? null) + } + + async update(prompt: string, llmKey: string, value: any): Promise { + this.cache.set(getCacheKey(prompt, llmKey), value) + } +} + +module.exports = { nodeClass: InMemoryCache } diff --git a/packages/components/nodes/cache/InMemoryCache/inmemorycache.png b/packages/components/nodes/cache/InMemoryCache/inmemorycache.png new file mode 100644 index 00000000000..1e5fe6d14a4 Binary files /dev/null and b/packages/components/nodes/cache/InMemoryCache/inmemorycache.png differ diff --git a/packages/components/nodes/cache/MomentoCache/MomentoCache.ts b/packages/components/nodes/cache/MomentoCache/MomentoCache.ts index 9aa82e822ca..2bd2625b86a 100644 --- a/packages/components/nodes/cache/MomentoCache/MomentoCache.ts +++ b/packages/components/nodes/cache/MomentoCache/MomentoCache.ts @@ -19,6 +19,7 @@ class MomentoCache implements INode { this.name = 'momentoCache' this.version = 1.0 this.type = 'MomentoCache' + this.description = 'Cache LLM response using Momento, a distributed, serverless cache' this.icon = 'momento.png' this.category = 'Cache' this.baseClasses = [this.type, ...getBaseClasses(LangchainMomentoCache)] diff --git a/packages/components/nodes/cache/RedisCache/RedisCache.ts b/packages/components/nodes/cache/RedisCache/RedisCache.ts index c1b08be69ea..da2dfc49ec5 100644 --- a/packages/components/nodes/cache/RedisCache/RedisCache.ts +++ b/packages/components/nodes/cache/RedisCache/RedisCache.ts @@ -19,6 +19,7 @@ class RedisCache implements INode { this.name = 'redisCache' this.version = 1.0 this.type = 'RedisCache' + this.description = 'Cache LLM response in Redis, useful for sharing cache across multiple processes or servers' this.icon = 'redis.svg' this.category = 'Cache' this.baseClasses = [this.type, ...getBaseClasses(LangchainRedisCache)] diff --git a/packages/components/nodes/cache/UpstashRedisCache/UpstashRedisCache.ts b/packages/components/nodes/cache/UpstashRedisCache/UpstashRedisCache.ts index eb5a9e2f6a5..f4ed947baa3 100644 --- a/packages/components/nodes/cache/UpstashRedisCache/UpstashRedisCache.ts +++ b/packages/components/nodes/cache/UpstashRedisCache/UpstashRedisCache.ts @@ -18,6 +18,7 @@ class UpstashRedisCache implements INode { this.name = 'upstashRedisCache' this.version = 1.0 this.type = 'UpstashRedisCache' + this.description = 'Cache LLM response in Upstash Redis, serverless data for Redis and Kafka' this.icon = 'upstash.png' this.category = 'Cache' this.baseClasses = [this.type, ...getBaseClasses(LangchainUpstashRedisCache)] diff --git a/packages/components/package.json b/packages/components/package.json index b2f302637a7..f6d5f516949 100644 --- a/packages/components/package.json +++ b/packages/components/package.json @@ -57,6 +57,7 @@ "node-fetch": "^2.6.11", "node-html-markdown": "^1.3.0", "notion-to-md": "^3.1.1", + "object-hash": "^3.0.0", "pdf-parse": "^1.1.1", "pdfjs-dist": "^3.7.107", "pg": "^8.11.2", @@ -73,6 +74,7 @@ "devDependencies": { "@types/gulp": "4.0.9", "@types/node-fetch": "2.6.2", + "@types/object-hash": "^3.0.2", "@types/pg": "^8.10.2", "@types/ws": "^8.5.3", "gulp": "^4.0.2", diff --git a/packages/server/src/CachePool.ts b/packages/server/src/CachePool.ts new file mode 100644 index 00000000000..b59789d2701 --- /dev/null +++ b/packages/server/src/CachePool.ts @@ -0,0 +1,53 @@ +import { IActiveCache } from './Interface' + +/** + * This pool is to keep track of in-memory cache used for LLM and Embeddings + */ +export class CachePool { + activeLLMCache: IActiveCache = {} + activeEmbeddingCache: IActiveCache = {} + + /** + * Add to the llm cache pool + * @param {string} chatflowid + * @param {Map} value + */ + addLLMCache(chatflowid: string, value: Map) { + this.activeLLMCache[chatflowid] = value + } + + /** + * Add to the embedding cache pool + * @param {string} chatflowid + * @param {Map} value + */ + addEmbeddingCache(chatflowid: string, value: Map) { + this.activeEmbeddingCache[chatflowid] = value + } + + /** + * Get item from llm cache pool + * @param {string} chatflowid + */ + getLLMCache(chatflowid: string): Map | undefined { + return this.activeLLMCache[chatflowid] + } + + /** + * Get item from embedding cache pool + * @param {string} chatflowid + */ + getEmbeddingCache(chatflowid: string): Map | undefined { + return this.activeEmbeddingCache[chatflowid] + } +} + +let cachePoolInstance: CachePool | undefined + +export function getInstance(): CachePool { + if (cachePoolInstance === undefined) { + cachePoolInstance = new CachePool() + } + + return cachePoolInstance +} diff --git a/packages/server/src/Interface.ts b/packages/server/src/Interface.ts index 58740b864d4..b3c3a3925ce 100644 --- a/packages/server/src/Interface.ts +++ b/packages/server/src/Interface.ts @@ -157,6 +157,10 @@ export interface IActiveChatflows { } } +export interface IActiveCache { + [key: string]: Map +} + export interface IOverrideConfig { node: string nodeId: string diff --git a/packages/server/src/index.ts b/packages/server/src/index.ts index db5ecf38412..9d3f7052400 100644 --- a/packages/server/src/index.ts +++ b/packages/server/src/index.ts @@ -53,6 +53,7 @@ import { ChatMessage } from './database/entities/ChatMessage' import { Credential } from './database/entities/Credential' import { Tool } from './database/entities/Tool' import { ChatflowPool } from './ChatflowPool' +import { CachePool } from './CachePool' import { ICommonObject, INodeOptionsValue } from 'flowise-components' import { createRateLimiter, getRateLimiter, initializeRateLimiter } from './utils/rateLimit' @@ -60,6 +61,7 @@ export class App { app: express.Application nodesPool: NodesPool chatflowPool: ChatflowPool + cachePool: CachePool AppDataSource = getDataSource() constructor() { @@ -91,6 +93,9 @@ export class App { // Initialize Rate Limit const AllChatFlow: IChatFlow[] = await getAllChatFlow() await initializeRateLimiter(AllChatFlow) + + // Initialize cache pool + this.cachePool = new CachePool() }) .catch((err) => { logger.error('❌ [server]: Error during Data Source initialization:', err) @@ -944,8 +949,10 @@ export class App { incomingInput.question, incomingInput.history, chatId, + chatflowid, this.AppDataSource, - incomingInput?.overrideConfig + incomingInput?.overrideConfig, + this.cachePool ) const nodeToExecute = reactFlowNodes.find((node: IReactFlowNode) => node.id === endingNodeId) diff --git a/packages/server/src/utils/index.ts b/packages/server/src/utils/index.ts index 7686e476080..317bcc0c26d 100644 --- a/packages/server/src/utils/index.ts +++ b/packages/server/src/utils/index.ts @@ -35,6 +35,7 @@ import { ChatMessage } from '../database/entities/ChatMessage' import { Credential } from '../database/entities/Credential' import { Tool } from '../database/entities/Tool' import { DataSource } from 'typeorm' +import { CachePool } from '../CachePool' const QUESTION_VAR_PREFIX = 'question' const CHAT_HISTORY_VAR_PREFIX = 'chat_history' @@ -197,8 +198,10 @@ export const getEndingNode = (nodeDependencies: INodeDependencies, graph: INodeD * @param {IComponentNodes} componentNodes * @param {string} question * @param {string} chatId + * @param {string} chatflowid * @param {DataSource} appDataSource * @param {ICommonObject} overrideConfig + * @param {CachePool} cachePool */ export const buildLangchain = async ( startingNodeIds: string[], @@ -209,8 +212,10 @@ export const buildLangchain = async ( question: string, chatHistory: IMessage[], chatId: string, + chatflowid: string, appDataSource: DataSource, - overrideConfig?: ICommonObject + overrideConfig?: ICommonObject, + cachePool?: CachePool ) => { const flowNodes = cloneDeep(reactFlowNodes) @@ -245,9 +250,11 @@ export const buildLangchain = async ( logger.debug(`[server]: Initializing ${reactFlowNode.data.label} (${reactFlowNode.data.id})`) flowNodes[nodeIndex].data.instance = await newNodeInstance.init(reactFlowNodeData, question, { chatId, + chatflowid, appDataSource, databaseEntities, - logger + logger, + cachePool }) logger.debug(`[server]: Finished initializing ${reactFlowNode.data.label} (${reactFlowNode.data.id})`) } catch (e: any) {