In [24]:
import os
import json
import yaml
import boto3
import fitz  # PyMuPDF
import io
import shutil
from PIL import Image
from anthropic import Anthropic
from collections import OrderedDict

def load_config():
    with open('config/config.yaml', 'r') as file:
        return yaml.safe_load(file)

def init_aws_client(config):
    return boto3.client(
        's3',
        aws_access_key_id=config['aws']['aws_access_key_id'],
        aws_secret_access_key=config['aws']['aws_secret_access_key'],
        region_name=config['aws']['region_name']
    )

def init_claude(config):
    return Anthropic(
        api_key=config['anthropic']['claud_key']
    )

def get_s3_public_url(bucket_name, file_key, region):
    return f"https://{bucket_name}.s3.{region}.amazonaws.com/{file_key}"

def ensure_tmp_dir():
    """Create tmp directory in the working folder if it doesn't exist"""
    tmp_dir = os.path.join(os.getcwd(), 'tmp')
    if not os.path.exists(tmp_dir):
        os.makedirs(tmp_dir)
    else:
        # Clean existing files in tmp directory
        for filename in os.listdir(tmp_dir):
            file_path = os.path.join(tmp_dir, filename)
            try:
                if os.path.isfile(file_path):
                    os.unlink(file_path)
            except Exception as e:
                print(f'Error deleting {file_path}: {e}')
    return tmp_dir

def extract_text_from_pdf(pdf_path):
    """Extract both regular text and text from images in PDF"""
    doc = fitz.open(pdf_path)
    text_content = []
    
    for page_num in range(len(doc)):
        page = doc[page_num]
        
        # Extract regular text
        text_content.append(page.get_text())
        
        # Extract images and their text content
        image_list = page.get_images()
        for img_index, img in enumerate(image_list):
            try:
                # Get image data
                xref = img[0]
                base_image = doc.extract_image(xref)
                image_bytes = base_image["image"]
                
                # Convert to PIL Image for potential future processing
                image = Image.open(io.BytesIO(image_bytes))
                text_content.append(f"[Embedded Image {page_num + 1}-{img_index + 1}]")
                
            except Exception as e:
                print(f"Error extracting image {img_index} from page {page_num}: {str(e)}")
    
    doc.close()
    return "\n".join(text_content)

def analyze_with_claude(client, text):
    prompt = """ Analyze this document and extract the following information in JSON format. The text contains course syllabus information, likely in both Hebrew and English. The document may contain references to embedded images. Required JSON structure: { "course_name": "Course name in original language", "program_manager": "Look for 'מנהל התוכנית' or program manager", "instructors": [ { "name": "Instructor name", "role": "Role (e.g., יועץ מקצועי, מרצה, מרצה בכיר, מדריך)", "title": "Professional title if available", "description": "Additional description or background" } ], "summary": "A comprehensive summary of the course content", "embedded_images": ["Simple list of image names/references found in text"], "full_text": "The complete text from the document" } Keep all text in its original language (Hebrew or English). Ensure proper extraction of Hebrew text and names. If a field is not found, use null or empty array []. Place the complete input text in the full_text field. Note any embedded image references found in the text in the embedded_images array. Document text: {text} """
    
    try:
        message = client.messages.create(
            model="claude-3-sonnet-20240229",
            max_tokens=100000,
            messages=[
                {
                    "role": "user",
                    "content": prompt.format(text=text)
                }
            ]
        )
        
        # Get the response text and clean it up
        response_text = message.content[0].text.strip()
        
        # Try to find the JSON part in the response
        try:
            # Find the first { and last } to extract just the JSON part
            start = response_text.find('{')
            end = response_text.rfind('}') + 1
            if start != -1 and end != 0:
                json_str = response_text[start:end]
                claude_response = json.loads(json_str)
            else:
                raise ValueError("No JSON found in response")
                
        except json.JSONDecodeError as e:
            print(f"JSON Parse Error: {str(e)}")
            print(f"Response text: {response_text}")
            raise
        
        # Create ordered dictionary for the final response
        ordered_response = OrderedDict([
            ("course_name", claude_response.get("course_name", "")),
            ("program_manager", claude_response.get("program_manager", "")),
            ("instructors", claude_response.get("instructors", [])),
            ("summary", claude_response.get("summary", "")),
            ("embedded_images", claude_response.get("embedded_images", [])),
            ("full_text", claude_response.get("full_text", text))
        ])
        
        return ordered_response
        
    except Exception as e:
        print(f"Error analyzing with Claude: {str(e)}")
        print(f"Full error context: ", e)
        return OrderedDict([
            ("course_name", ""),
            ("program_manager", ""),
            ("instructors", []),
            ("summary", "Error analyzing content"),
            ("embedded_images", []),
            ("full_text", text)
        ])


def process_files(config):
    s3_client = init_aws_client(config)
    claude_client = init_claude(config)
    
    # Create and ensure tmp directory is clean
    tmp_dir = ensure_tmp_dir()
    
    # Create local directory for JSON files
    json_output_dir = os.path.join(os.getcwd(), 'output', 'json')
    os.makedirs(json_output_dir, exist_ok=True)
    
    # List all files in the upload bucket/path
    paginator = s3_client.get_paginator('list_objects_v2')
    pages = paginator.paginate(
        Bucket=config['aws']['upload_bucket_name'],
        Prefix=config['aws']['upload_path']
    )

    for page in pages:
        for obj in page.get('Contents', []):
            file_key = obj['Key']
            file_name = os.path.basename(file_key)
            
            # Only process PDFs
            if not file_name.lower().endswith('.pdf'):
                continue

            print(f"Processing file: {file_name}")
            
            # Create paths for temporary PDF and output JSON
            tmp_pdf_path = os.path.join(tmp_dir, file_name)
            json_filename = f"{os.path.splitext(file_name)[0]}.json"
            local_json_path = os.path.join(json_output_dir, json_filename)
            
            try:
                # Download file to temporary location
                s3_client.download_file(
                    config['aws']['upload_bucket_name'],
                    file_key,
                    tmp_pdf_path
                )

                # Extract text (including image references)
                extracted_text = extract_text_from_pdf(tmp_pdf_path)
                
                # Analyze with Claude
                analysis = analyze_with_claude(claude_client, extracted_text)
                
                # Create final JSON with file_url as first item
                final_output = OrderedDict([
                    ("file_url", get_s3_public_url(
                        config['aws']['upload_bucket_name'],
                        file_key,
                        config['aws']['region_name']
                    ))
                ])
                final_output.update(analysis)
                
                # Save JSON locally
                with open(local_json_path, 'w', encoding='utf-8') as f:
                    json.dump(final_output, f, ensure_ascii=False, indent=2)
                
                # Save to S3 (ensure forward slashes for S3 path)
                json_key = os.path.join(
                    config['aws']['extract_txt_path'],
                    json_filename
                ).replace('\\', '/')  # Convert Windows path separators to forward slashes
                
                s3_client.put_object(
                    Bucket=config['aws']['txt_extract_bucket_name'],
                    Key=json_key,
                    Body=json.dumps(final_output, ensure_ascii=False, indent=2).encode('utf-8')
                )

                print(f"Successfully processed and saved: {file_name}")
                print(f"JSON saved locally at: {local_json_path}")
                print(f"JSON uploaded to S3: s3://{config['aws']['txt_extract_bucket_name']}/{json_key}")

            except Exception as e:
                print(f"Error processing file {file_name}: {str(e)}")
                print(f"Full error context: ", e)
            
            finally:
                # Clean up the temporary PDF file
                if os.path.exists(tmp_pdf_path):
                    try:
                        os.remove(tmp_pdf_path)
                    except Exception as e:
                        print(f"Error removing temporary file {tmp_pdf_path}: {e}")

def main():
    config = load_config()
    process_files(config)

In [25]:
main()

Processing file: AWS Practitioner Sylabus.pdf
Error analyzing with Claude: ' "course_name"'
Full error context:  ' "course_name"'
Successfully processed and saved: AWS Practitioner Sylabus.pdf
JSON saved locally at: C:\github_repos\BIU_LLM_Project\output\json\AWS Practitioner Sylabus.json
JSON uploaded to S3: s3://ct-external-sources/ds/rag01/extract/AWS Practitioner Sylabus.json
Processing file: AWS Solution Architect Sylabus.pdf
Error analyzing with Claude: ' "course_name"'
Full error context:  ' "course_name"'
Successfully processed and saved: AWS Solution Architect Sylabus.pdf
JSON saved locally at: C:\github_repos\BIU_LLM_Project\output\json\AWS Solution Architect Sylabus.json
JSON uploaded to S3: s3://ct-external-sources/ds/rag01/extract/AWS Solution Architect Sylabus.json
Processing file: Advanced Project Management Sylabus.pdf
Error analyzing with Claude: ' "course_name"'
Full error context:  ' "course_name"'
Successfully processed and saved: Advanced Project Management Sylabus

In [None]:
if __name__ == "__main__":
    main()

'\ndef main():\n    config = load_config()\n    process_files(config)\n\nif __name__ == "__main__":\n    main()\n'