In [1]:
import torch
import numpy as np

from model_components import FM, LinearPart, DNN, DeepFM
from deepctr_torch.layers.core import DNN as DNN_deepctr
from utils import seed_everything
from deepctr_torch.inputs import SparseFeat, DenseFeat, build_input_features
from deepctr_torch.models.deepfm import DeepFM as OfficialDeepFM

In [2]:
#시드 고정
seed_everything(42)

1. FM 비교

In [None]:
embed_stack = torch.randn(2, 3, 4, dtype=torch.float32)

fm_layer = FM()
fm_out = fm_layer(embed_stack)

# deepctr의 FM과 원본 식 비교
square_of_sum = torch.pow(torch.sum(embed_stack, dim=1, keepdim=True), 2)
sum_of_square = torch.sum(embed_stack * embed_stack, dim=1, keepdim=True)
cross_term = square_of_sum - sum_of_square
cross_term = 0.5 * torch.sum(cross_term, dim=2, keepdim=False)

assert torch.allclose(fm_out, cross_term, atol=1e-6), "불일치!"

print("embed_stack shape :", embed_stack.shape)
print("FM output :", fm_out.squeeze(-1).squeeze(-1))
print("Manual output :", cross_term.squeeze(-1).squeeze(-1))

embed_stack shape : torch.Size([2, 3, 4])
FM output         : tensor([2.2455, 0.4180])
Manual output     : tensor([2.2455, 0.4180])


2. DNN 비교

In [None]:
input_dim = 16
hidden_units = (32, 8)
activation = "relu"
dropout_rate = 0.0
use_bn = False
device = "cpu"

torch.manual_seed(0)

dnn_dc = DNN_deepctr(
    input_dim,
    hidden_units=hidden_units,
    activation=activation,
    dropout_rate=dropout_rate,
    use_bn=use_bn,
    device=device,
).to(device)

dnn_my = DNN(
    input_dim,
    hidden_units=hidden_units,
    activation=activation,
    dropout_rate=dropout_rate,
    use_bn=use_bn,
).to(device)

for src, tgt in zip(dnn_dc.linears, dnn_my.linears):
    tgt.weight.data.copy_(src.weight.data)
    tgt.bias.data.copy_(src.bias.data)

if use_bn:
    for src, tgt in zip(dnn_dc.bn, dnn_my.bn):
        for attr in ["weight", "bias", "running_mean", "running_var"]:
            getattr(tgt, attr).data.copy_(getattr(src, attr).data)

X = torch.randn(5, input_dim, device=device)   # batch=5

out_dc = dnn_dc(X)
out_my = dnn_my(X)

print("deepctr output :", out_dc)
print("custom  output :", out_my)
print("max |diff|     :", (out_dc - out_my).abs().max().item())

assert torch.allclose(out_dc, out_my, atol=1e-6, rtol=1e-6), "불일치!"

deepctr output : tensor([[0.0787, 0.0000, 0.1409, 0.0000, 0.0006, 0.1589, 0.1758, 0.0000],
        [0.0787, 0.0000, 0.1409, 0.0000, 0.0006, 0.1589, 0.1758, 0.0000],
        [0.0787, 0.0000, 0.1409, 0.0000, 0.0006, 0.1589, 0.1758, 0.0000],
        [0.0787, 0.0000, 0.1409, 0.0000, 0.0006, 0.1589, 0.1758, 0.0000],
        [0.0787, 0.0000, 0.1409, 0.0000, 0.0006, 0.1589, 0.1758, 0.0000]],
       grad_fn=<ReluBackward0>)
custom  output : tensor([[0.0787, 0.0000, 0.1409, 0.0000, 0.0006, 0.1589, 0.1758, 0.0000],
        [0.0787, 0.0000, 0.1409, 0.0000, 0.0006, 0.1589, 0.1758, 0.0000],
        [0.0787, 0.0000, 0.1409, 0.0000, 0.0006, 0.1589, 0.1758, 0.0000],
        [0.0787, 0.0000, 0.1409, 0.0000, 0.0006, 0.1589, 0.1758, 0.0000],
        [0.0787, 0.0000, 0.1409, 0.0000, 0.0006, 0.1589, 0.1758, 0.0000]],
       grad_fn=<ReluBackward0>)
max |diff|     : 0.0


3. Linear 비교

In [None]:
feature_columns = [
    SparseFeat('feat1', vocabulary_size=10, embedding_dim=4),
    SparseFeat('feat2', vocabulary_size=5, embedding_dim=4),
    DenseFeat ('feat3', dimension=1)
]

linear_part = LinearPart(feature_columns, device='cpu')
official_model = OfficialDeepFM(
    linear_feature_columns=feature_columns,
    dnn_feature_columns=[],       # DNN 없이 linear 파트만 사용
    l2_reg_linear=0,
    l2_reg_embedding=0,
    init_std=0.0001,
    seed=1024,
    task='binary'
)

# 공식 linear_model의 가중치 복사
for fc in feature_columns:
    if isinstance(fc, SparseFeat):
        name = fc.embedding_name  # 보통 name과 동일
        emb_weight = official_model.linear_model.embedding_dict[name].weight.data
        linear_part.sparse_linears[name].weight.data.copy_(emb_weight)

if hasattr(official_model.linear_model, 'weight'):
    w = official_model.linear_model.weight.data  # shape: (dense_dim,1)
    linear_part.dense_linear.weight.data.copy_(w.t())

feature_index = build_input_features(feature_columns)
dense_idx = [feature_index[fc.name][0] for fc in feature_columns if isinstance(fc, DenseFeat)]

batch_size = 4
num_features = len(feature_index)
X_np = np.zeros((batch_size, num_features), dtype=np.float32)

for fc in feature_columns:
    idx = feature_index[fc.name][0]
    if isinstance(fc, SparseFeat):
        X_np[:, idx] = np.random.randint(0, fc.vocabulary_size, size=batch_size)
    else:
        X_np[:, idx] = np.random.randn(batch_size)
X = torch.from_numpy(X_np)

out_custom = linear_part(X, feature_index, dense_idx)
out_official = official_model.linear_model(X)

print("Custom LinearPart 출력:\n", out_custom.detach().numpy())
print("Official linear_model 출력:\n", out_official.detach().numpy())
print("최대 절대 오차:", float((out_custom - out_official).abs().max()))


Custom LinearPart 출력:
 [[-4.3112147e-04]
 [-1.7197993e-04]
 [-1.7077866e-05]
 [-7.7292119e-05]]
Official linear_model 출력:
 [[-4.3112147e-04]
 [-1.7197993e-04]
 [-1.7077866e-05]
 [-7.7292119e-05]]
최대 절대 오차: 0.0
