In [None]:
!pip show boto3 pdfplumber chromadb langchain tqdm

In [None]:
import boto3
import pdfplumber
import re
import json
import io
import chromadb
from chromadb.config import Settings
from botocore.exceptions import ClientError
from langchain.text_splitter import RecursiveCharacterTextSplitter
from tqdm import tqdm
import pandas as pd
import os
import shutil
import logging
import unicodedata

# Configure logging: INFO and above goes to both pdf_processing.log and the
# default stream handler.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('pdf_processing.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# AWS setup (no region or credentials are passed explicitly here)
s3_client = boto3.client('s3')
bedrock_client = boto3.client('bedrock-runtime')
bucket_name = 'your-bucket-name'  # Replace with your S3 bucket name
prefix = 'path/to/pdfs/'

# Initialize Chroma
chroma_client = chromadb.Client(Settings(persist_directory=None))
# create_collection raises when a collection with this name already exists
# (e.g. when this cell is re-run in a live kernel); get_or_create_collection
# returns the existing collection instead, so the cell stays re-runnable.
collection = chroma_client.get_or_create_collection(name="pdf_embeddings")

# Recursive character splitter: chunk size and overlap are measured in
# characters (length_function=len), splitting on the listed separators.
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=30000,
    chunk_overlap=200,
    separators=["\n\n", "\n", ". ", " ", ""],
    length_function=len
)

# Folder setup: start every run from an empty debug folder
temp_folder = "temp-pdf"
if os.path.exists(temp_folder):
    shutil.rmtree(temp_folder)  # Delete folder and contents
os.makedirs(temp_folder)  # Recreate empty folder

In [None]:
def clean_text(text):
    """Normalize text and drop characters that are neither printable nor whitespace.

    Args:
        text: Value to clean. ``None`` is treated as empty text, ``bytes`` and
            ``bytearray`` are decoded as UTF-8 (undecodable bytes are dropped),
            and any other non-string value is converted with ``str()``.

    Returns:
        str: The NFKD-normalized text with non-printable, non-whitespace
        characters removed and leading/trailing whitespace stripped.
    """
    if text is None:
        # str(None) would produce the literal word "None", which would then be
        # treated as real content by the callers.
        return ""
    if isinstance(text, (bytes, bytearray)):
        # str(b"...") would produce the repr ("b'...'"), not the decoded text.
        text = text.decode('utf-8', errors='ignore')
    elif not isinstance(text, str):
        text = str(text)
    # Normalize Unicode characters (compatibility decomposition)
    text = unicodedata.normalize('NFKD', text)
    # Remove control and other non-printable characters, keeping whitespace
    text = ''.join(c for c in text if c.isprintable() or c.isspace())
    return text.strip()

def validate_chunk(text):
    """Check whether a chunk is acceptable as Titan embedding input.

    Returns:
        tuple[bool, str]: ``(True, "")`` when the chunk is usable, otherwise
        ``(False, reason)`` where ``reason`` explains the rejection: the chunk
        is empty/None, longer than 50000 characters, or contains no
        alphanumeric character.
    """
    if not text:
        return False, "Empty or null input"

    size = len(text)
    if size > 50000:  # Titan limit
        return False, f"Input too long ({size} characters)"

    # Accept as soon as one alphanumeric character is seen.
    for ch in text:
        if ch.isalnum():
            return True, ""
    return False, "No alphanumeric content"

def generate_titan_embedding(text, model_id="amazon.titan-embed-text-v2:0", dimensions=1024, normalize=True):
    """Request an embedding for ``text`` through ``bedrock_client.invoke_model``.

    The text is first passed through ``clean_text`` and ``validate_chunk``;
    input that fails validation is logged as a warning and never sent.

    Args:
        text: Text to embed.
        model_id: Value passed as ``modelId`` to ``invoke_model``.
        dimensions: Value sent as ``"dimensions"`` in the request body.
        normalize: Value sent as ``"normalize"`` in the request body.

    Returns:
        The ``"embedding"`` entry of the JSON response, or ``None`` when the
        input is invalid or the call raises ``ClientError`` (which is logged).
    """
    prepared = clean_text(text)
    ok, why = validate_chunk(prepared)
    if not ok:
        logger.warning(f"Invalid input for embedding: {why}")
        return None

    try:
        payload = {
            "inputText": prepared,
            "dimensions": dimensions,
            "normalize": normalize
        }
        response = bedrock_client.invoke_model(
            body=json.dumps(payload),
            modelId=model_id,
            accept='application/json',
            contentType='application/json'
        )
        raw_body = response['body'].read()
        parsed = json.loads(raw_body)
        return parsed['embedding']
    except ClientError as e:
        logger.error(f"Error generating embedding: {e}")
        return None

def extract_urls(text):
    """Return the ``http(s)://`` and ``www.`` URLs found in ``text``.

    Matches run up to the next whitespace, ``<``, ``>`` or ``"``. Because
    sentence punctuation directly after a URL is captured by that rule,
    trailing ``. , ; : ! ?`` and any unbalanced closing ``)`` are trimmed from
    each match.

    Args:
        text: Text to scan. ``None`` or an empty string yields no URLs.

    Returns:
        list[str]: The URLs in order of appearance (duplicates are kept).
    """
    if not text:
        return []

    url_pattern = r'https?://[^\s<>"]+|www\.[^\s<>"]+'
    trailing_punctuation = '.,;:!?'

    urls = []
    for match in re.findall(url_pattern, text):
        url = match.rstrip(trailing_punctuation)
        # "(see http://a.io/x)" captures the closing parenthesis; drop it only
        # when it has no matching "(" inside the URL itself.
        while url.endswith(')') and url.count(')') > url.count('('):
            url = url[:-1].rstrip(trailing_punctuation)
        urls.append(url)
    return urls

def is_valid_pdf(file_content):
    """Report whether ``file_content`` opens with pdfplumber and has at least one page.

    Any exception raised while opening or inspecting the document is logged
    as a warning and reported as ``False``.
    """
    try:
        with pdfplumber.open(io.BytesIO(file_content)) as pdf:
            has_pages = bool(pdf.pages)
    except Exception as e:
        logger.warning(f"Invalid PDF detected: {e}")
        return False
    return has_pages

def process_pdf(file_content, file_key, temp_folder):
    """Split one PDF into text, table and URL chunks with matching metadata.

    The raw bytes are first written to ``temp_folder`` for debugging. Each
    page then contributes: the chunks ``text_splitter`` produces from its
    extracted text, one chunk per extracted table (rows joined with newlines,
    cells with commas, truncated to 40000 characters), and one chunk per URL
    found in the page text. Every chunk is also written to its own ``.txt``
    file in ``temp_folder``.

    Args:
        file_content: Raw bytes of the PDF.
        file_key: Key identifying the PDF; recorded in every chunk's metadata
            and, with ``/`` replaced by ``_``, used as the debug-file prefix.
        temp_folder: Existing directory that receives the debug files.

    Returns:
        tuple[list[str], list[dict]]: ``(chunks, metadata)`` as parallel
        lists; ``metadata[i]`` is ``{"file", "page", "type"}`` for
        ``chunks[i]``, where ``page`` is 1-based and ``type`` is ``"text"``,
        ``"table"`` or ``"url"``. Both lists are empty when the PDF is
        invalid. If extraction fails part-way, the error is logged and what
        was collected up to that point is returned.
    """
    chunks = []
    metadata = []
    file_key_safe = file_key.replace('/', '_')

    # Save raw PDF for debugging
    raw_pdf_path = os.path.join(temp_folder, f"{file_key_safe}_raw.pdf")
    with open(raw_pdf_path, 'wb') as f:
        f.write(file_content)
    logger.info(f"Saved raw PDF: {raw_pdf_path}")

    # Validate PDF
    if not is_valid_pdf(file_content):
        logger.error(f"Skipping {file_key}: Invalid or empty PDF")
        return chunks, metadata

    try:
        with pdfplumber.open(io.BytesIO(file_content)) as pdf:
            if pdf.pages is None:
                logger.error(f"Skipping {file_key}: No pages found")
                return chunks, metadata
            for page_num, page in enumerate(pdf.pages):
                # --- Page text, split into chunks ---
                text = page.extract_text() or ""
                if text:
                    text_chunks = text_splitter.split_text(text)
                    for idx, chunk in enumerate(text_chunks):
                        cleaned_chunk = clean_text(chunk)
                        if cleaned_chunk:
                            meta = {"file": file_key, "page": page_num + 1, "type": "text"}
                            chunks.append(cleaned_chunk)
                            metadata.append(meta)
                            chunk_file = os.path.join(temp_folder, f"{file_key_safe}_page{page_num+1}_text_{idx}.txt")
                            with open(chunk_file, 'w', encoding='utf-8') as f:
                                f.write(f"Metadata: {json.dumps(meta)}\n\nContent:\n{cleaned_chunk}")

                # --- Tables, flattened to comma/newline-separated text ---
                tables = page.extract_tables()
                for table_idx, table in enumerate(tables):
                    table_text = "\n".join([",".join(str(cell) if cell is not None else "" for cell in row) for row in table if row])
                    cleaned_table = clean_text(table_text)
                    if cleaned_table:
                        meta = {"file": file_key, "page": page_num + 1, "type": "table"}
                        # Only the first 40000 characters are kept as the chunk;
                        # the debug file below still receives the full table.
                        chunks.append(cleaned_table[:40000])
                        metadata.append(meta)
                        table_file = os.path.join(temp_folder, f"{file_key_safe}_page{page_num+1}_table_{table_idx}.txt")
                        with open(table_file, 'w', encoding='utf-8') as f:
                            f.write(f"Metadata: {json.dumps(meta)}\n\nContent:\n{cleaned_table}")
                    else:
                        logger.warning(f"Skipped empty or invalid table at {file_key} page {page_num+1}")

                # --- URLs found in the page text ---
                urls = extract_urls(text)
                for url_idx, url in enumerate(urls):
                    cleaned_url = clean_text(url)
                    if cleaned_url:
                        # Same metadata shape as text/table chunks: the source
                        # file key, an integer 1-based page, and the chunk type.
                        # The dict itself (not the URL string) is appended so
                        # chunks and metadata stay aligned index-for-index.
                        meta = {"file": file_key, "page": page_num + 1, "type": "url"}
                        chunks.append(cleaned_url)
                        metadata.append(meta)
                        url_file = os.path.join(temp_folder, f"{file_key_safe}_page{page_num+1}_url_{url_idx}.txt")
                        with open(url_file, 'w', encoding='utf-8') as f:
                            f.write(f"Metadata: {json.dumps(meta)}\n\nContent: {cleaned_url}\n")
    except Exception as e:
        # Best effort: keep whatever was extracted before the failure.
        logger.error(f"Error processing {file_key}: {e}")
        return chunks, metadata

    return chunks, metadata

In [None]:
# Collect every PDF key under the configured prefix. A single list_objects_v2
# call returns at most 1,000 keys, so page through the listing. The suffix
# test is case-insensitive so that ".PDF" keys are not missed.
paginator = s3_client.get_paginator('list_objects_v2')
pdf_files = [
    obj['Key']
    for listing_page in paginator.paginate(Bucket=bucket_name, Prefix=prefix)
    for obj in listing_page.get('Contents', [])
    if obj['Key'].lower().endswith('.pdf')
]

# Keep only objects that are declared as PDFs and are larger than 1 KiB.
valid_pdfs = []
for key in tqdm(pdf_files, desc="Validating PDFs"):
    try:
        obj = s3_client.head_object(Bucket=bucket_name, Key=key)
        if obj.get('ContentType') == 'application/pdf' and obj.get('ContentLength', 0) > 1024:
            valid_pdfs.append(key)
        else:
            logger.warning(f"Skipping {key}: Invalid Content-Type ({obj.get('ContentType')}) or too small ({obj.get('ContentLength')} bytes)")
    except ClientError as e:
        logger.error(f"Error checking {key}: {e}")

# Download, chunk, embed and index each PDF; one summary row per file.
results = []
for file_key in tqdm(valid_pdfs, desc="Processing PDFs"):
    logger.info(f"Processing {file_key}...")
    file_key_safe = file_key.replace('/', '_')
    try:
        obj = s3_client.get_object(Bucket=bucket_name, Key=file_key)
        file_content = obj['Body'].read()
        if not file_content:
            logger.warning(f"Skipping {file_key}: Empty file")
            continue
    except ClientError as e:
        logger.error(f"Error downloading {file_key}: {e}")
        continue
    try:
        chunks, metadata = process_pdf(file_content, file_key, temp_folder)
    except Exception as e:
        logger.error(f"Unexpected error processing {file_key}: {e}")
        continue

    # Embed chunk by chunk; chunks without an embedding are written to a
    # per-file debug file instead of being indexed.
    embeddings = []
    valid_chunks = []
    valid_metadata = []
    for chunk, meta in zip(chunks, metadata):
        embedding = generate_titan_embedding(chunk)
        if embedding:
            embeddings.append(embedding)
            valid_chunks.append(chunk)
            valid_metadata.append(meta)
        else:
            # Save invalid chunk for debugging
            invalid_file = os.path.join(temp_folder, f"{file_key_safe}_invalid_chunk.txt")
            with open(invalid_file, 'a', encoding='utf-8') as f:
                f.write(f"Metadata: {json.dumps(meta)}\n\nContent:\n{chunk}\n\n")
    if embeddings:
        collection.add(
            embeddings=embeddings,
            documents=valid_chunks,
            metadatas=valid_metadata,
            ids=[f"{file_key}_chunk_{i}" for i in range(len(valid_chunks))]
        )
    results.append({
        "file": file_key,
        "num_chunks": len(valid_chunks),
        "num_embeddings": len(embeddings),
        "saved_files": [f for f in os.listdir(temp_folder) if f.startswith(file_key_safe)]
    })

pd.DataFrame(results)

In [None]:
# Embed a sample query and show the five nearest chunks in the collection.
query = "Find information about machine learning"
query_embedding = generate_titan_embedding(query)
if query_embedding is None:
    # generate_titan_embedding logs the reason and returns None on failure;
    # stop here with a clear message instead of handing None to Chroma.
    raise RuntimeError("Could not generate an embedding for the query; see the log for details")
results = collection.query(query_embeddings=[query_embedding], n_results=5)

# One query was sent, so the hits are in the first entry of each result list.
query_results = []
for doc, meta, dist in zip(results['documents'][0], results['metadatas'][0], results['distances'][0]):
    # Show at most the first 200 characters of each matching chunk.
    preview = doc[:200] + "..." if len(doc) > 200 else doc
    query_results.append({
        "file": meta['file'],
        "page": meta['page'],
        "type": meta['type'],
        "distance": dist,
        "content": preview
    })

pd.DataFrame(query_results)