# In [None]:
"""
Machine Description Generator
Author: Meenakshi2434

Description: 
- Generates technical equipment descriptions using LLM and PDF data
- Processes Excel input files and PDF documentation
- Integrates with FAISS vector database for context retrieval
- Supports multilingual content translation
"""

import os
import re
import warnings
from typing import Optional

import numpy as np
import pandas as pd
import torch
from googletrans import Translator, LANGUAGES
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import FAISS
from langdetect import detect
from langdetect.lang_detect_exception import LangDetectException
from PyPDF2 import PdfReader
from tqdm import tqdm
from transformers import AutoTokenizer, AutoModelForCausalLM

# Suppress all warnings for the remainder of the run.
warnings.filterwarnings('ignore')

# Model identifiers
MODEL_NAME = "meta-llama/Llama-3.2-3b"
EMBEDDING_MODEL = "sentence-transformers/all-mpnet-base-v2"

# Text-splitter parameters used when building the vector store
CHUNK_SIZE = 1000
CHUNK_OVERLAP = 200

# Confidence score written to the output for each description source
CONFIDENCE_PDF = 1.0
CONFIDENCE_LLM = 0.7

# Numeric status code -> display label
MACHINE_TYPE_MAPPING = {
    1: "New",
    2: "Used",
}
CONDITION_MAPPING = {
    1: "In Stock",
    2: "Running",
    3: "Rebuilt",
    4: "In Transit",
    5: "In Production",
    6: "Excellent",
}
AVAILABILITY_MAPPING = {
    1: "Immediately",
    2: "Less than 30 days",
    3: "More than 30 days",
    4: "Immediately from stock",
}

def is_english(text: str) -> bool:
    """Report whether ``text`` is English, answering True whenever unsure.

    Empty or non-string input, text shorter than 10 characters once its
    whitespace is normalised, and any detection failure all count as English.
    """
    usable = bool(text) and isinstance(text, str)
    if not usable:
        return True
    try:
        normalized = ' '.join(text.split())
        if len(normalized) < 10:
            # Too short for a meaningful detection; treat as English.
            return True
        return detect(normalized) == 'en'
    except Exception:
        return True

def _split_on_whitespace(text: str, limit: int) -> list:
    """Split ``text`` into pieces of at most ``limit`` characters, cutting at whitespace."""
    if len(text) <= limit:
        return [text]
    pieces = []
    start = 0
    total = len(text)
    while start < total:
        end = min(start + limit, total)
        if end < total and not text[end].isspace():
            # The window ends inside a word: back up to the last whitespace
            # so the word is not cut in half.
            cut = end
            while cut > start and not text[cut - 1].isspace():
                cut -= 1
            if cut > start:
                end = cut
            # Otherwise the window is one unbroken word; hard-split at the limit.
        piece = text[start:end].strip()
        if piece:
            pieces.append(piece)
        start = end
    return pieces


def translate_to_english(text: str) -> str:
    """Translate the non-English parts of ``text`` to English, chunk by chunk.

    The text is processed in chunks of at most 1000 characters. Chunks are
    cut at whitespace so that re-joining them with a space does not insert a
    space into the middle of a word. A chunk that is already English, or whose
    translation fails, is kept unchanged.

    Args:
        text: Text to translate.

    Returns:
        The chunks joined with single spaces. Empty or non-string input is
        returned as given.
    """
    if not text or not isinstance(text, str):
        return text
    translator = Translator()
    translated = []
    for chunk in _split_on_whitespace(text, 1000):
        try:
            if not is_english(chunk):
                translated.append(translator.translate(chunk, dest='en').text)
            else:
                translated.append(chunk)
        except Exception:
            # Best effort: keep the original chunk when translation fails.
            translated.append(chunk)
    return ' '.join(translated)

def load_model():
    """Load the causal LLM named by ``MODEL_NAME`` together with its tokenizer.

    Returns:
        tuple: ``(model, tokenizer)``. The model is requested in float16 with
        ``device_map="auto"``.

    Raises:
        RuntimeError: If the tokenizer or the model cannot be loaded.
    """
    try:
        model_options = {"torch_dtype": torch.float16, "device_map": "auto"}
        tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
        model = AutoModelForCausalLM.from_pretrained(MODEL_NAME, **model_options)
    except Exception as e:
        raise RuntimeError(f"Model loading failed: {e}")
    return model, tokenizer

def clean_value(value) -> Optional[str]:
    """Normalise a raw cell value to a stripped string, or None when missing.

    A value counts as missing when pandas reports it as NA, when it is blank
    after stripping, or when it is the text ``nan`` in any letter case.

    Args:
        value: Scalar cell value of any type.

    Returns:
        The stripped string form of ``value``, or None if it is missing.
    """
    if pd.isna(value):
        return None
    # Strip first so padded markers such as " NaN " are recognised too.
    cleaned = str(value).strip()
    if not cleaned or cleaned.lower() == 'nan':
        return None
    return cleaned

def parse_pdf_and_create_embeddings(folder_path: str) -> dict:
    """Collect the preprocessed text of every PDF under ``folder_path``.

    The folder is expected to be laid out as
    ``<folder_path>/<manufacturer>/<model>/*.pdf``. Entries that are not
    directories at the first two levels, and files without a ``.pdf``
    extension, are ignored.

    Args:
        folder_path: Root folder of the PDF tree.

    Returns:
        dict: ``(manufacturer, model)`` directory names mapped to the joined
        text of that model's PDFs. Empty when the folder does not exist
        (a warning is printed) or no PDF yields text.
    """
    texts_by_machine = {}
    if not os.path.exists(folder_path):
        print(f"Warning: PDF folder {folder_path} not found")
        return texts_by_machine

    for maker_name in os.listdir(folder_path):
        maker_dir = os.path.join(folder_path, maker_name)
        if not os.path.isdir(maker_dir):
            continue
        for model_name in os.listdir(maker_dir):
            model_dir = os.path.join(maker_dir, model_name)
            if not os.path.isdir(model_dir):
                continue
            pdf_names = [
                entry for entry in os.listdir(model_dir)
                if entry.lower().endswith('.pdf')
            ]
            # Lazy generator: each file is extracted and then preprocessed
            # before the next one is opened.
            raw_texts = (
                extract_text_from_pdf(os.path.join(model_dir, entry))
                for entry in pdf_names
            )
            cleaned_texts = [preprocess_text(raw) for raw in raw_texts if raw]
            if cleaned_texts:
                texts_by_machine[(maker_name, model_name)] = " ".join(cleaned_texts)
    return texts_by_machine

def extract_text_from_pdf(pdf_path: str) -> str:
    """Return the text of all pages of a PDF joined with spaces.

    Args:
        pdf_path: Path of the PDF file to read.

    Returns:
        The extracted text, or an empty string when the file cannot be read
        (a warning is printed in that case).
    """
    try:
        reader = PdfReader(pdf_path)
        # Guard against pages that yield no text, so a single such page does
        # not make the join fail and discard the whole document.
        return " ".join(page.extract_text() or "" for page in reader.pages)
    except Exception as e:
        # Best effort: report the problem instead of failing silently.
        print(f"Warning: could not extract text from {pdf_path}: {e}")
        return ""

def preprocess_text(text: str) -> str:
    """Clean extracted PDF text and make sure it is in English.

    Steps: collapse whitespace, re-join words hyphenated across a break,
    drop page markers, collapse whitespace again, then translate the result
    if it is not detected as English.

    Args:
        text: Raw text, typically from ``extract_text_from_pdf``.

    Returns:
        The cleaned (and possibly translated) text; ``""`` for empty input.
    """
    if not text:
        return ""
    # Collapse all whitespace (including newlines) so the patterns below,
    # which use literal single spaces, can match across line breaks.
    text = re.sub(r'\s+', ' ', text)
    # Re-join words split by a hyphen followed by whitespace ("manu- facturer").
    text = re.sub(r'(\w+)-\s+(\w+)', r'\1\2', text)
    text = re.sub(r'Page \d+ of \d+', '', text)
    # NOTE(review): this also removes ratios and fractions such as "50/60"
    # or "1/2", not only "3/10"-style page markers — confirm that is intended.
    text = re.sub(r'\d+/\d+', '', text)
    # The removals above can leave double or leading/trailing spaces behind.
    text = re.sub(r'\s+', ' ', text).strip()
    return translate_to_english(text) if not is_english(text) else text

def create_vector_store(pdf_data: dict):
    """Build a FAISS vector store from the per-machine PDF texts.

    Each text is split into chunks of ``CHUNK_SIZE`` characters with
    ``CHUNK_OVERLAP`` overlap, and every chunk is stored with its own
    ``{"manufacturer", "model"}`` metadata dict.

    Args:
        pdf_data: ``(manufacturer, model)`` keys mapped to document text.

    Returns:
        The FAISS store created by ``FAISS.from_texts``.

    Raises:
        ValueError: If there is no text to index. This is checked before the
            embedding model is loaded, so the failure is cheap and explicit.
    """
    if not pdf_data:
        raise ValueError("No PDF text available to build the vector store")

    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=CHUNK_SIZE,
        chunk_overlap=CHUNK_OVERLAP
    )
    documents = []
    metadatas = []

    for (mfg, model), text in pdf_data.items():
        chunks = text_splitter.split_text(text)
        documents.extend(chunks)
        # One dict per chunk: `[d] * n` would alias a single dict, so a
        # change to one chunk's metadata would show up on all of them.
        metadatas.extend({"manufacturer": mfg, "model": model} for _ in chunks)

    if not documents:
        raise ValueError("No PDF text available to build the vector store")

    embeddings = HuggingFaceEmbeddings(model_name=EMBEDDING_MODEL)
    return FAISS.from_texts(documents, embeddings, metadatas=metadatas)

def _to_int_code(value):
    """Return ``value`` as an int when it spells a whole number ('3', '3.0'), else None."""
    if value is None:
        return None
    try:
        return int(value)
    except (TypeError, ValueError):
        pass
    try:
        number = float(value)
    except (TypeError, ValueError):
        return None
    return int(number) if number.is_integer() else None


def _label_for_code(mapping, raw_code):
    """Look up the display label of a raw status code, falling back to 'Unknown'."""
    return mapping.get(_to_int_code(raw_code), 'Unknown')


def map_status_codes(row, manufacturer_mapping, model_mapping):
    """
    Maps numeric codes to their corresponding text values using the mapping dictionaries.

    Codes are accepted in integer form ("12") and in the float form that
    pandas produces for columns containing missing values ("12.0"). A status
    code that is missing, non-numeric or not in its mapping becomes 'Unknown'.

    Args:
        row (pd.Series): A row from the input DataFrame
        manufacturer_mapping (pd.DataFrame): Manufacturer code to name mappings
            (columns 'Id' and 'Name')
        model_mapping (pd.DataFrame): Model code to name mappings
            (columns 'MachineModelId' and 'ModelName')

    Returns:
        dict: Mapped values keyed like the input columns, or None if the
        manufacturer or model code is missing or unknown, or mapping fails
    """
    try:
        # Clean input values
        mfg_code = clean_value(row.get('ManufacturerId'))
        model_code = clean_value(row.get('MachineModelId'))
        machine_type = clean_value(row.get('MachineTypeId'))
        condition = clean_value(row.get('MachineConditionId'))
        availability = clean_value(row.get('MachineAvailabilityId'))
        location = clean_value(row.get('Location'))
        year = clean_value(row.get('Year'))
        description = clean_value(row.get('Description'))

        # Validate required fields
        if not all([mfg_code, model_code]):
            print(f"Missing required codes for row: {row.name}")
            return None

        # Map manufacturer
        mfg_id = _to_int_code(mfg_code)
        mfg_names = (
            manufacturer_mapping.loc[manufacturer_mapping['Id'] == mfg_id, 'Name']
            if mfg_id is not None else None
        )
        if mfg_names is None or mfg_names.empty:
            print(f"Invalid manufacturer code: {mfg_code}")
            return None
        manufacturer = mfg_names.iloc[0]

        # Map model
        model_id = _to_int_code(model_code)
        model_names = (
            model_mapping.loc[model_mapping['MachineModelId'] == model_id, 'ModelName']
            if model_id is not None else None
        )
        if model_names is None or model_names.empty:
            print(f"Invalid model code: {model_code}")
            return None
        model = model_names.iloc[0]

        # A year read from a float column arrives as "2015.0"; show it as "2015".
        year_number = _to_int_code(year)
        year_text = str(year_number) if year_number is not None else (year or 'Unknown')

        # Map other fields using the global mapping dictionaries
        return {
            'ManufacturerId': manufacturer,
            'MachineModelId': model,
            'MachineTypeId': _label_for_code(MACHINE_TYPE_MAPPING, machine_type),
            'MachineConditionId': _label_for_code(CONDITION_MAPPING, condition),
            'MachineAvailabilityId': _label_for_code(AVAILABILITY_MAPPING, availability),
            'Location': location or 'Unknown',
            'Year': year_text,
            'Description': description or 'No description available'
        }

    except Exception as e:
        print(f"Error mapping status codes: {str(e)}")
        return None

def get_relevant_context(vector_store, manufacturer, model, k=3):
    """Retrieve de-duplicated context sentences for a machine from the vector store.

    The ``k`` most similar chunks are fetched for a query built from the
    manufacturer and model names. The search is by similarity only: results
    are not restricted to chunks whose metadata matches this machine.

    Chunks are split into sentences at a period followed by whitespace, so
    decimal numbers such as "3.5 kW" stay intact. Repeated sentences are
    dropped, keeping first-seen order.

    Args:
        vector_store: Store exposing ``similarity_search(query, k=...)``.
        manufacturer: Manufacturer name used in the query.
        model: Model name used in the query.
        k (int): Number of chunks to retrieve.

    Returns:
        str: Unique sentences joined with ". " (no trailing period); empty
        when nothing is retrieved.
    """
    query = f"{manufacturer} {model} technical specifications and features"
    docs = vector_store.similarity_search(query, k=k)

    seen = set()
    sentences = []
    for doc in docs:
        for piece in re.split(r'(?<=\.)\s+', doc.page_content):
            # Drop the terminating period(s); the join below re-adds them.
            cleaned = piece.strip().rstrip('.').strip()
            if cleaned and cleaned not in seen:
                seen.add(cleaned)
                sentences.append(cleaned)

    return ". ".join(sentences)

# Opening sentence of the LLM prompt; the placeholders are filled by
# create_prompt via str.format.
template = (
    "The {MachineConditionId} {MachineTypeId} machine, "
    "Model {MachineModelId} manufactured by {ManufacturerId}, "
    "is located in {Location}. "
    "This {Year} model is currently {MachineAvailabilityId} available. "
    "{context}"
)


def create_prompt(row, manufacturer_mapping, model_mapping, vector_store, user_description=None):
    """
    Build the generation prompt for one input row.

    The row's codes are mapped to labels, context is retrieved from the
    vector store for the mapped manufacturer and model, and both are
    substituted into the module-level ``template``.

    Args:
        row (pd.Series): Input data row
        manufacturer_mapping (pd.DataFrame): Manufacturer mapping data
        model_mapping (pd.DataFrame): Model mapping data
        vector_store: FAISS vector store
        user_description (str, optional): Extra text appended under an
            "Additional Details" heading

    Returns:
        str: The prompt, or None if the row cannot be mapped or any step fails
    """
    try:
        mapped_data = map_status_codes(row, manufacturer_mapping, model_mapping)
        if mapped_data:
            retrieved = get_relevant_context(
                vector_store,
                mapped_data['ManufacturerId'],
                mapped_data['MachineModelId']
            )
            extra_fields = {
                'context': retrieved,
                'location': mapped_data['Location'],
                'description': mapped_data['Description'],
            }
            prompt = template.format(**mapped_data, **extra_fields)

            # Append the caller-supplied description, when there is one.
            if user_description:
                prompt = prompt + f"\n\nAdditional Details:\n{user_description}"
            return prompt
        return None

    except Exception as e:
        print(f"Error creating prompt: {str(e)}")
        return None

def clean_generated_text(text):
    """Tidy raw model output into a usable English description.

    Removes echoed prompt instructions, normalises whitespace and sentence
    spacing, makes sure the text ends with terminal punctuation, and
    translates it when it is not detected as English.

    Returns:
        str: The cleaned description, or a string starting with "Error:"
        when the input is empty, too short, untranslatable, or cleaning fails.
    """
    if not text:
        return "Error: Empty generated text."

    # Instruction fragments that the model may echo back from the prompt.
    prompt_echo_patterns = (
        r'You are a technical writer.*?\n\n',
        r'Create a detailed, professional description.*?Response must be in English only.',
    )

    try:
        cleaned = text
        for pattern in prompt_echo_patterns:
            cleaned = re.sub(pattern, '', cleaned, flags=re.DOTALL)

        # Single spaces everywhere, and exactly one space after sentence
        # punctuation that is followed by a capital letter.
        cleaned = re.sub(r'\s+', ' ', cleaned).strip()
        cleaned = re.sub(r'([.!?])\s*([A-Z])', r'\1 \2', cleaned)

        # Close a trailing, unterminated sentence.
        if not cleaned.endswith(('.', '!', '?')):
            cleaned = cleaned.rstrip() + '.'

        if is_english(cleaned):
            # Require a minimum of meaningful content.
            if len(cleaned.split()) < 3:
                return "Error: Generated description too short."
            return cleaned

        translated = translate_to_english(cleaned)
        if translated and len(translated.split()) >= 3:
            return translated
        return "Error: Unable to generate valid English description."
    except Exception as e:
        print(f"Text cleaning error: {str(e)}")
        return "Error: Text cleaning failed."

def generate_description(model, tokenizer, prompt: str) -> str:
    """Sample one description from the LLM for ``prompt`` and clean it.

    The prompt is tokenised (truncated to 2048 tokens), moved to the model's
    device, and up to 500 new tokens are sampled. The decoded text is passed
    through ``clean_generated_text``.

    Returns:
        str: The cleaned text, or "Error generating description" if any step
        raises.
    """
    try:
        encoded = tokenizer(
            prompt,
            return_tensors="pt",
            truncation=True,
            max_length=2048,
        )
        encoded = encoded.to(model.device)
        sampling_options = dict(
            max_new_tokens=500,
            temperature=0.7,
            top_p=0.9,
            repetition_penalty=1.2,
            no_repeat_ngram_size=3,
            do_sample=True,
            num_return_sequences=1,
            pad_token_id=tokenizer.eos_token_id,
        )
        sequences = model.generate(**encoded, **sampling_options)
        # NOTE(review): the first sequence is decoded in full; for a
        # decoder-only model that presumably includes the prompt tokens —
        # confirm this is intended.
        decoded = tokenizer.decode(sequences[0], skip_special_tokens=True)
        return clean_generated_text(decoded)
    except Exception as e:
        print(f"Generation error: {e}")
        return "Error generating description"

def _load_mapping(path, columns):
    """Read a mapping workbook, or return an empty frame with ``columns`` when no path is given."""
    if path:
        return pd.read_excel(path)
    print(f"Warning: no mapping file given for {columns[0]}; "
          "rows that need the LLM fallback will be skipped")
    return pd.DataFrame(columns=columns)


def process_excel(input_file: str, output_file: str, pdf_folder: str,
                  manufacturer_mapping_file: Optional[str] = None,
                  model_mapping_file: Optional[str] = None):
    """Main processing pipeline for Excel files.

    For every input row a description is generated either from that
    machine's own PDF text (source 'PDF') or from a prompt built with
    ``create_prompt`` (source 'LLM'). The result is written to
    ``output_file`` with the added columns 'Generated_Description', 'Source'
    and 'Confidence_Score'.

    Args:
        input_file: Excel workbook with the machine rows.
        output_file: Path of the Excel workbook to write; its directory is
            created if needed.
        pdf_folder: Root of the ``<manufacturer>/<model>/*.pdf`` tree.
        manufacturer_mapping_file: Optional workbook with columns 'Id' and
            'Name'. Needed for the LLM fallback.
        model_mapping_file: Optional workbook with columns 'MachineModelId'
            and 'ModelName'. Needed for the LLM fallback.

    Any exception is caught and reported as "Processing failed: ...".
    """
    try:
        model, tokenizer = load_model()
        pdf_data = parse_pdf_and_create_embeddings(pdf_folder)
        vector_store = create_vector_store(pdf_data)
        df = pd.read_excel(input_file)

        # create_prompt needs both mapping frames. Without a file an empty
        # frame is used, so mapping reports the code as invalid and the row
        # is skipped instead of the whole run aborting.
        manufacturer_mapping = _load_mapping(
            manufacturer_mapping_file, ['Id', 'Name'])
        model_mapping = _load_mapping(
            model_mapping_file, ['MachineModelId', 'ModelName'])

        df['Generated_Description'] = ''
        df['Source'] = ''
        df['Confidence_Score'] = 0.0

        for idx, row in tqdm(df.iterrows(), total=len(df)):
            mfg = clean_value(row['ManufacturerId'])
            model_code = clean_value(row['MachineModelId'])

            # NOTE(review): pdf_data is keyed by the PDF folder names, while
            # this lookup uses the cleaned ID values from the sheet, so it
            # only matches when folders are named after those IDs — confirm.
            pdf_key = (mfg, model_code)
            if pdf_key in pdf_data:
                context = pdf_data[pdf_key]
                desc = generate_description(model, tokenizer, context)
                df.at[idx, 'Generated_Description'] = desc
                df.at[idx, 'Source'] = 'PDF'
                df.at[idx, 'Confidence_Score'] = CONFIDENCE_PDF
            else:
                prompt = create_prompt(
                    row, manufacturer_mapping, model_mapping, vector_store)
                if prompt:
                    desc = generate_description(model, tokenizer, prompt)
                    df.at[idx, 'Generated_Description'] = desc
                    df.at[idx, 'Source'] = 'LLM'
                    df.at[idx, 'Confidence_Score'] = CONFIDENCE_LLM

        # Make sure the target directory exists so the finished work is not
        # lost at the very last step.
        output_dir = os.path.dirname(output_file)
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)
        df.to_excel(output_file, index=False)
        print(f"Processing complete: {output_file}")
    except Exception as e:
        print(f"Processing failed: {e}")

if __name__ == "__main__":
    # Default locations used when the file is run as a script.
    default_paths = {
        "input_file": "data/input/MD_data.xlsx",
        "output_file": "output/descriptions.xlsx",
        "pdf_folder": "data/pdfs",
    }
    process_excel(**default_paths)