In [None]:
from typing import List, Tuple
import tensorflow as tf
from Recommender_System.utility.decorator import logger
from Recommender_System.utility.evaluation import TopkData
from Recommender_System.algorithm.common import topk

# SLIM模型类定义
class SLIM(tf.keras.Model):
    @logger('初始化SLIM模型：', ('n_user', 'n_item'))
    def __init__(self, n_user: int, n_item: int, A: List[List[float]], **kwargs):
        super(SLIM, self).__init__(**kwargs)
        assert len(A) == n_user and len(A[0]) == n_item
        self.n_user, self.n_item = n_user, n_item
        self.A = tf.constant(A, dtype=tf.keras.backend.floatx())
        self.W = self.add_weight(name='W', shape=(n_item, n_item), initializer='zeros',
                                 dtype=tf.keras.backend.floatx(),
                                 constraint=SLIMwConstraint(n_item))
        self._AW = None
        self._AW_updated = False

    def loss(self, training=False):
        if self.A is None or self.W is None:
            raise ValueError("A and W must not be None")

        loss_val = tf.reduce_sum(tf.square(self.A - tf.matmul(self.A, self.W)))
        if training:
            self._AW_updated = False
        return loss_val

    def call(self, inputs):
        if not self._AW_updated:
            self._AW = tf.matmul(self.A, self.W)
            self._AW_updated = True
        return tf.gather_nd(self._AW, inputs)

    @property
    def AW(self):
        if not self._AW_updated:
            self._AW = tf.matmul(self.A, self.W)
            self._AW_updated = True
        return self._AW

# SLIM权重约束类
class SLIMwConstraint(tf.keras.constraints.Constraint):
    def __init__(self, n_item: int):
        self.k = tf.cast(tf.ones((n_item, n_item)) - tf.eye(n_item), dtype=tf.keras.backend.floatx())

    def __call__(self, w):
        return tf.maximum(w * self.k, 0)

# 训练集用户物品交互矩阵获取函数
@logger('根据训练集数据得到用户物品交互矩阵，', ('n_user', 'n_item'))
def get_user_item_matrix(n_user: int, n_item: int, train_data: List[Tuple[int, int, int]]) -> List[List[int]]:
    user_item_matrix = [[0 for _ in range(n_item)] for _ in range(n_user)]
    for user_id, item_id, label in train_data:
        user_item_matrix[user_id][item_id] = label
    return user_item_matrix

# SLIM模型训练函数
@logger('开始训练，', ('l12', 'epochs'))
def train(model: SLIM, topk_data: TopkData, l12=0.01, epochs=1000):
    optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)

    @tf.function
    def train_step():
        with tf.GradientTape() as tape:
            loss = model.loss(training=True)
        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))
        return loss

    for epoch in range(epochs):
        loss = train_step()
        if epoch == 1 or (epoch < 20 and epoch % 5 == 0) or (epoch < 100 and epoch % 20 == 0) or epoch % 100 == 0:
            print('epoch=', epoch, ', loss=', loss.numpy(), sep='')
            topk(topk_data, score_fn=lambda ui: model({k: tf.constant(v, dtype=tf.int32) for k, v in ui.items()}))

if __name__ == '__main__':
    model = SLIM(2, 3, [[0, 1, 1], [1, 0, 1]])
    print(model({'user_id': [0, 1, 0], 'item_id': [0, 2, 1]}))
    print(model({'user_id': [1, 0], 'item_id': [1, 2]}))
