diff --git a/main.ts b/main.ts index def0c75..98d6e4e 100644 --- a/main.ts +++ b/main.ts @@ -46,11 +46,38 @@ interface NodeInfoWithClient { client: NodeClient; } +/** + * 积极引擎接口 + * 定义引擎必须实现的基本操作 + */ +interface EagerEngine { + /** + * 接收数据 + * @param {string} data - 输入数据 + * @returns {string | null} - 如果接受返回格式化后的数据,否则返回 null + */ + input(data: string): string | null; + + /** + * 执行一轮搜索 + * @param {function} callback - 处理结果的回调函数 + * @returns {number} - 搜索结果数量 + */ + output(callback: (result: string) => void): number; + + /** + * 获取引擎元数据 + * @returns {Object} - 包含输入和输出模式的对象 + */ + meta(): { input: string[]; output: string[] }; +} + /** * 搜索引擎类 * 扩展 ATSDS 的 Search 类,提供字符串形式的规则处理 + * 实现 EagerEngine 接口 */ -class Search extends Search_ { +class Search extends Search_ implements EagerEngine { /** * 构造函数 * @param {number} limit_size - Size of the buffer for storing the final objects (rules/facts) in the knowledge base (default: 1000) @@ -99,28 +126,27 @@ class Search extends Search_ { } /** - * 集群节点类 + * 积极节点类 * 管理分布式搜索引擎集群中的单个节点 */ -class ClusterNode { - private id: string; +class EagerNode { + private engine: EagerEngine; private addr: string; - private engine: Search; + private id: string; private server: grpc.Server; private nodes: Map; private data: Set; /** * 构造函数 + * @param {EagerEngine} engine - 实现 EagerEngine 接口的引擎实例 * @param {string} addr - 节点绑定地址 * @param {string} id - 节点唯一标识符,默认随机生成 - * @param {number} limit_size - 搜索引擎的限制大小参数,默认 1000 - * @param {number} buffer_size - 搜索引擎的缓冲区大小参数,默认 10000 */ - constructor(addr: string, id: string = randomUUID(), limit_size: number = 1000, buffer_size: number = 10000) { - this.id = id; + constructor(engine: EagerEngine, addr: string, id: string = randomUUID()) { + this.engine = engine; this.addr = addr; - this.engine = new Search(limit_size, buffer_size); + this.id = id; this.server = new grpc.Server(); this.nodes = new Map(); this.data = new Set(); @@ -148,11 +174,13 @@ class ClusterNode { if (data.length > 0) { for (const id of this.nodes.keys()) { if (id !== this.id) { - const node = this.nodes.get(id)!; - const pushDataAsync = promisify( - node.client.engine.pushData, - ).bind(node.client.engine); - await pushDataAsync({ data }); + const node = this.nodes.get(id); + if (node) { + const pushDataAsync = promisify( + node.client.engine.pushData, + ).bind(node.client.engine); + await pushDataAsync({ data }); + } } } } @@ -186,11 +214,13 @@ class ClusterNode { console.log(`Received input: ${formattedLine}`); for (const id of this.nodes.keys()) { if (id !== this.id) { - const node = this.nodes.get(id)!; - const pushDataAsync = promisify( - node.client.engine.pushData, - ).bind(node.client.engine); - await pushDataAsync({ data: [formattedLine] }); + const node = this.nodes.get(id); + if (node) { + const pushDataAsync = promisify( + node.client.engine.pushData, + ).bind(node.client.engine); + await pushDataAsync({ data: [formattedLine] }); + } } } }); @@ -237,9 +267,9 @@ class ClusterNode { /** * 启动节点监听服务 * 创建 gRPC 服务器并注册集群管理和引擎服务 - * @returns {ClusterNode} 返回当前节点实例 + * @returns {EagerNode} 返回当前节点实例 */ - async listen(): Promise { + async listen(): Promise { this.server.addService(ClusterService, { /** * 处理节点加入请求 @@ -249,11 +279,13 @@ class ClusterNode { call: grpc.ServerUnaryCall, callback: grpc.sendUnaryData, ) => { - const node = call.request.node!; - const { id, addr } = node; - if (!this.nodes.has(id)) { - this.nodes.set(id, this.nodeInfo(id, addr)); - console.log(`Joined node: ${id} at ${addr}`); + const node = call.request.node; + if (node) { + const { id, addr } = node; + if (!this.nodes.has(id)) { + this.nodes.set(id, this.nodeInfo(id, addr)); + console.log(`Joined node: ${id} at ${addr}`); + } } callback(null, {}); }, @@ -265,11 +297,13 @@ class ClusterNode { call: grpc.ServerUnaryCall, callback: grpc.sendUnaryData, ) => { - const node = call.request.node!; - const { id, addr } = node; - if (this.nodes.has(id)) { - this.nodes.delete(id); - console.log(`Left node: ${id} at ${addr}`); + const node = call.request.node; + if (node) { + const { id, addr } = node; + if (this.nodes.has(id)) { + this.nodes.delete(id); + console.log(`Left node: ${id} at ${addr}`); + } } callback(null, {}); }, @@ -315,11 +349,15 @@ class ClusterNode { call: grpc.ServerUnaryCall, callback: grpc.sendUnaryData, ) => { - const data = call.request.data!; - for (const item of data) { - const formattedItem = this.engine.input(item)!; - this.data.add(formattedItem); - console.log(`Received data: ${item}`); + const data = call.request.data; + if (data) { + for (const item of data) { + const formattedItem = this.engine.input(item); + if (formattedItem !== null) { + this.data.add(formattedItem); + console.log(`Received data: ${item}`); + } + } } callback(null, {}); }, @@ -389,9 +427,11 @@ class ClusterNode { const dataResponse = await pullAsync({}); if (dataResponse.data) { for (const item of dataResponse.data) { - const formattedItem = this.engine.input(item)!; - this.data.add(formattedItem); - console.log(`Receiving data: ${item}`); + const formattedItem = this.engine.input(item); + if (formattedItem !== null) { + this.data.add(formattedItem); + console.log(`Receiving data: ${item}`); + } } } console.log(`Joining node ${node.id} at ${node.addr}`); @@ -405,14 +445,16 @@ class ClusterNode { async leave(): Promise { for (const id of this.nodes.keys()) { if (id !== this.id) { - const node = this.nodes.get(id)!; - const leaveAsync = promisify(node.client.cluster.leave).bind( - node.client.cluster, - ); - await leaveAsync({ node: { id: this.id, addr: this.addr } }); - node.client.cluster.close(); - node.client.engine.close(); - console.log(`Leaving node ${node.id} at ${node.addr}`); + const node = this.nodes.get(id); + if (node) { + const leaveAsync = promisify(node.client.cluster.leave).bind( + node.client.cluster, + ); + await leaveAsync({ node: { id: this.id, addr: this.addr } }); + node.client.cluster.close(); + node.client.engine.close(); + console.log(`Leaving node ${node.id} at ${node.addr}`); + } } } } @@ -464,7 +506,8 @@ if (process.argv.length === 4) { const listenAddr = addAddressPrefixForPort(process.argv[2], "0.0.0.0"); const joinAddr = addAddressPrefixForPort(process.argv[3], "127.0.0.1"); console.log(`Starting node at ${listenAddr} and joining ${joinAddr}`); - const node = new ClusterNode(listenAddr); + const engine = new Search(); + const node = new EagerNode(engine, listenAddr); await node.listen(); await node.join(await node.list(joinAddr)); } @@ -476,6 +519,7 @@ if (process.argv.length === 4) { if (process.argv.length === 3) { const listenAddr = addAddressPrefixForPort(process.argv[2], "0.0.0.0"); console.log(`Starting node at ${listenAddr}`); - const node = new ClusterNode(listenAddr); + const engine = new Search(); + const node = new EagerNode(engine, listenAddr); await node.listen(); }