In [4]:
import os
import re
import json

import PyPDF2
import nltk
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.decomposition import NMF, LatentDirichletAllocation
import spacy
from pathlib import Path
import uuid
from datetime import datetime
import logging

# Configure logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)

# Initialize NLTK resources
try:
    nltk.data.find("tokenizers/punkt")
    nltk.data.find("corpora/stopwords")
    nltk.data.find("corpora/wordnet")
except LookupError:
    nltk.download("punkt", quiet=True)
    nltk.download("stopwords", quiet=True)
    nltk.download("wordnet", quiet=True)

# Load spaCy model
try:
    nlp = spacy.load("en_core_web_trf")
except:
    logging.info("Downloading spaCy model...")
    spacy.cli.download("en_core_web_sm")
    nlp = spacy.load("en_core_web_sm")


class DynamicTopicClassifier:
    def __init__(self, topics_file="topics_database.json"):
        """Initialize the dynamic topic classifier"""
        self.stop_words = set(stopwords.words("english"))
        self.lemmatizer = WordNetLemmatizer()

        # For TF-IDF approach
        self.vectorizer = TfidfVectorizer(
            max_features=5000, stop_words="english", ngram_range=(1, 3), min_df=2
        )

        # For topic modeling
        self.nmf_model = NMF(n_components=10, random_state=42)
        self.lda_model = LatentDirichletAllocation(n_components=10, random_state=42)

        # Topic database
        self.topics_file = topics_file
        self.topics_db = self.load_topics_database()

        logging.info(
            f"Initialized classifier with {len(self.topics_db['topics'])} existing topics"
        )

    def load_topics_database(self):
        """Load the topics database from file or create a new one"""
        if os.path.exists(self.topics_file):
            with open(self.topics_file, "r") as f:
                return json.load(f)
        else:
            # Initialize with empty structure
            topics_db = {"topics": {}, "hierarchies": {}, "documents": []}
            return topics_db

    def save_topics_database(self):
        """Save the topics database to file"""
        with open(self.topics_file, "w") as f:
            json.dump(self.topics_db, f, indent=2)

    def preprocess_text(self, text):
        """Clean and preprocess text"""
        if not text:
            return ""

        # Convert to lowercase
        text = text.lower()
        # Remove special characters
        text = re.sub(r"[^\w\s]", " ", text)
        # Tokenize
        tokens = nltk.word_tokenize(text)
        # Remove stopwords and lemmatize
        tokens = [
            self.lemmatizer.lemmatize(word)
            for word in tokens
            if word not in self.stop_words and len(word) > 2
        ]
        return " ".join(tokens)

    def extract_text_from_pdf(self, pdf_path):
        """Extract text content from a PDF file"""
        text = ""
        try:
            with open(pdf_path, "rb") as file:
                reader = PyPDF2.PdfReader(file)
                for page in reader.pages:
                    page_text = page.extract_text()
                    if page_text:
                        text += page_text + " "
        except Exception as e:
            logging.error(f"Error extracting text from PDF {pdf_path}: {str(e)}")
        return text

    def extract_keywords(self, text, n=10):
        """Extract keywords from text using spaCy"""
        try:
            # Limit text length to avoid memory issues
            doc = nlp(text[:20000])

            # Extract noun phrases as potential keywords
            keywords = []
            for chunk in doc.noun_chunks:
                if 1 <= len(chunk.text.split()) <= 3:  # Phrases with 1-3 words
                    clean_text = re.sub(r"[^\w\s]", "", chunk.text.lower())
                    if clean_text and len(clean_text) > 2:
                        keywords.append(clean_text)

            # Extract important entities
            for ent in doc.ents:
                if ent.label_ in [
                    "ORG",
                    "PRODUCT",
                    "WORK_OF_ART",
                    "EVENT",
                    "LAW",
                    "LANGUAGE",
                ]:
                    clean_text = re.sub(r"[^\w\s]", "", ent.text.lower())
                    if clean_text and len(clean_text) > 2:
                        keywords.append(clean_text)

            # Count frequencies and return top n
            keyword_freq = {}
            for kw in keywords:
                if kw not in self.stop_words and len(kw) > 2:
                    keyword_freq[kw] = keyword_freq.get(kw, 0) + 1

            sorted_keywords = sorted(
                keyword_freq.items(), key=lambda x: x[1], reverse=True
            )
            return [kw for kw, _ in sorted_keywords[:n]]

        except Exception as e:
            logging.error(f"Error extracting keywords: {str(e)}")
            # Simple fallback - extract words by frequency
            words = text.lower().split()
            word_freq = {}
            for word in words:
                word = re.sub(r"[^\w]", "", word)
                if word and word not in self.stop_words and len(word) > 3:
                    word_freq[word] = word_freq.get(word, 0) + 1

            sorted_words = sorted(word_freq.items(), key=lambda x: x[1], reverse=True)
            return [word for word, _ in sorted_words[:n]]

    def generate_topics(self, text):
        """Generate topics using NMF and LDA"""
        try:
            # Vectorize the text
            vectorized_text = self.vectorizer.fit_transform([text])

            # NMF topic modeling
            nmf_topics = self.nmf_model.fit_transform(vectorized_text)
            nmf_components = self.nmf_model.components_

            # LDA topic modeling
            lda_topics = self.lda_model.fit_transform(vectorized_text)
            lda_components = self.lda_model.components_

            # Extract top words for each topic
            feature_names = self.vectorizer.get_feature_names_out()

            nmf_topic_words = []
            for topic_idx, topic in enumerate(nmf_components):
                top_features_ind = topic.argsort()[: -10 - 1 : -1]
                top_features = [feature_names[i] for i in top_features_ind]
                nmf_topic_words.append(top_features)

            lda_topic_words = []
            for topic_idx, topic in enumerate(lda_components):
                top_features_ind = topic.argsort()[: -10 - 1 : -1]
                top_features = [feature_names[i] for i in top_features_ind]
                lda_topic_words.append(top_features)

            return nmf_topic_words, lda_topic_words
        except Exception as e:
            logging.error(f"Error generating topics: {str(e)}")
            return [], []

    def find_similar_topics(self, keywords):
        """Find similar topics in the database based on keyword overlap"""
        similar_topics = []

        # Check keywords against existing topics
        for topic, data in self.topics_db["topics"].items():
            # Check for keyword overlap
            topic_keywords = data.get("keywords", [])
            overlap = set(keywords).intersection(set(topic_keywords))

            if overlap:
                similarity = len(overlap) / max(len(keywords), len(topic_keywords))
                similar_topics.append((topic, similarity))

        # Sort by similarity score
        return sorted(similar_topics, key=lambda x: x[1], reverse=True)

    def classify_document(self, file_path):
        """Classify a document and return potential topics"""
        if not os.path.exists(file_path):
            logging.error(f"File not found: {file_path}")
            return {"error": "File not found"}

        logging.info(f"Processing document: {file_path}")

        # Extract text from PDF
        text = self.extract_text_from_pdf(file_path)
        if not text:
            logging.error(f"Could not extract text from: {file_path}")
            return {"error": "Could not extract text from document"}

        # Preprocess text
        processed_text = self.preprocess_text(text)

        # Extract keywords
        keywords = self.extract_keywords(text)
        logging.info(f"Extracted {len(keywords)} keywords")

        # Generate potential topics
        nmf_topics, lda_topics = self.generate_topics(processed_text)

        # Find similar existing topics
        similar_topics = self.find_similar_topics(keywords)

        # Prepare suggested topics
        suggested_topics = []
        for topic, score in similar_topics:
            if score > 0.1:  # Minimum similarity threshold
                suggested_topics.append(
                    {
                        "name": topic,
                        "similarity": score,
                        "keywords": self.topics_db["topics"][topic]["keywords"][:5],
                    }
                )

        # Generate potential new topics from NMF and LDA
        potential_topics = []

        # From NMF
        for i, topic_words in enumerate(nmf_topics):
            if topic_words:  # Skip empty topics
                potential_topics.append(
                    {
                        "name": f"Topic {i + 1}",
                        "keywords": topic_words[:5],
                        "source": "NMF",
                    }
                )

        # From LDA
        for i, topic_words in enumerate(lda_topics):
            if topic_words:  # Skip empty topics
                potential_topics.append(
                    {
                        "name": f"Topic {i + 1}",
                        "keywords": topic_words[:5],
                        "source": "LDA",
                    }
                )

        return {
            "file": os.path.basename(file_path),
            "file_path": file_path,
            "keywords": keywords,
            "suggested_topics": suggested_topics,
            "potential_topics": potential_topics,
            "text_preview": text[:500] + "..." if len(text) > 500 else text,
        }

    def add_topic(self, topic_name, keywords=None, parent_topic=None):
        """Add a new topic to the database"""
        if topic_name in self.topics_db["topics"]:
            logging.warning(f"Topic already exists: {topic_name}")
            return False

        logging.info(f"Adding new topic: {topic_name}")
        self.topics_db["topics"][topic_name] = {
            "keywords": keywords or [],
            "documents": [],
            "created_at": datetime.now().isoformat(),
        }

        # Add hierarchy relationship if parent topic is provided
        if parent_topic:
            if parent_topic not in self.topics_db["hierarchies"]:
                self.topics_db["hierarchies"][parent_topic] = []

            if topic_name not in self.topics_db["hierarchies"][parent_topic]:
                self.topics_db["hierarchies"][parent_topic].append(topic_name)

        self.save_topics_database()
        return True

    def add_document_to_topic(self, doc_id, topic_name, file_path, keywords):
        """Add a document to a topic"""
        # Ensure topic exists
        if topic_name not in self.topics_db["topics"]:
            logging.warning(f"Topic does not exist: {topic_name}")
            return False

        # Create document entry if it doesn't exist
        doc_exists = False
        for doc in self.topics_db["documents"]:
            if doc["id"] == doc_id:
                doc_exists = True
                if topic_name not in doc["topics"]:
                    doc["topics"].append(topic_name)
                break

        if not doc_exists:
            self.topics_db["documents"].append(
                {
                    "id": doc_id,
                    "file_path": file_path,
                    "file_name": os.path.basename(file_path),
                    "topics": [topic_name],
                    "keywords": keywords,
                    "added_at": datetime.now().isoformat(),
                }
            )

        # Add document to topic
        if doc_id not in self.topics_db["topics"][topic_name]["documents"]:
            self.topics_db["topics"][topic_name]["documents"].append(doc_id)

            # Update topic keywords based on document keywords
            self.topics_db["topics"][topic_name]["keywords"] = list(
                set(self.topics_db["topics"][topic_name]["keywords"]) | set(keywords)
            )

        self.save_topics_database()
        logging.info(f"Added document {doc_id} to topic: {topic_name}")
        return True

    def get_document_topics(self, file_path):
        """Get all topics associated with a document"""
        for doc in self.topics_db["documents"]:
            if doc["file_path"] == file_path:
                return doc["topics"]
        return []

    def get_topic_documents(self, topic_name):
        """Get all documents associated with a topic"""
        if topic_name not in self.topics_db["topics"]:
            return []

        document_ids = self.topics_db["topics"][topic_name]["documents"]
        documents = []

        for doc_id in document_ids:
            for doc in self.topics_db["documents"]:
                if doc["id"] == doc_id:
                    documents.append(doc)
                    break

        return documents

    def get_topic_hierarchy(self):
        """Get the full topic hierarchy"""
        hierarchy = {}

        # Get all root topics (those without parents)
        root_topics = set(self.topics_db["topics"].keys())
        for parent, children in self.topics_db["hierarchies"].items():
            for child in children:
                if child in root_topics:
                    root_topics.remove(child)

        # Build hierarchy starting from root topics
        def build_hierarchy(topic):
            result = {"name": topic, "children": []}

            if topic in self.topics_db["hierarchies"]:
                for child in self.topics_db["hierarchies"][topic]:
                    result["children"].append(build_hierarchy(child))

            return result

        # Create hierarchy for each root topic
        for topic in root_topics:
            hierarchy[topic] = build_hierarchy(topic)

        return hierarchy

    def process_pdf(self, pdf_path, auto_classify=False, auto_create_topics=False):
        """
        Process a PDF document - extract text, classify, and optionally auto-assign topics

        Parameters:
        - pdf_path: Path to the PDF file
        - auto_classify: If True, automatically assign to similar existing topics
        - auto_create_topics: If True, automatically create new topics from NMF results

        Returns:
        - Dictionary with classification results and actions taken
        """
        # Generate a document ID
        doc_id = f"doc_{uuid.uuid4().hex[:10]}"

        # Classify the document
        result = self.classify_document(pdf_path)
        if "error" in result:
            return result

        actions_taken = []

        # Auto-assign to existing topics if specified
        if auto_classify and result["suggested_topics"]:
            for topic_suggestion in result["suggested_topics"]:
                topic_name = topic_suggestion["name"]
                if (
                    topic_suggestion["similarity"] > 0.2
                ):  # Minimum threshold for auto-assignment
                    self.add_document_to_topic(
                        doc_id, topic_name, pdf_path, result["keywords"]
                    )
                    actions_taken.append(
                        f"Auto-assigned to existing topic: {topic_name}"
                    )

        # Auto-create new topics if specified
        if auto_create_topics and result["potential_topics"]:
            # Use the first NMF topic as a new topic
            for potential_topic in result["potential_topics"]:
                if potential_topic["source"] == "NMF":
                    # Create a topic name from the keywords
                    topic_name = "_".join(potential_topic["keywords"][:2])
                    topic_name = re.sub(r"\W+", "_", topic_name)

                    # Only create if it doesn't exist
                    if topic_name not in self.topics_db["topics"]:
                        self.add_topic(topic_name, potential_topic["keywords"])
                        self.add_document_to_topic(
                            doc_id, topic_name, pdf_path, result["keywords"]
                        )
                        actions_taken.append(
                            f"Auto-created and assigned to new topic: {topic_name}"
                        )
                    break

        # Add actions taken to the result
        result["doc_id"] = doc_id
        result["actions_taken"] = actions_taken

        return result


def process_directory(
    directory_path, classifier=None, auto_classify=False, auto_create_topics=False
):
    """
    Process all PDF files in a directory

    Parameters:
    - directory_path: Path to directory containing PDFs
    - classifier: Optional existing classifier instance
    - auto_classify: Whether to automatically assign to similar topics
    - auto_create_topics: Whether to automatically create new topics

    Returns:
    - List of results for each processed file
    """
    if classifier is None:
        classifier = DynamicTopicClassifier()

    results = []

    # Get all PDF files in the directory
    pdf_files = list(Path(directory_path).glob("*.pdf"))
    logging.info(f"Found {len(pdf_files)} PDF files in {directory_path}")

    # Process each file
    for pdf_path in pdf_files:
        result = classifier.process_pdf(
            str(pdf_path),
            auto_classify=auto_classify,
            auto_create_topics=auto_create_topics,
        )
        results.append(result)

    return results


def print_classification_summary(results):
    """Print a summary of classification results"""
    print("\n===== Classification Summary =====")

    for result in results:
        if "error" in result:
            print(
                f"Error processing {result.get('file', 'unknown file')}: {result['error']}"
            )
            continue

        print(f"\nFile: {result['file']}")
        print(f"Top Keywords: {', '.join(result['keywords'][:5])}")

        if result["suggested_topics"]:
            print("Suggested Topics:")
            for topic in result["suggested_topics"][:3]:
                print(f"  - {topic['name']} (similarity: {topic['similarity']:.2f})")

        if result["actions_taken"]:
            print("Actions Taken:")
            for action in result["actions_taken"]:
                print(f"  - {action}")


def main():
    # Create the classifier
    classifier = DynamicTopicClassifier()

    # Example usage: Process a single PDF
    pdf_path = "sample_document.pdf"  # Change this to your actual PDF path
    if os.path.exists(pdf_path):
        print(f"Processing single document: {pdf_path}")
        result = classifier.process_pdf(pdf_path)

        if "error" not in result:
            print(f"\nDocument: {result['file']}")
            print(f"Keywords: {', '.join(result['keywords'])}")

            if result["suggested_topics"]:
                print("\nSuggested Topics:")
                for topic in result["suggested_topics"]:
                    print(f"- {topic['name']} (similarity: {topic['similarity']:.2f})")
                    print(f"  Keywords: {', '.join(topic['keywords'])}")

            print("\nPotential New Topics:")
            for topic in result["potential_topics"][:3]:
                print(f"- {topic['name']} ({topic['source']})")
                print(f"  Keywords: {', '.join(topic['keywords'])}")
        else:
            print(f"Error: {result['error']}")

    # Example usage: Process a directory of PDFs
    directory_path = "/Users/bharaths/Developer/score/"  # Change this to your actual directory path
    if os.path.exists(directory_path):
        print(f"\nProcessing directory: {directory_path}")
        results = process_directory(
            directory_path,
            classifier=classifier,
            auto_classify=True,  # Automatically assign to similar topics
            auto_create_topics=True,  # Automatically create new topics
        )

        # Print summary
        print_classification_summary(results)

        # Print topic hierarchy
        hierarchy = classifier.get_topic_hierarchy()
        print("\n===== Topic Hierarchy =====")

        def print_hierarchy(hierarchy_item, level=0):
            print(f"{'  ' * level}└─ {hierarchy_item['name']}")
            for child in hierarchy_item["children"]:
                print_hierarchy(child, level + 1)

        for topic, hierarchy_item in hierarchy.items():
            print_hierarchy(hierarchy_item)


if __name__ == "__main__":
    main()


2025-03-13 23:02:48,758 - INFO - Initialized classifier with 0 existing topics
2025-03-13 23:02:48,786 - INFO - Found 2 PDF files in /Users/bharaths/Developer/score/
2025-03-13 23:02:48,792 - INFO - Processing document: /Users/bharaths/Developer/score/Module II Part -B.docx.pdf



Processing directory: /Users/bharaths/Developer/score/


2025-03-13 23:02:52,984 - INFO - Extracted 10 keywords
2025-03-13 23:02:52,996 - ERROR - Error generating topics: max_df corresponds to < documents than min_df
2025-03-13 23:02:52,997 - INFO - Processing document: /Users/bharaths/Developer/score/temp_Module 1.pdf
2025-03-13 23:02:55,772 - INFO - Extracted 10 keywords
2025-03-13 23:02:55,776 - ERROR - Error generating topics: max_df corresponds to < documents than min_df



===== Classification Summary =====

File: Module II Part -B.docx.pdf
Top Keywords: parsing, stack, input, reduce, precedence

File: temp_Module 1.pdf
Top Keywords: actions, the agent, example, artificial intelligence, knowledge

===== Topic Hierarchy =====


In [16]:
import PyPDF2
from transformers import pipeline
from typing import List, Tuple

# 1. PDF Text Extraction
def extract_text_from_pdf(pdf_path: str) -> str:
    """Extract text content from a PDF file"""
    text = ""
    try:
            with open(pdf_path, "rb") as file:
                reader = PyPDF2.PdfReader(file)
                for page in reader.pages:
                    page_text = page.extract_text()
                    if page_text:
                        text += page_text + " "
    except Exception as e:
            logging.error(f"Error extracting text from PDF {pdf_path}: {str(e)}")
    return text
# 2. Text Preprocessing
def preprocess_text(text: str, max_length: int = 1024) -> str:
    """
    Truncate text to model's maximum length while preserving paragraphs
    """
    paragraphs = text.split('\n\n')
    processed_text = ""
    
    for para in paragraphs:
        if len(processed_text) + len(para) < max_length:
            processed_text += para + '\n\n'
        else:
            break
            
    return processed_text.strip()

# 3. Classification Setup
class TopicClassifier:
    def __init__(self):
        self.classifier = pipeline(
            "zero-shot-classification",
            model="facebook/bart-large-mnli"
        )
        self.taxonomy = {
            "LR Parsing": "Compiler Design",
            "Compiler Design": "Computer Science",
            "Shift-Reduce Parsing": "Compiler Design",
            "Syntax Analysis": "Compiler Design",
            "Computer Science": None,
            "Biology": None,
            "Genetics": "Biology"
        }
    
    def get_all_labels(self) -> List[str]:
        """Get all available labels from taxonomy"""
        return list(self.taxonomy.keys()) + list(set(self.taxonomy.values())) 

    def classify(self, text: str, threshold: float = 0.7) -> List[Tuple[str, float]]:
        """Classify text with hierarchical awareness"""
        candidate_labels = self.get_all_labels()
        result = self.classifier(text, candidate_labels, multi_label=True)
        return [(label, score) for label, score in zip(result['labels'], result['scores']) 
                if score >= threshold]

    def expand_hierarchy(self, labels: List[str]) -> List[str]:
        """Add parent tags for hierarchical taxonomy"""
        all_tags = set()
        for label in labels:
            current_tag = label
            while current_tag is not None:
                all_tags.add(current_tag)
                current_tag = self.taxonomy.get(current_tag)
        return sorted(all_tags, key=lambda x: (len(self.taxonomy.get(x, '') or 0, x)))

# 4. Main Workflow
def process_pdf(pdf_path: str, threshold: float = 0.7) -> List[str]:
    # Extract text
    raw_text = extract_text_from_pdf(pdf_path)
    
    # Preprocess
    processed_text = preprocess_text(raw_text)
    
    if not processed_text:
        raise ValueError("No text extracted from PDF")
    
    # Classify
    classifier = TopicClassifier()
    classifications = classifier.classify(processed_text, threshold)
    
    # Get base labels
    base_labels = [label for label, _ in classifications]
    
    # Expand hierarchy
    full_tags = classifier.expand_hierarchy(base_labels)
    
    return full_tags

if __name__ == "__main__":
    # Example usage
    pdf_path = "/Users/bharaths/Developer/score/temp_Module 1.pdf"  # Replace with your PDF path
    confidence_threshold = 0.7
    
    try:
        tags = process_pdf(pdf_path, confidence_threshold)
        print("Automatically Generated Tags:")
        print("\n".join(f"- {tag}" for tag in tags))
    except Exception as e:
        print(f"Error processing PDF: {str(e)}")

Error processing PDF: No text extracted from PDF
