# GCN相关特征的生成

### (不要随意重新运行) 通过 node2vec 获取节点向量（包含调整id0->n_questions）

##### 241011：注意到除了孤立节点，其他节点embedding的生成具有随机性！！！（一是需要小心覆盖情况；二是检查相似度关系是否稳定！）

In [12]:
import json
import networkx as nx
import pandas as pd
from node2vec import Node2Vec
import copy

# Raw JSON file holding the nodes & edges of the concept map
# concept_map_path = '/mnt/new_pfs/liming_team/auroraX/caoying/github/CRKT/data/DBE_KT22/data/concept_map_vis.json'

datasets = 'DBE_KT22' 
# datasets = 'EdNet'
# datasets = 'NIPS34'

concept_map_file = f'{datasets}/concept_map_vis.json'      # node & edge information (open question from the author: how was this file produced?)
node_embeddings_file = f'{datasets}/node_embeddings.csv'   # node embeddings (generated and saved by this cell)
concept_sim_file = f'{datasets}/concept_sim.xlsx'          # node similarity spreadsheet (computed from the node embeddings)

# Load the JSON file; the code below reads its 'nodes' and 'links' entries
with open(concept_map_file, 'r') as f:
    data = json.load(f)

# =========================================================================
# 2024-10-11: rewrite the node and edge lists so that node 0 and all of its
# relations are copied to a brand-new id (max id + 1), while the original
# node 0 is kept only as an isolated, "(deprecated)" placeholder.
# =========================================================================
deprecated_id = 0

# Largest node id currently in use (-1 when the node list is empty).
max_id = max((node["id"] for node in data["nodes"]), default=-1)

# Position of the node to move. Fail loudly instead of silently reusing a
# stale `ind_temp` / `temp` left over from an earlier run of this cell.
zero_positions = [pos for pos, node in enumerate(data["nodes"])
                  if node["id"] == deprecated_id]
if not zero_positions:
    raise ValueError(
        f"concept map has no node with id {deprecated_id}; nothing to remap")
ind_temp = zero_positions[-1]                 # last match, as the original loop kept
temp = copy.deepcopy(data["nodes"][ind_temp])  # copy BEFORE the name is altered

# Mark the original node as deprecated.
data["nodes"][ind_temp]["name"] += '(deprecated)'
# Append the copy under the new id.
temp["id"] = max_id + 1
data["nodes"].append(temp)

# Hand every edge endpoint that referenced the old id over to the new id.
# The loop variable is deliberately NOT called `dict`: binding that name at
# notebook scope shadows the builtin and breaks later `isinstance(x, dict)`.
for link in data["links"]:
    if link["source"] == deprecated_id:
        link["source"] = max_id + 1
    if link["target"] == deprecated_id:
        link["target"] = max_id + 1
# ================================= done! =================================

# Build an undirected graph from the (already remapped) concept map
G = nx.Graph()

# Add nodes
for node in data['nodes']:
    G.add_node(node['id'], name=node['name'])   # arguments look like: 0, "Set"

# Add edges
for link in data['links']:
    G.add_edge(link['source'], link['target'])  # arguments look like: 0, 1

# Create the node2vec model.
# NOTE(review): no seed is passed here; the author's note above this cell
# reports that embeddings of non-isolated nodes differ between runs.
node2vec = Node2Vec(G, dimensions=128, walk_length=30, num_walks=200, workers=4)

# Train the model
model = node2vec.fit(window=10, min_count=1, batch_words=4)

# Fetch each node's embedding (the model is queried with the id as a string)
node_embeddings = {node: model.wv[str(node)] for node in G.nodes()}

# Column-oriented buffer for the CSV: one 'index' column plus one per dimension
embeddings_dict = {
    'index': [],
}

# Number of embedding dimensions, read off the first embedding
dimensions = len(next(iter(node_embeddings.values())))  # 128

# # check
# print(node_embeddings.keys())                     # dict_keys([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10,...
# # print(node_embeddings[0])                       # numpy.ndarray, (128,)
# print(next(iter(node_embeddings.values())))       # same as above
# print(len(next(iter(node_embeddings.values()))))  # 128


# Create one (empty) column per embedding dimension, keyed '0', '1', ...
for i in range(dimensions):
    embeddings_dict[f'{i}'] = []
    # print(embeddings_dict.keys())  # ['index','0','1',...]

# Fill the column buffer
for node in G.nodes():
    embeddings_dict['index'].append(node)  # the 'index' column records the node id
    embedding = node_embeddings[node]
    for i, value in enumerate(embedding):
        embeddings_dict[f'{i}'].append(value)  # append to the column of that dimension

# Build the DataFrame and write it to node_embeddings_file
df = pd.DataFrame(embeddings_dict)
df.to_csv(node_embeddings_file, index=False)

# NOTE: the message hard-codes the bare file name; the actual target is node_embeddings_file
print("Node embeddings have been saved to node_embeddings.csv")


Computing transition probabilities: 100%|██████████| 94/94 [00:00<00:00, 31614.51it/s]
Generating walks (CPU: 1): 100%|██████████| 50/50 [00:00<00:00, 158.50it/s]
Generating walks (CPU: 4): 100%|██████████| 50/50 [00:00<00:00, 157.55it/s]
Generating walks (CPU: 2): 100%|██████████| 50/50 [00:00<00:00, 157.35it/s]
Generating walks (CPU: 3): 100%|██████████| 50/50 [00:00<00:00, 158.00it/s]


Node embeddings have been saved to node_embeddings.csv


### (不要随意重新运行) 获取每个节点的5 个相似节点（包含对调整和孤立节点的处理）

In [13]:
"""假设我们已经有了node_embeddings字典，其中包含了每个节点的embedding向量"""
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity


# 1. Inputs. Relies on G, node_embeddings and the concept-map dict `data`
#    produced by the node2vec cell above.
concept_sim_file = f'{datasets}/concept_sim.xlsx'                    # output file
node_ids = list(G.nodes())
embeddings = np.array([node_embeddings[node] for node in node_ids])  # one row per node

# 2. Pairwise cosine similarity; row/column k corresponds to node_ids[k]
similarity_matrix = cosine_similarity(embeddings)

# # check
# print(similarity_matrix.shape)
# print(np.max(similarity_matrix))
# print(np.min(similarity_matrix))
# print(similarity_matrix[0,:])
# print(similarity_matrix[:,0])

# 2024-10-11: the isolated (deprecated) node must never be picked as a
# neighbour, so its whole column is forced to -1. The column is located by
# node id rather than assumed to be column 0, so the cell stays correct even
# if the graph's node order changes.
isolated_node_id = 0
isolated_idx = node_ids.index(isolated_node_id)
similarity_matrix[:, isolated_idx] = -1

# 3. For every node, keep the top_k most similar other nodes
top_k = 5
most_similar = {}

for i, node_id in enumerate(node_ids):
    # All similarities of node i, excluding itself
    similarities = [(j, similarity_matrix[i][j]) for j in range(len(node_ids)) if i != j]
    # Descending by similarity; the isolated node (-1) sorts to the tail
    similarities.sort(key=lambda pair: pair[1], reverse=True)
    top_similar = similarities[:top_k]

    # 2024-10-11: the isolated node still gets top_k entries, but their
    # similarity is reported uniformly as 0.
    if node_id == isolated_node_id:
        top_similar = [(idx, 0) for idx, _ in top_similar]

    # Translate matrix positions back to node ids
    most_similar[node_id] = [(node_ids[idx], sim) for idx, sim in top_similar]

# 4. Save. Names are looked up BY NODE ID; indexing data['nodes'] by id (as the
#    previous version did) is only right when list position happens to equal id.
node_names = {node['id']: node['name'] for node in data['nodes']}
results = []
for node_id, similar_nodes in most_similar.items():
    result = {
        'id': node_id,
        '节点实际内容': node_names[node_id],
        '最相似的前5个节点id及相似度': list(similar_nodes),
    }
    for rank, (similar_id, _similarity) in enumerate(similar_nodes, start=1):
        result[f'最相似的节点 {rank}'] = node_names[similar_id]
    results.append(result)

results_df = pd.DataFrame(results)
results_df.to_excel(concept_sim_file, index=False)
print(f"Topic results have been saved to {concept_sim_file}.")

Topic results have been saved to DBE_KT22/concept_sim.xlsx.


### 读取DBE_KT22数据集的原始fold数据，整合为txt样本格式（包含调整qid后的pid/q/s信息）

##### 241017: 新增乱序

In [17]:
# 获取train.txt和 test.txt（包含调整qid后的pid/q/s信息）—— 暂时将test置空
import json
import os
import random
import pandas as pd
from sklearn.model_selection import train_test_split

datasets = 'DBE_KT22'
# datasets = 'EdNet'
# datasets = 'NIPS34'

train_file = f'{datasets}/train.txt'
test_file = f'{datasets}/test.txt'


def process_fold_files(file_paths):
    """
    Convert fold JSON files into the 4-field text-sample records.

    Each JSON file is a list of per-student items with parallel lists under
    'question', 'score' and 'concept' (each concept entry is itself a list of
    ids). The three lists are shuffled together, so the i-th question keeps
    its own score and concepts, and then serialised.

    :param file_paths: iterable of paths to fold JSON files
    :return: list of records, each
             [sequence length, "q1,q2,...", "c1_c2,c3,...", "s1,s2,..."]
             (concept ids of one question are joined with '_')

    An item with no interactions yields [0, '', '', ''] instead of raising
    (unpacking ``zip(*[])`` used to fail with ValueError).
    """
    all_data = []
    for file_path in file_paths:
        with open(file_path, 'r') as f:
            fold = json.load(f)

        for item in fold:
            # NOTE: concept ids are still the raw ones here; the id adjustment
            # happens in a later step of the notebook.
            interactions = list(zip(item['question'], item['score'], item['concept']))
            random.shuffle(interactions)  # joint shuffle keeps the triples aligned

            # Rebuild the three columns without zip(*...), which breaks on [].
            questions = [q for q, _, _ in interactions]
            scores = [s for _, s, _ in interactions]
            concepts = [c for _, _, c in interactions]

            all_data.append([
                len(questions),
                ','.join(map(str, questions)),
                ','.join('_'.join(map(str, c)) for c in concepts),
                ','.join(map(str, scores)),
            ])

    return all_data


def write_to_file(data, file_path):
    """
    Dump records to a text file, one field per line.

    :param data: iterable of records, each an iterable of fields
    :param file_path: destination path (overwritten)

    No separator line is written between consecutive records.
    """
    with open(file_path, 'w') as out:
        out.writelines(f"{field}\n" for record in data for field in record)


def main():
    """
    Merge fold0..fold4 of the selected dataset and write train/test text files.

    The split ratio is 1.0, so every record goes to the training file and the
    test file is written empty; the real split is done in a later step.
    """
    fold_paths = [f'{datasets}/fold{k}.json' for k in range(5)]
    records = process_fold_files(fold_paths)

    # Records are intentionally NOT shuffled across students here.
    # random.shuffle(records)

    # Split point: 100% train, 0% test (test set left empty for now)
    cut = int(len(records) * 1.0)
    train_part, test_part = records[:cut], records[cut:]

    # Persist both parts as text files
    write_to_file(train_part, train_file)
    write_to_file(test_part, test_file)
    print(f"处理完成。训练集保存到 train.txt (共{len(train_part)}条记录)，测试集保存到 test.txt (共{len(test_part)}条记录)。")
  

if __name__ == "__main__":
    main()

处理完成。训练集保存到 train.txt (共1153条记录)，测试集保存到 test.txt (共0条记录)。


### (若不重新生成GCN属性则无需重新运行) 获取图的边连接信息

In [15]:
def process_concept_map(input_file, output_file):
    """
    Export the edge list of a concept-map JSON file as tab-separated text.

    :param input_file: path of a JSON file with a top-level 'links' list whose
                       entries carry 'source' and 'target'
    :param output_file: destination path; one "source<TAB>target" line per link
    """
    with open(input_file, 'r') as src:
        concept_map = json.load(src)

    edges = concept_map['links']

    with open(output_file, 'w') as dst:
        dst.writelines(f"{edge['source']}\t{edge['target']}\n" for edge in edges)

    print(f"处理完成。链接数据已保存到 {output_file}")


def main():
    """
    Write the concept-map edge list of the currently selected dataset.

    `datasets` is read from the notebook's global state (set in an earlier
    cell); alternatives used elsewhere in this notebook: 'DBE_KT22', 'EdNet',
    'NIPS34'.
    """
    process_concept_map(
        f'{datasets}/concept_map_vis.json',
        f'{datasets}/concept_links.txt',
    )


if __name__ == "__main__":
    main()

处理完成。链接数据已保存到 DBE_KT22/concept_links.txt


# 不同方法生成学生能力并保存

### （1）统计历史数据，生成每个学生的能力预测（整合自线上项目的proccess_opensource.ipynb）——后续用于与 策略 和 模型预报 结果进行对比

##### 导入

In [18]:
# import math
import sys
import os
import json
import numpy as np
import matplotlib
# matplotlib.use('Agg')   # 确保你的图表在后台生成且不显示图形窗口
import matplotlib.pyplot as plt
from datetime import datetime
import functools

import seaborn as sns
from collections import defaultdict

def ensure_directory_exists(path_in):
    """
    Make sure `path_in` exists, creating missing parent folders recursively.

    :param path_in: directory path to create if absent

    An already existing path (of any kind) is left untouched. `exist_ok=True`
    closes the window in which another process creates the directory between
    the existence check and the `makedirs` call, which previously raised
    FileExistsError.
    """
    if not os.path.exists(path_in):
        os.makedirs(path_in, exist_ok=True)

##### 读取txt格式数据

In [19]:
from itertools import islice
import subprocess

def get_line_count(file_in):
    """
    Return the number of newline characters in a file.

    This matches what `wc -l` reports (a final line without a trailing newline
    is not counted) but does not spawn an external process, so it also works
    where `wc` is unavailable, and a missing file raises the usual
    FileNotFoundError instead of an IndexError from parsing empty output.

    :param file_in: path of the file to inspect
    :return: int, count of b"\\n" bytes
    """
    total = 0
    with open(file_in, "rb") as handle:
        # Read in 1 MiB chunks so large files are never loaded at once.
        for chunk in iter(lambda: handle.read(1 << 20), b""):
            total += chunk.count(b"\n")
    return total

def read_n_lines(file_in, n):
    """
    Lazily yield the file's lines in consecutive groups of at most `n`.

    :param file_in: path of the text file to read
    :param n: group size; the final group may be shorter
    :return: generator of lists of raw lines (line endings are kept)
    """
    with open(file_in, "r") as handle:
        group = list(islice(handle, n))
        while group:
            yield group
            group = list(islice(handle, n))

# `datasets` is inherited from an earlier cell; alternatives kept for reference:
# datasets = 'DBE_KT22'
# datasets = 'EdNet'
# datasets = 'NIPS34'

# Invocation: each student record in train.txt spans n_lines consecutive lines
file_path = f'{datasets}/train.txt'
n_lines = 4

# Total number of lines in the file
len_file = get_line_count(file_in=file_path)
print(len_file)

# Smoke test: print the first record (raw lines keep their trailing newline,
# hence the blank line after each printed value) and stop
for lines in read_n_lines(file_in=file_path, n=n_lines):
    for line in lines:
        print(line)  # end=""
        print(type(line))
    print("--end of 4 lines--")
    break

4612
34

<class 'str'>
19,0,23,29,2,5,33,24,1,14,15,12,28,31,9,17,3,16,10,11,6,27,7,21,13,26,25,20,22,32,8,18,4,30

<class 'str'>
6_8_1,0,4_6,7_1,1,3,7_1,4_5_6,0,4_7,5_7,1,6_1,7_1,4,8_1,1,4_6,4_5,5_6,1,4_5_2,6,3_6,4_7,1_2,5_6,5_2,5,7_1,5_6,6_8_1,2,7_1

<class 'str'>
1,1,1,1,0,1,0,1,1,0,0,1,1,0,1,1,0,1,1,1,1,1,1,1,0,1,0,1,1,0,1,1,1,0

<class 'str'>
--end of 4 lines--


##### 处理为json接口格式（这一步骤对序列长度进行了筛选）

In [20]:
import json

# Convert train.txt into the JSON interface format (list of per-student dicts).
# Relies on file_path, n_lines, len_file and read_n_lines from the previous cell.
data = []
stu_id = 0
for lines in read_n_lines(file_in=file_path, n=n_lines):
    stu_id += 1  # student ids start at 1 (only used to locate the record in the txt file)
    if stu_id % 100 == 0:
        print("{} of {}".format(stu_id, len_file//4))

    # `lines` is a list of 4 raw lines: length, question ids, concept ids, scores
    n_seq = int(lines[0])
    pids = lines[1].strip().split(",")  # list of str (the trailing \n is stripped)
    qs = lines[2].strip().split(",")
    ss = lines[3].strip().split(",")
    # qs = ast.literal_eval("["+ lines[2] +"]")  # wrong: underscores are dropped and the parts merge into a single int


    # Skip students whose sequence is shorter than 100 interactions
    if n_seq < 100:
        continue

    # Convert types and derive uniform weights from qs (the default)
    pids = list(map(int, pids))
    qs = [list(map(int, elem.split("_"))) for elem in qs]  # list of lists (author's note: usually 1-3 concepts each)
    ss = list(map(int, ss))
    w_qs = [[round(1/len(elem), 3)]*len(elem) for elem in qs]  # each concept of a question gets 1/len, rounded to 3 decimals
    
    # Interface-format dict for this student
    data_out = {
        "old_p": [],
        "new_p": [],
        "student": {
            "topic_mastery": {},
            "id": stu_id
        }
    }
    for pid, q, w_q, s in zip(pids, qs, w_qs, ss):
        # Append one answering record to "old_p"
        record = {          # rebuilt on every iteration so records never share one dict
            "pid": pid,
            "q": q,
            "q_w": w_q,
            "w": [],        # the open-source datasets carry no word information
            "w_w": [],
            "s": s,
            "diff": 1,      # the open-source datasets carry no difficulty; default 1
            "session": 1,
            "corrate": -1   # newly added correct-rate field
        }
        data_out["old_p"].append(record)
    
    # Collect as list of dicts
    data.append(data_out)

# Save as JSON
with open(f'{datasets}/train.json', 'w', encoding='utf-8') as f:
    json.dump(data, f, ensure_ascii=False, indent=4)

100 of 1153
200 of 1153
300 of 1153
400 of 1153
500 of 1153
600 of 1153
700 of 1153
800 of 1153
900 of 1153
1000 of 1153
1100 of 1153


##### 根据历史统计，生成学生能力，保存为list of dicts，元素为 学生id：{qid0:qval0, qid1:qval1, ...}

In [21]:
"""
正确率-能力映射关系参考：
def get_solution(coefficient=None, constant=None):
    '''
    求解线性方程组（diff为离散的二次项）
    :param coefficient: np.ndarray, 系数矩阵
    :param constant: np.ndarray, 常数项
    说明：映射关系如下：
        正确率 = A * mean(掌握度)/(max-min) + B * (难度/max)^2 + C
    对于KC掌握度取值0-1连续
        正确率 = A * mean(掌握度)/(1-0) + B * (难度/1)^2 + C
    三种情景：
        （1） 最菜的学生（平均掌握度0）做中等难的题（难度1），正确率为 0
        （2） 最犇的学生（平均掌握度1）做中等难的题（难度1），正确率为 1
        （3） 普通的学生（平均掌握度0.66）做中等难的题（难度1），正确率为 0.7
    则有以下三元一次方程组：
        （1） 0/1 * A + (-1*(1/1)**2) * B + 1 * C = 0.33      -b+c = 0.25-0.33  => b=0(无效化难度), c=0.25
        （2） 1/1 * A + (-1*(1/1)**2) * B + 1 * C = 1            a = 0.66
        （3） 2/3 * A + (-1*(1/1)**2) * B + 1 * C = 0.7
    即为函数中默认的 coefficient 和 constant 数组取值.
    '''
    if coefficient is None:
        coefficient = np.array([[0/1, -1*(1/1)**2, 1],
                                [1/1, -1*(1/1)**2, 1],
                                [2/3, -1*(1/1)**2, 1]])
    if constant is None:
        constant = np.array([0, 1.0, 0.7])
    # 求解三元一次方程组
    _x, _y, _z = np.linalg.solve(coefficient, constant)
    return _x, _y, _z

该函数默认情况下无解，因为开源数据集下难度失效了，因此简化为：
   0.66 * capability + 0.25 = correct_rate
=> (correct_rate - 0.25) * 1.5 = capability
"""

with open(f'{datasets}/train.json', "r", encoding='UTF-8') as f:
    data = json.load(f)  # list of dicts

def init_stu_capability(n_q, start=0, padding_value=-1):
    """
    Build a fresh capability table with every concept set to the padding value.

    :param n_q: number of consecutive concept ids to create
    :param start: first concept id
    :param padding_value: value stored for every id
    :return: dict {start: padding_value, ..., start + n_q - 1: padding_value}
    """
    return {concept_id: padding_value for concept_id in range(start, start + n_q)}

# # check
# print(data[0].keys())
# print(data[0]["student"])
# print(init_stu_capability(93))

# Required parameters
n_questions = 93
n_pid = 212  # not used anywhere in this cell
data_out = {}

# Walk over every student and build the capability table
for stu in data:
    stu_cap = init_stu_capability(n_questions)  # fresh table, every concept = -1
    stu_id = stu["student"]["id"]               # student id

    for record in stu["old_p"]:   # list of dicts
        for q in record["q"]:     # list of ints
            # While counting, an entry is a dict {0: wrong count, 1: correct count};
            # an untouched entry is still the int padding value.
            if record["s"] == 1:  # answered correctly
                if isinstance(stu_cap[q], dict):
                    stu_cap[q][1] += 1
                else:  # concept not seen before (still the padding value -1): initialise
                    stu_cap[q] = {0:0,1:1}
            else:                 # answered incorrectly
                if isinstance(stu_cap[q], dict):
                    stu_cap[q][0] += 1
                else:
                    stu_cap[q] = {0:1,1:0}
            
    # Turn the counts into a correct rate, then invert the formula to a capability
    for key, val in stu_cap.items():  # e.g. 1: {0:20,1:25}
        if isinstance(val, dict):
            corrate = round(val[1]/(val[0]+val[1]), 2)
            capability = (corrate - 0.25) * 1.5      # formula: see the reference text at the top of this cell
            capability = max(0, min(capability, 1))  # clip to [0, 1]
            stu_cap[key] = capability                # replace the counts with the capability
        else:     # concepts never encountered keep the value -1
            pass  # leave -1 unchanged
    
    # Store the result in data_out, keyed by student id
    data_out[stu_id] = stu_cap

# Save as JSON (note: json.dump writes the int keys as strings)
with open(f'{datasets}/stu_cap_stat.json', 'w', encoding='utf-8') as f:
    json.dump(data_out, f, ensure_ascii=False, indent=4)

### （2）通过策略调整，生成每个学生的能力预测（整合自线上项目的concept_predict_linear.ipynb）——后续用于与 统计 和 模型预报 结果进行对比（同时qid的修改对齐也是在这里进行的！）

##### 导入和自定义策略函数

In [22]:
# import math
import sys
import os
import json
import numpy as np
import matplotlib
matplotlib.use('Agg')   # 确保你的图表在后台生成且不显示图形窗口
import matplotlib.pyplot as plt
from datetime import datetime
import functools
import copy

import seaborn as sns
from collections import defaultdict
# from scipy.sparse import lil_matrix

# cur_dir = os.path.dirname(os.path.abspath(__file__))
from config import BASE_DIR
cur_dir = BASE_DIR
# root_dir = os.path.dirname(os.path.dirname(os.path.dirname(cur_dir)))  # 向上追溯三级
root_dir = cur_dir
sys.path.append(root_dir)

print(cur_dir)   # /mnt/new_pfs/liming_team/auroraX/songchentao/ketcat/data
print(root_dir)  # 同上


def ensure_directory_exists(path_in):
    """
    Make sure `path_in` exists, creating missing parent folders recursively.

    :param path_in: directory path to create if absent

    An already existing path (of any kind) is left untouched. `exist_ok=True`
    closes the window in which another process creates the directory between
    the existence check and the `makedirs` call, which previously raised
    FileExistsError.
    """
    if not os.path.exists(path_in):
        os.makedirs(path_in, exist_ok=True)


def get_solution(coefficient=None, constant=None):
    """
    Solve the 3x3 linear system that calibrates the correct-rate mapping.

    The mapping is
        correct_rate = A * mean(mastery) / (max - min) + B * (difficulty / max)^2 + C
    with mastery continuous in 0-1 and difficulty taking the levels 1-3, i.e.
        correct_rate = A * mean(mastery) / (1 - 0) + B * (difficulty / 3)^2 + C

    The default system encodes three scenarios (the difficulty term enters
    with a negative sign in the coefficient matrix):
        (1) weakest student (mean mastery 0) on the easiest question (difficulty 1): correct rate 0
        (2) strongest student (mean mastery 1) on the hardest question (difficulty 3): correct rate 1
        (3) average student (mean mastery 2/3) on a medium question (difficulty 2): correct rate 0.7
    that is
        (1) 0/1 * A + (-1*(1/3)**2) * B + 1 * C = 0
        (2) 1/1 * A + (-1*(3/3)**2) * B + 1 * C = 1
        (3) 2/3 * A + (-1*(2/3)**2) * B + 1 * C = 0.7
    An earlier calibration for mastery on a 1-4 scale used the rows
    [4/9, -(1/3)**2, 1], [11/9, -(3/3)**2, 1], [8/9, -(2/3)**2, 1].

    :param coefficient: np.ndarray or None, coefficient matrix (default: the system above)
    :param constant: np.ndarray or None, right-hand side (default: [0, 1.0, 0.7])
    :return: tuple (A, B, C)
    """
    lhs = coefficient
    if lhs is None:
        lhs = np.array([[0/1, -1*(1/3)**2, 1],
                        [1/1, -1*(3/3)**2, 1],
                        [2/3, -1*(2/3)**2, 1]])
    rhs = constant
    if rhs is None:
        rhs = np.array([0, 1.0, 0.7])

    # Solve the three-unknown linear system
    coef_a, coef_b, coef_c = np.linalg.solve(lhs, rhs)
    return coef_a, coef_b, coef_c


def plot_pdf(data_in_1, label_1='error', data_in_2=None, label_2=None):
    """
    Plot kernel-density estimates of one or two samples and save the figure.

    :param data_in_1: 1d array, first sample
    :param label_1: str or None, legend label of the first sample
    :param data_in_2: 1d array or None, optional second sample
    :param label_2: str or None, legend label of the second sample

    The figure is written to 'pics/pdf_<timestamp>.png' and then closed.

    Fixes relative to the previous version:
      * labels are passed through `label=`; `legend=` is a boolean switch in
        seaborn's kdeplot, so the labels never reached the plot;
      * the second curve uses `label_2` and its own colour instead of
        repeating `label_1` in blue;
      * the output directory is created when missing and the figure is closed
        after saving, as plot_histogram already does.
    """
    plt.figure(figsize=(8, 6))
    sns.kdeplot(data_in_1, fill=True, color="b", label=label_1)
    has_label = label_1 is not None
    if data_in_2 is not None:
        sns.kdeplot(data_in_2, fill=True, color="r", label=label_2)
        has_label = has_label or label_2 is not None
    if has_label:
        plt.legend()

    # Title and axis labels
    plt.title('Probability Density Function (PDF)')
    plt.xlabel('Value')
    plt.ylabel('Density')

    # Grid
    plt.grid(True)

    # Save under a timestamped name (format: YYYY-MM-DD_HH:MM:SS.mmmmmm)
    now = datetime.now().strftime("%Y-%m-%d_%H:%M:%S.%f")
    os.makedirs('pics', exist_ok=True)
    plt.savefig('pics/pdf_{}.png'.format(now))
    plt.close()


def plot_histogram(data_in, text=None, file_out=None):
    """
    Draw a 30-bin histogram over [-1, 1] and save it to disk.

    :param data_in: 1d array
    :param text: str or None, annotation drawn near the top-right corner
    :param file_out: str or None, output path; defaults to
                     'pics/histogram_<timestamp>.png'

    The directory of the output path is created when it does not exist yet
    (previously `savefig` failed on a fresh checkout without 'pics/').
    """
    if text is None:
        text = ''

    plt.figure(figsize=(8, 6))

    # 31 edges -> 30 equal-width bins on [-1, 1]
    bins = np.linspace(-1,1,31).tolist()
    plt.hist(data_in, bins=bins, color='b', alpha=0.7, edgecolor='black')

    plt.xlim([-1,1])
    # plt.ylim([0,20])  # ylim is deliberately left automatic

    # Anchor of the annotation, derived from the current axis limits
    ax = plt.gca()
    x_limits = ax.get_xlim()
    y_limits = ax.get_ylim()
    x_text = x_limits[1] + (x_limits[1] - x_limits[0]) * 0.05  # 5% of the x-range beyond the x maximum
    y_text = y_limits[1] + (y_limits[1] - y_limits[0]) * 0.1   # 10% of the y-range beyond the y maximum
    plt.text(x_text, y_text, text, fontsize=12, ha='right', va='top')

    # Title and axis labels
    # NOTE(review): plt.hist is called without density=True, so the y axis
    # shows counts although it is labelled 'Density' - confirm which is wanted.
    plt.title('Histogram')
    plt.xlabel('Value')
    plt.ylabel('Density')

    # Grid
    plt.grid(True)

    # Resolve the output path; the timestamp (YYYY-MM-DD_HH:MM:SS.mmmmmm) is
    # only needed for the default file name
    if file_out is None:
        now = datetime.now().strftime("%Y-%m-%d_%H:%M:%S.%f")
        file_out = 'pics/histogram_{}.png'.format(now)
    if isinstance(file_out, (str, os.PathLike)):
        out_dir = os.path.dirname(file_out)
        if out_dir:
            os.makedirs(out_dir, exist_ok=True)
    plt.savefig(file_out)
    plt.close()

/mnt/new_pfs/liming_team/auroraX/songchentao/ketcat/data
/mnt/new_pfs/liming_team/auroraX/songchentao/ketcat/data


In [23]:
class ConceptPredictStrategy:
    """
    self.history 学生在知识点上历史答题记录的统计，可以包括该知识点历史做题数，平均难度，平均正确率
    self.knowledge_points 学生的知识点掌握度，由历史答题记录初始化，每次做题后更新
    240919更新：
        （1）输入输出对齐在线文档《0827-接口文档》，以及engine.py中的调用代码
        （2）拆分 topic和 word 相应的方法，适用于两种实际调用场合（topic+单词 / 只需要返回topic维度）
        （3）部署时，本class只初始化一次，实例化对象服务于所有用户，因此全局不发生变化的参数在__init__中初始化，对应于config中给定，应当尽可能齐全
            目前包含：
                知识点相似度矩阵；
                和已有参数对齐的各种embedding表大小/起止索引；
        （4）self.history：新的框架结构下，调整功能，用于区分topic和word策略：
            具体来说，单词层面延续原有设想，将之前所有session一视同仁，利用self.history进行初始化；
            而topic层面，不再利用self.history，而是引入session间的遗忘机制（需要仔细考虑衰减+停止参数设置的组合，怎样对于v02的约200个知识点是最好的）
    """
    def __init__(self,
                 n_diff, 
                 list_kp=(1, 208), 
                 n_words=(1, 2348), 
                 n_diff_w=None, 
                 valid_range=None, 
                 similarity=None, 
                 padding_value=-1, 
                 last_n=10, 
                 threshold=0.05,
                 lr=0.5,
                 decay_IntraSess=0.98,
                 decay_InterSess=0.98,
                 decay_PracInSess=0.8,
                 hyperparams=None,
                 nxneg=1.0,
                 ):
        """
        初始化
        :param valid_range: tuple, 知识点掌握度取值范围，(0,1)
        :param list_kp:     tuple, 知识点ID取值范围，   (1, n_questions)
        :param similarity:  np.ndarray, KC相似度稀疏矩阵，(n_questions, n_questions)
        :param padding_value: int, 填充值(未接触到的KC掌握度赋值为-1)
        :param n_diff: int, 题目难度档位数，一般为3，即难度取值为1,2,3
        :param n_diff_w: int, 单词难度档位数，一般为3，即难度取值为1,2,3
        :param n_words: tuple, 单词ID取值范围，   (1, n_words)
        :param last_n: int, 记录last_n步调整量用list，用于判断停止推题
        :param threshold: float, 停止推题的阈值
        learning_rate = 0.5    # 学习率
        decay_factor_1 = 0.98  # 衰减因子1:session内/间遗忘（较弱）
        decay_factor_2 = 0.9   # 衰减因子2:session内练习量（较强）
        """
        self.n_diff = n_diff                     # 题目难度档位数，一般为3，即难度取值为1,2,3
        if n_diff_w is None:
            self.n_diff_w = n_diff               # 单词难度档位数，一般为3，即难度取值为1,2,3
        if valid_range is None:
            self.valid_range = (0, 1)            # 知识点掌握度取值范围（240919改为不再映射至分类，而是直接使用原有的0-1连续值，注意需要对应重新计算超参数）
        else:
            self.valid_range = valid_range
        self.list_kp = list(range(list_kp[0], list_kp[1]+1))  # 知识点id列表（冷启动初始化用）
        self.n_words = list(range(n_words[0], n_words[1]+1))  # 同上，单词id列表
        if similarity is None:                   # KC相似度稀疏矩阵
            self.similarity = None
        else:
            self.similarity = similarity         # (n_questions, n_questions) np.ndarray
        self.padding_value = padding_value       # 填充值(未接触到的KC掌握度赋值为-1)
        # self.history = {}                      # (deprecated) dict，形如 ind_kp: (n_p, avg_diff, avg_correctness)，仅在单词维度使用（记录学生答题记录的历史平均用）
        # self.knowledge_points = {}             # (deprecated) 学生的topic/知识点掌握度
        # self.mastery_words = {}                # (deprecated) 学生的单词掌握度
        # self.adjustments = []                  # (deprecated) 记录last_n步调整量用list，用于判断停止推题
        if hyperparams is None:
            self.A, self.B, self.C = get_solution()  # 正确率-难度-掌握度 映射关系超参数，调用get_solution求解
        else:
            self.A, self.B, self.C = hyperparams     # 直接指定超参数
        self.last_n = last_n
        self.threshold = threshold
        self.lr = lr                             # 学习率和衰减项相关
        self.decay_IntraSess = decay_IntraSess
        self.decay_InterSess = decay_InterSess
        self.decay_PracInSess = decay_PracInSess
        self.nxneg = nxneg                       # 负例的调整倍率

    def map_difficulty(self, diff):
        """难度取值scaling至[0,1]"""
        return diff / self.n_diff

    def map_difficulty_w(self, diff):
        """难度取值scaling至[0,1]"""
        return diff / self.n_diff_w

    def init_from_history(self, user_history):
        """
        单词维度：使用 self.history 初始化 self.mastery_words（取值范围[0,1]）
                240920: 分离reset功能；对于未接触到的单词掌握度padding为-1
        """
        mastery_words = {}
        if len(user_history) > 0:                                                                                  # 若有历史记录，则调用映射公式计算得到的单词掌握度作为初始化结果
            for ind_kp, (n_p, avg_diff, avg_corr) in user_history.items():
                mastery_words[ind_kp] = ((self.valid_range[1] - self.valid_range[0])/self.A) * (avg_corr - self.C + self.B * (self.map_difficulty(avg_diff))**2)
                mastery_words[ind_kp] = max(self.valid_range[0], min(mastery_words[ind_kp], self.valid_range[1]))  # clip（单词和topic的掌握度取值范围统一为0-1）
                for _elem in self.n_words:                                                                         # 根据self.n_words，将所有不存在历史记录的单词统一初始化为padding值(-1)
                    if _elem not in mastery_words.keys():
                        # mastery_words[_elem] = 0.5 * (self.valid_range[1] + self.valid_range[0])                 # 初始化为取值范围中位数(0.5)
                        mastery_words[_elem] = self.padding_value                                                  # 初始化为padding值(-1)
        elif self.n_words is not None:                                                                             # 若无历史记录，则根据单词表统一初始化为-1
            for _elem in self.n_words:
                # mastery_words[_elem] = (self.valid_range[1] + self.valid_range[0]) / 2                           # 初始化为取值范围中位数(0.5)
                mastery_words[_elem] = self.padding_value                                                          # 初始化为padding值(-1)

        return mastery_words

    def init_from_padding_value(self):
        """
        topic/知识点维度：使用填充值初始化 self.knowledge_points（注意更新时需要先reset为有效取值范围的中位数）
        """
        mastery_topic = {}
        for _elem in self.list_kp:                       # 遍历整个topic/知识点表
            mastery_topic[_elem] = self.padding_value    # 初始化为padding值(-1)
        return mastery_topic

    @staticmethod
    def preprocess_data(data_in):
        """
        预处理接口数据（统一进行，不在外部流程中单独调用，注意避免重复操作，包含topic & 单词维度，"old_p" & "new_p"）
        """
        # "old_p"
        pid = []
        q = []    # topic
        w_q = []  # topic权重
        w = []    # 单词
        w_w = []  # 单词权重
        s = []
        diff = []
        sess = []
        # 逐条遍历历史做题序列中的做题记录（dict）
        _data = data_in["old_p"]
        for _dict in _data:                # append 至对应list的元素及其属性
            pid.append(_dict["pid"])       # 题目id                 int
            q.append(_dict["q"])           # 题目涉及的topic/知识点   list of int
            w.append(_dict["w"])           # 题目涉及的单词           list of int
            w_q.append(_dict["q_w"])       # 题目涉及的知识点权重      list of float
            w_w.append(_dict["w_w"])       # 题目涉及的知识点权重      list of float
            s.append(_dict["s"])           # 答题结果                int
            diff.append(_dict["diff"])     # 题目难度                int
            sess.append(_dict["session"])  # 题目所属session         int

        # "new_p"
        if "new_p" in data_in.keys():
            new_pid = []
            new_q = []    # topic
            new_w_q = []  # topic权重
            new_w = []    # 单词
            new_w_w = []  # 单词权重
            # new_s = []  # 待预报序列无答题结果和session两项属性
            new_diff = []
            # new_sess = []
            # 逐条遍历历史做题序列中的做题记录（dict）
            _data = data_in["new_p"]
            for _dict in _data:                    # append 至对应list的元素及其属性
                new_pid.append(_dict["pid"])       # 题目id                 int
                new_q.append(_dict["q"])           # 题目涉及的topic/知识点   list of int
                new_w.append(_dict["w"])           # 题目涉及的单词           list of int
                new_w_q.append(_dict["q_w"])       # 题目涉及的知识点权重      list of float
                new_w_w.append(_dict["w_w"])       # 题目涉及的知识点权重      list of float
                # new_s.append(_dict["s"])           # 答题结果                int
                new_diff.append(_dict["diff"])     # 题目难度                int
                # new_sess.append(_dict["session"])  # 题目所属session         int

            # 静态方法，不进行外部单独调用，有返回值
            return pid, q, w_q, w, w_w, s, diff, sess, (new_pid, new_q, new_w_q, new_w, new_w_w, new_diff)
        else:
            return pid, q, w_q, w, w_w, s, diff, sess, None

    @staticmethod
    def split_sessions(pid, q, w_q, w, w_w, s, diff, sess):
        """
        接续preprocess_data，拆分session（topic/单词维度通用，仅针对"old_p"数据）
        返回list形式，每个元素为一个session的数据dict
        """
        # topic维度，需要拆分所有session数据，因此建立session id的list（按json序列先后顺序）
        session_ids = []
        for _sess in sess:
            if _sess not in session_ids:
                session_ids.append(_sess)
        # 初始化一个字典，用于存储分割后的子列表
        split_data = defaultdict(lambda: {'pid': [], 'q': [], 'w_q':[], 'w': [], 'w_w': [], 's': [], 'diff': [], 'sess': []})
        # 遍历数据，并根据session id进行分割，append至对应session的list
        for _sess, _pid, _q, _wq, _w, _ww, _s, _diff in zip(sess, pid, q, w_q, w, w_w, s, diff):
            split_data[_sess]['pid'].append(_pid)
            split_data[_sess]['q'].append(_q)
            split_data[_sess]['w_q'].append(_wq)
            split_data[_sess]['w'].append(_w)
            split_data[_sess]['w_w'].append(_ww)
            split_data[_sess]['s'].append(_s)
            split_data[_sess]['diff'].append(_diff)
            split_data[_sess]['sess'].append(_sess)

        result = []  # 将字典转换为列表（按照json序列的先后顺序）
        for _sess in session_ids:
            result.append(split_data[_sess])
        return result  # list of dicts（由每个session数据字典组成的list）

    @staticmethod
    def calc_history(data_in):
        """
        （单词维度）根据历史session数据，计算history
        遍历非当前session的所有数据，按照word建立历史统计，内容包含 ind_word: (n_p, avg_diff, avg_correctness)
        :return: dict
        """
        user_history = {}  # 初始化输出
        # 遍历session
        for ind_sess, current_session in enumerate(data_in):  # 解析数据：当前（遍历）session
            # pids = current_session['pid']       # 题目id序列
            # qs = current_session['q']           # topic维度
            # w_qs = current_session['w_q']       # 权重序列
            ws = current_session['w']           # 单词维度
            # w_ws = current_session['w_w']       # 权重序列
            ss = current_session['s']           # 答题结果序列
            diffs = current_session['diff']     # 题目难度序列
            # sessions = current_session['sess']  # 题目session序列

            # 遍历当前session题目序列
            for _i, (w, s, diff) in enumerate(zip(ws, ss, diffs)):
                for _ii, _w in enumerate(w):
                    # 遍历题目涉及的所有单词
                    if _w not in user_history.keys():                   # 若第一次做涉及单词_w的题
                        user_history[_w] = (1, diff, s)                 # 原始diff，未scaling
                    else:                                               # 否则running更新user_history[_w]
                        n_p, avg_diff, avg_corr = user_history[_w]      # 读取
                        n_p += 1                                        # 更新元组内容
                        avg_diff = (avg_diff * (n_p - 1) + diff) / n_p  # n_p已更新，因此-1
                        avg_corr = (avg_corr * (n_p - 1) + s) / n_p
                        user_history[_w] = (n_p, avg_diff, avg_corr)    # 覆盖

        return user_history

    def update_mastery_words(self, data_in):
        """
        Word level: replay the latest session on top of a history-based starting point.

        All sessions except the last one are summarised with ``self.calc_history``
        and converted into an initial mastery dict by ``self.init_from_history``.
        Each question of the last session then adjusts the mastery of every word
        it involves.

        :param data_in: list of dict (output of ``self.split_sessions``); every
            element holds the parallel lists
                pid:  list of int, question ids
                q:    list of list, topic ids per question
                w_q:  list of list, topic weights per question
                w:    list of list, word ids per question
                w_w:  list of list, word weights per question
                s:    list of int, answer results (1 is treated as correct)
                diff: list of int, question difficulties
                sess: list of int, session id of each question
        :return: dict, word id -> mastery, e.g. {1: 0.3, 2: 0.5, 3: 0.1, 4: -1, ...}

        NOTE(review): unlike ``update_mastery_topics`` the wrong-answer branch
        here is not multiplied by ``self.nxneg`` - confirm this is intended.
        """
        # (1) Starting point derived from all sessions but the last.
        past_stats = self.calc_history(data_in[:-1])      # word id -> (n_p, avg_diff, avg_correctness)
        mastery_words = self.init_from_history(past_stats)

        latest = data_in[-1]                               # the current session
        pids = latest['pid']
        ws = latest['w']
        w_ws = latest['w_w']
        ss = latest['s']
        diffs = latest['diff']
        _ = latest['sess']                                 # not used below; read so a record without it still fails here

        # (2) Hyper-parameters.
        base_rate = self.lr
        forget_decay = self.decay_IntraSess                # within-session forgetting (weaker)
        practice_decay = self.decay_PracInSess             # within-session practice amount (stronger)
        n_questions = len(pids)
        seen_in_session = {}                               # word id -> questions of this session already involving it

        # (3) Walk through the questions of the current session.
        for position, (_pid, word_ids, outcome, diff, weight) in enumerate(zip(pids, ws, ss, diffs, w_ws)):
            diff_norm = self.map_difficulty_w(diff)        # difficulty rescaled by the helper
            for slot, k_id in enumerate(word_ids):
                # A word without any record starts from the middle of the valid range.
                if mastery_words[k_id] == self.padding_value:
                    mastery_words[k_id] = 0.5 * (self.valid_range[1] + self.valid_range[0])

                # Number of earlier questions in this session that involved the word.
                n_sess = seen_in_session.get(k_id, 0)
                seen_in_session[k_id] = n_sess + 1

                # Step size: learning rate, damped by the distance of the question
                # from the end of the session and by the practice count so far.
                step = base_rate * (forget_decay ** (n_questions - position)) * (practice_decay ** n_sess)
                # Relative importance of the word within the question (de-normalised by the word count).
                step = step * weight[slot] * len(word_ids)
                if outcome == 1:
                    # Correct: larger reward for harder questions.
                    step = step * diff_norm
                else:
                    # Wrong: larger penalty for easier questions.
                    step = -1 * step * ((1/self.n_diff) + 1-diff_norm)

                # Apply the direct adjustment and clip to the valid range.
                moved = mastery_words[k_id] + step
                mastery_words[k_id] = max(self.valid_range[0], min(moved, self.valid_range[1]))

        return mastery_words

    def update_mastery_topics(self, data_in):
        """
        Topic level: replay all sessions and return the topic mastery plus the
        recent mean adjustment that the caller uses as a stop criterion.

        :param data_in: list of dict (output of ``self.split_sessions``); every
            element holds the parallel lists
                pid:  list of int, question ids
                q:    list of list, topic ids per question
                w_q:  list of list, topic weights per question
                w:    list of list, word ids per question
                w_w:  list of list, word weights per question
                s:    list of int, answer results (1 is treated as correct)
                diff: list of int, question difficulties
                sess: list of int, session id of each question
        :return: tuple (mastery, avg_adj); mastery is a dict topic id -> mastery,
            e.g. {1: 0.3, 2: 0.5, 3: 0.1, 4: -1, ...}; avg_adj is the mean
            absolute direct adjustment over the last ``self.last_n`` questions
            of the final session.

        NOTE(review): avg_adj stays 0.0 when the final session has at most
        ``self.last_n`` questions - confirm callers comparing it with a
        threshold expect that.
        NOTE(review): neighbour propagation also moves (and clips) entries that
        are still at the padding value - confirm this is intended.
        """
        def _clamp(value):
            # Clip a mastery value to the valid range.
            return max(self.valid_range[0], min(value, self.valid_range[1]))

        # (0) Hyper-parameters.
        base_rate = self.lr
        session_decay = self.decay_InterSess               # between-session forgetting (weaker)
        practice_decay = self.decay_PracInSess             # within-session practice amount (stronger)

        # (1) Outputs.
        mastery_topic = self.init_from_padding_value()
        avg_adj = 0.0

        # (2) Walk through the sessions.
        for ind_sess, session in enumerate(data_in):
            pids = session['pid']
            qs = session['q']
            w_qs = session['w_q']
            ss = session['s']
            diffs = session['diff']
            _ = session['sess']                            # not used below; read so a record without it still fails here

            sessions_after = len(data_in) - 1 - ind_sess   # 0 for the final session
            per_question_steps = []                        # direct adjustments, one sub-list per question
            n_questions = len(pids)
            seen_in_session = {}                           # topic id -> questions of this session already involving it

            # (3) Walk through the questions of this session.
            for _pid, topic_ids, outcome, diff, weight in zip(pids, qs, ss, diffs, w_qs):
                steps = []
                diff_norm = self.map_difficulty(diff)      # difficulty rescaled by the helper
                for slot, k_id in enumerate(topic_ids):
                    # A topic without any record starts from the middle of the valid range.
                    if mastery_topic[k_id] == self.padding_value:
                        mastery_topic[k_id] = 0.5 * (self.valid_range[1] + self.valid_range[0])

                    # Number of earlier questions in this session that involved the topic.
                    n_sess = seen_in_session.get(k_id, 0)
                    seen_in_session[k_id] = n_sess + 1

                    # Step size: learning rate, damped by the age of the session
                    # and by the practice count so far within the session.
                    step = base_rate * (session_decay ** sessions_after) * (practice_decay ** n_sess)
                    # Relative importance of the topic within the question (de-normalised by the topic count).
                    step = step * weight[slot] * len(topic_ids)
                    if outcome == 1:
                        # Correct: larger reward for harder questions.
                        step = step * diff_norm
                    else:
                        # Wrong: larger penalty for easier questions, scaled by the extra factor nxneg.
                        step = -1 * step * ((1 / self.n_diff) + 1 - diff_norm) * self.nxneg

                    # Direct adjustment, clipped to the valid range.
                    mastery_topic[k_id] = _clamp(mastery_topic[k_id] + step)

                    # Indirect adjustment of neighbours: the column of the similarity
                    # matrix for this topic gives the multiplier; only positive entries are applied.
                    if self.similarity is not None:
                        for neighbour, sim in enumerate(self.similarity[:, int(k_id)]):
                            if sim > 0:
                                mastery_topic[neighbour] = _clamp(mastery_topic[neighbour] + step * sim)

                    steps.append(step)

                per_question_steps.append(steps)

            # (4) Stop criterion, evaluated for the final session only.
            if (n_questions > self.last_n) and (ind_sess == len(data_in) - 1):
                recent = per_question_steps[-self.last_n:]
                magnitudes = [abs(value) for question_steps in recent for value in question_steps]
                avg_adj = np.mean(magnitudes)

        return mastery_topic, avg_adj

    def predict_knowledge_mastery(self, data_in, topic_valid=None, figure=False, reduce=True, calc_mae=True):
        """
        Topic and word level: estimate the student's mastery from the answer history.

        Steps:
            1. ``self.preprocess_data(data_in)`` flattens the interface payload;
            2. ``self.split_sessions(...)`` groups the sequences by session;
            3. ``self.update_mastery_topics`` yields the topic mastery and the
               recent mean adjustment;
            4. ``self.update_mastery_words`` yields the word mastery;
            5. the results are packed into the "data" part of the response:
               {
                   "abilities":      topic mastery as returned by step 3,
                   "is_stable":      0 or 1 (int, because JSON must not carry a bool here),
                   "w_abilities":    word mastery as returned by step 4,
                   "ability_change": recent mean adjustment (deprecated, topic level)
               }

        :param data_in: interface payload for one student.
        :param topic_valid: forwarded to ``self.calc_pred_error``.
        :param figure: forwarded to ``self.calc_pred_error``.
        :param reduce: forwarded to ``self.calc_pred_error``.
        :param calc_mae: when False the error computation is skipped and None
            is returned in its place.
        :return: tuple (result dict, error or None)

        NOTE(review): the original author flagged that the empty-history case
        still needs to be checked.
        """
        # Flatten the payload; the pending (new) questions are not needed here.
        pids, topics, topic_w, words, word_w, outcomes, diffs, sess_ids, _pending = self.preprocess_data(data_in)
        # One dict per session, in order of appearance.
        per_session = self.split_sessions(pids, topics, topic_w, words, word_w, outcomes, diffs, sess_ids)

        topic_mastery, recent_change = self.update_mastery_topics(per_session)
        word_mastery = self.update_mastery_words(per_session)

        payload = {
            "abilities": topic_mastery,
            "is_stable": int(recent_change < self.threshold),
            "w_abilities": word_mastery,
            "ability_change": recent_change,
        }

        # The error computation is optional because it cannot be done for every input.
        if not calc_mae:
            return payload, None
        error = self.calc_pred_error(payload, data_in, topic_valid, figure=figure, reduce=reduce)
        return payload, error

    def predict_correct_rate(self, interface_mastery, exercise_in):
        """
        Estimate the probability of answering one new question correctly.

        :param interface_mastery: dict with 'abilities' (topic id -> mastery)
            and 'w_abilities' (word id -> mastery), i.e. the result dict of
            ``self.predict_knowledge_mastery``.
        :param exercise_in: tuple (q, w, diff, weights_q, weights_w) describing
            the question to predict.
        :return: probability clipped to [0, 1].
        """
        topic_ids, word_ids, diff, weights_q, weights_w = exercise_in
        topic_mastery = interface_mastery['abilities']
        words_mastery = interface_mastery['w_abilities']

        def _weighted_level(ids, weights, table):
            # Weighted sum of mastery; ids that are missing or still at the padding value count as 0.
            total = 0
            for pos, key in enumerate(ids):
                level = table.get(key, 0)
                if level == self.padding_value:
                    level = 0
                total += level * weights[pos]
            return total

        avg_mastery_topic = _weighted_level(topic_ids, weights_q, topic_mastery)
        avg_mastery_words = _weighted_level(word_ids, weights_w, words_mastery)

        # Topics and words share the weight equally; without words the topics carry all of it.
        if len(word_ids) > 0:
            knowledge = 0.5 * (avg_mastery_topic + avg_mastery_words)
        else:
            knowledge = avg_mastery_topic

        # Map the knowledge level back to a correct rate (per the original
        # author's note this inverts the mapping used in init_from_history).
        span = self.valid_range[1] - self.valid_range[0]
        corr_rate = knowledge / (span / self.A)
        corr_rate = corr_rate + self.C - self.B * (self.map_difficulty(diff))**2

        return max(0, min(corr_rate, 1))

    def predict_answer_accuracy(self, data_in):
        """
        Predict the probability of a correct answer for every pending question.

        Intended to be called from batch processing or external scripts such
        as server.py.

        :param data_in: dict with the answered and the pending questions, e.g.
            {
                "old_p": [  // all answered questions of one student, in time order
                    {
                        "pid": 50,              // question id
                        "q": [5,18,11],         // topic ids of the question
                        "q_w": [0.3,0.2,0.5],   // weight of each topic
                        "w": [1,4,8],           // word ids of the question
                        "w_w": [2,3,1],         // importance of each word
                        "s": 1,                 // 1 = correct, 0 = wrong
                        "diff": 2,              // difficulty of the question
                        "session": 1            // session id of the question
                    },
                    ...
                ],
                "new_p": [  // questions to predict
                    {
                        "pid": 30,
                        "q": [5,18,11],
                        "q_w": [0.3,0.2,0.5],
                        "w": [1,4,8],
                        "w_w": [2,3,1],
                        "diff": 2
                    },
                    ...
                ]
            }
        :return: dict, pending question id -> probability of a correct answer,
            e.g. {30: 0.2985, 28: 0.1256, 46: 0.1468, 3: 0.3068}. The caller
            wraps it as the "data" field of the response.
        """
        # The error value is discarded here, and computing it would require
        # data_in["student"]["topic_mastery"], which the payload documented
        # above does not carry - so it is switched off explicitly.
        interface_mastery, _ = self.predict_knowledge_mastery(data_in, calc_mae=False)

        # Pending questions are the last item returned by the preprocessing step.
        *_, new_data = self.preprocess_data(data_in)
        new_pid, new_q, new_weights_q, new_w, new_weights_w, new_diff = new_data

        # One prediction per pending question, keyed by its id.
        answer_prob = {}
        for pid, q, w, diff, weights_q, weights_w in zip(new_pid, new_q, new_w, new_diff,
                                                         new_weights_q, new_weights_w):
            answer_prob[pid] = self.predict_correct_rate(interface_mastery,
                                                         (q, w, diff, weights_q, weights_w))

        return answer_prob

    @staticmethod
    def calculate_loss(predictions, targets):
        """使用均方误差（MSE）作为损失函数"""
        return np.mean((np.array(predictions) - np.array(targets)) ** 2)

    '''def batch_process(self, data_batches):
        """(deprecated)对于list形式的batch输入，遍历并调用predict_answer_accuracy，统计平均损失"""
        total_loss = 0
        total_len = 0
        # 遍历batch中的每个session/序列
        for _data in data_batches:
            predictions_dict = self.predict_answer_accuracy(_data)           # 返回字典，key为new_pid，value为答对概率
            new_pid = _data.get('new_pid')                                   # [30, 28, 45, 3]
            predictions_list = [predictions_dict[_key] for _key in new_pid]  # 按new_pid的顺序，获取答对概率
            targets = _data.get('new_s')                                     # 真实的答对情况
            loss = self.calculate_loss(predictions_list, targets)            # 计算MSE损失
            total_loss += loss                                               # 总损失
            total_len += len(new_pid)                                        # 题目总数
        return total_loss / total_len                                        # 返回平均损失'''
    
    def calc_pred_error(self, predictions, targets, topic_valid=None, figure=False, reduce=True):
        """
        计算topic掌握度的预测误差
        """
        pred = predictions["abilities"]             # dict
        true = targets["student"]["topic_mastery"]  # dict

        # 若存在有效kp list，则过滤知识点
        if topic_valid is not None and isinstance(topic_valid, list):
            pred = {k: v for k, v in pred.items() if int(k) in topic_valid}
            true = {k: v for k, v in true.items() if int(k) in topic_valid}
        
        # 若存在能力字典，则：（240930）
        if topic_valid is not None and isinstance(topic_valid, dict):
            # 对齐格式 & 过滤统计估计中值为-1的item
            true = {k: v for k, v in topic_valid.items() if v != self.padding_value}  # 键为str
            pred = {k: v for k, v in pred.items() if str(k) in true.keys()}           # 注意与true对齐keys

        # 遍历预报字典中的键值对（键为int），剔除未涉及topic（-1），同时匹配真实值中的能力(注意由于是读取自json，键为str)
        _p = np.ones(self.list_kp[-1]+1) * self.padding_value
        _t = np.ones(self.list_kp[-1]+1) * self.padding_value
        for _key, _val in pred.items():
            if _val == self.padding_value:
                continue
            _p[int(_key)] = float(_val)             # 确保一下数据格式正确
            _t[int(_key)] = float(true[str(_key)])
        
        # 计算difference，并以pdf的形式统计出来
        ind_filter = np.where(_p==self.padding_value)[0]
        pred = np.delete(_p, ind_filter)
        true = np.delete(_t, ind_filter)
        difference = pred - true
        mae = np.mean(np.abs(difference))  # MAE

        # print(difference.shape)  # check
        # print(difference)

        if figure:
            # 调用函数绘制pdf图
            # plot_pdf(difference)

            text_in = "MAE={}, lr={}, decay={}".format(round(mae,2),self.lr,self.decay_PracInSess)
            # 保存图片路径为：pics/histogram_lr_{}_decay_{}_{timestamp}.png
            now = datetime.now()
            now = now.strftime("%Y-%m-%d_%H-%M-%S.%f")      # 默认格式：YYYY-MM-DD_HH-MM-SS.mmmmmm
            filename = 'pics/histogram_lr_{}_dacay_{}_{}.png'.format(self.lr, self.decay_PracInSess, now)
            plot_histogram(difference, text=text_in, file_out=filename)  # 直方图
        
        # 返回值
        if reduce:
            return mae
        else:
            return difference

    '''def calc_acc_error(self, predictions, targets):
        """
        (未完成)计算答题正确率的预测误差
        """
        pred = predictions["abilities"]             # dict
        true = targets["student"]["topic_mastery"]  # dict
        
        # 遍历预报字典中的键值对（键为int），剔除未涉及topic（-1），同时匹配真实值中的能力(注意由于是读取自json，键为str)
        _p = np.ones(self.list_kp[-1]+1) * self.padding_value
        _t = np.ones(self.list_kp[-1]+1) * self.padding_value
        for _key, _val in pred.items():
            if _val == self.padding_value:
                continue
            _p[int(_key)] = float(_val)             # 确保一下数据格式正确
            _t[int(_key)] = float(true[str(_key)])
        
        # 计算difference，并以pdf的形式统计出来
        ind_filter = np.where(_p==self.padding_value)[0]
        pred = np.delete(_p, ind_filter)
        true = np.delete(_t, ind_filter)
        difference = pred - true
        mae = np.mean(np.abs(difference))  # MAE

        # print(difference.shape)  # check
        # print(difference)

        # 调用函数绘制pdf图
        # plot_pdf(difference)

        text_in = "MAE={}".format(round(mae,2))
        plot_histogram(difference, text=text_in)  # 直方图'''

##### 调用策略，生成学生能力并保存为json接口格式

In [24]:
# Notebook cell: run the strategy for every student of the dataset and save the
# resulting topic mastery back into the JSON structure.

# Parameter values to use
nxneg = 1.4
decay_factor = 0.9
lr = 0.5

# Settings that have to be configured
out_dir = os.path.join(root_dir, f'{datasets}/')  # output directory
ensure_directory_exists(out_dir)
kp_id_start = 0
n_kp = 93
list_kp = (kp_id_start, kp_id_start+n_kp-1)
padding_value = -1

# Load the open-source dataset
with open(os.path.join(root_dir, f'{datasets}/train.json'), "r", encoding='UTF-8') as f:
    data_dbe = json.load(f)  # list of dicts
# Load the statistically estimated ability file (statistical baseline);
# data_cap is only referenced by commented-out code below
with open(os.path.join(root_dir, f'{datasets}/stu_cap_stat.json'), "r", encoding='UTF-8') as f:
    data_cap = json.load(f)  # dict of dicts ("stu_id": {"0": xxx, "1": xxx, ...})

# # check
# print(data_dbe[0].keys())                       # ['old_p', 'new_p', 'student']
# print(data_dbe[0]["student"].keys())            # ['topic_mastery', 'id']
# print(data_dbe[0]["student"]["topic_mastery"])  # {}, to be filled
# first_id = data_dbe[0]["student"]["id"]
# print(type(first_id))                           # int
# print(data_cap[str(first_id)].keys())           # 0-92, complete
# print(data_cap[str(first_id)].values())         # padded with -1

# Instantiate directly with the grid-search result
complete_init = ConceptPredictStrategy(n_diff=1,                # no difficulty levels, always 1
                                       list_kp=list_kp,         # n_question=93 (starting at 0, maximum 92)
                                       n_words=(1,2),           # no words
                                       n_diff_w=None,           # deprecated
                                       valid_range=(0,1),
                                       similarity=None,         # no indirect adjustment
                                       padding_value=-1,
                                       last_n=10,
                                       threshold=0.05,
                                       lr=lr,                            # set directly to the grid-search result
                                       decay_IntraSess=1.0,     # no effect (no words)
                                       decay_InterSess=1.0,     # no effect (no sessions)
                                       decay_PracInSess=decay_factor,    # set directly
                                       hyperparams=(2/3, 0, 0.25),       # added 240930: hyper-parameters given by hand
                                       nxneg=nxneg,                      # set directly to the grid-search result
                                       )

# Compute the abilities of every student in the dataset
for ind, stu in enumerate(data_dbe):
    stu_id = stu["student"]["id"]      # int
    # stu_cap = data_cap[str(stu_id)]  # dict, statistical ability estimate looked up by stu_id, keys are str
    stu_cap = {}                       # plain initialisation; not used further in this cell
    for key in range(kp_id_start, kp_id_start+n_kp):
        stu_cap[str(key)] = padding_value

    temp, _ = complete_init.predict_knowledge_mastery(stu,
                                                      calc_mae=False,         # added 241009: skip the MAE (None is returned)
                                                      topic_valid=None,       # list (all topics taking part in the MAE)
                                                      figure=False,           # no plot
                                                      reduce=False)           # no averaging
    # Store the result in data_dbe
    data_dbe[ind]["student"]["topic_mastery"] = temp["abilities"]             # dict


# Save the result
filename = os.path.join(root_dir, f'{datasets}/train_cap_strategy.json')
with open(filename, 'w', encoding='utf-8') as f:
    json.dump(data_dbe, f, ensure_ascii=False, indent=4)

##### 检查：绘制能力分布直方图

In [31]:
# Notebook cell: sanity-check the saved abilities (histogram) and the overall correct rate.

# Load the output of the previous step
out_dir = os.path.join(root_dir, f'{datasets}/')  # output directory
filename = os.path.join(out_dir, "train_cap_strategy.json")
with open(filename, "r", encoding='UTF-8') as f:
    data_dbe = json.load(f)

# check
print(data_dbe[0].keys())                       # ['old_p', 'new_p', 'student']
print(data_dbe[0]["student"].keys())            # ['topic_mastery', 'id']
print(data_dbe[0]["student"]["topic_mastery"].keys())
print(data_dbe[0]["student"]["topic_mastery"].values())
first_id = data_dbe[0]["student"]["id"]
print(type(first_id))                           # int
print(data_dbe[0]["old_p"][0])

# Flatten the mastery values of all students into one list
capacities = []
for stu in data_dbe:
    temp = list(stu["student"]["topic_mastery"].values())
    capacities.extend(temp)

# Plot (done separately in batch processing)
text_in = "student capacities"
now = datetime.now()
now = now.strftime("%Y-%m-%d_%H-%M-%S.%f")    # format: YYYY-MM-DD_HH-MM-SS.mmmmmm
filename = 'histogram_stu_cap_strategy_DBE_KT22_{}.png'.format(now)
plot_histogram(capacities, text=text_in, file_out=os.path.join('.', filename))  # histogram

# Also check the overall correct rate over all answered questions
answer = []
for stu in data_dbe:
    for exercise in stu["old_p"]:
        answer.append(exercise["s"])

print("correct rate in total: {:.2f}%".format(sum(answer)/len(answer)*100))

dict_keys(['old_p', 'new_p', 'student'])
dict_keys(['topic_mastery', 'id'])
dict_keys(['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14', '15', '16', '17', '18', '19', '20', '21', '22', '23', '24', '25', '26', '27', '28', '29', '30', '31', '32', '33', '34', '35', '36', '37', '38', '39', '40', '41', '42', '43', '44', '45', '46', '47', '48', '49', '50', '51', '52', '53', '54', '55', '56', '57', '58', '59', '60', '61', '62', '63', '64', '65', '66', '67', '68', '69', '70', '71', '72', '73', '74', '75', '76', '77', '78', '79', '80', '81', '82', '83', '84', '85', '86', '87', '88', '89', '90', '91', '92'])
dict_values([1, 0.9485272169763377, 1, 1, 1, 0.7021273195, 1, 1, 0.81742195, 0.775, 1.0, 0.030429999999999957, 0.36450125787699983, 1, 0.855, 1, 0.45, 0.775, 1, 0.45, 1.0, 0.64885369138507, 1, 1, 1.0, 0, 0.9995, 0.6414791423769549, 1, 1.0, 0, -1, -1, -1, -1, -1, -1, -1, 0.9995, 1, 1, 1, 1, 0.9995, 0.45, 0.45, 1.0, 1.0, 1, 1, 1.0, 1.0, 1, 0.9234833009003334, 0, 1

##### 能力分类映射函数，并使用其统计类别比例，为后续加权做准备

In [40]:
import copy
from collections import Counter

def mastery_level(score):
    """
    Map a mastery score to a category.

    3 = mastered (score == 1), 2 = proficient (score >= 0.5),
    1 = vague (0 < score < 0.5), 0 = not mastered (score == 0).
    Any other value (e.g. the padding value -1) is returned unchanged so that
    it stays aligned with the padding convention.
    """
    if score > 0:
        if score == 1:
            return 3
        return 2 if score >= 0.5 else 1
    return 0 if score == 0 else score
    
# Deep-copy the result of the previous step and replace every score by its category
data_in = copy.deepcopy(capacities)
for ind, elem in enumerate(capacities):
    data_in[ind] = mastery_level(elem)

# Count each category, then turn the counts into proportions
category_proportion = dict(Counter(data_in))
total_count = sum(category_proportion.values())  # 59241
for key, val in category_proportion.items():
    category_proportion[key] = val/total_count

# check
print(category_proportion)
print(category_proportion.keys())
print(category_proportion.values())
print(total_count)

{3: 0.45922587397241776, 2: 0.1357168177444675, 1: 0.1315305278438919, 0: 0.11321550952887359, -1: 0.16031127091034925}
dict_keys([3, 2, 1, 0, -1])
dict_values([0.45922587397241776, 0.1357168177444675, 0.1315305278438919, 0.11321550952887359, 0.16031127091034925])
59241


##### 能力映射分类，格式转换json->txt，适配模型输入（qid的修改在这里进行！！！）

In [29]:
# Notebook cell: map the abilities to categories and convert JSON -> txt for the
# model input. Six lines are written per student: sequence length, question
# ids, topic ids per question, answer results, topic ids of the mastery dict
# and their categories.

# Output path
out_path = os.path.join(root_dir, f'{datasets}/train_cap_strategy.txt')

# Load the per-student abilities written by the earlier step
with open(os.path.join(root_dir, f'{datasets}/train_cap_strategy.json'), "r", encoding='UTF-8') as f:
    data_dbe = json.load(f)  # list of dicts

# # check
# print(data_dbe[0].keys())                       # ['old_p', 'new_p', 'student']
# print(data_dbe[0]["old_p"][0])
# print(data_dbe[0]["student"].keys())            # ['topic_mastery', 'id']
# print(data_dbe[0]["student"]["topic_mastery"])
# first_id = data_dbe[0]["student"]["id"]
# print(type(first_id))                           # int

# The file is opened once, in write mode: re-running the cell replaces the
# output instead of appending a second copy of every student, which would
# break the fixed six-lines-per-student layout.
with open(out_path, "w", encoding="utf-8") as file:
    for stu in data_dbe:
        n_pid = str(len(stu["old_p"]))  # length of the answer sequence
        pids = []
        qs = []
        ss = []

        # Collect the per-question information
        for dict_exercise in stu["old_p"]:
            pids.append(dict_exercise["pid"])  # int
            # Topic id remapping required by the model: kp_id_start (0) becomes kp_id_start + n_kp (93)
            qs.append([x+n_kp if x==kp_id_start else x for x in dict_exercise["q"]])
            ss.append(dict_exercise["s"])      # int

        # Mastery information; the values are mapped to categories for the model.
        # NOTE(review): these topic ids come straight from the keys of
        # topic_mastery and are not remapped 0 -> 93 like the per-question ids
        # above - confirm this is what the model expects.
        qid = stu["student"]["topic_mastery"].keys()
        qval = [mastery_level(x) for x in stu["student"]["topic_mastery"].values()]

        # Convert everything to strings
        pids = ",".join(map(str, pids))
        qs = ",".join(["_".join(map(str, x)) for x in qs])
        ss = ",".join(map(str, ss))
        qid = "_".join(qid)
        qval = "_".join(map(str, qval))

        # One line per field
        record = [n_pid, pids, qs, ss, qid, qval]
        file.writelines(line + "\n" for line in record)

print("done!")

done!


##### train_test_split

In [30]:
# %% 拆分训练/测试集
import os

def split_txt_file(input_file, n, output_file1, output_file2):
    """
    Split a text file in two: the first n lines go to one file, the rest to another.

    The total number of lines read is printed.

    :param input_file: path of the text file to split
    :param n: number of leading lines for the first output
    :param output_file1: path of the first output (lines 1..n)
    :param output_file2: path of the second output (line n+1 to the end)
    """
    with open(input_file, 'r', encoding='utf-8') as source:
        all_lines = list(source)
        print(len(all_lines))

    parts = ((output_file1, all_lines[:n]), (output_file2, all_lines[n:]))
    for path, chunk in parts:
        with open(path, 'w', encoding='utf-8') as target:
            target.write("".join(chunk))


# Create the output directory
out_dir = os.path.join(root_dir, f'{datasets}/cap_strategy/')  # output directory
ensure_directory_exists(out_dir)

input_file = os.path.join(root_dir, f'{datasets}/train_cap_strategy.txt')  # output of the previous step
output_file1 = os.path.join(out_dir, 'train.txt')
output_file2 = os.path.join(out_dir, 'test.txt')
n = 6*500  # 500 of 637 students; the previous step writes 6 lines per student

split_txt_file(input_file, n, output_file1, output_file2)

3822


# 241028 temp

In [1]:
# import math
import sys
import os
import json
import numpy as np
import matplotlib
matplotlib.use('Agg')   # 确保你的图表在后台生成且不显示图形窗口
import matplotlib.pyplot as plt
from datetime import datetime
import functools
import copy

import seaborn as sns
from collections import defaultdict
# from scipy.sparse import lil_matrix

# Earlier approach, kept for reference (presumably dropped because __file__ is not available here -- confirm):
# cur_dir = os.path.dirname(os.path.abspath(__file__))
from config import BASE_DIR
cur_dir = BASE_DIR
# root_dir = os.path.dirname(os.path.dirname(os.path.dirname(cur_dir)))  # go up three directory levels
root_dir = cur_dir
# Make modules located under root_dir importable
sys.path.append(root_dir)

print(cur_dir)   # /mnt/new_pfs/liming_team/auroraX/songchentao/ketcat/data
print(root_dir)  # same as above


def ensure_directory_exists(path_in):
    """
    Make sure ``path_in`` exists, creating the directory tree recursively if needed.

    Nothing is done when the path already exists (whatever it is).
    :param path_in: directory path to create
    """
    if os.path.exists(path_in):
        return
    # exist_ok=True closes the window between the check above and the creation:
    # if another process creates the directory in between, no FileExistsError is raised.
    os.makedirs(path_in, exist_ok=True)


def plot_pdf(data_in):
    """
    Plot a kernel density estimate of ``data_in`` and save it as a PNG.

    The image is written to ``pics/pdf_<timestamp>.png`` relative to the current
    working directory; the ``pics`` folder is created when it does not exist.
    :param data_in: 1d array
    """
    fig = plt.figure(figsize=(8, 6))
    try:
        # Density curve drawn with seaborn on the current axes
        sns.kdeplot(data_in, fill=True, color="b")

        # Title and axis labels
        plt.title('Probability Density Function (PDF)')
        plt.xlabel('Value')
        plt.ylabel('Density')

        # Show the grid
        plt.grid(True)

        # Save; savefig does not create missing folders, so create it first
        os.makedirs('pics', exist_ok=True)
        now = datetime.now().strftime("%Y-%m-%d_%H:%M:%S.%f")  # YYYY-MM-DD_HH:MM:SS.mmmmmm
        plt.savefig('pics/pdf_{}.png'.format(now))
    finally:
        # Release the figure even when plotting/saving fails, so repeated calls
        # do not accumulate open figures.
        plt.close(fig)


def plot_histogram(data_in=None, text=None, file_out=None):
    """
    Draw a bar chart of pre-binned values and save it to an image file.

    Bars of width 1.0 are centred at -3, -2, ..., 3, so ``data_in`` must supply one
    height per centre (7 values). The axes are fixed to x in [-4, 4] and y in [0, 100].
    :param data_in: 1d array of bar heights; a built-in 7-value sample is used when None
    :param text: str or None, annotation drawn beyond the top-right corner of the axes;
                 None is treated as an empty string
    :param file_out: str or None, output path; None falls back to
                     'histogram_<timestamp>.png'
    """
    heights = [2.504, 2.789, 3.830, 63.902, 11.380, 9.520, 6.074] if data_in is None else data_in
    label = '' if text is None else text

    plt.figure(figsize=(8, 6))

    # Bar centres; the heights are plotted as given (no binning is done here)
    centers = np.linspace(-3, 3, 7).tolist()
    plt.bar(centers, heights, color='b', alpha=0.7, edgecolor='black', width=1.0)

    plt.xlim([-4, 4])
    plt.ylim([0, 100])

    # Annotation anchor: 5% of the x-range past the right limit and
    # 10% of the y-range above the upper limit
    axes = plt.gca()
    x_lo, x_hi = axes.get_xlim()
    y_lo, y_hi = axes.get_ylim()
    anchor_x = x_hi + (x_hi - x_lo) * 0.05
    anchor_y = y_hi + (y_hi - y_lo) * 0.1
    plt.text(anchor_x, anchor_y, label, fontsize=12, ha='right', va='top')

    # Title, axis labels and grid
    plt.title('Histogram')
    plt.xlabel('Error')
    plt.ylabel('%')
    plt.grid(True)

    # Save; a timestamped default name is used when no path is given
    if file_out is None:
        stamp = datetime.now().strftime("%Y-%m-%d_%H:%M:%S.%f")  # YYYY-MM-DD_HH:MM:SS.mmmmmm
        file_out = 'histogram_{}.png'.format(stamp)
    plt.savefig(file_out)
    plt.close()


# Run once with the built-in sample data; since file_out is None the figure is
# saved as 'histogram_<timestamp>.png' relative to the current working directory.
plot_histogram()

/mnt/new_pfs/liming_team/auroraX/songchentao/ketcat/data
/mnt/new_pfs/liming_team/auroraX/songchentao/ketcat/data
