# RAG System Workshop: Earnings Call Analysis

Welcome to the RAG (Retrieval-Augmented Generation) workshop! In this notebook, you'll build a simple RAG system to analyze earnings call transcripts.

## What you'll learn:
1. **Data Ingestion**: Load and process earnings call transcripts
2. **Text Chunking**: Split documents into manageable pieces
3. **Embeddings**: Convert text to vectors
4. **Vector Storage**: Store embeddings in Pinecone
5. **Retrieval**: Find relevant information
6. **Generation**: Create answers using retrieved context

## Part 1: Setup and Configuration

First, let's install the required packages and set up our configuration.

In [15]:
# Install required packages
!pip install pinecone 
!pip install openai
!pip install python-dotenv



In [18]:
import os
import glob
import hashlib
import time
from typing import List, Dict, Any

# External libraries
from pinecone import Pinecone, ServerlessSpec
import openai
from dotenv import load_dotenv

#Load environment variables from .env file
load_dotenv()

True

In [None]:
# ============================================
# TODO: Fill in your API keys and configuration
# ============================================

# Read the API keys from environment variables (loaded from .env above).
# For a quick workshop run you can paste the keys in directly instead,
# but never commit real keys to version control.
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")  # TODO: Set OPENAI_API_KEY in your .env file
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")  # TODO: Set PINECONE_API_KEY in your .env file

# Configuration parameters
EMBEDDING_MODEL = "text-embedding-3-small"  # OpenAI embedding model
EMBEDDING_DIMENSION = 1536  # Dimension for text-embedding-3-small

# TODO: Adjust these chunking parameters
CHUNK_SIZE = 500  # Maximum number of characters per chunk
CHUNK_OVERLAP = 50  # Number of overlapping characters between chunks

# Pinecone configuration
PINECONE_INDEX_NAME = "earnings-calls"  # TODO: Choose your index name
PINECONE_ENVIRONMENT = "us-east-1"  # TODO: Update based on your Pinecone region

# Initialize OpenAI client
openai_client = openai.OpenAI(api_key=OPENAI_API_KEY)

print("✅ Configuration complete!")

✅ Configuration complete!


## Part 2: Data Ingestion Pipeline

### Step 1: Load Documents

In [4]:
def load_earnings_call(filepath: str) -> Dict[str, Any]:
    """
    Load a single earnings call transcript.
    
    Args:
        filepath: Path to the transcript file
    
    Returns:
        Dictionary with 'content' and 'metadata'
    """
    # Read the file content
    with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
        content = f.read()
    
    # Extract metadata from filename (format: YYYY-Mon-DD-TICKER.txt)
    filename = os.path.basename(filepath)
    parts = filename.replace('.txt', '').split('-')
    
    metadata = {
        'filename': filename,
        'filepath': filepath,
        'ticker': parts[3] if len(parts) >= 4 else 'UNKNOWN',
        'date': f"{parts[0]}-{parts[1]}-{parts[2]}" if len(parts) >= 3 else 'UNKNOWN',
        'year': parts[0] if len(parts) >= 1 else 'UNKNOWN'
    }
    
    return {
        'content': content,
        'metadata': metadata
    }

# Test loading a single document
test_file = "earnings-call-transcripts/Transcripts/AAPL/2020-Jan-28-AAPL.txt"
if os.path.exists(test_file):
    doc = load_earnings_call(test_file)
    print(f"Loaded document: {doc['metadata']['filename']}")
    print(f"Content preview: {doc['content'][:200]}...")
    print(f"Metadata: {doc['metadata']}")

Loaded document: 2020-Jan-28-AAPL.txt
Content preview: 

Thomson Reuters StreetEvents Event Brief
E D I T E D   V E R S I O N

Q1 2020 Apple Inc Earnings Call
JANUARY 28, 2020 / 10:00PM GMT

Metadata: {'filename': '2020-Jan-28-AAPL.txt', 'filepath': 'earnings-call-transcripts/Transcripts/AAPL/2020-Jan-28-AAPL.txt', 'ticker': 'AAPL', 'date': '2020-Jan-28', 'year': '2020'}
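
The `date` metadata is kept as a `YYYY-Mon-DD` string, which does not sort chronologically within a year (for example, `Apr` sorts before `Jan`). If you need ordering or date-range logic later, one option is to parse it into a real date. The sketch below is a hypothetical helper (`parse_call_date` is not part of the notebook) and assumes an English locale for the month abbreviations.

```python
from datetime import date, datetime

def parse_call_date(date_str: str) -> date:
    """Parse a 'YYYY-Mon-DD' string such as '2020-Jan-28' into a date object."""
    # %b matches the abbreviated month name; this assumes an English locale
    return datetime.strptime(date_str, "%Y-%b-%d").date()

# Sort transcript dates chronologically by using the parsed date as the key
dates = ["2018-May-01", "2019-Oct-30", "2016-Jan-26"]
print(sorted(dates, key=parse_call_date))
```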


In [5]:
def load_all_documents(base_path: str = "earnings-call-transcripts/Transcripts", 
                      tickers: List[str] = None,
                      limit: int = None) -> List[Dict[str, Any]]:
    """
    Load multiple earnings call transcripts.
    
    Args:
        base_path: Base directory containing transcripts
        tickers: List of tickers to load (None = load all)
        limit: Maximum number of documents to load (applied per ticker when tickers is given)
    
    Returns:
        List of document dictionaries
    """
    documents = []
    if tickers:
        # Load specific tickers
        for ticker in tickers:
            pattern = os.path.join(base_path, ticker, "*.txt")
            files = glob.glob(pattern)
            
            for filepath in files[:limit] if limit else files:
                doc = load_earnings_call(filepath)
                documents.append(doc)
    else:
        # Load all documents
        pattern = os.path.join(base_path, "*", "*.txt")
        files = glob.glob(pattern)
        
        for filepath in files[:limit] if limit else files:
            doc = load_earnings_call(filepath)
            documents.append(doc)
    
    print(f"Loaded {len(documents)} documents")
    return documents

# Load sample documents for testing
# TODO: Adjust the tickers and limit as needed
documents = load_all_documents(tickers=['AAPL', 'MSFT'], limit=5)
print(f"\nSample document metadata:")
for doc in documents[:3]:
    print(f"  - {doc['metadata']['ticker']}: {doc['metadata']['date']}")

Loaded 10 documents

Sample document metadata:
  - AAPL: 2018-May-01
  - AAPL: 2019-Oct-30
  - AAPL: 2016-Jan-26


### Step 2: Text Chunking

Large documents need to be split into smaller chunks for effective retrieval.

In [6]:
def simple_text_splitter(text: str, 
                        chunk_size: int = CHUNK_SIZE, 
                        chunk_overlap: int = CHUNK_OVERLAP) -> List[str]:
    """
    Split text into chunks with overlap.
    
    Args:
        text: Text to split
        chunk_size: Maximum characters per chunk
        chunk_overlap: Overlapping characters between chunks
    
    Returns:
        List of text chunks
    """
    chunks = []
    
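    # Guard: the window must advance, otherwise the loop below never terminates
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    
    # TODO: Improve the chunking logic (see Exercise 1)
    # Simple sliding-window implementation
    start = 0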
    while start < len(text):
        end = start + chunk_size
        chunk = text[start:end]
        
        # Add chunk if it's not empty
        if chunk.strip():
            chunks.append(chunk)
        
        # Move start position (with overlap)
        start = end - chunk_overlap
        
        # Break if we've reached the end
        if end >= len(text):
            break
    
    return chunks

# Test the chunker
sample_text = "This is a sample text. " * 50  # Create a long text
chunks = simple_text_splitter(sample_text)
print(f"Created {len(chunks)} chunks from sample text")
print(f"First chunk: {chunks[0][:100]}...")
print(f"Chunk size: {len(chunks[0])} characters")

Created 3 chunks from sample text
First chunk: This is a sample text. This is a sample text. This is a sample text. This is a sample text. This is ...
Chunk size: 500 characters
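
To see why the sample text yields 3 chunks, it helps to work out the sliding-window arithmetic: each window starts `chunk_size - chunk_overlap` characters after the previous one. The helper below is a hypothetical sketch (`expected_chunk_count` is not part of the notebook) that counts windows for a given text length. It counts every window, so it is an upper bound when the splitter skips whitespace-only chunks.

```python
import math

def expected_chunk_count(text_length: int,
                         chunk_size: int = 500,
                         chunk_overlap: int = 50) -> int:
    """Number of windows a sliding-window splitter produces for text_length characters."""
    if text_length <= 0:
        return 0
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    stride = chunk_size - chunk_overlap  # how far the window start advances each step
    # The k-th window ends at (k - 1) * stride + chunk_size; stop once that reaches the end
    return max(1, math.ceil((text_length - chunk_overlap) / stride))

# The sample text is 50 repetitions of a 23-character sentence, i.e. 1150 characters
print(expected_chunk_count(1150))
```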


In [None]:
def chunk_documents(documents: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Split all documents into chunks.
    
    Args:
        documents: List of document dictionaries
    
    Returns:
        List of chunk dictionaries with text and metadata
    """
    all_chunks = []
    
    for doc in documents:
        # Split document into chunks
        text_chunks = simple_text_splitter(doc['content'])
        
        # Create chunk objects with metadata
        for i, chunk_text in enumerate(text_chunks):
            chunk_id = hashlib.md5(f"{doc['metadata']['filename']}_{i}".encode()).hexdigest() # Unique ID, deterministic
            
            chunk = {
                'id': chunk_id,
                'text': chunk_text,
                'metadata': {
                    **doc['metadata'],  # Include all document metadata
                    'chunk_index': i,
                    'total_chunks': len(text_chunks)
                }
            }
            all_chunks.append(chunk)
    
    return all_chunks

# Chunk all documents
chunks = chunk_documents(documents)
print(f"Created {len(chunks)} chunks from {len(documents)} documents")
print(f"\nSample chunk:")
print(f"  ID: {chunks[0]['id']}")
print(f"  Text preview: {chunks[0]['text'][:100]}...")
print(f"  Metadata: {chunks[0]['metadata']}")

Created 1444 chunks from 10 documents

Sample chunk:
  ID: 81e54cc8b9bfede8ca5821bb2badac3d
  Text preview: 

Thomson Reuters StreetEvents Event Brief
E D I T E D   V E R S I O N

Q2 2018 Apple Inc Earnings C...
  Metadata: {'filename': '2018-May-01-AAPL.txt', 'filepath': 'earnings-call-transcripts/Transcripts/AAPL/2018-May-01-AAPL.txt', 'ticker': 'AAPL', 'date': '2018-May-01', 'year': '2018', 'chunk_index': 0, 'total_chunks': 119}
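
The chunk IDs are deterministic: hashing the same filename and chunk index always gives the same ID, so re-running ingestion overwrites existing vectors on upsert instead of duplicating them. The sketch below isolates that hashing step in a hypothetical helper (`make_chunk_id` is an illustrative name). Note that the ID depends only on position, so changing the chunk size changes which text a given ID refers to.

```python
import hashlib

def make_chunk_id(filename: str, chunk_index: int) -> str:
    """Deterministic ID: the same file and chunk position always hash to the same value."""
    return hashlib.md5(f"{filename}_{chunk_index}".encode()).hexdigest()

first = make_chunk_id("2018-May-01-AAPL.txt", 0)
again = make_chunk_id("2018-May-01-AAPL.txt", 0)
other = make_chunk_id("2018-May-01-AAPL.txt", 1)

print(first == again)  # same input, same ID
print(first == other)  # different chunk index, different ID
```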


### Step 3: Generate Embeddings

Convert text chunks into vector embeddings using OpenAI's embedding model.

In [8]:
def get_embedding(text: str, model: str = EMBEDDING_MODEL) -> List[float]:
    """
    Get embedding for a single text using OpenAI API.
    
    Args:
        text: Text to embed
        model: OpenAI embedding model to use
    
    Returns:
        List of floats representing the embedding
    """
    # Clean the text, more steps could be added here
    text = text.replace("\n", " ").strip()
    
    response = openai_client.embeddings.create(
        input=text,
        model=model
    )
    
    return response.data[0].embedding

# Test embedding generation
test_text = "This is a test sentence for embedding."
test_embedding = get_embedding(test_text)
print(f"Generated embedding with dimension: {len(test_embedding)}")
print(f"First 5 values: {test_embedding[:5]}")

Generated embedding with dimension: 1536
First 5 values: [0.020807893946766853, 0.0072983973659574986, 0.00849229283630848, -0.016416063532233238, -0.012294281274080276]


In [9]:
def embed_chunks(chunks: List[Dict[str, Any]], 
                batch_size: int = 10) -> List[Dict[str, Any]]:
    """
    Generate embeddings for all chunks.
    
    Args:
        chunks: List of chunk dictionaries
        batch_size: Number of chunks to process between progress updates and
            rate-limit pauses (each chunk is still embedded with its own API call)
    
    Returns:
        List of chunks with embeddings added
    """
    embedded_chunks = []
    
    print(f"Generating embeddings for {len(chunks)} chunks...")
    
    for i in range(0, len(chunks), batch_size):
        batch = chunks[i:i+batch_size]
        
        for chunk in batch:
            
            try:
                embedding = get_embedding(chunk['text'])
                chunk['embedding'] = embedding
                embedded_chunks.append(chunk)
            except Exception as e:
                print(f"Error embedding chunk {chunk['id']}: {e}")
                continue
        
        # Progress update
        print(f"  Processed {min(i+batch_size, len(chunks))}/{len(chunks)} chunks")
        
        # Rate limiting (to avoid hitting API limits)
        time.sleep(0.5)
    
    print(f"✅ Generated embeddings for {len(embedded_chunks)} chunks")
    return embedded_chunks

# Generate embeddings for a subset of chunks
# TODO: Adjust the number of chunks to embed based on your API limits
chunks_to_embed = chunks[:10]  # Start with just 10 chunks for testing
embedded_chunks = embed_chunks(chunks_to_embed)
print(f"\nFirst embedded chunk has {len(embedded_chunks[0]['embedding'])} dimensions")

Generating embeddings for 10 chunks...
  Processed 10/10 chunks
✅ Generated embeddings for 10 chunks

First embedded chunk has 1536 dimensions
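
The loop above makes one API call per chunk. OpenAI's embeddings endpoint also accepts a list of strings as `input`, so a whole batch can be sent in one request. The sketch below shows only the batching logic and is an illustration, not the notebook's implementation: `batched`, `embed_in_batches`, and the stand-in `fake_embed_batch` are hypothetical names. In the notebook, `embed_batch` could wrap a call such as `openai_client.embeddings.create(input=batch, model=EMBEDDING_MODEL)`.

```python
from typing import Callable, Iterator, List, Sequence

def batched(items: Sequence[str], batch_size: int) -> Iterator[Sequence[str]]:
    """Yield consecutive slices of at most batch_size items."""
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]

def embed_in_batches(texts: Sequence[str],
                     embed_batch: Callable[[List[str]], List[List[float]]],
                     batch_size: int = 10) -> List[List[float]]:
    """Embed texts batch by batch; embed_batch must return one vector per input text."""
    embeddings: List[List[float]] = []
    for batch in batched(texts, batch_size):
        embeddings.extend(embed_batch(list(batch)))
    return embeddings

# Stand-in embedder for illustration only: records batch sizes and
# returns a 1-dimensional "embedding" (the text length) per input
call_sizes: List[int] = []

def fake_embed_batch(batch: List[str]) -> List[List[float]]:
    call_sizes.append(len(batch))
    return [[float(len(text))] for text in batch]

texts = [f"chunk {i}" for i in range(25)]
vectors = embed_in_batches(texts, fake_embed_batch, batch_size=10)
print(len(vectors), call_sizes)
```

With 25 texts and a batch size of 10, the embedder is called three times instead of 25.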


### Step 4: Store in Pinecone

Initialize Pinecone and store the embedded chunks.

In [29]:
def initialize_pinecone():
    """
    Initialize Pinecone client and create index if needed.
    
    Returns:
        Pinecone index object
    """
    # Initialize Pinecone
    pc = Pinecone(api_key=PINECONE_API_KEY)
    
    # Check if index exists
    existing_indexes = [index.name for index in pc.list_indexes()]
    
    if PINECONE_INDEX_NAME not in existing_indexes:
        # TODO: Create the index
        # Hint: Use pc.create_index() with appropriate parameters
        
        print(f"Creating index '{PINECONE_INDEX_NAME}'...")
        pc.create_index(
            name=PINECONE_INDEX_NAME,
            dimension=EMBEDDING_DIMENSION,
            metric='cosine',  # Can also use 'euclidean' or 'dotproduct'
            spec=ServerlessSpec(
                cloud='aws',
                region=PINECONE_ENVIRONMENT
            )
        )
        print(f"✅ Index created")
    else:
        print(f"✅ Using existing index '{PINECONE_INDEX_NAME}'")
    
    # Get the index
    index = pc.Index(PINECONE_INDEX_NAME)
    
    # Wait until the index reports that it is ready
    while not pc.describe_index(PINECONE_INDEX_NAME).status['ready']:
        time.sleep(1)
    
    # Print index stats
    stats = index.describe_index_stats()
    print(f"Index stats: {stats}")
    
    return index

# Initialize Pinecone
index = initialize_pinecone()

Creating index 'earnings-calls'...
✅ Index created



Index stats: {'dimension': 1536,
 'index_fullness': 0.0,
 'metric': 'cosine',
 'namespaces': {},
 'total_vector_count': 0,
 'vector_type': 'dense'}
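
The index is created with the `cosine` metric, which scores two vectors by the angle between them and ignores their lengths. As a minimal pure-Python sketch of that formula (the helper name `cosine_similarity` is illustrative; Pinecone computes the score server-side):

```python
import math
from typing import Sequence

def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between a and b: dot(a, b) / (|a| * |b|)."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same dimension")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        raise ValueError("cosine similarity is undefined for a zero vector")
    return dot / (norm_a * norm_b)

print(cosine_similarity([1.0, 0.0], [0.0, 1.0]))  # orthogonal vectors
print(cosine_similarity([1.0, 2.0], [2.0, 4.0]))  # same direction, different length
```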


In [None]:
def store_in_pinecone(index, embedded_chunks: List[Dict[str, Any]], 
                     batch_size: int = 100):
    """
    Store embedded chunks in Pinecone.
    
    Args:
        index: Pinecone index object
        embedded_chunks: List of chunks with embeddings
        batch_size: Number of vectors to upsert at once
    """
    print(f"Storing {len(embedded_chunks)} chunks in Pinecone...")
    
    for i in range(0, len(embedded_chunks), batch_size):
        batch = embedded_chunks[i:i+batch_size]
        
        # Prepare vectors for upsert
        vectors = []
        for chunk in batch:
            vector = {
                'id': chunk['id'],
                'values': chunk['embedding'],
                'metadata': {
                    'text': chunk['text'], 
                    'ticker': chunk['metadata']['ticker'],
                    'date': chunk['metadata']['date'],
                    'filename': chunk['metadata']['filename'],
                    'chunk_index': chunk['metadata']['chunk_index']
                }
            }
            vectors.append(vector)
        
        # Upsert to Pinecone
        index.upsert(vectors=vectors)
        
        print(f"  Stored {min(i+batch_size, len(embedded_chunks))}/{len(embedded_chunks)} chunks")
    
    print(f"✅ Successfully stored all chunks in Pinecone")
    
    # Print updated stats (eventually consistent: recent upserts may not be counted yet)
    time.sleep(2)
    stats = index.describe_index_stats()
    print(f"Updated index stats: {stats}")

# Store the embedded chunks
store_in_pinecone(index, embedded_chunks)

Storing 10 chunks in Pinecone...
  Stored 10/10 chunks
✅ Successfully stored all chunks in Pinecone
Updated index stats: {'dimension': 1536,
 'index_fullness': 0.0,
 'metric': 'cosine',
 'namespaces': {},
 'total_vector_count': 0,
 'vector_type': 'dense'}


## Part 3: Complete Ingestion Pipeline

Now let's put it all together in a single pipeline function.

In [None]:
def ingest_documents(tickers: List[str] = None, 
                    max_documents: int = None):
    """
    Complete ingestion pipeline: load, chunk, embed, and store documents.
    
    Args:
        tickers: List of tickers to ingest (None = all)
        max_documents: Maximum number of documents to process (per ticker when tickers is given)
    """
    print("="*50)
    print("Starting Document Ingestion Pipeline")
    print("="*50)
    
    # Step 1: Load documents
    print("\n📄 Step 1: Loading documents...")
    documents = load_all_documents(tickers=tickers, limit=max_documents)
    
    if not documents:
        print("No documents found!")
        return
    
    # Step 2: Chunk documents
    print("\n✂️ Step 2: Chunking documents...")
    chunks = chunk_documents(documents)
    print(f"Created {len(chunks)} chunks")
    
    # Step 3: Generate embeddings
    print("\n🔢 Step 3: Generating embeddings...")
    embedded_chunks = embed_chunks(chunks)
    
    # Step 4: Initialize Pinecone
    print("\n🔗 Step 4: Initializing Pinecone...")
    index = initialize_pinecone()
    
    # Step 5: Store in Pinecone
    print("\n💾 Step 5: Storing in Pinecone...")
    store_in_pinecone(index, embedded_chunks)
    
    print("\n" + "="*50)
    print("✅ Ingestion Pipeline Complete!")
    print("="*50)
    
    return index

# TODO: Run the complete pipeline
# Start with a small number of documents for testing
# index = ingest_documents(tickers=['AAPL'], max_documents=2)

## Part 4: Testing the Ingestion

Let's test our ingestion pipeline with a simple query.

In [None]:
def test_retrieval(query: str, top_k: int = 3):
    """
    Test retrieval from Pinecone.
    
    Args:
        query: Query text
        top_k: Number of results to return
    """
    print(f"\n🔍 Query: {query}")
    print("-" * 50)
    
    # Generate embedding for query
    query_embedding = get_embedding(query)
    
    # Initialize Pinecone and get index
    pc = Pinecone(api_key=PINECONE_API_KEY)
    index = pc.Index(PINECONE_INDEX_NAME)
    
    # Query Pinecone
    results = index.query(
        vector=query_embedding,
        top_k=top_k,
        include_metadata=True
    )
    
    # Display results
    print(f"\n📊 Found {len(results['matches'])} relevant chunks:\n")
    
    for i, match in enumerate(results['matches'], 1):
        print(f"Result {i}:")
        print(f"  Score: {match['score']:.4f}")
        print(f"  Ticker: {match['metadata']['ticker']}")
        print(f"  Date: {match['metadata']['date']}")
        print(f"  Text preview: {match['metadata']['text'][:200]}...")
        print()

# Test queries
# TODO: Uncomment and run after ingesting documents
# test_retrieval("What is Apple's revenue growth?")
# test_retrieval("Tell me about iPhone sales")


🔍 Query: What is Apple's revenue growth?
--------------------------------------------------

📊 Found 3 relevant chunks:

Result 1:
  Score: 0.6189
  Ticker: AAPL
  Date: 2018-May-01
  Text preview:   2. Earnings.
          2. Revenues:
               1. 2Q18, $61.1b.
                    1. Up 16% YoverY.
                    2. Sixth consecutive qtr. of accelerating revenue growth.
              ...

Result 2:
  Score: 0.6002
  Ticker: AAPL
  Date: 2018-May-01
  Text preview:  and Japan, revenue up more than 20%.
               4. iPhone's performance capped tremendous fiscal 1H, with $100b in iPhone revenue.
                    1. Up $12b over last year, setting new 1H re...

Result 3:
  Score: 0.5983
  Ticker: AAPL
  Date: 2018-May-01
  Text preview:           8. Had all-time record revenue from App Store, Apple Music, iCloud, Apple Pay and more.
          3. Across all services, paid subscription surpassed 270m; up over 100m from year ago and up ...
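
All three results come from the same transcript. Because `ticker`, `date`, and `filename` are stored as metadata, a query can be narrowed with Pinecone's metadata filter syntax (`$eq`, `$in`). The sketch below only builds the filter dictionary. `build_metadata_filter` is a hypothetical helper, and passing the result as `filter=` to `index.query(...)` is an assumption about how you would wire it into `test_retrieval`.

```python
from typing import Any, Dict, List, Optional

def build_metadata_filter(ticker: Optional[str] = None,
                          filenames: Optional[List[str]] = None) -> Dict[str, Any]:
    """Build a Pinecone-style metadata filter; multiple keys are combined with AND."""
    conditions: Dict[str, Any] = {}
    if ticker:
        conditions["ticker"] = {"$eq": ticker}
    if filenames:
        conditions["filename"] = {"$in": filenames}
    return conditions

# Possible use (not run here):
# index.query(vector=query_embedding, top_k=3, include_metadata=True,
#             filter=build_metadata_filter(ticker="AAPL"))
print(build_metadata_filter(ticker="AAPL"))
```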



## 🎯 Workshop Exercises

Now it's your turn! Try these exercises:

### Exercise 1: Improve Chunking
- Modify the `simple_text_splitter` function to split on sentence boundaries
- Experiment with different chunk sizes and overlaps
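
As one possible starting point for this exercise, the sketch below packs whole sentences greedily into chunks. It is a simplified illustration, not a reference solution: `sentence_text_splitter` is a hypothetical name, the regex is a naive boundary detector (it will mis-split abbreviations such as "Inc."), and overlap is left out.

```python
import re
from typing import List

def sentence_text_splitter(text: str, chunk_size: int = 500) -> List[str]:
    """Greedily pack whole sentences into chunks of at most chunk_size characters."""
    # Naive sentence boundary: whitespace that follows '.', '!' or '?'
    sentences = [s.strip() for s in re.split(r"(?<=[.!?])\s+", text) if s.strip()]

    chunks: List[str] = []
    current = ""
    for sentence in sentences:
        candidate = f"{current} {sentence}".strip()
        if len(candidate) <= chunk_size or not current:
            # The sentence fits, or it opens a new chunk
            # (a single over-long sentence is kept whole rather than cut)
            current = candidate
        else:
            chunks.append(current)
            current = sentence
    if current:
        chunks.append(current)
    return chunks

sample_text = "This is a sample text. " * 50
sentence_chunks = sentence_text_splitter(sample_text, chunk_size=100)
print(len(sentence_chunks), repr(sentence_chunks[0]))
```

Unlike the character-based splitter, every chunk here ends at a sentence boundary, at the cost of uneven chunk lengths.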

### Exercise 2: Add More Metadata
- Extract additional metadata from the transcripts (e.g., speaker names, Q&A sections)
- Add this metadata to your chunks

### Exercise 3: Explore Hybrid Search
- Create sparse vectors for the chunks (e.g., BM25 or TF-IDF); see https://docs.pinecone.io/guides/search/hybrid-search
- Represent each query the same way as the chunks (a dense embedding plus sparse keyword weights), then send both representations in a single query
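
To make the sparse side concrete, the sketch below builds a term-frequency sparse vector in the `{"indices": [...], "values": [...]}` shape that Pinecone uses for sparse values, and weights dense against sparse with a factor `alpha`. Treat it as a rough illustration under assumptions: raw term counts stand in for BM25/TF-IDF, token IDs come from a hash (so rare collisions are possible), and `to_sparse_vector` and `weight_hybrid` are hypothetical names. Check the linked guide for index requirements (such as the metric) before using this with the cosine index created earlier.

```python
import hashlib
import re
from collections import Counter
from typing import Dict, List, Tuple

def to_sparse_vector(text: str) -> Dict[str, List]:
    """Term-frequency sparse vector with stable 32-bit token IDs as indices."""
    tokens = re.findall(r"[a-z0-9]+", text.lower())
    counts: Counter = Counter()
    for token in tokens:
        # First 8 hex digits of the MD5 digest give a stable ID below 2**32
        token_id = int(hashlib.md5(token.encode()).hexdigest()[:8], 16)
        counts[token_id] += 1
    indices = sorted(counts)
    return {"indices": indices, "values": [float(counts[i]) for i in indices]}

def weight_hybrid(dense: List[float], sparse: Dict[str, List],
                  alpha: float) -> Tuple[List[float], Dict[str, List]]:
    """Scale dense values by alpha and sparse values by (1 - alpha)."""
    if not 0.0 <= alpha <= 1.0:
        raise ValueError("alpha must be between 0 and 1")
    weighted_sparse = {
        "indices": sparse["indices"],
        "values": [v * (1.0 - alpha) for v in sparse["values"]],
    }
    return [v * alpha for v in dense], weighted_sparse

sparse_vec = to_sparse_vector("Revenue growth, revenue!")
dense_w, sparse_w = weight_hybrid([1.0, 2.0], sparse_vec, alpha=0.25)
print(sparse_vec)
```

Using the same `to_sparse_vector` for chunks and queries keeps the token IDs aligned, which is the point of building both representations the same way.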

## 📚 Next Steps

After completing the ingestion pipeline, you can:
1. Build the **Retrieval Pipeline** to query your data
2. Create the **Generation Pipeline** to answer questions
3. Combine everything into a complete RAG system
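
As a preview of the generation step, the sketch below assembles a prompt from retrieved matches in the shape returned by `test_retrieval` (each match carrying `ticker`, `date`, and `text` metadata). It is one possible layout, not a prescribed one: `build_prompt` is a hypothetical helper, the instruction wording is an assumption, and sending the prompt to a chat model is left out.

```python
from typing import Any, Dict, List

def build_prompt(question: str, matches: List[Dict[str, Any]]) -> str:
    """Combine retrieved chunks and the user question into a single prompt string."""
    context_blocks = []
    for i, match in enumerate(matches, 1):
        meta = match["metadata"]
        # Label each chunk so the answer can point back to its source
        context_blocks.append(f"[{i}] {meta['ticker']} {meta['date']}\n{meta['text'].strip()}")
    context = "\n\n".join(context_blocks)
    return (
        "Answer the question using only the context below. "
        "If the context is insufficient, say so.\n\n"
        f"Context:\n{context}\n\n"
        f"Question: {question}\nAnswer:"
    )

# A hand-written match mirroring the first retrieval result shown earlier
sample_matches = [
    {"score": 0.6189,
     "metadata": {"ticker": "AAPL", "date": "2018-May-01",
                  "text": "2Q18, $61.1b. Up 16% YoverY."}},
]
prompt = build_prompt("What is Apple's revenue growth?", sample_matches)
print(prompt)
```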