In [None]:
from abc import ABC, abstractmethod
from sklearn.decomposition import PCA
import numpy as np
import torch

def project_onto_direction(H, direction):
    """
    Project embeddings onto a direction and return the scalar projections.

    Computes ``H @ direction / ||direction||``.

    Args:
        H (torch.Tensor or array-like): The embeddings to project. (n, embedding_dim)
        direction (torch.Tensor or array-like): The direction to project onto.
            (embedding_dim,)

    Returns:
        torch.Tensor: The projected states. (n,) for a 1-D direction.

    Raises:
        AssertionError: If the norm of ``direction`` is infinite.
    """
    if not isinstance(H, torch.Tensor):
        # Follow the direction's device when it is already a tensor; otherwise
        # prefer CUDA but fall back to CPU so the helper also works without a GPU.
        if isinstance(direction, torch.Tensor):
            target_device = direction.device
        else:
            target_device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        H = torch.tensor(H, dtype=torch.float32, device=target_device)

    if not isinstance(direction, torch.Tensor):
        direction = torch.tensor(direction, dtype=torch.float32, device=H.device)

    mag = torch.norm(direction)
    assert not torch.isinf(mag).any(), "direction is inf"

    # Dividing by the norm makes the result independent of the direction's scale.
    return H.matmul(direction) / mag

def recenter(x, mean=None):
    """
    Recenter the given data by subtracting a mean.

    The result lives on the GPU when CUDA is available and on the CPU
    otherwise, so the helper also works on machines without a GPU.

    Args:
        x (torch.Tensor or array-like): The data to recenter. (n, embedding_dim)
        mean (torch.Tensor or array-like, optional): The mean to subtract from
            the data. When None, the mean of ``x`` over axis 0 is used.

    Returns:
        torch.Tensor: The recentered float32 data. (n, embedding_dim)
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    # as_tensor accepts lists, numpy arrays and tensors of any dtype/device.
    x = torch.as_tensor(x, dtype=torch.float32).to(device)
    if mean is None:
        mean = torch.mean(x, dim=0, keepdim=True)
    else:
        mean = torch.as_tensor(mean, dtype=torch.float32).to(device)
    return x - mean

class RepReader(ABC):
    """Abstract base for objects that identify and store concept directions.

    Concrete readers implement the abstract methods below to derive one or
    more directions per layer (for example via PCA or cluster means). The
    stored directions can then be used to score embeddings.
    """

    @abstractmethod
    def __init__(self) -> None:
        # Sign convention of each direction (which side means "high" score).
        self.direction_signs = None
        # Learned directions, looked up as directions[layer].
        self.directions = None
        # Whether inputs are treated as contrastive pairs.
        self.contrast = True

    @abstractmethod
    def get_rep_directions(self, gcn_embeddings: torch.Tensor) -> None:
        """Identify the concept directions for the given embeddings.

        Args:
            gcn_embeddings (torch.Tensor): The embeddings to analyze.
        """

    @abstractmethod
    def get_signs(self, gcn_embeddings: torch.Tensor, train_labels) -> None:
        """Determine which side of each direction means a high concept score.

        Args:
            gcn_embeddings (torch.Tensor): The embeddings to analyze.
            train_labels (torch.Tensor): One label per embedding. (n, )
        """

    @abstractmethod
    def test_accuracy(self, gcn_embeddings: torch.Tensor, test_labels, direction_signs) -> None:
        """Evaluate the concept directions on held-out embeddings.

        Args:
            gcn_embeddings (torch.Tensor): The embeddings to analyze.
            test_labels (torch.Tensor): One label per embedding. (n, )
            direction_signs (dict): The direction of high concept scores.
        """

class PCARepReader(RepReader):
    """Extract concept directions via PCA.

    For every layer a PCA is fitted on mean-centered training data and its
    principal components are stored as that layer's directions.
    """

    def __init__(self, n_components: int = 1, contrast: bool = True) -> None:
        """Configure the reader.

        Args:
            n_components (int): Number of principal components kept per layer.
            contrast (bool): If True, PCA is fitted on the differences between
                odd-indexed and even-indexed rows instead of the raw rows.
        """
        super().__init__()
        self.n_components = n_components
        self.contrast = contrast
        self.H_train_means = {}  # layer -> mean subtracted before PCA
        # Initialised here so the attribute exists before the first fit.
        self.explained_variance_ratios = {}

    def get_rep_directions(self, gcn_embeddings: torch.Tensor) -> dict:
        """Fit one PCA per layer and store its components as directions.

        Args:
            gcn_embeddings (torch.Tensor): The embeddings to analyze.
                (n, layers, embedding_dim)

        Returns:
            dict: layer -> ``pca.components_`` for that layer.
        """
        self.directions = {}
        self.explained_variance_ratios = {}
        for layer in range(gcn_embeddings.shape[1]):
            H = gcn_embeddings[:, layer, :].clone()
            if self.contrast:
                # Use complete (even, odd) pairs only so an odd number of rows
                # does not cause a shape mismatch in the subtraction.
                paired_rows = 2 * (H.shape[0] // 2)
                H = H[1:paired_rows:2] - H[0:paired_rows:2]
            H_mean = torch.mean(H, dim=0, keepdim=True)
            self.H_train_means[layer] = H_mean
            H_train = recenter(H, H_mean).cpu().numpy()
            pca = PCA(n_components=self.n_components)
            pca.fit(H_train)
            self.directions[layer] = pca.components_
            print("explained_variance_ratio:", pca.explained_variance_ratio_)
            self.explained_variance_ratios[layer] = pca.explained_variance_ratio_
        return self.directions

    def get_signs(self, gcn_embeddings: torch.Tensor, train_labels) -> dict:
        """Identify the direction of high concept scores.

        A component's sign is +1 when label-1 samples project higher on
        average than label-0 samples and -1 when they project lower; a zero
        difference defaults to +1.

        Args:
            gcn_embeddings (torch.Tensor): The embeddings to analyze.
                (n, layers, embedding_dim)
            train_labels (torch.Tensor or array-like): The labels (0 or 1). (n, )

        Returns:
            dict: layer -> array of signs, one per component.
        """
        # Convert the labels once instead of once per component.
        if isinstance(train_labels, torch.Tensor):
            labels = train_labels.cpu().numpy()
        else:
            labels = np.asarray(train_labels)
        positive = labels == 1
        negative = labels == 0

        self.direction_signs = {}
        for layer in range(gcn_embeddings.shape[1]):
            H = gcn_embeddings[:, layer, :].clone()
            H = recenter(H, self.H_train_means[layer])
            layer_signs = np.zeros(self.n_components)
            for component_index in range(self.n_components):
                projected = project_onto_direction(
                    H, self.directions[layer][component_index]
                ).cpu().numpy()
                layer_signs[component_index] = np.sign(
                    np.mean(projected[positive]) - np.mean(projected[negative])
                )
                if layer_signs[component_index] == 0:
                    layer_signs[component_index] = 1
            self.direction_signs[layer] = layer_signs
        return self.direction_signs

    def test_accuracy(self, gcn_embeddings: torch.Tensor, n_compont: int = 0) -> dict:
        """Project embeddings onto one stored component of every layer.

        Despite the name, this returns raw projection scores; turning them
        into an accuracy is left to the caller.
            to be implemented: n_components > 1

        Args:
            gcn_embeddings (torch.Tensor): The embeddings to analyze.
                (n, layers, embedding_dim)
            n_compont (int): Index of the component to project onto.

        Returns:
            dict: layer -> numpy array of projections. (n, )
        """
        test_results = {}
        for layer in range(gcn_embeddings.shape[1]):
            H = gcn_embeddings[:, layer, :].clone()
            # Center with the training mean so scores match the fitted PCA.
            H = recenter(H, self.H_train_means[layer])
            H_test = project_onto_direction(H, self.directions[layer][n_compont])
            test_results[layer] = H_test.cpu().numpy()
        return test_results
            

class ClusterMeanRepReader(RepReader):
    """Get the direction that is the difference between the mean of the positive and negative clusters."""

    n_components = 1

    def __init__(self, contrast: bool = True) -> None:
        """Configure the reader.

        Args:
            contrast (bool): If True, odd-indexed rows form the positive
                cluster and even-indexed rows the negative cluster; if False
                the direction is the mean of all rows.
        """
        super().__init__()
        self.contrast = contrast

    def get_rep_directions(self, gcn_embeddings: torch.Tensor) -> dict:
        """Compute one direction per layer from cluster means.

        Args:
            gcn_embeddings (torch.Tensor): The embeddings to analyze.
                (n, layers, embedding_dim)

        Returns:
            dict: layer -> direction tensor. (1, embedding_dim)
        """
        self.directions = {}
        for layer in range(gcn_embeddings.shape[1]):
            H = gcn_embeddings[:, layer, :].clone()
            if self.contrast:
                positive_mean = torch.mean(H[1::2], dim=0, keepdim=True)
                negative_mean = torch.mean(H[::2], dim=0, keepdim=True)
                self.directions[layer] = positive_mean - negative_mean
            else:
                self.directions[layer] = torch.mean(H, dim=0, keepdim=True)
        return self.directions

    def get_signs(self, gcn_embeddings: torch.Tensor) -> None:
        """No-op: this reader does not compute signs and always returns None."""
        return None

    def test_accuracy(self, gcn_embeddings: torch.Tensor) -> dict:
        """Project embeddings onto the stored direction of every layer.

        Despite the name, this returns raw projection scores.
            to be implemented: n_components > 1

        Args:
            gcn_embeddings (torch.Tensor): The embeddings to analyze.
                (n, layers, embedding_dim)

        Returns:
            dict: layer -> numpy array of projections. (n, )
        """
        test_results = {}
        for layer in range(gcn_embeddings.shape[1]):
            H = gcn_embeddings[:, layer, :].clone()
            H_test = project_onto_direction(H, self.directions[layer].reshape(-1, 1))
            # Drop only the trailing unit axis so a single-sample input still
            # yields a 1-D array rather than a 0-d scalar.
            test_results[layer] = H_test.squeeze(-1).cpu().numpy()
        return test_results

In [None]:
# Informational message only: confirms that the definitions cell has been run.
print("RepReader and PCARepReader classes are defined. You can now use them to extract concept directions from GCN embeddings.")

In [None]:
import os
from datetime import datetime
import sys
import pandas as pd
import random
# Restrict the CUDA devices visible to this process.
# NOTE(review): torch is imported in an earlier cell; this presumably still takes
# effect as long as CUDA has not been initialised yet — confirm.
os.environ["CUDA_VISIBLE_DEVICES"] = "0"  # Set to the GPU ID you want to use
# Base directory constant; not referenced by the other visible cells.
BASE_DIR = '.'
from recbole.data import (
    create_dataset,
    data_preparation,
)
from recbole.utils import (
    get_model,
    init_seed,
)
import json
import math
# IMPORTANT: Specify the path to your pre-trained model file.
# This path should be relative to the project root directory.
# The file is read with torch.load in the model-loading cell.
MODEL_FILE = './saved/path_to_your_tmall_model.pth' 


In [None]:
# Seed every random number generator used by the notebook with the same
# fixed value so repeated runs are reproducible.
random.seed(2020)
np.random.seed(2020)
torch.manual_seed(2020)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(2020)

In [None]:
# Load the pre-trained model and dataset from the checkpoint
print("Loading model and data from checkpoint...")
# NOTE(review): the checkpoint holds a non-tensor 'config' object; newer PyTorch
# releases default torch.load to weights_only=True, which may reject it —
# confirm against the installed version. No map_location is passed, so tensors
# are restored to the device they were saved from.
checkpoint = torch.load(MODEL_FILE)
config = checkpoint['config']
init_seed(config['seed'], config['reproducibility']) # Ensure reproducibility
# Rebuild the dataset and its splits from the configuration stored in the checkpoint.
dataset = create_dataset(config)
train_data, valid_data, test_data = data_preparation(config, dataset)
# Instantiate the model class named in the config, then restore its weights.
model = get_model(config["model"])(config, train_data._dataset).to(config["device"])
model.load_state_dict(checkpoint["state_dict"])
# "other_parameter" may be missing from the checkpoint; .get() then passes None.
model.load_other_parameter(checkpoint.get("other_parameter"))
print("Model and data loaded successfully.")

In [None]:
# Names of the user-id and item-id columns of the training dataset.
uid_field = train_data._dataset.uid_field
iid_field = train_data._dataset.iid_field
# Number of training interactions per user id / per item id
# (value_counts: index = id, value = occurrence count, most frequent first).
user_interactions = pd.Series(train_data._dataset.inter_feat[uid_field].numpy()).value_counts()
item_interactions = pd.Series(train_data._dataset.inter_feat[iid_field].numpy()).value_counts()

In [None]:
# Bin edges and display labels for the per-user interaction histogram.
# NOTE(review): these edges are passed to pd.cut with default arguments, which
# uses right-closed intervals, e.g. (0, 10], (10, 15]; labels such as '0-9' and
# '10-15' therefore do not match the actual bin boundaries — confirm intent.
bins = [0, 10, 15, 25, 40, 60, 100, 150, np.inf]
labels = ['0-9', '10-15', '16-25', '26-40', '41-60', '61-100', '101-150', '>150']

In [None]:
# Bin edges and display labels for the per-item interaction histogram. The same
# edges are reused as popularity thresholds when grouping items.
# NOTE(review): pd.cut's default right-closed intervals give (5, 10], (10, 15];
# labels '6-9' and '10-15' do not match those boundaries — confirm intent.
item_bins = [0, 5, 10, 15, 20, 30, 50, 75, 100,  200, 400, np.inf]
item_labels = ['0-5', '6-9', '10-15', '16-20', '21-30', '31-50', '51-75', '76-100', '101-200', '201-400', '>400']

In [None]:
# Histogram of users by interaction count, ordered by bin rather than by size.
user_counts = (
    pd.cut(user_interactions, bins=bins, labels=labels)
    .value_counts()
    .sort_index()
)

# Histogram of items by interaction count, ordered by bin rather than by size.
item_counts = (
    pd.cut(item_interactions, bins=item_bins, labels=item_labels)
    .value_counts()
    .sort_index()
)

# Report both distributions.
print("User interaction distribution:")
print(user_counts)
print("\nItem interaction distribution:")
print(item_counts)

In [None]:
# Popularity groups keyed by threshold:
#   "<t>-" -> ids of items with at most t interactions
#   "<t>+" -> ids of items with at least t interactions
# Boolean masks select the ids in one vectorised step per threshold instead of
# looping over every item in Python; the order of ids follows item_interactions.
item_interaction_dict = {}
for low_line in item_bins:
    at_most = (item_interactions <= low_line).to_numpy()
    item_interaction_dict[str(low_line) + '-'] = item_interactions.index[at_most].tolist()
for high_line in item_bins:
    at_least = (item_interactions >= high_line).to_numpy()
    item_interaction_dict[str(high_line) + '+'] = item_interactions.index[at_least].tolist()

In [None]:
def _interleave_low_high(low, high):
    """Return an array with `low` rows at even indices and `high` rows at odd indices."""
    paired = np.empty((2 * len(low),) + low.shape[1:], dtype=low.dtype)
    paired[0::2] = low   # Even indices: low popularity
    paired[1::2] = high  # Odd indices: high popularity
    return paired


def _alternation_accuracy(scores):
    """Return the fraction of neighbouring (low, high, low, ...) scores in the expected order."""
    n_pairs = len(scores) // 2
    low_scores = scores[0:2 * n_pairs:2]
    high_scores = scores[1:2 * n_pairs:2]

    # Each low item should score no higher than the high item that follows it ...
    low_vs_high = (low_scores - high_scores <= 0).astype(int)
    # ... and each high item no lower than the next low item.
    high_vs_next_low = (high_scores[:-1] - low_scores[1:] >= 0).astype(int)

    outcomes = np.zeros(2 * n_pairs - 1)
    outcomes[0::2] = low_vs_high
    outcomes[1::2] = high_vs_next_low
    return np.mean(outcomes)


def popularity_repe_clustermean(low_line, high_line):
    """Fit a popularity direction from low/high popularity items and score it.

    Items with at most ``low_line`` interactions and items with at least
    ``high_line`` interactions are embedded with the global ``model``, split
    80/20 into train/test, interleaved as (low, high, low, high, ...) and fed
    to a contrastive ``PCARepReader``. Relies on the module-level
    ``item_interaction_dict``, ``model`` and ``config``.

    Args:
        low_line: Upper popularity threshold of the low group; must be a key
            prefix present in ``item_interaction_dict``.
        high_line: Lower popularity threshold of the high group; same
            requirement.

    Returns:
        tuple | None: ``(rep_direction, results_val)``, or None when either
        group has fewer than 10 items. ``results_val`` maps layers 0-4 to a
        score, but only layer 0 is evaluated; the others stay empty dicts.
    """
    # Get item IDs
    low_line_id = list(item_interaction_dict[str(low_line) + '-'])
    high_line_id = list(item_interaction_dict[str(high_line) + '+'])

    # Early return if there is too little data to fit and evaluate on.
    if len(low_line_id) < 10 or len(high_line_id) < 10:
        return None

    # Batch process embeddings
    with torch.no_grad():
        low_ids = torch.tensor(low_line_id).to(config['device'])
        high_ids = torch.tensor(high_line_id).to(config['device'])
        low_line_embeddings = model.get_item_embedding(low_ids).cpu().numpy()
        high_line_embeddings = model.get_item_embedding(high_ids).cpu().numpy()

    # Add a middle axis of size one; the reader indexes it as the layer axis.
    low_line_embeddings = np.expand_dims(low_line_embeddings, axis=1)
    high_line_embeddings = np.expand_dims(high_line_embeddings, axis=1)

    print(low_line, high_line)
    print(low_line_embeddings.shape)
    print(high_line_embeddings.shape)

    # Use the same number of items from both groups; 80% (rounded down) train.
    total_num = min(len(low_line_embeddings), len(high_line_embeddings))
    train_num = int(total_num * 0.8)

    # Shuffle once (low first, then high, to keep the RNG sequence stable).
    np.random.shuffle(low_line_embeddings)
    np.random.shuffle(high_line_embeddings)

    train_data = torch.tensor(_interleave_low_high(
        low_line_embeddings[:train_num], high_line_embeddings[:train_num]
    ))
    test_data = torch.tensor(_interleave_low_high(
        low_line_embeddings[train_num:total_num], high_line_embeddings[train_num:total_num]
    ))

    pca_rep_reader = PCARepReader(contrast=True)
    rep_direction = pca_rep_reader.get_rep_directions(train_data)
    print(rep_direction)
    # NOTE(review): get_signs is not called here, so the orientation of the
    # fitted direction is presumably not calibrated against the labels; a score
    # well below 0.5 may just indicate a flipped direction — confirm.
    test_result = pca_rep_reader.test_accuracy(test_data)
    results_val = {layer: {} for layer in range(0, 5)}
    for layer in range(0, 1):
        results_val[layer] = _alternation_accuracy(test_result[layer])
        print(f"{layer}: {results_val[layer]}")
        print("=====")
    return rep_direction, results_val

In [None]:
# Evaluate every (low, high) threshold combination where low is strictly
# below high; results are printed by the function and not collected here.
for low_line in item_bins:
    for high_line in item_bins:
        if low_line >= high_line:
            continue
        popularity_repe_clustermean(low_line, high_line)

In [None]:
# In-memory list of log entries; appended to by log_message and written out by
# save_log_to_json. Re-running this cell discards entries collected so far.
log_data = []

In [None]:
from datetime import datetime
# Define base directory for saving results relative to the project root
DIRECTION_BASE_DIR = "./e_pop_saved/"
# Define and create a dataset-specific subdirectory
DATASET_NAME = 'tmall'
DIRECTION_BASE_DIR_ML = os.path.join(DIRECTION_BASE_DIR, DATASET_NAME)
os.makedirs(DIRECTION_BASE_DIR_ML, exist_ok=True)
# Timestamp the log file name so repeated runs do not overwrite each other.
TIMESTAMP = datetime.now().strftime("%Y%m%d_%H%M%S")
LOG_FILE_PATH = os.path.join(
    DIRECTION_BASE_DIR_ML, f"rep_direction_item_{DATASET_NAME}_{TIMESTAMP}.json"
)

In [None]:
from collections import OrderedDict
def convert_to_serializable(value):
    """Recursively convert a value into JSON-serializable Python types.

    Handles pandas Series (-> dict with string keys), torch tensors and numpy
    arrays (-> nested lists), numpy scalars (-> bool/int/float), dicts
    (values converted, keys kept) and lists/tuples (-> list). Anything else is
    returned unchanged.

    Args:
        value: The object to convert.

    Returns:
        The converted object.
    """
    if isinstance(value, pd.Series):
        return {str(k): convert_to_serializable(v) for k, v in value.items()}
    if isinstance(value, torch.Tensor):
        return value.cpu().tolist()
    if isinstance(value, np.ndarray):
        return value.tolist()
    # Abstract numpy scalar types cover every concrete width and avoid aliases
    # such as np.float_ that no longer exist in NumPy 2.
    if isinstance(value, np.bool_):
        return bool(value)
    if isinstance(value, np.integer):
        return int(value)
    if isinstance(value, np.floating):
        return float(value)
    # OrderedDict is a dict subclass, so this branch covers it as well.
    if isinstance(value, dict):
        return {k: convert_to_serializable(v) for k, v in value.items()}
    if isinstance(value, (list, tuple)):
        return [convert_to_serializable(item) for item in value]
    # Return everything else as is
    return value

def log_message(key, value):
    """Record a key/value log entry in ``log_data`` and echo it to stdout.

    The value is converted to JSON-serializable types before it is stored.
    No timestamp is attached to the entry.

    Args:
        key: Label of the entry.
        value: Payload; converted with ``convert_to_serializable``.
    """
    payload = convert_to_serializable(value)
    log_data.append({"key": key, "value": payload})
    print(f"LOG [{key}]: {payload}")

def save_log_to_json():
    """Write all collected log entries to ``LOG_FILE_PATH`` as JSON.

    Best-effort: any failure is reported on stdout instead of being raised.
    The data is serialized before the file is opened, so a serialization
    error cannot leave a truncated or partially written file behind.
    """
    try:
        # Extra safety: convert everything one more time to ensure it's serializable
        serialized = json.dumps(convert_to_serializable(log_data), indent=4)
        with open(LOG_FILE_PATH, 'w') as f:
            f.write(serialized)
        print(f"Log data successfully saved sequentially to {LOG_FILE_PATH}")
    except Exception as e:
        print(f"Error saving log data to JSON: {e}")

In [None]:
# Fit and evaluate the direction for one threshold pair, then log the results.
# NOTE: popularity_repe_clustermean returns None when either popularity group
# has fewer than 10 items, in which case this unpacking raises a TypeError.
rep_direction, results_val = popularity_repe_clustermean(5, 100) # you can change the parameters here
# Keep the logged thresholds in sync with the arguments used above.
log_message("rep_direction_clustermean_low_high_line", [5, 100])
log_message("rep_direction_clustermean", rep_direction)
log_message("results_val_clustermean", results_val)

In [None]:
# Persist every entry collected in log_data to LOG_FILE_PATH.
save_log_to_json()