In [None]:
# Set up logging
import logging
import requests
import io
import base64
import re
import urllib.parse
from bs4 import BeautifulSoup
from PIL import Image
from concurrent.futures import ThreadPoolExecutor, as_completed
import gradio as gr

# Configure logging once at import time: INFO level, timestamped records.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Chat-completions endpoint that the captioning, summarization and Q&A
# requests in this file are all POSTed to.
API_URL = "http://localhost:1234/v1/chat/completions"

# Check if URL contains adult content
def is_adult_content(url):
    """Return True when the URL's host or path contains a blocked keyword.

    The check is a plain case-insensitive substring match against the network
    location and the path only; the query string and fragment are not
    inspected.
    """
    blocked_terms = ('porn', 'xxx', 'adult', 'sex', 'nude', 'nsfw', 'explicit', 'pornography')
    parts = urllib.parse.urlparse(url)
    haystacks = (parts.netloc.lower(), parts.path.lower())
    return any(term in haystack for term in blocked_terms for haystack in haystacks)

# Extract images from a webpage with improved handling
def extract_images_from_url(url, timeout=30):
    """Return absolute URLs of likely content images found on the page at *url*.

    Args:
        url: Address of the page to download and scan for ``<img>`` tags.
        timeout: Seconds passed to ``requests.get`` so an unresponsive host
            cannot block the caller indefinitely.

    Returns:
        A list of unique absolute http(s) image URLs in document order. An
        empty list is returned (and the error logged) when the page cannot be
        fetched or parsed.
    """
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'}
    # Attributes that may carry the image address, in priority order.
    source_attrs = ('src', 'data-src', 'data-lazy-src', 'data-original', 'data-srcset', 'srcset')
    srcset_attrs = ('srcset', 'data-srcset')
    # URL fragments that usually indicate icons, logos or tracking pixels.
    skip_patterns = ('icon', 'logo', 'pixel', 'tracking', 'avatar', 'blank.gif')

    try:
        response = requests.get(url, headers=headers, timeout=timeout)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')
        images = []
        seen_urls = set()  # To avoid duplicates

        for img in soup.find_all('img'):
            img_url = None
            for attr in source_attrs:
                value = img.attrs.get(attr)
                if not value:
                    continue
                if attr in srcset_attrs:
                    # A srcset is "url descriptor, url descriptor, ...";
                    # whitespace-splitting tolerates leading blanks/newlines.
                    tokens = value.split()
                    candidate = tokens[0].rstrip(',') if tokens else ''
                else:
                    candidate = value.strip()
                # An inline data: URI in `src` is typically a lazy-loading
                # placeholder; keep looking so the real address in a later
                # attribute is not lost.
                if candidate and not candidate.startswith('data:'):
                    img_url = candidate
                    break

            if not img_url:
                continue

            try:
                # urljoin resolves root-relative, protocol-relative and
                # document-relative references against the page URL.
                if not img_url.startswith(('http://', 'https://')):
                    img_url = urllib.parse.urljoin(url, img_url)
                scheme = urllib.parse.urlparse(img_url).scheme
            except ValueError:
                # One malformed reference should not discard the whole page.
                continue

            # Only http(s) resources can be downloaded later.
            if scheme not in ('http', 'https'):
                continue

            # Skip duplicate images
            if img_url in seen_urls:
                continue
            seen_urls.add(img_url)

            # Filter out common icons and tracking pixels based on URL patterns
            lowered = img_url.lower()
            if any(pattern in lowered for pattern in skip_patterns):
                continue

            # Filter out small icons, spacers, and tracking pixels when both
            # dimensions are declared as plain integers.
            if 'width' in img.attrs and 'height' in img.attrs:
                try:
                    if int(img['width']) < 100 or int(img['height']) < 100:
                        continue
                except (ValueError, TypeError):
                    # Non-numeric sizes such as "100%" give no usable hint.
                    pass

            images.append(img_url)

        # Return all images - no arbitrary limit
        return images
    except Exception as e:
        logger.error(f"Error extracting images: {str(e)}")
        return []

# Extract meaningful text from a webpage
def extract_text_from_url(url, timeout=30):
    """Download *url* and return its main readable text as one string.

    Args:
        url: Address of the page to download.
        timeout: Seconds passed to ``requests.get`` so an unresponsive host
            cannot block the caller indefinitely.

    Returns:
        The extracted text fragments joined by single spaces;
        ``"No meaningful text found."`` when nothing longer than 20 characters
        remains; or ``"Failed to extract text: <error>"`` when the page could
        not be fetched or parsed (the error is also logged).
    """
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'}
    text_tags = ['p', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'li']
    boilerplate_markers = ['nav', 'menu', 'footer', 'sidebar', 'comment', 'header', 'banner', 'ad-', 'advertisement']
    content_markers = ['content', 'article', 'post', 'entry', 'main']

    try:
        response = requests.get(url, headers=headers, timeout=timeout)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')

        # Remove navigation, footer, sidebar, and other non-content elements
        for tag in soup.find_all(['nav', 'footer', 'aside']):
            tag.decompose()

        for tag in soup.find_all(class_=lambda x: x and any(c in str(x).lower() for c in boilerplate_markers)):
            tag.decompose()

        # Look for main content containers
        main_content = soup.find(['main', 'article', 'section', 'div'], class_=lambda x: x and any(c in str(x).lower() for c in content_markers))

        # Fall back to the whole document when no container is recognised.
        container = main_content if main_content else soup
        elements = container.find_all(text_tags)

        texts = []
        seen_texts = set()
        for el in elements:
            # Collapse whitespace instead of get_text(strip=True): the latter
            # joins text nodes with no separator, gluing words together
            # across inline tags such as <a> or <b>.
            text = " ".join(el.get_text().split())
            if len(text) <= 20:  # Filter out very short text fragments
                continue
            # A <p> nested inside an <li> matches twice with identical text;
            # keep only the first occurrence.
            if text in seen_texts:
                continue
            seen_texts.add(text)
            texts.append(text)

        if not texts:
            return "No meaningful text found."

        return " ".join(texts)
    except Exception as e:
        logger.error(f"Error extracting text: {str(e)}")
        return f"Failed to extract text: {str(e)}"

# Encode an image with robust error handling
def encode_image(image_url, timeout=30):
    """Download *image_url* and return its raw bytes if it is a valid image.

    Args:
        image_url: Absolute URL of the image to download.
        timeout: Seconds passed to ``requests.get`` so an unresponsive host
            cannot block the caller indefinitely.

    Returns:
        The downloaded bytes, or ``None`` when the request fails, the response
        is not served with an ``image/*`` Content-Type, or Pillow cannot
        verify the data. Every failure is logged.
    """
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'}
    try:
        # With stream=True the body is fetched lazily, so the Content-Type can
        # be rejected before downloading. The context manager closes the
        # response on every path; a streamed response whose body is never read
        # would otherwise keep its connection checked out.
        with requests.get(image_url, headers=headers, stream=True, timeout=timeout) as response:
            response.raise_for_status()

            # Check content type to verify it's an image
            content_type = response.headers.get('Content-Type', '')
            if not content_type.startswith('image/'):
                logger.warning(f"Not an image: {image_url} (Content-Type: {content_type})")
                return None

            img_content = response.content
    except Exception as e:
        logger.error(f"Error encoding image {image_url}: {str(e)}")
        return None

    # Verify image can be opened with PIL
    try:
        with Image.open(io.BytesIO(img_content)) as img:
            img.verify()  # Verify it's a valid image
    except Exception as e:
        logger.warning(f"Invalid image format: {image_url} - {str(e)}")
        return None

    return img_content

# Generate image captions with proper resource management - FIXED FUNCTION
def generate_caption(image_data):
    """Ask the vision model behind ``API_URL`` to describe *image_data*.

    Args:
        image_data: Raw encoded image bytes in any format Pillow can open.

    Returns:
        The caption text returned by the model, or one of these strings:
        ``"Failed to process image."`` when *image_data* is empty,
        ``"Error generating caption: Failed to encode image"`` when the image
        produced no base64 data, or ``"Image description unavailable"`` when
        any step raised (the error is logged).
    """
    if not image_data:
        return "Failed to process image."

    try:
        base64_image = _jpeg_base64_for_caption(image_data)

        # Validate base64 data
        if not base64_image:
            logger.error("Failed to encode image to base64")
            return "Error generating caption: Failed to encode image"

        payload = {
            "model": "llava",
            "messages": [{"role": "user", "content": [
                {"type": "text", "text": "Describe this image in detail. What is shown? What is the main subject? What key elements are visible? Provide a thorough but concise caption."},
                {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{base64_image}"}}
            ]}],
            "temperature": 0.5,
            # NOTE(review): far larger than any caption needs; presumably
            # meant as "unlimited" -- confirm the server accepts or clamps it.
            "max_tokens": 30000000
        }

        logger.info("Sending image for captioning")
        # No timeout is passed: this call blocks until the server answers.
        response = requests.post(API_URL, json=payload)
        response.raise_for_status()

        caption = response.json()["choices"][0]["message"]["content"]
        logger.info("Successfully generated caption")
        return caption

    except Exception as e:
        logger.error(f"Error generating caption: {str(e)}")
        return "Image description unavailable"


def _jpeg_base64_for_caption(image_data, max_size=1024):
    """Re-encode image bytes as a JPEG of at most *max_size* px per side, base64-encoded."""
    with Image.open(io.BytesIO(image_data)) as image:
        if not image.format:
            logger.warning("Image has no format information")
        # Work on an in-memory copy so nothing depends on the source buffer
        # once the file-backed image is closed.
        working = image.copy()

    try:
        # Resize large images to save bandwidth
        width, height = working.size
        if max(width, height) > max_size:
            # Clamp the scaled side to 1 px: extreme aspect ratios would
            # otherwise round down to 0 and make resize() raise.
            if width > height:
                new_size = (max_size, max(1, int(height * (max_size / width))))
            else:
                new_size = (max(1, int(width * (max_size / height))), max_size)
            resized = working.resize(new_size, Image.LANCZOS)
            working.close()
            working = resized

        if working.mode in ('RGBA', 'LA'):
            # JPEG has no alpha channel: flatten onto white, using the alpha
            # band (always the last band in RGBA and LA) as the paste mask.
            flattened = Image.new('RGB', working.size, (255, 255, 255))
            flattened.paste(working, mask=working.split()[-1])
            working.close()
            working = flattened
        elif working.mode not in ('RGB', 'L'):
            # Make sure we're in a common format (RGB or L)
            converted = working.convert('RGB')
            working.close()
            working = converted

        with io.BytesIO() as buffered:
            working.save(buffered, format="JPEG", quality=85)
            return base64.b64encode(buffered.getvalue()).decode("utf-8")
    finally:
        # Release pixel memory even when a step above raises.
        working.close()

# Process images with improved resource management
def process_images(images, max_workers=3):
    """Download and caption *images*, returning results in input order.

    Args:
        images: Iterable of image URLs; may be empty or ``None``.
        max_workers: Number of threads used for concurrent captioning.

    Returns:
        A ``(urls, captions)`` pair of equal-length lists holding only the
        images that were downloaded and whose caption is non-empty and does
        not start with ``"Error"``.
    """
    if not images:
        return [], []

    # First encode all images
    encoded_images = []
    for img_url in images:
        encoded = encode_image(img_url)
        if encoded:
            encoded_images.append((img_url, encoded))

    if not encoded_images:
        return [], []

    # Key results by position rather than URL so each image keeps its own
    # caption even if the same URL appears more than once.
    captions_by_index = {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_index = {
            executor.submit(generate_caption, img_data): index
            for index, (_, img_data) in enumerate(encoded_images)
        }

        for future in as_completed(future_to_index):
            index = future_to_index[future]
            img_url = encoded_images[index][0]
            try:
                caption = future.result()
            except Exception as e:
                logger.error(f"Exception while captioning image {img_url}: {str(e)}")
                continue
            if caption and not caption.startswith("Error"):
                captions_by_index[index] = caption

    # Return results sorted by original image order
    result_urls = []
    result_captions = []
    for index in sorted(captions_by_index):
        result_urls.append(encoded_images[index][0])
        result_captions.append(captions_by_index[index])

    return result_urls, result_captions

# Process a URL
def process_url(url):
    """Run the full pipeline for one page: images, captions, text, summary.

    Returns a ``(image_urls, captions, summary)`` triple. URLs flagged by
    ``is_adult_content`` are refused up front, and any exception raised during
    processing is logged and reported in the summary slot with empty lists.
    """
    if is_adult_content(url):
        return [], [], "Sorry, I cannot process adult content."

    try:
        # Images first, then their captions.
        found_images = extract_images_from_url(url)
        logger.info(f"Found {len(found_images)} images")

        image_urls, captions = process_images(found_images)
        logger.info(f"Generated {len(captions)} captions")

        # Page text.
        page_text = extract_text_from_url(url)
        logger.info(f"Extracted {len(page_text)} characters of text")

        # Captions alone are still worth summarizing when the text step failed.
        if captions and page_text.startswith("Failed to extract text"):
            page_text = "Text extraction failed, but image content was analyzed."

        # Append the captions as a bullet list for the summarizer.
        if captions:
            bullet_lines = "\n".join(f"- {caption}" for caption in captions)
            summary_input = page_text + "\n\nImage descriptions:\n" + bullet_lines
        else:
            summary_input = page_text

        logger.info("Generating summary")
        summary = summarize_text(summary_input)
        return image_urls, captions, summary
    except Exception as err:
        logger.error(f"Error in process_url: {str(err)}")
        return [], [], f"Error processing URL: {str(err)}"

# Summarize extracted text with image captions included
def summarize_text(text):
    """Request a structured summary of *text* from the model at ``API_URL``.

    Empty input and the extraction failure sentinels short-circuit to
    ``"No content found to summarize."``. Only the first 8000 characters are
    sent. If the request or response parsing fails, the error is logged and a
    fixed fallback sentence is returned.
    """
    nothing_to_summarize = (
        not text
        or text == "No meaningful text found."
        or text.startswith("Failed to extract text")
    )
    if nothing_to_summarize:
        return "No content found to summarize."

    # Slicing is a no-op for shorter strings, so no length check is needed.
    excerpt = text[:8000]

    system_prompt = "You are an expert content analyst and summarizer. Create a comprehensive summary that includes key insights from both the text and image descriptions. Organize the summary in a structured format with main points and supporting details."
    user_prompt = f"Please summarize the following webpage content, including both text and image descriptions:\n\n{excerpt}"
    request_body = {
        "model": "slim-summary-phi-3",
        "messages": [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
        "temperature": 0.3,
        "max_tokens": 1000000,
    }

    try:
        reply = requests.post(API_URL, json=request_body)
        reply.raise_for_status()
        return reply.json()["choices"][0]["message"]["content"]
    except Exception as err:
        logger.error(f"Summarization failed: {str(err)}")
        return "I was able to extract content but couldn't generate a summary. Here are the key points from the images and text I found."

# Extract URL from text with improved validation
def extract_url(text):
    """Return the first URL found in *text*, or ``None`` if there is none.

    Recognises ``http://``/``https://`` URLs and bare ``www.`` hosts (which
    get an ``https://`` prefix). Trailing sentence punctuation is removed; a
    trailing ``)`` is removed only when it has no matching ``(`` inside the
    URL, so addresses such as ``.../Python_(programming_language)`` survive.

    Args:
        text: Free-form user input.

    Returns:
        The cleaned URL string, or ``None`` when no URL is present or the
        candidate lacks a scheme or host.
    """
    match = re.search(r'(https?://\S+|www\.\S+)', text)
    if not match:
        return None

    url = match.group(0)

    # Handle URLs that don't start with http/https
    if url.startswith('www.'):
        url = 'https://' + url

    # Remove trailing punctuation that might have been captured
    while url:
        last = url[-1]
        if last in '.,;:!?':
            url = url[:-1]
        elif last == ')' and url.count(')') > url.count('('):
            # Unbalanced: the parenthesis belongs to the surrounding sentence.
            url = url[:-1]
        else:
            break

    # Validate URL format
    try:
        result = urllib.parse.urlparse(url)
    except ValueError:
        # urlparse rejects malformed hosts such as an unclosed "[".
        return None
    return url if result.scheme and result.netloc else None

# Chatbot with improved error handling
def chatbot(message, history):
    """Gradio event handler: analyze a URL or answer a follow-up question.

    This is a generator. Every ``yield`` is a ``(history, textbox_value)``
    pair; the textbox value is always ``""``.

    Behavior by input:
      * Blank message: the history is yielded unchanged.
      * Message containing a URL: a "Processing ..." placeholder turn is
        yielded first, then replaced in place by the final result.
      * Any other message with existing history: the question is answered
        from earlier turns that contain a summary or image descriptions.
      * Any other message with no history: a prompt asking for a URL.

    Args:
        message: Text entered by the user.
        history: List of ``(user_message, bot_message)`` turns; it is mutated
            in place.
    """
    if history is None:
        history = []

    if not message.strip():
        # In a generator a bare `return value` produces no output at all, so
        # yield the unchanged state explicitly before stopping.
        yield history, ""
        return

    url = extract_url(message)
    response = ""

    if url:
        # Show processing message
        processing_msg = f"Processing {url}... This may take some time depending on the content amount. Please wait for complete results."
        history.append((message, processing_msg))
        yield history, ""

        # Check for adult content
        if is_adult_content(url):
            response = "Sorry, I cannot process adult content."
            history[-1] = (message, response)
            yield history, ""
            return

        try:
            # Process URL
            images, captions, summary = process_url(url)

            if not images and not summary:
                response = f"I was unable to extract meaningful content from {url}. The site may block scraping or use complex JavaScript."
            elif summary.startswith("Error") or summary.startswith("Summarization failed"):
                # Fallback to just showing what we have
                response = f"**Partial results from {url}:**\n\n"
                if images and captions:
                    response += "**Image Descriptions:**\n\n"
                    for i, caption in enumerate(captions):
                        response += f"**Image {i+1}:** {caption}\n\n"
                else:
                    response += "I was able to find the page but couldn't fully process its content. Try a different URL or ask a simpler question."
            else:
                response = f"**Summary of {url}:**\n\n{summary}\n\n"
                if captions:
                    response += "**Image Descriptions:**\n\n"
                    for i, caption in enumerate(captions):
                        response += f"**Image {i+1}:** {caption}\n\n"
        except Exception as e:
            logger.error(f"Error in chatbot URL processing: {str(e)}")
            response = f"I encountered an error when processing {url}. The site may be unavailable or not compatible with my analysis capabilities."

        # Replace the placeholder turn with the final answer and stop here;
        # falling through to the append below would add the reply twice.
        history[-1] = (message, response)
        yield history, ""
        return

    if history:
        # Context-aware Q&A after summarization
        try:
            # Get all previous content for context
            context = "\n\n".join([
                turn[1] for turn in history
                if turn[1] and ("Summary of" in turn[1] or "Image Descriptions:" in turn[1])
            ])

            if not context:
                response = "Please provide a URL first so I can analyze content."
            else:
                payload = {
                    "model": "slim-summary-phi-3",
                    "messages": [
                        {"role": "system", "content": "Answer based on the previously summarized content. Be specific and cite information from the summary. If the question cannot be answered based on the available information, clearly state that."},
                        {"role": "user", "content": f"Based on this summarized content, answer the following question: {message}\n\nSummarized content:\n{context}"}
                    ],
                    "temperature": 0.3,
                    "max_tokens": 500000
                }

                api_response = requests.post(API_URL, json=payload)
                api_response.raise_for_status()
                response = api_response.json()["choices"][0]["message"]["content"]
        except Exception as e:
            logger.error(f"Error in Q&A: {str(e)}")
            response = "I couldn't process your question about the previous content. Could you try rephrasing it?"
    else:
        response = "I'm here to summarize web content. Please provide a URL to begin."

    history.append((message, response))
    yield history, ""

# Gradio UI with better styling and functionality
# Build the Gradio page. The custom CSS hides the page's footer element.
with gr.Blocks(css="footer {visibility: hidden}") as iface:
    # Title and usage instructions shown above the chat area.
    gr.Markdown("# Enhanced Web Content Analyzer")
    gr.Markdown("""
    Enter a URL to extract and analyze web content:
    1. Captions all meaningful images without timeouts
    2. Extracts relevant text (ignores navigation, ads, etc.)
    3. Generates a comprehensive summary using all available content
    
    
    Note: Processing may take longer for content-heavy sites, but will be more complete.
    """)
    
    # Conversation display; its value is passed to `chatbot` as `history`.
    chatbot_interface = gr.Chatbot(label="Chat", height=600, bubble_full_width=False)
    # Input row: the message textbox next to the Send button.
    with gr.Row():
        msg = gr.Textbox(placeholder="Enter a URL ...", show_label=False)
        submit = gr.Button("Send")
    
    
    
    # Submitting the textbox and clicking Send both run `chatbot`. Each pair it
    # yields updates the chat display and sets the textbox to the second
    # element, which `chatbot` always yields as "" (clearing the input).
    msg.submit(chatbot, inputs=[msg, chatbot_interface], outputs=[chatbot_interface, msg])
    submit.click(chatbot, inputs=[msg, chatbot_interface], outputs=[chatbot_interface, msg])

# Launch
if __name__ == "__main__":
    # Start the interface only when run as a script; a startup failure is
    # logged instead of propagating.
    try:
        iface.launch()
    except Exception as launch_error:
        logger.error(f"Error launching interface: {str(launch_error)}")



* Running on local URL:  http://127.0.0.1:7870


2025-03-07 09:36:26,079 - INFO - HTTP Request: GET http://127.0.0.1:7870/gradio_api/startup-events "HTTP/1.1 200 OK"
2025-03-07 09:36:26,092 - INFO - HTTP Request: HEAD http://127.0.0.1:7870/ "HTTP/1.1 200 OK"



To create a public link, set `share=True` in `launch()`.


2025-03-07 09:36:48,528 - INFO - Found 2 images
2025-03-07 09:36:55,903 - INFO - Sending image for captioning
2025-03-07 09:36:55,915 - INFO - Sending image for captioning
2025-03-07 09:38:46,580 - INFO - Successfully generated caption
2025-03-07 09:40:24,275 - INFO - Successfully generated caption
2025-03-07 09:40:24,276 - INFO - Generated 2 captions
2025-03-07 09:40:25,857 - INFO - Extracted 25 characters of text
2025-03-07 09:40:25,858 - INFO - Generating summary
