<a href="https://colab.research.google.com/github/Ericfeng84/Leads_attribution/blob/main/Leads_attriution_vectorized.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
import pandas as pd
import numpy as np # 我们会用到numpy的一些功能
from datetime import datetime, timedelta

# --- 模拟数据  ---
leads_data = {
    'lead_id': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12],
    'customer_id': [101, 101, 101, 102, 102, 102, 103, 103, 103, 101, 101, 104],
    'lead_timestamp': [
        '2023-01-01 10:00:00', '2023-01-15 12:00:00', '2023-03-05 14:00:00', # Cust 101
        '2023-02-01 09:00:00', '2023-02-10 11:00:00', '2023-04-10 16:00:00', # Cust 102 (lead 6 is >90 days before order)
        '2023-05-01 08:00:00', '2023-05-15 10:00:00', '2023-05-20 12:00:00', # Cust 103
        '2023-06-01 10:00:00', '2023-08-20 11:00:00', # Cust 101 (for second order)
        '2023-09-01 10:00:00'  # Cust 104 (no order)
    ],
    'lead_source': ['Email', 'Social', 'Search', 'Referral', 'Display', 'Email', 'Social', 'Search', 'Email', 'Organic', 'PPC', 'Direct']
}
leads_df = pd.DataFrame(leads_data)
leads_df['lead_timestamp'] = pd.to_datetime(leads_df['lead_timestamp'])

orders_data = {
    'order_id': [1001, 1002, 1003, 1004],
    'customer_id': [101, 102, 103, 101],
    'order_timestamp': [
        '2023-03-10 10:00:00', # Cust 101 Order 1
        '2023-03-15 12:00:00', # Cust 102 Order 1
        '2023-05-25 14:00:00', # Cust 103 Order 1
        '2023-08-25 18:00:00'  # Cust 101 Order 2
    ],
    'order_value': [100, 150, 200, 120]
}
orders_df = pd.DataFrame(orders_data)
orders_df['order_timestamp'] = pd.to_datetime(orders_df['order_timestamp'])

# --- 合并和筛选 (与之前相同) ---
merged_df = pd.merge(orders_df, leads_df, on='customer_id', how='left')
merged_df = merged_df[merged_df['lead_timestamp'] <= merged_df['order_timestamp']]
merged_df['time_diff_to_order'] = merged_df['order_timestamp'] - merged_df['lead_timestamp']
merged_df = merged_df[merged_df['time_diff_to_order'] <= timedelta(days=90)]

valid_touchpoints_df = merged_df.dropna(subset=['lead_id']).copy()
# 关键：确保排序，这对于后续的 rank, cumcount 等操作至关重要
valid_touchpoints_df = valid_touchpoints_df.sort_values(by=['order_id', 'lead_timestamp']).reset_index(drop=True)
print("--- 有效触点 (排序后) ---")
valid_touchpoints_df


--- 有效触点 (排序后) ---


Unnamed: 0,order_id,customer_id,order_timestamp,order_value,lead_id,lead_timestamp,lead_source,time_diff_to_order
0,1001,101,2023-03-10 10:00:00,100,1,2023-01-01 10:00:00,Email,68 days 00:00:00
1,1001,101,2023-03-10 10:00:00,100,2,2023-01-15 12:00:00,Social,53 days 22:00:00
2,1001,101,2023-03-10 10:00:00,100,3,2023-03-05 14:00:00,Search,4 days 20:00:00
3,1002,102,2023-03-15 12:00:00,150,4,2023-02-01 09:00:00,Referral,42 days 03:00:00
4,1002,102,2023-03-15 12:00:00,150,5,2023-02-10 11:00:00,Display,33 days 01:00:00
5,1003,103,2023-05-25 14:00:00,200,7,2023-05-01 08:00:00,Social,24 days 06:00:00
6,1003,103,2023-05-25 14:00:00,200,8,2023-05-15 10:00:00,Search,10 days 04:00:00
7,1003,103,2023-05-25 14:00:00,200,9,2023-05-20 12:00:00,Email,5 days 02:00:00
8,1004,101,2023-08-25 18:00:00,120,10,2023-06-01 10:00:00,Organic,85 days 08:00:00
9,1004,101,2023-08-25 18:00:00,120,11,2023-08-20 11:00:00,PPC,5 days 07:00:00


In [3]:
def calculate_attribution_vectorized(df_input, model_type, time_decay_half_life_days=7, position_based_weights=(0.4, 0.2, 0.4)):
    """
    使用向量化操作计算归因分数

    Args:
        df_input (pd.DataFrame): 包含 'order_id', 'lead_id', 'lead_timestamp', 'order_timestamp', 'time_diff_to_order'
                                 已排序 (order_id, lead_timestamp) 且已筛选的有效触点。
        model_type (str): 'last_touch', 'first_touch', 'linear', 'time_decay', 'position_based'.
        time_decay_half_life_days (int): 时间衰减半衰期.
        position_based_weights (tuple): 位置模型权重 (first, middle_sum, last).

    Returns:
        pd.DataFrame: 包含 'order_id', 'lead_id', 'lead_source', 'attribution_score'.
    """
    if df_input.empty:
        print(f"警告: 对于模型 {model_type}, 没有有效的线索-订单对进行归因。")
        return pd.DataFrame(columns=['order_id', 'lead_id', 'lead_source', 'attribution_score'])

    df = df_input.copy() # 操作副本以避免修改原始DataFrame
    df['attribution_score'] = 0.0 # 初始化

    if model_type == 'last_touch':
        # 标记每个订单组内的最后一个触点
        # 使用 transform 获取每个组内最大时间戳，然后比较
        df['is_last_touch'] = df['lead_timestamp'] == df.groupby('order_id')['lead_timestamp'].transform('max')
        # 如果有多个触点时间戳相同且都是最后，都标记为True。通常我们只取一个。
        # 更稳健的方法是使用 rank 或者 cumcount 倒序
        # 这里我们假设 lead_timestamp 在一个 order_id 内是唯一的，或者取其中一个即可
        # 或者，更精确地找到最后一个的索引
        last_touch_indices = df.groupby('order_id')['lead_timestamp'].idxmax()
        df.loc[last_touch_indices, 'attribution_score'] = 1.0

    elif model_type == 'first_touch':
        # 标记每个订单组内的第一个触点
        first_touch_indices = df.groupby('order_id')['lead_timestamp'].idxmin()
        df.loc[first_touch_indices, 'attribution_score'] = 1.0

    elif model_type == 'linear':
        # 计算每个订单组内的触点数量
        df['touchpoints_in_order'] = df.groupby('order_id')['lead_id'].transform('count')
        df['attribution_score'] = 1.0 / df['touchpoints_in_order']
        df.drop(columns=['touchpoints_in_order'], inplace=True) # 清理辅助列

    elif model_type == 'time_decay':
        df['days_before_conversion'] = df['time_diff_to_order'].dt.days
        df['raw_weight'] = 2 ** (-df['days_before_conversion'] / time_decay_half_life_days)

        # 计算每个订单组内的权重总和
        df['sum_raw_weights_per_order'] = df.groupby('order_id')['raw_weight'].transform('sum')

        # 计算归因分数，处理总权重为0的情况（例如所有触点都在转化当天且半衰期很小，导致权重为0）
        df['attribution_score'] = np.where(
            df['sum_raw_weights_per_order'] > 0,
            df['raw_weight'] / df['sum_raw_weights_per_order'],
            0 # 如果总权重为0，则分数也为0。或者可以回退到线性模型
        )
        # 如果需要回退到线性模型：
        if (df['sum_raw_weights_per_order'] == 0).any():
            print("警告: 时间衰减模型中存在总权重为0的订单组，将对这些组应用线性归因。")
            linear_fallback_mask = df['sum_raw_weights_per_order'] == 0
            df.loc[linear_fallback_mask, 'touchpoints_in_order_fallback'] = df[linear_fallback_mask].groupby('order_id')['lead_id'].transform('count')
            df.loc[linear_fallback_mask, 'attribution_score'] = 1.0 / df.loc[linear_fallback_mask, 'touchpoints_in_order_fallback']
            df.drop(columns=['touchpoints_in_order_fallback'], inplace=True, errors='ignore')


        df.drop(columns=['days_before_conversion', 'raw_weight', 'sum_raw_weights_per_order'], inplace=True)

    elif model_type == 'position_based':
        first_w, middle_sum_w, last_w = position_based_weights

        df['group_rank_asc'] = df.groupby('order_id').cumcount() # 从0开始的排名
        df['group_rank_desc'] = df.groupby('order_id').cumcount(ascending=False) # 从0开始的倒序排名
        df['group_size'] = df.groupby('order_id')['lead_id'].transform('size')

        # 条件列表
        conditions = [
            df['group_size'] == 1,  # 只有一个触点
            (df['group_size'] == 2) & (df['group_rank_asc'] == 0), # 两个触点，第一个
            (df['group_size'] == 2) & (df['group_rank_asc'] == 1), # 两个触点，第二个
            (df['group_size'] >= 3) & (df['group_rank_asc'] == 0), # >=3个触点，第一个
            (df['group_size'] >= 3) & (df['group_rank_desc'] == 0), # >=3个触点，最后一个
            (df['group_size'] >= 3) & (df['group_rank_asc'] != 0) & (df['group_rank_desc'] != 0) # >=3个触点，中间的
        ]

        # 对应的值列表
        # 对于两个触点，我们按首尾权重比例分配并归一化
        two_touch_total_w = first_w + last_w
        two_touch_first_score = first_w / two_touch_total_w if two_touch_total_w > 0 else 0.5
        two_touch_last_score = last_w / two_touch_total_w if two_touch_total_w > 0 else 0.5

        choices = [
            1.0, # 单个触点
            two_touch_first_score, # 两个触点，第一个
            two_touch_last_score,  # 两个触点，第二个
            first_w, # >=3个触点，第一个
            last_w,  # >=3个触点，最后一个
            middle_sum_w / (df['group_size'] - 2) # >=3个触点，中间的 (这是一个Series，np.select会正确处理)
        ]

        df['attribution_score'] = np.select(conditions, choices, default=0.0)

        # 对于中间权重，如果 group_size - 2 为 0 (即 group_size=2)，会导致除以零。
        # 上述条件已经处理了 group_size=2 的情况，所以这里是安全的。
        # 但如果 middle_sum_w / (df['group_size'] - 2) 结果为 NaN 或 inf (例如 group_size=2 时)，需要处理。
        # np.select 的 choices 可以是 Series，所以 middle_sum_w / (df['group_size'] - 2) 会按行计算。
        # 我们需要确保分母不为0。
        # (df['group_size'] - 2) 在 group_size < 2 时会是负数或0。
        # 我们已经通过条件 (df['group_size'] >= 3) 保证了 df['group_size'] - 2 >= 1。

        df.drop(columns=['group_rank_asc', 'group_rank_desc', 'group_size'], inplace=True)

    else:
        raise ValueError(f"未知的模型类型: {model_type}")

    return df[['order_id', 'lead_id', 'lead_source', 'attribution_score']]

# --- 测试向量化模型 ---
print("\n--- 最终互动模型 (Last Touch) - Vectorized ---")
last_touch_v = calculate_attribution_vectorized(valid_touchpoints_df, 'last_touch')
print(last_touch_v[last_touch_v['attribution_score'] > 0])

print("\n--- 首次互动模型 (First Touch) - Vectorized ---")
first_touch_v = calculate_attribution_vectorized(valid_touchpoints_df, 'first_touch')
print(first_touch_v[first_touch_v['attribution_score'] > 0])

print("\n--- 线性模型 (Linear) - Vectorized ---")
linear_v = calculate_attribution_vectorized(valid_touchpoints_df, 'linear')
print(linear_v)

print("\n--- 时间衰减模型 (Time Decay, 半衰期7天) - Vectorized ---")
time_decay_v = calculate_attribution_vectorized(valid_touchpoints_df, 'time_decay', time_decay_half_life_days=7)
print(time_decay_v)

print("\n--- 位置模型 (Position-Based / U-Shaped, 40%-20%-40%) - Vectorized ---")
position_based_v = calculate_attribution_vectorized(valid_touchpoints_df, 'position_based', position_based_weights=(0.4, 0.2, 0.4))
print(position_based_v)

# --- 汇总函数 (与之前相同) ---
def aggregate_attribution_by_source(attribution_df, orders_df_with_value):
    order_values = orders_df_with_value[['order_id', 'order_value']].drop_duplicates(subset=['order_id'])
    attributed_values = pd.merge(attribution_df, order_values, on='order_id', how='left')
    attributed_values['attributed_value'] = attributed_values['attribution_score'] * attributed_values['order_value']
    summary = attributed_values.groupby('lead_source').agg(
        total_attribution_score=('attribution_score', 'sum'),
        total_attributed_value=('attributed_value', 'sum'),
        attributed_conversions=('order_id', lambda x: x.nunique())
    ).sort_values(by='total_attributed_value', ascending=False)
    return summary

print("\n--- 线性模型 - Vectorized - 按渠道汇总功劳 ---")
linear_summary_v = aggregate_attribution_by_source(linear_v, orders_df)
print(linear_summary_v)



--- 最终互动模型 (Last Touch) - Vectorized ---
   order_id  lead_id lead_source  attribution_score
2      1001        3      Search                1.0
4      1002        5     Display                1.0
7      1003        9       Email                1.0
9      1004       11         PPC                1.0

--- 首次互动模型 (First Touch) - Vectorized ---
   order_id  lead_id lead_source  attribution_score
0      1001        1       Email                1.0
3      1002        4    Referral                1.0
5      1003        7      Social                1.0
8      1004       10     Organic                1.0

--- 线性模型 (Linear) - Vectorized ---
   order_id  lead_id lead_source  attribution_score
0      1001        1       Email           0.333333
1      1001        2      Social           0.333333
2      1001        3      Search           0.333333
3      1002        4    Referral           0.500000
4      1002        5     Display           0.500000
5      1003        7      Social           0.33

## 向量化实现的关键点解释

为了提高Pandas在归因计算中的效率，我们采用了向量化操作，避免了显式的Python `for`循环。以下是实现这些向量化操作时所依赖的关键Pandas和NumPy功能：

1.  **预排序 (`sort_values`)**:
    *   在调用核心归因计算函数（如 `calculate_attribution_vectorized`）之前，对包含有效触点的DataFrame (`valid_touchpoints_df`) **必须**按照分组键（如 `order_id`）和时间戳（如 `lead_timestamp`）进行排序。
    *   **重要性**: 这一步是后续操作正确性的基础。它确保了像 `cumcount`（累积计数）、`idxmin`（最小值索引）、`idxmax`（最大值索引）等方法能够准确地识别出每个订单转化路径中的第一个、最后一个以及中间的触点。

2.  **`groupby().transform(func)`**:
    *   这是一个非常强大的Pandas操作，它允许我们在分组数据上执行计算，并将结果广播（broadcast）回原始DataFrame的形状。这意味着计算结果会与原始数据的每一行对齐，方便进行行级别的比较和赋值。
    *   **常用函数**:
        *   `transform('max')`: 获取组内最大值。
        *   `transform('min')`: 获取组内最小值。
        *   `transform('count')`: 获取组内非空元素数量。
        *   `transform('size')`: 获取组内总元素数量（包括空值）。
        *   `transform('sum')`: 获取组内元素的总和。
    *   **示例**: `df.groupby('order_id')['lead_timestamp'].transform('max')` 会为 `df` 的每一行生成一个新列。该列中每一行的值，都是其所属 `order_id` 分组内所有 `lead_timestamp` 中的最大值。这使得我们可以在不改变DataFrame结构的情况下，为每一行赋予其所在组的聚合信息。

3.  **`groupby().idxmax()` 和 `groupby().idxmin()`**:
    *   这两个方法分别返回每个分组中，指定列达到最大值或最小值的**第一个出现位置的索引标签**。
    *   **应用**: 我们可以结合 `.loc` 索引器，使用这些返回的索引直接定位到特定行并修改其值。例如，在“最终互动模型”中，用 `idxmax()` 找到每个订单最后一个触点的索引，然后用 `.loc` 将该行的归因分数设为1.0。

4.  **`groupby().cumcount()`**:
    *   `cumcount()` (cumulative count) 为每个分组内的行进行从0开始的累积计数。
    *   `cumcount(ascending=False)` 则会进行倒序的累积计数（即组内最后一个元素为0，倒数第二个为1，依此类推）。
    *   **应用**: 这对于“位置模型”至关重要。通过正向和反向的 `cumcount`，我们可以轻松识别出每个转化路径中的第一个触点（`cumcount() == 0`）、最后一个触点（`cumcount(ascending=False) == 0`）以及中间的触点。

5.  **`np.select(conditions_list, choices_list, default)`**:
    *   这是NumPy库中的一个函数，提供了一种基于多条件进行选择性赋值的高效方法，可以看作是向量化的 `if-elif-else` 语句。
    *   `conditions_list`: 一个由布尔型Series（或数组）组成的列表。每个布尔Series代表一个条件。
    *   `choices_list`: 一个与 `conditions_list` 长度相同的列表。`choices_list[i]` 是当 `conditions_list[i]` 为 `True` 时所赋的值。这些值可以是标量，也可以是与DataFrame行数一致的Series（或数组），`np.select` 会智能地进行元素级别的赋值。
    *   `default`: 当所有条件都不满足时，赋予的默认值。
    *   **应用**: 在“位置模型”中，我们需要根据触点在路径中的位置（首、中、尾）以及路径的总长度（1个触点、2个触点、3个或更多触点）来分配不同的权重。`np.select` 可以清晰、高效地处理这些复杂的条件逻辑。

6.  **算术运算 (Vectorized Arithmetic)**:
    *   Pandas Series 和 DataFrame 支持直接的向量化算术运算。例如，`1.0 / df['touchpoints_in_order']` 会对 `df['touchpoints_in_order']` Series 中的每个元素执行除法操作，而无需编写循环。这利用了NumPy底层的优化。

7.  **辅助列清理 (`drop`)**:
    *   在进行向量化计算时，通常会创建一些临时的辅助列来存储中间结果（例如，每个订单的触点总数 `touchpoints_in_order`，或每个触点的排名 `group_rank_asc`）。
    *   **最佳实践**: 在这些辅助列完成其使命后，应使用 `df.drop(columns=['col_name'], inplace=True)` 及时将其删除，以保持DataFrame的整洁，并避免不必要的内存占用。

通过组合使用这些向量化技术，我们可以显著提升归因计算的性能，尤其是在处理大规模数据集时，同时也能使代码更加简洁和“Pandas化”。
