In [None]:
from numpy import shape
import random
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import torch
from huggingface_hub import hf_hub_download
from collections import OrderedDict, defaultdict
from transformers import CLIPTokenizer, CLIPModel, CLIPTextModel, FlaxCLIPTextModel
import jax
import jax.numpy as jnp
from tqdm import trange
import threading, queue, math
from tqdm import tqdm

In [None]:
# Se torch_xla è installato (TPU), importalo; altrimenti lo ignori
try:
    import torch_xla.core.xla_model as xm
    TPU_AVAILABLE = True
except ImportError:
    TPU_AVAILABLE = False

In [None]:
class PromptDatasetManager:
    def __init__(self, repo_id="poloclub/diffusiondb", filename="metadata.parquet", max_size=None):
        print("Scaricamento metadata.parquet...")
        self.meta_path = hf_hub_download(
            repo_id=repo_id, repo_type="dataset", filename=filename
        )
        if max_size:
            self.df = pd.read_parquet(self.meta_path).head(max_size)
        else:
            self.df = pd.read_parquet(self.meta_path)
        print(f"Colonne disponibili: {list(self.df.columns)}\n")
        print(f"Dimensioni del DataFrame: {shape(self.df)}")
    
    def get_path(self):
        """
        Restituisce il percorso del file metadata.parquet scaricato.
        """
        return self.meta_path

    def _get_user_sessions(self,
                          user_name: str,
                          session_gap: int = 30) -> pd.DataFrame:
        """
        Estrae dal DataFrame tutte le righe di un dato `user_name`, ordina per timestamp
        e assegna un `session_id` incrementale ogni volta che l'intervallo tra richieste
        successive supera `session_gap` minuti.
        Deve restituire tutte le sessioni per un utente specifico.
        """
        if 'user_name' not in self.df.columns or 'timestamp' not in self.df.columns:
            raise ValueError("Le colonne 'user_name' e/o 'timestamp' non sono presenti.")
        df_user = self.df[self.df['user_name'] == user_name].copy()
        if df_user.empty:
            raise ValueError(f"Nessun dato per l'utente {user_name}")
        df_user = df_user.sort_values('timestamp').reset_index(drop=True)
        # Calcola delta in minuti
        df_user['session_delta'] = df_user['timestamp'].diff().dt.total_seconds() / 60.0
        # Assegna session_id
        session_ids = []
        current_id = 0
        for delta in df_user['session_delta']:
            if pd.isna(delta) or delta > session_gap:
                current_id += 1
            session_ids.append(current_id)
        df_user['session_id'] = session_ids
        return df_user

        
    def add_clip_embeddings_auto(
        self,
        output_path: str,
        batch_size: int = 4096,
        prefetch: int = 2
    ):
        """
        Streaming Parquet → batch → tokenizza su CPU → inferisce su
        TPU (torch_xla) / GPU (CUDA) / CPU → scrive parquet con colonna `clip_emb`.
        """
        reader = pq.ParquetFile(self.meta_path)
        total_rows    = reader.metadata.num_rows

        # --- Device selection e conteggio ---
        if TPU_AVAILABLE:
            device    = xm.xla_device()
            n_devices = xm.xrt_world_size()
            print(f"▶ Using TPU: {n_devices} core(s), device={device}")
        elif torch.cuda.is_available():
            n_devices = torch.cuda.device_count()
            device    = torch.device("cuda")
            names     = [torch.cuda.get_device_name(i) for i in range(n_devices)]
            print(f"▶ Using {n_devices} GPU(s): {names}")
        else:
            n_devices = 1
            device    = torch.device("cpu")
            print("▶ CUDA/TPU non disponibile, uso CPU")

        # Adatto batch_size a multiplo di n_devices
        per_dev_bs     = max(1, batch_size // n_devices)
        batch_size_glb = per_dev_bs * n_devices
        if batch_size_glb != batch_size:
            print(f"Aggiusto batch_size: {batch_size} → {batch_size_glb}  ({per_dev_bs}×{n_devices})")
        batch_size    = batch_size_glb
        total_batches = math.ceil(total_rows / batch_size)

        # --- Writer setup ---
        schema_out = reader.schema_arrow.append(
            pa.field("clip_emb", pa.list_(pa.float32()))
        )
        writer = pq.ParquetWriter(output_path, schema_out, compression=None)

        # --- Tokenizer & Model setup ---
        tokenizer = CLIPTokenizer.from_pretrained("openai/clip-vit-base-patch32")
        model = CLIPTextModel.from_pretrained("openai/clip-vit-base-patch32") \
                             .to(device).half()
        if not TPU_AVAILABLE and n_devices > 1 and device.type == "cuda":
            model = torch.nn.DataParallel(model)
        model = torch.compile(model)
        model.eval()

        # Coda per pipeline producer/consumer
        q = queue.Queue(prefetch)

        def producer():
            for batch in reader.iter_batches(batch_size=batch_size):
                tbl     = pa.Table.from_batches([batch])
                prompts = tbl.column("prompt").to_pylist()
                toks    = tokenizer(prompts,
                                    padding=True,
                                    truncation=True,
                                    return_tensors="pt")
                q.put((tbl, toks))
            q.put(None)

        def consumer():
            pbar = tqdm(total=total_batches, desc="CLIP embeddings", unit="batch")
            while True:
                item = q.get()
                if item is None:
                    break
                tbl, toks = item
                toks = {k: v.to(device, non_blocking=True) for k, v in toks.items()}

                if TPU_AVAILABLE:
                    with torch.no_grad():
                        emb = model(**toks).pooler_output.cpu().numpy()
                else:
                    # mixed‑precision solo su CUDA
                    with torch.cuda.amp.autocast(enabled=(device.type=="cuda")), torch.no_grad():
                        emb = model(**toks).pooler_output.cpu().numpy()

                tbl = tbl.append_column(
                    "clip_emb",
                    pa.array(emb.tolist(), type=pa.list_(pa.float32()))
                )
                writer.write_table(tbl)
                pbar.update(1)
            pbar.close()
            writer.close()

        t1 = threading.Thread(target=producer, daemon=True)
        t2 = threading.Thread(target=consumer, daemon=True)
        t1.start(); t2.start()
        t1.join(); t2.join()

        # Ricarico in pandas per usi successivi
        self.df = pd.read_parquet(output_path)
        print(f"Embeddings calcolati e salvati in: {output_path}")
    
    
    def executeFunctionOnDataFrame(self, func, *args, **kwargs):
        """
        Esegue una funzione `func` sul DataFrame e restituisce il risultato.
        La funzione deve accettare un DataFrame come primo argomento.
        """
        if not callable(func):
            raise ValueError("Il parametro 'func' deve essere una funzione chiamabile.")
        return func(self.df, *args, **kwargs)

    def  getDataFrame(self):
        """
        Restituisce il DataFrame completo.
        Se il DataFrame è molto grande, considera di usare `head()` per limitare le righe.
        """
        return self.df
    
    def getPrompts(self, limit: int = None, shuffle: bool = True):
        """
        Restituisce una lista di prompt dal DataFrame.
        Se `limit` è specificato, restituisce solo i primi `limit` prompt.
        """
        prompts = self.df['prompt'].dropna().tolist()
        if shuffle:
            random.shuffle(prompts)
        if limit is not None:
            prompts = prompts[:limit]
        return prompts
    
    def getUsersPrompts(self, limit: int = None, shuffle: bool = True):
        """
        Restituisce un dizionario di prompt per ogni utente.
        Ogni chiave è il nome dell'utente, e il valore è una lista di prompt.
        Se `limit` è specificato, restituisce solo i primi `limit` utenti.
        """
        user_prompts = {}
        users = self.df['user_name'].dropna().unique()
        if shuffle:
            random.shuffle(users)
        if limit is not None:
            users = users[:limit]
        for user in users:
            user_prompts[user] = self.df[self.df['user_name'] == user]['prompt'].dropna().tolist()
        return user_prompts
    
    def getSessionsPrompts(self, session_gap: int = 30, limit: int = None, shuffle: bool = True):
        """
        Restituisce un dizionario di sessioni per ogni utente.
        Ogni chiave è il nome dell'utente, e il valore è un dizionario con `session_id` come chiave
        e una lista di prompt come valore.
        Se `limit` è specificato, restituisce solo i primi `limit` utenti.
        """
        user_sessions = {}
        users = self.df['user_name'].dropna().unique()
        if shuffle:
            random.shuffle(users)
        if limit is not None:
            users = users[:limit]
        for user in users:
            df_user = self._get_user_sessions(user, session_gap=session_gap)
            if df_user.empty:
                continue
            sessions = defaultdict(list)
            for _, row in df_user.iterrows():
                sessions[row['session_id']].append(row['prompt'])
            user_sessions[user] = dict(sessions)
        return user_sessions
    
    def retRandomPrompt(self):
        """
        Restituisce una funzione lambda che, ad ogni chiamata,
        restituisce un nuovo prompt casuale dal database.
        """
        prompts = self.df['prompt'].dropna().tolist()
        return lambda: random.choice(prompts)
    
    def retRandomSession(self, session_gap: int = 30, max_prompts: int = None):
        """
        Restituisce una funzione che genera sessioni di un utente casuale.
        Ogni sessione ritorna i prompt di un utente casuale, se specificato,
        i prompt per ogni sessione sono limitati a `max_prompts.
        """
        users = self.df['user_name'].dropna().unique()
        if len(users) == 0:
            raise ValueError("Nessun utente trovato nel DataFrame.")
        
        def generate_session():
            user = random.choice(users)
            df_user = self._get_user_sessions(user, session_gap)
            if df_user.empty:
                raise ValueError(f"Nessuna sessione trovata per l'utente {user}")
            # Ritorno tutti i prompt della sessione se `max_prompts` non è specificato
            if max_prompts is None:
                return user, df_user['prompt'].dropna().tolist()
            # Altrimenti, ritorno tutti i prompt della sessione
            return 

        
        return generate_session

In [None]:
manager = PromptDatasetManager(max_size=10)
manager.add_clip_embeddings_auto("dataset.parquet")

In [None]:
# Test
print("Esempio di utilizzo del PromptDatasetManager:")
print("Numero di prompt:", len(manager.getPrompts()))

In [None]:
x