#Automatic Meta-data generation

In [1]:
!apt-get install -y tesseract-ocr

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
tesseract-ocr is already the newest version (4.1.1-2.1build1).
0 upgraded, 0 newly installed, 0 to remove and 35 not upgraded.


In [2]:
!pip install -q \
  python-dotenv \
  streamlit \
  langdetect \
  langchain-mistralai \
  pytesseract \
  pillow \
  python-docx \
  PyPDF2 \
  pandas

[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/981.5 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m972.8/981.5 kB[0m [31m36.5 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m981.5/981.5 kB[0m [31m22.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m44.3/44.3 kB[0m [31m3.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m10.1/10.1 MB[0m [31m124.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m253.0/253.0 kB[0m [31m23.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m232.6/232.6 kB[0m [31m22.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m6.9/6.9 MB[0m [31m103.1 MB/s

In [3]:
!pip install config -q

In [4]:
!pip install pdfplumber docx langchain requests python-docx

Collecting pdfplumber
  Downloading pdfplumber-0.11.7-py3-none-any.whl.metadata (42 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/42.8 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m42.8/42.8 kB[0m [31m2.6 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting docx
  Downloading docx-0.2.4.tar.gz (54 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/54.9 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m54.9/54.9 kB[0m [31m3.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting pdfminer.six==20250506 (from pdfplumber)
  Downloading pdfminer_six-20250506-py3-none-any.whl.metadata (4.2 kB)
Collecting pypdfium2>=4.18.0 (from pdfplumber)
  Downloading pypdfium2-4.30.1-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (48 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

#configuration

In [5]:
import os
from dotenv import load_dotenv

load_dotenv()

class Config:
    # API
    MISTRAL_API_KEY = os.getenv("MISTRAL_API_KEY")
    MISTRAL_MODEL = "mistral-large-latest"

    # File limits
    MAX_FILE_SIZE_MB = 300
    MAX_FILE_SIZE_BYTES = MAX_FILE_SIZE_MB * 1024 * 1024

    # Supported formats
    SUPPORTED_EXTENSIONS = ['.pdf', '.docx', '.doc', '.txt', '.xlsx', '.xls', '.md', '.jpg', '.jpeg', '.png', '.tiff', '.bmp']

    # Text analysis
    DEFAULT_READING_SPEED_WPM = 200
    MIN_TEXT_FOR_SUMMARY = 100

    # App config
    PAGE_TITLE = "Automatic Meta-Data Generation"
    PAGE_ICON = "📄"

    @classmethod
    def validate(cls):
        if not cls.MISTRAL_API_KEY:
            raise ValueError("MISTRAL_API_KEY not found in .env file")
        return True

#Document Loader

In [6]:
import os
from pathlib import Path
from config import Config

def validate_file_size(file):
    """Validate if file size is within limits"""
    if hasattr(file, 'size'):
        return file.size <= Config.MAX_FILE_SIZE_BYTES
    return True

def get_file_type(filename):
    """Get file type from filename"""
    ext = Path(filename).suffix.lower()
    if ext in ['.pdf']:
        return 'pdf'
    elif ext in ['.docx', '.doc']:
        return 'docx'
    elif ext in ['.txt']:
        return 'txt'
    elif ext in ['.xlsx', '.xls']:
        return 'excel'
    elif ext in ['.md']:
        return 'markdown'
    elif ext in ['.jpg', '.jpeg', '.png', '.tiff', '.bmp']:
        return 'image_ocr'
    return 'unknown'

def is_supported_format(filename):
    """Check if file format is supported"""
    ext = Path(filename).suffix.lower()
    return ext in Config.SUPPORTED_EXTENSIONS

def validate_document(file):
    """Complete document validation"""
    if not file:
        return False, "No file provided"

    if not is_supported_format(file.name):
        return False, f"Unsupported format. Supported: {', '.join(Config.SUPPORTED_EXTENSIONS)}"

    if not validate_file_size(file):
        return False, f"File too large. Max size: {Config.MAX_FILE_SIZE_MB}MB"

    return True, "Valid document"

def get_file_info(file):
    """Get basic file information"""
    return {
        'name': file.name,
        'size': getattr(file, 'size', 0),
        'type': get_file_type(file.name)
    }

In [7]:
# Detects file type and extracts text from PDF, DOCX, or TXT (with pdfplumber for tables)
import os
import pdfplumber
from docx import Document

def extract_text_from_file(file_path):
    ext = os.path.splitext(file_path)[1].lower()

    if ext == '.pdf':
        text = ""
        with pdfplumber.open(file_path) as pdf:
            for page in pdf.pages:
                text += page.extract_text() + "\n"
    elif ext == '.docx':
        doc = Document(file_path)
        text = "\n".join([para.text for para in doc.paragraphs])
    elif ext == '.txt':
        with open(file_path, 'r', encoding='utf-8') as f:
            text = f.read()
    else:
        raise ValueError("Unsupported file type")

    return text

In [8]:
text=extract_text_from_file('/content/Report_Finance.pdf')

In [9]:
# Cleans and normalizes extracted text using regex + NLP techniques
import re
import spacy
from nltk.corpus import stopwords
import nltk
nltk.download('stopwords')

nlp = spacy.load("en_core_web_sm")
stop_words = set(stopwords.words("english"))

def preprocess_text(text: str) -> str:
    # Regex cleaning
    text = re.sub(r'\n{2,}', '\n', text)
    text = re.sub(r'[ \t]+', ' ', text)
    text = re.sub(r'\n\s*\n', '\n', text)
    text = text.replace('\xa0', ' ')
    text = text.encode('ascii', 'ignore').decode()

    # Remove repeating headers/footers
    lines = text.splitlines()
    line_counts = {}
    for line in lines:
        line_counts[line] = line_counts.get(line, 0) + 1
    lines = [line for line in lines if line_counts[line] < 5]
    text = " ".join(lines)

    # NLP preprocessing: stopword removal + lemmatization
    doc = nlp(text)
    cleaned = [token.lemma_ for token in doc if not token.is_stop and not token.is_punct and token.is_alpha]
    return " ".join(cleaned)


[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


In [10]:
final_text=preprocess_text(text)

#Language Detection

In [11]:
from langdetect import detect, detect_langs
from langdetect.lang_detect_exception import LangDetectException

def detect_language(text):
    """Detect the primary language of text"""
    if not text or len(text.strip()) < 10:
        return "Unknown"

    try:
        lang_code = detect(text[:1000])  # Use first 1000 chars for efficiency
        return get_language_name(lang_code)
    except LangDetectException:
        return "Unknown"

def detect_language_with_confidence(text):
    """Detect language with confidence score"""
    if not text or len(text.strip()) < 10:
        return "Unknown", 0.0

    try:
        lang_probs = detect_langs(text[:1000])
        if lang_probs:
            top_lang = lang_probs[0]
            return get_language_name(top_lang.lang), round(top_lang.prob, 2)
    except LangDetectException:
        pass

    return "Unknown", 0.0

def get_language_name(lang_code):
    """Convert language code to language name"""
    lang_map = {
        'en': 'English',
        'es': 'Spanish',
        'fr': 'French',
        'de': 'German',
        'it': 'Italian',
        'pt': 'Portuguese',
        'ru': 'Russian',
        'ja': 'Japanese',
        'ko': 'Korean',
        'zh-cn': 'Chinese (Simplified)',
        'zh-tw': 'Chinese (Traditional)',
        'ar': 'Arabic',
        'hi': 'Hindi',
        'bn': 'Bengali',
        'ur': 'Urdu',
        'ta': 'Tamil',
        'te': 'Telugu',
        'mr': 'Marathi',
        'gu': 'Gujarati',
        'kn': 'Kannada',
        'ml': 'Malayalam',
        'pa': 'Punjabi',
        'ne': 'Nepali',
        'si': 'Sinhala',
        'th': 'Thai',
        'vi': 'Vietnamese',
        'id': 'Indonesian',
        'ms': 'Malay',
        'tl': 'Filipino',
        'nl': 'Dutch',
        'sv': 'Swedish',
        'da': 'Danish',
        'no': 'Norwegian',
        'fi': 'Finnish',
        'pl': 'Polish',
        'cs': 'Czech',
        'sk': 'Slovak',
        'hu': 'Hungarian',
        'ro': 'Romanian',
        'bg': 'Bulgarian',
        'hr': 'Croatian',
        'sr': 'Serbian',
        'sl': 'Slovenian',
        'et': 'Estonian',
        'lv': 'Latvian',
        'lt': 'Lithuanian',
        'tr': 'Turkish',
        'el': 'Greek',
        'he': 'Hebrew',
        'fa': 'Persian',
        'sw': 'Swahili',
        'af': 'Afrikaans'
    }

    return lang_map.get(lang_code, f"Unknown ({lang_code})")

def analyze_language(text):
    """Complete language analysis"""
    language, confidence = detect_language_with_confidence(text)

    return {
        'detected_language': language,
        'confidence': f"{confidence * 100:.1f}%",
        'is_reliable': confidence > 0.7
    }

In [12]:
from langchain.text_splitter import RecursiveCharacterTextSplitter

def split_text_into_chunks(text, chunk_size=1000, chunk_overlap=50):
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap
    )
    return splitter.split_text(text)

#File Handling

In [13]:
import os
import tempfile
import streamlit as st
from pathlib import Path
from config import Config

def handle_file_upload():
    """Handle file upload in Streamlit"""
    uploaded_file = st.file_uploader(
        "Upload your document",
        type=[ext.replace('.', '') for ext in Config.SUPPORTED_EXTENSIONS],
        help=f"Supported formats: {', '.join(Config.SUPPORTED_EXTENSIONS)} | Max size: {Config.MAX_FILE_SIZE_MB}MB"
    )

    return uploaded_file

def save_temporary_file(uploaded_file):
    """Save uploaded file to temporary location"""
    if not uploaded_file:
        return None

    try:
        # Create temp directory if it doesn't exist
        temp_dir = Path("temp_files")
        temp_dir.mkdir(exist_ok=True)

        # Create temporary file
        file_extension = Path(uploaded_file.name).suffix
        temp_file = tempfile.NamedTemporaryFile(
            delete=False,
            suffix=file_extension,
            dir=temp_dir
        )

        # Write file content
        temp_file.write(uploaded_file.read())
        temp_file.close()

        # Reset file pointer for further use
        uploaded_file.seek(0)

        return temp_file.name

    except Exception as e:
        st.error(f"Error saving file: {str(e)}")
        return None

def cleanup_temp_files():
    """Clean up temporary files"""
    try:
        temp_dir = Path("temp_files")
        if temp_dir.exists():
            for file_path in temp_dir.glob("*"):
                if file_path.is_file():
                    file_path.unlink()
    except Exception as e:
        print(f"Warning: Could not cleanup temp files: {e}")

def get_file_size_mb(file):
    """Get file size in MB"""
    if hasattr(file, 'size'):
        return round(file.size / (1024 * 1024), 2)
    return 0

def display_file_info(file):
    """Display file information in Streamlit"""
    if file:
        col1, col2, col3 = st.columns(3)

        with col1:
            st.metric("File Name", file.name)

        with col2:
            file_size = get_file_size_mb(file)
            st.metric("File Size", f"{file_size} MB")

        with col3:
            file_type = Path(file.name).suffix.upper()
            st.metric("File Type", file_type)

def validate_uploaded_file(file):
    """Validate uploaded file"""
    if not file:
        return False, "No file uploaded"

    # Check file size
    if get_file_size_mb(file) > Config.MAX_FILE_SIZE_MB:
        return False, f"File size exceeds {Config.MAX_FILE_SIZE_MB}MB limit"

    # Check file extension
    file_ext = Path(file.name).suffix.lower()
    if file_ext not in Config.SUPPORTED_EXTENSIONS:
        return False, f"Unsupported file format. Supported: {', '.join(Config.SUPPORTED_EXTENSIONS)}"

    return True, "File is valid"

#Meta-Data Generation

In [14]:
from datetime import datetime
import re

def get_extraction_timestamp():
    """Get current timestamp for extraction"""
    return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

def calculate_document_length(text):
    """Calculate document length in characters"""
    return len(text) if text else 0

def count_words(text):
    """Count words in text"""
    if not text:
        return 0
    return len(text.split())

def count_paragraphs(text):
    """Count paragraphs in text"""
    if not text:
        return 0
    paragraphs = [p.strip() for p in text.split('\n\n') if p.strip()]
    return len(paragraphs)

def calculate_reading_time(word_count, wpm=200):
    """Calculate approximate reading time in minutes"""
    if word_count <= 0:
        return "0 min"

    minutes = word_count / wpm

    if minutes < 1:
        return f"{int(minutes * 60)} sec"
    elif minutes < 60:
        return f"{int(minutes)} min"
    else:
        hours = int(minutes // 60)
        mins = int(minutes % 60)
        return f"{hours}h {mins}min"

def format_file_size(size_bytes):
    """Format file size in human readable format"""
    if size_bytes == 0:
        return "0 B"

    for unit in ['B', 'KB', 'MB', 'GB']:
        if size_bytes < 1024.0:
            return f"{size_bytes:.1f} {unit}"
        size_bytes /= 1024.0
    return f"{size_bytes:.1f} TB"

def generate_basic_metadata(file, text, file_type):
    """Generate comprehensive metadata for document"""
    file_size = getattr(file, 'size', 0)
    word_count = count_words(text) # Calculate word count

    metadata = {
        'file_name': file.name,
        'extracted_on': get_extraction_timestamp(),
        'file_type': file_type.upper(),
        'file_size': format_file_size(file_size),
        'document_length': f"{calculate_document_length(text):,} characters",
        'word_count': f"{word_count:,} words", # Use calculated word count
        'approx_reading_time': calculate_reading_time(word_count), # Pass word count
        'paragraphs': f"{count_paragraphs(text)} paragraphs"
    }

    return metadata

def clean_text_for_analysis(text):
    """Clean text for better analysis"""
    if not text:
        return ""

    # Remove extra whitespace and normalize
    text = re.sub(r'\s+', ' ', text)
    text = text.strip()

    return text

In [15]:
PROMPT_TEMPLATE = """
You are an intelligent assistant designed to understand documents and extract structured information from them.

Your task is to:
1. Extract the following metadata:
   - Title (if mentioned)
   - Author (if available)
   - Date of publication or document creation (if available)
   - Keywords or topics covered
   - Type of document (choose from: research paper, legal notice, resume, report, book chapter, article, business proposal, letter, others)
2. Generate a concise summary of the content (3-5 sentences).

Read the content below and return your answer in this JSON format:
{{
  "title": "",
  "author": "",
  "date": "",
  "keywords": [],
  "document_type": "",
  "summary": ""
}}

Content:
\"\"\"{content_chunk}\"\"\"
"""


In [16]:
# 5. llm_call.py
import os
import requests

os.environ["MISTRAL_API_URL"] = "https://api.mistral.ai/v1/chat/completions"
os.environ["MISTRAL_API_KEY"] = "vzvzkE3QKin6nqTVUTwpyOwRQT2xLdkf"


In [17]:
MISTRAL_API_URL = os.getenv("MISTRAL_API_URL")
MISTRAL_API_KEY = os.getenv("MISTRAL_API_KEY")

def call_llm_on_chunk(chunk):
    headers = {
        "Authorization": f"Bearer {os.getenv('MISTRAL_API_KEY')}",
        "Content-Type": "application/json"
    }
    data = {
        "model": "open-mistral-7b",
        "messages": [
            {"role": "user", "content": PROMPT_TEMPLATE.format(content_chunk=chunk)}
        ],
        "temperature": 0.3
    }
    response = requests.post(os.getenv("MISTRAL_API_URL"), headers=headers, json=data)
    if response.status_code != 200:
        print(f"❌ Error {response.status_code}: {response.text}")
        return "ERROR"
    return response.json()['choices'][0]['message']['content']


#Summary generation

In [18]:
from langchain_mistralai import ChatMistralAI
from config import Config

def initialize_mistral():
    """Initialize Mistral AI client"""
    try:
        return ChatMistralAI(
            mistral_api_key=Config.MISTRAL_API_KEY,
            model=Config.MISTRAL_MODEL,
            temperature=0.3
        )
    except Exception as e:
        print(f"Error initializing Mistral: {e}")
        return None

def generate_summary(text, max_words=200):
    """Generate document summary using Mistral AI"""
    if not text or len(text.strip()) < Config.MIN_TEXT_FOR_SUMMARY:
        return "Text too short for summary generation"

    mistral = initialize_mistral()
    if not mistral:
        return "Error: Could not initialize Mistral AI"

    # Truncate text if too long (keep first 3000 chars for efficiency)
    text_sample = text[:3000] if len(text) > 3000 else text

    prompt = f"""
    Please provide a concise summary of the following document in approximately {max_words} words.
    Focus on the main points, key findings, and important information.

    Document text:
    {text_sample}

    Summary:
    """

    try:
        response = mistral.invoke(prompt)
        return response.content.strip()
    except Exception as e:
        return f"Error generating summary: {str(e)}"

def extract_key_points(text, num_points=5):
    """Extract key points from document"""
    if not text or len(text.strip()) < Config.MIN_TEXT_FOR_SUMMARY:
        return []

    mistral = initialize_mistral()
    if not mistral:
        return ["Error: Could not initialize Mistral AI"]

    text_sample = text[:3000] if len(text) > 3000 else text

    prompt = f"""
    Extract the {num_points} most important key points from this document.
    Present them as a numbered list, each point should be concise and informative.

    Document text:
    {text_sample}

    Key Points:
    """

    try:
        response = mistral.invoke(prompt)
        points = response.content.strip().split('\n')
        return [point.strip() for point in points if point.strip()][:num_points]
    except Exception as e:
        return [f"Error extracting key points: {str(e)}"]

def generate_document_insights(text):
    """Generate comprehensive document insights"""
    if not text or len(text.strip()) < Config.MIN_TEXT_FOR_SUMMARY:
        return {
            'summary': "Text too short for analysis",
            'key_points': [],
            'document_type': "Unknown"
        }

    summary = generate_summary(text)
    key_points = extract_key_points(text)
    doc_type = classify_document_type(text)

    return {
        'summary': summary,
        'key_points': key_points,
        'document_type': doc_type
    }

def classify_document_type(text):
    """Classify document type based on content"""
    if not text:
        return "Unknown"

    mistral = initialize_mistral()
    if not mistral:
        return "Unknown"

    text_sample = text[:1000]

    prompt = f"""
    Classify this document type in one or two words (e.g., Report, Research Paper, Manual, Letter, Article, etc.):

    {text_sample}

    Document Type:
    """

    try:
        response = mistral.invoke(prompt)
        return response.content.strip()
    except Exception as e:
        return "Unknown"

In [19]:
def summarize_document_chunks(chunks):
    results = []
    for chunk in chunks:
        result = call_llm_on_chunk(chunk)
        results.append(result)
    return results

#Text Analyzer

In [20]:
import re
import string
from collections import Counter

def count_words(text):
    """Count total words in text"""
    if not text:
        return 0
    return len(text.split())

def count_sentences(text):
    """Count sentences in text"""
    if not text:
        return 0
    sentences = re.split(r'[.!?]+', text)
    return len([s for s in sentences if s.strip()])

def count_paragraphs(text):
    """Count paragraphs in text"""
    if not text:
        return 0
    paragraphs = [p.strip() for p in text.split('\n\n') if p.strip()]
    return len(paragraphs)

def count_lines(text):
    """Count lines in text"""
    if not text:
        return 0
    return len([line for line in text.split('\n') if line.strip()])

def analyze_readability(text):
    """Basic readability analysis"""
    if not text:
        return "N/A"

    words = count_words(text)
    sentences = count_sentences(text)

    if sentences == 0:
        return "N/A"

    avg_words_per_sentence = words / sentences

    if avg_words_per_sentence < 15:
        return "Easy"
    elif avg_words_per_sentence < 20:
        return "Medium"
    else:
        return "Hard"

def get_most_common_words(text, top_n=10):
    """Get most common words (excluding common stop words)"""
    if not text:
        return []

    # Simple stop words
    stop_words = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for',
                  'of', 'with', 'by', 'is', 'are', 'was', 'were', 'be', 'been', 'have',
                  'has', 'had', 'do', 'does', 'did', 'will', 'would', 'could', 'should',
                  'this', 'that', 'these', 'those', 'i', 'you', 'he', 'she', 'it', 'we', 'they'}

    # Clean and split text
    words = re.findall(r'\b[a-zA-Z]+\b', text.lower())
    filtered_words = [word for word in words if word not in stop_words and len(word) > 2]

    return Counter(filtered_words).most_common(top_n)

def calculate_text_complexity(text):
    """Calculate text complexity metrics"""
    if not text:
        return {}

    words = count_words(text)
    sentences = count_sentences(text)
    characters = len(text.replace(' ', ''))

    return {
        'avg_word_length': round(characters / words, 1) if words > 0 else 0,
        'avg_sentence_length': round(words / sentences, 1) if sentences > 0 else 0,
        'readability': analyze_readability(text)
    }

def analyze_text_structure(text):
    """Complete text structure analysis"""
    if not text:
        return {}

    analysis = {
        'word_count': count_words(text),
        'sentence_count': count_sentences(text),
        'paragraph_count': count_paragraphs(text),
        'line_count': count_lines(text),
        'character_count': len(text),
        'character_count_no_spaces': len(text.replace(' ', '')),
        'complexity': calculate_text_complexity(text),
        'top_words': get_most_common_words(text, 5)
    }

    return analysis

#Text-Extractor

In [21]:
import PyPDF2
import pytesseract
from PIL import Image
from docx import Document
import pandas as pd
from io import BytesIO

def extract_from_pdf(file):
    """Extract text from PDF file"""
    try:
        reader = PyPDF2.PdfReader(file)
        text = ""
        for page in reader.pages:
            text += page.extract_text() + "\n"
        return text.strip()
    except Exception as e:
        return f"Error extracting PDF: {str(e)}"

def extract_from_docx(file):
    """Extract text from DOCX file"""
    try:
        doc = Document(file)
        text = ""
        for paragraph in doc.paragraphs:
            text += paragraph.text + "\n"
        return text.strip()
    except Exception as e:
        return f"Error extracting DOCX: {str(e)}"

def extract_from_txt(file):
    """Extract text from TXT file"""
    try:
        content = file.read()
        if isinstance(content, bytes):
            content = content.decode('utf-8')
        return content.strip()
    except Exception as e:
        return f"Error extracting TXT: {str(e)}"

def extract_from_excel(file):
    """Extract text from Excel file"""
    try:
        df = pd.read_excel(file, sheet_name=None)
        text = ""
        for sheet_name, sheet_df in df.items():
            text += f"Sheet: {sheet_name}\n"
            text += sheet_df.to_string(index=False) + "\n\n"
        return text.strip()
    except Exception as e:
        return f"Error extracting Excel: {str(e)}"

def extract_from_image_ocr(file):
    """Extract text from image using OCR"""
    try:
        image = Image.open(file)
        text = pytesseract.image_to_string(image)
        return text.strip()
    except Exception as e:
        return f"Error extracting OCR: {str(e)}"

def extract_from_markdown(file):
    """Extract text from Markdown file"""
    try:
        content = file.read()
        if isinstance(content, bytes):
            content = content.decode('utf-8')
        return content.strip()
    except Exception as e:
        return f"Error extracting Markdown: {str(e)}"

def extract_text(file, file_type):
    """Main text extraction function"""
    extractors = {
        'pdf': extract_from_pdf,
        'docx': extract_from_docx,
        'txt': extract_from_txt,
        'excel': extract_from_excel,
        'image_ocr': extract_from_image_ocr,
        'markdown': extract_from_markdown
    }

    extractor = extractors.get(file_type)
    if not extractor:
        return f"Unsupported file type: {file_type}"

    # Reset file pointer
    if hasattr(file, 'seek'):
        file.seek(0)

    return extractor(file)

In [22]:
text = extract_text_from_file('/content/Report_Finance.pdf')
clean_text = preprocess_text(text)
chunks = split_text_into_chunks(clean_text)

In [23]:
!pip install keybert sentence-transformers -q

[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m41.4/41.4 kB[0m [31m2.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m363.4/363.4 MB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.8/13.8 MB[0m [31m121.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m24.6/24.6 MB[0m [31m92.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m883.7/883.7 kB[0m [31m56.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m664.8/664.8 MB[0m [31m2.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m211.5/211.5 MB[0m [31m6.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m56.3/56.3 MB[0m [31m19.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [24]:
print(f"✅ Total Chunks: {len(chunks)}")

results = []
for i, chunk in enumerate(chunks):
    print(f"\n--- Generating summary for Chunk {i+1}/{len(chunks)} ---")
    summary = call_llm_on_chunk(chunk)
    print(summary)
    results.append(summary)


✅ Total Chunks: 12

--- Generating summary for Chunk 1/12 ---
{
  "title": "Credit Card Behaviour Score prediction Classification Risk base Techniques",
  "author": "Mahanti Ajay Babu",
  "date": "",
  "keywords": ["Credit Card", "Behaviour Score", "Classification", "Risk", "Techniques", "Summer Project", "EDA", "Data Preprocess", "Drop Repeated Categories", "education", "marriage", "Age", "bill statement", "previous"],
  "document_type": "report",
  "summary": "This report, titled 'Credit Card Behaviour Score prediction Classification Risk base Techniques', is a summer project submission by Mahanti Ajay Babu. The project involves the creation of a risk model for credit cards, data preprocessing, and drop of repeated categories. The report also includes an education feature, marriage feature, data visualization, and target variable for sex and age. The project was guided by the Finance Club of the Department of Mechanical Engineering at the Indian Institute of Technology Roorkee."
}

-

#Utilisation

In [25]:
import re
import json
from datetime import datetime
from pathlib import Path

def format_file_size(size_bytes):
    """Format file size in human readable format"""
    if size_bytes == 0:
        return "0 B"

    for unit in ['B', 'KB', 'MB', 'GB']:
        if size_bytes < 1024.0:
            return f"{size_bytes:.1f} {unit}"
        size_bytes /= 1024.0
    return f"{size_bytes:.1f} TB"

def format_timestamp(timestamp=None):
    """Format timestamp for display"""
    if timestamp is None:
        timestamp = datetime.now()

    if isinstance(timestamp, str):
        return timestamp

    return timestamp.strftime("%Y-%m-%d %H:%M:%S")

def clean_text(text):
    """Clean and normalize text"""
    if not text:
        return ""

    # Remove extra whitespace
    text = re.sub(r'\s+', ' ', text)

    # Remove special characters that might cause issues
    text = re.sub(r'[^\w\s\.,!?;:\-\(\)\"\']+', '', text)

    return text.strip()

def truncate_text(text, max_length=100):
    """Truncate text with ellipsis"""
    if not text or len(text) <= max_length:
        return text

    return text[:max_length].rsplit(' ', 1)[0] + "..."

def format_number(number):
    """Format numbers with commas"""
    if isinstance(number, (int, float)):
        return f"{number:,}"
    return str(number)

def calculate_reading_time(word_count, wpm=200):
    """Calculate reading time from word count"""
    if word_count <= 0:
        return "0 min"

    minutes = word_count / wpm

    if minutes < 1:
        return f"{int(minutes * 60)} sec"
    elif minutes < 60:
        return f"{int(minutes)} min"
    else:
        hours = int(minutes // 60)
        mins = int(minutes % 60)
        return f"{hours}h {mins}min"

def safe_divide(numerator, denominator):
    """Safe division with zero check"""
    if denominator == 0:
        return 0
    return numerator / denominator

def export_metadata_json(metadata):
    """Export metadata as JSON string"""
    try:
        return json.dumps(metadata, indent=2, ensure_ascii=False)
    except Exception as e:
        return f"Error exporting JSON: {str(e)}"

def get_file_extension(filename):
    """Get file extension from filename"""
    return Path(filename).suffix.lower()

def is_text_meaningful(text, min_length=10):
    """Check if text has meaningful content"""
    if not text:
        return False

    # Remove whitespace and check length
    clean = text.strip()
    if len(clean) < min_length:
        return False

    # Check if text has alphabetic characters
    if not re.search(r'[a-zA-Z]', clean):
        return False

    return True

#Running with example

In [26]:
# After summarizing all chunks:
combined_summaries = "\n\n".join(results)

combine_prompt = f"""
You are a smart assistant. Below are multiple partial summaries of a document, generated from different parts.

Your task is to combine them into a **single metadata + summary JSON**, like this:
{{
  "title": "",
  "author": "",
  "date": "",
  "keywords": [],
  "document_type": "",
  "summary": ""
}}

Summaries:
\"\"\"{combined_summaries}\"\"\"
"""


In [27]:
import json
import textwrap
import re
from keybert import KeyBERT
from sentence_transformers import SentenceTransformer

def call_llm_merge_summary(prompt):
    headers = {
        "Authorization": f"Bearer {os.getenv('MISTRAL_API_KEY')}",
        "Content-Type": "application/json"
    }
    data = {
        "model": "open-mistral-7b",
        "messages": [{"role": "user", "content": prompt}],
        "temperature": 0.3
    }
    response = requests.post(os.getenv("MISTRAL_API_URL"), headers=headers, json=data)
    return response.json()['choices'][0]['message']['content']

final_output = call_llm_merge_summary(combine_prompt)

In [31]:
# ============================
# 🎯 Final Output Handling
# ============================
try:
    # Use regex to extract the JSON part from the string
    json_match = re.search(r'```json\n(.*?)\n```', final_output, re.DOTALL)
    if json_match:
        json_string = json_match.group(1)
        parsed = json.loads(json_string)

        # ✅ Improve keywords using KeyBERT
        # Check if clean_text is available before using KeyBERT
        if 'text' in locals(): # Use the original 'text' variable
            kw_model = KeyBERT(model=SentenceTransformer('all-MiniLM-L6-v2'))
            # Pass the original 'text' as a list containing one element
            kb_keywords = kw_model.extract_keywords(
                [text],
                keyphrase_ngram_range=(1, 2),
                stop_words='english',
                top_n=10,
                use_maxsum=True,
                nr_candidates=20
            )
            final_keywords = [kw for kw, score in kb_keywords]
            parsed["keywords"] = final_keywords
        else:
            print("Warning: 'text' not available for keyword extraction using KeyBERT.")


        # ✅ Pretty output
        print("\n✅ Final Metadata:")
        print(json.dumps(parsed, indent=2))

        print("\n✅ Final Summary:")
        if "summary" in parsed and parsed["summary"]:
             print(textwrap.fill(parsed["summary"], width=100))
        else:
            print("Summary not available in the parsed output.")


    else:
        print("⚠️ Could not find the JSON object within the final output string.")
        print("Showing raw output:")
        print(final_output)

except json.JSONDecodeError:
    print("⚠️ Could not parse JSON. Showing raw output:")
    print(final_output)
except KeyError as e:
     print(f"⚠️ KeyError: {e} - Check if expected keys are present in the JSON output.")
     print("Showing parsed dictionary (if available):")
     if 'parsed' in locals():
         print(json.dumps(parsed, indent=2))
     else:
         print("Parsed dictionary not available.")


✅ Final Metadata:
{
  "title": "Credit Card Behaviour Score prediction Classification Risk base Techniques, Age Variable Variable bill statement correlation analysis using SMOTE Algorithm and ML model, Exploratory Data Analysis on Marriage Status and Age in a Credit Card Holder Dataset, Analyzing the Influence of Age on Customer Financial Behavior and Risk Profile, Exploratory Data Analysis for Credit Risk Prediction, SMOTE Algorithm Application in Credit Risk Portfolio, Tune Performance XGBoost Classifier for Credit Card Default Prediction, Machine Learning model predicts credit card defaulters, Machine Learning model predicts credit card default",
  "author": "Mahanti Ajay Babu (Credit Card Behaviour Score prediction Classification Risk base Techniques), Not specified (others)",
  "date": "",
  "keywords": [
    "classifier score",
    "behaviour score",
    "built credit",
    "classification risk",
    "credit cards",
    "score prediction",
    "card risk",
    "models credit",
 

In [34]:
!jupyter nbconvert --ClearOutputPreprocessor.enabled=True --inplace Code_Jupyterfile_Rakesh.ipynb -q # for clear output

usage: jupyter-nbconvert [-h] [--debug] [--show-config] [--show-config-json]
                         [--generate-config] [-y] [--execute] [--allow-errors]
                         [--stdin] [--stdout] [--inplace] [--clear-output]
                         [--coalesce-streams] [--no-prompt] [--no-input]
                         [--allow-chromium-download]
                         [--disable-chromium-sandbox] [--show-input]
                         [--embed-images] [--sanitize-html]
                         [--log-level NbConvertApp.log_level]
                         [--config NbConvertApp.config_file]
                         [--to NbConvertApp.export_format]
                         [--template TemplateExporter.template_name]
                         [--template-file TemplateExporter.template_file]
                         [--theme HTMLExporter.theme]
                         [--sanitize_html HTMLExporter.sanitize_html]
                         [--writer NbConvertApp.writer_class]
   