In [11]:
import os
import numpy as np
import pandas as pd
import torch
import time
import json
from tqdm import tqdm
from transformers import pipeline, AutoModelForSequenceClassification, AutoTokenizer
from sentence_transformers import SentenceTransformer
import spacy
import gc
import hnswlib

class ProductRecommender:
    def __init__(self, dataframe_path, max_dataset_size=200000, chunk_size=2000, top_n=10, absa_chunk_size=400, absa_batch_size=16,
                 base_dir="/content/drive/MyDrive/finalyearResearch"):
        """
        Load the models, resolve the cache file locations and prepare the dataset.

        Args:
            dataframe_path: CSV file read into ``self.df_original``.
            max_dataset_size: upper bound on the number of rows sampled by
                ``_prepare_data``.
            chunk_size: stored on the instance; not read by any method visible
                in this class.
            top_n: stored on the instance; not read by any method visible in
                this class.
            absa_chunk_size: number of reviews annotated between two
                incremental saves in ``_prepare_data``.
            absa_batch_size: stored as ``self.absa_batch_size`` for the ABSA
                annotation step.
            base_dir: directory that holds the annotated CSV, the embedding
                ``.npy`` files and the HNSW index. Defaults to the folder the
                class previously hard-coded, so existing callers are unaffected.

        Side effects:
            Sets ``PYTORCH_CUDA_ALLOC_CONF``, loads the ABSA, SBERT and spaCy
            models, creates the incremental-output directory, and finally calls
            ``_prepare_data`` (which may read and write files under ``base_dir``).
        """
        # NOTE(review): presumably this must be set before the first CUDA
        # allocation to take effect - confirm against the PyTorch docs.
        os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"

        if torch.cuda.is_available():
            self.overall_device_str = 'cuda'
            self.overall_device_idx = 0
            print(f"📦 **Overall Device**: GPU (cuda:{self.overall_device_idx})")
        else:
            self.overall_device_str = 'cpu'
            self.overall_device_idx = -1  # -1 is the value passed to pipeline(device=...) for CPU
            print("📦 **Overall Device**: CPU")

        self.chunk_size = chunk_size
        self.top_n = top_n
        self.max_dataset_size = max_dataset_size
        self.absa_chunk_size = absa_chunk_size
        self.absa_batch_size = absa_batch_size

        print(f"⚙️ **ABSA Settings**: chunk_size={self.absa_chunk_size}, batch_size={self.absa_batch_size}")

        self.df_original = pd.read_csv(dataframe_path)
        self.df = None

        # Load ABSA model. The tokenizer does not depend on device placement,
        # so it is loaded once instead of once per placement attempt.
        absa_model_name = "yangheng/deberta-v3-base-absa-v1.1"
        self.absa_pipeline_device_idx = self.overall_device_idx
        absa_tokenizer = AutoTokenizer.from_pretrained(absa_model_name)
        try:
            absa_model = AutoModelForSequenceClassification.from_pretrained(
                absa_model_name,
                device_map="auto"
            )
            print("✅ **ABSA Model loaded with device_map='auto'**.")
        except Exception as e:
            # Broad on purpose: any failure of the device_map path falls back
            # to a plain load followed by an explicit .to(device).
            print(f"⚠️ **Failed to load ABSA model with device_map='auto'**: {e}. Falling back.")
            absa_model = AutoModelForSequenceClassification.from_pretrained(absa_model_name)
            absa_model.to(self.overall_device_str)
            print(f"✅ **ABSA Model loaded on {self.overall_device_str}**.")

        if getattr(absa_model, 'hf_device_map', None) is not None:
            # The model already carries a device map, so no device is passed here.
            self.absa_pipe = pipeline("text-classification", model=absa_model, tokenizer=absa_tokenizer)
            print("✅ **ABSA pipeline initialized (managed by Accelerate)**.")
        else:
            self.absa_pipe = pipeline("text-classification", model=absa_model, tokenizer=absa_tokenizer, device=self.absa_pipeline_device_idx)
            print(f"✅ **ABSA pipeline initialized on {'GPU' if self.absa_pipeline_device_idx != -1 else 'CPU'}**.")

        self.sbert = SentenceTransformer('all-MiniLM-L6-v2', device=self.overall_device_str)
        print(f"✅ **SBERT Model loaded on {self.overall_device_str}**.")

        # Load spaCy for dynamic aspect extraction
        self.nlp = spacy.load("en_core_web_sm", disable=["ner", "lemmatizer"])  # Lightweight model

        # File paths (all relative to base_dir). The "annoated" spelling is
        # kept as-is so files written by earlier runs are still found.
        self.annotated_path = os.path.join(base_dir, "Second_fixed_image_urls.csv")
        self.incremental_annotated_path = os.path.join(base_dir, "annoated", "absa_annotated_data_incremental.csv")
        self.itemname_emb_path = os.path.join(base_dir, "itemname_embeddings.npy")
        self.combo_text_emb_path = os.path.join(base_dir, "combo_text_embeddings.npy")
        self.enriched_item_descriptions_emb_path = os.path.join(base_dir, "enriched_item_descriptions_embeddings.npy")
        self.hnsw_index_path = os.path.join(base_dir, "hnsw_index.bin")

        # Ensure directory exists
        incremental_dir = os.path.dirname(self.incremental_annotated_path)
        os.makedirs(incremental_dir, exist_ok=True)
        print(f"✅ **Ensured directory exists for incremental file: {incremental_dir}**")

        # Populated by _save_embeddings, which _prepare_data calls.
        self.item_unique_id_to_name_map = None
        self.item_unique_id_to_enriched_text_map = None
        self.hnsw_index = None

        self.df = self._prepare_data()

    def _extract_dynamic_aspects(self, review):
        """
        Extract aspects dynamically from a review using spaCy noun phrase detection.
        """
        doc = self.nlp(review)
        aspects = []
        for chunk in doc.noun_chunks:
            aspect = chunk.text.lower().strip()
            if len(aspect) > 2 and aspect not in {'it', 'this', 'that', 'thing', 'product', 'item'}:
                aspects.append(aspect)
        return list(set(aspects))[:3]

    def _extract_multi_aspects(self, reviews):
        """
        Extract dynamic aspects and their sentiments from a list of reviews.
        """
        results = []
        for review in reviews:
            review_results = {}
            aspects = self._extract_dynamic_aspects(review)
            if not aspects:
                aspects = ['general']
            for aspect in aspects:
                input_text = f"[CLS] {review} [SEP] {aspect} [SEP]"
                try:
                    result = self.absa_pipe(input_text, batch_size=1)[0]
                    sentiment = result['label'].capitalize()
                    score = result['score']
                    if score > 0.6:
                        review_results[aspect] = {'sentiment': sentiment, 'confidence': score}
                    else:
                        review_results[aspect] = {'sentiment': 'Neutral', 'confidence': 0.0}
                except Exception as e:
                    print(f"⚠️ Error processing aspect '{aspect}' for review: {e}")
                    review_results[aspect] = {'sentiment': 'Neutral', 'confidence': 0.0}
            results.append(review_results)
        return results

    def _prepare_data(self):
        """
        Load pre-annotated data and embeddings, cleaning aspects_sentiments column.
        """
        if os.path.exists(self.annotated_path):
            print(f"📂 **Final annotated file found at '{self.annotated_path}'. Loading it.**")
            try:
                df = pd.read_csv(self.annotated_path)
                # Clean aspects_sentiments column
                df['aspects_sentiments'] = df['aspects_sentiments'].apply(
                    lambda x: '{}' if pd.isna(x) or not isinstance(x, str) else x
                )
                if 'aspects_sentiments' in df.columns:
                    print("✅ **Data appears complete with dynamic ABSA annotations.**")
                    self._save_embeddings(df)
                    return df
                else:
                    print("⚠️ **Final file missing dynamic ABSA annotations. Reprocessing.**")
            except Exception as e:
                print(f"⚠️ **Error loading final annotated file: {e}. Reprocessing.**")

        df = self.df_original.sample(n=min(self.max_dataset_size, len(self.df_original)), random_state=42).copy()
        df = df[df['reviewText'].str.len() > 15].reset_index(drop=True)
        df['description'] = df['description'].fillna('').astype(str).apply(lambda x: x.replace("['", "").replace("']", "").replace("', '", ", "))
        df['feature'] = df['feature'].fillna('').astype(str).apply(lambda x: x.replace("['", "").replace("']", "").replace("', '", ", "))
        df['item_unique_id'] = df['itemName'].astype(str) + df['category'].astype(str) + df['description'].astype(str) + df['feature'].astype(str)

        processed_df = None
        df_to_process = df.copy()

        if os.path.exists(self.incremental_annotated_path):
            print(f"📂 **Found incremental data file: {self.incremental_annotated_path}. Checking...**")
            try:
                processed_df = pd.read_csv(self.incremental_annotated_path)
                processed_df['description'] = processed_df['description'].fillna('').astype(str).apply(lambda x: x.replace("['", "").replace("']", "").replace("', '", ", "))
                processed_df['feature'] = processed_df['feature'].fillna('').astype(str).apply(lambda x: x.replace("['", "").replace("']", "").replace("', '", ", "))
                processed_df['item_unique_id'] = processed_df['itemName'].astype(str) + processed_df['category'].astype(str) + processed_df['description'].astype(str) + processed_df['feature'].astype(str)
                # Clean aspects_sentiments in incremental data
                processed_df['aspects_sentiments'] = processed_df['aspects_sentiments'].apply(
                    lambda x: '{}' if pd.isna(x) or not isinstance(x, str) else x
                )

                processed_ids = set(processed_df['item_unique_id'])
                df_to_process = df[~df['item_unique_id'].isin(processed_ids)].copy()
                print(f"✅ **Loaded {len(processed_df)} previously annotated rows.**")
                print(f"🧠 **{len(df_to_process)} rows remaining to be annotated.**")
            except Exception as e:
                print(f"⚠️ **Error loading incremental file: {e}. Starting from scratch.**")
                processed_df = None
                df_to_process = df.copy()
                if os.path.exists(self.incremental_annotated_path):
                    try:
                        os.remove(self.incremental_annotated_path)
                        print(f"✅ **Removed corrupted incremental file: {self.incremental_annotated_path}**")
                    except Exception as e:
                        print(f"⚠️ **Failed to remove incremental file: {e}**")

        if df_to_process.empty:
            print("✅ **No new rows to process. Finalizing data.**")
            df = processed_df.copy()
        else:
            print(f"🧠 **Running Dynamic ABSA on {len(df_to_process)} rows...**")
            start_time = time.time()

            write_mode = 'a' if processed_df is not None and not processed_df.empty else 'w'
            header = False if processed_df is not None and not processed_df.empty else True

            pbar = tqdm(range(0, len(df_to_process), self.absa_chunk_size), desc="Performing Dynamic ABSA")
            for i in pbar:
                chunk_df = df_to_process.iloc[i:i + self.absa_chunk_size].copy()
                reviews_list = chunk_df['reviewText'].tolist()

                try:
                    results = self._extract_multi_aspects(reviews_list)
                    chunk_df['aspects_sentiments'] = [json.dumps(res) for res in results]
                    try:
                        chunk_df.to_csv(self.incremental_annotated_path, mode=write_mode, header=header, index=False)
                        with open(self.incremental_annotated_path, 'a') as f:
                            f.flush()
                            os.fsync(f.fileno())
                        print(f"✅ **Saved {len(chunk_df)} rows to incremental file (chunk {i//self.absa_chunk_size + 1})**")
                        write_mode = 'a'
                        header = False
                    except Exception as e:
                        print(f"❌ **Error saving chunk to incremental file: {e}**")
                        raise
                    del chunk_df, reviews_list, results
                    if self.overall_device_idx != -1:
                        torch.cuda.empty_cache()
                    gc.collect()
                except RuntimeError as e:
                    if "CUDA out of memory" in str(e):
                        print(f"\nFATAL ERROR: CUDA Out of Memory. GPU memory: {torch.cuda.memory_summary()}")
                        print("ACTION: Reduce absa_batch_size (e.g., to 1) and absa_chunk_size, then restart.")
                        raise
                    else:
                        print(f"❌ **Error processing chunk: {e}**")
                        raise

            print(f"✅ **Dynamic ABSA annotation complete in {time.time() - start_time:.2f} seconds.**")

            print("🔄 **Combining results...**")
            try:
                df = pd.read_csv(self.incremental_annotated_path)
                # Clean aspects_sentiments in combined data
                df['aspects_sentiments'] = df['aspects_sentiments'].apply(
                    lambda x: '{}' if pd.isna(x) or not isinstance(x, str) else x
                )
                print(f"✅ **Loaded combined incremental data: {len(df)} rows**")
            except Exception as e:
                print(f"❌ **Error loading combined incremental file: {e}**")
                raise

        print(f"💾 **Saving final annotated data to '{self.annotated_path}'...**")
        try:
            df.to_csv(self.annotated_path, index=False)
            print(f"✅ **Successfully saved final annotated data: {len(df)} rows**")
        except Exception as e:
            print(f"❌ **Error saving final annotated file: {e}**")
            raise

        if os.path.exists(self.incremental_annotated_path):
            try:
                os.remove(self.incremental_annotated_path)
                print(f"✅ **Removed temporary incremental file: {self.incremental_annotated_path}**")
            except Exception as e:
                print(f"⚠️ **Failed to remove incremental file: {e}. Manual removal recommended.**")

        self._save_embeddings(df)
        return df

    def _save_embeddings(self, df):
        """
        Load or generate and save embeddings, and initialize HNSW index.
        """
        print("💾 **Generating and saving embeddings**...")
        start_time = time.time()

        def format_aspects_sentiments(row):
            try:
                if pd.isna(row['aspects_sentiments']) or not isinstance(row['aspects_sentiments'], str):
                    return ""
                aspects_dict = json.loads(row['aspects_sentiments'])
                return " ".join([f"{aspect} {info['sentiment']}" for aspect, info in aspects_dict.items() if info['confidence'] > 0.6])
            except json.JSONDecodeError as e:
                print(f"⚠️ Invalid JSON in aspects_sentiments for row: {e}. Returning empty string.")
                return ""
            except Exception as e:
                print(f"⚠️ Error processing aspects_sentiments: {e}. Returning empty string.")
                return ""

        df['enriched_item_description'] = df['itemName'].astype(str) + " " + \
                                         df['category'].astype(str) + " " + \
                                         df['description'].astype(str) + " " + \
                                         df['feature'].astype(str) + " " + \
                                         df.apply(format_aspects_sentiments, axis=1)

        unique_items_df = df.drop_duplicates(subset='item_unique_id')
        unique_item_ids = unique_items_df['item_unique_id'].tolist()
        unique_enriched_descriptions = unique_items_df['enriched_item_description'].tolist()

        if not os.path.exists(self.enriched_item_descriptions_emb_path):
            print("🔄 Generating Enriched Item Description Embeddings...")
            enriched_description_embeddings = self.sbert.encode(unique_enriched_descriptions, convert_to_tensor=True, show_progress_bar=True)
            np.save(self.enriched_item_descriptions_emb_path, enriched_description_embeddings.cpu().numpy())
            print(f"✅ Saved to {self.enriched_item_descriptions_emb_path}")
            self.enriched_description_embeddings = enriched_description_embeddings.cpu().numpy()
        else:
            print("✅ Loading existing Enriched Item Description Embeddings...")
            self.enriched_description_embeddings = np.load(self.enriched_item_descriptions_emb_path)

        self.item_unique_id_to_embedding_map = {
            item_id: self.enriched_description_embeddings[i]
            for i, item_id in enumerate(unique_item_ids)
        }
        self.item_unique_id_to_name_map = dict(zip(unique_items_df['item_unique_id'], unique_items_df['itemName']))
        self.item_unique_id_to_enriched_text_map = dict(zip(unique_items_df['item_unique_id'], unique_items_df['enriched_item_description']))

        # Initialize HNSW index
        dim = self.enriched_description_embeddings.shape[1]
        self.hnsw_index = hnswlib.Index(space='cosine', dim=dim)
        if os.path.exists(self.hnsw_index_path):
            print(f"✅ Loading existing HNSW index from {self.hnsw_index_path}")
            self.hnsw_index.load_index(self.hnsw_index_path, max_elements=len(unique_item_ids))
        else:
            print("🔄 Building HNSW index...")
            self.hnsw_index.init_index(max_elements=len(unique_item_ids), ef_construction=200, M=16)
            self.hnsw_index.add_items(self.enriched_description_embeddings, list(range(len(unique_item_ids))))
            self.hnsw_index.save_index(self.hnsw_index_path)
            print(f"✅ Saved HNSW index to {self.hnsw_index_path}")

        self.hnsw_index.set_ef(50)  # Set ef for querying
        print(f"✅ Embeddings and HNSW index ready in {time.time() - start_time:.2f} seconds.")

    def _infer_product_category_and_attributes(self, user_query):
        """
        Infer product category and aspects from user query using HNSW for similarity search.
        """
        print(f"\n🔍 **Inferring from query**: '{user_query}'")
        query_embedding = self.sbert.encode(user_query, convert_to_numpy=True)

        query_aspects = self._extract_dynamic_aspects(user_query)
        if not query_aspects:
            query_aspects = ['general']

        aspect_results = []
        for aspect in query_aspects:
            input_text = f"[CLS] {user_query} [SEP] {aspect} [SEP]"
            try:
                result = self.absa_pipe(input_text)[0]
                if result['score'] > 0.6:
                    aspect_results.append((aspect, result['label'].capitalize(), result['score']))
            except Exception as e:
                print(f"⚠️ Error processing query aspect '{aspect}': {e}")

        all_item_ids = list(self.item_unique_id_to_embedding_map.keys())
        labels, distances = self.hnsw_index.knn_query(query_embedding, k=min(5, len(all_item_ids)))
        top_results = [(labels[0][i], 1 - distances[0][i]) for i in range(len(labels[0]))]  # Convert distance to similarity

        if top_results[0][1] > 0.4:
            best_match_id = all_item_ids[top_results[0][0]]
            inferred_item_row = self.df[self.df['item_unique_id'] == best_match_id].iloc[0]
            inferred_category = inferred_item_row['category']
            print(f"**Inferred Category**: {inferred_category}, Aspects: {aspect_results}")
            return inferred_item_row['itemName'], inferred_category, aspect_results

        print("**No specific item strongly inferred. Using general search.**")
        return None, "general", aspect_results

    def recommend(self, user_query, product_id=None, filter_category=None, top_n_results=10):
        """
        Recommend products using HNSW for fast similarity search, returning name, category, image, score, and confidence.
        """
        print(f"\n✨ **Starting Recommendation for query**: '{user_query}'")
        all_item_ids = list(self.item_unique_id_to_embedding_map.keys())

        _, _, query_aspects = self._infer_product_category_and_attributes(user_query)
        query_aspect_dict = {aspect: sentiment for aspect, sentiment, _ in query_aspects}

        if product_id:
            if product_id not in self.item_unique_id_to_embedding_map:
                print(f"❌ **Error**: Product ID '{product_id}' not found.")
                return []
            query_embedding = self.item_unique_id_to_embedding_map[product_id]
        else:
            query_embedding = self.sbert.encode(user_query, convert_to_numpy=True)

        # Perform HNSW search
        labels, distances = self.hnsw_index.knn_query(query_embedding, k=min(top_n_results + 1, len(all_item_ids)))
        cosine_scores = [1 - dist for dist in distances[0]]  # Convert distance to similarity

        aspect_scores = []
        for idx, item_idx in enumerate(labels[0]):
            item_id = all_item_ids[item_idx]
            item_row = self.df[self.df['item_unique_id'] == item_id].iloc[0]
            try:
                if pd.isna(item_row['aspects_sentiments']) or not isinstance(item_row['aspects_sentiments'], str):
                    item_aspects = {}
                else:
                    item_aspects = json.loads(item_row['aspects_sentiments'])
            except json.JSONDecodeError as e:
                print(f"⚠️ Invalid JSON in aspects_sentiments for item {item_id}: {e}. Using empty aspects.")
                item_aspects = {}
            except Exception as e:
                print(f"⚠️ Error processing aspects_sentiments for item {item_id}: {e}. Using empty aspects.")
                item_aspects = {}

            aspect_match_score = 0
            matched_aspects = 0
            top_confidence = 0.0  # Track highest confidence for matching aspects
            top_aspect = None
            for aspect, q_sentiment in query_aspect_dict.items():
                if aspect in item_aspects and item_aspects[aspect]['sentiment'] == q_sentiment:
                    confidence = item_aspects[aspect]['confidence']
                    aspect_match_score += confidence
                    matched_aspects += 1
                    if confidence > top_confidence:
                        top_confidence = confidence
                        top_aspect = aspect
            if matched_aspects > 0:
                aspect_match_score /= matched_aspects
            else:
                aspect_match_score = 0.5
                top_confidence = 0.5  # Default confidence if no matching aspects

            combined_score = 0.7 * cosine_scores[idx] + 0.3 * aspect_match_score
            aspect_scores.append((combined_score, item_idx, top_confidence))

        aspect_scores.sort(reverse=True)
        top_results = aspect_scores[:min(top_n_results + 1, len(all_item_ids))]

        recommended_products_info = []
        for score, idx, confidence in top_results:
            item_id = all_item_ids[idx]
            if product_id and item_id == product_id:
                continue
            product_info = self.df[self.df['item_unique_id'] == item_id].iloc[0]
            recommended_products_info.append({
                'name': product_info['itemName'],
                'category': product_info['category'],
                'image': product_info['image'],
                'score': score,
                'confidence': confidence
            })
            if len(recommended_products_info) == top_n_results:
                break

        print(f"✅ **Recommendation complete. Found {len(recommended_products_info)} products**.")
        return recommended_products_info

if __name__ == '__main__':
    # Demo run: build the recommender, then print the recommendations for one query.
    print("Initializing Product Recommender with Dynamic ABSA and HNSW...")
    recommender_settings = {
        'dataframe_path': "/content/drive/MyDrive/finalyearResearch/Second_fixed_image_urls.csv",
        'absa_batch_size': 16,
        'absa_chunk_size': 200,
    }
    recommender = ProductRecommender(**recommender_settings)

    for banner_line in (
        "\n\n===================================",
        "🚀 Recommender is ready to use.",
        "===================================\n",
    ):
        print(banner_line)

    print("\n--- Test Case 1: Query with dynamic aspects ---")
    user_input_1 = "sound is not good enough"
    results_1 = recommender.recommend(user_input_1, top_n_results=10)

    print("\n✅ Final Recommendations for Test Case 1:")
    for result in results_1:
        # One labelled line per field, then a separator between products.
        print(f"Name: {result['name']}")
        print(f"Category: {result['category']}")
        print(f"Image: {result['image']}")
        print(f"Score: {result['score']:.3f}")
        print(f"Confidence: {result['confidence']:.3f}")
        print("---")


Initializing Product Recommender with Dynamic ABSA and HNSW...
📦 **Overall Device**: CPU
⚙️ **ABSA Settings**: chunk_size=200, batch_size=16


  self.df_original = pd.read_csv(dataframe_path)
Device set to use cpu


✅ **ABSA Model loaded with device_map='auto'**.
✅ **ABSA pipeline initialized (managed by Accelerate)**.
✅ **SBERT Model loaded on cpu**.
✅ **Ensured directory exists for incremental file: /content/drive/MyDrive/finalyearResearch/annoated**
📂 **Final annotated file found at '/content/drive/MyDrive/finalyearResearch/Second_fixed_image_urls.csv'. Loading it.**


  df = pd.read_csv(self.annotated_path)


✅ **Data appears complete with dynamic ABSA annotations.**
💾 **Generating and saving embeddings**...
✅ Loading existing Enriched Item Description Embeddings...
✅ Loading existing HNSW index from /content/drive/MyDrive/finalyearResearch/hnsw_index.bin
✅ Embeddings and HNSW index ready in 6.94 seconds.


🚀 Recommender is ready to use.


--- Test Case 1: Query with dynamic aspects ---

✨ **Starting Recommendation for query**: 'sound is not good enough'

🔍 **Inferring from query**: 'sound is not good enough'
**Inferred Category**: Musical_Instruments, Aspects: [('sound', 'Negative', 0.9812643527984619)]
✅ **Recommendation complete. Found 10 products**.

✅ Final Recommendations for Test Case 1:
Name: ION Audio Tailgater (iPA77) | Portable Bluetooth PA Speaker with Mic, AM/FM Radio, and USB Charge Port
Category: Musical_Instruments
Image: https://images-na.ssl-images-amazon.com/images/I/31N3dpTs36L.jpg
Score: 0.486
Confidence: 0.500
---
Name: Joso PS4 Headphones with Mic, Wired Chat Gaming H