In [1]:
import os
import json
from typing import List, Dict, Optional
import spacy
import re
import fitz
from openai import OpenAI, Client
from anthropic import Anthropic
from dataclasses import dataclass
from pathlib import Path
import shutil
from enum import Enum

In [2]:
# Read the API key from the environment so the secret never lives in the notebook
# source or its saved outputs. A missing variable fails loudly with KeyError here,
# before any document processing starts.
# NOTE: any key that was ever pasted into this cell should be treated as leaked
# and rotated.
client = Anthropic(api_key=os.environ["ANTHROPIC_API_KEY"])

In [3]:
@dataclass
class Sentence:
    """Simple container pairing a sentence's text with its length.

    DocumentProcessor._split_by_sentences builds these from the stripped
    sentence text and ``len()`` of that same stripped text.
    """
    text: str  # sentence text as supplied by the caller
    length: int  # length of `text`, stored so chunk sizes can be summed cheaply

class SplittingStrategy(Enum):
    """Selects which splitter DocumentProcessor._chunk_text dispatches to."""
    # Start a new chunk at every line that begins with digits followed by a period.
    NUMBERED_SECTIONS = "numbered_sections"
    # Accumulate sentences into chunks, repeating trailing sentences between chunks.
    SENTENCE_OVERLAP = "sentence_overlap"

@dataclass
class ChunkConfig:
    """Configuration for text chunking and processing.

    Sizes are compared against Python string lengths (``len``) by the splitters.
    """
    # Sentence strategy: a chunk is emitted once accumulated sentence length reaches this.
    # Numbered-section strategy: sections longer than this are split further.
    max_chunk_size: int = 3000
    # Read only by the sentence strategy: chunks shorter than this are not emitted.
    min_chunk_size: int = 800
    # Read only by the sentence strategy: trailing sentences carried into the next chunk.
    overlap_sentences: int = 2
    # Which splitter DocumentProcessor._chunk_text uses.
    strategy: SplittingStrategy = SplittingStrategy.NUMBERED_SECTIONS

class DocumentProcessor:
    def __init__(self, client: Anthropic, config: ChunkConfig):
        """Keep the API client and chunking config, and load the spaCy pipeline.

        Args:
            client: Client whose ``messages.create`` is used to generate conversations.
            config: Chunking settings read by the splitting methods.
        """
        self.client = client
        self.config = config

        # The tagger and NER components are switched off at load time; the
        # max_length limit is raised so very long documents can be passed to
        # the pipeline in one call.
        pipeline = spacy.load('en_core_web_sm', disable=['tagger', 'ner'])
        pipeline.max_length = 10000000
        self.nlp = pipeline
        
    def preprocess_text(self, text: str) -> str:
        """Clean and normalize text while preserving important newlines."""
        lines = text.split('\n')
        lines = [' '.join(line.split()) for line in lines if line.strip()]
        return '\n'.join(lines)
        
    def process_pdf(self, pdf_path: str) -> List[str]:
        """Read every page of a PDF, clean the text, and return its chunks.

        Args:
            pdf_path: Path of the PDF to open with ``fitz``.

        Returns:
            The chunks produced by ``_chunk_text`` for the cleaned text.
            Errors from opening or reading the PDF propagate to the caller.
        """
        with fitz.open(pdf_path) as pdf:
            # Each page's text is followed by a newline so pages do not run together.
            page_texts = [page.get_text() + "\n" for page in pdf]

        raw_text = "".join(page_texts)
        return self._chunk_text(self.preprocess_text(raw_text))

    def process_txt(self, txt_path: str) -> List[str]:
        """Read text file and split into chunks."""
        try:
            with open(txt_path, 'r', encoding='utf-8') as file:
                text = file.read()
            clean_text = self.preprocess_text(text)
            return self._chunk_text(clean_text)
        except Exception as e:
            print(f"Error processing text file {txt_path}: {str(e)}")
            return []

    def _split_by_sentences(self, text: str) -> List[str]:
        """Split text using sentence overlap strategy."""
        doc = self.nlp(text)
        sentences = [Sentence(text=sent.text.strip(), length=len(sent.text.strip())) 
                    for sent in doc.sents if sent.text.strip()]
        
        chunks = []
        current_chunk = []
        current_length = 0
        
        for sentence in sentences:
            current_chunk.append(sentence)
            current_length += sentence.length
            
            if current_length >= self.config.max_chunk_size and len(current_chunk) > self.config.overlap_sentences:
                if current_length >= self.config.min_chunk_size:
                    chunks.append(' '.join(s.text for s in current_chunk))
                    overlap_sentences = current_chunk[-self.config.overlap_sentences:]
                    current_chunk = overlap_sentences.copy()
                    current_length = sum(s.length for s in current_chunk)

        if current_length >= self.config.min_chunk_size:
            chunks.append(' '.join(s.text for s in current_chunk))
            
        return chunks

    def _split_by_numbered_sections(self, text: str) -> List[str]:
        """Split text based on numbered sections."""
        lines = text.split('\n')
        initial_chunks = []
        current_chunk = []
        
        for line in lines:
            if re.match(r'^\d+\.', line.strip()):
                if current_chunk:
                    initial_chunks.append('\n'.join(current_chunk))
                current_chunk = [line]
            else:
                current_chunk.append(line)
        
        if current_chunk:
            initial_chunks.append('\n'.join(current_chunk))
        
        # Handle chunks that are too large
        final_chunks = []
        for chunk in initial_chunks:
            if len(chunk) > self.config.max_chunk_size:
                doc = self.nlp(chunk)
                sentences = list(doc.sents)
                mid_point = len(sentences) // 2
                first_half = ' '.join(sent.text.strip() for sent in sentences[:mid_point])
                second_half = ' '.join(sent.text.strip() for sent in sentences[mid_point:])
                final_chunks.extend([first_half, second_half])
            else:
                final_chunks.append(chunk)
                
        return final_chunks

    def _chunk_text(self, text: str) -> List[str]:
        """Split text using the configured strategy."""
        if self.config.strategy == SplittingStrategy.NUMBERED_SECTIONS:
            return self._split_by_numbered_sections(text)
        else:
            return self._split_by_sentences(text)

    def _parse_qa_format(self, text: str) -> List[Dict[str, str]]:
        """Parse the QUESTION_N / ANSWER_N format into structured data."""
        import re
        
        # Find all questions and answers
        questions = re.findall(r'QUESTION_\d+:\s*(.*?)(?=ANSWER_|\Z)', text, re.DOTALL)
        answers = re.findall(r'ANSWER_\d+:\s*(.*?)(?=QUESTION_|\Z)', text, re.DOTALL)
        
        if len(questions) != len(answers):
            raise ValueError(f"Mismatched Q&A count: {len(questions)} questions, {len(answers)} answers")
        
        qa_pairs = []
        for q, a in zip(questions, answers):
            qa_pairs.append({
                "question": q.strip(),
                "answer": a.strip()
            })
        
        return qa_pairs

    def generate_conversation(self, chunk: str) -> Optional[Dict]:
        """Generate conversation from chunk with validation."""
        try:
            validation_prompt = f"""Analyze this text and determine if it contains meaningful Warren Buffett insights, commentary, or narrative content.

Approve the text only if:
- It discusses business philosophy or investment thinking that applies across industries and time.
- It provides views on markets, financial practices, or economic principles that are broadly applicable.
- Buffett shares personal reflections or general lessons learned that are useful beyond a single event.

Reject the text if:
- It primarily describes a specific investment, acquisition, deal, or financial transaction.
- It focuses on a single company's business decision without a clearly stated general principle.
- It discusses short-term market conditions, quarterly earnings, or economic events without broader insights.
- It contains only financial data, figures, or statistics without meaningful explanation.

Text: {chunk}

Return ONLY "yes" if the text contains meaningful, wide-scope content, or "no" otherwise. I WANT A SINGLE YES or NO!!"""
            
            validation_response = self.client.messages.create(
                model="claude-sonnet-4-5-20250929",
                messages=[{"role": "user", "content": validation_prompt}],
                max_tokens=100,
                temperature=0
            )
            
            if validation_response.content[0].text.strip().lower() != "yes":
                return None

            conversation_prompt = f"""Below is a text excerpt from me (Warren Buffett). Your task is to generate 2 substantive questions about the key themes in this content, followed by detailed answers in my characteristic Q&A style from the annual meetings.

Text: {chunk}

CRITICAL LENGTH REQUIREMENT:
- Each answer MUST be 400-600 words minimum
- Answers under 400 words are TOO SHORT and inadequate
- Think of this as a teaching opportunity, not a quick soundbite

Question Guidelines:
- Focus on the main principles, ideas, or philosophical points in the text
- Ask questions that invite thorough, multi-faceted explanations
- Avoid yes/no questions or those requiring only brief factual answers

Answer Structure & Style:
1. OPENING: Start with a direct, clear answer to the question (2-3 sentences)

2. REASONING: Walk through the logic step-by-step
   - Explain WHY things work this way
   - Break down the underlying principles
   - Show the cause-and-effect relationships

3. EXAMPLES & ANALOGIES: Use concrete illustrations from the source text
   - If the text contains analogies or metaphors, develop them fully
   - If the text references specific examples or comparisons, explain why they matter
   - When the text discusses concrete situations, use them to illuminate principles
   - Stay grounded in what is actually present in the source material

4. BROADER IMPLICATIONS: Connect to bigger themes
   - How does this principle apply more widely?
   - What are the long-term consequences?
   - What lessons can investors and managers take away?

5. PRACTICAL APPLICATION: Ground it in reality
   - How does this play out in actual business situations?
   - What should thoughtful people do with this information?
   - What pitfalls should they avoid?

Voice & Tone:
- Write in first person as Warren Buffett
- Maintain my conversational, accessible speaking style
- Be patient and thorough in explanations, like teaching at the annual meeting
- Use simple language to explain complex ideas
- Show enthusiasm for business principles and clear thinking
- Be direct and honest, avoiding corporate-speak or jargon
- Use natural language including contractions, possessives, and quotations as needed

REMEMBER: Each answer should naturally span 400-600 words through thorough development of ideas. Develop the reasoning, explore the implications, and teach the concepts fully.

FORMAT YOUR RESPONSE EXACTLY LIKE THIS:

QUESTION_1: [Your first question here]

ANSWER_1: [Your first detailed answer here - 400-600 words]

QUESTION_2: [Your second question here]

ANSWER_2: [Your second detailed answer here - 400-600 words]

Do not include any other text, formatting, or explanations. Just the four sections above."""

            conversation_response = self.client.messages.create(
                model="claude-sonnet-4-5-20250929",
                messages=[{"role": "user", "content": conversation_prompt}],
                max_tokens=5000,
                temperature=0.4
            )
            
            response_text = conversation_response.content[0].text.strip()
            
            # Parse the simple format
            try:
                qa_pairs = self._parse_qa_format(response_text)
                if qa_pairs and len(qa_pairs) == 2:
                    # Build the ShareGPT JSON structure in Python (with proper escaping)
                    conversation_data = {
                        "conversations": [
                            [
                                {"from": "human", "value": qa_pairs[0]["question"]},
                                {"from": "gpt", "value": qa_pairs[0]["answer"]}
                            ],
                            [
                                {"from": "human", "value": qa_pairs[1]["question"]},
                                {"from": "gpt", "value": qa_pairs[1]["answer"]}
                            ]
                        ]
                    }
                    return conversation_data
                else:
                    print(f"Failed to parse exactly 2 Q&A pairs")
                    return None
                    
            except Exception as e:
                print(f"Error parsing Q&A format: {str(e)}")
                print(f"Response text: {response_text[:200]}...")
                return None
                
        except Exception as e:
            print(f"Error processing chunk: {chunk[:100]}...")
            print(f"Error details: {str(e)}")
            return None

In [4]:
# Chunking settings used by the cells below until `config` is rebound.
# The three numeric values repeat the ChunkConfig defaults; only the strategy differs.
config = ChunkConfig(
    max_chunk_size=3000,
    min_chunk_size=800,
    overlap_sentences=2, 
    strategy=SplittingStrategy.SENTENCE_OVERLAP  # Or NUMBERED_SECTIONS if source has clear sections
)

In [7]:
# Smoke test: chunk a single PDF with the current `config` and report the chunk count.
# process_pdf only extracts, cleans and splits text; it makes no API calls.
processor = DocumentProcessor(client, config)
test_pdf_path = "Dataset/Unprocessed/Lessons for Corporate America/Lessons-for-Corporate-America.pdf"
chunks = processor.process_pdf(test_pdf_path)
print(f"Generated {len(chunks)} chunks from the PDF")

Generated 189 chunks from the PDF


In [8]:
# Generate one sample conversation as a sanity check on the prompts.
# The index is checked against the chunk count so a short document skips the
# sample instead of raising IndexError.
SAMPLE_CHUNK_INDEX = 23
if len(chunks) > SAMPLE_CHUNK_INDEX:
    conversation = processor.generate_conversation(chunks[SAMPLE_CHUNK_INDEX])
    print("Sample conversation:")
    # `conversation` is None (printed as null) when the chunk was rejected or failed.
    print(json.dumps(conversation, indent=2))
else:
    print(f"Skipping sample: need more than {SAMPLE_CHUNK_INDEX} chunks, got {len(chunks)}")

Sample conversation:
{
  "conversations": [
    [
      {
        "from": "human",
        "value": "You compare seeking different types of shareholders to a restaurant that can't decide between French cuisine and take-out chicken. Why is shareholder stability so important to you, and what's wrong with managements that encourage high trading volume in their stock?"
      },
      {
        "from": "gpt",
        "value": "The answer is straightforward: you simply cannot run a good business if you're constantly trying to please people who want fundamentally different things. When management encourages high trading volume, they're essentially saying they want a revolving door of owners, and that makes absolutely no sense to me.\n\nLet me walk you through the logic here. Every type of shareholder comes with different expectations. Some want high current dividends. Others want rapid capital appreciation. Still others are just looking for short-term trading profits. Now, if you try to satis

In [9]:
def process_directory(input_dir: str, output_dir: str, client: Anthropic, config: ChunkConfig):
    """Generate conversations for every .txt/.pdf in a directory, one JSON per file.

    Files directly inside ``input_dir`` are processed in sorted order;
    subdirectories and other extensions are skipped. For each file the chunks
    are turned into conversations and written to ``<output_dir>/<stem>.json``
    as ``{"conversations": [...]}``.

    If a file fails part-way, the conversations collected for that same file
    so far are still saved; nothing is written if none were collected.

    Args:
        input_dir: Directory containing the source documents.
        output_dir: Directory for the JSON output (created if missing).
        client: API client handed to ``DocumentProcessor``.
        config: Chunking configuration handed to ``DocumentProcessor``.
    """
    os.makedirs(output_dir, exist_ok=True)
    processor = DocumentProcessor(client, config)
    output_root = Path(output_dir)

    def save_conversations(source_file, conversations):
        """Write the conversations to <output_dir>/<source stem>.json."""
        output_path = output_root / (source_file.stem + '.json')
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump({"conversations": conversations}, f, indent=2, ensure_ascii=False)
        print(f"Successfully saved conversations to {output_path}")

    for file_path in sorted(Path(input_dir).glob('*')):
        if file_path.is_dir():
            continue

        suffix = file_path.suffix.lower()
        if suffix not in ('.txt', '.pdf'):
            continue

        print(f"Processing {file_path}")

        # Reset per file, before anything can fail: the error path below must
        # only ever see conversations that belong to this file.
        all_conversations = []

        try:
            if suffix == '.pdf':
                chunks = processor.process_pdf(str(file_path))
            else:  # .txt
                chunks = processor.process_txt(str(file_path))

            for chunk in chunks:
                conversation = processor.generate_conversation(chunk)
                if conversation and 'conversations' in conversation:
                    all_conversations.extend(conversation['conversations'])

            save_conversations(file_path, all_conversations)

        except Exception as e:
            print(f"Error processing file {file_path}: {str(e)}")
            # Save whatever was collected for this file before the failure.
            if all_conversations:
                try:
                    save_conversations(file_path, all_conversations)
                except OSError as save_error:
                    # Keep going with the remaining files even if the partial
                    # save fails as well.
                    print(f"Error saving partial results for {file_path}: {str(save_error)}")

In [10]:
# Process entire directory
input_directory = "Dataset/Unprocessed/Shareholder Letters/"
output_directory = "Dataset/Processed/Shareholder Letters/"
process_directory(input_directory, output_directory, client, config)

Processing Dataset\Unprocessed\Shareholder Letters\2002pdf.pdf
Successfully saved conversations to Dataset\Processed\Shareholder Letters\2002pdf.json
Processing Dataset\Unprocessed\Shareholder Letters\2003ltr.pdf
Successfully saved conversations to Dataset\Processed\Shareholder Letters\2003ltr.json
Processing Dataset\Unprocessed\Shareholder Letters\2004ltr.pdf
Successfully saved conversations to Dataset\Processed\Shareholder Letters\2004ltr.json
Processing Dataset\Unprocessed\Shareholder Letters\2005ltr.pdf
Successfully saved conversations to Dataset\Processed\Shareholder Letters\2005ltr.json
Processing Dataset\Unprocessed\Shareholder Letters\2006ltr.pdf
Successfully saved conversations to Dataset\Processed\Shareholder Letters\2006ltr.json
Processing Dataset\Unprocessed\Shareholder Letters\2007ltr.pdf
Successfully saved conversations to Dataset\Processed\Shareholder Letters\2007ltr.json
Processing Dataset\Unprocessed\Shareholder Letters\2008ltr.pdf
Successfully saved conversations to D

In [11]:
# Same pipeline for the "Lessons for Corporate America" folder.
# Rebinds the notebook-level input_directory / output_directory names.
input_directory = "Dataset/Unprocessed/Lessons for Corporate America/"
output_directory = "Dataset/Processed/Lessons for Corporate America/"
process_directory(input_directory, output_directory, client, config)

Processing Dataset\Unprocessed\Lessons for Corporate America\Lessons-for-Corporate-America.pdf
Successfully saved conversations to Dataset\Processed\Lessons for Corporate America\Lessons-for-Corporate-America.json


In [12]:
# Rebind `config` for the meeting transcripts: split on numbered section headings
# with a 2400-character limit. min_chunk_size and overlap_sentences keep their
# defaults; the numbered-section splitter does not read either of them.
# Every cell below that passes `config` depends on this cell having run.
config = ChunkConfig(
    max_chunk_size=2400,
    strategy=SplittingStrategy.NUMBERED_SECTIONS
)

In [13]:
# Smoke test for the numbered-section settings: chunk the transcripts PDF and
# report the chunk count. Rebinds the notebook-level `processor` and `chunks`.
processor = DocumentProcessor(client, config)
test_pdf_path = "Dataset/Unprocessed/Meeting Transcripts/Berkshire Meeting Transcripts - 1994 - 2022.pdf"
chunks = processor.process_pdf(test_pdf_path)
print(f"Generated {len(chunks)} chunks from the PDF")

Generated 3450 chunks from the PDF


In [14]:
# Process entire directory
input_directory = "Dataset/Unprocessed/Meeting Transcripts/"
output_directory = "Dataset/Processed/Meeting Transcripts/"
process_directory(input_directory, output_directory, client, config)

Processing Dataset\Unprocessed\Meeting Transcripts\Berkshire Meeting Transcripts - 1994 - 2022.pdf
Error processing chunk: 20. Weapons of mass destruction pose biggest risk to Berkshire
WARREN BUFFETT: OK. Becky.
BECKY QUIC...
Error details: Error code: 500 - {'type': 'error', 'error': {'type': 'api_error', 'message': 'Internal server error'}, 'request_id': 'req_011CV8xkaskZUhsYWpPkjhqJ'}
Successfully saved conversations to Dataset\Processed\Meeting Transcripts\Berkshire Meeting Transcripts - 1994 - 2022.json


In [15]:
def merge_json_files(input_directory, output_path):
    """
    Merge all JSON files in the specified directory into a single JSON file
    maintaining the nested 'conversations' structure.

    Files are read in sorted name order so the merged output is reproducible.
    The output file itself is skipped when it lives inside the input
    directory, so running the merge again does not fold an earlier merged
    result back into itself. Files that cannot be parsed, or whose top level
    is not a dict with a list under "conversations", are reported with
    ``print`` and skipped.

    Args:
        input_directory (str): Path to the directory containing JSON files
        output_path (str): Full path (including filename) for the output merged JSON file
    """
    merged_data = {
        "conversations": []
    }

    directory = Path(input_directory)
    output = Path(output_path)

    # Create output directory if it doesn't exist
    output.parent.mkdir(parents=True, exist_ok=True)

    # Resolve once so the comparison below works for relative paths too.
    output_resolved = output.resolve()

    for file_path in sorted(directory.glob("*.json")):
        # Never read the previous merged output as an input.
        if file_path.resolve() == output_resolved:
            continue

        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except json.JSONDecodeError:
            print(f"Error: Could not parse JSON from {file_path}")
            continue
        except Exception as e:
            print(f"Error processing {file_path}: {str(e)}")
            continue

        # Check if the file has the expected structure
        if isinstance(data, dict) and isinstance(data.get("conversations"), list):
            merged_data["conversations"].extend(data["conversations"])
        else:
            print(f"Warning: File {file_path} does not have the expected structure")

    # Write the merged data to the specified output path
    try:
        with open(output, 'w', encoding='utf-8') as f:
            json.dump(merged_data, f, indent=2, ensure_ascii=False)
        print(f"Successfully created merged file at: {output}")
    except Exception as e:
        print(f"Error writing merged file: {str(e)}")

In [16]:
# Combine the per-letter JSON files into a single Letters.json.
# Note that the output file is written into the same directory it reads from.
merge_json_files('Dataset/Processed/Shareholder Letters/', 'Dataset/Processed/Shareholder Letters/Letters.json')

Successfully created merged file at: Dataset\Processed\Shareholder Letters\Letters.json


In [17]:
# Collect the three per-source result files in one "Ground Truth" folder under
# shorter names. copy2 overwrites an existing destination file.
os.makedirs('Dataset/Processed/Ground Truth', exist_ok=True)
shutil.copy2('Dataset/Processed/Shareholder Letters/Letters.json', 'Dataset/Processed/Ground Truth/Letters.json')
shutil.copy2('Dataset/Processed/Lessons for Corporate America/Lessons-for-Corporate-America.json', 'Dataset/Processed/Ground Truth/Lessons.json')
shutil.copy2('Dataset/Processed/Meeting Transcripts/Berkshire Meeting Transcripts - 1994 - 2022.json', 'Dataset/Processed/Ground Truth/Transcripts.json')

'Dataset/Processed/Ground Truth/Transcripts.json'

In [18]:
# Merge the files in the Ground Truth folder into dataset_combined.json.
# Note that the output file is written into the same directory it reads from.
merge_json_files('Dataset/Processed/Ground Truth/', 'Dataset/Processed/Ground Truth/dataset_combined.json')

Successfully created merged file at: Dataset\Processed\Ground Truth\dataset_combined.json
