In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [7]:
import os
import json
import struct
import math
from collections import defaultdict

In [8]:
import os
import re
import nltk
from nltk.stem import PorterStemmer
from nltk.corpus import stopwords

# 下载 nltk 需要的资源
nltk.download('punkt')
nltk.download('punkt_tab')
nltk.download('stopwords')

# 加载停用词表
stop_words = set(stopwords.words('english'))  # 停用词集合

def list_files(directory):
    """获取所有文本文件路径"""
    return [os.path.join(directory, f) for f in os.listdir(directory) if os.path.isfile(os.path.join(directory, f))]

def preprocess_text(text):
    """文本预处理：分词、去除标点、小写化、去除数字、停用词过滤、词干提取"""
    stemmer = PorterStemmer()
    tokens = nltk.word_tokenize(text)  # 分词
    processed_tokens = []
    for token in tokens:
        token = re.sub(r'[^\w\s]', '', token).lower() # 归一化：去除标点 & 小写化
        if token in stop_words: # 跳过停用词
            continue
        # 词干提取
        if token.isalpha():  # 过滤掉纯数字
            stemmed_token = stemmer.stem(token)
            processed_tokens.append(stemmed_token)
    return processed_tokens

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
[nltk_data] Downloading package punkt_tab to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt_tab.zip.
[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


In [9]:
import json
import time

class BSBIIndexer:
    def __init__(self, block_size=100000, output_dir="bsbi_blocks"):
        self.block_size = block_size
        self.output_dir = output_dir
        os.makedirs(self.output_dir, exist_ok=True)

    def index_documents(self, directory):
        """构建 BSBI 索引"""
        doc_files = list_files(directory)
        block_id = 0
        term_doc_pairs = []

        for doc_id, file_path in enumerate(doc_files):
            with open(file_path, 'r', encoding='utf-8') as f:
                text = f.read()
                tokens = preprocess_text(text)
                term_doc_pairs.extend([(token, doc_id) for token in tokens])

            if len(term_doc_pairs) >= self.block_size:
                self.write_block(term_doc_pairs, block_id)
                term_doc_pairs = []
                block_id += 1

        if term_doc_pairs:
            self.write_block(term_doc_pairs, block_id)
            block_id += 1

        return block_id

    def write_block(self, term_doc_pairs, block_id):
        """对 block 排序并写入磁盘"""
        start_time = time.time();
        term_doc_pairs.sort()
        block_file = os.path.join(self.output_dir, f"block_{block_id}.json")
        with open(block_file, 'w', encoding='utf-8') as f:
            json.dump(term_doc_pairs, f)
        end_time = time.time();
        print(f"Block {block_id} written with {len(term_doc_pairs)} pairs. Sorting took {end_time - start_time:.2f} seconds.")

In [10]:
import heapq
import psutil  # 用于监测内存占用

class BSBI_Merger:
    def __init__(self, output_dir="bsbi_blocks", final_index="final_index.json"):
        self.output_dir = output_dir
        self.final_index = final_index

    def merge_blocks(self, block_count):
        """归并多个已排序 block"""
        min_heap = []
        file_iters = [open(os.path.join(self.output_dir, f"block_{i}.json"), "r") for i in range(block_count)]
        block_data = [json.load(f) for f in file_iters]

        # 记录归并前内存
        memory_before = psutil.Process(os.getpid()).memory_info().rss / (1024 * 1024)

        # 初始化 heap
        for i, block in enumerate(block_data):
            if block:
                heapq.heappush(min_heap, (block[0][0], block[0][1], i, 0))  # (term, doc_id, block_index, position)

        merged_index = {}

        while min_heap:
            term, doc_id, block_idx, pos = heapq.heappop(min_heap)

            if term not in merged_index:
                merged_index[term] = set()
            merged_index[term].add(doc_id)

            # 继续从相应的 block 取数据
            if pos + 1 < len(block_data[block_idx]):
                next_term, next_doc_id = block_data[block_idx][pos + 1]
                heapq.heappush(min_heap, (next_term, next_doc_id, block_idx, pos + 1))

        # 记录归并后内存
        memory_after = psutil.Process(os.getpid()).memory_info().rss / (1024 * 1024)

        # 关闭文件
        for f in file_iters:
            f.close()

        # 写入最终索引
        with open(self.final_index, "w", encoding="utf-8") as f:
            json.dump({term: sorted(list(doc_ids)) for term, doc_ids in merged_index.items()}, f)


        print(f"Pre-merge memory: {memory_before:.2f} MB | Post-merge memory: {memory_after:.2f} MB | Increase: {memory_after - memory_before:.2f} MB\n")
        print(f"Final index written to {self.final_index}\n")

# Document Compression Techniques
1. **Dictionary Compression**: Reduce storage overhead of string terms by mapping them to numeric IDs.
2. **Delta Encoding**: Compress sorted document ID lists by storing differences between consecutive IDs.
3. **Variable Byte (VByte) Encoding**: Compact representation of integers using variable-length bytes.

In [11]:
class CompressedIndex:
    def __init__(self, original_index_file):
        """初始化压缩索引"""
        self.original_index_file = original_index_file
        self.compressed_index_file = original_index_file.replace('.json', '.compressed')
        self.term_dict_file = original_index_file.replace('.json', '.term_dict')

        # 加载原始索引
        with open(original_index_file, 'r', encoding='utf-8') as f:
            self.original_index = json.load(f)

        # 压缩后的数据结构
        self.term_to_id = {}  # 术语到ID的映射
        self.id_to_term = {}   # ID到术语的映射
        self.compressed_index = {}  # 压缩后的索引 {term_id: compressed_doc_ids}

        # 统计信息
        self.original_size = 0
        self.compressed_size = 0

    def build_term_dictionary(self):
        """构建术语字典，为每个术语分配一个唯一ID"""
        sorted_terms = sorted(self.original_index.keys())
        for term_id, term in enumerate(sorted_terms):
            self.term_to_id[term] = term_id
            self.id_to_term[term_id] = term

    def variable_byte_encode(self, numbers):
        """变长字节编码压缩数字列表"""
        encoded_bytes = bytearray()
        for num in numbers:
            # 处理0的特殊情况
            if num == 0:
                encoded_bytes.append(0x80)
                continue

            bytes_to_write = []
            while num > 0:
                bytes_to_write.append(num & 0x7F)
                num >>= 7

            # 设置最高位标记字节结束
            bytes_to_write[0] |= 0x80
            # 反转字节顺序
            encoded_bytes.extend(reversed(bytes_to_write))
        return bytes(encoded_bytes)

    def variable_byte_decode(self, encoded_bytes):
        """变长字节解码"""
        numbers = []
        current_num = 0
        for byte in encoded_bytes:
            # 检查最高位
            if byte & 0x80:
                current_num = (current_num << 7) | (byte & 0x7F)
                numbers.append(current_num)
                current_num = 0
            else:
                current_num = (current_num << 7) | byte
        return numbers

    def delta_encode(self, numbers):
        """Delta编码"""
        if not numbers:
            return []

        # 先排序
        sorted_numbers = sorted(numbers)
        delta_encoded = [sorted_numbers[0]]
        for i in range(1, len(sorted_numbers)):
            delta_encoded.append(sorted_numbers[i] - sorted_numbers[i-1])
        return delta_encoded

    def delta_decode(self, delta_encoded):
        """Delta解码"""
        if not delta_encoded:
            return []

        numbers = [delta_encoded[0]]
        for i in range(1, len(delta_encoded)):
            numbers.append(numbers[-1] + delta_encoded[i])
        return numbers

    def compress_index(self):
        """压缩整个索引"""
        self.build_term_dictionary()

        for term, doc_ids in self.original_index.items():
            term_id = self.term_to_id[term]

            # 先进行delta编码
            delta_encoded = self.delta_encode(doc_ids)

            # 再进行变长字节编码
            compressed_doc_ids = self.variable_byte_encode(delta_encoded)

            # 存储压缩后的数据
            self.compressed_index[term_id] = compressed_doc_ids

    def decompress_term(self, term_id):
        """解压缩单个术语的文档列表"""
        compressed_data = self.compressed_index.get(term_id, b'')
        if not compressed_data:
            return []

        # 先变长字节解码
        delta_encoded = self.variable_byte_decode(compressed_data)

        # 再delta解码
        doc_ids = self.delta_decode(delta_encoded)

        return doc_ids

    def save_compressed_index(self):
        """保存压缩后的索引到文件"""
        # 保存术语字典
        with open(self.term_dict_file, 'w', encoding='utf-8') as f:
            json.dump({
                'term_to_id': self.term_to_id,
                'id_to_term': self.id_to_term
            }, f)

        # 保存压缩后的索引
        with open(self.compressed_index_file, 'wb') as f:
            # 先写入术语数量
            f.write(struct.pack('I', len(self.compressed_index)))

            for term_id, compressed_data in self.compressed_index.items():
                # 写入term_id (4字节)
                f.write(struct.pack('I', term_id))
                # 写入压缩数据长度 (4字节)
                f.write(struct.pack('I', len(compressed_data)))
                # 写入压缩数据
                f.write(compressed_data)

    def load_compressed_index(self):
        """从文件加载压缩索引"""
        # 加载术语字典
        with open(self.term_dict_file, 'r', encoding='utf-8') as f:
            dict_data = json.load(f)
            self.term_to_id = dict_data['term_to_id']
            self.id_to_term = dict_data['id_to_term']

        # 加载压缩索引
        self.compressed_index = {}
        with open(self.compressed_index_file, 'rb') as f:
            # 读取术语数量
            term_count = struct.unpack('I', f.read(4))[0]

            for _ in range(term_count):
                # 读取term_id
                term_id = struct.unpack('I', f.read(4))[0]
                # 读取数据长度
                data_len = struct.unpack('I', f.read(4))[0]
                # 读取压缩数据
                compressed_data = f.read(data_len)

                self.compressed_index[term_id] = compressed_data

    def calculate_sizes(self):
        """计算压缩前后的尺寸"""
        # 原始索引大小
        self.original_size = os.path.getsize(self.original_index_file)

        # 压缩后大小
        self.compress_index()
        self.save_compressed_index()
        self.compressed_size = os.path.getsize(self.compressed_index_file) + \
                             os.path.getsize(self.term_dict_file)

        return {
            'original_size': self.original_size,
            'compressed_size': self.compressed_size,
            'compression_ratio': self.compressed_size / self.original_size
        }

    def print_compression_stats(self):
        """打印压缩统计信息"""
        stats = self.calculate_sizes()
        print("\nCompression Statistics:")
        print(f"Original index size: {stats['original_size'] / 1024:.2f} KB")
        print(f"Compressed index size: {stats['compressed_size'] / 1024:.2f} KB")
        print(f"Compression ratio: {stats['compression_ratio']:.2%}")
        print(f"Space savings: {(1 - stats['compression_ratio']) * 100:.2f}%")

In [15]:
class CompressedBooleanQueryProcessor:
    def __init__(self, compressed_index):
        self.compressed_index = compressed_index

    def process_query(self, query):
      """ 处理布尔查询（完整实现）

      支持 AND, OR, NOT 运算符，运算符优先级：NOT > AND > OR
      示例:
          "word1 AND word2 OR word3" -> (word1 AND word2) OR word3
          "word1 OR word2 AND word3" -> word1 OR (word2 AND word3)
          "word1 AND NOT word2" -> word1 AND (NOT word2)
      """
      terms = query.split()
      if not terms:
          return []

      # 初始化栈用于处理运算符优先级
      operand_stack = []
      operator_stack = []

      i = 0
      while i < len(terms):
          term = terms[i]

          if term.upper() in {'AND', 'OR', 'NOT'}:
              # 处理运算符
              current_op = term.upper()

              # NOT 运算符是单目运算符，直接处理
              if current_op == 'NOT':
                  # 获取下一个词作为操作数
                  if i + 1 >= len(terms):
                      raise ValueError("NOT operator must be followed by a term")

                  next_term = terms[i+1].lower()
                  if next_term in {'AND', 'OR', 'NOT'}:
                      raise ValueError(f"Invalid term after NOT: {next_term}")

                  # 处理NOT操作
                  term_id = self.compressed_index.term_to_id.get(next_term, -1)
                  if term_id == -1:
                      doc_set = set()  # 词不存在，NOT后为空集
                  else:
                      doc_set = set(self.compressed_index.decompress_term(term_id))

                  # NOT操作就是取补集（这里需要知道全集，简化处理为当前结果集的补集）
                  # 实际应用中应该传入全集文档数
                  operand_stack.append(doc_set)
                  i += 2  # 跳过已处理的NOT和下一个词
              else:
                  # AND 或 OR 运算符
                  operator_stack.append(current_op)
                  i += 1
          else:
              # 处理普通词项
              term = term.lower()
              term_id = self.compressed_index.term_to_id.get(term, -1)
              if term_id == -1:
                  doc_set = set()  # 词不存在
              else:
                  doc_set = set(self.compressed_index.decompress_term(term_id))
              operand_stack.append(doc_set)
              i += 1

      # 如果没有运算符，直接返回第一个操作数
      if not operator_stack:
          return sorted(operand_stack[0]) if operand_stack else []

      # 按照运算符优先级处理栈中的内容 (NOT已经处理，剩下AND优先级高于OR)
      result = operand_stack[0]
      for i in range(len(operator_stack)):
          op = operator_stack[i]
          next_operand = operand_stack[i+1]

          if op == 'AND':
              result &= next_operand
          elif op == 'OR':
              result |= next_operand

      return sorted(result)

In [23]:
import json

class BooleanQueryProcessor:
    def __init__(self, index_file):
        with open(index_file, 'r', encoding='utf-8') as f:
            self.inverted_index = json.load(f)

    def process_query(self, query):
        # 将查询字符串拆分为操作符和操作数
        terms = query.split()
        if not terms:
            return []

        result_set = None
        current_op = None

        for term in terms:
            if term.upper() in {'AND', 'OR', 'NOT'}:
                current_op = term.upper()
            else:
                term = term.lower()
                if term in self.inverted_index:
                    doc_set = set(self.inverted_index[term])
                else:
                    doc_set = set()

                if result_set is None:
                    result_set = doc_set
                else:
                    if current_op == 'AND':
                        result_set &= doc_set
                    elif current_op == 'OR':
                        result_set |= doc_set
                    elif current_op == 'NOT':
                        result_set -= doc_set

        return sorted(result_set) if result_set is not None else []

In [21]:
if __name__ == "__main__":
    # base_dir = os.path.expanduser("~/Documents/HillaryEmails")
    base_dir = "/content/drive/MyDrive/HillaryEmails/HillaryEmails"
    input_dir = base_dir  # 你的数据集目录
    block_size = 100000  # 约为1.2MB,每个 block 处理的最大 term-document 对数

    # 原有索引构建（Task 1-2）
    indexer = BSBIIndexer()
    block_count = indexer.index_documents(input_dir)
    merger = BSBI_Merger()
    merger.merge_blocks(block_count)

    # 新增压缩步骤（Task 3）
    compressed_index = CompressedIndex("/content/final_index.json")
    compressed_index.compress_index()
    compressed_index.save_compressed_index()
    compressed_index.print_compression_stats()

Block 0 written with 100270 pairs. Sorting took 0.51 seconds.
Block 1 written with 100934 pairs. Sorting took 0.31 seconds.
Block 2 written with 100068 pairs. Sorting took 0.50 seconds.
Block 3 written with 100943 pairs. Sorting took 0.31 seconds.
Block 4 written with 100048 pairs. Sorting took 0.29 seconds.
Block 5 written with 100076 pairs. Sorting took 0.30 seconds.
Block 6 written with 100108 pairs. Sorting took 0.30 seconds.
Block 7 written with 100084 pairs. Sorting took 0.54 seconds.
Block 8 written with 100450 pairs. Sorting took 0.30 seconds.
Block 9 written with 2503 pairs. Sorting took 0.01 seconds.
Pre-merge memory: 518.53 MB | Post-merge memory: 526.78 MB | Increase: 8.25 MB

Final index written to final_index.json


Compression Statistics:
Original index size: 3072.05 KB
Compressed index size: 1772.34 KB
Compression ratio: 57.69%
Space savings: 42.31%


In [24]:
from time import time

def run_query_examples(use_compressed=True):
    """执行查询示例并比较结果"""
    # 加载索引（根据选择加载压缩版或原始版）
    if use_compressed:
        print("=== 使用压缩版索引查询 ===")
        compressed_index = CompressedIndex("final_index.json")
        compressed_index.load_compressed_index()  # 加载已压缩的索引
        processor = CompressedBooleanQueryProcessor(compressed_index)
    else:
        print("=== 使用原始版索引查询 ===")
        processor = BooleanQueryProcessor("final_index.json")  # 直接加载原始JSON索引

    # 测试查询列表
    test_queries = [
        "clinton AND email",          # 简单AND查询
        "obama OR president",         # 简单OR查询
        "hillary NOT bill",           # NOT查询
        "security AND (white OR house)",  # 组合查询
        "nonexistent_term"            # 不存在的词项
    ]

    # 执行每个查询
    for query in test_queries:
        print(f"\n查询: '{query}'")

        # 计时
        start_time = time()
        result = processor.process_query(query)
        elapsed_ms = (time() - start_time) * 1000  # 毫秒

        # 显示结果
        print(f"返回文档数: {len(result)}")
        print(f"耗时: {elapsed_ms:.2f} ms")
        if result:
            print(f"前5个文档ID示例: {result[:5]}")

# 对比测试
print("【性能对比测试】")
run_query_examples(use_compressed=True)    # 压缩版查询
run_query_examples(use_compressed=False)   # 原始版查询

【性能对比测试】
=== 使用压缩版索引查询 ===

查询: 'clinton AND email'
返回文档数: 119
耗时: 0.32 ms
前5个文档ID示例: [57, 73, 241, 266, 267]

查询: 'obama OR president'
返回文档数: 377
耗时: 0.13 ms
前5个文档ID示例: [13, 16, 57, 59, 101]

查询: 'hillary NOT bill'
返回文档数: 0
耗时: 0.08 ms

查询: 'security AND (white OR house)'
返回文档数: 0
耗时: 0.01 ms

查询: 'nonexistent_term'
返回文档数: 0
耗时: 0.00 ms
=== 使用原始版索引查询 ===

查询: 'clinton AND email'
返回文档数: 119
耗时: 0.13 ms
前5个文档ID示例: [57, 73, 241, 266, 267]

查询: 'obama OR president'
返回文档数: 377
耗时: 0.06 ms
前5个文档ID示例: [13, 16, 57, 59, 101]

查询: 'hillary NOT bill'
返回文档数: 0
耗时: 0.03 ms

查询: 'security AND (white OR house)'
返回文档数: 0
耗时: 0.01 ms

查询: 'nonexistent_term'
返回文档数: 0
耗时: 0.00 ms
