In [14]:
# Run this first if you haven't installed pymongo
!pip install pymongo






In [1]:
from pymongo import MongoClient
import pprint
import json
from datetime import datetime


In [12]:
# Connect to your specific MongoDB setup
try:
    # Your MongoDB connection details
    client = MongoClient('mongodb://localhost:27017/')
    
    # Your specific database and collection
    db = client['cornerstone_curated']
    collection = db['bulk_generate_content_metadata']
    
    # Test connection
    server_info = client.server_info()
    print("✅ Successfully connected to MongoDB!")
    print(f"📊 MongoDB version: {server_info['version']}")
    print(f"🗂️  Database: {db.name}")
    print(f"📝 Collection: {collection.name}")
    
except Exception as e:
    print(f"❌ Connection failed: {e}")


✅ Successfully connected to MongoDB!
📊 MongoDB version: 8.0.11
🗂️  Database: cornerstone_curated
📝 Collection: bulk_generate_content_metadata


In [13]:
# Get basic collection information
try:
    # Count total documents
    total_docs = collection.count_documents({})
    print(f"📊 Total documents in collection: {total_docs}")
    
    # Get collection stats
    stats = db.command("collstats", collection.name)
    print(f"📦 Collection size: {stats.get('size', 'N/A')} bytes")
    print(f"📄 Average document size: {stats.get('avgObjSize', 'N/A')} bytes")
    
except Exception as e:
    print(f"❌ Error getting collection info: {e}")


📊 Total documents in collection: 6
📦 Collection size: 673830 bytes
📄 Average document size: 112305 bytes


In [14]:
# Fetch and examine the first few documents to understand structure
try:
    print("🔍 Examining document structure...")
    print("="*60)
    
    # Get first document to see structure
    sample_doc = collection.find_one()
    
    if sample_doc:
        print("📋 Sample Document Structure:")
        print("-" * 30)
        
        # Print all field names and types
        for field, value in sample_doc.items():
            field_type = type(value).__name__
            
            # Show preview for string fields
            if isinstance(value, str):
                preview = value[:100] + "..." if len(value) > 100 else value
                print(f"🔑 {field} ({field_type}): {repr(preview)}")
            else:
                print(f"🔑 {field} ({field_type}): {value}")
        
        print("\n" + "="*60)
        print("📝 Full Sample Document:")
        pprint.pprint(sample_doc, width=80, depth=3)
    else:
        print("❌ No documents found in collection")
        
except Exception as e:
    print(f"❌ Error exploring documents: {e}")


🔍 Examining document structure...
📋 Sample Document Structure:
------------------------------
🔑 _id (ObjectId): 68d643d1aaacd5b61bdf086e
🔑 contentId (str): 'f85e86a2-b0e2-4adc-a155-878d66ecfb94'
🔑 status (str): 'COMPLETED'
🔑 data (dict): {'contentId': 'f85e86a2-b0e2-4adc-a155-878d66ecfb94', 'title': 'Self Defence Course', 'description': 'A comprehensive course designed to teach self-defence techniques and strategies for personal safety.', 'requirement': '', 'metadata': {'proficiency': 'beginner', 'type': 'Interactive', 'vertical': 'Martial Arts / Personal Safety', 'duration': '120', 'isCompliance': False, 'language': 'English', 'skills': None}, 'outline': {'sections': [{'sectionNumber': 1, 'sectionName': 'Introduction to Self-Defence', 'sectionId': '68d643ce338b76a48af0339c', 'chapters': [{'chapterId': '68d643ce338b76a48af0339d', 'chapterNumber': 1, 'chapterName': 'Understanding Personal Safety', 'learningContent': [{'type': 'h1', 'data': {'content': 'Understanding Personal Safety'}}, 

In [7]:
async def fetch_document_by_id(document_id: str, content_field: str = "content"):
    """Fetch a single document from MongoDB by ID."""
    try:
        db = get_db()
        collection = db[MONGODB_COLLECTION]
        
        # Handle both ObjectId and string IDs
        if ObjectId.is_valid(document_id):
            query = {"_id": ObjectId(document_id)}
        else:
            query = {"_id": document_id}
        
        document = await collection.find_one(query)
        if not document:
            raise HTTPException(status_code=404, detail=f"Document with ID {document_id} not found")
        
        # Handle nested field paths (like "data.content")
        content = document
        for field_part in content_field.split('.'):
            if isinstance(content, dict) and field_part in content:
                content = content[field_part]
            else:
                raise HTTPException(status_code=400, detail=f"Field '{content_field}' not found in document")
        
        # Make sure we got a string
        if not isinstance(content, str):
            raise HTTPException(status_code=400, detail=f"Field '{content_field}' is not text content")
        
        return document, content
        
    except Exception as e:
        logging.error(f"Error fetching document {document_id}: {e}")
        raise HTTPException(status_code=500, detail=f"Database error: {str(e)}")


In [9]:
# Cell: Test Nested Field Access
try:
    print("🧪 Testing nested field access...")
    
    # Get a sample document
    sample_doc = collection.find_one()
    
    if sample_doc:
        print("📋 Document structure:")
        pprint.pprint(sample_doc, depth=2)
        
        # Test accessing nested content
        if 'data' in sample_doc and 'content' in sample_doc['data']:
            nested_content = sample_doc['data']['content']
            print(f"\n✅ Successfully accessed nested content:")
            print(f"📝 Content preview: {nested_content[:200]}...")
            print(f"📊 Word count: {len(nested_content.split())}")
            
            print(f"\n🚀 Use this in your API call:")
            print(f'   "content_field": "data.content"')
        else:
            print("❌ Expected nested structure not found")
            
except Exception as e:
    print(f"❌ Error testing nested access: {e}")


🧪 Testing nested field access...
📋 Document structure:
{'_id': ObjectId('68d643d1aaacd5b61bdf086e'),
 'contentId': 'f85e86a2-b0e2-4adc-a155-878d66ecfb94',
 'contentType': 'ICF',
 'createdOn': '2025-09-26T13:12:09.248233',
 'data': {'contentId': 'f85e86a2-b0e2-4adc-a155-878d66ecfb94',
          'contentType': 'ICF',
          'contextMode': 'WEB',
          'courseId': '68d643bc338b76a48af03398',
          'description': 'A comprehensive course designed to teach '
                         'self-defence techniques and strategies for personal '
                         'safety.',
          'metadata': {...},
          'outline': {...},
          'requirement': '',
          'requirementDocs': [],
          'title': 'Self Defence Course',
          'transcript': 'Understanding Personal Safety\n'
                        'Personal safety is a fundamental aspect of '
                        'self-defense that encompasses awareness, prevention, '
                        'and preparation agains

In [10]:
# -----------------------------------------------
# Cell 1: Install the MongoDB driver
# -----------------------------------------------

# -----------------------------------------------
# Cell 2: Run the Python code to fetch data
# -----------------------------------------------
from pymongo import MongoClient
import pprint  # For nicely formatted output

# --- IMPORTANT ---
# This code assumes you are running the notebook on the SAME computer
# as your MongoDB server (which is on 'localhost').
#
# If you are running this in GOOGLE COLAB, 'localhost' will not work.
# You must use Option 1 (ngrok) from our earlier conversation.
# -----------------

try:
    # 1. Connect to your MongoDB server
    # (From your screenshot: localhost:27017)
    client = MongoClient('mongodb://localhost:27017/')
    
    # 2. Access your specific database
    # (From your screenshot: cornerstone_curated)
    db = client['cornerstone_curated']
    
    # 3. Access your specific collection
    # (From your screenshot: bulk_generate_content_metadata)
    collection = db['bulk_generate_content_metadata']

    print(f"✅ Successfully connected to collection: {collection.name}")
    print("-" * 40)

    # 4. Find all documents in the collection
    all_documents = collection.find()

    # 5. Loop through and print each document
    doc_count = 0
    for document in all_documents:
        pprint.pprint(document)
        print("-" * 20)  # Add a separator
        doc_count += 1
    
    if doc_count == 0:
        print("No documents found in this collection.")
    else:
        print(f"\nFound and printed {doc_count} document(s).")

except Exception as e:
    print(f"An error occurred: {e}")

finally:
    # 6. Close the connection
    if 'client' in locals():
        client.close()
        print("\nMongoDB connection closed.")

✅ Successfully connected to collection: bulk_generate_content_metadata
----------------------------------------
{'_id': ObjectId('68d643d1aaacd5b61bdf086e'),
 'contentId': 'f85e86a2-b0e2-4adc-a155-878d66ecfb94',
 'contentType': 'ICF',
 'createdOn': '2025-09-26T13:12:09.248233',
 'data': {'contentId': 'f85e86a2-b0e2-4adc-a155-878d66ecfb94',
          'contentType': 'ICF',
          'contextMode': 'WEB',
          'courseId': '68d643bc338b76a48af03398',
          'description': 'A comprehensive course designed to teach '
                         'self-defence techniques and strategies for personal '
                         'safety.',
          'metadata': {'duration': '120',
                       'isCompliance': False,
                       'language': 'English',
                       'proficiency': 'beginner',
                       'skills': None,
                       'type': 'Interactive',
                       'vertical': 'Martial Arts / Personal Safety'},
          'outline':

KeyboardInterrupt: 

In [27]:
import json
import re
import os
import logging
from typing import Dict, Any, Union, Optional
import asyncio

# Third-Party Imports
import textstat
from better_profanity import profanity

# Google Generative AI Imports
import google.generativeai as genai

# Setup logging for Jupyter
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# ==============================================================================
# TOOL FUNCTIONS
# ==============================================================================

def check_length_and_structure(text: str) -> dict:
    """Analyzes content length and structure (word count, paragraphs)."""
    logging.info(f"Running LengthAndStructureCheck on text (approx {len(text)} chars)")
    words = text.split()
    word_count = len(words)
    sentences = [s.strip() for s in re.split(r'[.!?]+', text) if s.strip()]
    paragraphs = [p.strip() for p in text.split('\n\n') if p.strip()]
    issues = []
    score = 10
    
    if word_count < 15:
        issues.append(f"Content is very short with only {word_count} words.")
        score -= 3
    if word_count > 150 and len(paragraphs) == 1:
        issues.append("Long content is not broken into paragraphs, making it hard to read.")
        score -= 2
    if len(sentences) < 2 and word_count > 20:
        issues.append("Content consists of a single long sentence; consider breaking it up.")
        score -= 2
    
    return {
        "word_count": word_count, 
        "sentences": len(sentences), 
        "paragraphs": len(paragraphs), 
        "issues": issues,
        "score": max(1, score),
        "score_explanation": f"Structure score: {max(1, score)}/10. Based on {word_count} words, {len(sentences)} sentences, {len(paragraphs)} paragraphs."
    }

def check_readability_with_textstat(text: str) -> dict:
    """Uses the 'textstat' library to calculate objective readability scores."""
    logging.info("Running ReadabilityCheck")
    if not text.strip():
        return {"score": 0, "level": "unassessable", "readability_score": 1, "score_explanation": "Cannot assess readability of empty text."}
    
    flesch_score = textstat.flesch_reading_ease(text)
    level = "Very Easy"
    readability_score = 10
    
    if flesch_score < 30: 
        level = "Very Confusing (College Graduate)"
        readability_score = 3
    elif flesch_score < 60: 
        level = "Difficult"
        readability_score = 6
    elif flesch_score < 80: 
        level = "Fairly Easy"
        readability_score = 8
    
    return {
        "flesch_reading_ease_score": flesch_score, 
        "level": level,
        "readability_score": readability_score,
        "score_explanation": f"Readability score: {readability_score}/10. Flesch score of {flesch_score:.1f} indicates {level.lower()} reading level."
    }

def check_professionalism_with_library(text: str) -> dict:
    """Checks for unprofessional content using the 'better-profanity' library."""
    logging.info("Running ProfessionalismCheck")
    issues = []
    score = 10

    if profanity.contains_profanity(text):
        issues.append("Inappropriate or profane language was found.")
        score -= 5
    if re.search(r'\b[A-Z]{4,}\b', text) and sum(1 for c in text if c.isupper()) / max(1, len(text)) > 0.3:
        issues.append("Excessive capitalization is used, which appears unprofessional.")
        score -= 3
    if re.search(r'[!?@#$%^&*()]{4,}', text):
        issues.append("Excessive punctuation or symbols are used.")
        score -= 2

    return {
        "issues": issues,
        "score": max(1, score),
        "score_explanation": f"Professionalism score: {max(1, score)}/10. {len(issues)} professional issues detected."
    }

def check_redundancy(text: str) -> dict:
    """Analyzes the text for repetitive sentences and overused words."""
    logging.info("Running RedundancyCheck")
    sentences = [s.lower().strip() for s in re.split(r'[.!?]+', text) if s.strip()]
    issues = []
    score = 10
    
    if len(sentences) > 2:
        unique_sentences = len(set(sentences))
        total_sentences = len(sentences)
        redundancy_ratio = unique_sentences / total_sentences
        if redundancy_ratio < 0.5:
            redundancy_percentage = 100 - (redundancy_ratio * 100)
            issues.append(f"High sentence redundancy detected. {redundancy_percentage:.0f}% of sentences are repetitive.")
            score = max(1, int(redundancy_ratio * 10))
            
        return {
            "redundancy_issues": issues,
            "score": score,
            "score_explanation": f"Redundancy score: {score}/10. Unique sentence ratio: {redundancy_ratio:.2f}"
        }
    
    return {
        "redundancy_issues": [],
        "score": 10,
        "score_explanation": "Redundancy score: 10/10. Not enough sentences to check for repetition."
    }

# ==============================================================================
# AI MODEL SETUP
# ==============================================================================

def setup_google_ai(api_key: str):
    """Setup Google AI with API key"""
    try:
        genai.configure(api_key=api_key)
        model = genai.GenerativeModel('gemini-1.5-flash-latest')
        logging.info("✅ Google AI Model Initialized")
        return model
    except Exception as e:
        logging.error(f"❌ Failed to initialize Google AI: {e}")
        return None

# ==============================================================================
# MAIN VALIDATION FUNCTION
# ==============================================================================

async def validate_content(plain_text: str, google_api_key: str) -> dict:
    """
    Main function to validate content quality using Google AI
    
    Args:
        plain_text (str): The text to analyze
        google_api_key (str): Your Google AI API key
    
    Returns:
        dict: Complete quality analysis report
    """
    
    # Setup the model
    model = setup_google_ai(google_api_key)
    if model is None:
        return {"error": "Failed to initialize Google AI model. Check your API key."}
    
    if not plain_text or not plain_text.strip():
        return {"error": "Input text cannot be empty or just whitespace."}

    logging.info("Starting content validation...")
    
    try:
        # Run all analysis tools
        structure_result = check_length_and_structure(plain_text)
        readability_result = check_readability_with_textstat(plain_text)
        professionalism_result = check_professionalism_with_library(plain_text)
        redundancy_result = check_redundancy(plain_text)
        
        # Create prompt for AI analysis
        system_prompt = """You are an expert Quality Assurance assistant. Analyze the given text and tool results to provide a comprehensive quality report.

Your task:
1. Review the original text for grammar and spelling errors
2. Synthesize the tool analysis results 
3. Generate a final JSON report

Return ONLY a valid JSON object with this structure:
{
    "overall_score": <integer 1-10, average of all categories>,
    "category_scores": {
        "grammar_and_spelling": <score 1-10 based on your analysis>,
        "readability": <score from tool>,
        "professionalism_and_tone": <score from tool>,
        "structure": <score from tool>,
        "redundancy": <score from tool>
    },
    "score_explanations": {
        "grammar_and_spelling": "<your detailed analysis with specific errors or 'No errors found'>",
        "readability": "<explanation from tool>",
        "professionalism_and_tone": "<explanation from tool>", 
        "structure": "<explanation from tool>",
        "redundancy": "<explanation from tool>"
    },
    "summary": "<natural language summary of findings and suggestions>"
}"""

        analysis_prompt = f"""
TEXT TO ANALYZE:
{plain_text}

TOOL ANALYSIS RESULTS:
Structure: {json.dumps(structure_result, indent=2)}
Readability: {json.dumps(readability_result, indent=2)}
Professionalism: {json.dumps(professionalism_result, indent=2)}
Redundancy: {json.dumps(redundancy_result, indent=2)}

{system_prompt}
"""
        
        # Generate AI analysis
        response = await asyncio.to_thread(model.generate_content, analysis_prompt)
        result_text = response.text
        
        # Extract JSON from response
        json_match = re.search(r'\{.*\}', result_text, re.DOTALL)
        
        if json_match:
            report_json = json.loads(json_match.group(0))
            logging.info(f"✅ Analysis complete. Overall score: {report_json.get('overall_score')}")
            return report_json
        else:
            logging.error("No JSON found in AI response")
            return {
                "error": "AI failed to generate proper JSON response",
                "raw_response": result_text
            }
            
    except Exception as e:
        logging.error(f"❌ Error during analysis: {e}")
        return {"error": f"Analysis failed: {str(e)}"}

# ==============================================================================
# CONVENIENCE FUNCTION FOR JUPYTER
# ==============================================================================

def analyze_text(text: str, api_key: str = None):
    """
    Convenience function to analyze text in Jupyter notebook
    
    Usage:
        result = analyze_text("Your text here", "your-google-api-key")
        print(json.dumps(result, indent=2))
    """
    if api_key is None:
        # Try to get from environment
        api_key = os.getenv("GOOGLE_API_KEY")
        if not api_key:
            return {
                "error": "Please provide your Google API key either as parameter or set GOOGLE_API_KEY environment variable"
            }
    
    # Run the async function
    import asyncio
    try:
        # Check if we're in a Jupyter environment with existing event loop
        loop = asyncio.get_event_loop()
        if loop.is_running():
            # Use nest_asyncio for Jupyter compatibility
            try:
                import nest_asyncio
                nest_asyncio.apply()
            except ImportError:
                return {"error": "Please install nest-asyncio: pip install nest-asyncio"}
        
        return asyncio.run(validate_content(text, api_key))
    except Exception as e:
        return {"error": f"Failed to run analysis: {str(e)}"}

# ==============================================================================
# EXAMPLE USAGE
# ==============================================================================

print("Content Quality Analyzer loaded successfully!")
print("\nUsage:")
print("result = analyze_text('Your text to analyze', 'your-google-api-key')")
print("print(json.dumps(result, indent=2))")


Content Quality Analyzer loaded successfully!

Usage:
result = analyze_text('Your text to analyze', 'your-google-api-key')
print(json.dumps(result, indent=2))


  from .autonotebook import tqdm as notebook_tqdm


In [22]:
# Look for fields that might contain text content to analyze
try:
    print("🔍 Looking for potential content fields...")
    print("-" * 50)
    
    # Get a few documents to analyze field patterns
    sample_docs = list(collection.find().limit(3))
    
    # Look for fields that contain substantial text
    content_fields = []
    
    for doc in sample_docs:
        for field, value in doc.items():
            if isinstance(value, str) and len(value) > 50:  # Fields with substantial text
                content_fields.append(field)
    
    # Get unique content field names
    unique_content_fields = list(set(content_fields))
    
    print("📝 Potential content fields found:")
    for field in unique_content_fields:
        # Show sample content from this field
        sample = collection.find_one({field: {"$exists": True}})
        if sample and field in sample:
            content_preview = sample[field][:200] + "..." if len(sample[field]) > 200 else sample[field]
            print(f"\n🔑 Field: '{field}'")
            print(f"   Preview: {repr(content_preview)}")
    
except Exception as e:
    print(f"❌ Error finding content fields: {e}")


🔍 Looking for potential content fields...
--------------------------------------------------
📝 Potential content fields found:


In [23]:
# Test fetching documents by different ID formats
try:
    print("🔍 Testing document retrieval...")
    print("-" * 40)
    
    # Get a sample document to test with
    sample = collection.find_one()
    
    if sample:
        doc_id = sample['_id']
        print(f"📋 Sample Document ID: {doc_id}")
        print(f"📋 ID Type: {type(doc_id)}")
        
        # Test fetching by ID
        retrieved = collection.find_one({"_id": doc_id})
        
        if retrieved:
            print("✅ Successfully retrieved document by ID")
            
            # Show which fields could be used for content analysis
            print("\n📝 Available fields for content analysis:")
            for field, value in retrieved.items():
                if isinstance(value, str) and len(value.strip()) > 20:
                    word_count = len(value.split())
                    char_count = len(value)
                    print(f"   • {field}: {word_count} words, {char_count} characters")
        else:
            print("❌ Failed to retrieve document")
    
except Exception as e:
    print(f"❌ Error testing document retrieval: {e}")


🔍 Testing document retrieval...
----------------------------------------
📋 Sample Document ID: 68d643d1aaacd5b61bdf086e
📋 ID Type: <class 'bson.objectid.ObjectId'>
✅ Successfully retrieved document by ID

📝 Available fields for content analysis:
   • contentId: 1 words, 36 characters
   • createdOn: 1 words, 26 characters
   • updatedOn: 1 words, 26 characters


In [24]:
# Show a few documents formatted for content analysis preview
try:
    print("📊 CONTENT ANALYSIS PREVIEW")
    print("="*60)
    
    # Get first 3 documents
    documents = list(collection.find().limit(3))
    
    for i, doc in enumerate(documents, 1):
        print(f"\n📄 Document {i}:")
        print(f"   ID: {doc.get('_id')}")
        
        # Look for common content field names
        content_fields_to_check = ['content', 'text', 'body', 'description', 'article', 'post']
        
        found_content = False
        
        # Check each potential content field
        for field in content_fields_to_check:
            if field in doc and isinstance(doc[field], str) and len(doc[field].strip()) > 0:
                content = doc[field]
                word_count = len(content.split())
                print(f"   Content Field: '{field}'")
                print(f"   Word Count: {word_count}")
                print(f"   Preview: {content[:150]}{'...' if len(content) > 150 else ''}")
                found_content = True
                break
        
        # If no standard content fields found, show all string fields
        if not found_content:
            print("   No standard content fields found. Available string fields:")
            for field, value in doc.items():
                if isinstance(value, str) and len(value.strip()) > 20:
                    word_count = len(value.split())
                    print(f"   • {field}: {word_count} words")
                    print(f"     Preview: {value[:100]}{'...' if len(value) > 100 else ''}")
        
        print("-" * 40)

except Exception as e:
    print(f"❌ Error previewing documents: {e}")


📊 CONTENT ANALYSIS PREVIEW

📄 Document 1:
   ID: 68d643d1aaacd5b61bdf086e
   No standard content fields found. Available string fields:
   • contentId: 1 words
     Preview: f85e86a2-b0e2-4adc-a155-878d66ecfb94
   • createdOn: 1 words
     Preview: 2025-09-26T13:12:09.248233
   • updatedOn: 1 words
     Preview: 2025-09-26T13:15:25.670073
----------------------------------------

📄 Document 2:
   ID: 68d6505fd9b20230f3fdbc06
   No standard content fields found. Available string fields:
   • contentId: 1 words
     Preview: 7ca1d954-0cfb-41b1-bd19-16b70f3f5ca7
   • createdOn: 1 words
     Preview: 2025-09-26T14:05:43.902220
   • updatedOn: 1 words
     Preview: 2025-09-26T14:35:57.645843
----------------------------------------

📄 Document 3:
   ID: 68d65d17e9a29ff96c41717e
   No standard content fields found. Available string fields:
   • contentId: 1 words
     Preview: 7c2d50e3-bd0e-44bf-b491-867d63be0b2d
   • createdOn: 1 words
     Preview: 2025-09-26T14:59:59.806485
   • updatedOn

In [25]:
# Generate summary information for API integration
try:
    print("📋 MONGODB CONFIGURATION SUMMARY")
    print("="*50)
    
    # Connection details
    print("🔗 Connection Details:")
    print(f"   MONGODB_URL=mongodb://localhost:27017")
    print(f"   MONGODB_DB_NAME=cornerstone_curated")
    print(f"   MONGODB_COLLECTION=bulk_generate_content_metadata")
    
    # Collection stats
    total_docs = collection.count_documents({})
    print(f"\n📊 Collection Stats:")
    print(f"   Total Documents: {total_docs}")
    
    # Sample document for testing
    sample = collection.find_one()
    if sample:
        print(f"\n🧪 Sample Document ID for Testing:")
        print(f"   Document ID: {sample['_id']}")
        
        # Find the best content field
        content_field = None
        max_words = 0
        
        for field, value in sample.items():
            if isinstance(value, str) and len(value.strip()) > 20:
                word_count = len(value.split())
                if word_count > max_words:
                    max_words = word_count
                    content_field = field
        
        if content_field:
            print(f"   Best Content Field: '{content_field}' ({max_words} words)")
            
            # Generate test API call
            print(f"\n🚀 Test API Call:")
            test_call = f'''curl -X POST "http://localhost:3000/validate-from-mongodb" \\
     -H "Content-Type: application/json" \\
     -d '{{
       "document_id": "{sample['_id']}",
       "content_field": "{content_field}",
       "save_results": true
     }}\''''
            print(test_call)
    
    print("\n✅ Ready for API integration!")
    
except Exception as e:
    print(f"❌ Error generating summary: {e}")


📋 MONGODB CONFIGURATION SUMMARY
🔗 Connection Details:
   MONGODB_URL=mongodb://localhost:27017
   MONGODB_DB_NAME=cornerstone_curated
   MONGODB_COLLECTION=bulk_generate_content_metadata

📊 Collection Stats:
   Total Documents: 6

🧪 Sample Document ID for Testing:
   Document ID: 68d643d1aaacd5b61bdf086e
   Best Content Field: 'contentId' (1 words)

🚀 Test API Call:
curl -X POST "http://localhost:3000/validate-from-mongodb" \
     -H "Content-Type: application/json" \
     -d '{
       "document_id": "68d643d1aaacd5b61bdf086e",
       "content_field": "contentId",
       "save_results": true
     }'

✅ Ready for API integration!


In [10]:
# Clean up - close MongoDB connection
try:
    client.close()
    print("✅ MongoDB connection closed successfully")
except:
    print("❌ Error closing connection")


✅ MongoDB connection closed successfully


In [31]:
import json
import re
import os
import logging
from typing import Dict, Any, Union, Optional
import asyncio

# Third-Party Imports
import textstat
from better_profanity import profanity
from dotenv import load_dotenv

# OpenAI Import
import openai

# Load environment variables from .env file
load_dotenv()

# Setup logging for Jupyter
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# ==============================================================================
# TOOL FUNCTIONS
# ==============================================================================

def check_length_and_structure(text: str) -> dict:
    """Analyzes content length and structure (word count, paragraphs)."""
    logging.info(f"Running LengthAndStructureCheck on text (approx {len(text)} chars)")
    words = text.split()
    word_count = len(words)
    sentences = [s.strip() for s in re.split(r'[.!?]+', text) if s.strip()]
    paragraphs = [p.strip() for p in text.split('\n\n') if p.strip()]
    issues = []
    score = 10
    
    if word_count < 15:
        issues.append(f"Content is very short with only {word_count} words.")
        score -= 3
    if word_count > 150 and len(paragraphs) == 1:
        issues.append("Long content is not broken into paragraphs, making it hard to read.")
        score -= 2
    if len(sentences) < 2 and word_count > 20:
        issues.append("Content consists of a single long sentence; consider breaking it up.")
        score -= 2
    
    return {
        "word_count": word_count, 
        "sentences": len(sentences), 
        "paragraphs": len(paragraphs), 
        "issues": issues,
        "score": max(1, score),
        "score_explanation": f"Structure score: {max(1, score)}/10. Based on {word_count} words, {len(sentences)} sentences, {len(paragraphs)} paragraphs."
    }

def check_readability_with_textstat(text: str) -> dict:
    """Uses the 'textstat' library to calculate objective readability scores."""
    logging.info("Running ReadabilityCheck")
    if not text.strip():
        return {"score": 0, "level": "unassessable", "readability_score": 1, "score_explanation": "Cannot assess readability of empty text."}
    
    flesch_score = textstat.flesch_reading_ease(text)
    level = "Very Easy"
    readability_score = 10
    
    if flesch_score < 30: 
        level = "Very Confusing (College Graduate)"
        readability_score = 3
    elif flesch_score < 60: 
        level = "Difficult"
        readability_score = 6
    elif flesch_score < 80: 
        level = "Fairly Easy"
        readability_score = 8
    
    return {
        "flesch_reading_ease_score": flesch_score, 
        "level": level,
        "readability_score": readability_score,
        "score_explanation": f"Readability score: {readability_score}/10. Flesch score of {flesch_score:.1f} indicates {level.lower()} reading level."
    }

def check_professionalism_with_library(text: str) -> dict:
    """Checks for unprofessional content using the 'better-profanity' library."""
    logging.info("Running ProfessionalismCheck")
    issues = []
    score = 10

    if profanity.contains_profanity(text):
        issues.append("Inappropriate or profane language was found.")
        score -= 5
    if re.search(r'\b[A-Z]{4,}\b', text) and sum(1 for c in text if c.isupper()) / max(1, len(text)) > 0.3:
        issues.append("Excessive capitalization is used, which appears unprofessional.")
        score -= 3
    if re.search(r'[!?@#$%^&*()]{4,}', text):
        issues.append("Excessive punctuation or symbols are used.")
        score -= 2

    return {
        "issues": issues,
        "score": max(1, score),
        "score_explanation": f"Professionalism score: {max(1, score)}/10. {len(issues)} professional issues detected."
    }

def check_redundancy(text: str) -> dict:
    """Analyzes the text for repetitive sentences and overused words."""
    logging.info("Running RedundancyCheck")
    sentences = [s.lower().strip() for s in re.split(r'[.!?]+', text) if s.strip()]
    issues = []
    score = 10
    
    if len(sentences) > 2:
        unique_sentences = len(set(sentences))
        total_sentences = len(sentences)
        redundancy_ratio = unique_sentences / total_sentences
        if redundancy_ratio < 0.5:
            redundancy_percentage = 100 - (redundancy_ratio * 100)
            issues.append(f"High sentence redundancy detected. {redundancy_percentage:.0f}% of sentences are repetitive.")
            score = max(1, int(redundancy_ratio * 10))
            
        return {
            "redundancy_issues": issues,
            "score": score,
            "score_explanation": f"Redundancy score: {score}/10. Unique sentence ratio: {redundancy_ratio:.2f}"
        }
    
    return {
        "redundancy_issues": [],
        "score": 10,
        "score_explanation": "Redundancy score: 10/10. Not enough sentences to check for repetition."
    }

# ==============================================================================
# OPENAI MODEL SETUP
# ==============================================================================

def setup_openai_client():
    """Setup OpenAI client with API key from .env file"""
    try:
        api_key = os.getenv("OPENAI_API_KEY")
        if not api_key:
            raise ValueError("OPENAI_API_KEY not found in .env file")
        
        client = openai.OpenAI(api_key=api_key)
        logging.info("✅ OpenAI Client Initialized")
        return client
    except Exception as e:
        logging.error(f"❌ Failed to initialize OpenAI client: {e}")
        return None

# ==============================================================================
# MAIN VALIDATION FUNCTION
# ==============================================================================

async def validate_content(plain_text: str) -> dict:
    """
    Main function to validate content quality using OpenAI
    
    Args:
        plain_text (str): The text to analyze
    
    Returns:
        dict: Complete quality analysis report
    """
    
    # Setup the OpenAI client
    client = setup_openai_client()
    if client is None:
        return {"error": "Failed to initialize OpenAI client. Check your API key in .env file."}
    
    if not plain_text or not plain_text.strip():
        return {"error": "Input text cannot be empty or just whitespace."}

    logging.info("Starting content validation...")
    
    try:
        # Run all analysis tools
        structure_result = check_length_and_structure(plain_text)
        readability_result = check_readability_with_textstat(plain_text)
        professionalism_result = check_professionalism_with_library(plain_text)
        redundancy_result = check_redundancy(plain_text)
        
        # Create system message for OpenAI
        system_message = """You are an expert Quality Assurance assistant. Analyze the given text and tool results to provide a comprehensive quality report.

Your task:
1. Review the original text for grammar and spelling errors
2. Synthesize the tool analysis results 
3. Generate a final JSON report

Return ONLY a valid JSON object with this exact structure:
{
    "overall_score": <integer 1-10, average of all categories>,
    "category_scores": {
        "grammar_and_spelling": <score 1-10 based on your analysis>,
        "readability": <score from tool>,
        "professionalism_and_tone": <score from tool>,
        "structure": <score from tool>,
        "redundancy": <score from tool>
    },
    "score_explanations": {
        "grammar_and_spelling": "<your detailed analysis with specific errors or 'No errors found'>",
        "readability": "<explanation from tool>",
        "professionalism_and_tone": "<explanation from tool>", 
        "structure": "<explanation from tool>",
        "redundancy": "<explanation from tool>"
    },
    "summary": "<natural language summary of findings and suggestions>"
}"""

        user_message = f"""
TEXT TO ANALYZE:
{plain_text}

TOOL ANALYSIS RESULTS:
Structure: {json.dumps(structure_result, indent=2)}
Readability: {json.dumps(readability_result, indent=2)}
Professionalism: {json.dumps(professionalism_result, indent=2)}
Redundancy: {json.dumps(redundancy_result, indent=2)}

Please analyze this text and provide the quality report in JSON format.
"""
        
        # Generate OpenAI analysis
        def get_openai_response():
            response = client.chat.completions.create(
                model="gpt-3.5-turbo",  # You can change to "gpt-4" if you have access
                messages=[
                    {"role": "system", "content": system_message},
                    {"role": "user", "content": user_message}
                ],
                temperature=0.3,
                max_tokens=2000
            )
            return response.choices[0].message.content
        
        result_text = await asyncio.to_thread(get_openai_response)
        
        # Extract JSON from response
        json_match = re.search(r'\{.*\}', result_text, re.DOTALL)
        
        if json_match:
            report_json = json.loads(json_match.group(0))
            logging.info(f"✅ Analysis complete. Overall score: {report_json.get('overall_score')}")
            return report_json
        else:
            logging.error("No JSON found in OpenAI response")
            return {
                "error": "OpenAI failed to generate proper JSON response",
                "raw_response": result_text
            }
            
    except Exception as e:
        logging.error(f"❌ Error during analysis: {e}")
        return {"error": f"Analysis failed: {str(e)}"}

# ==============================================================================
# CONVENIENCE FUNCTION FOR JUPYTER
# ==============================================================================

def analyze_text(text: str):
    """
    Convenience function to analyze text in Jupyter notebook
    Uses OpenAI API key from .env file
    
    Usage:
        result = analyze_text("Your text here")
        print(json.dumps(result, indent=2))
    """
    
    # Run the async function
    import asyncio
    try:
        # Check if we're in a Jupyter environment with existing event loop
        try:
            loop = asyncio.get_event_loop()
            if loop.is_running():
                # Use nest_asyncio for Jupyter compatibility
                try:
                    import nest_asyncio
                    nest_asyncio.apply()
                except ImportError:
                    return {"error": "Please install nest-asyncio: pip install nest-asyncio"}
        except RuntimeError:
            pass  # No event loop running
        
        return asyncio.run(validate_content(text))
    except Exception as e:
        return {"error": f"Failed to run analysis: {str(e)}"}

# ==============================================================================
# EXAMPLE USAGE
# ==============================================================================

print("Content Quality Analyzer with OpenAI loaded successfully!")
print("\nMake sure your .env file contains:")
print("OPENAI_API_KEY=your_openai_api_key_here")
print("\nUsage:")
print("result = analyze_text('Your text to analyze')")
print("print(json.dumps(result, indent=2))")


Content Quality Analyzer with OpenAI loaded successfully!

Make sure your .env file contains:
OPENAI_API_KEY=your_openai_api_key_here

Usage:
result = analyze_text('Your text to analyze')
print(json.dumps(result, indent=2))


In [30]:
# Step 3: Test with a sample text
sample_text = """
This is a sample text for testing our content quality analyzer. The text should be long enough to provide meaningful analysis results. We want to check grammar, readability, professionalism and overall structure. This tool will help us identify areas for improvement in our writing.
"""

# Analyze the text
result = analyze_text(sample_text)

# Display the results in a nice format
print("=" * 60)
print("CONTENT QUALITY ANALYSIS REPORT")
print("=" * 60)
print(json.dumps(result, indent=2))


2025-11-09 16:54:54,642 - INFO - ✅ Google AI Model Initialized
2025-11-09 16:54:54,644 - INFO - Starting content validation...
2025-11-09 16:54:54,645 - INFO - Running LengthAndStructureCheck on text (approx 284 chars)
2025-11-09 16:54:54,647 - INFO - Running ReadabilityCheck
2025-11-09 16:54:54,648 - INFO - Running ProfessionalismCheck
2025-11-09 16:54:54,718 - INFO - Running RedundancyCheck
2025-11-09 16:54:54,976 - ERROR - ❌ Error during analysis: 400 API key not valid. Please pass a valid API key. [reason: "API_KEY_INVALID"
domain: "googleapis.com"
metadata {
  key: "service"
  value: "generativelanguage.googleapis.com"
}
, locale: "en-US"
message: "API key not valid. Please pass a valid API key."
]


CONTENT QUALITY ANALYSIS REPORT
{
  "error": "Analysis failed: 400 API key not valid. Please pass a valid API key. [reason: \"API_KEY_INVALID\"\ndomain: \"googleapis.com\"\nmetadata {\n  key: \"service\"\n  value: \"generativelanguage.googleapis.com\"\n}\n, locale: \"en-US\"\nmessage: \"API key not valid. Please pass a valid API key.\"\n]"
}


In [32]:
# this is testing the langchain integration with the langgraph code to check the efficiency of the code
!pip install langchain langchain-openai langchain-community tavily-search textstat better_profanity python-dotenv openai



ERROR: Could not find a version that satisfies the requirement tavily-search (from versions: none)
ERROR: No matching distribution found for tavily-search


In [34]:
import json
import re
import os
import logging
from typing import Dict, Any, Union, Optional

# --- Python Built-in ---
from datetime import datetime
import asyncio

# --- Third-Party Imports ---
# (We removed uvicorn, FastAPI, HTTPException, BaseModel)
import textstat
from dotenv import load_dotenv
from better_profanity import profanity

# --- LangChain Imports ---
from langchain_openai import ChatOpenAI
from langchain.agents import AgentExecutor, create_openai_functions_agent
from langchain_core.prompts import ChatPromptTemplate
from langchain.tools import Tool
from langchain_community.tools.tavily_search import TavilySearchResults

# =V============================================================================
# 1. INITIAL SETUP (Env, Logging)
# ==============================================================================

# Load environment variables from .env file
load_dotenv()

# Setup professional logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# ==============================================================================
# 2. TOOL LOGIC FUNCTIONS
# (These are all fast, local tools)
# ==============================================================================

def check_length_and_structure(text: str) -> str:
    """Analyzes content length and structure (word count, paragraphs)."""
    logging.info(f"Running LengthAndStructureCheck on text (approx {len(text)} chars)")
    words = text.split()
    word_count = len(words)
    sentences = [s.strip() for s in re.split(r'[.!?]+', text) if s.strip()]
    paragraphs = [p.strip() for p in text.split('\n\n') if p.strip()]
    issues = []
    score = 10
    
    if word_count < 15:
        issues.append(f"Content is very short with only {word_count} words.")
        score -= 3
    if word_count > 150 and len(paragraphs) == 1:
        issues.append("Long content is not broken into paragraphs, making it hard to read.")
        score -= 2
    if len(sentences) < 2 and word_count > 20:
        issues.append("Content consists of a single long sentence; consider breaking it up.")
        score -= 2
    
    return json.dumps({
        "word_count": word_count, 
        "sentences": len(sentences), 
        "paragraphs": len(paragraphs), 
        "issues": issues,
        "score": max(1, score),
        "score_explanation": f"Structure score: {max(1, score)}/10. Based on {word_count} words, {len(sentences)} sentences, {len(paragraphs)} paragraphs."
    })

def check_readability_with_textstat(text: str) -> str:
    """Uses the 'textstat' library to calculate objective readability scores."""
    logging.info("Running ReadabilityCheck")
    if not text.strip():
        return json.dumps({"score": 0, "level": "unassessable", "readability_score": 1, "score_explanation": "Cannot assess readability of empty text."})
    
    flesch_score = textstat.flesch_reading_ease(text)
    level = "Very Easy"
    readability_score = 10
    
    if flesch_score < 30: 
        level = "Very Confusing (College Graduate)"
        readability_score = 3
    elif flesch_score < 60: 
        level = "Difficult"
        readability_score = 6
    elif flesch_score < 80: 
        level = "Fairly Easy"
        readability_score = 8
    
    return json.dumps({
        "flesch_reading_ease_score": flesch_score, 
        "level": level,
        "readability_score": readability_score,
        "score_explanation": f"Readability score: {readability_score}/10. Flesch score of {flesch_score:.1f} indicates {level.lower()} reading level."
    })

def check_professionalism_with_library(text: str) -> str:
    """Checks for unprofessional content using the 'better-profanity' library."""
    logging.info("Running ProfessionalismCheck")
    issues = []
    score = 10

    if profanity.contains_profanity(text):
        issues.append("Inappropriate or profane language was found.")
        score -= 5
    if re.search(r'\b[A-Z]{4,}\b', text) and sum(1 for c in text if c.isupper()) / max(1, len(text)) > 0.3:
        issues.append("Excessive capitalization is used, which appears unprofessional.")
        score -= 3
    if re.search(r'[!?@#$%^&*()]{4,}', text):
        issues.append("Excessive punctuation or symbols are used.")
        score -= 2

    return json.dumps({
        "issues": issues,
        "score": max(1, score),
        "score_explanation": f"Professionalism score: {max(1, score)}/10. {len(issues)} professional issues detected."
    })

def check_redundancy(text: str) -> str:
    """Analyzes the text for repetitive sentences and overused words."""
    logging.info("Running RedundancyCheck")
    sentences = [s.lower().strip() for s in re.split(r'[.!?]+', text) if s.strip()]
    issues = []
    score = 10
    
    if len(sentences) > 2:
        unique_sentences = len(set(sentences))
        total_sentences = len(sentences)
        redundancy_ratio = unique_sentences / total_sentences
        if redundancy_ratio < 0.5:
            redundancy_percentage = 100 - (redundancy_ratio * 100)
            issues.append(f"High sentence redundancy detected. {redundancy_percentage:.0f}% of sentences are repetitive.")
            score = max(1, int(redundancy_ratio * 10))
        
        return json.dumps({
            "redundancy_issues": issues,
            "score": score,
            "score_explanation": f"Redundancy score: {score}/10. Unique sentence ratio: {redundancy_ratio:.2f}"
        })
    
    return json.dumps({
        "redundancy_issues": [],
        "score": 10,
        "score_explanation": "Redundancy score: 10/10. Not enough sentences to check for repetition."
    })

# ==============================================================================
# 3. AGENT CLASS DEFINITION
# ==============================================================================

class ContentQualityAgent:
    def __init__(self, model="gpt-4o-mini", temperature=0):
        self.llm = ChatOpenAI(model=model, temperature=temperature)
        self.setup_agent()

    def setup_agent(self):
        tools = [
            Tool(name="ReadabilityCheck", func=check_readability_with_textstat, description="Use to get the readability score of a piece of text."),
            Tool(name="ProfessionalismCheck", func=check_professionalism_with_library, description="Use to check a piece of text for unprofessional language."),
            Tool(name="LengthAndStructureCheck", func=check_length_and_structure, description="Use to get word count and structure of a piece of text."),
            Tool(name="RedundancyCheck", func=check_redundancy, description="Use to check a piece of text for repetitive sentences."),
            TavilySearchResults(name="FactCheckSearch", max_results=3, description="Use to verify factual claims in a piece of text.")
        ]
        
        prompt = ChatPromptTemplate.from_messages([
            ("system", """You are an expert Quality Assurance assistant. Your goal is to provide a complete quality report for a given piece of plain text.

            **Your Job:**
            1.  You will be given a piece of plain text.
            2.  You will use your tools (ReadabilityCheck, ProfessionalismCheck, etc.) to get reports on the text.
            3.  You MUST **perform your own grammar and spelling check** on the original text. Look for spelling mistakes, incorrect punctuation, subject-verb agreement, and incorrect word usage.
            4.  You will then synthesize all of this information—the tool outputs AND your own grammar analysis—into the final JSON report.

            **Your Plan (What the agent executor will do):**
            - You will receive a plain text input.
            - You must run these tools:
                    1.  ReadabilityCheck
                    2.  ProfessionalismCheck
                    3.  LengthAndStructureCheck
                    4.  RedundancyCheck
                    5.  FactCheckSearch (use this *only* if the text makes a specific, verifiable factual claim, e.g., "The sun is 100 miles away").
            
            **FINAL ANSWER (Your Synthesis Step):**
            After all tools run, you will receive their outputs. You must then look at the *original text* again, perform your detailed grammar and spelling check, and then generate the final JSON.

            **FINAL ANSWER FORMATTING INSTRUCTIONS:**
            Your final answer MUST be a single JSON object with the specified structure.
            {{
                "overall_score": <An integer score from 1-10, which is the average of all category scores>,
                "category_scores": {{
                    "grammar_and_spelling": <Score 1-10, based on YOUR analysis. Deduct points for errors.>,
                    "readability": <Score 1-10, from the ReadabilityCheck tool output>,
                    "professionalism_and_tone": <Score 1-10, from the ProfessionalismCheck tool output>,
                    "factual_accuracy": <Score 1-10. Default to 10 if no claims to check or if FactCheckSearch finds no errors.>,
                    "redundancy": <Score 1-10, from the RedundancyCheck tool output>
                }},
                "score_explanations": {{
                    "grammar_and_spelling": "<Explanation for grammar score, including a list of errors YOU found (e.g., 'Spelling error: "wrogn" should be "wrong"'). If no errors, say 'No errors found.'>",
                    "readability": "<Explanation for readability score, from the ReadabilityCheck tool output>",
                    "professionalism_and_tone": "<Explanation for professionalism score, from the ProfessionalismCheck tool output>",
                    "factual_accuracy": "<Explanation for factual accuracy score. If claims were checked, summarize findings.>",
                    "redundancy": "<Explanation for redundancy score, from the RedundancyCheck tool output>"
                }},
                "summary": "<A natural language summary of key issues and suggestions for improvement.>"
            }}
            """),
            ("human", "{input}"),
            ("placeholder", "{agent_scratchpad}"),
        ])
        
        agent = create_openai_functions_agent(self.llm, tools, prompt)
        self.agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)

    async def validate_async(self, input_text: str) -> Dict[str, Any]:
        """Runs the agent asynchronously on a plain text string."""
        
        prompt = f"Please provide a comprehensive quality analysis of the following text:\n---\n{input_text}\n---"
        
        result = {} 
        try:
            # Use ainvoke for async execution
            result = await self.agent_executor.ainvoke({"input": prompt})
            
            # Find the JSON object in the output
            json_match = re.search(r'\{.*\}', result['output'], re.DOTALL)
            if json_match:
                return json.loads(json_match.group(0))
            else:
                logging.error(f"No JSON object found in agent output: {result['output']}")
                raise json.JSONDecodeError("No JSON object found in agent output.", result.get('output', ''), 0)
        except (json.JSONDecodeError, KeyError) as e:
            logging.error(f"❌ Error during agent execution or parsing: {e}")
            return {"error": "Failed to generate a valid report.", "raw_output": result.get('output', 'No output was generated.')}
        except Exception as e:
            logging.error(f"❌ Unexpected agent error: {e}")
            return {"error": f"An unexpected error occurred: {str(e)}", "raw_output": "Agent execution failed."}


# ==============================================================================
# 4. GLOBAL AGENT INITIALIZATION
# ==============================================================================

logging.info("🚀 Initializing Content Quality Agent...")
validator: Optional[ContentQualityAgent] = None

if not os.getenv("OPENAI_API_KEY") or not os.getenv("TAVILY_API_KEY"):
    logging.critical("❌ FATAL ERROR: API keys (OPENAI_API_KEY, TAVILY_API_KEY) must be in .env file.")
    validator = None
else:
    try:
        validator = ContentQualityAgent()
        logging.info("✅ Agent Initialized. Ready to test.")
    except Exception as e:
        logging.exception(f"Failed to initialize ContentQualityAgent: {e}")
        validator = None

# ==============================================================================
# 5. JUPYTER TEST EXECUTION
# ==============================================================================

async def run_test():
    """Defines and runs the test in an async function."""
    
    if validator is None:
        print("❌ Validator not initialized. Check your .env file and API keys.")
        return

    # --- DEFINE YOUR TEST TEXT HERE ---
    test_text = "ATTENTION!!! THIS IS THE MOST IMPORTANT BREAKTHROUGH IN SCIENCE HISTORY!!!! SCIENTISTS HAVE FINALLY DISCOVERED THE SECRET TO IMMORTALITY AND IT'S ABSOLUTELY AMAZING!!!! EVERYONE NEEDS TO KNOW THIS RIGHT NOW!!!! THE GOVERNMENT DOESN'T WANT YOU TO KNOW THIS TRUTH!!!! WAKE UP PEOPLE!!!!!!! THIS IS NOT A JOKE!!!!!"
    
    print(f"--- Validating text: ---\n{test_text}\n--------------------------")
    
    # Run the agent
    report = await validator.validate_async(test_text)
    
    # Print the results
    if "error" in report:
        print(f"❌ Agent failed to produce valid report. Raw output: {report.get('raw_output')}")
    else:
        print("✅ Successfully generated report!")
        # Pretty-print the JSON
        print(json.dumps(report, indent=2))
        
        print("\n--- Summary ---")
        print(f"Overall Score: {report.get('overall_score')}")
        print(f"Grammar: {report.get('score_explanations', {}).get('grammar_and_spelling')}")
        print(f"Factual Accuracy: {report.get('score_explanations', {}).get('factual_accuracy')}")
        print(f"Summary: {report.get('summary')}")

# --- This runs the async function ---
# In a Jupyter notebook, you can often just use 'await run_test()'
# but using asyncio.run() is the most reliable way.
print("--- Starting Agent Test ---")
asyncio.run(run_test())

2025-11-10 11:41:46,938 - INFO - 🚀 Initializing Content Quality Agent...
2025-11-10 11:41:46,970 - INFO - ✅ Agent Initialized. Ready to test.


--- Starting Agent Test ---
--- Validating text: ---
ATTENTION!!! THIS IS THE MOST IMPORTANT BREAKTHROUGH IN SCIENCE HISTORY!!!! SCIENTISTS HAVE FINALLY DISCOVERED THE SECRET TO IMMORTALITY AND IT'S ABSOLUTELY AMAZING!!!! EVERYONE NEEDS TO KNOW THIS RIGHT NOW!!!! THE GOVERNMENT DOESN'T WANT YOU TO KNOW THIS TRUTH!!!! WAKE UP PEOPLE!!!!!!! THIS IS NOT A JOKE!!!!!
--------------------------


[1m> Entering new AgentExecutor chain...[0m


2025-11-10 11:41:47,962 - INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
2025-11-10 11:41:49,262 - INFO - Running ReadabilityCheck


[32;1m[1;3m
Invoking: `ReadabilityCheck` with `ATTENTION!!! THIS IS THE MOST IMPORTANT BREAKTHROUGH IN SCIENCE HISTORY!!!! SCIENTISTS HAVE FINALLY DISCOVERED THE SECRET TO IMMORTALITY AND IT'S ABSOLUTELY AMAZING!!!! EVERYONE NEEDS TO KNOW THIS RIGHT NOW!!!! THE GOVERNMENT DOESN'T WANT YOU TO KNOW THIS TRUTH!!!! WAKE UP PEOPLE!!!!!!! THIS IS NOT A JOKE!!!!!`


[0m[36;1m[1;3m{"flesch_reading_ease_score": 59.279420289855096, "level": "Difficult", "readability_score": 6, "score_explanation": "Readability score: 6/10. Flesch score of 59.3 indicates difficult reading level."}[0m

2025-11-10 11:41:50,087 - INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
2025-11-10 11:41:51,402 - INFO - Running ProfessionalismCheck


[32;1m[1;3m
Invoking: `ProfessionalismCheck` with `ATTENTION!!! THIS IS THE MOST IMPORTANT BREAKTHROUGH IN SCIENCE HISTORY!!!! SCIENTISTS HAVE FINALLY DISCOVERED THE SECRET TO IMMORTALITY AND IT'S ABSOLUTELY AMAZING!!!! EVERYONE NEEDS TO KNOW THIS RIGHT NOW!!!! THE GOVERNMENT DOESN'T WANT YOU TO KNOW THIS TRUTH!!!! WAKE UP PEOPLE!!!!!!! THIS IS NOT A JOKE!!!!!`


[0m[33;1m[1;3m{"issues": ["Excessive capitalization is used, which appears unprofessional.", "Excessive punctuation or symbols are used."], "score": 5, "score_explanation": "Professionalism score: 5/10. 2 professional issues detected."}[0m

2025-11-10 11:41:55,773 - INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
2025-11-10 11:41:58,297 - INFO - Running LengthAndStructureCheck on text (approx 311 chars)


[32;1m[1;3m
Invoking: `LengthAndStructureCheck` with `ATTENTION!!! THIS IS THE MOST IMPORTANT BREAKTHROUGH IN SCIENCE HISTORY!!!! SCIENTISTS HAVE FINALLY DISCOVERED THE SECRET TO IMMORTALITY AND IT'S ABSOLUTELY AMAZING!!!! EVERYONE NEEDS TO KNOW THIS RIGHT NOW!!!! THE GOVERNMENT DOESN'T WANT YOU TO KNOW THIS TRUTH!!!! WAKE UP PEOPLE!!!!!!! THIS IS NOT A JOKE!!!!!`


[0m[38;5;200m[1;3m{"word_count": 46, "sentences": 7, "paragraphs": 1, "issues": [], "score": 10, "score_explanation": "Structure score: 10/10. Based on 46 words, 7 sentences, 1 paragraphs."}[0m

2025-11-10 11:41:59,182 - INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
2025-11-10 11:42:00,832 - INFO - Running RedundancyCheck


[32;1m[1;3m
Invoking: `RedundancyCheck` with `ATTENTION!!! THIS IS THE MOST IMPORTANT BREAKTHROUGH IN SCIENCE HISTORY!!!! SCIENTISTS HAVE FINALLY DISCOVERED THE SECRET TO IMMORTALITY AND IT'S ABSOLUTELY AMAZING!!!! EVERYONE NEEDS TO KNOW THIS RIGHT NOW!!!! THE GOVERNMENT DOESN'T WANT YOU TO KNOW THIS TRUTH!!!! WAKE UP PEOPLE!!!!!!! THIS IS NOT A JOKE!!!!!`


[0m[31;1m[1;3m{"redundancy_issues": [], "score": 10, "score_explanation": "Redundancy score: 10/10. Unique sentence ratio: 1.00"}[0m

2025-11-10 11:42:01,697 - INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"


[32;1m[1;3mNow, I will perform a detailed grammar and spelling check on the original text.

### Grammar and Spelling Check:
1. **Spelling Errors**: No spelling errors found.
2. **Punctuation Issues**: The excessive use of exclamation marks and capitalization is not standard and can be considered a punctuation issue.
3. **Subject-Verb Agreement**: No issues found.
4. **Word Usage**: The phrase "the most important breakthrough in science history" could be more clearly stated as "the most important breakthrough in the history of science."

### Final JSON Report:
```json
{
    "overall_score": 7,
    "category_scores": {
        "grammar_and_spelling": 8,
        "readability": 6,
        "professionalism_and_tone": 5,
        "factual_accuracy": 10,
        "redundancy": 10
    },
    "score_explanations": {
        "grammar_and_spelling": "Grammar score: 8/10. No spelling errors found. Punctuation is excessive, and the phrase 'the most important breakthrough in science history' could b

In [37]:
# this code has the check_redundancy and check hallunication 
import json
import re
import os
import logging
from typing import Dict, Any, Union, Optional

# --- Python Built-in ---
from datetime import datetime
import asyncio

# --- Third-Party Imports ---
import textstat
from dotenv import load_dotenv
from better_profanity import profanity

# --- LangChain Imports ---
from langchain_openai import ChatOpenAI
from langchain.agents import AgentExecutor, create_openai_functions_agent
from langchain_core.prompts import ChatPromptTemplate
from langchain.tools import Tool
from langchain_community.tools.tavily_search import TavilySearchResults

# ==============================================================================
# 1. INITIAL SETUP (Env, Logging)
# ==============================================================================

# Load environment variables from .env file
load_dotenv()

# Setup professional logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# ==============================================================================
# 2. TOOL LOGIC FUNCTIONS
# ==============================================================================

def check_length_and_structure(text: str) -> str:
    """Analyzes content length and structure (word count, paragraphs)."""
    logging.info(f"Running LengthAndStructureCheck on text (approx {len(text)} chars)")
    words = text.split()
    word_count = len(words)
    sentences = [s.strip() for s in re.split(r'[.!?]+', text) if s.strip()]
    paragraphs = [p.strip() for p in text.split('\n\n') if p.strip()]
    issues = []
    score = 10
    
    if word_count < 15:
        issues.append(f"Content is very short with only {word_count} words.")
        score -= 3
    if word_count > 150 and len(paragraphs) == 1:
        issues.append("Long content is not broken into paragraphs, making it hard to read.")
        score -= 2
    if len(sentences) < 2 and word_count > 20:
        issues.append("Content consists of a single long sentence; consider breaking it up.")
        score -= 2
    
    return json.dumps({
        "word_count": word_count, 
        "sentences": len(sentences), 
        "paragraphs": len(paragraphs), 
        "issues": issues,
        "score": max(1, score),
        "score_explanation": f"Structure score: {max(1, score)}/10. Based on {word_count} words, {len(sentences)} sentences, {len(paragraphs)} paragraphs."
    })

def check_readability_with_textstat(text: str) -> str:
    """Uses the 'textstat' library to calculate objective readability scores."""
    logging.info("Running ReadabilityCheck")
    if not text.strip():
        return json.dumps({"score": 0, "level": "unassessable", "readability_score": 1, "score_explanation": "Cannot assess readability of empty text."})
    
    flesch_score = textstat.flesch_reading_ease(text)
    level = "Very Easy"
    readability_score = 10
    
    if flesch_score < 30: 
        level = "Very Confusing (College Graduate)"
        readability_score = 3
    elif flesch_score < 60: 
        level = "Difficult"
        readability_score = 6
    elif flesch_score < 80: 
        level = "Fairly Easy"
        readability_score = 8
    
    return json.dumps({
        "flesch_reading_ease_score": flesch_score, 
        "level": level,
        "readability_score": readability_score,
        "score_explanation": f"Readability score: {readability_score}/10. Flesch score of {flesch_score:.1f} indicates {level.lower()} reading level."
    })

def check_professionalism_with_library(text: str) -> str:
    """Checks for unprofessional content using the 'better-profanity' library."""
    logging.info("Running ProfessionalismCheck")
    issues = []
    score = 10

    if profanity.contains_profanity(text):
        issues.append("Inappropriate or profane language was found.")
        score -= 5
    if re.search(r'\b[A-Z]{4,}\b', text) and sum(1 for c in text if c.isupper()) / max(1, len(text)) > 0.3:
        issues.append("Excessive capitalization is used, which appears unprofessional.")
        score -= 3
    if re.search(r'[!?@#$%^&*()]{4,}', text):
        issues.append("Excessive punctuation or symbols are used.")
        score -= 2

    return json.dumps({
        "issues": issues,
        "score": max(1, score),
        "score_explanation": f"Professionalism score: {max(1, score)}/10. {len(issues)} professional issues detected."
    })

# --- NEWLY ADDED TOOL ---
def check_redundancy(text: str) -> str:
    """Analyzes the text for repetitive sentences using local logic."""
    logging.info("Running RedundancyCheck")
    sentences = [s.lower().strip() for s in re.split(r'[.!?]+', text) if s.strip()]
    issues = []
    score = 10
    
    if len(sentences) > 2:
        unique_sentences = len(set(sentences))
        total_sentences = len(sentences)
        redundancy_ratio = unique_sentences / total_sentences
        if redundancy_ratio < 0.5:
            redundancy_percentage = 100 - (redundancy_ratio * 100)
            issues.append(f"High sentence redundancy detected. {redundancy_percentage:.0f}% of sentences are repetitive.")
            score = max(1, int(redundancy_ratio * 10))
        
        return json.dumps({
            "redundancy_issues": issues,
            "score": score,
            "score_explanation": f"Redundancy score: {score}/10. Unique sentence ratio: {redundancy_ratio:.2f}"
        })
    
    return json.dumps({
        "redundancy_issues": [],
        "score": 10,
        "score_explanation": "Redundancy score: 10/10. Not enough sentences to check for repetition."
    })

# ==============================================================================
# 3. AGENT CLASS DEFINITION (UPDATED)
# ==============================================================================

class ContentQualityAgent:
    def __init__(self, model="gpt-4o-mini", temperature=0):
        self.llm = ChatOpenAI(model=model, temperature=temperature)
        self.setup_agent()

    def setup_agent(self):
        # --- UPDATED tools list ---
        tools = [
            Tool(name="ReadabilityCheck", func=check_readability_with_textstat, description="Use to get the readability score of a piece of text."),
            Tool(name="ProfessionalismCheck", func=check_professionalism_with_library, description="Use to check a piece of text for unprofessional language."),
            Tool(name="LengthAndStructureCheck", func=check_length_and_structure, description="Use to get word count and structure of a piece of text."),
            Tool(name="RedundancyCheck", func=check_redundancy, description="Use to check a piece of text for repetitive sentences."), # <-- NEWLY ADDED
            TavilySearchResults(name="FactCheckSearch", max_results=3, description="Use to verify factual claims in a piece of text.")
        ]
        
        # --- UPDATED system prompt ---
        prompt = ChatPromptTemplate.from_messages([
            ("system", """You are an expert Quality Assurance assistant. Your goal is to provide a complete quality report for a given piece of plain text.

            **Your Job:**
            1.  You will be given a piece of plain text.
            2.  You will use your tools (ReadabilityCheck, ProfessionalismCheck, etc.) to get reports on the text.
            3.  You MUST **perform your own grammar and spelling check** on the original text. Look for spelling mistakes, incorrect punctuation, subject-verb agreement, and incorrect word usage.
            4.  You MUST **check for hallucinations**. To do this, use the `FactCheckSearch` tool to verify any factual claims. If a claim is contradicted by the search results, or if no supporting evidence is found, you must flag it.
            5.  You will then synthesize all of this information—the tool outputs, your grammar analysis, AND your hallucination analysis—into the final JSON report.

            **Your Plan (What the agent executor will do):**
            - You will receive a plain text input.
            - You must run these tools:
                    1.  ReadabilityCheck
                    2.  ProfessionalismCheck
                    3.  LengthAndStructureCheck
                    4.  RedundancyCheck
                    5.  FactCheckSearch (use this *only* if the text makes a specific, verifiable factual claim, e.g., "The sun is 100 miles away").
            
            **FINAL ANSWER (Your Synthesis Step):**
            After all tools run, you will receive their outputs. You must then look at the *original text* again, perform your detailed grammar and spelling check, and analyze the `FactCheckSearch` results to check for hallucinations. Then, generate the final JSON.

            **FINAL ANSWER FORMATTING INSTRUCTIONS:**
            Your final answer MUST be a single JSON object with the specified structure.
            {{
                "overall_score": <An integer score from 1-10, which is the average of all category scores>,
                "category_scores": {{
                    "grammar_and_spelling": <Score 1-10, based on YOUR analysis. Deduct points for errors.>,
                    "readability": <Score 1-10, from the ReadabilityCheck tool output>,
                    "professionalism_and_tone": <Score 1-10, from the ProfessionalismCheck tool output>,
                    "hallucination_and_factual_accuracy": <Score 1-10. Default to 10 if no claims to check. If claims were checked, score is based on `FactCheckSearch` results. Deduct points for contradictions or lack of support.>,
                    "redundancy": <Score 1-10, from the RedundancyCheck tool output>
                }},
                "score_explanations": {{
                    "grammar_and_spelling": "<Explanation for grammar score, including a list of errors YOU found (e.g., 'Spelling error: "wrogn" should be "wrong"'). If no errors, say 'No errors found.'>",
                    "readability": "<Explanation for readability score, from the ReadabilityCheck tool output>",
                    "professionalism_and_tone": "<Explanation for professionalism score, from the ProfessionalismCheck tool output>",
                    "hallucination_and_factual_accuracy": "<Explanation for factual accuracy score. If claims were checked, summarize findings from FactCheckSearch. If claims are contradicted, state it clearly.>",
                    "redundancy": "<Explanation for redundancy score, from the RedundancyCheck tool output>"
                }},
                "summary": "<A natural language summary of key issues and suggestions for improvement.>"
            }}
            """),
            ("human", "{input}"),
            ("placeholder", "{agent_scratchpad}"),
        ])
        
        agent = create_openai_functions_agent(self.llm, tools, prompt)
        self.agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)

    async def validate_async(self, input_text: str) -> Dict[str, Any]:
        """Runs the agent asynchronously on a plain text string."""
        
        prompt = f"Please provide a comprehensive quality analysis of the following text:\n---\n{input_text}\n---"
        
        result = {} 
        try:
            # Use ainvoke for async execution
            result = await self.agent_executor.ainvoke({"input": prompt})
            
            # Find the JSON object in the output
            json_match = re.search(r'\{.*\}', result['output'], re.DOTALL)
            if json_match:
                return json.loads(json_match.group(0))
            else:
                logging.error(f"No JSON object found in agent output: {result['output']}")
                raise json.JSONDecodeError("No JSON object found in agent output.", result.get('output', ''), 0)
        except (json.JSONDecodeError, KeyError) as e:
            logging.error(f"❌ Error during agent execution or parsing: {e}")
            return {"error": "Failed to generate a valid report.", "raw_output": result.get('output', 'No output was generated.')}
        except Exception as e:
            logging.error(f"❌ Unexpected agent error: {e}")
            return {"error": f"An unexpected error occurred: {str(e)}", "raw_output": "Agent execution failed."}


# ==============================================================================
# 4. GLOBAL AGENT INITIALIZATION
# ==============================================================================

logging.info("🚀 Initializing Content Quality Agent...")
validator: Optional[ContentQualityAgent] = None

if not os.getenv("OPENAI_API_KEY") or not os.getenv("TAVILY_API_KEY"):
    logging.critical("❌ FATAL ERROR: API keys (OPENAI_API_KEY, TAVILY_API_KEY) must be in .env file.")
    validator = None
else:
    try:
        validator = ContentQualityAgent()
        logging.info("✅ Agent Initialized. Ready to test.")
    except Exception as e:
        logging.exception(f"Failed to initialize ContentQualityAgent: {e}")
        validator = None

# ==============================================================================
# 5. JUPYTER TEST EXECUTION (UPDATED)
# ==============================================================================

async def run_test():
    """Defines and runs the test in an async function."""
    
    if validator is None:
        print("❌ Validator not initialized. Check your .env file and API keys.")
        return

    # --- UPDATED TEST TEXT ---
    # This new text includes redundancy and two factual claims (one correct, one incorrect).
    test_text = """
This fucking research paper is complete bullshit and the authors don't know what the hell they're talking about. The damn methodology is shit and their conclusions are fucking wrong. Anyone who believes this crap is a complete idiot and needs to get their head out of their ass. This is the worst piece of garbage I've ever read in my entire life.
    """
    
    print(f"--- Validating text: ---\n{test_text}\n--------------------------")
    
    # Run the agent
    report = await validator.validate_async(test_text)
    
    # Print the results
    if "error" in report:
        print(f"❌ Agent failed to produce valid report. Raw output: {report.get('raw_output')}")
    else:
        print("✅ Successfully generated report!")
        # Pretty-print the JSON
        print(json.dumps(report, indent=2))
        
        # --- UPDATED print statements ---
        print("\n--- Summary ---")
        print(f"Overall Score: {report.get('overall_score')}")
        
        explanations = report.get('score_explanations', {})
        print(f"\nGrammar: {explanations.get('grammar_and_spelling')}")
        print(f"Redundancy: {explanations.get('redundancy')}")
        print(f"Hallucination/Factual: {explanations.get('hallucination_and_factual_accuracy')}")
        
        print(f"\nSummary: {report.get('summary')}")

# --- This runs the async function ---
print("--- Starting Agent Test ---")
asyncio.run(run_test())

2025-11-10 11:48:38,192 - INFO - 🚀 Initializing Content Quality Agent...
2025-11-10 11:48:38,224 - INFO - ✅ Agent Initialized. Ready to test.


--- Starting Agent Test ---
--- Validating text: ---

This fucking research paper is complete bullshit and the authors don't know what the hell they're talking about. The damn methodology is shit and their conclusions are fucking wrong. Anyone who believes this crap is a complete idiot and needs to get their head out of their ass. This is the worst piece of garbage I've ever read in my entire life.
    
--------------------------


[1m> Entering new AgentExecutor chain...[0m


2025-11-10 11:48:39,124 - INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
2025-11-10 11:48:40,682 - INFO - Running ReadabilityCheck


[32;1m[1;3m
Invoking: `ReadabilityCheck` with `This fucking research paper is complete bullshit and the authors don't know what the hell they're talking about. The damn methodology is shit and their conclusions are fucking wrong. Anyone who believes this crap is a complete idiot and needs to get their head out of their ass. This is the worst piece of garbage I've ever read in my entire life.`


[0m[36;1m[1;3m{"flesch_reading_ease_score": 72.38959677419356, "level": "Fairly Easy", "readability_score": 8, "score_explanation": "Readability score: 8/10. Flesch score of 72.4 indicates fairly easy reading level."}[0m

2025-11-10 11:48:41,676 - INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
2025-11-10 11:48:43,026 - INFO - Running ProfessionalismCheck


[32;1m[1;3m
Invoking: `ProfessionalismCheck` with `This fucking research paper is complete bullshit and the authors don't know what the hell they're talking about. The damn methodology is shit and their conclusions are fucking wrong. Anyone who believes this crap is a complete idiot and needs to get their head out of their ass. This is the worst piece of garbage I've ever read in my entire life.`


[0m[33;1m[1;3m{"issues": ["Inappropriate or profane language was found."], "score": 5, "score_explanation": "Professionalism score: 5/10. 1 professional issues detected."}[0m

2025-11-10 11:48:44,240 - INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
2025-11-10 11:48:45,558 - INFO - Running LengthAndStructureCheck on text (approx 347 chars)


[32;1m[1;3m
Invoking: `LengthAndStructureCheck` with `This fucking research paper is complete bullshit and the authors don't know what the hell they're talking about. The damn methodology is shit and their conclusions are fucking wrong. Anyone who believes this crap is a complete idiot and needs to get their head out of their ass. This is the worst piece of garbage I've ever read in my entire life.`


[0m[38;5;200m[1;3m{"word_count": 62, "sentences": 4, "paragraphs": 1, "issues": [], "score": 10, "score_explanation": "Structure score: 10/10. Based on 62 words, 4 sentences, 1 paragraphs."}[0m

2025-11-10 11:48:47,035 - INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
2025-11-10 11:48:48,648 - INFO - Running RedundancyCheck


[32;1m[1;3m
Invoking: `RedundancyCheck` with `This fucking research paper is complete bullshit and the authors don't know what the hell they're talking about. The damn methodology is shit and their conclusions are fucking wrong. Anyone who believes this crap is a complete idiot and needs to get their head out of their ass. This is the worst piece of garbage I've ever read in my entire life.`


[0m[31;1m[1;3m{"redundancy_issues": [], "score": 10, "score_explanation": "Redundancy score: 10/10. Unique sentence ratio: 1.00"}[0m

2025-11-10 11:48:49,547 - INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"


[32;1m[1;3m
Invoking: `FactCheckSearch` with `{'query': "This research paper is complete bullshit and the authors don't know what they're talking about."}`


[0m[36;1m[1;3m[{'title': 'How To Spot Bad Science - Farnam Street', 'url': 'https://fs.blog/spot-bad-science/', 'content': 'Entitled “Randomized, Double-Blind, Placebo-Controlled, Linear Dose, Crossover Study to Evaluate the Efficacy and Safety of a Green Coffee Bean Extract in Overweight Subjects,” it was published in 2012 in the journal Diabetes, Metabolic Syndrome and Obesity: Targets and Therapy. On the face of it, and to the untrained eye, the study may appear legitimate, but it is rife with serious problems, as Scott Gavura explained in the article “Dr. Oz and Green Coffee Beans – More Weight Loss [...] The Food and Drug Administration (FDA) recommends studies relating to weight-loss consist of at least 3,000 participants receiving the active medication and at least 1,500 receiving a placebo, all for a minimum period of

2025-11-10 11:48:54,216 - INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"


[32;1m[1;3m{
    "overall_score": 5,
    "category_scores": {
        "grammar_and_spelling": 3,
        "readability": 8,
        "professionalism_and_tone": 5,
        "hallucination_and_factual_accuracy": 10,
        "redundancy": 10
    },
    "score_explanations": {
        "grammar_and_spelling": "The text contains several instances of inappropriate language and slang, which detracts from its overall quality. Additionally, the use of profanity is excessive. No traditional spelling or grammatical errors were found, but the tone is highly informal. Overall, the score is reduced due to the lack of professionalism.",
        "readability": "The readability score is 8/10, indicating that the text is fairly easy to read. The Flesch reading ease score of 72.4 supports this assessment.",
        "professionalism_and_tone": "The professionalism score is 5/10 due to the presence of inappropriate and profane language. This language undermines the credibility of the critique.",
        "ha

In [11]:
"""
Fetch Learning Content from MongoDB
Efficiently retrieves content from nested structure
"""

from pymongo import MongoClient
from typing import List, Dict, Optional
import pprint


class LearningContentFetcher:
    """Fetches learning content from MongoDB collection"""
    
    def __init__(self, connection_string: str = 'mongodb://localhost:27017/',
                 database: str = 'cornerstone_curated',
                 collection: str = 'bulk_generate_content_metadata'):
        """Initialize MongoDB connection"""
        self.client = MongoClient(connection_string)
        self.db = self.client[database]
        self.collection = self.db[collection]
        
        print(f"✅ Connected to MongoDB")
        print(f"🗂️  Database: {database}")
        print(f"📝 Collection: {collection}")
    
    def get_learning_content_by_id(self, doc_id: str) -> Optional[str]:
        """
        Fetch learning content for a specific document ID
        
        Args:
            doc_id: MongoDB ObjectId as string or custom ID
            
        Returns:
            Learning content string or None
        """
        try:
            from bson.objectid import ObjectId
            
            # Try as ObjectId first, then as regular ID
            try:
                query = {"_id": ObjectId(doc_id)}
            except:
                query = {"_id": doc_id}
            
            # Use projection to fetch only the content field
            doc = self.collection.find_one(
                query,
                {"data.content": 1, "_id": 0}
            )
            
            if doc and 'data' in doc and 'content' in doc['data']:
                return doc['data']['content']
            else:
                print(f"❌ No content found for ID: {doc_id}")
                return None
                
        except Exception as e:
            print(f"❌ Error fetching content: {e}")
            return None
    
    def get_all_learning_content(self, limit: Optional[int] = None) -> List[Dict[str, str]]:
        """
        Fetch all learning content from collection
        
        Args:
            limit: Maximum number of documents to fetch (None for all)
            
        Returns:
            List of dicts with _id and content
        """
        try:
            # Use projection to fetch only necessary fields
            projection = {"_id": 1, "data.content": 1}
            
            # Build query
            cursor = self.collection.find({}, projection)
            
            if limit:
                cursor = cursor.limit(limit)
            
            results = []
            for doc in cursor:
                if 'data' in doc and 'content' in doc['data']:
                    results.append({
                        "_id": str(doc['_id']),
                        "content": doc['data']['content']
                    })
            
            print(f"✅ Fetched {len(results)} documents with learning content")
            return results
            
        except Exception as e:
            print(f"❌ Error fetching all content: {e}")
            return []
    
    def get_learning_content_with_metadata(self, limit: Optional[int] = None) -> List[Dict]:
        """
        Fetch learning content along with useful metadata
        
        Args:
            limit: Maximum number of documents to fetch
            
        Returns:
            List of dicts with content and metadata
        """
        try:
            # Fetch more fields for context
            projection = {
                "_id": 1,
                "data.content": 1,
                "data.title": 1,
                "data.topic": 1,
                "data.subtopic": 1,
                "created_at": 1,
                "updated_at": 1
            }
            
            cursor = self.collection.find({}, projection)
            
            if limit:
                cursor = cursor.limit(limit)
            
            results = []
            for doc in cursor:
                if 'data' in doc and 'content' in doc['data']:
                    result = {
                        "_id": str(doc['_id']),
                        "content": doc['data']['content']
                    }
                    
                    # Add metadata if available
                    if 'data' in doc:
                        result['title'] = doc['data'].get('title', 'N/A')
                        result['topic'] = doc['data'].get('topic', 'N/A')
                        result['subtopic'] = doc['data'].get('subtopic', 'N/A')
                    
                    if 'created_at' in doc:
                        result['created_at'] = doc['created_at']
                    
                    results.append(result)
            
            print(f"✅ Fetched {len(results)} documents with content and metadata")
            return results
            
        except Exception as e:
            print(f"❌ Error fetching content with metadata: {e}")
            return []
    
    def get_content_by_filter(self, filter_query: Dict, limit: Optional[int] = None) -> List[Dict[str, str]]:
        """
        Fetch learning content based on custom filter
        
        Args:
            filter_query: MongoDB query filter
            limit: Maximum number of documents to fetch
            
        Returns:
            List of dicts with _id and content
        """
        try:
            projection = {"_id": 1, "data.content": 1}
            
            cursor = self.collection.find(filter_query, projection)
            
            if limit:
                cursor = cursor.limit(limit)
            
            results = []
            for doc in cursor:
                if 'data' in doc and 'content' in doc['data']:
                    results.append({
                        "_id": str(doc['_id']),
                        "content": doc['data']['content']
                    })
            
            print(f"✅ Fetched {len(results)} documents matching filter")
            return results
            
        except Exception as e:
            print(f"❌ Error fetching filtered content: {e}")
            return []
    
    def get_content_stats(self) -> Dict:
        """Get statistics about the learning content"""
        try:
            pipeline = [
                {
                    "$project": {
                        "content_length": {"$strLenCP": "$data.content"},
                        "word_count": {
                            "$size": {
                                "$split": ["$data.content", " "]
                            }
                        }
                    }
                },
                {
                    "$group": {
                        "_id": None,
                        "total_docs": {"$sum": 1},
                        "avg_content_length": {"$avg": "$content_length"},
                        "min_content_length": {"$min": "$content_length"},
                        "max_content_length": {"$max": "$content_length"},
                        "avg_word_count": {"$avg": "$word_count"},
                        "total_words": {"$sum": "$word_count"}
                    }
                }
            ]
            
            result = list(self.collection.aggregate(pipeline))
            
            if result:
                stats = result[0]
                stats.pop('_id', None)
                return stats
            else:
                return {}
                
        except Exception as e:
            print(f"❌ Error getting content stats: {e}")
            return {}
    
    def sample_content(self, n: int = 3) -> None:
        """Display sample content for inspection"""
        try:
            print(f"\n📋 Sampling {n} documents...")
            print("="*70)
            
            docs = self.get_learning_content_with_metadata(limit=n)
            
            for i, doc in enumerate(docs, 1):
                print(f"\n📄 Document {i}:")
                print(f"   ID: {doc['_id']}")
                print(f"   Title: {doc.get('title', 'N/A')}")
                print(f"   Topic: {doc.get('topic', 'N/A')}")
                print(f"   Subtopic: {doc.get('subtopic', 'N/A')}")
                
                content = doc['content']
                print(f"   Content Length: {len(content)} chars")
                print(f"   Word Count: {len(content.split())} words")
                print(f"   Preview: {content[:200]}...")
                print("-"*70)
                
        except Exception as e:
            print(f"❌ Error sampling content: {e}")
    
    def close(self):
        """Close MongoDB connection"""
        self.client.close()
        print("🔒 MongoDB connection closed")


# ============================================================================
# USAGE EXAMPLES
# ============================================================================

def main():
    """Example usage"""
    
    # Initialize fetcher
    fetcher = LearningContentFetcher()
    
    # Example 1: Get statistics about content
    print("\n📊 Content Statistics:")
    print("="*70)
    stats = fetcher.get_content_stats()
    pprint.pprint(stats)
    
    # Example 2: Sample some content
    fetcher.sample_content(n=3)
    
    # Example 3: Fetch all content (with limit for testing)
    print("\n📥 Fetching all learning content...")
    all_content = fetcher.get_all_learning_content(limit=5)
    print(f"✅ Retrieved {len(all_content)} documents")
    
    # Example 4: Fetch content by specific ID
    if all_content:
        first_id = all_content[0]['_id']
        print(f"\n🔍 Fetching content by ID: {first_id}")
        content = fetcher.get_learning_content_by_id(first_id)
        if content:
            print(f"✅ Content length: {len(content)} chars")
            print(f"📝 Preview: {content[:300]}...")
    
    # Example 5: Fetch with metadata for QC processing
    print("\n📦 Fetching content with metadata...")
    content_with_meta = fetcher.get_content_with_metadata(limit=5)
    for item in content_with_meta:
        print(f"\n📄 {item['title']}")
        print(f"   Topic: {item['topic']} > {item['subtopic']}")
        print(f"   Content: {len(item['content'])} chars")
    
    # Example 6: Filter by specific criteria
    print("\n🔎 Example: Filter by topic (if available)")
    # Adjust this filter based on your actual data structure
    # filtered = fetcher.get_content_by_filter(
    #     {"data.topic": "Mathematics"}, 
    #     limit=5
    # )
    
    # Close connection
    fetcher.close()


if __name__ == "__main__":
    main()


# ============================================================================
# QUICK REFERENCE - Common Use Cases
# ============================================================================

"""
# 1. FETCH SINGLE CONTENT BY ID
fetcher = LearningContentFetcher()
content = fetcher.get_learning_content_by_id("your_document_id")

# 2. FETCH ALL CONTENT (BATCH)
all_content = fetcher.get_all_learning_content(limit=100)

# 3. FETCH WITH METADATA
content_with_meta = fetcher.get_content_with_metadata(limit=10)

# 4. ITERATE AND PROCESS
for item in all_content:
    doc_id = item['_id']
    content = item['content']
    # Process content (e.g., run QC Agent)
    # result = qc_agent.validate(content)

# 5. FILTER BY CUSTOM QUERY
filtered = fetcher.get_content_by_filter(
    {"data.topic": "Science", "data.subtopic": "Physics"},
    limit=50
)

# 6. BATCH PROCESSING WITH QC AGENT
fetcher = LearningContentFetcher()
qc_agent = QCAgent()

content_items = fetcher.get_all_learning_content(limit=100)

for item in content_items:
    print(f"Processing: {item['_id']}")
    report = await qc_agent.validate(item['content'])
    
    # Save results back to MongoDB or separate collection
    # db.qc_results.insert_one({
    #     "content_id": item['_id'],
    #     "redundancy_score": report.redundancy_score,
    #     "hallucination_score": report.hallucination_score,
    #     "overall_score": report.overall_score,
    #     "passed": report.passed
    # })

fetcher.close()
"""

✅ Connected to MongoDB
🗂️  Database: cornerstone_curated
📝 Collection: bulk_generate_content_metadata

📊 Content Statistics:
❌ Error getting content stats: PlanExecutor error during aggregation :: caused by :: $strLenCP requires a string argument, found: missing, full error: {'ok': 0.0, 'errmsg': 'PlanExecutor error during aggregation :: caused by :: $strLenCP requires a string argument, found: missing', 'code': 34471, 'codeName': 'Location34471'}
{}

📋 Sampling 3 documents...
✅ Fetched 0 documents with content and metadata

📥 Fetching all learning content...
✅ Fetched 0 documents with learning content
✅ Retrieved 0 documents

📦 Fetching content with metadata...


AttributeError: 'LearningContentFetcher' object has no attribute 'get_content_with_metadata'