In [None]:
import pandas as pd
import numpy as np
from typing import List, Tuple, Optional
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
import matplotlib.pyplot as plt
import seaborn as sns
import threading
import multiprocessing
import os
import pickle
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
import json

In [None]:
class User:
    """
    Clase para representar un usuario con sus ratings y embedding
    """
    def __init__(self, user_id: str):
        self.id = user_id
        self.ratings: List[Tuple[int, float]] = []
        self.embedding: Optional[np.ndarray] = None
    
    def add_rating(self, item_id: int, rating: float):
        """Agregar un rating del usuario"""
        if not pd.isna(rating):  # Solo agregar ratings válidos
            self.ratings.append((item_id, rating))
    
    def get_ratings_vector(self, total_items: int) -> np.ndarray:
        """Obtener vector de ratings completo (con 0s para items no calificados)"""
        vector = np.zeros(total_items)
        for item_id, rating in self.ratings:
            vector[item_id] = rating
        return vector
    
    def get_ratings_dict(self) -> dict:
        """Obtener ratings como diccionario"""
        return {item_id: rating for item_id, rating in self.ratings}
    
    def to_dict(self) -> dict:
        """Convertir a diccionario para serialización"""
        return {
            'id': self.id,
            'ratings': self.ratings,
            'embedding': self.embedding.tolist() if self.embedding is not None else None
        }
    
    @classmethod
    def from_dict(cls, data: dict):
        """Crear User desde diccionario"""
        user = cls(data['id'])
        user.ratings = data['ratings']
        user.embedding = np.array(data['embedding']) if data['embedding'] is not None else None
        return user
    
    def __str__(self):
        return f"User {self.id}: {len(self.ratings)} ratings, embedding: {self.embedding is not None}"
    
    def __repr__(self):
        return self.__str__()

In [None]:
class Chunk:
    """
    Clase para representar un chunk de usuarios con su embedding representativo
    """
    def __init__(self, chunk_id: int):
        self.id = chunk_id
        self.users: List[User] = []
        self.embedding: Optional[np.ndarray] = None
        self.centroid: Optional[np.ndarray] = None
        self._lock = threading.Lock()
    
    def add_user(self, user: User):
        """Agregar usuario al chunk de forma thread-safe y actualizar embedding promedio"""
        with self._lock:
            self.users.append(user)
            self._update_average_embedding()
    
    def _update_average_embedding(self):
        """Actualizar embedding promedio del chunk"""
        if not self.users or self.users[0].embedding is None:
            self.embedding = None
            return
        
        # Calcular embedding promedio de todos los usuarios
        embeddings = np.array([user.embedding for user in self.users])
        self.embedding = np.mean(embeddings, axis=0)
    
    def calculate_representative_embedding(self):
        """Calcular embedding representativo del chunk (centroide)"""
        self._update_average_embedding()
        if self.embedding is not None:
            self.centroid = self.embedding.copy()
    
    def cosine_distance(self, user_embedding: np.ndarray) -> float:
        """Calcular distancia coseno entre embedding promedio del chunk y usuario"""
        if self.embedding is None:
            return float('inf')  # Distancia infinita si no hay embedding
        
        dot_product = np.dot(self.embedding, user_embedding)
        norm_chunk = np.linalg.norm(self.embedding)
        norm_user = np.linalg.norm(user_embedding)
        
        if norm_chunk == 0 or norm_user == 0:
            return float('inf')
        
        # Similitud coseno
        cosine_similarity = dot_product / (norm_chunk * norm_user)
        
        # Distancia coseno = 1 - similitud coseno
        return 1.0 - cosine_similarity
    
    def cosine_similarity(self, user_embedding: np.ndarray) -> float:
        """Calcular similitud coseno entre embedding promedio del chunk y usuario"""
        distance = self.cosine_distance(user_embedding)
        if distance == float('inf'):
            return 0.0
        return 1.0 - distance
    
    def get_user_count(self) -> int:
        """Obtener número de usuarios en el chunk"""
        return len(self.users)
    
    def to_dict(self) -> dict:
        """Convertir a diccionario para serialización"""
        return {
            'id': self.id,
            'users': [user.to_dict() for user in self.users],
            'embedding': self.embedding.tolist() if self.embedding is not None else None,
            'centroid': self.centroid.tolist() if self.centroid is not None else None
        }
    
    @classmethod
    def from_dict(cls, data: dict):
        """Crear Chunk desde diccionario"""
        chunk = cls(data['id'])
        chunk.users = [User.from_dict(user_data) for user_data in data['users']]
        chunk.embedding = np.array(data['embedding']) if data['embedding'] is not None else None
        chunk.centroid = np.array(data['centroid']) if data['centroid'] is not None else None
        return chunk
    
    def __str__(self):
        return f"Chunk {self.id}: {len(self.users)} users, embedding: {self.embedding is not None}"
    
    def __repr__(self):
        return self.__str__()

In [1]:
class MovieRatingsProcessor:
    """
    Procesador principal para convertir dataset de ratings en chunks de usuarios
    """
    
    def __init__(self, threshold: float = 0.5, num_chunks: int = 4, output_folder: str = "chunks_data"):
        self.threshold = threshold
        self.num_chunks = num_chunks
        self.output_folder = output_folder
        self.chunks: List[Chunk] = []
        self.scaler = StandardScaler()
        self.pca = None
        self.total_items = 0
        self.num_threads = max(1, multiprocessing.cpu_count() // 2)
        
        # Crear carpeta de salida si no existe
        os.makedirs(self.output_folder, exist_ok=True)
        
        # Inicializar chunks
        for i in range(self.num_chunks):
            self.chunks.append(Chunk(i))
            
        print(f"Inicializado con {self.num_chunks} chunks, umbral: {threshold}")
        print(f"Usando {self.num_threads} threads para procesamiento")
    
    def _process_chunk_data(self, chunk_data: Tuple[pd.DataFrame, int]) -> List[User]:
        """
        Procesar un chunk de datos en un thread separado
        """
        df_chunk, chunk_index = chunk_data
        users = []
        
        # Obtener columnas de usuarios (todas excepto la primera)
        user_columns = df_chunk.columns[1:]
        
        # Crear usuarios para este chunk
        for user_id in user_columns:
            user = User(str(user_id))
            
            # Procesar ratings de este usuario
            for idx, row in df_chunk.iterrows():
                item_id = int(row['item_id'])
                rating = row[user_id]
                
                if pd.notna(rating):  # Solo agregar ratings válidos
                    user.add_rating(item_id, float(rating))
            
            if user.ratings:  # Solo agregar usuarios con ratings
                users.append(user)
        
        return users
    
    def load_from_npz(self, npz_path: str, movies_mapping_path: str = None, 
                     users_mapping_path: str = None, metadata_path: str = None) -> None:
        """
        Cargar dataset desde archivos NPZ dispersos
        
        Args:
            npz_path: Ruta al archivo .npz con la matriz dispersa
            movies_mapping_path: Ruta al archivo .pkl con mapeo de películas (opcional)
            users_mapping_path: Ruta al archivo .pkl con mapeo de usuarios (opcional)
            metadata_path: Ruta al archivo .pkl con metadatos (opcional)
        """
        try:
            from scipy.sparse import load_npz
            
            print(f"Cargando matriz dispersa desde: {npz_path}")
            sparse_matrix = load_npz(npz_path)
            
            print(f"Matriz cargada: {sparse_matrix.shape}")
            print(f"Densidad de la matriz: {sparse_matrix.nnz / (sparse_matrix.shape[0] * sparse_matrix.shape[1]):.4f}")
            
            # Cargar mapeos si están disponibles
            movies_mapping = None
            users_mapping = None
            metadata = None
            
            if movies_mapping_path and os.path.exists(movies_mapping_path):
                with open(movies_mapping_path, 'rb') as f:
                    movies_mapping = pickle.load(f)
                print(f"Mapeo de películas cargado: {len(movies_mapping)} películas")
            
            if users_mapping_path and os.path.exists(users_mapping_path):
                with open(users_mapping_path, 'rb') as f:
                    users_mapping = pickle.load(f)
                print(f"Mapeo de usuarios cargado: {len(users_mapping)} usuarios")
            
            if metadata_path and os.path.exists(metadata_path):
                with open(metadata_path, 'rb') as f:
                    metadata = pickle.load(f)
                print(f"Metadatos cargados: {metadata}")
            
            # Convertir matriz dispersa a formato denso para procesamiento
            # Nota: Para matrices muy grandes, considerar procesamiento por lotes
            dense_matrix = sparse_matrix.toarray()
            
            # La matriz tiene forma (usuarios, películas)
            n_users, n_movies = dense_matrix.shape
            self.total_items = n_movies
            
            print(f"Procesando {n_users} usuarios y {n_movies} películas")
            
            # Crear usuarios desde la matriz
            all_users = self._create_users_from_matrix(dense_matrix, users_mapping)
            
            print(f"Creados {len(all_users)} usuarios")
            
            # Generar embeddings para todos los usuarios
            self._generate_user_embeddings_from_matrix(dense_matrix, all_users)
            
            # Distribuir usuarios en chunks basado en similitud
            self._distribute_users_to_chunks(all_users)
            
            # Calcular embeddings representativos para cada chunk
            self._calculate_chunk_embeddings()
            
            # Guardar chunks en archivos
            self._save_chunks_to_files()
            
            # Limpiar memoria
            self._clear_memory()
            
        except ImportError:
            print("Error: scipy no está instalado. Ejecute: pip install scipy")
            raise
        except Exception as e:
            print(f"Error al cargar archivo NPZ: {e}")
            raise
    
    def _create_users_from_matrix(self, matrix: np.ndarray, users_mapping: dict = None) -> List[User]:
        """
        Crear usuarios desde matriz densa
        
        Args:
            matrix: Matriz densa (usuarios x películas)
            users_mapping: Mapeo de índices a IDs de usuario
        """
        users = []
        n_users, n_movies = matrix.shape
        
        print("Creando usuarios desde matriz...")
        
        for user_idx in range(n_users):
            # Obtener ID del usuario
            if users_mapping:
                user_id = users_mapping.get(user_idx, str(user_idx))
            else:
                user_id = str(user_idx)
            
            user = User(user_id)
            
            # Extraer ratings no cero para este usuario
            user_ratings = matrix[user_idx, :]
            non_zero_indices = np.nonzero(user_ratings)[0]
            
            # Agregar ratings al usuario
            for movie_idx in non_zero_indices:
                rating = user_ratings[movie_idx]
                if rating != 0:  # Verificar que no sea cero
                    user.add_rating(movie_idx, float(rating))
            
            if user.ratings:  # Solo agregar usuarios con ratings
                users.append(user)
            
            # Mostrar progreso cada 1000 usuarios
            if (user_idx + 1) % 1000 == 0:
                print(f"Procesados {user_idx + 1}/{n_users} usuarios")
        
        return users
    
    def _generate_user_embeddings_from_matrix(self, matrix: np.ndarray, users: List[User]):
        """
        Generar embeddings para usuarios usando matriz densa directamente
        """
        print("Generando embeddings desde matriz...")
        
        # Usar la matriz directamente (ya está en formato usuario x película)
        ratings_matrix = matrix.copy()
        
        # Filtrar solo las filas de usuarios que tienen ratings
        valid_user_indices = []
        for i, user in enumerate(users):
            if user.ratings:
                valid_user_indices.append(i)
        
        # Tomar solo las filas de usuarios válidos
        if valid_user_indices:
            ratings_matrix = ratings_matrix[valid_user_indices, :]
        
        print(f"Matriz de ratings para embeddings: {ratings_matrix.shape}")
        
        # Normalizar y aplicar PCA
        ratings_matrix = self.scaler.fit_transform(ratings_matrix)
        
        n_components = min(50, ratings_matrix.shape[1], ratings_matrix.shape[0])
        self.pca = PCA(n_components=n_components)
        embeddings = self.pca.fit_transform(ratings_matrix)
        
        # Asignar embeddings a usuarios
        for i, user in enumerate(users):
            user.embedding = embeddings[i]
        
        print(f"Embeddings generados: {embeddings.shape}")
        print(f"Varianza explicada: {self.pca.explained_variance_ratio_.sum():.3f}")
    
    def load_from_csv(self, filepath: str) -> None:
        """
        Cargar dataset desde archivo CSV usando multithreading
        """
        try:
            print(f"Cargando datos desde: {filepath}")
            df = pd.read_csv(filepath)
            
            # Verificar que la primera columna sea item_id
            if df.columns[0] != 'item_id':
                print("Advertencia: Se esperaba 'item_id' como primera columna")
            
            self.total_items = len(df)
            
            # Dividir DataFrame en chunks para procesamiento paralelo
            chunk_size = max(1, len(df) // self.num_threads)
            df_chunks = []
            
            for i in range(0, len(df), chunk_size):
                chunk_df = df.iloc[i:i + chunk_size].copy()
                df_chunks.append((chunk_df, i // chunk_size))
            
            print(f"Dividiendo datos en {len(df_chunks)} chunks para procesamiento")
            
            # Procesar chunks en paralelo
            all_users = []
            with ThreadPoolExecutor(max_workers=self.num_threads) as executor:
                future_to_chunk = {executor.submit(self._process_chunk_data, chunk_data): chunk_data 
                                 for chunk_data in df_chunks}
                
                for future in as_completed(future_to_chunk):
                    chunk_users = future.result()
                    all_users.extend(chunk_users)
            
            print(f"Procesados {len(all_users)} usuarios en total")
            
            # Generar embeddings para todos los usuarios
            self._generate_user_embeddings(all_users)
            
            # Distribuir usuarios en chunks basado en similitud
            self._distribute_users_to_chunks(all_users)
            
            # Calcular embeddings representativos para cada chunk
            self._calculate_chunk_embeddings()
            
            # Guardar chunks en archivos
            self._save_chunks_to_files()
            
            # Limpiar memoria
            self._clear_memory()
            
        except Exception as e:
            print(f"Error al cargar archivo: {e}")
            raise
    
    def _generate_user_embeddings(self, users: List[User]):
        """
        Generar embeddings para todos los usuarios
        """
        print("Generando embeddings para usuarios...")
        
        # Encontrar el rango real de item_ids
        all_item_ids = set()
        for user in users:
            for item_id, rating in user.ratings:
                all_item_ids.add(item_id)
        
        if not all_item_ids:
            print("No se encontraron ratings válidos")
            return
        
        min_item_id = min(all_item_ids)
        max_item_id = max(all_item_ids)
        
        print(f"Rango de item_ids: {min_item_id} - {max_item_id}")
        print(f"Total items únicos: {len(all_item_ids)}")
        
        # Crear mapeo de item_id a índice
        self.item_id_to_index = {item_id: idx for idx, item_id in enumerate(sorted(all_item_ids))}
        self.index_to_item_id = {idx: item_id for item_id, idx in self.item_id_to_index.items()}
        
        # Actualizar total_items al número real de items únicos
        self.total_items = len(all_item_ids)
        
        # Crear matriz de ratings usando el mapeo
        ratings_matrix = np.zeros((len(users), self.total_items))
        
        for i, user in enumerate(users):
            for item_id, rating in user.ratings:
                if item_id in self.item_id_to_index:
                    matrix_index = self.item_id_to_index[item_id]
                    ratings_matrix[i, matrix_index] = rating
        
        print(f"Matriz de ratings creada: {ratings_matrix.shape}")
        
        # Normalizar y aplicar PCA
        ratings_matrix = self.scaler.fit_transform(ratings_matrix)
        
        n_components = min(50, ratings_matrix.shape[1], ratings_matrix.shape[0])
        self.pca = PCA(n_components=n_components)
        embeddings = self.pca.fit_transform(ratings_matrix)
        
        # Asignar embeddings a usuarios
        for i, user in enumerate(users):
            user.embedding = embeddings[i]
        
        print(f"Embeddings generados: {embeddings.shape}")
        #print(f"Varianza explicada:
    
    def _distribute_users_to_chunks(self, users: List[User]):
        """
        Distribuir usuarios en chunks basado en distancia coseno
        """
        print("Distribuyendo usuarios en chunks...")
        
        # Inicializar primer chunk con el primer usuario
        if users:
            self.chunks[0].add_user(users[0])
            print(f"Usuario {users[0].id} agregado al Chunk 0 (inicialización)")
        
        # Distribuir el resto de usuarios
        for user_idx, user in enumerate(users[1:], 1):
            user_assigned = False
            
            # Probar cada chunk secuencialmente
            for chunk_idx, chunk in enumerate(self.chunks):
                # Si el chunk está vacío, agregamos el usuario
                if len(chunk.users) == 0:
                    chunk.add_user(user)
                    user_assigned = True
                    print(f"Usuario {user.id} agregado al Chunk {chunk_idx} (chunk vacío)")
                    break
                
                # Calcular distancia coseno
                cosine_distance = chunk.cosine_distance(user.embedding)
                
                print(f"Usuario {user.id} vs Chunk {chunk_idx}: distancia coseno = {cosine_distance:.4f}")
                
                # Si la distancia es menor que el umbral, agregar al chunk
                if cosine_distance < self.threshold:
                    chunk.add_user(user)
                    user_assigned = True
                    print(f"Usuario {user.id} agregado al Chunk {chunk_idx} (distancia < {self.threshold})")
                    break
                else:
                    print(f"Usuario {user.id} no agregado al Chunk {chunk_idx} (distancia >= {self.threshold})")
            
            # Si no se pudo asignar a ningún chunk, agregar al último chunk
            if not user_assigned:
                last_chunk = self.chunks[-1]
                last_chunk.add_user(user)
                print(f"Usuario {user.id} agregado al Chunk {len(self.chunks)-1} (último chunk por defecto)")
            
            # Mostrar progreso cada 10 usuarios
            if user_idx % 10 == 0:
                print(f"Procesados {user_idx}/{len(users)-1} usuarios")
        
        # Mostrar estadísticas finales de distribución
        print("\nDistribución final de usuarios:")
        for chunk in self.chunks:
            print(f"Chunk {chunk.id}: {len(chunk.users)} usuarios")
            if len(chunk.users) > 0:
                print(f"  Embedding promedio shape: {chunk.embedding.shape if chunk.embedding is not None else 'None'}")
        
        # Verificar que todos los usuarios fueron asignados
        total_assigned = sum(len(chunk.users) for chunk in self.chunks)
        print(f"\nTotal usuarios asignados: {total_assigned}/{len(users)}")
        
        if total_assigned != len(users):
            print("¡ADVERTENCIA! No todos los usuarios fueron asignados correctamente")
    
    def _calculate_chunk_embeddings(self):
        """
        Calcular embeddings representativos para cada chunk
        """
        print("Calculando embeddings representativos de chunks...")
        
        for chunk in self.chunks:
            chunk.calculate_representative_embedding()
    
    def _save_chunks_to_files(self):
        """
        Guardar cada chunk en un archivo individual
        """
        print("Guardando chunks en archivos...")
        
        for chunk in self.chunks:
            filename = os.path.join(self.output_folder, f"chunk_{chunk.id}.pkl")
            
            with open(filename, 'wb') as f:
                pickle.dump(chunk.to_dict(), f)
            
            print(f"Chunk {chunk.id} guardado en: {filename}")
        
        # Guardar metadatos del procesador
        metadata = {
            'threshold': self.threshold,
            'num_chunks': self.num_chunks,
            'total_items': self.total_items,
            'scaler': self.scaler,
            'pca': self.pca
        }
        
        metadata_file = os.path.join(self.output_folder, "processor_metadata.pkl")
        with open(metadata_file, 'wb') as f:
            pickle.dump(metadata, f)
        
        print(f"Metadatos guardados en: {metadata_file}")
    
    def _clear_memory(self):
        """
        Limpiar datos de memoria, conservando solo la estructura del procesador
        """
        print("Limpiando memoria...")
        
        # Limpiar chunks de memoria
        for chunk in self.chunks:
            chunk.users.clear()
            chunk.embedding = None
            chunk.centroid = None
        
        print("Memoria limpiada. Solo se conservan archivos y estructura del procesador.")
    
    def load_chunk_from_file(self, chunk_id: int) -> Optional[Chunk]:
        """
        Cargar un chunk específico desde archivo
        """
        filename = os.path.join(self.output_folder, f"chunk_{chunk_id}.pkl")
        
        if not os.path.exists(filename):
            print(f"Archivo no encontrado: {filename}")
            return None
        
        try:
            with open(filename, 'rb') as f:
                chunk_data = pickle.load(f)
            
            chunk = Chunk.from_dict(chunk_data)
            print(f"Chunk {chunk_id} cargado desde archivo")
            return chunk
            
        except Exception as e:
            print(f"Error al cargar chunk {chunk_id}: {e}")
            return None
    
    def get_chunk_user_count(self, chunk_id: int) -> int:
        """
        Retornar el número de usuarios de un chunk específico
        """
        if chunk_id < 0 or chunk_id >= self.num_chunks:
            print(f"ID de chunk inválido: {chunk_id}. Debe estar entre 0 y {self.num_chunks-1}")
            return 0
        
        # Verificar si el chunk está en memoria
        if chunk_id < len(self.chunks) and self.chunks[chunk_id].users:
            return len(self.chunks[chunk_id].users)
        
        # Cargar desde archivo
        chunk = self.load_chunk_from_file(chunk_id)
        if chunk:
            return chunk.get_user_count()
        
        return 0
    
    def get_all_chunks_stats(self) -> dict:
        """
        Obtener estadísticas de todos los chunks
        """
        stats = {}
        
        for chunk_id in range(self.num_chunks):
            user_count = self.get_chunk_user_count(chunk_id)
            stats[f"chunk_{chunk_id}"] = {
                'user_count': user_count,
                'file_exists': os.path.exists(os.path.join(self.output_folder, f"chunk_{chunk_id}.pkl"))
            }
        
        return stats
    
    def list_chunk_files(self) -> List[str]:
        """
        Listar todos los archivos de chunks disponibles
        """
        files = []
        for i in range(self.num_chunks):
            filename = os.path.join(self.output_folder, f"chunk_{i}.pkl")
            if os.path.exists(filename):
                files.append(filename)
        return files


NameError: name 'Tuple' is not defined

In [None]:
processor = MovieRatingsProcessor(
    threshold=0.003,
    num_chunks=4,
    output_folder="chunks_output"
)

In [None]:
processor.load_from_csv('ratings_matrix.csv')

In [None]:
print("\nEstadísticas de chunks:")
stats = processor.get_all_chunks_stats()
for chunk_name, chunk_stats in stats.items():
    print(f"  {chunk_name}: {chunk_stats['user_count']} usuarios")