Skip to content
This repository was archived by the owner on Dec 15, 2025. It is now read-only.
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 94 additions & 50 deletions main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,38 @@ interface NodeInfoWithClient {
client: NodeClient;
}

/**
* 积极引擎接口
* 定义引擎必须实现的基本操作
*/
interface EagerEngine {
/**
* 接收数据
* @param {string} data - 输入数据
* @returns {string | null} - 如果接受返回格式化后的数据,否则返回 null
*/
input(data: string): string | null;

/**
* 执行一轮搜索
* @param {function} callback - 处理结果的回调函数
* @returns {number} - 搜索结果数量
*/
output(callback: (result: string) => void): number;

/**
* 获取引擎元数据
* @returns {Object} - 包含输入和输出模式的对象
*/
meta(): { input: string[]; output: string[] };
}

/**
* 搜索引擎类
* 扩展 ATSDS 的 Search 类,提供字符串形式的规则处理
* 实现 EagerEngine 接口
*/
class Search extends Search_ {
class Search extends Search_ implements EagerEngine {
/**
* 构造函数
* @param {number} limit_size - Size of the buffer for storing the final objects (rules/facts) in the knowledge base (default: 1000)
Expand Down Expand Up @@ -99,28 +126,27 @@ class Search extends Search_ {
}

/**
* 集群节点类
* 积极节点类
* 管理分布式搜索引擎集群中的单个节点
*/
class ClusterNode {
private id: string;
class EagerNode {
private engine: EagerEngine;
private addr: string;
private engine: Search;
private id: string;
private server: grpc.Server;
private nodes: Map<string, NodeInfoWithClient>;
private data: Set<string>;

/**
* 构造函数
* @param {EagerEngine} engine - 实现 EagerEngine 接口的引擎实例
* @param {string} addr - 节点绑定地址
* @param {string} id - 节点唯一标识符,默认随机生成
* @param {number} limit_size - 搜索引擎的限制大小参数,默认 1000
* @param {number} buffer_size - 搜索引擎的缓冲区大小参数,默认 10000
*/
constructor(addr: string, id: string = randomUUID(), limit_size: number = 1000, buffer_size: number = 10000) {
this.id = id;
constructor(engine: EagerEngine, addr: string, id: string = randomUUID()) {
this.engine = engine;
this.addr = addr;
this.engine = new Search(limit_size, buffer_size);
this.id = id;
this.server = new grpc.Server();
this.nodes = new Map();
this.data = new Set();
Expand Down Expand Up @@ -148,11 +174,13 @@ class ClusterNode {
if (data.length > 0) {
for (const id of this.nodes.keys()) {
if (id !== this.id) {
const node = this.nodes.get(id)!;
const pushDataAsync = promisify<PushDataRequest, PushDataResponse>(
node.client.engine.pushData,
).bind(node.client.engine);
await pushDataAsync({ data });
const node = this.nodes.get(id);
if (node) {
const pushDataAsync = promisify<PushDataRequest, PushDataResponse>(
node.client.engine.pushData,
).bind(node.client.engine);
await pushDataAsync({ data });
}
}
}
}
Expand Down Expand Up @@ -186,11 +214,13 @@ class ClusterNode {
console.log(`Received input: ${formattedLine}`);
for (const id of this.nodes.keys()) {
if (id !== this.id) {
const node = this.nodes.get(id)!;
const pushDataAsync = promisify<PushDataRequest, PushDataResponse>(
node.client.engine.pushData,
).bind(node.client.engine);
await pushDataAsync({ data: [formattedLine] });
const node = this.nodes.get(id);
if (node) {
const pushDataAsync = promisify<PushDataRequest, PushDataResponse>(
node.client.engine.pushData,
).bind(node.client.engine);
await pushDataAsync({ data: [formattedLine] });
}
}
}
});
Expand Down Expand Up @@ -237,9 +267,9 @@ class ClusterNode {
/**
* 启动节点监听服务
* 创建 gRPC 服务器并注册集群管理和引擎服务
* @returns {ClusterNode} 返回当前节点实例
* @returns {EagerNode} 返回当前节点实例
*/
async listen(): Promise<ClusterNode> {
async listen(): Promise<EagerNode> {
this.server.addService(ClusterService, {
/**
* 处理节点加入请求
Expand All @@ -249,11 +279,13 @@ class ClusterNode {
call: grpc.ServerUnaryCall<JoinRequest, JoinResponse>,
callback: grpc.sendUnaryData<JoinResponse>,
) => {
const node = call.request.node!;
const { id, addr } = node;
if (!this.nodes.has(id)) {
this.nodes.set(id, this.nodeInfo(id, addr));
console.log(`Joined node: ${id} at ${addr}`);
const node = call.request.node;
if (node) {
const { id, addr } = node;
if (!this.nodes.has(id)) {
this.nodes.set(id, this.nodeInfo(id, addr));
console.log(`Joined node: ${id} at ${addr}`);
}
}
callback(null, {});
},
Expand All @@ -265,11 +297,13 @@ class ClusterNode {
call: grpc.ServerUnaryCall<LeaveRequest, LeaveResponse>,
callback: grpc.sendUnaryData<LeaveResponse>,
) => {
const node = call.request.node!;
const { id, addr } = node;
if (this.nodes.has(id)) {
this.nodes.delete(id);
console.log(`Left node: ${id} at ${addr}`);
const node = call.request.node;
if (node) {
const { id, addr } = node;
if (this.nodes.has(id)) {
this.nodes.delete(id);
console.log(`Left node: ${id} at ${addr}`);
}
}
callback(null, {});
},
Expand Down Expand Up @@ -315,11 +349,15 @@ class ClusterNode {
call: grpc.ServerUnaryCall<PushDataRequest, PushDataResponse>,
callback: grpc.sendUnaryData<PushDataResponse>,
) => {
const data = call.request.data!;
for (const item of data) {
const formattedItem = this.engine.input(item)!;
this.data.add(formattedItem);
console.log(`Received data: ${item}`);
const data = call.request.data;
if (data) {
for (const item of data) {
const formattedItem = this.engine.input(item);
if (formattedItem !== null) {
this.data.add(formattedItem);
console.log(`Received data: ${item}`);
}
}
}
callback(null, {});
},
Expand Down Expand Up @@ -389,9 +427,11 @@ class ClusterNode {
const dataResponse = await pullAsync({});
if (dataResponse.data) {
for (const item of dataResponse.data) {
const formattedItem = this.engine.input(item)!;
this.data.add(formattedItem);
console.log(`Receiving data: ${item}`);
const formattedItem = this.engine.input(item);
if (formattedItem !== null) {
this.data.add(formattedItem);
console.log(`Receiving data: ${item}`);
}
}
}
console.log(`Joining node ${node.id} at ${node.addr}`);
Expand All @@ -405,14 +445,16 @@ class ClusterNode {
async leave(): Promise<void> {
for (const id of this.nodes.keys()) {
if (id !== this.id) {
const node = this.nodes.get(id)!;
const leaveAsync = promisify<LeaveRequest, LeaveResponse>(node.client.cluster.leave).bind(
node.client.cluster,
);
await leaveAsync({ node: { id: this.id, addr: this.addr } });
node.client.cluster.close();
node.client.engine.close();
console.log(`Leaving node ${node.id} at ${node.addr}`);
const node = this.nodes.get(id);
if (node) {
const leaveAsync = promisify<LeaveRequest, LeaveResponse>(node.client.cluster.leave).bind(
node.client.cluster,
);
await leaveAsync({ node: { id: this.id, addr: this.addr } });
node.client.cluster.close();
node.client.engine.close();
console.log(`Leaving node ${node.id} at ${node.addr}`);
}
}
}
}
Expand Down Expand Up @@ -464,7 +506,8 @@ if (process.argv.length === 4) {
const listenAddr = addAddressPrefixForPort(process.argv[2], "0.0.0.0");
const joinAddr = addAddressPrefixForPort(process.argv[3], "127.0.0.1");
console.log(`Starting node at ${listenAddr} and joining ${joinAddr}`);
const node = new ClusterNode(listenAddr);
const engine = new Search();
const node = new EagerNode(engine, listenAddr);
await node.listen();
await node.join(await node.list(joinAddr));
}
Expand All @@ -476,6 +519,7 @@ if (process.argv.length === 4) {
if (process.argv.length === 3) {
const listenAddr = addAddressPrefixForPort(process.argv[2], "0.0.0.0");
console.log(`Starting node at ${listenAddr}`);
const node = new ClusterNode(listenAddr);
const engine = new Search();
const node = new EagerNode(engine, listenAddr);
await node.listen();
}