In [None]:
import pandas as pd

In [None]:
df = pd.read_excel("/content/Bosch_Dataset.xlsx")
df.head()

In [None]:
dfnew = df.drop(columns=["Review_Section", "Error_Type"])

In [None]:
dfnew.head()

In [None]:
import os
import re
import json
import pandas as pd
import numpy as np
from dotenv import load_dotenv
from openai import AzureOpenAI
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report

class LLMErrorDetector:
    """
    Detects error types in product reviews using an LLM (Azure OpenAI),
    categorizing them as 'Semantic Misalignment', 'Specialized Error', or 'Clean'.
    """

    def __init__(self, openai_api_key: str = None, api_base: str = None, api_version: str = None, deployment: str = None):
        """
        Initialize with explicit parameters or environment variables.
        """
        load_dotenv()  # Load environment variables from .env file

        # Set configuration with explicit parameters or environment variables
        self.api_key = openai_api_key or os.environ.get("AZURE_OPENAI_API_KEY", "")
        self.api_version = api_version or os.environ.get("AZURE_OPENAI_API_VERSION", "2024-08-01-preview")
        self.api_base = api_base or os.environ.get("AZURE_OPENAI_ENDPOINT", "")
        self.deployment = deployment or os.environ.get("AZURE_OPENAI_DEPLOYMENT", "gpt-4o")

        # Initialize client only if we have required credentials
        self.client = None
        self.api_available = False

        if self.api_key and self.api_base:
            try:
                self.client = AzureOpenAI(
                    api_key=self.api_key,
                    api_version=self.api_version,
                    azure_endpoint=self.api_base
                )
                # Test the connection
                self._test_connection()
            except Exception as e:
                print(f"Failed to initialize Azure OpenAI client: {e}")
                print("Will continue with local fallback classification method.")

    def _test_connection(self):
        """Test the API connection with a minimal request"""
        try:
            # Simple test to check if the API works
            response = self.client.chat.completions.create(
                model=self.deployment,
                messages=[
                    {"role": "system", "content": "Hello"},
                    {"role": "user", "content": "Test connection"}
                ],
                max_tokens=5
            )
            self.api_available = True
            print("Azure OpenAI API connection successful!")
        except Exception as e:
            self.api_available = False
            print(f"Azure OpenAI API connection test failed: {e}")
            print("API Authentication Error: Please check your Azure OpenAI API credentials.")
            print("1. Verify your API key is correct.")
            print("2. Confirm your endpoint URL is correct.")
            print("3. Make sure your API version is valid.")
            print("4. Check that your deployment name exists.")

    def _clean_json_response(self, raw_output: str) -> str:
        """
        Cleans the raw output from the API to extract just the JSON part.
        Handles cases where JSON is wrapped in markdown code blocks.
        """
        # Remove markdown code block syntax if present
        code_block_pattern = r"```(?:json)?\s*([\s\S]*?)```"
        match = re.search(code_block_pattern, raw_output)

        if match:
            # Extract the content inside the code block
            return match.group(1).strip()

        return raw_output.strip()

    def analyze_review(self, product_name: str, review_text: str) -> dict:
        """
        Analyzes a review using Azure OpenAI or falls back to a simple rule-based method
        if the API is not available.
        """
        if self.api_available and self.client:
            return self._analyze_with_api(product_name, review_text)
        else:
            return self._fallback_analysis(product_name, review_text)

    def _analyze_with_api(self, product_name: str, review_text: str) -> dict:
        """Use Azure OpenAI API to analyze the review"""

        user_prompt = f"""
You are a Bosch power tool expert. Analyze the following review and classify it into one of these categories:

1. Clean: The review contains no errors in understanding of the product's functionality or terminology.

2. Semantic Misalignment: The review contains misunderstandings of the tool's function or usage
(e.g., using a drill to cut wood), misuse of product-related terms (e.g., calling a cordless drill "battery-free"),
or wrong assumptions about what a tool can do.

3. Specialized Error: The review contains highly technical errors related to specific tool features, performance metrics,
or specialized usage contexts that would only be known to professionals or experts in the field.

Tool: '{product_name}'
Review: '{review_text}'

Your response must be valid JSON in this format:
{{
  "Error_Type": "Clean" or "Semantic Misalignment" or "Specialized Error",
  "Review_Section": "Brief explanation of why the review is classified this way",
  "Corrective_Measure": "How the review can be corrected if there is an error, or N/A if clean"
}}

DO NOT wrap the JSON in markdown code blocks or any other formatting - just return the raw JSON object.
"""

        try:
            response = self.client.chat.completions.create(
                model=self.deployment,
                messages=[
                    {
                        "role": "system",
                        "content": "You are a domain expert analyzing product reviews for different types of errors. Always return only valid JSON without markdown formatting."
                    },
                    {
                        "role": "user",
                        "content": user_prompt
                    }
                ],
                temperature=0.0
            )

            raw_output = response.choices[0].message.content.strip()

            # Clean the response to extract just the JSON part
            cleaned_json_str = self._clean_json_response(raw_output)

            try:
                output_json = json.loads(cleaned_json_str)
                return output_json
            except json.JSONDecodeError as e:
                print(f"Failed to parse JSON: {cleaned_json_str}")
                print(f"Error: {e}")

                # Try a more aggressive approach to extract valid JSON
                try:
                    # Find anything that looks like a JSON object
                    json_pattern = r"\{[\s\S]*\}"
                    match = re.search(json_pattern, cleaned_json_str)
                    if match:
                        potential_json = match.group(0)
                        return json.loads(potential_json)
                except:
                    pass

                # If all parsing attempts fail, extract the basic information manually
                error_type_match = re.search(r'"Error_Type":\s*"([^"]+)"', cleaned_json_str)
                review_section_match = re.search(r'"Review_Section":\s*"([^"]+)"', cleaned_json_str)
                corrective_match = re.search(r'"Corrective_Measure":\s*"([^"]+)"', cleaned_json_str)

                return {
                    "Error_Type": error_type_match.group(1) if error_type_match else "Unknown",
                    "Review_Section": review_section_match.group(1) if review_section_match else "Failed to parse",
                    "Corrective_Measure": corrective_match.group(1) if corrective_match else "No suggestion"
                }

        except Exception as e:
            print(f"OpenAI API call failed: {e}")
            return {
                "Error_Type": "Unknown",
                "Review_Section": f"API Error: {str(e)}",
                "Corrective_Measure": "No suggestion due to API error"
            }

    def _fallback_analysis(self, product_name: str, review_text: str) -> dict:
        """
        Simplified rule-based fallback method when API is unavailable.
        This is just a very basic implementation - you may want to enhance it.
        """
        review_lower = review_text.lower()

        # Very simple keyword-based classification
        semantic_keywords = [
            "battery-free", "cordless", "wireless", "cut wood with drill",
            "hammer with screwdriver", "screwdriver to chisel", "without power",
            "router", "sds-max", "precision drilling"
        ]

        specialized_keywords = [
            "torque rating", "rpm", "amperage", "voltage incorrect",
            "technical specification", "wrong specs"
        ]

        # Check for semantic misalignment
        for keyword in semantic_keywords:
            if keyword.lower() in review_lower:
                return {
                    "Error_Type": "Semantic Misalignment",
                    "Review_Section": f"Contains potential semantic error with term: {keyword}",
                    "Corrective_Measure": "Please verify the correct terminology and tool usage"
                }

        # Check for specialized errors
        for keyword in specialized_keywords:
            if keyword.lower() in review_lower:
                return {
                    "Error_Type": "Specialized Error",
                    "Review_Section": f"Contains potential specialized error: {keyword}",
                    "Corrective_Measure": "Please review technical specifications and correct"
                }

        # Default to Clean if no keywords match
        return {
            "Error_Type": "Clean",
            "Review_Section": "No obvious errors detected (fallback analysis)",
            "Corrective_Measure": "N/A"
        }

    def process_reviews(self, reviews_file: str) -> pd.DataFrame:
        """
        Reads an Excel file with columns including Product_ID, Product_Name, Review, Error_Type (ground truth),
        analyzes each review, adds prediction columns, and calculates accuracy metrics.
        """
        try:
            df = pd.read_excel(reviews_file)
        except Exception as e:
            print(f"Error reading Excel file: {e}")
            return pd.DataFrame()

        # Check if the ground truth column exists
        has_ground_truth = "Error_Type" in df.columns

        if has_ground_truth:
            # Rename the ground truth column to avoid overwriting
            df.rename(columns={"Error_Type": "True_Error_Type"}, inplace=True)

        # Add or reset prediction columns
        df["Error_Type"] = None
        df["Review_Section"] = None
        df["Corrective_Measure"] = None

        # Process each review
        for idx, row in df.iterrows():
            product_name = row.get("Product_Name", "")
            review_text = row.get("Review", "")

            # Skip if essential data is missing
            if not product_name or not review_text:
                df.at[idx, "Error_Type"] = "Unknown"
                df.at[idx, "Review_Section"] = "Missing product name or review text"
                df.at[idx, "Corrective_Measure"] = "N/A"
                continue

            # Analyze the review
            parsed_output = self.analyze_review(product_name, review_text)

            # Update the dataframe with the analysis results
            df.at[idx, "Error_Type"] = parsed_output.get("Error_Type", "Unknown")
            df.at[idx, "Review_Section"] = parsed_output.get("Review_Section", "")
            df.at[idx, "Corrective_Measure"] = parsed_output.get("Corrective_Measure", "")

            # Show progress
            if (idx + 1) % 10 == 0 or idx == len(df) - 1:
                print(f"Processed {idx + 1}/{len(df)} reviews")

        # Calculate accuracy metrics if ground truth is available
        if has_ground_truth:
            self.calculate_accuracy(df)

        # Save results
        output_file = reviews_file.replace(".xlsx", "_analyzed.xlsx")
        try:
            df.to_excel(output_file, index=False)
            print(f"Results saved to: {output_file}")
        except Exception as e:
            print(f"Error saving results to Excel: {e}")

        return df

    def calculate_accuracy(self, df: pd.DataFrame) -> None:
        """
        Calculate and print accuracy metrics based on ground truth labels.
        """
        print("\n===== ACCURACY METRICS =====")

        # Filter out rows with missing values
        valid_rows = df.dropna(subset=["True_Error_Type", "Error_Type"])

        if len(valid_rows) == 0:
            print("No valid rows with both ground truth and predictions for accuracy calculation.")
            return

        # Overall accuracy
        accuracy = accuracy_score(valid_rows["True_Error_Type"], valid_rows["Error_Type"])
        print(f"Overall Accuracy: {accuracy:.2f}")

        # Class-specific metrics
        print("\nClassification Report:")
        report = classification_report(
            valid_rows["True_Error_Type"],
            valid_rows["Error_Type"],
            zero_division=0
        )
        print(report)

        # Confusion matrix
        print("\nConfusion Matrix:")
        labels = ["Clean", "Semantic Misalignment", "Specialized Error"]

        # Filter to only include labels that exist in the data
        existing_labels = list(set(list(valid_rows["True_Error_Type"].unique()) +
                                  list(valid_rows["Error_Type"].unique())))
        existing_labels = [label for label in labels if label in existing_labels]

        cm = confusion_matrix(
            valid_rows["True_Error_Type"],
            valid_rows["Error_Type"],
            labels=existing_labels
        )

        # Create a DataFrame for better visualization
        cm_df = pd.DataFrame(
            cm,
            index=[f"True: {label}" for label in existing_labels],
            columns=[f"Pred: {label}" for label in existing_labels]
        )
        print(cm_df)

        # Calculate per-class counts and accuracies
        print("\nPer-Class Performance:")
        classes = ["Clean", "Semantic Misalignment", "Specialized Error"]
        for cls in classes:
            true_count = sum(valid_rows["True_Error_Type"] == cls)
            correct_count = sum((valid_rows["True_Error_Type"] == cls) & (valid_rows["Error_Type"] == cls))

            if true_count > 0:
                class_accuracy = correct_count / true_count
                print(f"{cls}: {correct_count}/{true_count} correct ({class_accuracy:.2f})")
            else:
                print(f"{cls}: No ground truth instances")


def main():
    # Get user input for API credentials
    print("Azure OpenAI Credentials (leave blank to use environment variables):")
    api_key = input("API Key (press Enter to use environment variable): ").strip() or None
    api_base = input("API Endpoint (press Enter to use environment variable): ").strip() or None
    api_version = input("API Version (press Enter to use default '2024-08-01-preview'): ").strip() or None
    deployment = input("Deployment Name (press Enter to use default 'gpt-4o'): ").strip() or None

    # Initialize the detector
    detector = LLMErrorDetector(
        openai_api_key=api_key,
        api_base=api_base,
        api_version=api_version,
        deployment=deployment
    )

    # Get the input file path
    input_file = input("Enter the path to your Excel file (e.g., sample_reviews.xlsx): ").strip()

    if not os.path.exists(input_file):
        print(f"File not found: {input_file}")
        return

    # Process the reviews
    df_result = detector.process_reviews(input_file)

    if not df_result.empty:
        print("\nFinal Results Sample:")
        columns_to_show = ["Product_ID", "Product_Name", "Review"]
        if "True_Error_Type" in df_result.columns:
            columns_to_show.append("True_Error_Type")
        columns_to_show.extend(["Error_Type", "Review_Section", "Corrective_Measure"])

        print(df_result[columns_to_show].head())

if __name__ == "__main__":
    main()