<a href="https://colab.research.google.com/github/iamjimmycai/five_star/blob/main/ziln.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.distributions import LogNormal

def zero_inflated_lognormal_pred(logits: torch.Tensor) -> torch.Tensor:
    """Compute the predicted mean of a zero-inflated lognormal (ZILN) distribution.

    The last axis of ``logits`` is read as three channels:
    ``[..., 0]`` is the logit of the probability that the value is positive,
    ``[..., 1]`` is the lognormal ``loc``, and ``[..., 2]`` is passed through
    ``softplus`` to obtain the lognormal ``scale``.

    Args:
        logits: Tensor of shape ``[batch_size, 3]`` with the raw model outputs.

    Returns:
        Tensor of shape ``[batch_size, 1]`` equal to
        ``sigmoid(logit) * exp(loc + scale**2 / 2)``, i.e. the probability of a
        positive value times the mean of the lognormal component.
    """
    as_float = logits.float()
    positive_logit = as_float[..., 0:1]
    mu = as_float[..., 1:2]
    sigma = F.softplus(as_float[..., 2:])
    lognormal_mean = torch.exp(mu + 0.5 * sigma.square())
    return torch.sigmoid(positive_logit) * lognormal_mean

def zero_inflated_lognormal_loss(labels: torch.Tensor,
                                 logits: torch.Tensor,
                                 reduction: str = "mean") -> torch.Tensor:
    """Compute the zero-inflated lognormal (ZILN) negative log-likelihood.

    For every sample the loss is the sum of
    * a binary cross-entropy term on ``logits[..., 0]`` predicting whether the
      label is positive, and
    * the negative lognormal log-likelihood of the label (``loc`` from
      ``logits[..., 1]``, ``scale`` from ``softplus(logits[..., 2])``),
      counted only for samples whose label is positive.

    Args:
        labels: Ground-truth values, shape ``[batch_size, 1]`` or ``[batch_size]``.
        logits: Raw model outputs, shape ``[batch_size, 3]``.
        reduction: ``"mean"`` (default) returns a scalar averaged over the batch
            (suitable for ``loss.backward()``), ``"sum"`` a scalar sum, and
            ``"none"`` the per-sample losses of shape ``[batch_size]``.

    Returns:
        The loss tensor as described by ``reduction``.

    Raises:
        ValueError: If ``reduction`` is not one of ``"mean"``, ``"sum"``, ``"none"``.
    """
    if reduction not in ("mean", "sum", "none"):
        raise ValueError(f"Unsupported reduction: {reduction!r}")

    logits = logits.float()
    labels = labels.float().to(logits.device)
    # Accept 1-D labels by adding the trailing singleton axis that the
    # ``logits[..., :1]`` style slices below carry.
    if labels.dim() == logits.dim() - 1:
        labels = labels.unsqueeze(-1)
    positive = (labels > 0).float()

    positive_logits = logits[..., :1]
    # reduction="none" keeps the term per-sample so it can be combined with the
    # per-sample regression term before a single, final reduction.
    classification_loss = F.binary_cross_entropy_with_logits(
        positive_logits, positive, reduction="none")

    loc = logits[..., 1:2]
    # Floor the scale at sqrt(float32 eps) so log_prob stays finite.
    scale = F.softplus(logits[..., 2:]).clamp(min=torch.finfo(torch.float32).eps ** 0.5)
    # Non-positive labels are evaluated at 1 (a valid lognormal support point);
    # their contribution is then zeroed by the ``positive`` mask.
    safe_labels = torch.where(positive > 0, labels, torch.ones_like(labels))
    regression_loss = -positive * LogNormal(loc, scale).log_prob(safe_labels)

    per_sample = (classification_loss + regression_loss).mean(dim=-1)
    if reduction == "none":
        return per_sample
    if reduction == "sum":
        return per_sample.sum()
    return per_sample.mean()


In [None]:
import numpy as np
import pandas as pd
from sklearn import metrics
from typing import Sequence

def cumulative_true(y_true: Sequence[float], y_pred: Sequence[float]) -> np.ndarray:
    """Return the normalized running total of true values, ranked by prediction.

    Samples are ordered by ``y_pred`` from highest to lowest, and the running
    sum of ``y_true`` along that order is divided by the total of ``y_true``.
    This is the curve plotted by a gain chart.

    Args:
        y_true: Observed lifetime values.
        y_pred: Predicted lifetime values used for ranking.

    Returns:
        Array of cumulative shares of ``y_true`` in prediction order.
    """
    ranked = pd.DataFrame({'y_true': y_true, 'y_pred': y_pred})
    ranked = ranked.sort_values(by='y_pred', ascending=False)
    truth = ranked['y_true']
    running_total = truth.cumsum()
    return (running_total / truth.sum()).values

def gini_from_gain(df: pd.DataFrame) -> pd.DataFrame:
    """Compute raw and normalized Gini coefficients from gain curves.

    Each column of ``df`` is one gain curve (e.g. the output of
    ``cumulative_true``). The raw Gini of a curve is
    ``2 * mean(curve) - 1``; the normalized Gini divides every raw value by
    the raw Gini of the first column.

    Args:
        df: One gain curve per column. The first column must be the
            ground-truth (perfect-ordering) curve.

    Returns:
        DataFrame indexed by the columns of ``df`` with columns
        ``'raw'`` and ``'normalized'``.
    """
    n_rows = df.shape[0]
    raw = df.apply(lambda gain: 2 * gain.sum() / n_rows - 1.)
    # ``raw`` is indexed by column label, so use explicit positional access;
    # ``raw[0]`` only worked through pandas' deprecated positional fallback.
    normalized = raw / raw.iloc[0]
    return pd.DataFrame({
        'raw': raw,
        'normalized': normalized,
    })

def _normalized_rmse(y_true, y_pred):
    """Return the root-mean-squared error divided by the mean of ``y_true``.

    ``y_true`` must expose a ``.mean()`` method (e.g. a pandas Series or a
    NumPy array).
    """
    rmse = np.sqrt(metrics.mean_squared_error(y_true, y_pred))
    return rmse / y_true.mean()

def _normalized_mae(y_true, y_pred):
    """Return the mean absolute error divided by the mean of ``y_true``.

    ``y_true`` must expose a ``.mean()`` method (e.g. a pandas Series or a
    NumPy array).
    """
    mae = metrics.mean_absolute_error(y_true, y_pred)
    return mae / y_true.mean()

def _aggregate_fn(df):
    """Summarize one bucket of predictions.

    Args:
        df: Frame with ``y_true`` and ``y_pred`` columns.

    Returns:
        Series with the bucket's mean label, mean prediction, and the RMSE and
        MAE each normalized by the mean label.
    """
    truth = df['y_true']
    pred = df['y_pred']
    summary = dict(
        label_mean=np.mean(truth),
        pred_mean=np.mean(pred),
        normalized_rmse=_normalized_rmse(truth, pred),
        normalized_mae=_normalized_mae(truth, pred),
    )
    return pd.Series(summary)

def decile_stats(y_true: Sequence[float], y_pred: Sequence[float],
                 num_buckets: int = 10) -> pd.DataFrame:
    """Compute per-quantile-bucket means and errors.

    Samples are split into ``num_buckets`` equal-frequency buckets by
    ``y_pred`` (via ``pd.qcut``), and aggregate metrics are computed for each
    bucket.

    Args:
        y_true: True labels.
        y_pred: Predicted labels.
        num_buckets: Number of quantile buckets (default 10, i.e. deciles).

    Returns:
        DataFrame indexed by bucket label (``'0'`` .. ``str(num_buckets - 1)``)
        with columns ``label_mean``, ``pred_mean``, ``normalized_rmse``,
        ``normalized_mae`` and ``decile_mape``.

    Raises:
        ValueError: Propagated from ``pd.qcut`` when the bucket edges are not
            unique (e.g. many tied predictions).
    """
    bucket_labels = ['%d' % i for i in range(num_buckets)]
    decile = pd.qcut(y_pred, q=num_buckets, labels=bucket_labels)

    frame = pd.DataFrame({
        'y_true': y_true,
        'y_pred': y_pred,
        'decile': decile,
    })
    # observed=False pins the current behavior for categorical groupers (the
    # pandas default is changing), and selecting only the value columns keeps
    # the grouping column out of ``apply`` (deprecated behavior in pandas 2.2).
    df = frame.groupby('decile', observed=False)[['y_true', 'y_pred']].apply(_aggregate_fn)

    df['decile_mape'] = np.abs(df['pred_mean'] - df['label_mean']) / df['label_mean']
    return df


In [None]:
import os
import lifetime_value as ltv
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn import model_selection, preprocessing
from torch.utils.data import DataLoader, TensorDataset

from .utilities import *

def preprocess(data, target_ltv, day1_purchaseAmt_col="", numerical_features=None, categorical_features=None, sample_data=1, testing_size=0.2):
    """Load, clean, label-encode and split the modelling data.

    Args:
        data: A DataFrame, or a path to a ``.csv`` / ``.parquet`` file.
        target_ltv: Name of the target (lifetime value) column.
        day1_purchaseAmt_col: Name of the day-1 purchase amount column; it must
            exist in ``data``. Its test-split values are returned as
            ``y0_test``. The column is dropped from the features unless it is
            listed in ``numerical_features``.
        numerical_features: Numeric feature columns. When empty or None, every
            numeric column except ``target_ltv`` is used.
        categorical_features: Categorical feature columns. When empty or None,
            every ``object`` column is used.
        sample_data: Fraction of rows to keep, in (0, 1].
        testing_size: Fraction of rows for the test split, in (0, 1).

    Returns:
        ``(feature_map, x_train, x_test, y_train, y_test, y0_test)``.
        ``feature_map`` records the feature lists, the target and day-1 column
        names, and a ``{level: code}`` mapping per categorical feature.
        ``x_train`` / ``x_test`` are whatever ``feature_dict`` builds from each
        split.

    Raises:
        ValueError: For an unsupported file extension, out-of-range fractions,
            or a required column missing from ``data``.
    """
    if isinstance(data, str):
        _, file_type = os.path.splitext(data)
        if file_type == ".csv":
            data = pd.read_csv(data)
        elif file_type == ".parquet":
            data = pd.read_parquet(data, engine='pyarrow')
        else:
            raise ValueError("Error - 不支持的文件类型: " + file_type + "。仅支持.csv和.parquet")

    # sample_data == 0 would leave no rows to model, so it is rejected too.
    if not 0 < sample_data <= 1:
        raise ValueError("Error - `sample`必须是0和1之间的值，表示用于建模的数据百分比。默认使用100%的数据")

    if not 0 < testing_size < 1:
        raise ValueError("Error - `testing_size`必须是0和1之间的值，表示用于测试的数据百分比。默认使用20%")

    data = data.sample(frac=sample_data, random_state=123)

    if categorical_features:
        categorical_features = list(categorical_features)
    else:
        categorical_features = data.select_dtypes(["object"]).columns.tolist()

    if numerical_features:
        numerical_features = list(numerical_features)
    else:
        numerical_features = data.select_dtypes(["number"]).columns.tolist()
        numerical_features.remove(target_ltv)

    feature_map = {
        "categorical_features": categorical_features,
        "numerical_features": numerical_features,
        "target": target_ltv,
        "day1_purchaseAmt_col": day1_purchaseAmt_col,
    }

    # De-duplicate while keeping order: the day-1 column is usually also a
    # numerical feature, and selecting it twice would create duplicate columns
    # (making ``y0`` 2-D and duplicating the feature downstream).
    selected = list(dict.fromkeys(categorical_features + numerical_features + [target_ltv, day1_purchaseAmt_col]))
    for col in selected:
        if col not in data.columns:
            raise ValueError("Error - " + col + " 列在`data`中未找到")
    # Work on an explicit copy so the column assignments below never touch a view.
    data = data[selected].copy()

    data[target_ltv] = data[target_ltv].astype("float32")

    for col in categorical_features:
        encoder = preprocessing.LabelEncoder()
        encoder.fit(data[col])
        feature_map[col] = {level: code for code, level in enumerate(encoder.classes_)}
        data[col] = encoder.transform(data[col])

    y0 = data[day1_purchaseAmt_col].values

    if day1_purchaseAmt_col not in numerical_features:
        print(day1_purchaseAmt_col + "未包含在numerical_features中，已移除")
        del data[day1_purchaseAmt_col]

    train, test, y0_train, y0_test = model_selection.train_test_split(data, y0, test_size=testing_size, random_state=123)
    x_train = feature_dict(train, numerical_features, categorical_features)
    y_train = train[target_ltv].values

    x_test = feature_dict(test, numerical_features, categorical_features)
    y_test = test[target_ltv].values

    return feature_map, x_train, x_test, y_train, y_test, y0_test

class DNNModel(nn.Module):
    """Feed-forward network producing three logits per sample.

    Each categorical feature is embedded, the embeddings are concatenated
    after the numeric inputs, and the result goes through ``Linear + ReLU``
    hidden layers followed by a linear head with 3 outputs. Elsewhere in this
    file those outputs are consumed by the ZILN helpers as
    (positive logit, loc, raw scale).

    Args:
        feature_map: Mapping with ``"numerical_features"`` and
            ``"categorical_features"`` lists plus, for every categorical
            feature, a ``{level: code}`` dict whose length is its vocabulary size.
        layers: Widths of the hidden layers, applied in order.
        embedding_dim: Size of each categorical embedding (default 10).
    """

    def __init__(self, feature_map, layers, embedding_dim=10):
        super().__init__()
        self.numeric_features = feature_map["numerical_features"]
        self.categorical_features = feature_map["categorical_features"]
        self.embedding_dim = embedding_dim

        self.embeddings = nn.ModuleList([
            nn.Embedding(len(feature_map[key]), embedding_dim=embedding_dim)
            for key in self.categorical_features
        ])
        input_dim = len(self.numeric_features) + len(self.categorical_features) * embedding_dim

        # Chain the layers so each one consumes the previous layer's width;
        # ReLU between them keeps the stack from collapsing into one linear map.
        modules = []
        in_features = input_dim
        for width in layers:
            modules.extend([nn.Linear(in_features, width), nn.ReLU()])
            in_features = width
        modules.append(nn.Linear(in_features, 3))
        self.dnn_layers = nn.Sequential(*modules)

    def forward(self, numeric_input, categorical_inputs):
        """Return ``[batch, 3]`` logits.

        Args:
            numeric_input: Numeric features; assumed ``[batch, n_numeric]``.
            categorical_inputs: One code tensor per categorical feature, in
                ``self.categorical_features`` order; each assumed ``[batch]``.
        """
        # Cast so float64 numerics and int32/float codes work with the
        # float32 Linear weights and the Embedding index lookup.
        pieces = [numeric_input.float()]
        pieces.extend(
            embed(codes.long())
            for embed, codes in zip(self.embeddings, categorical_inputs)
        )
        deep_input = torch.cat(pieces, dim=1)
        return self.dnn_layers(deep_input)

def _ziln_dataset(x, y, categorical_features):
    """Pack a feature mapping and labels into a TensorDataset of (numeric, *categorical, label)."""
    # Assumes ``x`` is the mapping returned by ``feature_dict`` (a 'numeric'
    # entry plus one entry per categorical feature), the same layout that
    # ``model_predict`` consumes -- TODO confirm against utilities.feature_dict.
    numeric = torch.as_tensor(np.asarray(x["numeric"]), dtype=torch.float32)
    categorical = [torch.as_tensor(np.asarray(x[name]), dtype=torch.long) for name in categorical_features]
    labels = torch.as_tensor(np.asarray(y), dtype=torch.float32).reshape(-1, 1)
    return TensorDataset(numeric, *categorical, labels)


def _ziln_batch_loss(model, criterion, batch):
    """Run one (numeric, *categorical, label) batch through the model and return a scalar loss."""
    numeric, *categorical, labels = batch
    logits = model(numeric, categorical)
    # The loss signature is (labels, logits). ``.mean()`` turns a per-sample
    # loss into a scalar and is a no-op on an already-scalar loss.
    return criterion(labels, logits).mean()


def _ziln_eval_loss(model, criterion, loader):
    """Return the sample-weighted mean loss over ``loader`` without tracking gradients."""
    model.eval()
    total, count = 0.0, 0
    with torch.no_grad():
        for batch in loader:
            n = batch[0].shape[0]
            total += _ziln_batch_loss(model, criterion, batch).item() * n
            count += n
    return total / count if count else float("nan")


def fit_model(feature_map, x_train, y_train, x_test, y_test,
              layers=(64, 32, 3), epochs=100, learning_rate=0.0001, batch_size=1024,
              callback_patience=20, callback_lr=1e-06, verbose=0, log_directory_name="logs"):
    """Train a ``DNNModel`` with the ZILN loss.

    The test split is used as validation data: the learning rate is reduced
    on plateaus (never below ``callback_lr``) and training stops early once
    the validation loss has not improved for ``callback_patience`` epochs.

    Args:
        feature_map: Feature metadata as returned by ``preprocess``.
        x_train, x_test: Feature mappings as returned by ``preprocess``.
        y_train, y_test: Target arrays.
        layers: Hidden-layer widths passed to ``DNNModel``.
        epochs: Maximum number of training epochs.
        learning_rate: Initial Adam learning rate.
        batch_size: Mini-batch size for training and validation.
        callback_patience: Early-stopping patience in epochs.
        callback_lr: Minimum learning rate for the plateau scheduler.
        verbose: When truthy, print per-epoch training and validation loss.
        log_directory_name: Currently accepted for compatibility but unused.

    Returns:
        The trained model.
    """
    model = DNNModel(feature_map, list(layers))
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, min_lr=callback_lr)
    # NOTE(review): must be the PyTorch ZILN loss taking (labels, logits);
    # confirm that ``ltv`` resolves to the torch implementation.
    criterion = ltv.zero_inflated_lognormal_loss

    categorical = feature_map["categorical_features"]
    train_loader = DataLoader(_ziln_dataset(x_train, y_train, categorical), batch_size=batch_size, shuffle=True)
    test_dataset = _ziln_dataset(x_test, y_test, categorical)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
    has_validation = len(test_dataset) > 0

    best_val_loss = float("inf")
    stale_epochs = 0
    for epoch in range(epochs):
        model.train()
        running, seen = 0.0, 0
        for batch in train_loader:
            optimizer.zero_grad()
            loss = _ziln_batch_loss(model, criterion, batch)
            loss.backward()
            optimizer.step()
            n = batch[0].shape[0]
            running += loss.item() * n
            seen += n
        train_loss = running / seen if seen else float("nan")

        val_loss = _ziln_eval_loss(model, criterion, test_loader) if has_validation else float("nan")
        if verbose:
            print(f'Epoch {epoch+1}/{epochs}, Loss: {train_loss}, Val Loss: {val_loss}')

        if not has_validation:
            continue
        scheduler.step(val_loss)
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            stale_epochs = 0
        else:
            stale_epochs += 1
            if stale_epochs >= callback_patience:
                break

    return model

def model_predict(model, data, feature_map, print_performance=True):
    """Score ``data`` with a trained model.

    Args:
        model: Trained model taking ``(numeric, [categorical...])`` and
            returning ``[batch, 3]`` logits.
        data: A DataFrame, or a path to a ``.csv`` / ``.parquet`` file.
        feature_map: Feature metadata as returned by ``preprocess``.
        print_performance: When True, also compute and print metrics via
            ``ltv_performance``.

    Returns:
        DataFrame with ``churn_predictions`` (sigmoid of the first logit) and
        ``ltv_prediction`` (ZILN predicted mean) columns.

    Raises:
        ValueError: For an unsupported file extension, a missing column, or a
            categorical level unseen during training when ``feature_map`` has
            no ``'UNDEFINED'`` level to fall back to.
    """
    if isinstance(data, str):
        _, file_type = os.path.splitext(data)
        if file_type == ".csv":
            data = pd.read_csv(data)
        elif file_type == ".parquet":
            data = pd.read_parquet(data, engine='pyarrow')
        else:
            raise ValueError("Error - 不支持的文件类型: " + file_type + "。仅支持.csv和.parquet")

    target = feature_map["target"]
    day1_col = feature_map["day1_purchaseAmt_col"]
    categorical = feature_map["categorical_features"]
    # De-duplicate while keeping order: the day-1 column may also be a
    # numerical feature, and selecting it twice would duplicate columns.
    all_variables = list(dict.fromkeys(categorical + feature_map["numerical_features"] + [target, day1_col]))

    for col in all_variables:
        if col not in data.columns:
            raise ValueError("Error -" + col + " 列在`data`中未找到。请保持所有列名与建模时使用的列名一致")

    # Explicit copy so the column assignments below never touch a view.
    data = data[all_variables].copy()
    data[target] = data[target].astype("float32")

    for cat in categorical:
        mapping = feature_map[cat]
        # Unseen levels fall back to 'UNDEFINED', which only works if the
        # training-time mapping actually contains that level.
        if 'UNDEFINED' not in mapping:
            unseen = [t for t in data[cat].unique() if t not in mapping]
            if unseen:
                raise ValueError("Error - " + cat + " 列包含建模时未出现的取值，且feature_map中没有'UNDEFINED': " + str(unseen[:10]))
        data[cat] = data[cat].apply(lambda t: mapping[t] if t in mapping else mapping['UNDEFINED'])

    y0 = data[day1_col].values

    features = feature_dict(data, feature_map["numerical_features"], categorical)
    x_test = {feat: torch.tensor(np.array(features[feat])) for feat in features}

    model.eval()
    with torch.no_grad():
        # Cast to the dtypes the linear layers (float32) and embeddings (long) expect.
        logits = model(x_test['numeric'].float(), [x_test[key].long() for key in categorical])

    ltv_pred = ltv.zero_inflated_lognormal_pred(logits).numpy().flatten()
    # NOTE(review): sigmoid(logit[0]) is the probability that the label is
    # positive (a purchase), i.e. the complement of churn despite the column name.
    churn_predictions = torch.sigmoid(logits[..., :1]).numpy().flatten()

    df = pd.DataFrame({
        'churn_predictions': churn_predictions,
        'ltv_prediction': ltv_pred
    })

    if print_performance:
        perf_metrics, _ = ltv_performance(model, x_test, data[target].values, y0)
        print(perf_metrics.transpose())

    return df
