## Lasso SVD

In [None]:
import numpy as np
from sklearn.base import BaseEstimator
from sklearn.linear_model import Lasso
from sklearn.metrics import mean_squared_error


class Lasso_SVD(BaseEstimator):
    """Latent-factor rating model fitted by alternating Lasso regressions.

    The rating of user ``u`` for item ``i`` is predicted as the dot product
    ``P[u] @ Q[i]``.  ``fit`` alternates two sweeps: with ``P`` held fixed,
    every row of ``Q`` is re-estimated by an L1-penalised regression
    (``Lasso`` without intercept) on the ratings of that item; then every row
    of ``P`` is re-estimated the same way with ``Q`` held fixed.

    Parameters
    ----------
    n_users, n_items : int
        Number of rows of ``P`` and ``Q``.  Ids found in ``X`` are used
        directly as row indices.
    lam : float
        Passed as ``alpha`` to every Lasso sub-problem.
    K : int
        Number of latent dimensions (columns of ``P`` and ``Q``).
    iterNum : int
        Number of alternating sweeps performed by ``fit``.
    tol, verbose
        Stored on the instance but not read by ``fit`` or ``predict``.
        NOTE(review): presumably meant as a convergence tolerance and a
        logging switch -- confirm the intended semantics before wiring in.
    """

    def __init__(self, n_users, n_items, lam=0.1, K=5, iterNum=10, tol=1e-4, verbose=1):
        self.P = np.random.randn(n_users, K)        # user latent factors
        self.Q = np.random.randn(n_items, K)        # item latent factors
        self.n_users = n_users
        self.n_items = n_items
        self.K = K                                  # latent dimension
        self.lam = lam                              # L1 regularization strength
        self.iterNum = iterNum                      # number of alternating sweeps
        self.tol = tol                              # currently unused
        self.verbose = verbose                      # currently unused

    @staticmethod
    def _group_rows(ids, n_groups):
        """Return, for each g in range(n_groups), the ascending positions where ids == g."""
        # One stable sort replaces n_groups full scans of ``ids``; stability
        # keeps the positions inside each group in ascending order.
        order = np.argsort(ids, kind="stable")
        bounds = np.searchsorted(ids[order], np.arange(n_groups + 1))
        return [order[bounds[g]:bounds[g + 1]] for g in range(n_groups)]

    def _update_factors(self, target, source, groups, source_ids, y):
        """Refit every row of ``target`` in place against the fixed ``source`` factors."""
        for row, idx in enumerate(groups):
            if idx.size == 0:
                # No observed rating for this user/item: zero the factors.
                target[row] = 0
                continue

            clf = Lasso(alpha=self.lam, fit_intercept=False)
            clf.fit(source[source_ids[idx]], y[idx])
            target[row, :] = clf.coef_

    def fit(self, X, y):
        """Fit ``P`` and ``Q`` on observed ratings.

        Parameters
        ----------
        X : array-like with at least two columns
            Column 0 holds user ids, column 1 holds item ids.
        y : array-like
            Rating for each row of ``X``.

        Returns
        -------
        self
        """
        # Coerce once so that pandas inputs are indexed positionally and the
        # id columns are valid integer indices into P and Q.
        X = np.asarray(X)
        y = np.asarray(y)
        user_ids = X[:, 0].astype(np.intp)
        item_ids = X[:, 1].astype(np.intp)

        # Precompute the rating positions belonging to each item and user.
        self.index_item = self._group_rows(item_ids, self.n_items)
        self.index_user = self._group_rows(user_ids, self.n_users)

        for _ in range(self.iterNum):
            # Item sweep first; the user sweep then sees the refreshed Q.
            self._update_factors(self.Q, self.P, self.index_item, user_ids, y)
            self._update_factors(self.P, self.Q, self.index_user, item_ids, y)

        return self

    def predict(self, X):
        """Return ``P[u] @ Q[i]`` for every ``(u, i)`` row of ``X`` as a 1-D array."""
        X = np.asarray(X)
        if X.size == 0:
            return np.array([])

        users = X[:, 0].astype(np.intp)
        items = X[:, 1].astype(np.intp)
        # Row-wise dot product without a Python-level loop.
        return np.einsum("ij,ij->i", self.P[users], self.Q[items])


def root_mean_squared_error(y_pred, y_true):
    """Return the square root of ``mean_squared_error(y_true, y_pred)``.

    Note the argument order: predictions first, ground truth second.
    """
    mse = mean_squared_error(y_true, y_pred)
    rmse = np.sqrt(mse)
    return rmse


In [None]:
import numpy as np
import pandas as pd

# Download the train / test rating tables.
train = pd.read_csv("https://raw.githubusercontent.com/statmlben/CUHK-STAT3009/main/dataset/netflix/train.csv")
test = pd.read_csv("https://raw.githubusercontent.com/statmlben/CUHK-STAT3009/main/dataset/netflix/test.csv")

# (User, Item) id pairs as a two-column array, ratings as a 1-D array.
X_train, y_train = train[['User', 'Item']].to_numpy(), train['Rating'].to_numpy()
X_test, y_test = test[['User', 'Item']].to_numpy(), test['Rating'].to_numpy()

# One more than the largest id seen in either split, so every id is a valid
# row index into the factor matrices.
n_users = 1 + max(train['User'].max(), test['User'].max())
n_items = 1 + max(train['Item'].max(), test['Item'].max())


In [None]:
# Fit with lam=0.1 and K=3 for 8 sweeps, then score on the test split.
# ``fit`` returns the estimator itself, so construction and fitting can be chained.
model = Lasso_SVD(n_users=n_users, n_items=n_items, lam=0.1, K=3, iterNum=8).fit(X_train, y_train)
pred_test = model.predict(X_test)
rmse_1 = root_mean_squared_error(y_pred=pred_test, y_true=y_test)
print("RMSE (lambda=0.1, K=3):", rmse_1)

In [None]:
# Fit with lam=0.3 and K=5 for 8 sweeps, then score on the test split.
# ``fit`` returns the estimator itself, so construction and fitting can be chained.
model = Lasso_SVD(n_users=n_users, n_items=n_items, lam=0.3, K=5, iterNum=8).fit(X_train, y_train)
pred_test = model.predict(X_test)
rmse_2 = root_mean_squared_error(y_pred=pred_test, y_true=y_test)
print("RMSE (lambda=0.3, K=5):", rmse_2)


### **Q1.1**
- Standardize features by removing the mean and scaling to unit variance.
- Merge `data.data` and `data.target` into a single `DataFrame` with columns: [`sepal length (cm)`, `sepal width (cm)`, `petal length (cm)`, `petal width (cm)`, `target`]
- Compute the target-specific mean of each feature, that is,

              sepal length (cm)  sepal width (cm)  petal length (cm)  petal width (cm)
      target
      0                   5.006             3.428              1.462             0.246
      1                   5.936             2.770              4.260             1.326
      2                   6.588             2.974              5.552             2.026

### **Q1.2**

- Use `seaborn` to show the `Violinplot` of all features against `target`
- Use `seaborn` to show the `heatmap` of the correlation between all features

In [None]:
import numpy as np
import pandas as pd
from sklearn.datasets import load_iris
from sklearn.preprocessing import StandardScaler

# Load the iris data as pandas objects (feature DataFrame + target Series).
data = load_iris(as_frame=True)

# Extract X and y
X = data.data
y = data.target

# ---- 1. Standardize features (zero mean, unit variance per column) ----
scaler = StandardScaler()
X_std = scaler.fit_transform(X)

# fit_transform returns a plain array; restore the column names.
X_std_df = pd.DataFrame(X_std, columns=X.columns)

# ---- 2. Merge data.data + data.target ----
# The merged frame is built from the raw measurements: Q1.1 asks to merge
# `data.data` with `data.target`, and its expected table lists means in
# centimetres, which the standardized values cannot reproduce.
df = X.copy()
df['target'] = y

# Show merged DataFrame.  A bare `df.head()` in the middle of a cell is
# evaluated and discarded, so print it explicitly.
print(df.head())

# ---- 3. Compute target-specific mean ----
group_mean = df.groupby('target').mean()
group_mean


## **Q3: Implementing a Custom Recommender System using TensorFlow**

**Recommender System Model:**

The goal of this question is to implement a custom recommender system model using TensorFlow. The model is defined as follows:

$$\widehat{r}_{ui} = \mathbf{p}_u^\intercal \mathbf{q}_i + b_i + \mathbf{p}_u^\intercal \mathbf{p}_u + a_u$$

where:

* $\widehat{r}_{ui}$ is the predicted rating for user $u$ and item $i$
* $\mathbf{p}_u$ is the user latent factor vector
* $\mathbf{q}_i$ is the item latent factor vector
* $b_i$ is the item bias term
* $a_u$ is the user bias term

**Your Task:**

Implement this model using TensorFlow on our course dataset. You will need to:

1. Load the dataset and preprocess the data as needed
2. Define the model architecture using TensorFlow
3. Implement the loss function and optimizer
4. Train the model on the dataset
5. Evaluate the performance of the model using an accuracy metric (`Acc`).

**Note:** You can use TensorFlow's built-in functions and modules to implement the model. Note that the prediction result is not of importance; this question only assesses your implementation.

In [None]:
# TensorFlow recommender implementing r_hat = p_u^T q_i + b_i + p_u^T p_u + a_u
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras import layers, regularizers, optimizers

# ---------------------------
# Helper metrics
# ---------------------------
def rmse_tf(y_true, y_pred):
    """Return the root of the mean squared difference between the two tensors."""
    residual = y_true - y_pred
    mean_sq = tf.reduce_mean(tf.square(residual))
    return tf.sqrt(mean_sq)

def rounded_accuracy(y_true, y_pred):
    """Return the fraction of entries whose rounded prediction equals the rounded target."""
    # simple "Acc": a prediction counts as a hit when both values round to
    # the same number.
    hits = tf.equal(tf.round(y_true), tf.round(y_pred))
    hits_as_float = tf.cast(hits, tf.float32)
    return tf.reduce_mean(hits_as_float)

# ---------------------------
# Model definition
# ---------------------------
class CustomRecModel(tf.keras.Model):
    """Keras model computing r_hat = p_u^T q_i + b_i + p_u^T p_u + a_u.

    Parameters
    ----------
    n_users, n_items : int
        Sizes of the user and item embedding tables (``input_dim``).
    K : int
        Dimension of the latent vectors ``p_u`` and ``q_i``.
    reg : float
        L2 regularization factor applied to both latent embedding tables.
        The bias embeddings are not regularized.
    """

    def __init__(self, n_users, n_items, K=8, reg=0.0):
        super().__init__()
        self.K = K
        # Latent embeddings: tables of shape (n_users, K) and (n_items, K)
        self.user_f = layers.Embedding(
            input_dim=n_users, output_dim=K,
            embeddings_initializer="random_normal",
            embeddings_regularizer=regularizers.l2(reg),
            name="user_f"
        )
        self.item_f = layers.Embedding(
            input_dim=n_items, output_dim=K,
            embeddings_initializer="random_normal",
            embeddings_regularizer=regularizers.l2(reg),
            name="item_f"
        )
        # Bias embeddings (one scalar per user / item), initialised to zero
        self.user_bias = layers.Embedding(input_dim=n_users, output_dim=1,
                                          embeddings_initializer="zeros",
                                          name="user_bias")
        self.item_bias = layers.Embedding(input_dim=n_items, output_dim=1,
                                          embeddings_initializer="zeros",
                                          name="item_bias")

    def call(self, inputs, training=False):
        """Return predicted ratings with the same shape as the id tensors.

        ``inputs`` is a pair ``(users, items)`` of integer id tensors of
        equal shape.  ``training`` is accepted for Keras compatibility and
        is not used.
        """
        users, items = inputs
        p_u = self.user_f(users)                # shape (..., K)
        q_i = self.item_f(items)                # shape (..., K)
        a_u = tf.squeeze(self.user_bias(users), axis=-1)  # shape (...)
        b_i = tf.squeeze(self.item_bias(items), axis=-1)  # shape (...)

        # Reduce over the last (latent) axis rather than axis 1, so the sums
        # stay correct whether the ids arrive as (batch,) or with an extra
        # dimension such as (batch, 1).

        # dot product p_u^T q_i
        dot = tf.reduce_sum(p_u * q_i, axis=-1)         # shape (...)

        # p_u^T p_u  (vector self-interaction)
        pu_norm = tf.reduce_sum(p_u * p_u, axis=-1)     # shape (...)

        # final prediction
        r_hat = dot + b_i + pu_norm + a_u               # shape (...)
        return r_hat

# ---------------------------
# Data loading example (Netflix-like CSVs)
# Replace these with your course dataset paths as needed
# ---------------------------
train_url = "https://raw.githubusercontent.com/statmlben/CUHK-STAT3009/main/dataset/netflix/train.csv"
test_url  = "https://raw.githubusercontent.com/statmlben/CUHK-STAT3009/main/dataset/netflix/test.csv"

train = pd.read_csv(train_url)
test  = pd.read_csv(test_url)

# ensure zero-based indices for users/items (if not already)
# If your CSV already uses zero-based ids, keep as-is.
# If ids start at 1, subtract 1.  The smallest id is taken over BOTH splits
# so that an id 0 present only in the test split is never shifted to -1.
if min(train['User'].min(), test['User'].min()) == 1:
    train['User'] -= 1
    test['User']  -= 1
if min(train['Item'].min(), test['Item'].min()) == 1:
    train['Item'] -= 1
    test['Item']  -= 1

# numpy arrays for training
X_train_users = train['User'].to_numpy().astype(np.int32)
X_train_items = train['Item'].to_numpy().astype(np.int32)
y_train = train['Rating'].to_numpy().astype(np.float32)

X_test_users = test['User'].to_numpy().astype(np.int32)
X_test_items = test['Item'].to_numpy().astype(np.int32)
y_test = test['Rating'].to_numpy().astype(np.float32)

# Embedding table sizes: one more than the largest id in either split.
# Cast to plain int so the layer sizes are not numpy scalars.
n_users = int(max(train['User'].max(), test['User'].max())) + 1
n_items = int(max(train['Item'].max(), test['Item'].max())) + 1

# ---------------------------
# Build, compile and train model
# ---------------------------
K = 8
reg = 1e-5
model = CustomRecModel(n_users=n_users, n_items=n_items, K=K, reg=reg)

# compile with MSE and metrics.
# A plain function metric is evaluated per batch and the batch values are
# then averaged; the mean of per-batch square roots is not the RMSE of the
# whole epoch.  The streaming Keras metric accumulates squared errors over
# all samples instead.  It keeps the name "rmse_tf" so the keys in
# `history.history` and `eval_res` are unchanged.
model.compile(
    optimizer=optimizers.Adam(learning_rate=1e-3),
    loss="mse",
    metrics=[tf.keras.metrics.RootMeanSquaredError(name="rmse_tf"), rounded_accuracy]
)

# Prepare tf.data datasets
batch_size = 1024
train_ds = tf.data.Dataset.from_tensor_slices(((X_train_users, X_train_items), y_train))
# Buffer as large as the dataset gives a uniform shuffle every epoch; a
# smaller fixed buffer only shuffles locally.
train_ds = train_ds.shuffle(len(y_train)).batch(batch_size).prefetch(tf.data.AUTOTUNE)

test_ds = tf.data.Dataset.from_tensor_slices(((X_test_users, X_test_items), y_test))
test_ds = test_ds.batch(batch_size).prefetch(tf.data.AUTOTUNE)

# Fit
history = model.fit(train_ds, epochs=10, validation_data=test_ds)

# ---------------------------
# Evaluate and print final metrics
# ---------------------------
eval_res = model.evaluate(test_ds, return_dict=True)
print("Test results:", eval_res)

# Example: predict first 10
preds = model.predict((X_test_users[:10], X_test_items[:10])).flatten()
print("First 10 predictions:", preds)
print("First 10 true ratings:", y_test[:10])

