In [None]:
import numpy as np

np.__version__

In [None]:
class SNPGenerator:

    def __init__(self, c, r):
        self.inputs = []  # 暂时默认输入到0号
        self.outputs = []  # 暂时不需要输出
        self.cells = np.empty(c)
        self.rules = np.empty((c, r, c + 1))

        self.logs = np.empty([0, c], dtype=np.int32)
        self.logs_no_input = np.empty([0, c], dtype=np.int32)

    def set_inputs(self, inputs):
        self.inputs = inputs
        # inputs = [1, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0]

    def set_cells(self, cells):
        self.cells = cells
        # cells = np.array([0, 0, 2, 0])

    def set_rules(self, rules):
        self.rules = rules
        # rules = np.array([[[1, -1, 1, 0, 1],
        #                    [0, 0, 0, 0, 0]],
        # 
        #                   [[1, 0, -1, 1, 0],
        #                    [3, 0, -3, 0, 0]],
        # 
        #                   [[1, 0, 0, -1, 1],
        #                    [3, 0, 0, -3, 0]],
        # 
        #                   [[1, 0, 1, 0, -1],
        #                    [2, 0, 1, 0, -2]]])

    def clear_except_rules(self):
        c = self.cells.size
        self.inputs = []  # 暂时默认输入到0号
        self.outputs = []  # 暂时不需要输出
        self.cells = np.empty(c)
        self.logs = np.empty([0, c], dtype=np.int32)
        self.logs_no_input = np.empty([0, c], dtype=np.int32)

    def generate(self):
        # 非空断言
        assert len(self.inputs) > 0, "no input data provided"
        assert self.cells.size > 0, "no cells set"

        for input_spike in self.inputs:
            # 追加无输入日志
            self.logs_no_input = np.vstack((self.logs_no_input, np.array([self.cells])))
            # 输入脉冲信号
            self.cells[0] += input_spike
            # 追加带输入日志
            self.logs = np.vstack((self.logs, np.array(self.cells)))

            # 点火计数器和累加器归零
            fired = 0
            update = np.zeros(self.cells.shape[0], dtype=np.int32)

            # 遍历整个神经元组，依次计算要点火的规则
            for cid in range(self.cells.shape[0]):
                # 找到目标神经元的内部脉冲数目和规则
                spikes = self.cells[cid]
                rule_mat = self.rules[cid]  # shape=(2,5)
                # 过滤符合阈值的规则         
                cond = np.equal(rule_mat[:, 0], spikes) & np.not_equal(rule_mat[:, 0], 0)
                res = rule_mat[cond]

                # 无规则适合就直接跳过
                if res.shape[0] == 0:
                    continue
                # 随机点火
                rng = np.random.default_rng()
                rows = rng.choice(res, 1, replace=False)
                update += rows[0, 1:]
                fired += 1

            # 如果没有规则被执行，且无输入了，那说明系统已经结束了 
            if fired == 0:
                # break
                pass  # 维持输出矩阵的完整形状
            # 更新神经元状态
            self.cells += update

    def save(self, filename):
        np.save(filename, self.logs_no_input)
        np.save(filename + ".input", self.inputs)

In [None]:
def test_generator():
    # 初始化神经元组
    generator = SNPGenerator(4, 2)
    # 填充数据
    generator.set_inputs([1, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0])
    generator.set_cells(np.array([0, 0, 2, 0]))
    generator.set_rules(np.array([[[1, -1, 1, 0, 1],
                                   [0, 0, 0, 0, 0]],

                                  [[1, 0, -1, 1, 0],
                                   [3, 0, -3, 0, 0]],

                                  [[1, 0, 0, -1, 1],
                                   [3, 0, 0, -3, 0]],

                                  [[1, 0, 1, 0, -1],
                                   [2, 0, 1, 0, -2]]]))
    # 运行并记录日志
    generator.generate()
    # generator.save("test")
    print(generator.logs)


if __name__ == '__main__':
    # 测试
    test_generator()

In [None]:
def test_npy():
    npy_train_input_datas = np.load('npy/train_input_datas.npy')
    npy_train_datas = np.load('npy/train_datas.npy')
    npy_test_input_datas = np.load('npy/test_input_datas.npy')
    npy_test_datas = np.load('npy/test_datas.npy')
    print(npy_train_input_datas[30])
    print(npy_train_datas[30])
    print(npy_test_input_datas[30])
    print(npy_test_datas[30])


if __name__ == '__main__':
    # 测试输出文件
    test_npy()

In [None]:
if __name__ == '__main__':
    # 批量生成数据
    train_datas = np.zeros(shape=(2000, 20, 4), dtype=np.int32)  # 实际上第二维是低于20项的，因为可能提前结束
    train_input_datas = np.random.randint(2, size=(2000, 20))
    train_input_datas[:, 0] = 1  # 第一项数据必须是1
    # 构造生成器，填充规则
    gen = SNPGenerator(4, 2)
    gen.set_rules(np.array([[[1, -1, 1, 0, 1],
                             [0, 0, 0, 0, 0]],

                            [[1, 0, -1, 1, 0],
                             [3, 0, -3, 0, 0]],

                            [[1, 0, 0, -1, 1],
                             [3, 0, 0, -3, 0]],

                            [[1, 0, 1, 0, -1],
                             [2, 0, 1, 0, -2]]]))
    # 遍历所有输入行，一共idx=2000次
    for idx in range(train_input_datas.shape[0]):
        # 计算新一轮
        data = train_input_datas[idx].tolist()
        gen.clear_except_rules()
        gen.set_inputs(data)
        gen.set_cells(np.array([0, 0, 2, 0]))
        gen.generate()
        # 抄录结果
        train_datas[idx] = gen.logs_no_input

    # 输出训练集数目
    print(f"train input datas: {train_input_datas.shape} generated.")
    print(f"train datas: {train_datas.shape} generated.")
    # 按照9:1的比例来划分训练数据和测试数据（旧文件数据会覆盖）
    np.save('npy/train_input_datas.npy', train_input_datas[:1800])
    np.save('npy/train_datas.npy', train_datas[:1800])
    np.save('npy/test_input_datas.npy', train_input_datas[1800:])
    np.save('npy/test_datas.npy', train_datas[1800:])
    print("done.")