In [1]:
# cache_manager.py
# Standard library
import json
import os
import time
from typing import List, Dict, Any, Tuple

# Third-party
import numpy as np

# Project modules (kept in their original import order)
from embeeder import VectorEmbedder
from qwen_model import AnchorExtractor, SentenceExpander, QwenModel
from rag_client import RAGAttackClient

# Alternative RAG client:
# from chat import askanythingLLM

# Load the Qwen model once; it is passed to CacheManager in a later cell.
qwen_model = QwenModel()

  from .autonotebook import tqdm as notebook_tqdm



0.2582094073295593
Loading model Qwen/Qwen2.5-1.5B-Instruct on cuda...
Model loaded successfully!


In [None]:
class CacheManager:
    """Manage short-term and long-term caches of anchors and generated queries.

    Entries are stored as ``(text, vector)`` pairs, where ``vector`` is
    ``self.embedder.encode(text)[0]``. Every accepted entry is appended to
    both the short-term and the long-term cache:

    * the long-term cache is used for de-duplication: a new text whose
      cosine similarity to any long-term entry exceeds
      ``similarity_threshold`` is rejected;
    * the short-term cache holds entries not yet sent to the RAG client and
      is cleared by ``query_short_term_cache``.
    """

    def __init__(self,
                 qwen_model: Any,
                 similarity_threshold: float = 0.8,
                 overlap_ratio: float = 0.2, isload=False,
                 cache_file: str = "cache_data.json",
                 query_delay: float = 30.0):
        """
        Initialize the cache manager.

        Args:
            qwen_model: Qwen model instance shared by the anchor extractor
                and the sentence expander.
            similarity_threshold: texts whose similarity to an existing
                long-term entry is above this value are discarded.
            overlap_ratio: overlap ratio forwarded to SentenceExpander.
            isload: if True, restore caches from ``cache_file`` on startup.
            cache_file: JSON file used by save_cache()/load_cache().
            query_delay: seconds to sleep after each RAG query (the original
                hard-coded value was 30; presumably to throttle the RAG
                service).
        """
        # Embedding model used for similarity checks
        self.embedder = VectorEmbedder()

        # Anchor extractor and sentence expander, both driven by qwen_model
        self.anchor_extractor = AnchorExtractor(qwen_model)
        self.sentence_expander = SentenceExpander(qwen_model, overlap_ratio)

        # RAG client (alternative: askanythingLLM())
        self.rag_client = RAGAttackClient()

        # Settings
        self.similarity_threshold = similarity_threshold
        self.cache_file = cache_file
        self.query_delay = query_delay

        # Caches of (text, vector) pairs
        self.short_term_anchor_cache: List[Tuple[str, np.ndarray]] = []
        self.long_term_anchor_cache: List[Tuple[str, np.ndarray]] = []
        self.short_term_sentence_cache: List[Tuple[str, np.ndarray]] = []
        self.long_term_sentence_cache: List[Tuple[str, np.ndarray]] = []

        # Every RAG query result produced by query_short_term_cache()
        self.rag_results: List[Dict[str, Any]] = []

        if isload:
            # Overwrites the empty caches above if the file exists and parses
            self.load_cache()

    @staticmethod
    def _serialize_cache(cache: List[Tuple[str, Any]]) -> List[List[Any]]:
        """Convert (text, vector) pairs into JSON-serializable [text, list] pairs."""
        return [[text, np.asarray(vector).tolist()] for text, vector in cache]

    @staticmethod
    def _deserialize_cache(entries: Any) -> List[Tuple[str, np.ndarray]]:
        """Convert [text, list] pairs loaded from JSON back into (text, ndarray)."""
        return [(text, np.asarray(vector, dtype=float)) for text, vector in entries]

    def save_cache(self) -> None:
        """Persist all caches and RAG results to ``self.cache_file`` as JSON.

        In-memory vectors are numpy arrays, which ``json`` cannot serialize,
        so they are converted to lists first. The data is written to a
        temporary file and then moved into place with ``os.replace``, so a
        failure while writing never leaves a truncated cache file.
        """
        cache_data = {
            "short_term_anchor": self._serialize_cache(self.short_term_anchor_cache),
            "long_term_anchor": self._serialize_cache(self.long_term_anchor_cache),
            "short_term_sentence": self._serialize_cache(self.short_term_sentence_cache),
            "long_term_sentence": self._serialize_cache(self.long_term_sentence_cache),
            "rag_results": self.rag_results,
        }

        tmp_path = f"{self.cache_file}.tmp"
        try:
            # ensure_ascii=False keeps Chinese text readable in the file
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(cache_data, f, ensure_ascii=False, indent=2)
        except Exception:
            # Do not leave a half-written temp file behind
            if os.path.exists(tmp_path):
                os.remove(tmp_path)
            raise
        os.replace(tmp_path, self.cache_file)
        print(f"缓存已保存到 {self.cache_file}")

    def load_cache(self) -> None:
        """Load caches from ``self.cache_file``.

        If the file is missing, unreadable or malformed, a message is printed
        and the current (normally empty) caches are left unchanged. All data
        is parsed before any attribute is assigned, so a failure never leaves
        the caches half-loaded.
        """
        if not os.path.exists(self.cache_file):
            print(f"缓存文件 {self.cache_file} 不存在，使用空缓存")
            return

        try:
            with open(self.cache_file, "r", encoding="utf-8") as f:
                cache_data = json.load(f)

            short_anchor = self._deserialize_cache(cache_data.get("short_term_anchor", []))
            long_anchor = self._deserialize_cache(cache_data.get("long_term_anchor", []))
            short_sentence = self._deserialize_cache(cache_data.get("short_term_sentence", []))
            long_sentence = self._deserialize_cache(cache_data.get("long_term_sentence", []))
            rag_results = list(cache_data.get("rag_results", []))
        except (OSError, ValueError, TypeError, AttributeError, KeyError) as e:
            # Corrupt JSON (JSONDecodeError is a ValueError), wrong top-level
            # type (AttributeError on .get), or malformed entries.
            print(f"缓存文件解析错误，使用空缓存：{e}")
            return

        self.short_term_anchor_cache = short_anchor
        self.long_term_anchor_cache = long_anchor
        self.short_term_sentence_cache = short_sentence
        self.long_term_sentence_cache = long_sentence
        self.rag_results = rag_results

        print(f"已从 {self.cache_file} 加载缓存")

    def _add_to_cache(self, text: str,
                      short_term: List[Tuple[str, Any]],
                      long_term: List[Tuple[str, Any]]) -> bool:
        """Embed ``text`` and append it to both caches unless it is a near-duplicate."""
        vector = self.embedder.encode(text)[0]

        # Reject texts too similar to anything already in the long-term cache
        if long_term and self._get_max_similarity(vector, long_term) > self.similarity_threshold:
            return False

        short_term.append((text, vector))
        long_term.append((text, vector))
        return True

    def add_to_anchor_cache(self, text: str) -> bool:
        """
        Add an anchor text to the short-term and long-term anchor caches.

        Returns:
            bool: True if added, False if rejected as too similar to an
            existing long-term anchor.
        """
        return self._add_to_cache(text, self.short_term_anchor_cache,
                                  self.long_term_anchor_cache)

    def add_to_sentence_cache(self, text: str) -> bool:
        """
        Add a sentence/query text to the short-term and long-term sentence caches.

        Returns:
            bool: True if added, False if rejected as too similar to an
            existing long-term sentence.
        """
        return self._add_to_cache(text, self.short_term_sentence_cache,
                                  self.long_term_sentence_cache)

    def _get_max_similarity(self, query_vector: np.ndarray, cache: List[Tuple[str, np.ndarray]]) -> float:
        """Return the largest cosine similarity between ``query_vector`` and the cache.

        The result is floored at 0.0 (negative similarities are reported as 0.0).
        """
        max_similarity = 0.0
        for _, cached_vector in cache:
            similarity = self.embedder.cosine_similarity(query_vector, cached_vector)
            max_similarity = max(max_similarity, similarity)
        return max_similarity

    def generate_and_cache_anchors(self, text: str, max_anchors: int = 5) -> List[str]:
        """Extract up to ``max_anchors`` anchors from ``text`` and cache them.

        Returns only the anchors that were actually added (not rejected as
        near-duplicates).
        """
        anchors = self.anchor_extractor.extract_anchors_intelligent(text, max_anchors)
        return [anchor for anchor in anchors if self.add_to_anchor_cache(anchor)]

    def generate_and_cache_queries(self, chunk: str) -> Dict[str, List[str]]:
        """Generate diversified queries for ``chunk`` and add them to the sentence cache.

        Returns:
            The full dict from ``generate_diversified_queries`` (query type ->
            queries). NOTE: this includes queries the cache rejected as
            near-duplicates; only accepted queries are sent to the RAG client
            by query_short_term_cache().
        """
        queries_dict = self.sentence_expander.generate_diversified_queries(chunk)

        # Same order as the original: all query types in dict order
        for queries in queries_dict.values():
            for query in queries:
                self.add_to_sentence_cache(query)

        return queries_dict

    def query_short_term_cache(self) -> List[Dict[str, Any]]:
        """Send every short-term sentence to the RAG client, then clear short-term caches.

        Anchors are cached but are currently NOT sent to the RAG client; only
        sentence queries are issued. After each query the method sleeps for
        ``self.query_delay`` seconds and prints a 1-based progress counter.

        Returns:
            One dict per query with keys 'type', 'query', 'response'
            (the result's 'output', or '' if missing) and 'full_result'.
            Each dict is also appended to ``self.rag_results``.
        """
        results = []
        for index, (text, _) in enumerate(self.short_term_sentence_cache, start=1):
            rag_result = self.rag_client.query(text)
            result = {
                'type': 'sentence',
                'query': text,
                'response': rag_result.get('output', ''),
                'full_result': rag_result,
            }
            results.append(result)
            self.rag_results.append(result)
            if self.query_delay > 0:
                time.sleep(self.query_delay)
            print(index)

        # Everything short-term has now been queried
        self.clear_short_term_cache()

        return results

    def clear_short_term_cache(self):
        """Clear both short-term caches (anchors and sentences); long-term caches are kept."""
        self.short_term_anchor_cache.clear()
        self.short_term_sentence_cache.clear()

    def get_cache_stats(self) -> Dict[str, int]:
        """Return the number of entries in every cache and in rag_results."""
        return {
            'short_term_anchors': len(self.short_term_anchor_cache),
            'long_term_anchors': len(self.long_term_anchor_cache),
            'short_term_sentences': len(self.short_term_sentence_cache),
            'long_term_sentences': len(self.long_term_sentence_cache),
            'rag_results': len(self.rag_results)
        }

    def process_text_comprehensive(self, text: str) -> Dict[str, Any]:
        """
        Process one text: generate anchors and queries, then run the RAG queries.

        Returns:
            dict with 'anchors_generated' (anchors actually cached),
            'queries_generated' (full query dict), 'rag_results' and
            'cache_stats'.
        """
        anchors = self.generate_and_cache_anchors(text)
        queries = self.generate_and_cache_queries(text)
        rag_results = self.query_short_term_cache()

        return {
            'anchors_generated': anchors,
            'queries_generated': queries,
            'rag_results': rag_results,
            'cache_stats': self.get_cache_stats()
        }

    @staticmethod
    def _extract_generated_text(rag_result: Dict[str, Any]) -> Any:
        """Pick the generated text out of one RAG result dict, or None if there is none.

        Prefers 'response', then 'content'; an empty or non-string value
        there yields None. Otherwise falls back to the first string value
        longer than 10 non-blank characters.
        """
        for key in ("response", "content"):
            if key in rag_result:
                value = rag_result[key]
                return value if isinstance(value, str) and value.strip() else None
        for value in rag_result.values():
            if isinstance(value, str) and len(value.strip()) > 10:
                return value
        return None

    def iterative_content_generation(self, initial_text, iterations=2):
        """
        Iteratively generate content, using each round's RAG answers as the
        next round's inputs.

        Empty RAG answers (e.g. a result without 'output') are skipped so
        they do not trigger anchor/query generation on empty text.

        Returns:
            List of per-text dicts with 'iteration', 'input_text', 'outputs',
            'anchors' and 'queries'.
        """
        all_generations = []
        current_texts = [initial_text]

        for iteration in range(iterations):
            print(f"=== 第 {iteration + 1} 轮生成 ===")
            new_texts = []

            for i, text in enumerate(current_texts):
                print(f"处理文本 {i + 1}: {text[:50]}...")

                results = self.process_text_comprehensive(text)

                all_generations.append({
                    "iteration": iteration + 1,
                    "input_text": text,
                    "outputs": results["rag_results"],
                    "anchors": results["anchors_generated"],
                    "queries": results["queries_generated"],
                })

                # Collect the generated texts that seed the next round
                for rag_result in results["rag_results"]:
                    generated = self._extract_generated_text(rag_result)
                    if generated is not None:
                        new_texts.append(generated)

            current_texts = new_texts
            print(f"生成了 {len(new_texts)} 个新文本用于下一轮\n")

        return all_generations

In [7]:
# Fresh manager with empty caches (isload=False skips loading cache_data.json)
cache_manager = CacheManager(
    qwen_model,
    similarity_threshold=0.75,
    isload=False,
)

In [10]:
# Sample passage used as the seed for iterative generation
test_text = """
老屋的门轴在风里生锈了，我推门时听见它发出悠长的叹息。檐角垂下的蛛网在夕阳里泛着金边，像母亲织了一半的毛衣。
院中的槐树还在，只是枝干裂开的缝隙里，爬满了时间的青苔。父亲蹲在门槛上抽旱烟，烟锅里火星明灭，仿佛在数着归家的路。
"""

# Each round feeds the previous round's RAG answers back in as new inputs
iterations = 2
all_results = cache_manager.iterative_content_generation(test_text, iterations)

# Summarize every processed text
print("=== 最终生成的所有内容 ===")
for result in all_results:
    print("\n".join([
        f"\n迭代轮次: {result['iteration']}",
        f"输入文本: {result['input_text'][:100]}...",
        f"生成锚点: {result['anchors']}",
        f"生成查询: {result['queries']}",
        f"输出数量: {len(result['outputs'])}",
    ]))

# Persist everything; ensure_ascii=False keeps Chinese readable, indent=4 pretty-prints
with open("all_results_output.json", "w", encoding="utf-8") as f:
    json.dump(all_results, f, ensure_ascii=False, indent=4)

NameError: name 'cache_manager' is not defined

In [None]:
# Sample passage processed in a single pass
test_text = """
老屋的门轴在风里生锈了，我推门时听见它发出悠长的叹息。檐角垂下的蛛网在夕阳里泛着金边，像母亲织了一半的毛衣。
院中的槐树还在，只是枝干裂开的缝隙里，爬满了时间的青苔。父亲蹲在门槛上抽旱烟，烟锅里火星明灭，仿佛在数着归家的路。
"""

# Anchors + queries + RAG answers + cache statistics in one dict
results = cache_manager.process_text_comprehensive(test_text)
print(results)

# ensure_ascii=False keeps Chinese readable; indent=4 pretty-prints the file
with open("results_output.json", "w", encoding="utf-8") as f:
    json.dump(results, f, ensure_ascii=False, indent=4)
print("结果已保存到 results_output.json 文件中")

1
2
3
4
{'anchors_generated': ['门轴'], 'queries_generated': {'forward': ['老屋的门轴生锈后，可能会出现哪些新的情况？', '在夕阳下，蛛网泛出的金边象征着什么？', '父亲蹲在门槛上的抽烟动作，暗示了什么样的情感？'], 'backward': ['文本提到的“檐角垂下的蛛网”是何时出现的？', '“父亲蹲在门槛上抽旱烟”的场景暗示了什么？', '老屋的门轴为什么会在风里生锈？'], 'overlap': ['老屋的门轴为什么会在风里生锈？', '关于文中描述的场景，有哪些相关的背景故事或后续情节？']}, 'rag_results': [{'type': 'sentence', 'query': '老屋的门轴生锈后，可能会出现哪些新的情况？', 'response': '老屋的门轴生锈后，可能会出现以下几种新的情况：\n\n1. **门轴转动不灵活**：生锈的门轴会变得沉重，开启和关闭门时需要更大的力量，甚至可能发出卡顿的声音，影响使用体验。\n\n2. **门轴结构受损**：长期的锈蚀可能导致门轴的金属部件逐渐变形或断裂，影响门的正常开合，甚至可能引发安全隐患。\n\n3. **门轴与门框的连接松动**：锈蚀可能腐蚀门轴与门框之间的连接部件，导致门轴松动，门无法稳固关闭，甚至可能从门框中脱落。\n\n4. **门轴周围环境变化**：锈蚀的门轴可能吸引更多的灰尘和杂物，尤其是如果门轴周围没有定期清理，锈迹可能会扩散到门框或其他部件上，影响整体的美观和结构完整性。\n\n5. **门轴的维护成本增加**：随着锈蚀的加剧，门轴可能需要更频繁的维修或更换，增加了维护的难度和成本。\n\n6. **门轴的使用寿命缩短**：锈蚀会加速门轴的腐蚀过程，导致其使用寿命大大缩短，可能在短时间内就需要更换。\n\n这些情况不仅影响老屋的使用功能，也可能对房屋的整体结构和美观造成影响。因此，定期检查和维护门轴是非常重要的。', 'full_result': {'application_id': '3622e88a-0a58-4cfb-8e59-cb197e577833', 'output': '老屋的门轴生锈后，可能会出现以下几种新的情况：\n\n1. **门轴转动不灵活**：生锈的门轴会变得沉重，开启和关闭门时

In [5]:
# Inspect the long-term sentence cache: a list of (text, vector) pairs, where
# vector is the array returned by cache_manager.embedder.encode(text)[0].
print(cache_manager.long_term_sentence_cache)

[('老屋的门轴生锈后，可能会出现哪些新的情况？', array([-1.12916867e-03,  7.99205005e-02,  2.85395719e-02, -6.01845197e-02,
        1.23183168e-02,  7.31120184e-02,  6.91114813e-02,  8.51256680e-03,
        5.60837463e-02, -1.57834571e-02,  1.26910135e-01, -3.74121629e-02,
        1.29062328e-02, -6.39220104e-02,  1.92317367e-02, -6.61668628e-02,
       -3.69736925e-02,  3.69466990e-02, -6.53921440e-02,  2.07045879e-02,
       -1.78896524e-02, -2.94353236e-02,  4.66909707e-02,  6.02696761e-02,
       -3.56200412e-02, -4.55314741e-02,  3.22705470e-02, -4.68899292e-04,
        5.58954179e-02,  1.84160322e-02,  1.71098281e-02,  8.25680494e-02,
        7.02061653e-02, -5.77750355e-02,  3.39620784e-02,  3.75092179e-02,
       -1.03249243e-02, -8.69369432e-02, -7.81342294e-03,  2.89541688e-02,
       -3.00389640e-02, -2.36395355e-02, -1.85325000e-04, -4.99476902e-02,
        4.77813818e-02,  1.58564537e-04, -6.98136240e-02,  2.51863897e-02,
       -5.17589226e-02, -5.00038303e-02, -7.08199069e-02, -1.33118825e-03

In [9]:
# Remove the cache_manager name from the notebook namespace. Any later cell that
# uses cache_manager raises NameError until the CacheManager(...) cell is re-run.
del cache_manager