In [1]:
# Install required packages
!pip install gradio gTTS PyPDF2 python-docx pandas python-pptx google-generativeai langchain chromadb tiktoken pytesseract easyocr

# Standard library imports
import csv
import json
import os
import random
import re
import tempfile
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Tuple, Optional, Union

# Third-party imports
import cv2
import docx
import easyocr
import google.generativeai as genai
import gradio as gr
import numpy as np
import pandas as pd
import PyPDF2
import pytesseract
from gtts import gTTS
from pptx import Presentation

# Configure Google API
# SECURITY: never hardcode the API key in the notebook. It is read from the
# GOOGLE_API_KEY environment variable; when that is not set, the user is prompted for
# it without echoing. Any key that was previously committed here should be revoked.
if not os.environ.get('GOOGLE_API_KEY'):
    from getpass import getpass
    os.environ['GOOGLE_API_KEY'] = getpass("Enter your Google API key: ")
genai.configure(api_key=os.environ['GOOGLE_API_KEY'])

Collecting gradio
  Downloading gradio-5.9.0-py3-none-any.whl.metadata (16 kB)
Collecting gTTS
  Downloading gTTS-2.5.4-py3-none-any.whl.metadata (4.1 kB)
Collecting PyPDF2
  Downloading pypdf2-3.0.1-py3-none-any.whl.metadata (6.8 kB)
Collecting python-docx
  Downloading python_docx-1.1.2-py3-none-any.whl.metadata (2.0 kB)
Collecting python-pptx
  Downloading python_pptx-1.0.2-py3-none-any.whl.metadata (2.5 kB)
Collecting chromadb
  Downloading chromadb-0.5.23-py3-none-any.whl.metadata (6.8 kB)
Collecting tiktoken
  Downloading tiktoken-0.8.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (6.6 kB)
Collecting pytesseract
  Downloading pytesseract-0.3.13-py3-none-any.whl.metadata (11 kB)
Collecting easyocr
  Downloading easyocr-1.7.2-py3-none-any.whl.metadata (10 kB)
Collecting aiofiles<24.0,>=22.0 (from gradio)
  Downloading aiofiles-23.2.1-py3-none-any.whl.metadata (9.7 kB)
Collecting fastapi<1.0,>=0.115.2 (from gradio)
  Downloading fastapi-0.115.6-py3-none-any.wh

In [2]:
from typing import Dict, Any,Optional
from dataclasses import dataclass, field

# Gradio User Interface (citation task working for PDF, PPTX, and CSV files)

## DOCX File Citation (citation sources and navigation to the source guide work, but the response text is not highlighted)

In [4]:
import uuid
import shutil

class FileInfo:
    """Small record describing a file on disk: its path, base name and size in bytes."""

    def __init__(self, path: str):
        # os.stat raises OSError for a missing path, exactly like os.path.getsize.
        stat_result = os.stat(path)
        self.path = path
        self.name = os.path.split(path)[1]
        self.size = stat_result.st_size

def extract_text_from_pdf(file_path: str) -> str:
    """Extract the text of every page of a PDF, wrapped in page-marked HTML blocks.

    Each page becomes ``<div id="pageN" class="page-content">[PAGE N]\\n<text></div>``
    (pages are numbered from 1) and the blocks are joined with newlines. As a side
    effect the same text is written to ``<file path without extension>_text.txt``.

    Args:
        file_path: Path of the PDF file to read.

    Returns:
        The combined page text, or a string starting with ``"Error processing PDF: "``
        when anything goes wrong (no exception is propagated).
    """
    try:
        with open(file_path, 'rb') as pdf_handle:
            pdf_reader = PyPDF2.PdfReader(pdf_handle)
            page_blocks = [
                f'<div id="page{number}" class="page-content">[PAGE {number}]\n{page.extract_text()}</div>'
                for number, page in enumerate(pdf_reader.pages, start=1)
            ]
            combined = '\n'.join(page_blocks)
            # Keep a plain-text copy next to the source PDF.
            sidecar_path = f"{os.path.splitext(file_path)[0]}_text.txt"
            with open(sidecar_path, 'w', encoding='utf-8') as sidecar:
                sidecar.write(combined)
            return combined
    except Exception as exc:
        return f"Error processing PDF: {str(exc)}"

def extract_text_from_pptx(file_path: str) -> str:
    """Collect the text of every shape in a PowerPoint deck, tagged with its slide number.

    Every shape that exposes a ``text`` attribute contributes one
    ``"[SLIDE N]\\n<shape text>"`` fragment (slides are numbered from 1); fragments are
    joined with newlines.

    Args:
        file_path: Path of the ``.pptx`` file.

    Returns:
        The joined fragments, or a string starting with ``"Error processing PPTX: "``
        when the deck cannot be processed.
    """
    try:
        deck = Presentation(file_path)
        fragments = []
        for slide_number, slide in enumerate(deck.slides, start=1):
            for shape in slide.shapes:
                if not hasattr(shape, "text"):
                    continue
                fragments.append(f"[SLIDE {slide_number}]\n{shape.text}")
        return '\n'.join(fragments)
    except Exception as exc:
        return f"Error processing PPTX: {str(exc)}"

def extract_text_from_docx(file_path: str) -> str:
    """Return the text of all paragraphs of a Word document, one paragraph per line.

    Errors raised while opening or reading the document propagate to the caller.
    """
    document = docx.Document(file_path)
    paragraph_texts = (paragraph.text for paragraph in document.paragraphs)
    return "\n".join(paragraph_texts)

def _dataframe_to_text(df: pd.DataFrame) -> str:
    """Render a DataFrame as the row-marked text report shared by the CSV/Excel extractors.

    The report holds a summary, every row as ``[ROW n]`` followed by ``column: value``
    lines (rows are numbered from 1 in frame order), ``df.describe()`` and the column list.
    """
    # Column labels are not guaranteed to be strings; str() keeps ', '.join from raising.
    column_names = ', '.join(str(col) for col in df.columns)
    rows_with_markers = []
    for position, (_, row) in enumerate(df.iterrows(), start=1):
        row_text = [f"[ROW {position}]"]
        row_text.extend(f"{col}: {row[col]}" for col in df.columns)
        rows_with_markers.append('\n'.join(row_text))
    full_data = '\n\n'.join(rows_with_markers)
    return (
        f"File Summary:\n"
        f"Total Rows: {len(df)}\n"
        f"Columns: {column_names}\n\n"
        f"Complete Data:\n{full_data}\n\n"
        f"Basic Statistics:\n{df.describe().to_string()}\n\n"
        f"Searchable Fields: {column_names}\n"
    )

def extract_text_from_csv_excel(file_path: str) -> str:
    """Convert a CSV (or Excel) file into the row-marked text report.

    Args:
        file_path: Path of a ``.csv``, ``.xlsx`` or ``.xls`` file.

    Returns:
        The text report, or a string starting with ``"Error processing CSV/Excel: "``
        when the file cannot be read or has an unsupported extension. Previously an
        unsupported extension silently returned ``None``.
    """
    try:
        ext = os.path.splitext(file_path)[1].lower()
        if ext == '.csv':
            df = pd.read_csv(file_path)
        elif ext in ('.xlsx', '.xls'):
            df = pd.read_excel(file_path)
        else:
            return f"Error processing CSV/Excel: Unsupported file extension '{ext}'"
        return _dataframe_to_text(df)
    except Exception as e:
        return f"Error processing CSV/Excel: {str(e)}"

def extract_text_from_excel(file_path: str) -> str:
    """Convert an Excel workbook (first sheet, as read by ``pd.read_excel``) into the text report.

    Args:
        file_path: Path of a ``.xlsx`` or ``.xls`` file.

    Returns:
        The text report, or a string starting with ``"Error processing CSV/Excel: "``
        when the file cannot be read or has an unsupported extension. Previously an
        unsupported extension silently returned ``None``.
    """
    try:
        ext = os.path.splitext(file_path)[1].lower()
        if ext not in ('.xlsx', '.xls'):
            return f"Error processing CSV/Excel: Unsupported file extension '{ext}'"
        return _dataframe_to_text(pd.read_excel(file_path))
    except Exception as e:
        return f"Error processing CSV/Excel: {str(e)}"

def preprocess_image(image_path: str) -> np.ndarray:
    """Binarise and deskew an image before it is handed to OCR.

    The image is converted to grayscale, thresholded with Otsu's method, and rotated
    about its centre by an angle estimated from the minimum-area rectangle around all
    non-zero pixels.

    Args:
        image_path: Path of the image file.

    Returns:
        The thresholded, rotated single-channel image.

    Raises:
        ValueError: If the image cannot be read from ``image_path``.
    """
    img = cv2.imread(image_path)
    if img is None:
        # cv2.imread reports failure by returning None instead of raising; fail here
        # with a clear message rather than inside cvtColor.
        raise ValueError(f"Could not read image: {image_path}")
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    gray = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY | cv2.THRESH_OTSU)[1]
    coords = np.column_stack(np.where(gray > 0))
    angle = cv2.minAreaRect(coords)[-1]
    # NOTE(review): this normalisation assumes minAreaRect reports angles in [-90, 0);
    # the reported range may differ between OpenCV versions - TODO confirm against the
    # installed version.
    if angle < -45:
        angle = -(90 + angle)
    else:
        angle = -angle
    (h, w) = gray.shape[:2]
    center = (w // 2, h // 2)
    M = cv2.getRotationMatrix2D(center, angle, 1.0)
    rotated = cv2.warpAffine(gray, M, (w, h), flags=cv2.INTER_CUBIC, borderMode=cv2.BORDER_REPLICATE)
    return rotated

def extract_text_from_image(image_path: str, method: str = 'pytesseract') -> str:
    """Run OCR on an image with the selected engine.

    Args:
        image_path: Path of the image file.
        method: ``'pytesseract'`` (runs on the output of ``preprocess_image``) or
            ``'easyocr'`` (English reader, run on the original file).

    Returns:
        The recognised text with surrounding whitespace removed, or a string starting
        with ``"Error extracting text from image: "`` on any failure, including an
        unknown ``method``.
    """
    try:
        if method not in ('pytesseract', 'easyocr'):
            raise ValueError("Invalid OCR method. Choose 'pytesseract' or 'easyocr'.")
        if method == 'pytesseract':
            text = pytesseract.image_to_string(preprocess_image(image_path))
        else:
            ocr_reader = easyocr.Reader(['en'])
            detections = ocr_reader.readtext(image_path)
            # The recognised string is read from position 1 of each detection.
            text = ' '.join([detection[1] for detection in detections])
        return text.strip()
    except Exception as exc:
        return f"Error extracting text from image: {str(exc)}"

def extract_text_from_charts_and_graphs(image_path: str) -> Dict:
    """Gather OCR text and simple shape statistics for an image that may be a chart.

    Args:
        image_path: Path of the image file.

    Returns:
        A dict with the keys ``text_pytesseract``, ``text_easyocr``, ``contour_count``
        (number of external contours found on the Canny edge map) and
        ``approximate_chart_type``; or ``{"error": "Error analyzing chart: ..."}``
        when any step fails.
    """
    try:
        image = cv2.imread(image_path)
        grayscale = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        edge_map = cv2.Canny(grayscale, 50, 150)
        contours, _ = cv2.findContours(edge_map, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        # Both OCR engines are run so the caller can compare their output.
        tesseract_text = extract_text_from_image(image_path, 'pytesseract')
        easyocr_text = extract_text_from_image(image_path, 'easyocr')
        return {
            'text_pytesseract': tesseract_text,
            'text_easyocr': easyocr_text,
            'contour_count': len(contours),
            'approximate_chart_type': _identify_chart_type(image),
        }
    except Exception as exc:
        return {"error": f"Error analyzing chart: {str(exc)}"}

def _identify_chart_type(img: np.ndarray) -> str:
    """Guess a chart type from the shapes of the external contours in ``img``.

    Images with ten or fewer contours are labelled ``"Simple Image/Graphic"``.
    Otherwise the mean of ``area / perimeter**2`` over all contours decides between
    ``"Bar Chart"`` (< 0.1), ``"Pie Chart"`` (> 0.5) and ``"Complex Chart/Graph"``.

    Args:
        img: Image array as returned by ``cv2.imread``.

    Returns:
        One of the four labels above.
    """
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    edges = cv2.Canny(gray, 50, 150)
    contours, _ = cv2.findContours(edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    if len(contours) <= 10:
        return "Simple Image/Graphic"

    compactness = []
    for cnt in contours:
        perimeter = cv2.arcLength(cnt, True)
        # Contours with zero perimeter would divide by zero; they carry no shape
        # information, so they are skipped.
        if perimeter > 0:
            compactness.append(cv2.contourArea(cnt) / (perimeter ** 2))
    if not compactness:
        return "Simple Image/Graphic"

    # NOTE(review): area / perimeter**2 of a closed curve cannot exceed 1 / (4 * pi)
    # (about 0.08), so the 0.1 and 0.5 thresholds look like they always select
    # "Bar Chart" - TODO confirm and recalibrate.
    mean_compactness = float(np.mean(compactness))
    if mean_compactness < 0.1:
        return "Bar Chart"
    if mean_compactness > 0.5:
        return "Pie Chart"
    return "Complex Chart/Graph"

def process_file(file_path: str) -> str:
    """Extract text from a file, choosing the extractor by file extension.

    Supported extensions: ``.pdf``, ``.docx``, ``.pptx``, ``.csv``, ``.xlsx``, ``.txt``
    and the image types ``.png``, ``.jpg``, ``.jpeg``, ``.tiff``, ``.bmp``, ``.webp``.
    Images are OCR'd with pytesseract, falling back to easyocr when nothing is found;
    when fewer than 50 characters are recognised the image is analysed as a chart and
    the result is returned as ``"Chart/Graph Analysis: <json>"``.

    Args:
        file_path: Path of the file to process.

    Returns:
        The extracted text (a single string, not a tuple), ``"Unsupported file type."``
        for any other extension, or ``"Error processing file: ..."`` when an extractor
        raises.
    """
    ext = os.path.splitext(file_path)[1].lower()
    try:
        if ext == '.pdf':
            return extract_text_from_pdf(file_path)
        elif ext == '.docx':
            return extract_text_from_docx(file_path)
        elif ext == '.pptx':
            return extract_text_from_pptx(file_path)
        elif ext == '.csv':
            return extract_text_from_csv_excel(file_path)
        elif ext == '.xlsx':
            return extract_text_from_excel(file_path)
        elif ext == '.txt':
            with open(file_path, 'r', encoding='utf-8') as f:
                return f.read()
        elif ext in ['.png', '.jpg', '.jpeg', '.tiff', '.bmp', '.webp']:
            image_text = extract_text_from_image(file_path, 'pytesseract')
            if not image_text.strip():
                image_text = extract_text_from_image(file_path, 'easyocr')
            # Very little text usually means a chart or graphic rather than a scanned page.
            if len(image_text) < 50:
                chart_info = extract_text_from_charts_and_graphs(file_path)
                return f"Chart/Graph Analysis: {json.dumps(chart_info)}"
            return image_text
    except Exception as e:
        return f"Error processing file: {str(e)}"
    return "Unsupported file type."

def text_to_audio(text: str) -> str:
    """Convert text to English speech and save it as a temporary MP3 file.

    Args:
        text: The text to speak; surrounding whitespace is ignored.

    Returns:
        The path of the generated ``.mp3`` file, or ``""`` when ``text`` is empty or
        the conversion fails (the failure is written to the log).
    """
    try:
        if not text or not text.strip():
            log_to_file("Empty text provided for audio conversion", "WARNING")
            return ""

        cleaned_text = text.strip()

        tts = gTTS(text=cleaned_text, lang='en')
        # Only the reserved file name is needed. Closing our own handle before gTTS
        # writes to that path avoids leaking a file descriptor on every call.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as audio_file:
            audio_path = audio_file.name
        try:
            tts.save(audio_path)
        except Exception:
            # Do not leave an empty placeholder file behind when synthesis fails.
            if os.path.exists(audio_path):
                os.remove(audio_path)
            raise

        log_to_file("Successfully converted text to audio", "INFO")
        return audio_path
    except Exception as e:
        error_msg = f"Error converting text to audio: {str(e)}"
        log_to_file(error_msg, "ERROR")
        return ""

def log_to_file(message: str, level: str = "INFO") -> None:
    """Append one ``[timestamp] [level] message`` line to ``app.log`` in the working directory.

    Args:
        message: Text of the log entry.
        level: Severity label written in the second bracket.
    """
    now_text = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    entry = f"[{now_text}] [{level}] {message}"
    with open('app.log', 'a+', encoding='utf-8') as handle:
        # print() appends the newline; flush=True pushes the entry out immediately.
        print(entry, file=handle, flush=True)

def save_notes(notes: str, filename: str = "notes.json") -> Tuple[bool, str]:
    """Append a timestamped note to a JSON file holding a list of notes.

    Args:
        notes: Note text; must be a non-empty string.
        filename: JSON file to append to; its parent directory is created if needed.

    Returns:
        ``(True, "Notes saved successfully!")`` on success, otherwise ``(False, reason)``.
        Failures are logged and never raised.
    """
    try:
        if not (notes and isinstance(notes, str)):
            log_to_file("Invalid notes format", "ERROR")
            return False, "Invalid notes format"

        target_dir = os.path.dirname(filename) or '.'
        os.makedirs(target_dir, exist_ok=True)

        if os.path.exists(filename):
            with open(filename, 'r', encoding='utf-8') as source:
                entries = json.load(source)
        else:
            entries = []

        entries.append({
            "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "content": notes,
        })
        with open(filename, 'w', encoding='utf-8') as sink:
            json.dump(entries, sink, indent=4, ensure_ascii=False)

        log_to_file("Notes saved successfully", "INFO")
        return True, "Notes saved successfully!"
    except Exception as exc:
        error_msg = f"Error saving notes: {str(exc)}"
        log_to_file(error_msg, "ERROR")
        return False, error_msg

def safe_file_processing(file_path: str) -> Dict:
    """Describe a file as ``{"name", "size", "path"}``, or report why that is not possible.

    Args:
        file_path: Path to inspect.

    Returns:
        The file description for an existing regular file; otherwise a dict with a
        single ``"error"`` key (directory, missing file, or unexpected failure). Every
        outcome is written to the log.
    """
    try:
        if os.path.isdir(file_path):
            log_to_file(f"Skipping directory: {file_path}", "WARNING")
            return {"error": "Directory paths are not supported"}

        if not (file_path and os.path.isfile(file_path)):
            log_to_file(f"File not found: {file_path}", "WARNING")
            return {"error": f"File not found: {file_path}"}

        description = dict(
            name=os.path.basename(file_path),
            size=os.path.getsize(file_path),
            path=file_path,
        )
        log_to_file(f"Successfully processed file: {file_path}")
        return description
    except Exception as exc:
        log_to_file(f"Error processing file {file_path}: {str(exc)}", "ERROR")
        return {"error": f"Error processing file: {str(exc)}"}

def update_file_list(files: List[str]) -> Tuple[List[Dict], gr.components.CheckboxGroup]:
    """Build the file-info list and the matching "delete" checkbox group.

    Args:
        files: Paths to include; ``None`` is treated as an empty list. Directories and
            paths that ``safe_file_processing`` reports as errors are dropped.

    Returns:
        ``(valid_files, checkbox_group)`` where ``valid_files`` holds the info dicts of
        the usable files and the checkbox group lists their names with nothing selected.
    """
    files = files or []
    log_to_file(f"Updating file list with {len(files)} files")
    valid_files = []
    for file in files:
        if os.path.isdir(file):
            continue
        # Inspect each file once; calling safe_file_processing in both the filter and
        # the result would stat and log every file twice.
        file_info = safe_file_processing(file)
        if "error" not in file_info:
            valid_files.append(file_info)
    log_to_file(f"Valid files processed: {len(valid_files)}")
    return valid_files, gr.CheckboxGroup(
        choices=[f["name"] for f in valid_files],
        value=[],
        label="Select files to delete"
    )

def add_more_files(new_files: List[str], existing_files: Optional[List[Dict]]) -> Tuple[List[Dict], gr.components.CheckboxGroup]:
    """Copy newly uploaded files into ``uploaded_files/`` and merge them with the existing list.

    Args:
        new_files: Paths of the files to add; ``None`` is treated as an empty list.
            Directories are skipped.
        existing_files: Info dicts (with a ``"path"`` key) of files already tracked,
            or ``None``.

    Returns:
        The result of ``update_file_list`` for the de-duplicated (by path) set of
        existing and newly copied files.
    """
    new_files = new_files or []
    existing_files = existing_files or []
    log_to_file(f"Adding {len(new_files)} new files to existing {len(existing_files)} files")
    upload_dir = "uploaded_files"
    os.makedirs(upload_dir, exist_ok=True)
    new_file_infos = []

    for file_path in new_files:
        # Explicitly check and skip directories
        if os.path.isdir(file_path):
            log_to_file(f"Skipping directory: {file_path}", "WARNING")
            continue

        file_name = os.path.basename(file_path)
        destination_path = os.path.join(upload_dir, file_name)
        try:
            # A file that already lives in the upload directory must not be copied onto
            # itself: opening the destination for writing would truncate the source.
            already_in_place = (
                os.path.exists(destination_path)
                and os.path.samefile(file_path, destination_path)
            )
            if not already_in_place:
                # Streams the content instead of loading the whole file into memory.
                shutil.copyfile(file_path, destination_path)
            new_file_infos.append(safe_file_processing(destination_path))
        except Exception as e:
            log_to_file(f"Error copying file {file_path}: {str(e)}", "ERROR")
            continue

    # Combine new files with existing files; later entries win for a repeated path
    all_files = existing_files + [f for f in new_file_infos if "error" not in f]
    unique_files = list({f["path"]: f for f in all_files}.values())
    log_to_file(f"Total unique files after addition: {len(unique_files)}")

    # Process all files together
    return update_file_list([f["path"] for f in unique_files])

def delete_selected_files(files_to_delete: List[str], all_files: List[Dict]) -> Tuple[List[Dict], gr.components.CheckboxGroup, str]:
    """Delete the selected files from disk and return the remaining file list.

    Args:
        files_to_delete: Names (not paths) of the files chosen for deletion.
        all_files: Info dicts of every tracked file, each with ``"name"`` and ``"path"``.

    Returns:
        ``(remaining_files, checkbox_group, message)``. ``message`` is
        ``"Please upload files to begin."`` when no files remain, otherwise ``""``.
        A selected file leaves the list even if it was missing or could not be removed;
        those cases are only logged.
    """
    log_to_file(f"Attempting to delete {len(files_to_delete)} files")

    if not all_files:
        log_to_file("No files available for deletion", "WARNING")
        return [], gr.CheckboxGroup(choices=[], value=[], label="Select files to delete"), "Please upload files to begin."

    if not files_to_delete:
        log_to_file("No files selected for deletion", "WARNING")
        return all_files, gr.CheckboxGroup(choices=[f["name"] for f in all_files], value=[]), ""

    kept = []
    removed_total = 0
    failed_total = 0
    for entry in all_files:
        if entry["name"] in files_to_delete:
            try:
                if not os.path.exists(entry["path"]):
                    log_to_file(f"File not found for deletion: {entry['path']}", "WARNING")
                else:
                    os.remove(entry["path"])
                    removed_total += 1
                    log_to_file(f"Successfully deleted file: {entry['path']}")
            except Exception as exc:
                failed_total += 1
                log_to_file(f"Error deleting file {entry['path']}: {str(exc)}", "ERROR")
            continue
        kept.append(entry)

    log_to_file(f"Deletion summary - Successful: {removed_total}, Failed: {failed_total}, Remaining: {len(kept)}")
    message = "" if kept else "Please upload files to begin."
    return kept, gr.CheckboxGroup(choices=[f["name"] for f in kept], value=[]), message

# Gemini model handles keyed by model name; the insertion order below is also the
# order of MODEL_OPTIONS.
MODELS = {
    model_id: genai.GenerativeModel(model_id)
    for model_id in ('gemini-1.5-pro', 'gemini-1.5-flash-8b', 'gemini-1.5-flash')
}
MODEL_OPTIONS = [*MODELS]

def generate_suggested_questions(content: str, model_name: str = 'gemini-1.5-pro') -> List[str]:
    """Ask the selected model for up to three factual questions answerable from ``content``.

    Args:
        content: Document text; at most the first 10,000,000 characters are sent.
        model_name: Key into ``MODELS`` selecting the model to query.

    Returns:
        The first three non-empty lines of the model reply, stripped of whitespace.
    """
    instructions = (
        f"Based on the content provided in the following document, please suggest up to 3 clear and simple questions "
        f"that are directly and easily answerable using information only from the document. "
        f"Ensure that each question is specific to a fact, section, or topic explicitly mentioned in the document "
        f"and that the answers are found directly in the text. Avoid generating any vague, complex, or abstract "
        f"questions that require external information or inference beyond what's stated in the document. "
        f"Each question must have a clear, direct answer based only on the document's content. "
        f"If the document does not contain enough information to generate 3 questions, generate as many as possible "
        f"that meet the criteria. "
        f"Additionally, follow these guidelines: "
        f"1. Identify key sections and topics within the document to ensure questions cover a range of content. "
        f"2. Focus on factual information, dates, names, definitions, and specific details mentioned in the document. "
        f"3. Avoid questions that require interpretation, opinion, or synthesis of information beyond the document. "
        f"4. Ensure each question is concise and can be answered in one or two sentences. "
        f"5. Cross-check each question to confirm that the answer is explicitly stated in the document. "
        f"6. Prioritize questions that highlight the most important and relevant information in the document. "
        f"7. If a section of the document is particularly dense with information, consider generating multiple questions from that section and don't give answers in front of questions generated. "
        f"{content[:10000000]} "
        f"Make sure the questions are straightforward and factual, and can be easily answered by referring to the content."
    )
    reply = MODELS[model_name].generate_content(instructions)
    questions = []
    for line in reply.text.split('\n'):
        cleaned = line.strip()
        if cleaned:
            questions.append(cleaned)
    return questions[:3]

def generate_combined_summary(docs: List[str], model_name: str = 'gemini-1.5-pro') -> Tuple[str, str]:
    """Summarise all readable documents together and render the summary as audio.

    Args:
        docs: Documents to summarise; each entry is either a path string or a dict
            with a ``"path"`` key.
        model_name: Key into ``MODELS`` selecting the model to query.

    Returns:
        ``(summary_text, audio_file_path)``. When no document yields usable content the
        result is ``("No valid documents to summarize.", "")`` and no model call is made.
    """
    sections = []
    for doc in docs:
        # Entries may be file-info dicts or bare path strings.
        file_path = doc['path'] if isinstance(doc, dict) else doc
        file_content = process_file(file_path)
        if file_content.startswith("Error"):
            continue
        sections.append(f"Document: {os.path.basename(file_path)}\n{file_content}\n\n")

    all_content = "".join(sections)
    if not all_content:
        return "No valid documents to summarize.", ""

    summary_prompt = f"Provide a concise summary of the following content from multiple documents:\n{all_content}"
    combined_summary = MODELS[model_name].generate_content(summary_prompt).text
    return combined_summary, text_to_audio(combined_summary)

def summarize_and_suggest_questions(history: List, docs: List[str], stored_doc: List[str], model_name: str = 'gemini-1.5-pro') -> Tuple[List, List[str], List[str]]:
    """Add combined and per-document summaries (with audio) to the chat and suggest questions.

    Args:
        history: Chat history; summary and audio entries are appended in place.
        docs: Uploaded documents; each entry is indexed with ``['path']``.
        stored_doc: List that receives the extracted text of every readable document.
        model_name: Key into ``MODELS`` selecting the model to query.

    Returns:
        ``(history, stored_doc, suggested_questions)``. Without documents the question
        list holds the single hint ``"Please upload documents to generate questions."``.
    """
    if not docs:
        return history, stored_doc, ["Please upload documents to generate questions."]

    all_questions = []
    combined_summary, combined_audio = generate_combined_summary(docs, model_name)
    history.append(["Combined Summary of All Documents:", combined_summary])
    history.append(["", gr.Audio(value=combined_audio, label="Audio for Combined Summary")])

    for doc in docs:
        doc_path = doc['path']
        file_content = process_file(doc_path)
        if file_content.startswith("Error"):
            # Surface the extraction error in the chat instead of summarising it.
            history.append([f"Error processing {os.path.basename(doc_path)}:", file_content])
            continue

        stored_doc.append(file_content)
        file_name = os.path.basename(doc_path)
        summary_prompt = f"Provide a short summary of the following content:\n{file_content}"
        summary = MODELS[model_name].generate_content(summary_prompt).text
        audio_file = text_to_audio(summary)
        history.append([f"Summary for {file_name}:", summary])
        history.append(["", gr.Audio(value=audio_file, label=f"Audio Summary for {file_name}")])
        all_questions.extend(generate_suggested_questions(file_content, model_name))

    return history, stored_doc, all_questions

def update_csv(serial_num: int, doc_name: str, doc_type: str, question: str, answer: str, csv_file_path: str = 'qa_log.csv') -> None:
    """Append one question/answer record to the CSV log, writing the header for a new file.

    Args:
        serial_num: Running number of the record.
        doc_name: Name of the document the answer refers to.
        doc_type: Type of that document.
        question: The question that was asked.
        answer: The answer that was given.
        csv_file_path: Log file to append to.
    """
    header_needed = not os.path.isfile(csv_file_path)
    pending_rows = []
    if header_needed:
        pending_rows.append(["Serial Number", "Document Name", "Type of Document", "Question", "Answer"])
    pending_rows.append([serial_num, doc_name, doc_type, question, answer])
    with open(csv_file_path, mode='a', newline='', encoding='utf-8') as handle:
        csv.writer(handle).writerows(pending_rows)

def save_current_chat(chat_history: List) -> str:
    """Save the chat history via ``save_notes_to_document`` and return its status message."""
    transcript = chat_history_to_string(chat_history)
    _saved, status_message = save_notes_to_document(transcript)
    return status_message

def chat_history_to_string(chat_history: List) -> str:
    """Render chat history as ``Q:``/``A:`` blocks separated by ``---`` lines.

    Args:
        chat_history: Sequence of ``(user_message, bot_message)`` pairs.

    Returns:
        One ``"Q: ...\\nA: ...\\n"`` block per pair, with ``"---\\n"`` between
        consecutive blocks; an empty string for an empty history.
    """
    exchanges = [f"Q: {user_msg}\nA: {bot_msg}\n" for user_msg, bot_msg in chat_history]
    return "---\n".join(exchanges)

def save_notes_to_document(notes: str, filename: str = "chat_history.docx") -> Tuple[bool, str]:
    """Append a chat transcript to a Word document, creating the document if necessary.

    Lines starting with ``Q:`` or ``A:`` become paragraphs with a bold ``Question: `` or
    ``Answer: `` label; other non-blank lines are added unchanged. A dated heading is
    written before the transcript and a ``---`` paragraph after it.

    Args:
        notes: Transcript text, one entry per line.
        filename: Word document to extend.

    Returns:
        ``(True, "Chat saved successfully!")`` on success, otherwise
        ``(False, "Chat not saved successfully.")``; the error detail is only logged.
    """
    labels_by_prefix = (('Q:', 'Question: '), ('A:', 'Answer: '))
    try:
        document = docx.Document(filename) if os.path.exists(filename) else docx.Document()
        document.add_heading(f"Chat History from {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", level=1)
        for line in notes.split('\n'):
            label = next(
                (text for prefix, text in labels_by_prefix if line.startswith(prefix)),
                None,
            )
            if label is not None:
                paragraph = document.add_paragraph()
                paragraph.add_run(label).bold = True
                # Drop the two-character "Q:" / "A:" prefix.
                paragraph.add_run(line[2:].strip())
            elif line.strip():
                document.add_paragraph(line)
        document.add_paragraph('---')
        document.save(filename)
        log_to_file(f"Chat history saved to document: {filename}", "INFO")
        return True, "Chat saved successfully!"
    except Exception as exc:
        error_msg = f"Error saving chat history to document: {str(exc)}"
        log_to_file(error_msg, "ERROR")
        return False, "Chat not saved successfully."

def _iter_marked_sections(content: str, marker: str):
    """Yield ``(number, text)`` for every ``[<marker> <number>]`` section in ``content``."""
    for piece in content.split(f'[{marker} ')[1:]:
        # Split only at the FIRST ']' so section text that itself contains ']' is kept whole.
        header, closing, body = piece.partition(']')
        if not closing:
            continue
        try:
            number = int(header)
        except ValueError:
            continue
        yield number, body

def _build_document_chunk(marked_paragraphs: List[str], first_paragraph_number: int, search_terms: set, response_phrases: List[str]) -> Optional[Dict]:
    """Score one group of marked paragraphs; return its chunk dict, or None when nothing matches."""
    section_text = ' '.join(marked_paragraphs)
    lowered = section_text.lower()
    term_matches = sum(1 for term in search_terms if term in lowered)
    phrase_matches = sum(1 for phrase in response_phrases if phrase in lowered)
    relevance_score = term_matches + (phrase_matches * 2)  # Weight phrase matches more heavily
    if relevance_score <= 0:
        return None
    return {
        'identifier': f"para_{first_paragraph_number}",
        'text': section_text,
        'relevance': relevance_score,
        'type': 'document',
        'paragraph_number': first_paragraph_number
    }

def find_relevant_passages(content: str, query: str, response: str, file_type: str) -> List[Dict]:
    """Find the passages of a document that best match a question and its answer.

    Search terms are the lowercased whitespace-separated words of ``query`` and
    ``response`` minus a small stop-word list. Matching is by substring.

    * ``'pdf'``: ``[PAGE n]`` sections are split into sentences; a sentence is kept when
      it contains more than one term or a response phrase of more than two words.
    * ``'csv'``, ``'xlsx'``, ``'xls'``: ``[ROW n]`` sections between ``Complete Data:``
      and ``Basic Statistics:`` are kept when they contain at least one term.
    * ``'pptx'``: ``[SLIDE n]`` sections are kept when they contain more than one term.
    * ``'txt'``, ``'docx'``: non-blank lines are wrapped in ``<div id="para_N">`` markers
      (N is the 1-based line number) and grouped (three per group, or fewer when a line
      exceeds 200 characters); a group scores one point per term and two per response
      phrase. Its identifier is the marker of its first paragraph, and the final,
      possibly shorter, group is scored as well.

    Args:
        content: Extracted document text containing the markers described above.
        query: The user's question.
        response: The model's answer.
        file_type: File extension without the dot.

    Returns:
        Up to three chunk dicts with the keys ``identifier``, ``text``, ``relevance``
        and ``type`` (plus ``paragraph_number`` for documents), most relevant first.
        An unknown ``file_type`` yields an empty list.
    """
    stop_words = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'is', 'are', 'was', 'were'}
    search_terms = {
        term
        for term in set(query.lower().split()) | set(response.lower().split())
        if term not in stop_words
    }
    chunks = []

    def count_term_matches(text: str) -> int:
        lowered_text = text.lower()
        return sum(1 for term in search_terms if term in lowered_text)

    if file_type == 'pdf':
        # Phrases are deliberately left unstripped, as in the original matching rule.
        response_phrases = [
            phrase.lower() for phrase in response.split('.') if len(phrase.split()) > 2
        ]
        for page_num, page_text in _iter_marked_sections(content, 'PAGE'):
            sentences = [s.strip() + '.' for s in page_text.split('.') if s.strip()]
            for sentence in sentences:
                lowered_sentence = sentence.lower()
                term_matches = count_term_matches(sentence)
                if term_matches > 1 or any(phrase in lowered_sentence for phrase in response_phrases):
                    chunks.append({
                        'identifier': f"page_{page_num}",
                        'text': sentence,
                        'relevance': term_matches,
                        'type': 'pdf'
                    })

    elif file_type in ['csv', 'xlsx', 'xls']:
        sections = content.split('Complete Data:')[1].split('Basic Statistics:')[0] if 'Complete Data:' in content else content
        for row_num, row_body in _iter_marked_sections(sections, 'ROW'):
            row_text = row_body.strip()
            term_matches = count_term_matches(row_text)
            if term_matches > 0:
                chunks.append({
                    'identifier': f"row_{row_num}",
                    'text': row_text,
                    'relevance': term_matches,
                    'type': 'spreadsheet'
                })

    elif file_type == 'pptx':
        for slide_num, slide_text in _iter_marked_sections(content, 'SLIDE'):
            term_matches = count_term_matches(slide_text)
            if term_matches > 1:
                chunks.append({
                    'identifier': f"slide_{slide_num}",
                    'text': slide_text,
                    'relevance': term_matches,
                    'type': 'pptx'
                })

    elif file_type in ['txt', 'docx']:
        # Phrases of more than two words taken from the response, for exact matching.
        response_phrases = [
            phrase.strip().lower()
            for phrase in response.split('.')
            if len(phrase.strip().split()) > 2
        ]
        current_section = []
        section_start = 0

        for i, paragraph in enumerate(content.split('\n')):
            if not paragraph.strip():
                continue
            if not current_section:
                # Remember the marker number of the first paragraph so the chunk
                # identifier points at an id that exists in the marked text.
                section_start = i + 1
            # Add paragraph markers for better navigation
            current_section.append(f'<div id="para_{i+1}" class="paragraph-content">{paragraph}</div>')

            if len(current_section) >= 3 or len(paragraph) > 200:
                chunk = _build_document_chunk(current_section, section_start, search_terms, response_phrases)
                if chunk is not None:
                    chunks.append(chunk)
                current_section = []

        # Score the trailing paragraphs too; otherwise short documents (or the end of
        # any document) could never be cited.
        if current_section:
            chunk = _build_document_chunk(current_section, section_start, search_terms, response_phrases)
            if chunk is not None:
                chunks.append(chunk)

    chunks.sort(key=lambda x: x['relevance'], reverse=True)
    return chunks[:3]

# Translucent background colours for highlighted source passages; llm_response picks
# one at random for each cited chunk.
HIGHLIGHT_COLORS = [
    "rgba(255, 255, 0, 0.3)",  # Yellow
    "rgba(173, 216, 230, 0.3)",  # Light Blue
    "rgba(255, 165, 0, 0.3)",  # Orange
    "rgba(144, 238, 144, 0.3)",  # Light Green
    "rgba(255, 192, 203, 0.3)"   # Light Pink
]

def llm_response(history: List, text: str, docs: List[str], stored_doc: List[str], serial_num: int, model_name: str = 'gemini-1.5-pro') -> Tuple[List, List[str], int, str, str]:
    """Answer a question from the uploaded documents and build the highlighted source guide.

    Steps: extract every document, ask the model which one is most relevant, ask
    whether that document can answer the question, and if so generate the answer,
    locate supporting passages and wrap them in highlight markup. The answer (with
    audio) is appended to ``history``, new suggested questions are generated for every
    document, the exchange is logged to CSV and the chat is saved.

    Args:
        history: Chat history; the new exchange and its audio entry are appended in place.
        text: The user's question. Blank input returns immediately.
        docs: Uploaded documents; each entry is indexed with ``['path']``.
        stored_doc: Not used by this function.
        serial_num: Running number for the CSV log.
        model_name: Key into ``MODELS`` selecting the model to query.

    Returns:
        ``(history, suggested_questions, next_serial_num, "", highlighted_html)``.
        For blank ``text`` the result is ``(history, [], serial_num, "", "")``.
    """
    if not text.strip():
        return history, [], serial_num, "", ""

    highlighted_content = ""
    source_chunks_html = []
    docs_response = None

    if docs:
        all_docs_content = {}
        for doc in docs:
            try:
                file_ext = os.path.splitext(doc['path'])[1].lower()
                file_content = process_file(doc['path'])
                if not file_content.startswith("Error"):
                    # Add paragraph markers for DOCX and TXT files
                    if file_ext in ['.txt', '.docx']:
                        paragraphs = file_content.split('\n')
                        marked_content = []
                        for i, para in enumerate(paragraphs, 1):
                            if para.strip():
                                marked_content.append(f'<div id="para_{i}" class="paragraph-content">{para}</div>')
                        file_content = '\n'.join(marked_content)

                    all_docs_content[doc['path']] = {
                        'content': file_content,
                        'type': file_ext[1:]  # Remove the dot from extension
                    }
            except Exception as e:
                log_to_file(f"Error processing file {doc['path']}: {str(e)}", "ERROR")
                continue

        if all_docs_content:
            # Determine the most relevant document
            relevance_prompt = f"""
            From these documents, which one is most likely to contain the answer to this question?
            Question: {text}
            Documents:
            {chr(10).join(f"{os.path.basename(doc)}: {info['content'][:500]}" for doc, info in all_docs_content.items())}

            Respond with ONLY the filename of the most relevant document.
            """

            # Models sometimes wrap the filename in quotes or backticks; remove them so
            # the exact-name lookup below can still succeed.
            most_relevant_doc = MODELS[model_name].generate_content(relevance_prompt).text.strip().strip("`'\"")

            # Find the full path of the most relevant document
            most_relevant_path = next(
                (doc for doc in all_docs_content if os.path.basename(doc) == most_relevant_doc),
                list(all_docs_content.keys())[0]  # fallback to first document if no match
            )

            doc_info = all_docs_content[most_relevant_path]

            context_check = MODELS[model_name].generate_content(
                f"Based on this document content, can the following question be fully answered using only this information? Reply with just 'yes' or 'no'.\n\nDocument: {doc_info['content']}\n\nQuestion: {text}"
            ).text.lower().strip()

            # Accept replies such as "Yes." or "**yes**": any reply whose first word is
            # "yes" counts, instead of requiring the bare string 'yes'.
            if re.match(r"[^a-z0-9]*yes\b", context_check):
                docs_response = MODELS[model_name].generate_content(
                    f"Answer this question using only the provided document content. Also specify which document type ({doc_info['type'].upper()}) contains the main answer. Include specific details and quotes when possible.\n\nDocument: {doc_info['content']}\n\nQuestion: {text}"
                ).text

                chunks = find_relevant_passages(
                    doc_info['content'],
                    text,
                    docs_response,
                    doc_info['type']
                )

                if chunks:
                    doc_name = os.path.basename(most_relevant_path)
                    highlighted_content += f'<div class="source-guide"><h4>{doc_name}</h4>'
                    highlighted_text = doc_info['content']

                    for chunk in chunks:
                        color = random.choice(HIGHLIGHT_COLORS)
                        identifier = chunk['identifier']
                        chunk_text = chunk['text']

                        # Format source reference based on file type
                        if chunk['type'] == 'pdf':
                            page_num = identifier.split('_')[1]
                            source_ref = f'[PDF Page {page_num}]'
                        elif chunk['type'] in ['spreadsheet', 'csv', 'xlsx', 'xls']:
                            # find_relevant_passages tags CSV/Excel rows as 'spreadsheet';
                            # without that value here, rows were labelled "[Section N]".
                            row_num = identifier.split('_')[1]
                            source_ref = f'[Row {row_num}]'
                        elif chunk['type'] == 'pptx':
                            slide_num = identifier.split('_')[1]
                            source_ref = f'[Slide {slide_num}]'
                        else:
                            section_num = identifier.split('_')[1]
                            source_ref = f'[Section {section_num}]'

                        source_chunks_html.append(
                            f'<a href="#{identifier}" class="chunk-link">{source_ref}</a>'
                        )

                        highlighted_chunk = (
                            f'<div id="{identifier}">'
                            f'<span class="highlight" style="background-color: {color}">'
                            f'{chunk_text}</span>'
                            f'</div>'
                        )
                        # Only an exact occurrence of the chunk text gets highlighted.
                        highlighted_text = highlighted_text.replace(chunk_text, highlighted_chunk)

                    highlighted_content += f"""
                        <div class="source-content" id="scrollable-content">
                            {highlighted_text}
                        </div>
                    </div>
                    """

    response_text = docs_response or "I couldn't find a relevant answer in the provided documents."
    if source_chunks_html:
        response_text += "\n\nSources: " + " ".join(source_chunks_html)

    response_audio = text_to_audio(response_text)
    history.append([text, response_text])
    history.append(["", gr.Audio(value=response_audio, label="Audio Response")])

    new_suggested_questions = []
    if docs:
        for doc in docs:
            file_content = process_file(doc['path'])
            suggested_questions = generate_suggested_questions(file_content, model_name)
            new_suggested_questions.extend(suggested_questions)
            # NOTE(review): the exchange is logged once per uploaded document, not only
            # for the document the answer came from - confirm this is intended.
            if docs_response:
                doc_name = os.path.basename(doc['path'])
                doc_type = os.path.splitext(doc_name)[1].lower()
                update_csv(serial_num, doc_name, doc_type, text, response_text)

    serial_num += 1
    save_current_chat(history)
    return history, new_suggested_questions, serial_num, "", highlighted_content

def safe_file_upload(files):
    """
    Copy uploaded files into the local ``uploaded_files`` directory.

    Every regular file is copied under a UUID-prefixed name so that two
    uploads sharing a base name never overwrite each other. Directories and
    paths that are not regular files are logged and skipped. A failure on one
    entry is logged and does not prevent the remaining entries from being
    processed.

    Args:
        files: A single path, a list or tuple of paths, or ``None``.

    Returns:
        list: Destination paths (inside ``uploaded_files``) of the files that
        were copied successfully, in input order. Empty when ``files`` is
        ``None`` or nothing could be copied.
    """
    upload_dir = "uploaded_files"
    os.makedirs(upload_dir, exist_ok=True)

    # None means there is nothing to copy; wrapping it as [None] would only
    # produce a spurious "Error processing file None" log entry.
    if files is None:
        return []

    # A lone path is wrapped so the loop below always sees a sequence;
    # tuples are accepted as-is instead of being treated as one bogus path.
    if not isinstance(files, (list, tuple)):
        files = [files]

    processed_files = []
    for file_path in files:
        try:
            # Directories are never copied.
            if os.path.isdir(file_path):
                log_to_file(f"Skipping directory: {file_path}", "WARNING")
                continue

            # Anything else that is not a regular file (e.g. a missing path)
            # is skipped as well.
            if not os.path.isfile(file_path):
                log_to_file(f"Not a valid file: {file_path}", "WARNING")
                continue

            # Prefix with a UUID to prevent overwriting an earlier upload
            # that has the same base name.
            file_name = os.path.basename(file_path)
            unique_filename = f"{uuid.uuid4()}_{file_name}"
            destination_path = os.path.join(upload_dir, unique_filename)

            shutil.copy2(file_path, destination_path)
            processed_files.append(destination_path)

        except Exception as e:
            # Best effort: record the failure and carry on with the next file.
            log_to_file(f"Error processing file {file_path}: {str(e)}", "ERROR")

    return processed_files

def create_synthlinx_app() -> gr.Blocks:
    """
    Build the Synthlinx.AI Gradio interface and wire up its events.

    The layout consists of a fixed header, a slide-in file-management panel
    (upload button, read-only file display, delete controls) and a main
    column holding the model selector, the chatbot, suggested questions, the
    question text box and two HTML areas for source material.

    Returns:
        gr.Blocks: The assembled app. It is not launched here.
    """
    # Shared by the read-only file display and the upload button so the two
    # lists of accepted extensions cannot drift apart.
    accepted_file_types = (
        ".pdf", ".docx", ".txt", ".csv",
        ".xlsx", ".xls", ".pptx",
        ".png", ".jpg", ".jpeg", ".tiff",
        ".bmp", ".webp"
    )
    custom_css = """
    #header {
        display: flex;
        align-items: center;
        padding: 10px;
        background: white;
        box-shadow: 0 2px 4px rgba(0,0,0,0.1);
        position: fixed;
        top: 0;
        left: 45px;
        right: 0;
        z-index: 1002;
        height: 60px;
    }
    #logo {
        height: 40px;
        margin-right: 15px;
    }
    #header h1 {
        margin: 0;
        color: #333;
        font-size: 24px;
    }
    #file-management-panel {
        position: fixed;
        left: -300px;
        top: 100px;
        width: 300px;
        height: calc(100vh - 60px);
        background: white;
        transition: left 0.3s ease;
        box-shadow: 2px 0 5px rgba(0,0,0,0.1);
        z-index: 1000;
    }
    #file-management-panel.active {
        left: 0;
    }
    .menu-toggle {
        position: fixed;
        left: 0;
        top: 0;
        z-index: 1003;
        background: #f0f0f0;
        border: none;
        cursor: pointer;
        padding: 10px;
        margin: 0;
        width: 45px;
        height: 60px;
        display: flex;
        flex-direction: column;
        justify-content: center;
        align-items: center;
    }
    .menu-toggle span {
        display: block;
        width: 25px;
        height: 3px;
        background: #333;
        margin: 2px 0;
        transition: 0.3s;
    }
    .source-guide {
        border: 1px solid #e0e0e0;
        border-radius: 8px;
        padding: 16px;
        margin: 8px 0;
        background: #ffffff;
    }
    .source-content {
        max-height: 500px;
        overflow-y: auto;
        font-family: system-ui;
        line-height: 1.5;
    }
    .highlight {
        background-color: rgba(255, 255, 0, 0.3);
        padding: 2px 4px;
        border-radius: 3px;
    }
    .chunk-link {
        background-color: #f0f0f0;
        color: #333;
        padding: 2px 6px;
        border-radius: 3px;
        text-decoration: none;
        margin-right: 5px;
        cursor: pointer;
    }
    .chunk-link:hover {
        background-color: #e0e0e0;
    }
    #scrollable-content {
        scroll-behavior: smooth;
    }
    #main-content {
        margin-left: 0;
        margin-top: 20px;
        transition: margin-left 0.3s ease;
        padding: 20px;
        position: relative;
        z-index: 999;
    }
    .chatbot-container {
    margin-top: 10px;
    padding-top: 10px;
    }
    #main-content.shifted {
        margin-left: 300px;
    }
    #file-management-panel {
        position: fixed;
        left: -300px;
        top: 60px;
        width: 300px;
        height: calc(100vh - 60px);
        background: white;
        transition: left 0.3s ease;
        box-shadow: 2px 0 5px rgba(0,0,0,0.1);
        z-index: 1000;
    }
    #file-management-panel .file-content {
        height: 100%;
        overflow-y: auto;
        padding: 20px 15px;
    }
    #file-management-panel .upload-section,
    #file-management-panel .delete-section {
        margin-bottom: 20px;
    }
    #file-management-panel .gr-file {
        margin-bottom: 10px;
    }
    #file-management-panel .gr-form {
        max-height: calc(100vh - 250px);
        overflow-y: auto;
    }
    #file-management-panel > div {
        padding: 0 15px;
        width: 100%;
    }
    #file-management-panel .gr-form {
        display: flex;
        flex-direction: column;
        gap: 10px;
        margin-top: 10px;
    }

    #file-management-panel .gr-check-radio {
        margin-bottom: 8px;
    }

    .interaction-icons {
        display: flex;
        gap: 12px;
        padding: 8px 0;
        align-items: center;
        position: absolute;
        right: 8px;
        bottom: 0;
    }

    .icon-button {
        background: none;
        border: none;
        cursor: pointer;
        padding: 6px;
        border-radius: 4px;
        transition: background-color 0.2s;
    }

    .icon-button:hover {
        background-color: #f0f0f0;
    }

    .icon-button img {
        width: 20px;
        height: 20px;
        opacity: 0.7;
    }

    .icon-button:hover img {
        opacity: 1;
    }

    .message-container {
        position: relative;
        width: 100%;
    }

    .edit-message {
        font-size: 14px;
        padding: 4px 8px;
        background: none;
        border: none;
        cursor: pointer;
        opacity: 0.7;
    }

    .edit-message:hover {
        opacity: 1;
    }

    .highlight-section {
        margin: 10px 0;
        padding: 10px;
        border-left: 3px solid #007bff;
    }
    .highlight {
        background-color: rgba(255, 255, 0, 0.3);
        padding: 2px 4px;
        border-radius: 3px;
    }
    .chunk-link {
        background-color: #f0f0f0;
        color: #333;
        padding: 2px 6px;
        border-radius: 3px;
        text-decoration: none;
        margin-right: 5px;
        cursor: pointer;
    }
    .chunk-link:hover {
        background-color: #e0e0e0;
    }
    #scrollable-content {
        scroll-behavior: smooth;
        max-height: 300px;
        overflow-y: auto;
    }
    .page-content {
        scroll-margin-top: 60px;
        margin-bottom: 20px;
        padding: 10px;
        border-bottom: 1px solid #eee;
    }

    .highlight {
        background-color: rgba(255, 255, 0, 0.3);
        padding: 2px 4px;
        border-radius: 3px;
    }

    .chunk-link {
        display: inline-block;
        background-color: #f0f0f0;
        color: #333;
        padding: 2px 6px;
        border-radius: 3px;
        text-decoration: none;
        margin-right: 5px;
        margin-bottom: 5px;
        cursor: pointer;
    }

    .chunk-link:hover {
        background-color: #e0e0e0;
    }

    #scrollable-content {
        scroll-behavior: smooth;
        max-height: 500px;
        overflow-y: auto;
        padding: 10px;
    }
    .upload-section .file-upload {
        border: 2px dashed #ccc;
        border-radius: 4px;
        padding: 20px;
        text-align: center;
        margin-bottom: 15px;
    }

    .upload-section .file-upload.invalid {
        border-color: #ff4444;
    }

    .upload-section .file-error {
        color: #ff4444;
        font-size: 0.9em;
        margin-top: 5px;
    }
    .paragraph-content {
        margin: 10px 0;
        padding: 10px;
        border-radius: 4px;
        scroll-margin-top: 70px;
    }

    .paragraph-content.highlighted {
        background-color: rgba(255, 255, 0, 0.3);
        border-left: 3px solid #007bff;
    }

    .document-source-link {
        display: inline-block;
        background-color: #f0f0f0;
        color: #333;
        padding: 4px 8px;
        border-radius: 4px;
        text-decoration: none;
        margin: 2px 4px;
        font-size: 0.9em;
        cursor: pointer;
    }

    .document-source-link:hover {
        background-color: #e0e0e0;
        text-decoration: none;
    }
    """
    with gr.Blocks(css=custom_css) as app:
        # Fixed header bar (logo + title), styled through the #header rules.
        gr.Markdown(
            """
            <div id="header">
                <img id="logo" src="https://media.licdn.com/dms/image/v2/D560BAQGF6NHHgadKYg/company-logo_200_200/company-logo_200_200/0/1707117426803?e=1738800000&v=beta&t=X8bdl_4FhyhYPECVAfvKiQoKT5Ofe-K7PtcPLper0nE" alt="Company Logo">
                <h1>Synthlinx.AI</h1>
            </div>
            """
        )
        with gr.Row():
            with gr.Column():
                # Hamburger button that slides the file panel in and out.
                toggle_button = gr.Button("☰", elem_classes=["menu-toggle"])

        with gr.Row():
            # ---- Slide-in file-management panel ----
            with gr.Column(elem_id="file-management-panel"):
                with gr.Column(elem_classes="file-content"):
                    gr.Markdown("### File Management")

                    with gr.Column(elem_classes="upload-section"):
                        file_list = gr.State([])
                        # Read-only listing; uploads go through the button below.
                        file_display = gr.File(
                            label="Uploaded Documents",
                            file_count="multiple",
                            type="filepath",
                            file_types=list(accepted_file_types),
                            interactive=False
                        )
                        add_files_button = gr.UploadButton(
                            "Add More Files",
                            file_count="multiple",
                            file_types=list(accepted_file_types),
                            type="filepath"
                        )

                    with gr.Column(elem_classes="delete-section"):
                        files_to_delete = gr.CheckboxGroup(
                            label="Select files to delete",
                            choices=[],
                            interactive=True
                        )
                        delete_files_button = gr.Button("Delete Selected Files")
                        status_message = gr.Markdown("")

            # ---- Main chat column ----
            with gr.Column(elem_id="main-content"):
                model_dropdown = gr.Dropdown(
                    choices=MODEL_OPTIONS,
                    value=MODEL_OPTIONS[0],
                    label="Select Model",
                    interactive=True
                )

                chatbot = gr.Chatbot(
                    render_markdown=True,
                    bubble_full_width=False,
                    show_copy_button=True,
                    scale=2,
                    height=550,
                    elem_classes="chatbot-container"
                )

                with gr.Row():
                    with gr.Column(scale=2):
                        suggested_questions_radio = gr.Radio(
                            label="Suggested Questions",
                            choices=[],
                            interactive=True
                        )
                        text_box = gr.Textbox(
                            placeholder="Enter your question and Click on Submit",
                            container=False
                        )
                        submit_button = gr.Button("Submit")

                        source_chunks = gr.HTML(label="Source Chunks")
                        source_content = gr.HTML(label="Source Content")

                def handle_message_click(evt: gr.SelectData, chat_history):
                    """Return the first element (user side) of the clicked chat row."""
                    clicked_message = chat_history[evt.index[0]][0]
                    return clicked_message

                # Selecting a chat message copies its question into the text box.
                chatbot.select(
                    handle_message_click,
                    inputs=[chatbot],
                    outputs=[text_box]
                )

        # Client-side only: toggles the CSS classes that slide the panel in
        # and shift the main content; returns [] because there are no outputs.
        toggle_script = """
        function togglePanel() {
            const panel = document.getElementById('file-management-panel');
            const mainContent = document.getElementById('main-content');
            panel.classList.toggle('active');
            mainContent.classList.toggle('shifted');
            return [];
        }
        """

        toggle_button.click(
            fn=lambda: None,
            inputs=[],
            outputs=[],
            js=toggle_script
        )

        # Per-session state shared between the event handlers below.
        stored_doc = gr.State([])
        suggested_questions = gr.State([])
        serial_num = gr.State(1)

        # Upload: copy files locally -> register them -> summarize and suggest
        # questions -> clear the status line -> normalize the display value.
        add_files_button.upload(
            safe_file_upload,
            inputs=[add_files_button],
            outputs=[file_display]
        ).then(
            add_more_files,
            inputs=[file_display, file_list],
            outputs=[file_list, files_to_delete]
        ).then(
            summarize_and_suggest_questions,
            inputs=[chatbot, file_list, stored_doc, model_dropdown],
            outputs=[chatbot, stored_doc, suggested_questions]
        ).then(
            lambda: "",
            outputs=status_message
        ).then(
            lambda files: files if files else [],  # Ensure non-None input
            inputs=[file_display],
            outputs=[file_display]
        )

        # Delete: remove the selection -> refresh the display from file_list ->
        # reset chat and stored documents -> re-summarize what remains.
        delete_files_button.click(
            delete_selected_files,
            inputs=[files_to_delete, file_list],
            outputs=[file_list, files_to_delete, status_message]
        ).then(
            lambda files: [f["path"] for f in files] if files else [],
            inputs=[file_list],
            outputs=[file_display]
        ).then(
            lambda: [],
            outputs=[chatbot]
        ).then(
            lambda: [],
            outputs=[stored_doc]
        ).then(
            summarize_and_suggest_questions,
            inputs=[chatbot, file_list, stored_doc, model_dropdown],
            outputs=[chatbot, stored_doc, suggested_questions]
        )

        # Switching models re-runs the summary with the newly selected model.
        model_dropdown.change(
            summarize_and_suggest_questions,
            inputs=[chatbot, file_list, stored_doc, model_dropdown],
            outputs=[chatbot, stored_doc, suggested_questions]
        )

        # Mirror the suggested-question state into the radio's choices.
        suggested_questions.change(
            lambda q: gr.update(choices=q),
            inputs=[suggested_questions],
            outputs=[suggested_questions_radio]
        )

        # Picking a suggested question pre-fills the text box.
        suggested_questions_radio.change(
            lambda choice: choice if choice else "",
            inputs=[suggested_questions_radio],
            outputs=[text_box]
        )

        # Submit: the five values returned by llm_response are routed to the
        # five outputs in order; afterwards the text box is cleared.
        submit_button.click(
            llm_response,
            inputs=[chatbot, text_box, file_list, stored_doc, serial_num, model_dropdown],
            outputs=[chatbot, suggested_questions, serial_num, source_chunks, source_content]
        ).then(
            lambda: "",
            outputs=[text_box]
        )
    return app

# Entry point: build the Blocks app and start the Gradio server.
if __name__ == "__main__":
    app = create_synthlinx_app()
    # With debug=True in Colab the cell keeps running so that errors and logs
    # stay visible in the cell output; set debug=False to turn that off.
    app.launch(debug=True)





Running Gradio in a Colab notebook requires sharing enabled. Automatically setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. This cell will run indefinitely so that you can see errors and logs. To turn off, set debug=False in launch().
* Running on public URL: https://e181cb75b1307b3949.gradio.live

This share link expires in 72 hours. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


Keyboard interruption in main thread... closing server.
Killing tunnel 127.0.0.1:7860 <> https://e181cb75b1307b3949.gradio.live
