In [19]:
# Global dependencies required by this notebook:
# - python-dotenv: loads environment variables such as COHERE_API_KEY from a .env file
# - mermaid-python: renders Mermaid diagrams inline
# These are environment-level installs; run them once per environment.
%pip install python-dotenv
%pip install mermaid-python


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.1.1[0m[39;49m -> [0m[32;49m25.3[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython -m pip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.
Collecting mermaid-python
  Downloading mermaid_python-0.1-py3-none-any.whl.metadata (1.3 kB)
Downloading mermaid_python-0.1-py3-none-any.whl (3.2 kB)
Installing collected packages: mermaid-python
Successfully installed mermaid-python-0.1

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.1.1[0m[39;49m -> [0m[32;49m25.3[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython -m pip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


In [20]:
from dotenv import load_dotenv
from mermaid import Mermaid

load_dotenv()

True

### RRF Rerank

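RRF (Reciprocal Rank Fusion) is a model-free, training-free rank fusion algorithm. Its core idea: a result that ranks near the top of several rankings is more important.

> In production projects, **RRF commonly serves as the recall fusion layer**, followed by a **Cross-Encoder / LLM Reranker** for fine-grained ranking.

**Formula**

$$ \text{Score}(d) = \sum_{i=1}^{n} \frac{1}{k + \text{rank}_i(d)} $$

Here $n$ is the number of rankings being fused and $\text{rank}_i(d)$ is the 1-based position of document $d$ in the $i$-th ranking; a ranking that does not contain $d$ contributes nothing. `k` is a smoothing constant (default 60, tunable) that controls how strongly the top-ranked items dominate the fused score.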
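To make the role of `k` concrete, the sketch below compares how much more a rank-1 hit contributes than a rank-10 hit under a small and a large `k`. The helper names `rrf_contribution` and `top_vs_tenth_ratio` are illustrative only and not part of any library.

```python
def rrf_contribution(rank: int, k: int = 60) -> float:
    """Score contributed by one ranking that places a document at `rank` (1-based)."""
    return 1.0 / (k + rank)


def top_vs_tenth_ratio(k: int) -> float:
    """How many times more a rank-1 hit contributes than a rank-10 hit."""
    return rrf_contribution(1, k) / rrf_contribution(10, k)


for k in (1, 60):
    print(f"k={k}: rank 1 is worth {top_vs_tenth_ratio(k):.2f}x rank 10")
```

With `k=1` the top hit counts 5.5 times as much as the tenth; with `k=60` the ratio drops to about 1.15, so a larger `k` flattens the differences between ranks.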

**How it works**

Suppose two retrieval paths return these Top-5 lists:

|id|sparse rank|dense rank|
|---|---|---|
|101|1|2|
|198|4|1|
|175|5|4|
|203|2|—|
|150|3|—|

After fusion, documents are sorted by their combined score. Documents that both paths rank consistently high come out on top.
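As a worked example, the snippet below applies the RRF formula to the ranks in the table above with the default `k = 60`. It is a minimal sketch: the dictionary layout and variable names are chosen for illustration, and `None` stands for a document that a path did not return.

```python
# Ranks copied from the table; None means the path did not return the document.
ranks = {
    101: {"sparse": 1, "dense": 2},
    198: {"sparse": 4, "dense": 1},
    175: {"sparse": 5, "dense": 4},
    203: {"sparse": 2, "dense": None},
    150: {"sparse": 3, "dense": None},
}

K = 60  # default smoothing constant

# Sum 1 / (K + rank) over the paths that returned the document.
fused = {
    doc_id: sum(1.0 / (K + r) for r in per_path.values() if r is not None)
    for doc_id, per_path in ranks.items()
}
order = sorted(fused, key=fused.get, reverse=True)

for doc_id in order:
    print(doc_id, f"{fused[doc_id]:.4f}")
```

The fused order is 101, 198, 175, 203, 150: the three documents found by both paths outrank 203 and 150, even though each of those sits near the top of the sparse list.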
    
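**Advantages of RRF Rerank**

- Model-free: a pure engineering-level algorithm
- Simple to implement
- Can fuse recall results from different retrievers
- Very low latency

**Typical pipeline**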

In [22]:
Mermaid("""
flowchart LR
    A[Query] --> B[BM25]
    A --> C[Embeddings]
    B --> D[RRF Rank]
    C --> D[RRF Rank]
    D --> E[Cross-Encoder]
""")

In [11]:
from collections import defaultdict

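def rrf_rerank(rankings: list[list[str]], k: int = 60, limit: int | None = None) -> list[tuple[str, float]]:
    """Fuse several ranked lists of document IDs with Reciprocal Rank Fusion (RRF)."""
    scores = defaultdict(float)
    for ranking in rankings:
        # Ranks are 1-based: the first item of each list contributes 1 / (k + 1).
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    fused = sorted(scores.items(), key=lambda item: item[1], reverse=True)
    # limit=None returns every document; comparing against None lets limit=0 yield [].
    return fused if limit is None else fused[:limit]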

**Usage example**

Suppose we have already obtained recall results from BM25, BGE, and Qwen.

In [17]:
data = [
    ["docA", "docB", "docC", "docD"],  # BM25 ranking
    ["docB", "docE", "docA", "docF"],  # BGE embedding ranking
    ["docC", "docA", "docG", "docH"],  # Qwen embedding ranking
]

In [18]:
# ============================================================================
# Use the default k=60
# ============================================================================
result = rrf_rerank(data)
print("RRF result (default k=60):")
for doc_id, score in result:
    print(f"  Doc ID: {doc_id}, Score: {score:.4f}")
# ============================================================================
# Specify both k and limit
# ============================================================================
result = rrf_rerank(data, k=20, limit=3)
print("RRF result (k=20, limit=3):")
for doc_id, score in result:
    print(f"  Doc ID: {doc_id}, Score: {score:.4f}")

RRF result (default k=60):
  Doc ID: docA, Score: 0.0484
  Doc ID: docB, Score: 0.0325
  Doc ID: docC, Score: 0.0323
  Doc ID: docE, Score: 0.0161
  Doc ID: docG, Score: 0.0159
  Doc ID: docD, Score: 0.0156
  Doc ID: docF, Score: 0.0156
  Doc ID: docH, Score: 0.0156
RRF result (k=20, limit=3):
  Doc ID: docA, Score: 0.1366
  Doc ID: docB, Score: 0.0931
  Doc ID: docC, Score: 0.0911


### WeightedRanker

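WeightedRanker is a weighted fusion ranker: it computes a weighted sum of the scores from several sources to produce the final ranking. It is common in recommender systems and in search ranking fusion.

**Formula**

$$ \text{Score}(d) = \sum_{i=1}^{n} w_i \cdot \text{score}_i(d) $$

- $d$: a document / candidate result
- $\text{score}_i(d)$: the score that the $i$-th model or channel assigns to $d$
- $w_i$: the weight of that channel
- $n$: the number of channels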

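**WeightedRanker vs RRF**

| Dimension | WeightedRanker | RRF |
| ------- | -------------- | ------- |
| Uses raw scores | Yes | No, ranks only |
| Needs normalization | Required | Not needed |
| Controllability | Strong | Weak |
| Robustness to outliers | Moderate | Strong |
| Engineering complexity | Medium | Low |

In BI (business intelligence) workloads, WeightedRanker is usually a better fit than RRF, because the per-channel weights can be tuned explicitly.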

In [27]:
def weighted_rank(
    docs_scores: dict[str, dict[str, float]],
    weights: dict[str, float],
    alpha_tie_break: bool = False,
) -> list[tuple[str, float]]:
    """Rank documents by the weighted sum of their per-channel scores.

    Channels without an entry in `weights` get weight 0. With `alpha_tie_break=True`,
    equal scores are ordered by document name; otherwise ties keep insertion order.
    """
    final_scores = {}

    for doc, scores in docs_scores.items():
        # Weighted sum over the channels present for this document.
        total = sum(weights.get(k, 0) * v for k, v in scores.items())
        final_scores[doc] = total

    if alpha_tie_break:
        # Sort by score descending, then by document name ascending.
        return sorted(final_scores.items(), key=lambda x: (-x[1], x[0]))
    # Sort by score descending; Python's stable sort keeps insertion order for ties.
    return sorted(final_scores.items(), key=lambda x: x[1], reverse=True)

In [31]:
print("\n--- Running tests for updated weighted_rank function ---")

print("\nCase 1: Basic functionality with multiple documents and scores")
docs_scores_1 = {
    'doc_A': {'score1': 0.8, 'score2': 0.6},
    'doc_B': {'score1': 0.9, 'score2': 0.5},
    'doc_C': {'score1': 0.7, 'score2': 0.7}
}
weights_1 = {'score1': 0.6, 'score2': 0.4}
expected_output_1 = [('doc_B', 0.74), ('doc_A', 0.72), ('doc_C', 0.7)]
actual_output_1 = weighted_rank(docs_scores_1, weights_1)
assert actual_output_1 == expected_output_1, f"Test Case 1 Failed: Expected {expected_output_1}, Got {actual_output_1}"
print(actual_output_1)

print("\nCase 2: Cases where some weights are zero")
docs_scores_2 = {
    'doc_A': {'score1': 0.8, 'score2': 0.6},
    'doc_B': {'score1': 0.9, 'score2': 0.5}
}
weights_2 = {'score1': 1.0, 'score2': 0.0}
expected_output_2 = [('doc_B', 0.9), ('doc_A', 0.8)]
actual_output_2 = weighted_rank(docs_scores_2, weights_2)
assert actual_output_2 == expected_output_2, f"Test Case 2 Failed: Expected {expected_output_2}, Got {actual_output_2}"
print(actual_output_2)

print("\nCase 3: Cases with missing keys in the weights dictionary (default weight should be 0)")
docs_scores_3 = {
    'doc_A': {'score1': 0.8, 'score2': 0.6},
    'doc_B': {'score1': 0.9, 'score2': 0.5}
}
weights_3 = {'score1': 1.0}
expected_output_3 = [('doc_B', 0.9), ('doc_A', 0.8)] # score2 should have 0 weight
actual_output_3 = weighted_rank(docs_scores_3, weights_3)
assert actual_output_3 == expected_output_3, f"Test Case 3 Failed: Expected {expected_output_3}, Got {actual_output_3}"
print(actual_output_3)

print("\nCase 4: Edge case - empty docs_scores dictionary")
docs_scores_4 = {}
weights_4 = {'score1': 0.5, 'score2': 0.5}
expected_output_4 = []
actual_output_4 = weighted_rank(docs_scores_4, weights_4)
assert actual_output_4 == expected_output_4, f"Test Case 4 Failed: Expected {expected_output_4}, Got {actual_output_4}"
print(actual_output_4)

print("\nCase 5: Edge case - empty weights dictionary (all scores should have 0 weight)")
docs_scores_5 = {
    'doc_A': {'score1': 0.8, 'score2': 0.6},
    'doc_B': {'score1': 0.9, 'score2': 0.5}
}
weights_5 = {}
actual_output_5 = weighted_rank(docs_scores_5, weights_5)
for doc, score in actual_output_5:
    assert score == 0.0, f"Test Case 5 Failed: Score for {doc} should be 0.0, Got {score}"
assert set([doc for doc, _ in actual_output_5]) == set(docs_scores_5.keys()), f"Test Case 5 Failed: Documents mismatch"
print(actual_output_5)

print("\nCase 6: Scenarios where all scores for a document sum to zero or are equal")
docs_scores_6 = {
    'doc_X': {'s1': 0.5, 's2': -0.5},
    'doc_Y': {'s1': 0.0, 's2': 0.0},
    'doc_Z': {'s1': 1.0, 's2': 0.0}
}
weights_6 = {'s1': 1.0, 's2': 1.0}
expected_output_6 = [('doc_Z', 1.0), ('doc_X', 0.0), ('doc_Y', 0.0)]
actual_output_6 = weighted_rank(docs_scores_6, weights_6)
assert round(actual_output_6[0][1], 5) == expected_output_6[0][1], f"Test Case 6 Failed doc_Z score: Expected {expected_output_6[0][1]}, Got {actual_output_6[0][1]}"
assert round(actual_output_6[1][1], 5) == expected_output_6[1][1], f"Test Case 6 Failed doc_X score: Expected {expected_output_6[1][1]}, Got {actual_output_6[1][1]}"
assert round(actual_output_6[2][1], 5) == expected_output_6[2][1], f"Test Case 6 Failed doc_Y score: Expected {expected_output_6[2][1]}, Got {actual_output_6[2][1]}"
assert set([doc for doc, _ in actual_output_6]) == set(docs_scores_6.keys())
print(actual_output_6)

print("\nCase 7: Negative weights")
docs_scores_7 = {
    'doc_A': {'s1': 1.0, 's2': 0.5},
    'doc_B': {'s1': 0.5, 's2': 1.0}
}
weights_7 = {'s1': 1.0, 's2': -1.0}
expected_output_7 = [('doc_A', 0.5), ('doc_B', -0.5)]
actual_output_7 = weighted_rank(docs_scores_7, weights_7)
assert actual_output_7 == expected_output_7, f"Test Case 7 Failed: Expected {expected_output_7}, Got {actual_output_7}"
print(actual_output_7)

print("\nCase 8: All scores equal, check descending order and stable sort")
docs_scores_8 = {
    'doc_1': {'s': 1.0},
    'doc_2': {'s': 1.0},
    'doc_3': {'s': 1.0}
}
weights_8 = {'s': 1.0}
expected_output_8 = [('doc_1', 1.0), ('doc_2', 1.0), ('doc_3', 1.0)] # relies on stable sort
actual_output_8 = weighted_rank(docs_scores_8, weights_8)
assert actual_output_8 == expected_output_8, f"Test Case 8 Failed: Expected {expected_output_8}, Got {actual_output_8}"
print(actual_output_8)

print("\nCase 9: All scores equal with alpha_tie_break = True")
docs_scores_9 = {
    'doc_Z': {'s': 1.0},
    'doc_A': {'s': 1.0},
    'doc_M': {'s': 1.0}
}
weights_9 = {'s': 1.0}
expected_output_9 = [('doc_A', 1.0), ('doc_M', 1.0), ('doc_Z', 1.0)] # Alphabetical tie-breaking
actual_output_9 = weighted_rank(docs_scores_9, weights_9, alpha_tie_break=True)
assert actual_output_9 == expected_output_9, f"Test Case 9 Failed (alpha_tie_break): Expected {expected_output_9}, Got {actual_output_9}"
print(actual_output_9)

print("\nCase 10: Mixed scores with alpha_tie_break = True, demonstrating tie-breaking")
docs_scores_10 = {
    'doc_B': {'score': 0.7},
    'doc_A': {'score': 0.7},
    'doc_C': {'score': 0.9}
}
weights_10 = {'score': 1.0}
# Expected: doc_C first, then doc_A, then doc_B due to alphabetical tie-breaking
expected_output_10 = [('doc_C', 0.9), ('doc_A', 0.7), ('doc_B', 0.7)]
actual_output_10 = weighted_rank(docs_scores_10, weights_10, alpha_tie_break=True)
assert actual_output_10 == expected_output_10, f"Test Case 10 Failed (alpha_tie_break mixed scores): Expected {expected_output_10}, Got {actual_output_10}"
print(actual_output_10)

print("\nAll updated test cases passed!")


--- Running tests for updated weighted_rank function ---

Case 1: Basic functionality with multiple documents and scores
[('doc_B', 0.74), ('doc_A', 0.72), ('doc_C', 0.7)]

Case 2: Cases where some weights are zero
[('doc_B', 0.9), ('doc_A', 0.8)]

Case 3: Cases with missing keys in the weights dictionary (default weight should be 0)
[('doc_B', 0.9), ('doc_A', 0.8)]

Case 4: Edge case - empty docs_scores dictionary
[]

Case 5: Edge case - empty weights dictionary (all scores should have 0 weight)
[('doc_A', 0.0), ('doc_B', 0.0)]

Case 6: Scenarios where all scores for a document sum to zero or are equal
[('doc_Z', 1.0), ('doc_X', 0.0), ('doc_Y', 0.0)]

Case 7: Negative weights
[('doc_A', 0.5), ('doc_B', -0.5)]

Case 8: All scores equal, check descending order and stable sort
[('doc_1', 1.0), ('doc_2', 1.0), ('doc_3', 1.0)]

Case 9: All scores equal with alpha_tie_break = True
[('doc_A', 1.0), ('doc_M', 1.0), ('doc_Z', 1.0)]

Case 10: Mixed scores with alpha_tie_break = True, demonstra

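WeightedRanker needs the scores from different channels mapped onto a common scale (usually 0 to 1); otherwise the weights do not carry their intended meaning.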
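To see why a common scale matters, the sketch below fuses two hypothetical channels whose raw scores live on very different ranges. All scores, weights, and helper names (`min_max`, `fuse`) are made up for illustration; `min_max` is a bare-bones Min-Max scaler that assumes the scores are not all equal.

```python
# Hypothetical raw scores: BM25 is unbounded, cosine similarity lies in [0, 1].
bm25 = {"docA": 12.0, "docB": 9.0, "docC": 3.0}
cosine = {"docA": 0.30, "docB": 0.55, "docC": 0.90}
weights = {"bm25": 0.3, "cosine": 0.7}  # intent: favour the dense channel


def min_max(scores: dict[str, float]) -> dict[str, float]:
    """Rescale scores to [0, 1]; assumes max > min."""
    lo, hi = min(scores.values()), max(scores.values())
    return {doc: (s - lo) / (hi - lo) for doc, s in scores.items()}


def fuse(bm25_scores: dict[str, float], cosine_scores: dict[str, float]) -> list[str]:
    """Weighted sum of the two channels, returned as doc IDs in descending order."""
    total = {
        doc: weights["bm25"] * bm25_scores[doc] + weights["cosine"] * cosine_scores[doc]
        for doc in bm25_scores
    }
    return sorted(total, key=total.get, reverse=True)


raw_order = fuse(bm25, cosine)
scaled_order = fuse(min_max(bm25), min_max(cosine))
print("raw:   ", raw_order)
print("scaled:", scaled_order)
```

On the raw scores the BM25 channel dominates even though it carries the smaller weight, giving `docA, docB, docC`. After scaling, the 0.7 weight on the cosine channel takes effect and the order flips to `docC, docB, docA`.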

The common normalization methods are introduced below, together with code implementations.

**Min-Max Scaling**

This technique rescales a feature to a fixed range, usually 0 to 1. It is useful when you need to normalize data to a specific boundary.

$$
\hat{s}_i = \frac{s_i - \min(s)}{\max(s) - \min(s)}
$$

**When to use:**

- The score range is stable
- There are few outliers

**Z-score Normalization (Standardization)**

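This technique rescales data to have a mean of 0 and a standard deviation of 1. It is useful when channels have different scales and distributions. Because it relies on the mean and standard deviation rather than on the two extreme values, it is less affected by a single outlier than Min-Max scaling. Note that the result is not confined to $[0, 1]$: it is unbounded and can be negative.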

$$
\hat{s}_i = \frac{s_i - \mu}{\sigma}
$$

**When to use:**

- You want to remove the effect of differing means/variances
- The score distribution is close to normal
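As a quick check of the two formulas, the sketch below applies both to the same made-up scores. It uses the population standard deviation (`statistics.pstdev`) to match the $\sigma$ in the formula; switching to the sample version (`statistics.stdev`) only rescales every z-score by the same constant factor and leaves the ranking unchanged. The sample values and variable names are illustrative.

```python
import statistics

scores = {"docA": 10.0, "docB": 20.0, "docC": 30.0, "docD": 40.0}  # made-up raw scores
values = list(scores.values())

# Min-Max scaling: (s - min) / (max - min)
lo, hi = min(values), max(values)
min_max = {doc: (s - lo) / (hi - lo) for doc, s in scores.items()}

# Z-score normalization: (s - mean) / sigma
mu = statistics.fmean(values)
sigma = statistics.pstdev(values)  # population standard deviation
z = {doc: (s - mu) / sigma for doc, s in scores.items()}

for doc in scores:
    print(f"{doc}: min-max={min_max[doc]:.3f}  z-score={z[doc]:+.3f}")
```

The Min-Max values stay within $[0, 1]$, while the z-scores are centered on 0 and extend beyond $\pm 1$. Both transforms preserve the original order of the documents.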

In [25]:
import math

def min_max_scale(scores: dict[str, float]) -> dict[str, float]:
    """Applies Min-Max scaling to a dictionary of scores.

    Args:
        scores (dict[str, float]): A dictionary where keys are score names (or doc IDs)
                                   and values are the scores to be scaled.

    Returns:
        dict[str, float]: A new dictionary with the Min-Max scaled scores.
    """
    if not scores:
        return {}

    values = list(scores.values())
    min_val = min(values)
    max_val = max(values)

    scaled_scores = {}
    if max_val == min_val:
        # All values are identical, so the range is zero and the formula would divide by zero.
        # Convention used here: identical non-zero scores map to 0.5 (mid-range),
        # while all-zero scores stay at 0.0.
        for k, v in scores.items():
            scaled_scores[k] = 0.5 if v != 0 else 0.0
    else:
        for k, v in scores.items():
            scaled_scores[k] = (v - min_val) / (max_val - min_val)

    return scaled_scores

def z_score_normalize(scores: dict[str, float]) -> dict[str, float]:
    """Applies Z-score normalization (standardization) to a dictionary of scores.

    Args:
        scores (dict[str, float]): A dictionary where keys are score names (or doc IDs)
                                   and values are the scores to be normalized.

    Returns:
        dict[str, float]: A new dictionary with the Z-score normalized scores.
    """
    if not scores:
        return {}

    values = list(scores.values())
    n = len(values)

    mean = sum(values) / n
    # Sample standard deviation (divides by n - 1); a single value has no spread.
    std_dev = math.sqrt(sum((x - mean) ** 2 for x in values) / (n - 1)) if n > 1 else 0.0

    if std_dev == 0:
        # All values are identical (or there is only one): normalize everything to 0.
        return {k: 0.0 for k in scores}

    return {k: (v - mean) / std_dev for k, v in scores.items()}

print("Min-Max scaling and Z-score normalization functions defined.")


Min-Max scaling and Z-score normalization functions defined.


In [30]:
print("\n--- Running Normalization Function Tests ---")

# --- Min-Max Scaling Tests ---
print("\n--- Min-Max Scaling Tests ---")

print("\n1: Basic Min-Max scaling (0 to 1 range)")
scores_mm_1 = {'s1': 10.0, 's2': 20.0, 's3': 30.0}
expected_mm_1 = {'s1': 0.0, 's2': 0.5, 's3': 1.0}
actual_mm_1 = min_max_scale(scores_mm_1)
assert all(abs(actual_mm_1[k] - expected_mm_1[k]) < 1e-9 for k in expected_mm_1), f"Min-Max Test 1 Failed: Expected {expected_mm_1}, Got {actual_mm_1}"
print(actual_mm_1)

print("\n2: Min-Max scaling with negative values")
scores_mm_2 = {'s1': -5.0, 's2': 0.0, 's3': 5.0}
expected_mm_2 = {'s1': 0.0, 's2': 0.5, 's3': 1.0}
actual_mm_2 = min_max_scale(scores_mm_2)
assert all(abs(actual_mm_2[k] - expected_mm_2[k]) < 1e-9 for k in expected_mm_2), f"Min-Max Test 2 Failed: Expected {expected_mm_2}, Got {actual_mm_2}"
print(actual_mm_2)

print("\n3: Min-Max scaling with all identical values")
scores_mm_3 = {'s1': 7.0, 's2': 7.0, 's3': 7.0}
expected_mm_3 = {'s1': 0.5, 's2': 0.5, 's3': 0.5}
actual_mm_3 = min_max_scale(scores_mm_3)
assert all(abs(actual_mm_3[k] - expected_mm_3[k]) < 1e-9 for k in expected_mm_3), f"Min-Max Test 3 Failed: Expected {expected_mm_3}, Got {actual_mm_3}"
print(actual_mm_3)

print("\n4: Min-Max scaling with empty input")
scores_mm_4 = {}
expected_mm_4 = {}
actual_mm_4 = min_max_scale(scores_mm_4)
assert actual_mm_4 == expected_mm_4, f"Min-Max Test 4 Failed: Expected {expected_mm_4}, Got {actual_mm_4}"
print(actual_mm_4)

print("\n5: Min-Max scaling with a single value")
scores_mm_5 = {'s1': 100.0}
expected_mm_5 = {'s1': 0.5}
actual_mm_5 = min_max_scale(scores_mm_5)
assert all(abs(actual_mm_5[k] - expected_mm_5[k]) < 1e-9 for k in expected_mm_5), f"Min-Max Test 5 Failed: Expected {expected_mm_5}, Got {actual_mm_5}"
print(actual_mm_5)

print("\n6: Min-Max scaling with all zero values")
scores_mm_6 = {'s1': 0.0, 's2': 0.0, 's3': 0.0}
expected_mm_6 = {'s1': 0.0, 's2': 0.0, 's3': 0.0}
actual_mm_6 = min_max_scale(scores_mm_6)
assert all(abs(actual_mm_6[k] - expected_mm_6[k]) < 1e-9 for k in expected_mm_6), f"Min-Max Test 6 Failed: Expected {expected_mm_6}, Got {actual_mm_6}"
print(actual_mm_6)

# --- Z-score Normalization Tests ---
print("\n--- Z-score Normalization Tests ---")

print("\n7: Basic Z-score normalization")
scores_zs_1 = {'s1': 1.0, 's2': 2.0, 's3': 3.0, 's4': 4.0, 's5': 5.0}
# Mean = 3.0, Std Dev = 1.58113883
expected_zs_1 = {
    's1': (1.0 - 3.0) / 1.58113883,
    's2': (2.0 - 3.0) / 1.58113883,
    's3': (3.0 - 3.0) / 1.58113883,
    's4': (4.0 - 3.0) / 1.58113883,
    's5': (5.0 - 3.0) / 1.58113883
}
actual_zs_1 = z_score_normalize(scores_zs_1)
assert all(abs(actual_zs_1[k] - expected_zs_1[k]) < 1e-9 for k in expected_zs_1), f"Z-score Test 7 Failed: Expected {expected_zs_1}, Got {actual_zs_1}"
print(actual_zs_1)

print("\n8: Z-score normalization with negative values")
scores_zs_2 = {'s1': -2.0, 's2': -1.0, 's3': 0.0, 's4': 1.0, 's5': 2.0}
# Mean = 0.0, Std Dev = 1.58113883
expected_zs_2 = {
    's1': (-2.0 - 0.0) / 1.58113883,
    's2': (-1.0 - 0.0) / 1.58113883,
    's3': (0.0 - 0.0) / 1.58113883,
    's4': (1.0 - 0.0) / 1.58113883,
    's5': (2.0 - 0.0) / 1.58113883
}
actual_zs_2 = z_score_normalize(scores_zs_2)
assert all(abs(actual_zs_2[k] - expected_zs_2[k]) < 1e-9 for k in expected_zs_2), f"Z-score Test 8 Failed: Expected {expected_zs_2}, Got {actual_zs_2}"
print(actual_zs_2)

print("\n9: Z-score normalization with all identical values (std_dev = 0)")
scores_zs_3 = {'s1': 5.0, 's2': 5.0, 's3': 5.0}
expected_zs_3 = {'s1': 0.0, 's2': 0.0, 's3': 0.0}
actual_zs_3 = z_score_normalize(scores_zs_3)
assert all(abs(actual_zs_3[k] - expected_zs_3[k]) < 1e-9 for k in expected_zs_3), f"Z-score Test 9 Failed: Expected {expected_zs_3}, Got {actual_zs_3}"
print(actual_zs_3)

print("\n10: Z-score normalization with empty input")
scores_zs_4 = {}
expected_zs_4 = {}
actual_zs_4 = z_score_normalize(scores_zs_4)
assert actual_zs_4 == expected_zs_4, f"Z-score Test 10 Failed: Expected {expected_zs_4}, Got {actual_zs_4}"
print(actual_zs_4)

print("\n11: Z-score normalization with a single value")
scores_zs_5 = {'s1': 100.0}
expected_zs_5 = {'s1': 0.0}
actual_zs_5 = z_score_normalize(scores_zs_5)
assert all(abs(actual_zs_5[k] - expected_zs_5[k]) < 1e-9 for k in expected_zs_5), f"Z-score Test 11 Failed: Expected {expected_zs_5}, Got {actual_zs_5}"
print(actual_zs_5)

print("\nAll normalization function tests passed!")


--- Running Normalization Function Tests ---

--- Min-Max Scaling Tests ---

1: Basic Min-Max scaling (0 to 1 range)
{'s1': 0.0, 's2': 0.5, 's3': 1.0}

2: Min-Max scaling with negative values
{'s1': 0.0, 's2': 0.5, 's3': 1.0}

3: Min-Max scaling with all identical values
{'s1': 0.5, 's2': 0.5, 's3': 0.5}

4: Min-Max scaling with empty input
{}

5: Min-Max scaling with a single value
{'s1': 0.5}

6: Min-Max scaling with all zero values
{'s1': 0.0, 's2': 0.0, 's3': 0.0}

--- Z-score Normalization Tests ---

7: Basic Z-score normalization
{'s1': -1.2649110640673518, 's2': -0.6324555320336759, 's3': 0.0, 's4': 0.6324555320336759, 's5': 1.2649110640673518}

8: Z-score normalization with negative values
{'s1': -1.2649110640673518, 's2': -0.6324555320336759, 's3': 0.0, 's4': 0.6324555320336759, 's5': 1.2649110640673518}

9: Z-score normalization with all identical values (std_dev = 0)
{'s1': 0.0, 's2': 0.0, 's3': 0.0}

10: Z-score normalization with empty input
{}

11: Z-score normalization 

### BGE Rerank

BGE Rerank is a reranking model released by BAAI (Beijing Academy of Artificial Intelligence). It is a Cross-Encoder reranker.

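**`m3` = Multi-linguality + Multi-functionality + Multi-granularity**
- Multi-linguality: multilingual, strong in both Chinese and English
- Multi-functionality: in the BGE-M3 backbone this covers dense, sparse, and multi-vector retrieval
- Multi-granularity: handles both short texts and long documents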

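**Advantages of BGE Rerank**

- Can be deployed locally
- Works offline
- Open source
- Can be fine-tuned further
- Multilingual (especially strong in Chinese)

**Positioning**

- Targets RAG / search / question answering
- Used for Top-N → Top-K fine-grained ranking
- Can replace commercial rerank APIs (such as Cohere)

**Model architecture**

- Cross-Encoder
- Transformer (BERT-like)
- Input: `[CLS] Query [SEP] Document [SEP]`
- Output: `relevance_score ∈ ℝ`

**Typical usage**

1. Recall the Top-N candidates (20 to 200) with BM25 / vector search / RRF
2. Score them in batches with `bge-reranker-v2-m3`
3. Sort by score and keep the Top-K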
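The three steps above can be sketched with a pluggable scoring function. `rerank_top_k` and `toy_overlap_score` are hypothetical names used only for illustration, and the toy scorer is a word-overlap stand-in, not a model. In a real pipeline the scorer would wrap a cross-encoder such as `BAAI/bge-reranker-v2-m3`, for instance through `FlagReranker.compute_score` from the FlagEmbedding package; that wiring is an assumption about your setup and is not run in this notebook.

```python
from typing import Callable, Sequence

# A scorer takes (query, document) pairs and returns one relevance score per pair.
PairScorer = Callable[[Sequence[tuple[str, str]]], list[float]]


def rerank_top_k(
    query: str,
    candidates: Sequence[str],
    score_pairs: PairScorer,
    top_k: int = 3,
) -> list[tuple[str, float]]:
    """Score every (query, candidate) pair in one batch and keep the top_k best."""
    pairs = [(query, doc) for doc in candidates]
    scores = score_pairs(pairs)
    ranked = sorted(zip(candidates, scores), key=lambda item: item[1], reverse=True)
    return ranked[:top_k]


def toy_overlap_score(pairs: Sequence[tuple[str, str]]) -> list[float]:
    """Stand-in scorer (NOT a real model): fraction of query words found in the document."""
    results = []
    for query, doc in pairs:
        q_words = set(query.lower().split())
        d_words = set(doc.lower().split())
        results.append(len(q_words & d_words) / len(q_words) if q_words else 0.0)
    return results


candidates = [
    "Dogs are popular pets",
    "Machine learning is a subset of artificial intelligence",
    "Artificial intelligence is changing search",
]
top = rerank_top_k("artificial intelligence search", candidates, toy_overlap_score, top_k=2)
print(top)
```

Keeping the scorer as a parameter means the sort and cut-off logic stays the same whether the scores come from a local BGE model or a hosted rerank API.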

### CohereRerank

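Cohere Rerank is a reranking service built on a large Cross-Encoder model. It scores each query-document pair by semantic relevance and then reorders the candidates.

**Advantages of Cohere**

- No deployment needed: it is called through an API with usage-based billing
- Strong semantic understanding; handles natural language, questions, and business phrasing well
- Performs well on complex, long queries
- Can correct false recalls from embedding / BM25 retrieval

Step 1: Install the Cohere SDK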

In [13]:
# Install the Cohere library (run this in a cell first)
%pip install cohere


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.1.1[0m[39;49m -> [0m[32;49m25.3[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython -m pip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


Step 2: Initialize the Cohere client

In [None]:
import os
import cohere

# Initialize the Cohere client.
# The API key is read from the COHERE_API_KEY environment variable
# (loaded from .env by load_dotenv() above); create a key in the Cohere dashboard.
api_key = os.getenv("COHERE_API_KEY")
co = cohere.ClientV2(api_key=api_key)

Example 1: Ranking English documents

In [8]:
# Example documents to rerank
documents = [
    "Python is a programming language",
    "The Eiffel Tower is in Paris",
    "Machine learning is a subset of artificial intelligence",
    "Dogs are popular pets",
    "Coffee is made from roasted beans",
]

# Query you want to rerank documents for
query = "What is artificial intelligence?"

# Call the rerank endpoint
results = co.rerank(
    model="rerank-english-v3.0",  # or "rerank-multilingual-v3.0" for multiple languages
    query=query,
    documents=documents,
    top_n=3  # Return top 3 most relevant documents
)

# Display results
print(f"Query: {query}\n")
print("Reranked Results:")
print("-" * 50)
# results.results is already sorted by relevance; result.index is the
# document's position in the original `documents` list, not its rank.
for rank, result in enumerate(results.results, start=1):
    print(f"Rank: {rank} (original index: {result.index})")
    print(f"Document: {documents[result.index]}")
    print(f"Relevance Score: {result.relevance_score:.4f}")
    print("-" * 50)

Query: What is artificial intelligence?

Reranked Results:
--------------------------------------------------
Rank: 1 (original index: 2)
Document: Machine learning is a subset of artificial intelligence
Relevance Score: 0.2788
--------------------------------------------------
Rank: 2 (original index: 0)
Document: Python is a programming language
Relevance Score: 0.0001
--------------------------------------------------
Rank: 3 (original index: 3)
Document: Dogs are popular pets
Relevance Score: 0.0001
--------------------------------------------------


Example 2: Ranking Chinese documents

In [9]:
chinese_documents = [
    "狗是许多家庭中受欢迎的宠物",
    "深度学习是机器学习的一个分支，使用神经网络技术",
    "法国的首都是巴黎",
    "机器学习算法从数据中学习模式",
    "猫和狗都是常见的动物",
    "人工智能正在革新技术产业",
    "Python广泛用于机器学习",
]

chinese_query = "机器学习模型"

chinese_results = co.rerank(
    model="rerank-multilingual-v3.0",
    query=chinese_query,
    documents=chinese_documents,
    top_n=3
)

print(f"Query: {chinese_query}\n")
print("Reranked Results:")
print("-" * 50)
# result.index is the position in `chinese_documents`; the rank comes from the result order.
for rank, result in enumerate(chinese_results.results, start=1):
    print(f"Rank: {rank} (original index: {result.index})")
    print(f"Document: {chinese_documents[result.index]}")
    print(f"Relevance Score: {result.relevance_score:.4f}")
    print("-" * 50)

Query: 机器学习模型

Reranked Results:
--------------------------------------------------
Rank: 1 (original index: 3)
Document: 机器学习算法从数据中学习模式
Relevance Score: 0.0680
--------------------------------------------------
Rank: 2 (original index: 1)
Document: 深度学习是机器学习的一个分支，使用神经网络技术
Relevance Score: 0.0378
--------------------------------------------------
Rank: 3 (original index: 6)
Document: Python广泛用于机器学习
Relevance Score: 0.0042
--------------------------------------------------


### ColBERT

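ColBERT is a state-of-the-art neural search system developed by the Future Data Systems lab at Stanford University, with papers published at several top venues (SIGIR'20, TACL'21, NeurIPS'21, and others).

**Key features**

- Fast and accurate retrieval: BERT-based search over large text collections with millisecond-level latency
- Late interaction: the key innovation is "late interaction", which encodes queries and passages into token-level embedding matrices rather than single vectors, enabling finer-grained similarity matching
- Efficient scaling: uses the MaxSim operator for scalable vector similarity computation, handling large corpora while preserving quality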
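The late-interaction score can be sketched in a few lines. The version below is a plain-Python illustration of the MaxSim idea, not ColBERT's actual implementation: for every query token embedding it takes the maximum dot product over all document token embeddings, then sums those maxima. The 2-dimensional vectors and the helper names `normalize` and `maxsim_score` are made up for the example.

```python
import math


def normalize(vec: list[float]) -> list[float]:
    """Scale a vector to unit length so dot products behave like cosine similarity."""
    norm = math.sqrt(sum(x * x for x in vec))
    return [x / norm for x in vec]


def maxsim_score(query_tokens: list[list[float]], doc_tokens: list[list[float]]) -> float:
    """Late interaction: best-matching document token per query token, summed."""
    score = 0.0
    for q in query_tokens:
        score += max(sum(qi * di for qi, di in zip(q, d)) for d in doc_tokens)
    return score


# Toy token embeddings (illustrative values, not produced by a ColBERT model).
query = [normalize(v) for v in ([1.0, 0.0], [0.0, 1.0])]
doc_a = [normalize(v) for v in ([1.0, 0.1], [0.1, 1.0], [1.0, 1.0])]
doc_b = [normalize(v) for v in ([1.0, 0.0], [1.0, 0.2])]

print(f"doc_a: {maxsim_score(query, doc_a):.3f}")
print(f"doc_b: {maxsim_score(query, doc_b):.3f}")
```

Here `doc_a` scores higher because it has a close match for both query tokens, whereas `doc_b` only matches the first one well. A single pooled vector per document could blur that distinction.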

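**Workflow**

1. Preprocess: convert the data (queries and passages) to TSV format
2. Download the model: get a ColBERT checkpoint pretrained on the MS MARCO dataset
3. Index: encode and index all passages for fast retrieval
4. Search: use the model and the index to retrieve the Top-k passages for each query

**Use cases**

ColBERT suits applications that need efficient semantic search, such as information retrieval, question answering, and knowledge-base search. It strikes a good balance between accuracy and speed.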

**Example code**

Install the dependencies

```bash
pip install colbert-ai[torch,faiss-gpu]
```

Below is a simple ColBERT usage example. ColBERT also provides an [online Colab notebook](https://colab.research.google.com/github/stanford-futuredata/ColBERT/blob/main/docs/intro2new.ipynb).

```python
# ============================================================================
# Basic retrieval example (assumes the index has already been built;
# see the index-building example)
# ============================================================================
from colbert.data import Queries
from colbert.infra import Run, RunConfig, ColBERTConfig
from colbert import Searcher

with Run().context(RunConfig(nranks=1, experiment="msmarco")):
    config = ColBERTConfig(root="/path/to/experiments")

    # Create the searcher on top of a prebuilt index
    searcher = Searcher(index="msmarco.nbits=2", config=config)

    # Load the queries
    queries = Queries("/path/to/MSMARCO/queries.dev.small.tsv")

    # Run the search (retrieve the Top-100 passages per query)
    ranking = searcher.search_all(queries, k=100)

    # Save the results
    ranking.save("msmarco.nbits=2.ranking.tsv")
```

```python
# ============================================================================
# Index-building example
# ============================================================================
from colbert.infra import Run, RunConfig, ColBERTConfig
from colbert import Indexer

with Run().context(RunConfig(nranks=1, experiment="msmarco")):
    config = ColBERTConfig(nbits=2, root="/path/to/experiments")

    # Create the indexer from a ColBERT checkpoint
    indexer = Indexer(checkpoint="/path/to/checkpoint", config=config)

    # Encode and index the passage collection
    indexer.index(
        name="msmarco.nbits=2",
        collection="/path/to/MSMARCO/collection.tsv"
    )
```