In [57]:
import os
import json
from typing import List, Dict, Optional
import spacy
import re
import fitz
from openai import OpenAI, Client
from anthropic import Anthropic
from dataclasses import dataclass
from pathlib import Path
import shutil
from enum import Enum

In [58]:
client = Anthropic(api_key="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")

In [67]:
#To swap to vllm:
#  1.  "claude-3-5-sonnet-latest" --> "/model"
#  2.  .content[0].text --> choices[0].message.content 
#  3.  client.messages --> client.chat.completions

class SplittingStrategy(Enum):
    NUMBERED_SECTIONS = "numbered_sections"
    SENTENCE_OVERLAP = "sentence_overlap"

@dataclass
class ChunkConfig:
    """Configuration for text chunking and processing."""
    max_chunk_size: int = 1600
    min_chunk_size: int = 500  # Only used for sentence overlap strategy
    overlap_sentences: int = 2  # Only used for sentence overlap strategy
    strategy: SplittingStrategy = SplittingStrategy.NUMBERED_SECTIONS

class DocumentProcessor:
    def __init__(self, client: Anthropic, config: ChunkConfig):
        """Initialize the document processor."""
        self.nlp = spacy.load('en_core_web_sm', disable=['tagger', 'ner'])
        self.nlp.max_length = 10000000
        self.client = client
        self.config = config
        
    def preprocess_text(self, text: str) -> str:
        """Clean and normalize text while preserving important newlines."""
        lines = text.split('\n')
        lines = [' '.join(line.split()) for line in lines if line.strip()]
        return '\n'.join(lines)
        
    def process_pdf(self, pdf_path: str) -> List[str]:
        """Extract text from PDF and split into chunks."""
        text = ""
        with fitz.open(pdf_path) as doc:
            for page in doc:
                text += page.get_text() + "\n"
        
        clean_text = self.preprocess_text(text)
        return self._chunk_text(clean_text)

    def process_txt(self, txt_path: str) -> List[str]:
        """Read text file and split into chunks."""
        try:
            with open(txt_path, 'r', encoding='utf-8') as file:
                text = file.read()
            clean_text = self.preprocess_text(text)
            return self._chunk_text(clean_text)
        except Exception as e:
            print(f"Error processing text file {txt_path}: {str(e)}")
            return []

    def _split_by_sentences(self, text: str) -> List[str]:
        """Split text using sentence overlap strategy."""
        doc = self.nlp(text)
        sentences = [Sentence(text=sent.text.strip(), length=len(sent.text.strip())) 
                    for sent in doc.sents if sent.text.strip()]
        
        chunks = []
        current_chunk = []
        current_length = 0
        
        for sentence in sentences:
            current_chunk.append(sentence)
            current_length += sentence.length
            
            if current_length >= self.config.max_chunk_size and len(current_chunk) > self.config.overlap_sentences:
                if current_length >= self.config.min_chunk_size:
                    chunks.append(' '.join(s.text for s in current_chunk))
                    overlap_sentences = current_chunk[-self.config.overlap_sentences:]
                    current_chunk = overlap_sentences.copy()
                    current_length = sum(s.length for s in current_chunk)

        if current_length >= self.config.min_chunk_size:
            chunks.append(' '.join(s.text for s in current_chunk))
            
        return chunks

    def _split_by_numbered_sections(self, text: str) -> List[str]:
        """Split text based on numbered sections."""
        lines = text.split('\n')
        initial_chunks = []
        current_chunk = []
        
        for line in lines:
            if re.match(r'^\d+\.', line.strip()):
                if current_chunk:
                    initial_chunks.append('\n'.join(current_chunk))
                current_chunk = [line]
            else:
                current_chunk.append(line)
        
        if current_chunk:
            initial_chunks.append('\n'.join(current_chunk))
        
        # Handle chunks that are too large
        final_chunks = []
        for chunk in initial_chunks:
            if len(chunk) > self.config.max_chunk_size:
                doc = self.nlp(chunk)
                sentences = list(doc.sents)
                mid_point = len(sentences) // 2
                first_half = ' '.join(sent.text.strip() for sent in sentences[:mid_point])
                second_half = ' '.join(sent.text.strip() for sent in sentences[mid_point:])
                final_chunks.extend([first_half, second_half])
            else:
                final_chunks.append(chunk)
                
        return final_chunks

    def _chunk_text(self, text: str) -> List[str]:
        """Split text using the configured strategy."""
        if self.config.strategy == SplittingStrategy.NUMBERED_SECTIONS:
            return self._split_by_numbered_sections(text)
        else:
            return self._split_by_sentences(text)

    def generate_conversation(self, chunk: str) -> Optional[Dict]:
        """Generate conversation from chunk with validation."""
        try:
            validation_prompt = f"""Analyze this text and determine if it contains meaningful Warren Buffett insights, commentary, or narrative content.

Approve the text only if:
- It discusses business philosophy or investment thinking that applies across industries and time.
- It provides views on markets, financial practices, or economic principles that are broadly applicable.
- Buffett shares personal reflections or general lessons learned that are useful beyond a single event.

Reject the text if:
- It primarily describes a specific investment, acquisition, deal, or financial transaction.
- It focuses on a single company's business decision without a clearly stated general principle.
- It discusses short-term market conditions, quarterly earnings, or economic events without broader insights.
- It contains only financial data, figures, or statistics without meaningful explanation.

Text: {chunk}

Return ONLY "yes" if the text contains meaningful, wide-scope content, or "no" otherwise. I WANT A SINGLE YES or NO!!"""
            
            validation_response = self.client.messages.create(
                model="claude-3-5-sonnet-latest",
                messages=[{"role": "user", "content": validation_prompt}],
                max_tokens=100,
                temperature=0
            )
            
            if validation_response.content[0].text.strip().lower() != "yes":
                return None

            conversation_prompt = f"""Below is a text excerpt from me (Warren Buffett). Generate 2 questions about the key themes in this content. Then, provide answers using only my exact words and ideas from this text, but restructure them into my characteristic Q&A style like at the annual meetings. Maintain my plain-spoken voice while elaborating on the concepts present in the text.

Text: {chunk}

Guidelines:
- Questions should focus on the main principles or ideas explicitly discussed in the text
- Answers should:
  * Start with the core information from the text
  * Elaborate on these specific points in my conversational style
  * Use any analogies or examples that appear in the text
  * Maintain my natural speaking rhythm and tone
  * Focus on clarity and directness
- Write answers in first person
- Aim for detailed responses (200+ words) while staying true to the source material

Return as a properly formatted JSON string without any control characters or extra whitespace in ShareGPT format:
{{"conversations": [
  [
    {{"from": "human", "value": "question here"}},
    {{"from": "gpt", "value": "answer here"}}
  ],
  [
    {{"from": "human", "value": "second question"}},
    {{"from": "gpt", "value": "second answer"}}
  ]
]}}

Important: Ensure the response is valid JSON with no control characters, and all whitespace in values is single spaces only."""

            conversation_response = self.client.messages.create(
                model="claude-3-5-sonnet-latest",
                messages=[{"role": "user", "content": conversation_prompt}],
                max_tokens=2000,
                temperature=0.4
            )
            
            response_text = conversation_response.content[0].text.strip()
            response_text = response_text.replace('```json', '').replace('```', '').strip()
            
            try:
                conversation_data = json.loads(response_text)
                return conversation_data
            except json.JSONDecodeError as e:
                print(f"JSON parsing error: {str(e)}")
                print(f"Response text: {response_text[:200]}...")
                return None
                
        except Exception as e:
            print(f"Error processing chunk: {chunk[:100]}...")
            print(f"Error details: {str(e)}")
            return None

In [69]:
config = ChunkConfig(
    max_chunk_size=1600,
    min_chunk_size=500,
    overlap_sentences=2,
    strategy=SplittingStrategy.SENTENCE_OVERLAP
)

In [70]:
# Test with a single PDF file
processor = DocumentProcessor(client, config)
test_pdf_path = "Dataset/Unprocessed/Lessons for Corporate America/Lessons-for-Corporate-America.pdf"
chunks = processor.process_pdf(test_pdf_path)
print(f"Generated {len(chunks)} chunks from the PDF")

Generated 378 chunks from the PDF


In [None]:
if chunks:
    conversation = processor.generate_conversation(chunks[23])
    print("Sample conversation:")
    print(json.dumps(conversation, indent=2))

In [53]:
# Process entire directory
input_directory = "Dataset/Unprocessed/Shareholder Letters/"
output_directory = "Dataset/Processed/Shareholder Letters/"
process_directory(input_directory, output_directory, client, config)

Processing Dataset/Unprocessed/Shareholder Letters/1977.txt
Successfully saved conversations to Dataset/Processed/Shareholder Letters/1977.json
Processing Dataset/Unprocessed/Shareholder Letters/1978.txt
Successfully saved conversations to Dataset/Processed/Shareholder Letters/1978.json
Processing Dataset/Unprocessed/Shareholder Letters/1979.txt
Successfully saved conversations to Dataset/Processed/Shareholder Letters/1979.json
Processing Dataset/Unprocessed/Shareholder Letters/1980.txt
Successfully saved conversations to Dataset/Processed/Shareholder Letters/1980.json
Processing Dataset/Unprocessed/Shareholder Letters/1981.txt
Successfully saved conversations to Dataset/Processed/Shareholder Letters/1981.json
Processing Dataset/Unprocessed/Shareholder Letters/1982.txt
Successfully saved conversations to Dataset/Processed/Shareholder Letters/1982.json
Processing Dataset/Unprocessed/Shareholder Letters/1983.txt
Successfully saved conversations to Dataset/Processed/Shareholder Letters/19

In [54]:
input_directory = "Dataset/Unprocessed/Lessons for Corporate America/"
output_directory = "Dataset/Processed/Lessons for Corporate America/"
process_directory(input_directory, output_directory, client, config)

Processing Dataset/Unprocessed/Lessons for Corporate America/Lessons-for-Corporate-America.pdf
Successfully saved conversations to Dataset/Processed/Lessons for Corporate America/Lessons-for-Corporate-America.json


In [72]:
config = ChunkConfig(
    max_chunk_size=1600,
    strategy=SplittingStrategy.NUMBERED_SECTIONS
)

In [73]:
# Test with a single PDF file
processor = DocumentProcessor(client, config)
test_pdf_path = "Dataset/Unprocessed/Meeting Transcripts/Berkshire Meeting Transcripts - 1994 - 2022.pdf"
chunks = processor.process_pdf(test_pdf_path)
print(f"Generated {len(chunks)} chunks from the PDF")

Generated 3796 chunks from the PDF


In [56]:
# Process entire directory
input_directory = "Dataset/Unprocessed/Meeting Transcripts/"
output_directory = "Dataset/Processed/Meeting Transcripts/"
process_directory(input_directory, output_directory, client, config)

Processing Dataset/Unprocessed/Meeting Transcripts/Berkshire Meeting Transcripts - 1994 - 2022.pdf
Error processing chunk: 18. Unrealistic investment expectations for pension funds
WARREN BUFFETT: Zone 8.
AUDIENCE MEMBER: M...
Error details: Error code: 529 - {'type': 'error', 'error': {'type': 'overloaded_error', 'message': 'Overloaded'}}
Error processing chunk: 9. Buffett (D) and Munger (R) both endorse Social Security
WARREN BUFFETT: Number 3. (Laughter)
AUDI...
Error details: Error code: 529 - {'type': 'error', 'error': {'type': 'overloaded_error', 'message': 'Overloaded'}}
Error processing chunk: And I think you’ve seen that over the last two years and we’re seeing it month by month. I would say...
Error details: Error code: 529 - {'type': 'error', 'error': {'type': 'overloaded_error', 'message': 'Overloaded'}}
Error processing chunk: 11. Why See’s Candies does well in inflationary times
WARREN BUFFETT: With those modest statements, ...
Error details: Error code: 529 - {'type': 'e

In [83]:
def merge_json_files(input_directory, output_path):
    """
    Merge all JSON files in the specified directory into a single JSON file
    maintaining the nested 'conversations' structure.
    
    Args:
        input_directory (str): Path to the directory containing JSON files
        output_path (str): Full path (including filename) for the output merged JSON file
    """
    # Initialize the merged structure
    merged_data = {
        "conversations": []
    }
    
    # Convert string paths to Path objects
    directory = Path(input_directory)
    output = Path(output_path)
    
    # Create output directory if it doesn't exist
    output.parent.mkdir(parents=True, exist_ok=True)
    
    # Iterate through all JSON files in the directory
    for file_path in directory.glob("*.json"):
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
                
                # Check if the file has the expected structure
                if "conversations" in data:
                    # Extend the conversations list with the new data
                    merged_data["conversations"].extend(data["conversations"])
                else:
                    print(f"Warning: File {file_path} does not have the expected structure")
                    
        except json.JSONDecodeError:
            print(f"Error: Could not parse JSON from {file_path}")
        except Exception as e:
            print(f"Error processing {file_path}: {str(e)}")
    
    # Write the merged data to the specified output path
    try:
        with open(output, 'w', encoding='utf-8') as f:
            json.dump(merged_data, f, indent=2, ensure_ascii=False)
        print(f"Successfully created merged file at: {output}")
    except Exception as e:
        print(f"Error writing merged file: {str(e)}")

In [86]:
merge_json_files('Dataset/Processed/Shareholder Letters/', 'Dataset/Processed/Shareholder Letters/Letters.json')

Successfully created merged file at: Dataset/Processed/Shareholder Letters/Letters.json


In [90]:
os.makedirs('Dataset/Processed/Ground Truth', exist_ok=True)
shutil.copy2('Dataset/Processed/Shareholder Letters/Letters.json', 'Dataset/Processed/Ground Truth/Letters.json')
shutil.copy2('Dataset/Processed/Lessons for Corporate America/Lessons-for-Corporate-America.json', 'Dataset/Processed/Ground Truth/Lessons.json')
shutil.copy2('Dataset/Processed/Meeting Transcripts/Berkshire Meeting Transcripts - 1994 - 2022.json', 'Dataset/Processed/Ground Truth/Transcripts.json')

'Dataset/Processed/Ground Truth/Transcripts.json'

In [91]:
merge_json_files('Dataset/Processed/Ground Truth/', 'Dataset/Processed/Ground Truth/dataset_combined.json')

Successfully created merged file at: Dataset/Processed/Ground Truth/dataset_combined.json
