<a href="https://www.kaggle.com/code/sagarsahu123/mechanic-mitra?scriptVersionId=282792942" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

# 🔧 Mechanic-Mitra: Production-Ready Multi-Agent AI Vehicle Diagnostic System

## 📦 Step 1: Install Dependencies

In [None]:
%pip install -q google-generativeai pillow fpdf2 python-dotenv ipywidgets duckduckgo-search requests beautifulsoup4

## 🔑 Step 2: Configure API Key (SECURE)

In [None]:
import os
import google.generativeai as genai

# ‚úÖ SECURE: Use Kaggle Secrets (for Kaggle) or .env (for local)
# Resolve the Gemini API key. Kaggle Secrets is tried first; any failure there
# (e.g. the `kaggle_secrets` package is missing when running locally) falls
# back to a local .env file / the process environment.
try:
    from kaggle_secrets import UserSecretsClient
    user_secrets = UserSecretsClient()
    GOOGLE_API_KEY = user_secrets.get_secret("GOOGLE_API_KEY")
except Exception:
    # `except Exception` instead of a bare `except` so that KeyboardInterrupt /
    # SystemExit are not swallowed by the fallback path.
    from dotenv import load_dotenv
    load_dotenv()
    GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")

# Validate on BOTH paths: an empty or missing key from either source should
# fail here with a clear message instead of surfacing later as an obscure
# error from os.environ or the Gemini client.
if not GOOGLE_API_KEY:
    raise ValueError("‚ùå ERROR: GOOGLE_API_KEY not found! Add it to Kaggle Secrets or .env file")

os.environ["GOOGLE_API_KEY"] = GOOGLE_API_KEY
genai.configure(api_key=GOOGLE_API_KEY)

## 🛠️ Step 3: Market Price Tool (AI-Based Price Estimation)

In [None]:
class PriceAgent:
    """
    Price estimation agent for automotive parts, backed by a Gemini model.

    The agent builds a pricing prompt for the Indian market, sends it to the
    model and normalises whatever text comes back into the canonical
    "INR min-max" form. When the call fails or the answer cannot be parsed,
    FALLBACK_PRICE is returned so callers always receive a usable range.
    """

    # Returned whenever the model call fails or its reply cannot be parsed.
    FALLBACK_PRICE = "INR 1000-5000"

    def __init__(self, model_name='gemini-2.5-flash'):
        """
        Initialize the price estimation agent.

        Args:
            model_name: Name of the Gemini model to instantiate.

        Raises:
            RuntimeError: If the model object cannot be created.
        """
        try:
            self.model = genai.GenerativeModel(model_name)
        except Exception as e:
            raise RuntimeError(f"‚ùå Failed to initialize PriceAgent: {str(e)}") from e

    @staticmethod
    def _build_prompt(part_name, vehicle_info=None, part_context=None):
        """Compose the pricing prompt, including optional vehicle/repair context."""
        import datetime

        # Use the real current year instead of a hard-coded one so the prompt
        # does not go stale.
        current_year = datetime.date.today().year

        prompt = f"""You are an expert in Indian automotive parts pricing with deep market knowledge.

Part to estimate: {part_name}

Context:
- Market: India (prices in INR)
- Current year: {current_year}
- Consider both OEM and aftermarket options
"""

        if vehicle_info:
            prompt += f"\nVehicle: {vehicle_info.get('make', 'Generic')} {vehicle_info.get('model', '')} ({vehicle_info.get('year', 'Recent')})"

        if part_context:
            prompt += f"\nReason for replacement: {part_context}"

        prompt += f"""\n
Task:
1. Analyze the part category and typical market prices in India
2. Consider factors like: part complexity, material, brand variance
3. Provide a realistic price RANGE (not single price) in INR
4. Account for both budget and premium options

Important:
- Return ONLY the price range in this exact format: "INR min-max"
- Example: "INR 2500-4500"
- Use realistic Indian market prices for {current_year}
- Round to nearest 50 or 100

Price range:"""
        return prompt

    @staticmethod
    def _normalize_price(price_text):
        """
        Extract a price range from free-form model output.

        Accepts an INR / Rs / rupee-sign prefix, comma-grouped digits and a
        hyphen, en/em dash or the word "to" as separator.

        Returns:
            str | None: "INR <low>-<high>" with plain digits, or None when no
            range can be found.
        """
        import re

        match = re.search(
            r'(?:INR|Rs\.?|\u20b9)\s*(\d[\d,]*)\s*(?:[-\u2013\u2014]|to)\s*'
            r'(?:(?:INR|Rs\.?|\u20b9)\s*)?(\d[\d,]*)',
            price_text or '',
            re.IGNORECASE,
        )
        if not match:
            return None

        # Drop thousands separators so downstream float() parsing works.
        low, high = (int(group.replace(',', '')) for group in match.groups())
        if low > high:
            low, high = high, low
        return f"INR {low}-{high}"

    def estimate_price(self, part_name, vehicle_info=None, part_context=None):
        """
        Estimate price for an automotive part using AI.

        Args:
            part_name: Name of the automotive part
            vehicle_info: Optional dict with vehicle details (make, model, year)
            part_context: Optional context about why this part is needed

        Returns:
            str: Price range in format "INR min-max". FALLBACK_PRICE is
            returned when the model call fails or the reply is unparseable;
            this method does not raise.
        """
        try:
            prompt = self._build_prompt(part_name, vehicle_info, part_context)
            response = self.model.generate_content(prompt)
            price_text = response.text.strip()
        except Exception as e:
            # Deliberate best-effort: a failed estimate must not abort the
            # surrounding diagnosis.
            print(f"   [ERROR] Price estimation failed: {str(e)}")
            return self.FALLBACK_PRICE

        # Always normalise: the model may wrap the range in quotes, markdown
        # or a full sentence even though the prompt asks for the bare range.
        price_range = self._normalize_price(price_text)
        if price_range is None:
            print(f"   [WARN] AI response format unexpected: {price_text[:50]}...")
            return self.FALLBACK_PRICE
        return price_range

    def estimate_batch(self, part_names, vehicle_info=None):
        """
        Estimate prices for multiple parts, one model call per part.

        Args:
            part_names: List of part names
            vehicle_info: Optional vehicle details

        Returns:
            dict: {part_name: price_range}
        """
        return {
            part_name: self.estimate_price(part_name, vehicle_info)
            for part_name in part_names
        }

In [None]:
# Initialize Price Agent (global instance)
# Created once when this cell runs; get_market_prices() below reuses it on every call.
price_agent = PriceAgent()

def get_market_prices(part_name: str):
    """
    Tool function for Gemini to get automotive part prices.
    Uses AI-powered PriceAgent for intelligent estimation.
    
    Args:
        part_name: Name of the automotive part
    
    Returns:
        str: Price range in INR
    """
    # NOTE(review): this function is passed as `tools=[get_market_prices]` when the
    # diagnostic agent builds its model; the SDK presumably derives the tool schema and
    # description from this signature and docstring — confirm before rewording either.
    # Only the part name is forwarded, so vehicle_info / part_context stay at their None defaults.
    return price_agent.estimate_price(part_name)



## 🤖 Step 4: Optimized Diagnostic Agent (with Better Error Handling)

In [None]:
import time
import json

class OptimizedDiagnosticAgent:
    """
    Multi-modal (image + audio) vehicle diagnostic agent built on a Gemini model.

    The model is created with `get_market_prices` registered as a tool so it
    can look up part prices while producing its JSON report.
    """

    def __init__(self, model_name='gemini-2.5-flash', max_retries=3):
        """
        Initialize the diagnostic agent with error handling.

        Args:
            model_name: Gemini model to use
            max_retries: Total number of attempts for the analysis call;
                values below 1 are raised to 1 so at least one attempt is made.

        Raises:
            RuntimeError: If the model object cannot be created.
        """
        try:
            self.model = genai.GenerativeModel(model_name, tools=[get_market_prices])
        except Exception as e:
            raise RuntimeError(f"‚ùå Failed to initialize agent: {str(e)}") from e
        self.max_retries = max(1, int(max_retries))

    @staticmethod
    def _build_prompt(language):
        """Return the diagnostic prompt, asking for findings written in *language*."""
        return f"""
You are an expert automotive diagnostic AI system. Analyze BOTH the image and audio comprehensively.

VISUAL ANALYSIS:
- Examine the image for visible damage, rust, wear, leaks, or anomalies
- Be specific about parts and conditions observed

AUDIO ANALYSIS:
- Listen carefully to engine sounds for abnormal noises
- Identify knocking, grinding, squealing, or irregular patterns
- Correlate sounds with potential mechanical issues

COMPREHENSIVE DIAGNOSIS:
Based on your visual and audio findings:
1. Identify the root cause of any issues
2. List affected components with their conditions
3. Recommend specific parts for replacement
4. Use the `get_market_prices` tool to get prices for each recommended part

Output Format (JSON):
{{
    "visual_analysis": "Detailed visual inspection findings in {language}",
    "audio_analysis": "Detailed audio analysis findings in {language}",
    "diagnosis": "Complete diagnosis with root cause analysis in {language}",
    "components": [
        {{"component": "Component name", "condition": "Status", "notes": "Detailed notes"}},
        ...
    ],
    "parts": [
        {{"part_name": "Part name", "price_inr": price_from_tool}},
        ...
    ]
}}

Provide thorough, professional analysis in {language}.
"""

    def analyze_complete(self, image_data, audio_path, language="English"):
        """
        Perform complete multi-modal analysis with retry logic and error handling.

        Args:
            image_data: Image object sent to the model alongside the prompt.
            audio_path: Path of the audio file to upload for analysis.
            language: Language the textual findings should be written in.

        Returns:
            str: The model's raw response text on success. After all attempts
            fail, a JSON string with the keys "visual_analysis",
            "audio_analysis", "diagnosis", "components" and "parts" describing
            the failure. This method does not raise for API errors.
        """
        prompt = self._build_prompt(language)
        total_attempts = max(1, self.max_retries)
        audio_file = None
        error_msg = "no attempt was made"

        try:
            for attempt in range(total_attempts):
                try:
                    # Upload only once: a successful upload is reused by later
                    # attempts instead of being repeated on every retry.
                    if audio_file is None:
                        try:
                            audio_file = genai.upload_file(path=audio_path)
                        except Exception as e:
                            raise RuntimeError(f"Failed to upload audio file: {str(e)}") from e

                    # Single combined API call
                    try:
                        chat = self.model.start_chat(enable_automatic_function_calling=True)
                        response = chat.send_message([prompt, image_data, audio_file])
                        return response.text
                    except Exception as e:
                        raise RuntimeError(f"API call failed: {str(e)}") from e

                except Exception as e:
                    error_msg = str(e)
                    print(f"   ‚ö†Ô∏è Attempt {attempt + 1} failed: {error_msg}")

                    if attempt < total_attempts - 1:
                        # Exponential backoff between attempts: 1s, 2s, 4s, ...
                        time.sleep(2 ** attempt)
        finally:
            # Best-effort removal of the uploaded audio so it does not linger
            # in remote file storage; a cleanup failure must not mask the result.
            if audio_file is not None:
                try:
                    genai.delete_file(audio_file.name)
                except Exception as cleanup_error:
                    print(f"   [WARN] Could not delete uploaded audio: {cleanup_error}")

        # Every attempt failed: return a report with the same shape as a
        # successful one so callers can treat both uniformly.
        error_report = {
            "visual_analysis": "Analysis failed - unable to process image",
            "audio_analysis": "Analysis failed - unable to process audio",
            "diagnosis": f"ERROR: Analysis failed after {total_attempts} attempts. Error: {error_msg}",
            "components": [],
            "parts": []
        }
        return json.dumps(error_report)

## 📄 Step 5: PDF Generator (Production-Ready)

In [None]:
import re
from fpdf import FPDF
from fpdf.fonts import FontFace

class DiagnosticPDF(FPDF):
    """
    FPDF subclass that lays out the Mechanic-Mitra diagnostic report.

    All text is rendered with the built-in 'helvetica' font, so every string
    is passed through clean_text() first to make it safe for that font.
    """

    # Typographic characters commonly found in generated text, mapped to
    # equivalents that survive Latin-1 encoding.
    _TEXT_FALLBACKS = {
        '\u2018': "'", '\u2019': "'",
        '\u201c': '"', '\u201d': '"',
        '\u2013': '-', '\u2014': '-',
        '\u2026': '...',
        '\u2022': '-',
        '\u20b9': 'INR ',
    }

    def header(self):
        """Draw the report title on the first page; later pages only get top spacing."""
        if self.page_no() == 1:
            self.set_font('helvetica', 'B', 16)
            self.cell(0, 10, 'Mechanic-Mitra Diagnostic Report', border=0, align='C')
            self.ln(15)
        else:
            self.ln(10)

    def footer(self):
        """Draw the centred page number 15 units above the bottom edge."""
        self.set_y(-15)
        self.set_font('helvetica', '', 8)
        self.cell(0, 10, f'Page {self.page_no()}', align='C')

    def clean_text(self, text):
        """
        Return *text* as a single line that the built-in font can render.

        Emojis, symbol glyphs and markdown bold markers are removed, common
        typographic characters are mapped to plain equivalents, whitespace is
        collapsed, and anything still outside Latin-1 becomes '?'.
        None yields an empty string; non-string values are converted with str().
        """
        if text is None:
            return ''
        text = str(text)
        # Astral-plane emojis, BMP symbol/dingbat glyphs, variation selectors and joiners.
        text = re.sub(r'[\U00010000-\U0010ffff\u2600-\u27bf\ufe0f\u200d]', '', text)
        for original, replacement in self._TEXT_FALLBACKS.items():
            text = text.replace(original, replacement)
        text = text.replace('**', '')
        text = re.sub(r'\s+', ' ', text).strip()
        # NOTE(review): fpdf2's core fonts are documented as Latin-1 only — rendering
        # non-Latin languages properly would need a Unicode TTF registered via add_font().
        return text.encode('latin-1', 'replace').decode('latin-1')

    def chapter_title(self, title):
        """Draw a filled, bold section heading."""
        self.set_font('helvetica', 'B', 14)
        self.set_fill_color(200, 220, 255)
        title = self.clean_text(title)
        self.cell(0, 10, title, border=0, fill=True, align='L')
        self.ln(8)

    def chapter_body(self, body):
        """
        Draw body text, keeping the paragraph structure of *body*.

        Lines are split BEFORE cleaning: clean_text() collapses all whitespace
        (newlines included), so cleaning first would merge every paragraph
        into one.
        """
        self.set_font('helvetica', '', 11)
        for line in str(body if body is not None else '').split('\n'):
            cleaned = self.clean_text(line)
            if cleaned:
                # Start every paragraph at the left margin regardless of where
                # the previous multi_cell left the cursor.
                self.set_x(self.l_margin)
                self.multi_cell(0, 6, cleaned)
            else:
                self.ln(3)
        self.ln(5)

    def add_component_table(self, components):
        """
        Draw the "Component Inspection" table.

        Args:
            components: Iterable of dicts with optional 'component',
                'condition' and 'notes' keys. Non-dict entries are skipped;
                nothing is drawn when no usable entry remains.
        """
        if not components:
            return

        data = [
            [
                self.clean_text(component.get('component', 'Unknown')),
                self.clean_text(component.get('condition', 'Unknown')),
                self.clean_text(component.get('notes', '')),
            ]
            for component in components
            if isinstance(component, dict)
        ]
        if not data:
            return

        self.chapter_title("Component Inspection")
        self.set_font('helvetica', '', 9)

        headers = ["Component", "Condition", "Notes"]
        headings_style = FontFace(emphasis="BOLD", fill_color=(200, 220, 255))

        with self.table(
            borders_layout="SINGLE_TOP_LINE",
            cell_fill_color=(240, 240, 240),
            cell_fill_mode="ROWS",
            col_widths=(25, 20, 55),
            headings_style=headings_style,
            line_height=self.font_size * 1.5,
            text_align=("LEFT", "CENTER", "LEFT"),
            width=190,
            padding=2
        ) as table:
            header_row = table.row()
            for header in headers:
                header_row.cell(header)
            for row_data in data:
                data_row = table.row()
                for cell_data in row_data:
                    data_row.cell(cell_data)
        self.ln(5)

    @staticmethod
    def _parse_price_range(raw_price):
        """
        Parse a price string into (low, high) floats.

        Thousands separators are ignored. One number yields (n, n), two
        numbers yield (low, high); anything else returns None.
        """
        numbers = re.findall(r'\d+(?:\.\d+)?', str(raw_price).replace(',', ''))
        if len(numbers) == 1:
            value = float(numbers[0])
            return value, value
        if len(numbers) == 2:
            low, high = sorted(map(float, numbers))
            return low, high
        return None

    def add_price_table(self, parts):
        """
        Draw the "Estimated Parts Costs" table with a bold total row.

        Args:
            parts: Iterable of dicts with optional 'part_name' and 'price_inr'
                keys. Non-dict entries are skipped. Prices that cannot be
                parsed are still listed but left out of the total, and the
                total row is labelled accordingly.
        """
        if not parts:
            return

        data = []
        total_low = 0
        total_high = 0
        unpriced = 0

        for part in parts:
            if not isinstance(part, dict):
                continue
            name = self.clean_text(part.get('part_name', 'Unknown'))
            raw_price = self.clean_text(part.get('price_inr', '0')).replace('INR', '').strip()

            price_range = self._parse_price_range(raw_price)
            if price_range is None:
                unpriced += 1
            else:
                total_low += price_range[0]
                total_high += price_range[1]

            data.append([name, f"INR {raw_price}"])

        if not data:
            return

        self.chapter_title("Estimated Parts Costs")
        self.set_font('helvetica', '', 10)

        headers = ["Part Name", "Estimated Price (INR)"]

        # Total row is appended last and rendered bold below.
        total_label = "TOTAL (priced items only)" if unpriced else "TOTAL"
        data.append([total_label, f"INR {total_low:,.0f}-{total_high:,.0f}"])

        headings_style = FontFace(emphasis="BOLD", fill_color=(200, 220, 255))
        bold_style = FontFace(emphasis="BOLD")
        last_index = len(data) - 1

        with self.table(
            borders_layout="ALL",
            col_widths=(70, 30),
            headings_style=headings_style,
            line_height=self.font_size * 1.5,
            text_align=("LEFT", "RIGHT"),
            width=190,
            padding=2
        ) as table:
            header_row = table.row()
            for header in headers:
                header_row.cell(header)

            for i, row_data in enumerate(data):
                data_row = table.row()
                for cell_data in row_data:
                    if i == last_index:
                        data_row.cell(cell_data, style=bold_style)
                    else:
                        data_row.cell(cell_data)
        self.ln(10)

def generate_pdf(visual_text, audio_text, diagnosis_text, parts, components):
    """
    Build the diagnostic report and return the result of `output()`.

    The report has three narrative sections (visual, audio, diagnosis)
    followed by the component table and the price table; each table is only
    added when its list is non-empty.
    """
    report = DiagnosticPDF()
    report.add_page()

    narrative_sections = (
        ("Visual Analysis", visual_text),
        ("Audio Analysis", audio_text),
        ("Chief Mechanic Diagnosis", diagnosis_text),
    )
    for heading, text in narrative_sections:
        report.chapter_title(heading)
        report.chapter_body(text)

    # Tables are optional and come after the narrative, components first.
    if components:
        report.add_component_table(components)
    if parts:
        report.add_price_table(parts)

    return report.output()

## 📁 Step 6: File Upload Widgets

In [None]:
from IPython.display import display, Image as IPImage, Audio
from PIL import Image
from ipywidgets import FileUpload, HTML as HTMLWidget, VBox
import io

# One single-file upload widget per input; `accept` is set to the matching media type.
image_upload = FileUpload(multiple=False, accept='image/*')
audio_upload = FileUpload(multiple=False, accept='audio/*')

# Stack a heading above each widget and render the whole panel.
display(
    VBox(children=[
        HTMLWidget(value="<h3>Upload Vehicle Image:</h3>"),
        image_upload,
        HTMLWidget(value="<h3>Upload Engine Audio:</h3>"),
        audio_upload,
    ])
)

print("\nüìÅ Upload your files using the widgets above")

## 🖼️ Step 7: Preview Uploaded Files

In [None]:
from IPython.display import HTML, display, Image as IPImage, Audio
from PIL import Image
import io

# Helper function to get the first uploaded file safely
def get_first_upload(upload_widget):
    """
    Return the first uploaded entry held by *upload_widget*, or None.

    The widget's `value` may be a tuple, a list or a dict depending on the
    ipywidgets version. For a dict the first value is returned; for a tuple
    or list the first element. Empty values and any other type yield None.
    """
    uploads = upload_widget.value

    # Nothing uploaded yet (empty container or None).
    if not uploads:
        return None

    # Sequence-style value: entries are stored positionally.
    if isinstance(uploads, (tuple, list)):
        return uploads[0]

    # Mapping-style value: take the first stored entry.
    if isinstance(uploads, dict):
        return next(iter(uploads.values()))

    return None

# Get files
# Pick the first uploaded entry from each widget (None when nothing was uploaded).
uploaded_image = get_first_upload(image_upload)
uploaded_audio = get_first_upload(audio_upload)

# --- Image preview ---
if not uploaded_image:
    print("‚ÑπÔ∏è No image uploaded yet.")
else:
    display(HTML("<h4>Uploaded Image:</h4>"))
    try:
        img_content = uploaded_image['content']
        # Content may be a memoryview rather than bytes; convert when possible.
        img_content = img_content.tobytes() if hasattr(img_content, 'tobytes') else img_content

        # image_data is kept at module level: the diagnosis step passes it to the agent.
        image_data = Image.open(io.BytesIO(img_content))
        display(image_data)
    except Exception as e:
        print(f"‚ùå Error loading image: {e}")

# --- Audio preview ---
if not uploaded_audio:
    print("‚ÑπÔ∏è No audio uploaded yet.")
else:
    display(HTML("<h4>Uploaded Audio:</h4>"))
    try:
        audio_content = uploaded_audio['content']
        # Same memoryview-to-bytes conversion as for the image.
        audio_content = audio_content.tobytes() if hasattr(audio_content, 'tobytes') else audio_content

        audio_data = Audio(audio_content)
        display(audio_data)
    except Exception as e:
        print(f"‚ùå Error loading audio: {e}")


## 🔬 Step 8: Run Diagnosis

In [None]:
import tempfile
import json

import os

print("="*60)
print("üîß MECHANIC-MITRA DIAGNOSTIC SYSTEM")
print("="*60)

try:
    # Validate uploads
    if not uploaded_image or not uploaded_audio:
        raise ValueError("Please upload both image and audio files first!")
    # The preview step only defines image_data when the image decoded successfully.
    if globals().get('image_data') is None:
        raise ValueError("Uploaded image could not be loaded. Re-run Step 7 first!")

    # Initialize agent
    diagnostic_agent = OptimizedDiagnosticAgent()
    language = "English"

    # Save audio to temp file
    print("\nüì§ Preparing files for analysis...")

    # Keep the uploaded file's own extension instead of always using ".mp3".
    # NOTE(review): the upload API presumably infers the MIME type from the path — confirm.
    # The file name lives under 'name' or 'metadata'/'name' depending on ipywidgets version.
    try:
        audio_name = uploaded_audio['name']
    except (KeyError, TypeError):
        try:
            audio_name = uploaded_audio['metadata']['name']
        except (KeyError, TypeError):
            audio_name = ''
    audio_suffix = os.path.splitext(str(audio_name))[1] or ".mp3"

    with tempfile.NamedTemporaryFile(delete=False, suffix=audio_suffix) as tmp_audio:
        tmp_audio.write(uploaded_audio['content'])
        tmp_audio_path = tmp_audio.name

    # Run analysis
    print("\n" + "="*60)
    print("ü§ñ Running comprehensive multi-modal analysis...")
    print("="*60)

    try:
        raw_response = diagnostic_agent.analyze_complete(
            image_data,
            tmp_audio_path,
            language=language
        )
    finally:
        # Cleanup: remove the temp file even when the analysis raises.
        os.unlink(tmp_audio_path)

    # Parse JSON response: take the outermost {...} span so surrounding prose
    # or markdown code fences are ignored.
    diagnosis_data = None
    start_idx = raw_response.find('{')
    end_idx = raw_response.rfind('}')
    if start_idx != -1 and end_idx > start_idx:
        try:
            diagnosis_data = json.loads(raw_response[start_idx:end_idx + 1])
        except json.JSONDecodeError as e:
            print(f"‚ö†Ô∏è JSON parsing error: {e}")
            diagnosis_data = {
                "visual_analysis": "JSON parsing failed",
                "audio_analysis": "",
                "diagnosis": raw_response,
                "parts": [],
                "components": []
            }
    if not isinstance(diagnosis_data, dict):
        # No JSON object in the response: keep the raw text as the diagnosis.
        diagnosis_data = {
            "visual_analysis": "Could not parse response",
            "audio_analysis": "Could not parse response",
            "diagnosis": raw_response,
            "parts": [],
            "components": []
        }

    # Extract for display. Values are coerced defensively because the model's
    # JSON is not guaranteed to follow the requested schema.
    visual_report = str(diagnosis_data.get("visual_analysis") or "Not available")
    audio_report = str(diagnosis_data.get("audio_analysis") or "Not available")
    components = diagnosis_data.get("components") or []
    parts = diagnosis_data.get("parts") or []
    components = [c for c in components if isinstance(c, dict)] if isinstance(components, list) else []
    parts = [p for p in parts if isinstance(p, dict)] if isinstance(parts, list) else []

    # Show results
    print("\n" + "="*60)
    print("üìä RESULTS PREVIEW")
    print("="*60)
    print(f"\nüëÅÔ∏è  Visual: {visual_report[:100]}...")
    print(f"\nüëÇ Audio: {audio_report[:100]}...")
    print(f"\nüîß Diagnosis: {str(diagnosis_data.get('diagnosis', ''))[:100]}...")
    print(f"\nüì¶ Parts: {len(parts)} | Components: {len(components)}")
    print("\n" + "="*60)
    print("="*60)
    print("DIAGNOSTIC COMPLETED!")
    print("="*60)
except Exception as e:
    # Top-level boundary of the cell: report the failure instead of raising.
    print(f"\n‚ùå Error: {str(e)}")
    import traceback
    traceback.print_exc()

In [None]:
from IPython.display import display, HTML, Markdown

def parse_price_safe(price_val):
    """
    Safely parse a price from a string or number.

    Args:
        price_val: A number, or a string such as "INR 2,500-4,500",
            "INR 1500" or a range written with an en dash / rupee sign.

    Returns:
        Numbers are returned unchanged. A string with one number yields that
        number as a float; a string with two numbers (a range) yields their
        midpoint. Anything else, including unparseable text and non-string
        values, yields 0. This function does not raise.
    """
    import re

    if isinstance(price_val, (int, float)):
        return price_val
    if not isinstance(price_val, str):
        return 0

    # Pull out the numeric tokens instead of splitting on '-', so currency
    # markers, dash variants and surrounding words do not break parsing.
    numbers = re.findall(r'\d+(?:\.\d+)?', price_val.replace(',', ''))
    if len(numbers) == 1:
        return float(numbers[0])
    if len(numbers) == 2:
        low, high = map(float, numbers)
        return (low + high) / 2
    return 0

import html

# Render the report only when the diagnosis step has produced usable data.
if isinstance(globals().get('diagnosis_data'), dict) and isinstance(globals().get('parts'), list):
    listed_parts = [part for part in parts if isinstance(part, dict)]

    # 1. Display Total Cost (ranges contribute their midpoint, see parse_price_safe)
    total_cost = sum(parse_price_safe(part.get('price_inr', 0)) for part in listed_parts)
    display(HTML(
        "<div style='background-color: #f0f8ff; padding: 15px; border-radius: 5px; border: 1px solid #007bff;'>"
        "<h3 style='color: black; margin-top:0;'>Mechanic-Mitra Diagnostic Report</h3>"
        f"<p style='color: black; margin-bottom:0;'>Estimated total parts cost: INR {total_cost:,.0f}</p>"
        "</div>"
    ))

    # 2. Display Diagnosis Summary
    visual = diagnosis_data.get('visual_analysis', 'N/A')
    audio = diagnosis_data.get('audio_analysis', 'N/A')
    diagnosis = diagnosis_data.get('diagnosis', 'N/A')

    display(Markdown(f"### üëÅÔ∏è Visual Analysis\n{visual}\n"))
    display(Markdown(f"### üëÇ Audio Analysis\n{audio}\n"))
    display(Markdown(f"### üîß Comprehensive Diagnosis\n{diagnosis}\n"))

    # 3. Display Parts List
    if listed_parts:
        cell_style = "padding: 8px; border: 1px solid #ddd;"
        rows = [
            "<tr style='background-color: #f2f2f2;'>"
            f"<th style='{cell_style} color: black;'>Part Name</th>"
            f"<th style='{cell_style} color: black;'>Price (INR)</th></tr>"
        ]
        for part in listed_parts:
            # Values come from model output, so escape them before embedding in HTML.
            # No explicit text colour: cells inherit the notebook theme and stay
            # readable on both light and dark backgrounds.
            part_name = html.escape(str(part.get('part_name', 'Unknown')))
            price = html.escape(str(part.get('price_inr', 'N/A')))
            rows.append(
                f"<tr><td style='{cell_style}'>{part_name}</td>"
                f"<td style='{cell_style}'>{price}</td></tr>"
            )
        parts_html = "<table style='width:100%; border-collapse: collapse;'>" + "".join(rows) + "</table>"
        display(HTML(f"<h3>üì¶ Required Parts</h3>{parts_html}"))
    else:
        print("No replacement parts were recommended.")
else:
    print("‚ÑπÔ∏è Diagnosis data not available yet. Run the diagnosis step first.")

## 📄 Step 9: Generate PDF Report

In [None]:
from IPython.display import FileLink

try:
    # Look the name up via globals() so that a kernel where Step 8 never ran
    # produces the intended message below rather than a NameError.
    if not globals().get("diagnosis_data"):
        raise ValueError("No diagnosis data available. Run Step 8 first.")

    pdf_bytes = generate_pdf(
        visual_report,
        audio_report,
        diagnosis_data.get("diagnosis", ""),
        parts,
        components
    )

    # Write next to the notebook and expose a download link.
    pdf_filename = "mechanic_mitra_diagnosis.pdf"
    with open(pdf_filename, "wb") as f:
        f.write(pdf_bytes)

    print(f"‚úÖ PDF Report Generated: {pdf_filename}")
    print(f"   Size: {len(pdf_bytes):,} bytes")
    display(FileLink(pdf_filename))

except Exception as e:
    # Top-level boundary of the cell: report the failure instead of raising.
    print(f"‚ùå PDF generation failed: {e}")
    import traceback
    traceback.print_exc()