# 初始化

In [None]:
import os
import threading
from http.server import HTTPServer, SimpleHTTPRequestHandler
from LLM_API import GLMService, SenseService, KimiService
from Json_Processor import JSProcessor
from dotenv import load_dotenv
import time

# The current working directory is treated as the project root, and the .env
# file is expected directly inside it.
# NOTE(review): only the path is built here; load_dotenv is imported above but
# never called in this cell. Confirm the variables get loaded elsewhere.
project_root = os.getcwd()
dotenv_path = os.path.join(project_root, '.env')

# Backend selector handed to initialize_service() below.
service_type = 'zhipu'

def initialize_service(service_type):
    """Build the LLM client that matches ``service_type``.

    Mapping implemented here:
        ``'zhipu'`` or ``None`` -> ``GLMService('glm-3-turbo')``
        ``'kimi'``              -> ``KimiService('8k')``
        ``'sensetime'``         -> ``SenseService(version='SenseChat')``

    Raises:
        ValueError: if ``service_type`` is none of the values above.
    """
    if service_type == 'zhipu' or service_type is None:
        # Versions listed by the author: 'glm-4', 'glm-4v', 'glm-3-turbo'.
        return GLMService('glm-3-turbo')

    if service_type == 'kimi':
        # Author's notes: '8k' 1M/12￥, '32k' 1M/24￥, '128k' 1M/60￥.
        return KimiService('8k')

    if service_type == 'sensetime':
        # Versions listed by the author: SenseChat, SenseChat-32K,
        # SenseChat-128K, SenseChat-Turbo, SenseChat-FunctionCall.
        return SenseService(version='SenseChat')

    raise ValueError('未知的服务类型')

# Create the LLM client selected by service_type.
service = initialize_service(service_type)

# Shared JSProcessor instance; the cells below use it to parse model replies
# and to read/write JSON files.
js=JSProcessor()

# 书本目录处理为知识点

In [None]:
# Catalog JSON file; it is read into original_dict at the end of this cell.
catalog_file_path = 'Edu_Resources.json'

# Helper: turn the book catalogs found under one key path into knowledge-point lists.
def extract_and_parse_json(original_dict, target_keys, service, max_retries=None):
    """Ask the LLM for a knowledge-point list for every book under ``target_keys``.

    ``target_keys`` is followed key by key through ``original_dict``; the value
    found there is iterated as a sequence of book entries, each providing the
    keys ``'书名'`` and ``'目录'``. For every book a prompt is sent through
    ``service.ask_once`` and the reply is parsed with the module-level
    ``js.parse_list``. The collected ``{book_name: parsed_list}`` mapping is
    written with ``js.write_json`` to ``'<keys joined by "_">_results.json'``.

    Args:
        original_dict: Nested dict holding the catalog data.
        target_keys: Key path to the book entries; its last key is inserted
            into the prompt as the subject area.
        service: Object exposing ``ask_once(prompt)`` and ``total_tokens_used``.
        max_retries: Maximum number of requests per book before the book is
            skipped. ``None`` (the default) keeps asking until a reply parses,
            which is the historical behaviour.

    Returns:
        None. The result is only written to the JSON file.
    """
    # Walk down the key path; a missing key yields {} and therefore no books.
    extracted = original_dict
    for key in target_keys:
        extracted = extracted.get(key, {})

    result = {}
    task_count = 0  # number of books parsed successfully so far
    start_time = time.time()

    for dict_item in extracted:
        try:
            book_name = str(dict_item['书名'])
            catalog = str(dict_item['目录'])

            prompt = f'''
            以下内容是{target_keys[-1]}领域的书籍目录，书名{book_name}，目录内容为：{catalog}，我要求你输出一个列表，其中的值是知识点，必须是如下结构：['知识点1','知识点2','知识点3',...]
            省略一切无关内容
            '''
            attempts = 0
            while max_retries is None or attempts < max_retries:
                attempts += 1
                msg = service.ask_once(prompt)
                # Parse once and reuse the value (it used to be parsed twice).
                parsed = js.parse_list(msg)
                if not parsed:
                    # Show the unparseable reply, then ask again.
                    print(msg)
                    continue

                print(f"成功解析：{book_name}")
                result[book_name] = parsed
                task_count += 1
                if task_count % 5 == 0:  # progress report every five books
                    elapsed = time.time() - start_time
                    # Guard against a zero interval on coarse clocks.
                    rate = task_count / elapsed if elapsed > 0 else float('inf')
                    # 0.000006 is the author's per-token price factor.
                    print(f"当前处理速度：{rate}个任务/秒，截至目前一共消耗了{service.total_tokens_used*0.000006}元")
                break
            else:
                # Only reachable when max_retries is set and was exhausted.
                print(f"解析失败，已跳过：{book_name}")
        except Exception as e:
            # Best effort: report the problem and move on to the next book.
            print(f"发生异常: {e}")
            continue

    # The output file name is derived from the selected key path.
    json_file_name = '_'.join(target_keys) + '_results.json'

    js.write_json(content=result,file_path=json_file_name)

# Helper: collect the key paths of a nested dict.
def collect_key_paths(current_dict, current_path=None):
    """Recursively yield the key path to every non-dict value.

    Args:
        current_dict: Object to walk. Nested dicts are descended into; any
            other value terminates a path.
        current_path: Keys already traversed. Defaults to an empty path; a
            fresh list is created per call so callers can never mutate a
            shared default.

    Yields:
        list: The keys leading to each non-dict value, in dict iteration
        order. An empty nested dict contributes no path. If ``current_dict``
        itself is not a dict, ``current_path`` is yielded once.
    """
    path = [] if current_path is None else current_path

    if not isinstance(current_dict, dict):
        yield path
        return

    for key, value in current_dict.items():
        new_path = path + [key]
        if isinstance(value, dict):
            yield from collect_key_paths(value, new_path)
        else:
            yield new_path

# Main parsing routine
def process_directory(original_dict, top_level_key, service):
    """Run ``extract_and_parse_json`` for every key path below ``top_level_key``.

    Prints a message and returns when ``top_level_key`` is not a key of
    ``original_dict``. Otherwise every path produced by ``collect_key_paths``
    for that subtree is processed; a failure on one path is printed and does
    not stop the remaining paths.
    """
    if top_level_key in original_dict:
        # Wrap the subtree so that every collected path starts with the top-level key.
        subtree = {top_level_key: original_dict[top_level_key]}
        for key_path in [*collect_key_paths(subtree)]:
            try:
                extract_and_parse_json(original_dict, key_path, service)
                print(f"处理完成路径: {' -> '.join(key_path)}")
            except Exception as e:
                print(f"处理路径{' -> '.join(key_path)}时出错: {e}")
    else:
        print(f"指定的顶级目录'{top_level_key}'在字典中不存在。")

# Load the catalog file; process_directory consumes this dict in the next cell.
original_dict = js.read_json(catalog_file_path)


# **主处理功能块1：**

In [None]:
# original_dict is loaded from catalog_file_path, which must point at the raw source material.
# The second argument selects the top-level category to process.
# glm-3-turbo is the recommended service for this stage: fast and cheap.
process_directory(original_dict, '计算机类', service)

# 知识点第二层聚合

In [None]:
import json
import random
import os
# Main function
def transform_function(input_list, subject='对外汉语教学', max_retries=None):
    """Ask the LLM to merge and cluster a batch of knowledge-point lists.

    The prompt is built with the module-level ``js.generate_prompt`` and sent
    through the module-level ``service.ask_once``; the reply is parsed with
    ``js.parse_dict``. The request is repeated until a reply parses to a
    truthy value.

    Args:
        input_list: Knowledge-point lists to merge.
        subject: Subject name inserted into the prompt. The default reproduces
            the previously hard-coded subject.
        max_retries: Maximum number of requests. ``None`` (the default) keeps
            asking until a reply parses, which is the historical behaviour.

    Returns:
        The parsed result of the first reply that ``js.parse_dict`` accepts.

    Raises:
        RuntimeError: only when ``max_retries`` is set and every attempt
            produced an unparseable reply.
    """
    input_dict = str({subject: input_list})
    pre_prompt = f'这是{subject}学科一些教材的知识点：{input_dict}我希望你帮我融合以上知识点列表，对于属于{subject}的知识点，相同的部分合并精炼，不同的部分互相补充，并自行聚类总结知识点，对于无关内容直接摒弃，输出的结果最好是如下结构：'
    pro_prompt = '恰当地设置聚类标准，使得一个类别下的内容相似度尽可能高，同时一个类别下的内容数量尽可能少。并且，这些类别确实能够组织起这个学科的知识体系'
    prompt = js.generate_prompt(level=3, pre_prompt=pre_prompt, pro_prompt=pro_prompt, words='知识点')

    attempts = 0
    while max_retries is None or attempts < max_retries:
        attempts += 1
        response = service.ask_once(prompt)
        result = js.parse_dict(response)
        if result:
            print(result)
            return result
        print('无结果')

    raise RuntimeError(f'尝试{max_retries}次后仍未得到可解析的结果')
# Helper: group the per-key lists into size-limited batches and transform each batch.
def process_dict(input_dict, window_length=1800, transform=None):
    """Batch the values of ``input_dict`` and transform every batch.

    Entries are accumulated until adding the next one would push the summed
    length of ``str({key: value})`` past ``window_length``; the pending batch
    is then transformed and the next entry opens a new batch. Earlier versions
    dropped the entry that triggered the flush and could send an empty batch
    to the transform; both are fixed here.

    Args:
        input_dict: Mapping of key -> list of items. Non-list values are
            reported and skipped.
        window_length: Character budget per batch.
        transform: Callable applied to each batch (a list of shuffled lists).
            Defaults to the module-level ``transform_function``.

    Returns:
        list: One transform result per batch that was transformed without
        raising. A failing batch is reported and left out.
    """
    if transform is None:
        # Looked up at call time so the default stays the file's own function.
        transform = transform_function

    transformed_dicts = []
    batch = []
    batch_length = 0

    def flush():
        """Transform the pending batch (if any) and start an empty one."""
        nonlocal batch, batch_length
        pending, batch, batch_length = batch, [], 0
        if not pending:
            return
        try:
            transformed_dicts.append(transform(pending))
        except Exception as e:
            print("解析失败:", e)

    for key, value in input_dict.items():
        if not isinstance(value, list):
            print("解析失败:", f"键{key!r}对应的值不是列表")
            continue

        entry_length = len(str({key: value}))

        # Close the current window first, then keep the entry for the next one.
        if batch and batch_length + entry_length > window_length:
            flush()

        # Shuffle a copy so the caller's data is left untouched.
        shuffled = list(value)
        random.shuffle(shuffled)
        batch.append(shuffled)
        batch_length += entry_length

    # Transform whatever is left in the last window.
    flush()

    return transformed_dicts
# Glue function: feed every stage-1 result file through process_dict.
def process_all_json_files(input_directory, output_directory):
    """Transform every ``*_results.json`` file found in ``input_directory``.

    Each matching file is loaded, passed to ``process_dict`` and the result is
    saved as ``<stem>_transformed_results.json`` in ``output_directory``.
    Files that already end in ``_transformed_results.json`` are skipped so
    earlier outputs are never processed again, and the output directory is
    created when it does not exist yet.

    :param input_directory: Directory containing the ``*_results.json`` files
        written by the first processing stage.
    :param output_directory: Directory that receives the transformed files.
    """
    source_suffix = '_results.json'
    output_suffix = '_transformed_results.json'

    # An empty string means "current directory"; makedirs rejects it.
    if output_directory:
        os.makedirs(output_directory, exist_ok=True)

    for file_name in os.listdir(input_directory):
        # output_suffix also ends with source_suffix, so exclude it explicitly.
        if not file_name.endswith(source_suffix) or file_name.endswith(output_suffix):
            continue

        full_file_path = os.path.join(input_directory, file_name)
        print(f"正在处理文件: {full_file_path}")

        with open(full_file_path, 'r', encoding='utf-8') as file:
            input_dict = json.load(file)

        transformed_results = process_dict(input_dict)

        # Swap only the trailing suffix; str.replace would hit every occurrence.
        stem = file_name[:-len(source_suffix)]
        output_file_path = os.path.join(output_directory, stem + output_suffix)
        with open(output_file_path, 'w', encoding='utf-8') as file:
            json.dump(transformed_results, file, ensure_ascii=False, indent=4)
        print(f"处理结果已保存到: {output_file_path}")

# **主处理功能块2：**

In [None]:
# Read the *_results.json files from the current directory and write the
# transformed files into ./json_output.
input_directory = '.'
# Built with os.path.join so the separator is correct on every OS; the former
# raw string r'.\\json_output' contained two literal backslashes.
output_directory = os.path.join('.', 'json_output')
process_all_json_files(input_directory, output_directory)

# 近义词聚合

# **关键设置_路径：**

In [None]:
# Set this to the local path of your own embeddings model.
# Raw string: "\J", "\M" and "\T" are not valid escape sequences, so a plain
# literal triggers an invalid-escape warning on recent Python versions.
model_path = r"D:\Joining\Models\Text2Vec_base_zh"

In [None]:
from transformers import AutoModel, AutoTokenizer
import torch
import time

# Load the model and tokenizer from the local directory given by model_path
model = AutoModel.from_pretrained(model_path)
tokenizer = AutoTokenizer.from_pretrained(model_path)

# Put the model into evaluation mode
model.eval()
print("模型加载成功！")

# **关键设置_功能点：**

In [5]:

def process_json_aggregate(input_directory, output_directory):
    """Transform every ``*_results.json`` file found in ``input_directory``.

    This mirrors ``process_all_json_files``: each matching file is loaded,
    passed to ``process_dict`` and saved as
    ``<stem>_transformed_results.json`` in ``output_directory``. Files that
    already carry the ``_transformed_results.json`` suffix are skipped, and
    the output directory is created when it is missing.

    :param input_directory: Directory containing the ``*_results.json`` files
        written by the first processing stage.
    :param output_directory: Directory that receives the transformed files.
    """
    in_suffix = '_results.json'
    out_suffix = '_transformed_results.json'

    # An empty string means "current directory"; makedirs rejects it.
    if output_directory:
        os.makedirs(output_directory, exist_ok=True)

    for file_name in os.listdir(input_directory):
        is_input = file_name.endswith(in_suffix)
        # out_suffix also ends with in_suffix, so earlier outputs must be excluded.
        is_previous_output = file_name.endswith(out_suffix)
        if not is_input or is_previous_output:
            continue

        full_file_path = os.path.join(input_directory, file_name)
        print(f"正在处理文件: {full_file_path}")

        with open(full_file_path, 'r', encoding='utf-8') as file:
            input_dict = json.load(file)

        transformed_results = process_dict(input_dict)

        # Replace only the trailing suffix of the file name.
        output_name = file_name[:-len(in_suffix)] + out_suffix
        output_file_path = os.path.join(output_directory, output_name)
        with open(output_file_path, 'w', encoding='utf-8') as file:
            json.dump(transformed_results, file, ensure_ascii=False, indent=4)
        print(f"处理结果已保存到: {output_file_path}")

In [None]:
import json
import torch
from transformers import AutoModel, AutoTokenizer

# Assumes `tokenizer` and `model` are already loaded.
# NOTE(review): `vocab_list` (the words to embed) is not defined in the
# visible cells; it must exist before this cell runs.
# Path of the JSON file that receives the embeddings
new_json_file_path = 'Test_full_Embeddings.json'

# word -> embedding vector (stored as a plain list)
vocab_embeddings = {}

batch_size = 100  # number of words per batch
start_position = 0  # batches starting before this index are skipped

# Process the vocabulary batch by batch
for i in range(0, len(vocab_list), batch_size):
    if i < start_position:
        continue

    # The last slice is simply shorter when fewer than batch_size words remain,
    # so no separate branch is needed for it.
    batch_words = vocab_list[i:i + batch_size]
    inputs = tokenizer(batch_words, return_tensors="pt", padding=True, truncation=True, max_length=512)

    with torch.no_grad():
        outputs = model(**inputs)
        hidden = outputs.last_hidden_state
        # Mean-pool over real tokens only. A plain mean over dim=1 also
        # averages the padding positions, which makes a word's vector depend
        # on the longest word in its batch.
        mask = inputs["attention_mask"].unsqueeze(-1).to(hidden.dtype)
        summed = (hidden * mask).sum(dim=1)
        token_counts = mask.sum(dim=1).clamp(min=1e-9)
        embeddings = (summed / token_counts).cpu().numpy()

    for word, vector in zip(batch_words, embeddings):
        vocab_embeddings[word] = vector.tolist()

    # Checkpoint: rewrite the whole JSON file after every batch
    with open(new_json_file_path, 'w', encoding='utf-8') as file:
        json.dump(vocab_embeddings, file, ensure_ascii=False, indent=4)
    print(f"已处理 {i + len(batch_words)} 个单词。")

print("全部单词处理完毕，结果已存储到 JSON 文件。")


In [None]:
# Word -> embedding lookup read by cosine_similarity_score below.
mapping_dict = vocab_embeddings

# Cosine similarity between the vectors of two words
def cosine_similarity_score(word1, word2):
    """Return the cosine similarity of the embeddings of two words.

    Both vectors are looked up in the module-level ``mapping_dict``. The value
    is computed with the standard library because no ``cosine_similarity``
    import is visible in this file.

    Args:
        word1: First word; must be a key of ``mapping_dict``.
        word2: Second word; must be a key of ``mapping_dict``.

    Returns:
        float: ``dot(v1, v2) / (|v1| * |v2|)``, or ``0.0`` when either vector
        has zero length.

    Raises:
        KeyError: if a word is missing from ``mapping_dict``.
        ValueError: if the two vectors differ in dimension.
    """
    import math  # local import keeps this cell self-contained

    vector1 = mapping_dict[word1]
    vector2 = mapping_dict[word2]
    if len(vector1) != len(vector2):
        raise ValueError('embedding vectors must have the same dimension')

    dot = math.fsum(a * b for a, b in zip(vector1, vector2))
    norm1 = math.sqrt(math.fsum(a * a for a in vector1))
    norm2 = math.sqrt(math.fsum(b * b for b in vector2))
    if norm1 == 0 or norm2 == 0:
        # A zero vector has no direction; report no similarity.
        return 0.0
    return dot / (norm1 * norm2)

# Decide whether two words count as near-synonyms
def are_synonyms(word1, word2, threshold=0.8):
    """Tell whether the similarity score of the two words reaches ``threshold``."""
    return cosine_similarity_score(word1, word2) >= threshold

# Print, for every set, the near-synonyms of each of its words across all sets.
# NOTE(review): `all_sets` is not defined in the visible cells; it must exist
# before this cell runs.
# Flatten once up front instead of rebuilding the list for every word.
all_words = [candidate for group in all_sets for candidate in group]

for current_set in all_sets:
    # Every word also matches itself, because it is part of all_words.
    synonyms = {
        word: [other_word for other_word in all_words if are_synonyms(word, other_word)]
        for word in current_set
    }
    print(synonyms)

# 高层抽象并输出

In [None]:
# Load the aggregated knowledge points. This rebinds original_dict, which an
# earlier cell used for the raw catalog data.
with open(r'aggregated_result.json', 'r', encoding='utf-8') as file:
    original_dict = json.load(file)

# Parent-relation mapping: top-level field -> sub-field -> list of names.
# aggregate_dict looks each listed name up as a key of original_dict.
upper_relations = {
  "语言学": {
    "对外汉语教学": [
      "对外汉语教学理论",
      "对外汉语翻译",
      "对外汉语应用",
      "对外汉语背景",
      "对外汉语思想史",
      "对外汉语社会文化",
      "对外汉语消费文化",
      "对外汉语教学总览"
    ],
    "现代汉语": [
      "语言点",
      "教学用语",
      "教师语言",
      "日常用语",
      "商务用语"
    ],
    "汉字教学": [
      "汉字课知识点"
    ],
    "语音学": [
      "语音课知识点"
    ],
    "口语教学": [
      "口语课知识点"
    ],
    "听力教学": [
      "听力课知识点"
    ],
    "阅读教学": [
      "阅读课知识点"
    ],
    "写作教学": [
      "写作课知识点"
    ],
    "文化教学": [
      "文化课知识点"
    ],
    "其他教学": [
      "综合课知识点",
      "手工艺课知识点",
      "语法练习"
    ]
  },
  "文学": {
    "文学作品": [
      "文学作品",
      "作家作品"
    ],
    "文学理论": [
      "文艺理论",
      "比较文学理论与方法",
      "文学评论",
      "文学与其他学科的关系",
      "文学创作方法",
      "文学体裁",
      "经典文学导读",
      "现代文学作家",
      "比较文学分支学科",
      "比较文学的历史与发展",
      "翻译文学研究",
      "跨文明比较文学",
      "其他专题研究"
    ]
  },
  "文化": {
    "文化概念": [
      "文化",
      "中国传统文化",
      "中国文化"
    ],
    "日常生活与交流": [
      "日常生活话题",
      "生活交际",
      "日常交流",
      "日常生活"
    ],
    "文化与文学交叉": [
      "汉字",
      "中国现当代文学"
    ]
  }
}

def aggregate_dict(upper_relations, original_dict, aggregated_dict=None, current_path=None):
    """Regroup entries of ``original_dict`` under the hierarchy in ``upper_relations``.

    ``upper_relations`` is walked recursively. Whenever a value is a list, each
    of its items that is also a key of ``original_dict`` is copied to
    ``aggregated_dict[<path keys>...][item]``. Branches with no matching item
    are not created.

    Args:
        upper_relations: Nested mapping whose leaves are lists of names.
        original_dict: Flat mapping name -> content.
        aggregated_dict: Accumulator to fill; a new dict is used when omitted.
        current_path: Keys traversed so far (used by the recursion).

    Returns:
        dict: The accumulator (the same object as ``aggregated_dict`` when one
        was passed in).
    """
    target = {} if aggregated_dict is None else aggregated_dict
    prefix = [] if current_path is None else current_path

    for name, children in upper_relations.items():
        branch = prefix + [name]

        if not isinstance(children, list):
            # Not a terminal list yet: descend one level.
            aggregate_dict(children, original_dict, target, branch)
            continue

        for leaf in children:
            if leaf not in original_dict:
                continue
            # Create the nested branch lazily, only once a leaf matches.
            node = target
            for step in branch:
                node = node.setdefault(step, {})
            node[leaf] = original_dict[leaf]

    return target

# Run the aggregation and print the resulting nested dict
aggregated_dict = aggregate_dict(upper_relations, original_dict)
print(aggregated_dict)
