## 分词

In [None]:
import csv
import jieba
import pkuseg
import ast

def is_word_valid(word):
    """
    Report whether a word is free of illegal characters.

    Parameters:
    - word: The word to check.

    Returns:
    - True when no character of ``word`` is a member of the module-level
      ``ILLEGAL_CHARS`` set, otherwise False.
    """
    # Approach 1 (active): membership test against the ILLEGAL_CHARS set.
    # Approach 2 (alternative, not active): search with a compiled pattern,
    #   return not bool(ILLEGAL_CHARS_PATTERN.search(word))
    return not any(symbol in ILLEGAL_CHARS for symbol in word)

def escape_illegal_chars(text):
    """
    Re-encode a string through Python's ``unicode_escape`` codec.

    The codec is applied to the whole string, not only to the characters
    listed in ``ILLEGAL_CHARS``.

    Parameters:
    - text: The original string.

    Returns:
    - The escaped string.
    """
    escaped_bytes = text.encode('unicode_escape')
    return escaped_bytes.decode('utf-8')

# Characters that make is_word_valid() reject a word.
ILLEGAL_CHARS = set('§#$%&*!@^()-=+{}[]|\\:;"\'<>,.?/')

# Alternative (not active): reject words with a regular expression instead.
# ILLEGAL_CHARS_PATTERN = re.compile(r'[§#$%&*!@^()\-=+{}\[\]|\\:;"\'<>,.?/]')

# Choose the segmentation tool interactively.
tool_choice = input("请选择分词工具（输入 'jieba' 或 'pkuseg'）：")
# Normalise once: ignore case and surrounding whitespace so that an answer
# such as "jieba " is not treated as invalid.
normalized_choice = tool_choice.strip().lower()
if normalized_choice == 'jieba':
    use_jieba = True
    print("使用 Jieba 分词")
elif normalized_choice == 'pkuseg':
    use_jieba = False
    print("使用 PKUSeg 分词")
else:
    # Anything else falls back to Jieba.
    print("输入有误，默认使用 Jieba 分词")
    use_jieba = True

# Load the stop-word list: every non-blank line, stripped, is one entry.
stopwords_file = 'data/cn_stopwords.txt'
with open(stopwords_file, 'r', encoding='utf-8') as f:
    stripped_lines = (line.strip() for line in f)
    stopwords = {entry for entry in stripped_lines if entry}

# Load the synonym dictionary. Each line is a whitespace-separated group of
# words; every word of the group (including the first) maps to the first one.
synonyms_file = 'data/syno_from_baidu_hanyu.txt'
synonym_dict = {}
with open(synonyms_file, 'r', encoding='utf-8') as f:
    for line in f:
        group = line.split()
        if not group:
            continue
        # A word seen again on a later line is remapped to that line's head.
        synonym_dict.update(dict.fromkeys(group, group[0]))

# Input/output file pairs to process.
file_list = [
    {'input': '../data/selected_book_top_1200_data_tag.csv', 'output': 'data/book_output.csv'},
    {'input': '../data/selected_movie_top_1200_data_tag.csv', 'output': 'data/movie_output.csv'}
]

# The pkuseg segmenter is only constructed when it was selected.
if not use_jieba:
    seg = pkuseg.pkuseg()

for file_pair in file_list:
    input_file = file_pair['input']
    output_file = file_pair['output']
    print(f"正在处理文件：{input_file}")

    with open(input_file, 'r', encoding='utf-8') as csvfile_in, \
         open(output_file, 'w', encoding='utf-8', newline='') as csvfile_out:
        reader = csv.reader(csvfile_in)
        writer = csv.writer(csvfile_out, quoting=csv.QUOTE_MINIMAL)

        # Skip the input header row; the default keeps an empty input file
        # from raising StopIteration.
        next(reader, None)

        # Write the output header.
        header = ['id', 'words']
        writer.writerow(header)

        for row in reader:
            # csv.reader yields [] for blank lines.
            if not row:
                continue
            # A row needs both the id column and the tags column.
            if len(row) < 2:
                print(f"文件 {input_file} 中的行缺少标签列，已跳过: {row}")
                continue
            item_id = row[0]
            tags_str = row[1]

            # Parse the tags field, which holds a Python literal.
            try:
                tags_set = ast.literal_eval(tags_str)
            except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError) as e:
                print(f"解析文件 {input_file} 中的 ID {item_id} 的标签时出错: {e}")
                continue

            # Segment each tag, drop stop words and invalid words, and map
            # every remaining word to its synonym representative.
            new_tags = set()
            for tag in tags_set:
                if use_jieba:
                    words = jieba.lcut(tag)
                else:
                    words = seg.cut(tag)

                for word in words:
                    word = word.strip()
                    if word and word not in stopwords and is_word_valid(word):
                        representative = synonym_dict.get(word, word)
                        # The representative comes from the synonym file and is
                        # not checked by is_word_valid, so escape backslashes
                        # first and then single quotes; otherwise the rebuilt
                        # literal below could not be parsed back.
                        representative = representative.replace("\\", "\\\\").replace("'", "\\'")
                        # escape_illegal_chars() is intentionally not applied:
                        # words with illegal characters were already filtered.
                        new_tags.add(representative)

            # Rebuild the tags field as a set literal of single-quoted strings.
            # An item without any remaining word is written as "{}".
            new_tags_str = "{" + ", ".join(f"'{tag}'" for tag in new_tags) + "}"
            writer.writerow([item_id, new_tags_str])

    print(f"文件 {input_file} 处理完成，结果已保存到 {output_file}")

print("所有文件处理完成！")

## 生成倒排索引表

In [None]:
# -*- coding: utf-8 -*-
# Author: Zhenyu Bo
# Date: 2024-11-06

import ast
import pandas as pd
from math import sqrt


def read_words_from_csv(file_path):
    """
    Read a CSV file with ``id`` and ``words`` columns and collect its words.

    Parameters:
    - file_path: Path of the CSV file. The ``words`` column holds a Python
      literal (for example ``{'a', 'b'}``) that is parsed with
      ``ast.literal_eval``.

    Returns:
    - all_words: Set of every word found in any document.
    - documents: Dict mapping each document id (a builtin ``int``) to the set
      of words of that document. If an id occurs more than once, the last
      row wins.
    """
    data = pd.read_csv(file_path, dtype={'id': int, 'words': str})
    all_words = set()
    documents = {}
    for raw_id, raw_words in zip(data['id'], data['words']):
        # Use builtin ints as keys: numpy integer scalars do not serialise as
        # plain numbers when a list of ids is later written out as text.
        doc_id = int(raw_id)
        # "{}" parses as an empty dict; coerce so that every value is a set.
        words = set(ast.literal_eval(raw_words))
        documents[doc_id] = words
        all_words.update(words)
    return all_words, documents


def generate_inverted_index_table(all_words, documents, output_file):
    """
    Build an inverted index with skip pointers and save it as a CSV file.

    Parameters:
    - all_words: Set of the words to index; one output row is written per word.
    - documents: Dict mapping a document id to the collection of words of
      that document.
    - output_file: Path of the CSV file to write. Its columns are ``word``,
      ``id_list`` (sorted ids of the documents containing the word) and
      ``skip_table``.

    For a posting list of n ids the skip interval is ``int(sqrt(n))``. When n
    is larger than the interval, one entry is stored for every position that
    is a multiple of the interval; it records the index and value of the
    position one interval ahead, clamped to the last position. Otherwise the
    table holds a single ``{'index': None, 'value': None}`` entry.
    """
    # Build all posting lists in one pass over the documents instead of
    # rescanning every document for every word.
    postings = {}
    for doc_id, words in documents.items():
        # numpy scalars expose .item(); convert them to builtins so the lists
        # written below contain plain numbers.
        plain_id = doc_id.item() if hasattr(doc_id, 'item') else doc_id
        # set() guarantees each document is listed once per word.
        for word in set(words):
            postings.setdefault(word, []).append(plain_id)

    inverted_index_table = []
    for word in all_words:
        doc_ids_sorted = sorted(postings.get(word, []))
        num_docs = len(doc_ids_sorted)
        step = int(sqrt(num_docs))  # skip interval
        if num_docs > step:
            skip_table = []
            for i in range(0, num_docs, step):
                # The last pointer(s) are clamped to the final position.
                target = min(i + step, num_docs - 1)
                skip_table.append({'index': target, 'value': doc_ids_sorted[target]})
        else:
            skip_table = [{'index': None, 'value': None}]
        inverted_index_table.append({'word': word, 'id_list': doc_ids_sorted, 'skip_table': skip_table})

    # Explicit columns keep the header row even when there is nothing to index.
    columns = ['word', 'id_list', 'skip_table']
    pd.DataFrame(inverted_index_table, columns=columns).to_csv(output_file, index=False)


# Build the inverted index for the book data: read the segmented words and
# write one index row per word.
# NOTE(review): the segmentation step writes 'data/book_output.csv' and
# 'data/movie_output.csv', while the paths read here start with "../data/".
# They only refer to the same files if the cells are run from different
# working directories — confirm.
all_book_words, book_documents = read_words_from_csv("../data/book_output.csv")
generate_inverted_index_table(all_book_words, book_documents, "../data/book_inverted_index_table.csv")
print("书籍倒排索引表已成功生成")

# Build the inverted index for the movie data in the same way.
all_movie_words, movie_documents = read_words_from_csv("../data/movie_output.csv")
generate_inverted_index_table(all_movie_words, movie_documents, "../data/movie_inverted_index_table.csv")
print("电影倒排索引表已成功生成")


## 提取id序列

In [None]:
# -*- coding: utf-8 -*-
# Author: Zhenyu Bo
# Date: 2024-11-14

"""
从数据集中提取 ID 列，保存到文本文件中。
"""

import pandas as pd

def extract_ids(input_csv, output_txt):
    """
    Copy the first column of a CSV file into a text file, one value per line.

    Parameters:
    - input_csv: Path of the CSV file to read; the ids are taken from its
      first column.
    - output_txt: Path of the UTF-8 text file to write.
    """
    first_column = pd.read_csv(input_csv).iloc[:, 0]
    with open(output_txt, 'w', encoding='utf-8') as out:
        out.writelines(f"{value}\n" for value in first_column)


# Export the id lists for books and movies, in that order. Each entry is
# (source CSV, destination text file, completion message).
for source_csv, target_txt, done_message in (
    ('../../data/selected_book_top_1200_data_tag.csv', '../data/Book_id.txt', 'Book_id.txt 文件已生成。'),
    ('../../data/selected_movie_top_1200_data_tag.csv', '../data/Movie_id.txt', 'Movie_id.txt 文件已生成。'),
):
    extract_ids(source_csv, target_txt)
    print(done_message)


## 执行布尔查询

In [None]:
import csv
import pandas as pd
import ast
import re

# 倒排索引表，movie_inverted_index_table.csv 
# 全id表，Movie_id.txt
# 词表，movie_words.csv 用于打印结果 目前格式同助教提供的selected_book_top_1200.csv


def read_inverted_index(file_path):
    """
    Load an inverted-index CSV file into a dict.

    Only the ``word`` and ``id_list`` columns are read; ``id_list`` is parsed
    with ``ast.literal_eval``. Any other column (such as the skip table) is
    ignored here.

    Returns a dict mapping each word to its parsed id list.
    """
    with open(file_path, 'r', encoding='utf-8') as f:
        return {
            record['word']: ast.literal_eval(record['id_list'])
            for record in csv.DictReader(f)
        }

def read_all_ids(file_path):
    """
    Read a text file with one integer id per line and return the ids as a set.

    Blank lines (for example a trailing empty line) are skipped instead of
    raising ValueError. A non-blank line that is not an integer still raises
    ValueError.
    """
    with open(file_path, 'r', encoding='utf-8') as f:
        stripped_lines = (line.strip() for line in f)
        return {int(entry) for entry in stripped_lines if entry}

def tokenize(expression):
    """
    Split a boolean query expression into tokens.

    Tokens are the upper-case operators ``AND``, ``OR`` and ``NOT``, runs of
    word characters (the search terms) and the parentheses ``(`` and ``)``.
    Any other character is dropped.

    An operator name is only recognised when it is not directly followed by
    an ASCII letter, digit or underscore, so terms such as ``ANDROID`` or
    ``NOTHING`` stay whole, while ``NOT爱情`` still yields ``NOT`` and ``爱情``.

    Returns the list of tokens in order of appearance.
    """
    tokens = re.findall(r'(?:AND|OR|NOT)(?![A-Za-z0-9_])|\w+|[()]', expression)
    return tokens

def infix_to_postfix(tokens):
    """
    Convert an infix token list to postfix (reverse Polish) order.

    Precedence from high to low is NOT, AND, OR. AND and OR are
    left-associative binary operators; NOT is a prefix operator, so it is
    pushed without popping anything and ``NOT NOT a`` becomes ``a NOT NOT``.

    Parameters:
    - tokens: List of tokens: operators, parentheses and search terms.

    Returns:
    - The tokens in postfix order, without parentheses.

    Raises:
    - ValueError: If the parentheses are not balanced.
    """
    precedence = {'NOT': 3, 'AND': 2, 'OR': 1}
    output = []
    operator_stack = []
    for token in tokens:
        if token == '(':
            operator_stack.append(token)
        elif token == ')':
            while operator_stack and operator_stack[-1] != '(':
                output.append(operator_stack.pop())
            if not operator_stack:
                raise ValueError("unmatched ')' in expression")
            operator_stack.pop()  # discard the matching '('
        elif token == 'NOT':
            # A prefix operator has no left operand, so nothing already on
            # the stack may be emitted before it.
            operator_stack.append(token)
        elif token in precedence:
            # Binary operator: emit stacked operators that bind at least as
            # tightly, stopping at an open parenthesis.
            while (operator_stack and operator_stack[-1] != '('
                   and precedence[operator_stack[-1]] >= precedence[token]):
                output.append(operator_stack.pop())
            operator_stack.append(token)
        else:
            # Operand (search term).
            output.append(token)
    while operator_stack:
        operator = operator_stack.pop()
        if operator == '(':
            raise ValueError("unmatched '(' in expression")
        output.append(operator)
    return output

def evaluate_postfix(postfix_tokens, inverted_index, all_ids):
    """
    Evaluate a postfix boolean expression and return the matching id set.

    Parameters:
    - postfix_tokens: Tokens in postfix order.
    - inverted_index: Dict mapping a term to its ids; unknown terms match
      nothing.
    - all_ids: Set of every id, used as the universe for NOT.

    Returns:
    - The set left on top of the stack, or an empty set when there are no
      tokens.
    """
    operand_stack = []
    for tok in postfix_tokens:
        if tok == 'NOT':
            negated = operand_stack.pop()
            operand_stack.append(all_ids - negated)
        elif tok == 'AND' or tok == 'OR':
            # The right operand was pushed last, so it comes off first.
            rhs = operand_stack.pop()
            lhs = operand_stack.pop()
            operand_stack.append(lhs & rhs if tok == 'AND' else lhs | rhs)
        else:
            operand_stack.append(set(inverted_index.get(tok, ())))
    if not operand_stack:
        return set()
    return operand_stack.pop()

def display_results(result_ids, words_df):
    """
    Print the id and the words of every row whose id is in ``result_ids``.

    Parameters:
    - result_ids: Collection of ids to show.
    - words_df: DataFrame with an ``id`` column and a ``words`` column; the
      latter holds a Python literal that is parsed with ``ast.literal_eval``.

    Prints a "no results" message when no row matches.
    """
    matches = words_df[words_df['id'].isin(result_ids)]
    if matches.empty:
        print("没有符合条件的结果。")
        return
    for doc_id, raw_words in zip(matches['id'], matches['words']):
        labels = ast.literal_eval(raw_words)
        print(f"ID: {doc_id}")
        print(f"标签: {', '.join(labels)}\n")


import time

# Main program: load the data once, then answer boolean queries in a loop.
print("正在加载数据，请稍候...")

# Load the book data: inverted index, full id set and the word table that is
# used to print results.
# NOTE(review): the word tables are read from '../data/book_words.csv' and
# '../data/movie_words.csv', whereas the segmentation step writes files named
# 'book_output.csv' / 'movie_output.csv' — confirm these are the same data.
book_inverted_index = read_inverted_index('../data/book_inverted_index_table.csv')
book_all_ids = read_all_ids('../data/Book_id.txt')
book_words_df = pd.read_csv('../data/book_words.csv', dtype={'id': int, 'words': str})

# Load the movie data.
movie_inverted_index = read_inverted_index('../data/movie_inverted_index_table.csv')
movie_all_ids = read_all_ids('../data/Movie_id.txt')
movie_words_df = pd.read_csv('../data/movie_words.csv', dtype={'id': int, 'words': str})

print("数据加载完成！")

while True:
    # Surrounding whitespace is ignored, as for the continue prompt below.
    choice = input("请选择查询类型（1 - 书籍，2 - 电影）：\n").strip()
    if choice == '1':
        inverted_index = book_inverted_index
        all_ids = book_all_ids
        words_df = book_words_df
    elif choice == '2':
        inverted_index = movie_inverted_index
        all_ids = movie_all_ids
        words_df = movie_words_df
    else:
        print("输入错误，请输入 1 或 2。")
        continue  # ask again

    expression = input("请输入布尔查询表达式：\n")
    # perf_counter() is the monotonic clock meant for measuring short durations.
    start_time = time.perf_counter()
    try:
        tokens = tokenize(expression)
        postfix_tokens = infix_to_postfix(tokens)
        result_ids = evaluate_postfix(postfix_tokens, inverted_index, all_ids)
    except (IndexError, ValueError):
        # A malformed expression (for example a dangling operator or an
        # unmatched parenthesis) makes the stack-based helpers raise; report
        # it instead of terminating the session.
        print("布尔查询表达式格式有误，请检查后重试。\n")
    else:
        elapsed_time = time.perf_counter() - start_time

        if result_ids:
            print("查询结果：\n")
            display_results(result_ids, words_df)
        else:
            print("没有符合条件的结果。")

        print(f"查询耗时：{elapsed_time:.6f} 秒\n")

    cont = input("是否继续查询？(Y/N): ")
    if cont.strip().lower() != 'y':
        print("感谢您的使用！")
        break  # leave the loop and end the program
