In [None]:
# 将构建流程封装成函数，便于脚本执行
from pathlib import Path

def run_part2_build(forward_file_path="normalized_tokens.json", out_path="inverted_index.json"):
    if not Path(forward_file_path).exists():
        raise FileNotFoundError(f"未找到正排表文件: {forward_file_path}")
    index = ImprovedInvertedIndex()
    ok = index.build_index_from_file(forward_file_path)
    if not ok:
        raise RuntimeError("构建失败：无法从JSON加载文档")
    index.print_index_statistics()
    index.save_inverted_index(out_path)
    return out_path



In [None]:
# 1) 位置索引：在倒排表中加入词项的位置信息
from collections import defaultdict

def build_positional_index_from_json(forward_file_path: str):
    """从正排表(文件->tokens列表)构建带位置信息的倒排索引。
    返回: vocabulary(词->id), postings(termId -> [{doc, tf, pos:[...]}])
    """
    import json
    with open(forward_file_path, 'r', encoding='utf-8') as f:
        token_dict = json.load(f)

    # 词汇表
    all_terms = sorted({t for tokens in token_dict.values() for t in tokens})
    vocabulary = {t: i for i, t in enumerate(all_terms)}

    # termId -> docId -> positions
    positions_map = defaultdict(lambda: defaultdict(list))
    for doc_id, tokens in token_dict.items():
        for idx, tok in enumerate(tokens):
            tid = vocabulary[tok]
            positions_map[tid][doc_id].append(idx)

    # 生成倒排结构（不加跳表，只演示位置信息）
    postings = {}
    for tid, doc2pos in positions_map.items():
        items = []
        for doc_id, pos_list in doc2pos.items():
            items.append({"doc": doc_id, "tf": len(pos_list), "skip": -1, "pos": pos_list})
        # 依据构建时的文档序保证顺序稳定
        items.sort(key=lambda x: x["doc"]) 
        postings[tid] = items

    return vocabulary, postings


def save_positional_index(vocabulary, postings, out_path: str):
    import json
    data = {"vocabulary": vocabulary, "postings": postings}
    with open(out_path, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=2)
    print(f"带位置信息的倒排索引已保存: {out_path}")


In [None]:
# 2) 词典压缩：按块存储（Blocking, k=4）与前端编码（Front Coding）
from typing import List, Tuple

def encode_blocking(terms: List[str], k: int = 4) -> Tuple[bytes, List[int]]:
    """简化版按块存储编码。
    输出: 编码后的字节序列、块起始偏移列表。
    规则：
    - 每个块的第一个词：写入 1字节长度 + 原词字节
    - 块内其余词：写入 1字节长度 + 原词字节（演示用途，现实可结合前缀差分）
    - 记录每块起始在字节流中的偏移
    注：这里使用utf-8编码，长度以单字节保存，适用于英文小写示例。
    """
    buf = bytearray()
    offsets = []
    for i, term in enumerate(terms):
        if i % k == 0:
            offsets.append(len(buf))
        b = term.encode('utf-8')
        l = len(b)
        buf.append(l if l < 256 else 255)  # 简化：>255截断
        buf.extend(b)
    return bytes(buf), offsets


def encode_front_coding(terms: List[str], k: int = 4) -> Tuple[bytes, List[int]]:
    """前端编码：每块记录第一个词原样，后续词存公共前缀长度+后缀。
    约定：
    - 块首：1字节长度 + 原词
    - 其余：1字节公共前缀长度 + 1字节后缀长度 + 后缀字节
    返回：编码字节、块偏移。
    """
    buf = bytearray()
    offsets = []
    for i in range(0, len(terms)):
        if i % k == 0:
            offsets.append(len(buf))
            head = terms[i].encode('utf-8')
            buf.append(len(head) if len(head) < 256 else 255)
            buf.extend(head)
            base = terms[i]
        else:
            t = terms[i]
            # 公共前缀
            p = 0
            while p < min(len(base), len(t)) and base[p] == t[p]:
                p += 1
            suffix = t[p:].encode('utf-8')
            buf.append(p if p < 256 else 255)
            buf.append(len(suffix) if len(suffix) < 256 else 255)
            buf.extend(suffix)
    return bytes(buf), offsets


def measure_vocab_and_compression(vocabulary: dict, k: int = 4) -> dict:
    terms_sorted = sorted(vocabulary.keys())
    # 原始：直接串联+长度
    raw_bytes = bytearray()
    for t in terms_sorted:
        b = t.encode('utf-8')
        raw_bytes.append(len(b) if len(b) < 256 else 255)
        raw_bytes.extend(b)
    raw_size = len(raw_bytes)

    blk_bytes, blk_offsets = encode_blocking(terms_sorted, k)
    fc_bytes, fc_offsets = encode_front_coding(terms_sorted, k)

    return {
        "vocab_size": len(terms_sorted),
        "raw_size": raw_size,
        "blocking": {"k": k, "bytes": len(blk_bytes), "blocks": len(blk_offsets)},
        "front_coding": {"k": k, "bytes": len(fc_bytes), "blocks": len(fc_offsets)}
    }


def save_compression_stats(stats: dict, out_path: str):
    import json
    with open(out_path, 'w', encoding='utf-8') as f:
        json.dump(stats, f, indent=2, ensure_ascii=False)
    print(f"压缩统计已保存: {out_path}")


In [None]:
# 3) 一键运行：生成 positional 索引与压缩统计
forward = "normalized_tokens.json"
pos_out = "inverted_index_pos.json"
comp_out = "compression_stats.json"

vocab, postings = build_positional_index_from_json(forward)
save_positional_index(vocab, postings, pos_out)

stats = measure_vocab_and_compression(vocab, k=4)
save_compression_stats(stats, comp_out)

print("完成：\n - 带位置信息索引:", pos_out, "\n - 压缩统计:", comp_out)


In [None]:
import math
import json
from typing import List, Tuple, Dict, Any, Set

class ImprovedInvertedIndex:
    """
    改进的倒排索引类，直接从JSON文件读取已处理的正排表数据
    """
    
    def __init__(self):
        # 词汇表：词项 -> 词项ID
        self.vocabulary = {}
        # 倒排列表：词项ID -> [(文档ID, 词频, 跳表指针), ...]
        self.inverted_lists = {}
        # 文档长度：文档ID -> 文档中的词项数量
        self.doc_lengths = {}
        # 文档ID到排序键的映射：文档ID -> 排序键（用于正确比较文档顺序）
        self.doc_sort_keys = {}
        # 排序键到文档ID的映射：排序键 -> 文档ID（用于反向查找）
        self.sort_key_to_doc = {}
        # 下一个可用的排序键
        self.next_sort_key = 0
    
    def load_documents_from_json(self, file_path: str) -> Dict[str, List[str]]:
        """
        从JSON文件加载正排表文档数据
        
        参数:
            file_path: JSON正排表文件路径
            
        返回:
            文档字典: {文档ID: [词项列表]}
        """
        print(f"📖 从JSON文件加载正排表: {file_path}")
        
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                documents = json.load(f)
            
            # 验证数据格式
            if not isinstance(documents, dict):
                raise ValueError("JSON文件顶层结构必须是字典")
            
            # 验证每个文档的词项列表
            for doc_id, tokens in documents.items():
                if not isinstance(tokens, list) or not all(isinstance(token, str) for token in tokens):
                    raise ValueError(f"文档 '{doc_id}' 的值必须是字符串列表")
            
            print(f"✅ 成功加载JSON格式，共 {len(documents)} 个文档")
            print(f"   文档ID示例: {list(documents.keys())[:5]}")
            print(f"   词项示例: {list(documents.values())[0][:5] if documents else '空'}")
            
            return documents
                
        except Exception as e:
            print(f"❌ JSON文件加载失败: {e}")
            return {}
    
    def build_index_from_file(self, file_path: str):
        """
        从JSON正排表文件构建倒排索引
        
        步骤:
        1. 从文件加载正排表数据
        2. 构建倒排索引
        3. 添加跳表指针优化
        
        参数:
            file_path: JSON正排表文件路径
        """
        # 步骤1: 加载正排表数据
        documents = self.load_documents_from_json(file_path)
        if not documents:
            print("❌ 无法加载文档数据，索引构建终止")
            return False
        
        # 步骤2: 构建倒排索引
        self._build_index_with_skips(documents)
        return True
    
    def _get_doc_sort_key(self, doc_id: str) -> int:
        """为文档ID生成排序键"""
        if doc_id not in self.doc_sort_keys:
            self.doc_sort_keys[doc_id] = self.next_sort_key
            self.sort_key_to_doc[self.next_sort_key] = doc_id
            self.next_sort_key += 1
        return self.doc_sort_keys[doc_id]
    
    def _get_doc_id_from_sort_key(self, sort_key: int) -> str:
        """从排序键获取文档ID"""
        return self.sort_key_to_doc.get(sort_key, "")
    
    def _compare_docs(self, doc1: str, doc2: str) -> int:
        """比较两个文档的顺序"""
        key1 = self._get_doc_sort_key(doc1)
        key2 = self._get_doc_sort_key(doc2)
        if key1 < key2:
            return -1
        elif key1 > key2:
            return 1
        else:
            return 0
    
    def _build_index_with_skips(self, token_dict: Dict[str, List[str]]):
        """构建带跳表指针的倒排索引"""
        print("🔨 构建带跳表指针的倒排索引...")
        
        # 步骤1: 构建词汇表
        print("  步骤1: 构建词汇表...")
        all_tokens = set()
        for doc_tokens in token_dict.values():
            all_tokens.update(doc_tokens)
        
        sorted_tokens = sorted(all_tokens)
        self.vocabulary = {token: idx for idx, token in enumerate(sorted_tokens)}
        print(f"    词汇表大小: {len(self.vocabulary)}")
        print(f"    词汇表示例: {list(self.vocabulary.keys())[:10]}")
        
        # 步骤2: 初始化文档排序系统
        print("  步骤2: 初始化文档排序系统...")
        for doc_id in token_dict.keys():
            self._get_doc_sort_key(doc_id)
            self.doc_lengths[doc_id] = len(token_dict[doc_id])
        print(f"    文档数量: {len(token_dict)}")
        
        # 步骤3: 构建基础倒排列表
        print("  步骤3: 构建基础倒排列表...")
        for doc_id, tokens in token_dict.items():
            term_freq = {}
            for token in tokens:
                term_id = self.vocabulary[token]
                term_freq[term_id] = term_freq.get(term_id, 0) + 1
            
            for term_id, freq in term_freq.items():
                if term_id not in self.inverted_lists:
                    self.inverted_lists[term_id] = []
                self.inverted_lists[term_id].append((doc_id, freq, -1))
        
        # 步骤4: 对每个倒排列表排序
        print("  步骤4: 对倒排列表排序...")
        for term_id, postings in self.inverted_lists.items():
            postings.sort(key=lambda x: self._get_doc_sort_key(x[0]))
        
        # 步骤5: 为长倒排列表添加跳表指针
        print("  步骤5: 添加跳表指针...")
        skip_count = 0
        for term_id, postings in self.inverted_lists.items():
            if len(postings) >= 4:
                skip_count += self._add_skip_pointers(term_id, postings)
        
        print(f"✅ 构建完成：{len(self.inverted_lists)} 个词项，{skip_count} 个跳表指针")
    
    def _add_skip_pointers(self, term_id: int, postings: List[Tuple]) -> int:
        """为倒排列表添加跳表指针"""
        n = len(postings)
        skip_interval = int(math.sqrt(n))
        skip_count = 0
        
        for i in range(0, n, skip_interval):
            if i + skip_interval < n:
                doc_id, freq, _ = postings[i]
                postings[i] = (doc_id, freq, i + skip_interval)
                skip_count += 1
        
        return skip_count
    
    def search_with_skips_improved(self, query_terms: List[str]) -> Dict[str, Any]:
        """使用跳表指针的改进搜索算法"""
        print(f"\n🔍 搜索查询: {query_terms}")
        
        if not query_terms:
            return {"documents": [], "stats": {"message": "空查询"}}
        
        term_ids = []
        for term in query_terms:
            if term in self.vocabulary:
                term_ids.append(self.vocabulary[term])
        
        if not term_ids:
            return {"documents": [], "stats": {"message": "未找到匹配的词项"}}
        
        print(f"  找到 {len(term_ids)} 个匹配词项")
        
        term_ids.sort(key=lambda tid: len(self.inverted_lists.get(tid, [])))
        
        first_term_id = term_ids[0]
        result_docs = set(self._get_doc_ids(first_term_id))
        print(f"  初始列表长度: {len(result_docs)}")
        
        for i, term_id in enumerate(term_ids[1:], 1):
            current_docs = self._get_doc_ids(term_id)
            print(f"  与第{i+1}个列表求交集，长度: {len(current_docs)}")
            
            result_docs = self._intersect_with_skips_improved(
                list(result_docs), current_docs, term_id
            )
            print(f"    交集后长度: {len(result_docs)}")
            
            if not result_docs:
                break
        
        sorted_docs = sorted(list(result_docs), key=lambda x: self._get_doc_sort_key(x))
        
        return {
            "documents": sorted_docs,
            "stats": {
                "query_terms": query_terms,
                "matched_terms": len(term_ids),
                "result_count": len(result_docs),
                "searched_lists": len(term_ids)
            }
        }
    
    def _get_doc_ids(self, term_id: int) -> List[str]:
        """获取词项对应的文档ID列表"""
        if term_id not in self.inverted_lists:
            return []
        return [doc_id for doc_id, _, _ in self.inverted_lists[term_id]]
    
    def _intersect_with_skips_improved(self, list1: List[str], list2: List[str], term2_id: int) -> Set[str]:
        """使用跳表指针求两个文档列表的交集"""
        result = set()
        i = j = 0
        postings2 = self.inverted_lists[term2_id]
        
        while i < len(list1) and j < len(list2):
            doc1 = list1[i]
            doc2 = list2[j]
            
            cmp_result = self._compare_docs(doc1, doc2)
            
            if cmp_result == 0:
                result.add(doc1)
                i += 1
                j += 1
            elif cmp_result < 0:
                i += 1
            else:
                new_j = self._skip_forward(postings2, j, doc1)
                if new_j > j:
                    j = new_j
                else:
                    j += 1
        
        return result
    
    def _skip_forward(self, postings: List[Tuple], current_pos: int, target_doc: str) -> int:
        """使用跳表指针向前跳转"""
        if current_pos >= len(postings):
            return current_pos
        
        _, _, skip_ptr = postings[current_pos]
        if skip_ptr != -1 and skip_ptr < len(postings):
            skip_doc_id, _, _ = postings[skip_ptr]
            
            if self._compare_docs(skip_doc_id, target_doc) <= 0:
                further_skip = self._skip_forward(postings, skip_ptr, target_doc)
                if further_skip > skip_ptr:
                    return further_skip
                return skip_ptr
        
        return current_pos
    
    def save_inverted_index(self, output_path: str):
        """
        将倒排索引保存到JSON文件
        
        参数:
            output_path: 输出JSON文件路径
        """
        print(f"\n💾 保存倒排索引到: {output_path}")
        
        try:
            with open(output_path, 'w', encoding='utf-8') as f:
                f.write('{\n')
                f.write('  "vocabulary": ')
                json.dump(self.vocabulary, f, indent=2, ensure_ascii=False, separators=(',', ': '))
                f.write(',\n')
                
                f.write('  "inverted_lists": {\n')
                term_ids = sorted(self.inverted_lists.keys(), key=int)
                for i, term_id in enumerate(term_ids):
                    f.write(f'    "{term_id}": [')
                    postings = self.inverted_lists[term_id]
                    for j, (doc_id, freq, skip_ptr) in enumerate(postings):
                        if j > 0:
                            f.write(', ')
                        f.write(f'["{doc_id}", {freq}, {skip_ptr}]')
                    f.write(']')
                    if i < len(term_ids) - 1:
                        f.write(',')
                    f.write('\n')
                f.write('  },\n')
                
                f.write('  "doc_lengths": ')
                json.dump(self.doc_lengths, f, indent=2, ensure_ascii=False, separators=(',', ': '))
                f.write(',\n')
                
                f.write('  "doc_sort_keys": ')
                json.dump(self.doc_sort_keys, f, indent=2, ensure_ascii=False, separators=(',', ': '))
                f.write(',\n')
                
                f.write('  "sort_key_to_doc": ')
                json.dump(self.sort_key_to_doc, f, indent=2, ensure_ascii=False, separators=(',', ': '))
                f.write(',\n')
                
                f.write(f'  "next_sort_key": {self.next_sort_key}\n')
                f.write('}\n')
                
            print("✅ 倒排索引保存成功")
        except Exception as e:
            print(f"❌ 保存失败: {e}")
    
    def print_index_statistics(self):
        """打印索引统计信息"""
        print("\n📊 索引统计信息:")
        print(f"  词项数量: {len(self.vocabulary)}")
        print(f"  文档数量: {len(self.doc_lengths)}")
        
        total_postings = sum(len(postings) for postings in self.inverted_lists.values())
        print(f"  倒排记录总数: {total_postings}")
        
        skip_count = self._count_skips()
        print(f"  跳表指针数量: {skip_count}")
        
        if self.inverted_lists:
            avg_length = total_postings / len(self.inverted_lists)
            print(f"  平均倒排列表长度: {avg_length:.2f}")
        
        # 显示词汇表示例
        if self.vocabulary:
            sample_terms = list(self.vocabulary.keys())[:10]
            print(f"  词汇表示例: {sample_terms}")
    
    def print_skip_structure(self, term: str):
        """打印词项的跳表结构"""
        if term not in self.vocabulary:
            print(f"词项 '{term}' 不存在")
            return
        
        term_id = self.vocabulary[term]
        if term_id not in self.inverted_lists:
            print(f"词项 '{term}' 无倒排列表")
            return
        
        postings = self.inverted_lists[term_id]
        print(f"\n📋 词项 '{term}' 的跳表结构 (共{len(postings)}个文档):")
        
        for i, (doc_id, freq, skip_ptr) in enumerate(postings):
            skip_info = f" → [{skip_ptr}]" if skip_ptr != -1 else ""
            sort_key = self._get_doc_sort_key(doc_id)
            print(f"  [{i:2d}] 文档: {doc_id:15s} (排序键:{sort_key:2d}), 频率: {freq}{skip_info}")
    
    def _count_skips(self) -> int:
        """统计跳表指针数量"""
        count = 0
        for postings in self.inverted_lists.values():
            for _, _, skip_ptr in postings:
                if skip_ptr != -1:
                    count += 1
        return count


def main():
    """
    主函数：从JSON正排表文件构建倒排索引
    """
    print("=" * 60)
    print("🚀 倒排索引构建系统 - 从JSON正排表文件构建")
    print("=" * 60)
    
    # 创建倒排索引实例
    index = ImprovedInvertedIndex()
    
    # 文件路径配置 - 请根据实际情况修改这些路径
    forward_file_path = "normalized_tokens.json"    # 输入的JSON正排表文件路径
    inverted_index_path = "inverted_index.json" # 输出的倒排索引JSON文件路径
    
    # 从JSON文件构建倒排索引
    print(f"\n📁 输入文件: {forward_file_path}")
    success = index.build_index_from_file(forward_file_path)
    
    if not success:
        print("❌ 索引构建失败，程序退出")
        return
    
    # 打印统计信息
    index.print_index_statistics()
    
    # 保存倒排索引到JSON文件
    print("\n" + "=" * 40)
    index.save_inverted_index(inverted_index_path)
    
    print("\n✅ 倒排索引构建流程完成!")
    print(f"   输入: {forward_file_path} (JSON正排表)")
    print(f"   输出: {inverted_index_path} (JSON倒排索引)")


if __name__ == "__main__":
    main()

🚀 倒排索引构建系统 - 从JSON正排表文件构建

📁 输入文件: normalized_tokens.json
📖 从JSON文件加载正排表: normalized_tokens.json
✅ 成功加载JSON格式，共 177065 个文档
   文档ID示例: ['Group 1005141', 'Group 1014534', 'Group 1017036', 'Group 1021017', 'Group 1030709']
   词项示例: ['subnat', 'purpose', 'discover', 'identify', 'plant']
🔨 构建带跳表指针的倒排索引...
  步骤1: 构建词汇表...
    词汇表大小: 268574
    词汇表示例: ['!!&nbsp;if', '!!limite', '!!synopsis', '!!welcome', '!).after', '!access', '!after', '!contact', '!either', '!for']
  步骤2: 初始化文档排序系统...
    文档数量: 177065
  步骤3: 构建基础倒排列表...
  步骤4: 对倒排列表排序...
  步骤5: 添加跳表指针...
✅ 构建完成：268574 个词项，327296 个跳表指针

📊 索引统计信息:
  词项数量: 268574
  文档数量: 177065
  倒排记录总数: 8062721
  跳表指针数量: 327296
  平均倒排列表长度: 30.02
  词汇表示例: ['!!&nbsp;if', '!!limite', '!!synopsis', '!!welcome', '!).after', '!access', '!after', '!contact', '!either', '!for']


💾 保存倒排索引到: inverted_index.json
✅ 倒排索引保存成功

✅ 倒排索引构建流程完成!
   输入: normalized_tokens.json (JSON正排表)
   输出: inverted_index.json (JSON倒排索引)
