In [3]:
import os
import sys
import yaml
import time
from tqdm import tqdm
from src.utils.logger import setup_logger
from src.utils.data_loader import DataLoader
from src.feature_extraction import FeatureExtractor
from src.fingerprint.minhash import MinHash
from src.fingerprint.simhash import SimHash
from src.fingerprint.bitsampling import BitSampling
from src.lsh.lsh_index import MinHashLSHIndex, SimHashLSHIndex, BitSamplingLSHIndex
from src.lsh.evaluation import Evaluator
from src.preprocessing import preprocess_text


def load_config(config_path: str) -> dict:
    """Read the YAML configuration file at ``config_path``.

    The file is opened explicitly as UTF-8 so that non-ASCII values are
    decoded the same way regardless of the platform's default encoding.

    Args:
        config_path: Path of the YAML file to read.

    Returns:
        The parsed document. An empty file, for which ``yaml.safe_load``
        returns ``None``, yields an empty dict so the declared return type
        always holds.

    Raises:
        OSError: If the file cannot be opened.
    """
    with open(config_path, "r", encoding="utf-8") as file:
        parsed = yaml.safe_load(file)
    # Normalise the "empty document" result to a dict.
    return {} if parsed is None else parsed

In [4]:
# Load the configuration file
config_path = "config/config.yaml"
config = load_config(config_path)

# Configure the logger from the "logging" section of the configuration
logging_cfg = config["logging"]
log_file, log_level = logging_cfg["log_file"], logging_cfg["log_level"]
logger = setup_logger(log_file, log_level)
logger.info("系统启动，加载配置完成。")

2025-04-08 22:11:32,901 - data/processed/system.log - INFO - 系统启动，加载配置完成。


In [5]:
# Load the raw data
data_loader = DataLoader()
raw_data_path = config["data"]["raw_data_path"]
logger.info(f"加载数据路径：{raw_data_path}")

raw_data = []

if os.path.isfile(raw_data_path) and raw_data_path.endswith(".parquet"):
    # Single Parquet file: a load failure is logged and raw_data stays empty.
    try:
        logger.info(f"加载单个文件：{raw_data_path}")
        raw_data = data_loader.load_data(raw_data_path)
    except Exception as e:
        logger.error(f"文件加载失败：{e}")
elif os.path.isdir(raw_data_path):
    logger.info(f"加载目录中的 Parquet 文件：{raw_data_path}")
    # os.listdir returns entries in arbitrary order; sort the paths so the
    # concatenated record order is reproducible across runs and platforms.
    parquet_files = sorted(
        os.path.join(raw_data_path, f)
        for f in os.listdir(raw_data_path)
        if f.endswith(".parquet")
    )
    for file_path in parquet_files:
        # Best effort: a file that fails to load is reported and skipped.
        try:
            logger.info(f"加载文件：{file_path}")
            raw_data.extend(data_loader.load_data(file_path))
        except Exception as e:
            logger.warning(f"{file_path} 加载失败：{e}")
else:
    # Neither a .parquet file nor a directory: nothing can be loaded.
    logger.error("无效路径，退出。")
    raise SystemExit()

logger.info(f"数据加载完成，共加载 {len(raw_data)} 条记录。")

2025-04-08 22:11:32,955 - data/processed/system.log - INFO - 加载数据路径：data/raw/sample_test.parquet
2025-04-08 22:11:32,956 - data/processed/system.log - INFO - 加载单个文件：data/raw/sample_test.parquet
2025-04-08 22:11:32,956 - data/processed/system.log - INFO - 加载单个文件：data/raw/sample_test.parquet
2025-04-08 22:11:33,039 - data/processed/system.log - INFO - 数据加载完成，共加载 8 条记录。


In [6]:
# Example: text preprocessing followed by feature extraction
logger.info("开始文本预处理...")
preprocessed_data = list(map(preprocess_text, raw_data))

# Extraction settings; ngram_size falls back to 3 when not configured
feature_cfg = config["feature_extraction"]
feature_method = feature_cfg["method"]
ngram_size = feature_cfg.get("ngram_size", 3)

logger.info(f"特征提取方法：{feature_method}")
extractor = FeatureExtractor(method=feature_method, n=ngram_size)
features = list(map(extractor.extract_features, preprocessed_data))
logger.info("预处理和特征提取完成。")

2025-04-08 22:11:33,043 - data/processed/system.log - INFO - 开始文本预处理...
2025-04-08 22:11:33,044 - data/processed/system.log - INFO - 特征提取方法：ngram
2025-04-08 22:11:33,044 - data/processed/system.log - INFO - 预处理和特征提取完成。
2025-04-08 22:11:33,044 - data/processed/system.log - INFO - 特征提取方法：ngram
2025-04-08 22:11:33,044 - data/processed/system.log - INFO - 预处理和特征提取完成。


In [7]:
# Fingerprint generation
fingerprint_cfg = config["fingerprint"]
fingerprint_method = fingerprint_cfg["method"]
logger.info(f"开始指纹生成，方法：{fingerprint_method}")

# Build the configured hasher together with its progress-bar label
if fingerprint_method == "minhash":
    minhash = MinHash(fingerprint_cfg["num_hashes"],
                      seed=fingerprint_cfg.get("seed"))
    signer, progress_label = minhash, "生成 MinHash 签名"
elif fingerprint_method == "simhash":
    simhash = SimHash(hash_bits=fingerprint_cfg["hash_bits"])
    signer, progress_label = simhash, "生成 SimHash 签名"
elif fingerprint_method == "bitsampling":
    bitsampling = BitSampling(
        sample_size=fingerprint_cfg["sample_size"],
        hash_bits=fingerprint_cfg["hash_bits"],
        seed=fingerprint_cfg.get("seed"),
    )
    signer, progress_label = bitsampling, "生成 BitSampling 签名"
else:
    logger.error("未知指纹方法，退出。")
    raise SystemExit()

# One signature per feature set, in the same order as `features`
signatures = [
    signer.compute_signature(feature_set)
    for feature_set in tqdm(features, desc=progress_label)
]

# Persist the signatures to the configured output path
fingerprint_output_path = fingerprint_cfg["output_path"]
data_loader.save_signatures(signatures, fingerprint_output_path)
logger.info(f"签名保存至：{fingerprint_output_path}")

2025-04-08 22:11:33,049 - data/processed/system.log - INFO - 开始指纹生成，方法：minhash
生成 MinHash 签名: 100%|██████████| 8/8 [00:00<00:00, 4444.30it/s]
2025-04-08 22:11:33,066 - data/processed/system.log - INFO - 签名保存至：data/processed/fingerprints.csv


数据已成功保存为 CSV 文件: data/processed/fingerprints.csv


In [8]:
# Build the LSH index
lsh_cfg = config["lsh"]
lsh_method = lsh_cfg["method"]
logger.info(f"开始构建 LSH 索引，方法：{lsh_method}")

# Reject unsupported methods up front
if lsh_method not in ("minhash", "simhash", "bitsampling"):
    logger.error("未知 LSH 方法，退出。")
    raise SystemExit()

if lsh_method == "minhash":
    lsh_index = MinHashLSHIndex(lsh_cfg["num_bands"], lsh_cfg["rows_per_band"])
elif lsh_method == "simhash":
    lsh_index = SimHashLSHIndex(radius=lsh_cfg["radius"])
else:  # "bitsampling"
    lsh_index = BitSamplingLSHIndex(
        lsh_cfg["num_hash_tables"], lsh_cfg["bits_per_table"])

# Index every signature, then collect the candidate pairs from the index
lsh_index.index(signatures)
candidate_pairs = lsh_index.get_candidate_pairs()
logger.info(f"生成候选文档对数量：{len(candidate_pairs)}")

2025-04-08 22:11:33,070 - data/processed/system.log - INFO - 开始构建 LSH 索引，方法：minhash
2025-04-08 22:11:33,072 - data/processed/system.log - INFO - 生成候选文档对数量：1
2025-04-08 22:11:33,072 - data/processed/system.log - INFO - 生成候选文档对数量：1


In [9]:
# Evaluation
ground_truth_path = config["evaluation"]["ground_truth_path"]

if os.path.exists(ground_truth_path):
    # One comma-separated tuple of integers per line. Blank lines (for
    # example a trailing newline) are skipped instead of crashing on int("").
    with open(ground_truth_path, "r", encoding="utf-8") as f:
        ground_truth = set(
            tuple(map(int, line.strip().split(",")))
            for line in f
            if line.strip()
        )
    if not ground_truth:
        # The file exists but holds no pairs: say so instead of silently
        # falling through to the duplicate-rate branch below.
        logger.warning("ground truth 文件为空，跳过准确率评估。")
else:
    logger.warning("未提供 ground truth，跳过准确率评估。")
    ground_truth = None

evaluator = Evaluator(candidate_pairs, ground_truth)

if ground_truth:
    # Report an elapsed duration in seconds rather than the absolute epoch
    # timestamp. Only the metric computation is timed here because no
    # pipeline start time is recorded in the visible cells.
    eval_start = time.perf_counter()
    metrics = evaluator.compute_performance_metrics()
    evaluator.generate_report(
        metrics, runtime=time.perf_counter() - eval_start)
else:
    # Without ground truth only the duplicate rate is reported.
    dup_rate = evaluator.compute_duplicate_rate()
    logger.info(f"估计重复率：{dup_rate:.4f}")

2025-04-08 22:11:33,079 - data/processed/system.log - INFO - 估计重复率：1.0000
2025-04-08 22:11:33,079 - data/processed/system.log - INFO - 估计重复率：1.0000


In [10]:
# Save the candidate pairs
results_path = config["output"]["results_path"]

# os.path.dirname returns "" for a bare file name, and os.makedirs("")
# raises FileNotFoundError, so only create the directory when there is one.
results_dir = os.path.dirname(results_path)
if results_dir:
    os.makedirs(results_dir, exist_ok=True)

# One "first,second" line per candidate pair
with open(results_path, "w", encoding="utf-8") as f:
    for pair in candidate_pairs:
        f.write(f"{pair[0]},{pair[1]}\n")

logger.info(f"候选对写入完成：{results_path}")

2025-04-08 22:11:33,085 - data/processed/system.log - INFO - 候选对写入完成：data/results/candidate_pairs.csv
