In [40]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from tqdm import tqdm
from sklearn.datasets import make_classification
from sklearn.metrics import roc_auc_score
from sklearn.metrics import accuracy_score,roc_auc_score

In [41]:
class WideAndDeep(nn.Module):
    """Wide & Deep binary classifier.

    Wide branch: one linear layer over the raw wide features plus the
    element-wise products of the configured feature pairs.
    Deep branch: one embedding per categorical column, concatenated with the
    deep numeric features and passed through a Linear+ReLU stack.
    The wide output (1 value) and the deep hidden vector are concatenated and
    projected to a single sigmoid probability per sample.
    """

    def __init__(self, num_wide, num_deep_dim, cat_deep_dims, cross_feature_indices,  hidden_units, embedding_dim):
        """
        Args:
            num_wide: number of features fed to the wide part.
            num_deep_dim: number of numeric features fed to the deep part.
            cat_deep_dims: list with the vocabulary size of each categorical
                feature of the deep part (one embedding table per entry).
            cross_feature_indices: column pairs of the wide input to multiply,
                formatted as [(i, j), (k, l), ...]. May be empty.
            hidden_units: list of hidden-layer widths of the deep network.
            embedding_dim: size of every embedding vector.
        """
        super().__init__()

        # Wide part: raw wide features + one product column per cross pair.
        self.cross_indices = cross_feature_indices
        self.wide = nn.Linear(num_wide + len(cross_feature_indices), 1)

        # Deep part: one embedding table per categorical feature.
        self.embeddings = nn.ModuleList([
            nn.Embedding(dim, embedding_dim)
            for dim in cat_deep_dims
        ])

        # Width of the vector entering the first deep layer.
        deep_input_dim = num_deep_dim + len(cat_deep_dims) * embedding_dim

        # Deep MLP; module names (fc_i / relu_i) are part of the state-dict keys.
        self.dnn = nn.Sequential()
        for i, unit in enumerate(hidden_units):
            self.dnn.add_module(name=f"fc_{i}", module=nn.Linear(deep_input_dim, unit))
            self.dnn.add_module(name=f"relu_{i}", module=nn.ReLU())
            deep_input_dim = unit

        # Final layer sees the deep hidden vector plus the 1-dim wide output.
        self.final = nn.Linear(deep_input_dim + 1, 1)

    def create_cross_features(self, x):
        """Build the cross-product columns for the wide part.

        Args:
            x: 2-D tensor (batch, num_wide) of wide features.

        Returns:
            Tensor of shape (batch, number of cross pairs); column p is
            x[:, i] * x[:, j] for the p-th configured pair (i, j).
        """
        if len(self.cross_indices) == 0:
            # torch.cat on an empty list raises; return a zero-width slice so
            # the caller can still concatenate it with the raw features.
            return x[:, :0]

        cross_features = [
            (x[:, i] * x[:, j]).unsqueeze(1)
            for i, j in self.cross_indices
        ]
        return torch.cat(cross_features, dim=1)

    def forward(self, num_x, deep_x, cat_x):
        """
        Args:
            num_x: numeric features of the wide part, (batch, num_wide).
            deep_x: numeric features of the deep part, (batch, num_deep_dim).
            cat_x: integer category ids of the deep part, one column per
                embedding table, (batch, len(cat_deep_dims)).

        Returns:
            1-D tensor of shape (batch,) with sigmoid probabilities.

        Raises:
            ValueError: if cat_x does not have one column per embedding table.
        """
        num_x = num_x.float()       # make sure inputs are float32
        deep_x = deep_x.float()

        # Wide branch
        cross_features = self.create_cross_features(num_x)
        wide_input = torch.cat([num_x, cross_features], dim=1)
        wide_output = self.wide(wide_input)

        # Deep branch
        if cat_x.shape[1] != len(self.embeddings):
            raise ValueError(
                f"cat_x has {cat_x.shape[1]} columns but the model has "
                f"{len(self.embeddings)} embedding tables"
            )
        # Iterate over the tables instead of len(cat_x[0]) so an empty batch
        # does not fail; .long() because nn.Embedding needs integer indices.
        embeds = [
            embedding(cat_x[:, i].long())
            for i, embedding in enumerate(self.embeddings)
        ]
        # Embeddings first, numeric features last (same layout as before);
        # a single cat also works when there are no categorical features.
        deep_input = torch.cat(embeds + [deep_x], dim=1)
        deep_output = self.dnn(deep_input)

        # Combine wide output and deep hidden vector, then project to 1 logit.
        output = torch.cat([wide_output, deep_output], dim=1)
        output = self.final(output)  # (batch, 1)
        # squeeze only the last dim: a bare squeeze() would turn a batch of
        # one into a 0-dim tensor, which no longer matches (1,)-shaped labels.
        return torch.sigmoid(output).squeeze(-1)




In [47]:
# Configuration shared by the dataset, the model and the training loop
config = dict(
    num_features=2,            # number of wide features
    num_deep_features=10,      # number of numeric features of the deep part
    cross_features=[(0, 1)],   # index pairs of wide features to cross
    cat_deep_dims=[100, 50],   # number of distinct values of each deep categorical feature
    hidden_units=[64, 32],     # hidden-layer widths of the deep network
    embedding_dim=8,           # embedding size per categorical feature
    batch_size=32,
    lr=0.001,
    epochs=50,
)


# Example data. Categorical features are integer ids (NOT one-hot vectors),
# because the model looks them up in nn.Embedding tables.
class Dataset(torch.utils.data.Dataset):
    """Synthetic binary-classification data for the Wide & Deep model.

    Numeric features and labels come from sklearn's make_classification; the
    wide part uses a random subset of those numeric columns, and the
    categorical columns are random integer ids unrelated to the label.
    Sizes are read from the module-level ``config`` dict.

    NOTE: this class shadows the ``Dataset`` name imported from
    torch.utils.data at the top of the notebook.
    """

    def __init__(self, num_samples, seed=None):
        """
        Args:
            num_samples: number of rows to generate.
            seed: optional int; when given, the numeric features, the wide
                column choice and the categorical ids are reproducible.
                The default (None) keeps the original unseeded behaviour.
        """
        X, y = make_classification(
            n_samples=num_samples,
            n_features=config['num_deep_features'],
            n_informative=config['num_deep_features'] // 2,
            n_redundant=2,
            n_classes=2,
            random_state=seed,
        )

        # Private generator so seeding here does not touch torch's global RNG.
        generator = None
        if seed is not None:
            generator = torch.Generator().manual_seed(seed)

        # Randomly pick which numeric columns feed the wide part. Convert the
        # index to NumPy explicitly instead of indexing an ndarray with a tensor.
        wide_columns = torch.randperm(
            config['num_deep_features'], generator=generator
        )[:config['num_features']].numpy()
        self.wide_data = X[:, wide_columns]
        self.num_deep_data = X

        # Random categorical ids: column c takes values in [0, cat_deep_dims[c]).
        self.cat_deep_data = torch.cat([
            torch.randint(0, dim, (num_samples, 1), generator=generator)
            for dim in config['cat_deep_dims']
        ], dim=1)

        # float32 labels, the dtype nn.BCELoss expects for its target.
        self.labels = y.astype(np.float32)

    def __len__(self):
        """Number of samples."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return (wide features, deep numeric features, categorical ids, label) for row idx."""
        return self.wide_data[idx], self.num_deep_data[idx], self.cat_deep_data[idx], self.labels[idx]


In [48]:
# Build the Wide & Deep network from the shared config
# (keyword arguments listed in the constructor's own parameter order).
model = WideAndDeep(
    num_wide=config['num_features'],
    num_deep_dim=config['num_deep_features'],
    cat_deep_dims=config['cat_deep_dims'],
    cross_feature_indices=config['cross_features'],
    hidden_units=config['hidden_units'],
    embedding_dim=config['embedding_dim'],
)

In [49]:

# Training loop
train_data = Dataset(1000)
train_loader = torch.utils.data.DataLoader(train_data, batch_size=config['batch_size'], shuffle=True)

# The model already applies a sigmoid, so plain BCELoss on probabilities is used.
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=config['lr'])

pbar = tqdm(range(config['epochs']))

for epoch in pbar:
    model.train()
    total_loss = 0
    num_correct = 0
    num_seen = 0
    for wide_x, deep_x, cat_deep_x, labels in train_loader:
        optimizer.zero_grad()
        outputs = model(wide_x, deep_x, cat_deep_x)
        loss = criterion(outputs, labels)

        loss.backward()
        optimizer.step()
        total_loss += loss.item()

        # Accumulate accuracy over the whole epoch; previously only the last
        # mini-batch's accuracy survived to the progress bar. 0.5 is the
        # decision threshold for a sigmoid probability (was 0.95).
        predictions = (outputs.detach() > 0.5).float()
        num_correct += (predictions == labels).sum().item()
        num_seen += labels.numel()

    # Epoch-level training accuracy (guard against an empty loader).
    accuracy = num_correct / max(num_seen, 1)
    pbar.set_postfix({'train_loss': f'{total_loss/len(train_loader):.4f}', 'train_accuracy': f'{accuracy:.4f}'})

100%|██████████| 50/50 [00:01<00:00, 29.67it/s, train_loss=0.0031, train_accuracy=1.0000]
