## Amplitude Analytics RAG System
 
This notebook implements a RAG (Retrieval Augmented Generation) system for Amplitude analytics using:
- ChromaDB for vector storage
- Claude AI for natural language processing
- Custom parsing for event documentation

In [1]:
# ## Setup
# First, let's install the required packages:
!pip install chromadb anthropic



In [2]:
import re
from typing import List, Dict, Optional
import chromadb
from chromadb.utils import embedding_functions
from anthropic import Anthropic
import os
import json
import time


### Event Documentation Parser

In [3]:
class EventDocParser:
    """Parser for event documentation in markdown format."""
    
    @staticmethod
    def parse_markdown_file(file_path: str) -> List[Dict]:
        """
        Parse event documentation from a markdown file.
        
        Args:
            file_path: Path to the markdown file
            
        Returns:
            List of dictionaries containing parsed event information
        """
        with open(file_path, 'r', encoding='utf-8') as file:
            content = file.read()
        
        event_blocks = re.split(r'\nEvent:', content)
        event_blocks = [block for block in event_blocks if block.strip()]
        
        parsed_events = []
        for block in event_blocks:
            event_info = EventDocParser._parse_event_block(f"Event:{block}")
            if event_info:
                parsed_events.append(event_info)
                
        return parsed_events
    
    @staticmethod
    def _parse_event_block(block: str) -> Optional[Dict]:
        """Parse a single event block."""
        try:
            event_match = re.search(r'Event:\s*(.+?)(?:\n|$)', block)
            if not event_match:
                return None
            event_name = event_match.group(1).strip()
            
            desc_match = re.search(r'Description:\s*(.+?)(?:\n[A-Za-z]|$)', block)
            description = desc_match.group(1).strip() if desc_match else ""
            
            attributes = {}
            attr_section = re.search(r'Attributes:\n((?:  - .+\n?)+)', block)
            if attr_section:
                attr_lines = attr_section.group(1).split('\n')
                for line in attr_lines:
                    if line.strip().startswith('-'):
                        attr_match = re.search(r'-\s*(\w+)\s*\((\w+)\):\s*(.+)', line)
                        if attr_match:
                            name, attr_type, desc = attr_match.groups()
                            attributes[name] = {
                                "type": attr_type.strip(),
                                "description": desc.strip()
                            }
            
            return {
                "event_name": event_name,
                "description": description,
                "attributes": attributes
            }
            
        except Exception as e:
            print(f"Error parsing event block: {e}")
            return None

## Main RAG System

In [4]:
class AmplitudeAnalyticsRAG:
    def __init__(self, 
                 collection_name: str = "amplitude_docs",
                 anthropic_api_key: Optional[str] = None):
        """Initialize the RAG system."""
        self.client = chromadb.Client()
        self.embedding_fn = embedding_functions.DefaultEmbeddingFunction()
        self.debugging = False
        
        try:
            self.collection = self.client.get_collection(
                name=collection_name,
                embedding_function=self.embedding_fn
            )
            print(f"Retrieved existing collection: {collection_name}")
        except:
            self.collection = self.client.create_collection(
                name=collection_name,
                embedding_function=self.embedding_fn
            )
            print(f"Created new collection: {collection_name}")

        self.anthropic = Anthropic(
            api_key=anthropic_api_key or os.getenv("ANTHROPIC_API_KEY")
        )

    def process_markdown_documentation(self, markdown_file_path: str) -> int:
        """Process event documentation with enhanced attribute storage."""
        try:
            events = EventDocParser.parse_markdown_file(markdown_file_path)
        except Exception as e:
            print(f"Error parsing markdown file: {e}")
            return 0
        if self.debugging:
            print(f"Found {len(events)} events to process")
    
        for i, event in enumerate(events):
            # Create a more comprehensive and searchable document content
            doc_text = f"""
            Event Documentation:
            Event Name: {event['event_name']}
            
            Description: {event['event_name']} - {event['description']}
            
            This event can be used for analyzing:
            - Video tracking and analytics
            - User engagement metrics
            - Content performance monitoring
            
            Available Attributes for Analysis:
            """
            
            # Add attributes with more context
            for attr_name, attr_info in event['attributes'].items():
                doc_text += f"\n- {attr_name} ({attr_info['type']}): {attr_info['description']}"
                # Add usage hints for common attributes
                if 'video' in attr_name.lower():
                    doc_text += "\n  Use this attribute for video-specific analysis and filtering."
                if 'duration' in attr_name.lower():
                    doc_text += "\n  Important for time-based analysis and completion rates."
                if 'type' in attr_name.lower():
                    doc_text += "\n  Useful for segmentation and comparative analysis."
            
            # Add common use cases section
            doc_text += """
            
            Common Analytics Use Cases:
            - Tracking video completion rates
            - Analyzing user engagement patterns
            - Measuring content performance
            - Segmenting by video types
            - Monitoring viewer behavior
            """
            
            # Store in ChromaDB with enhanced metadata
            try:
                metadata = {
                    "event_name": event['event_name'],
                    "type": "event_documentation",
                    "attribute_count": len(event['attributes']),
                    "attributes": json.dumps({
                        name: {
                            "type": info["type"],
                            "description": info["description"]
                        }
                        for name, info in event['attributes'].items()
                    }),
                    "keywords": "video, completion, rates, analytics, tracking, metrics, engagement"
                }

                if self.debugging:
                    print(f"Storing event {i+1}/{len(events)}: {event['event_name']}")
                
                self.collection.add(
                    documents=[doc_text],
                    metadatas=[metadata],
                    ids=[f"doc_{i}"]
                )
            except Exception as e:
                print(f"Error storing event {event['event_name']}: {e}")
                continue
        
        return len(events)

    def get_amplitude_guidance(self, query: str) -> str:
        """Get enhanced Amplitude implementation guidance using Claude."""
        try:
            count = self.collection.count()
            if count == 0:
                return "No documentation has been loaded into the system yet. Please process your documentation first."

            if self.debugging:
                print(f"\nSearching collection with {count} documents...")
            
            # Query including all necessary fields
            results = self.collection.query(
                query_texts=[query],
                n_results=min(10, count),
                include=['metadatas', 'distances', 'documents'] 
            )
            
            if not results.get('documents') or not results['documents'][0]:
                if self.debugging:
                    print("No documents found in query results")
                return "No relevant documentation found for your query. Please try a different question."
    
            relevant_docs = results['documents'][0]
            relevant_metadata = results['metadatas'][0]
            distances = results['distances'][0]

            if self.debugging:
                print(f"\nFound {len(relevant_docs)} relevant documents")
            
            # Create context with ranked results
            context_parts = ["# Available Event Documentation\n"]
            
            for doc, meta, distance in zip(relevant_docs, relevant_metadata, distances):
                similarity = 1 - distance  # Convert distance to similarity score
                context_parts.append(f"\nRelevance Score: {similarity:.4f}")
                context_parts.append(doc)
                
                if meta and 'attributes' in meta:
                    try:
                        attributes = json.loads(meta['attributes'])
                        context_parts.append("\nDetailed Attributes:")
                        for attr_name, attr_info in attributes.items():
                            context_parts.append(
                                f"- {attr_name} ({attr_info['type']}): {attr_info['description']}"
                            )
                    except json.JSONDecodeError:
                        print(f"Warning: Could not parse attributes JSON for document")
            
            context_parts.extend(["\n# Query", query])
            context = "\n\n".join(context_parts)

            if self.debugging:
                print(f"\nPrepared context with {len(relevant_docs)} documents")
    
            prompt = f"""You are an Amplitude analytics expert. Using the provided event documentation and detailed 
            attribute information, create comprehensive implementation steps for analyzing video metrics. 
            Focus specifically on the user's query while utilizing the available events and attributes.
    
            Include:
            1. The most appropriate chart type and why
            2. Exact events to use (from the provided documentation)
            3. Required filters and their configuration, using the specific attributes available
            4. How to set up grouping using available attributes
            5. Additional settings or considerations
            6. Preview of expected insights
            7. Any relevant attribute combinations that could enhance the analysis
    
            User Query: {query}
    
            Available Documentation and Attributes:
            {context}
    
            Please provide a detailed, step-by-step response that someone could follow to implement 
            this analysis in Amplitude. Include both technical steps and explanatory notes."""
    
            response = self.anthropic.messages.create(
                model="claude-3-opus-20240229",
                max_tokens=1000,
                messages=[{
                    "role": "user",
                    "content": prompt
                }]
            )
            
            return response.content
    
        except Exception as e:
            print(f"Debug - Full error: {str(e)}")
            return f"Error processing query: {str(e)}"

    def get_event_attributes(self, event_name: str) -> Dict:
        """
        Get detailed attribute information for a specific event.
        
        Args:
            event_name: Name of the event to look up (with or without 'Event:' prefix)
            
        Returns:
            Dictionary containing event details and attributes
        """
        try:
            # Clean up the event name by removing 'Event:' if present
            clean_event_name = event_name.replace('Event:', '').strip()

            print(f"Searching for event: {clean_event_name}")  # Debug info
            
            # Query collection
            results = self.collection.get(
                where={"event_name": clean_event_name},
                include=['metadatas']
            )
            
            if not results['metadatas']:
                # If not found, try alternative format
                results = self.collection.get(
                    where={"event_name": f"Event: {clean_event_name}"},
                    include=['metadatas']
                )
            
            if not results['metadatas']:
                return {
                    "error": f"Event not found: {clean_event_name}",
                    "searched_names": [clean_event_name, f"Event: {clean_event_name}"]
                }
                
            metadata = results['metadatas'][0]
            if 'attributes' in metadata:
                return {
                    "event_name": clean_event_name,
                    "attributes": json.loads(metadata['attributes'])
                }
            
            return {"error": "No attribute information found"}
                
        except Exception as e:
            return {"error": str(e)}

    def get_collection_stats(self) -> Dict:
        """Get enhanced statistics about the stored documentation."""
        try:
            count = self.collection.count()
            
            sample = self.collection.get(
                limit=5,
                include=['documents', 'metadatas']
            )
            
            stats = {
                "total_events": count,
                "sample_events": [
                    {
                        "event": meta['event_name'],
                        "attribute_count": meta.get('attribute_count', 0),
                        "attributes": json.loads(meta['attributes']) if 'attributes' in meta else {}
                    }
                    for meta in sample['metadatas']
                ]
            }
            
            return stats
            
        except Exception as e:
            print(f"Error getting collection stats: {e}")
            return {"error": str(e)}

    def debug_collection(self):
        """Debug method to check what's actually stored in the collection."""
        try:
            # Get count
            count = self.collection.count()
            print(f"\nTotal documents in collection: {count}")
            
            if count > 0:
                # Get all documents
                results = self.collection.get(
                    include=['documents', 'metadatas'],
                    limit=count
                )
                
                print("\nSample of stored documents:")
                for i, (doc, meta) in enumerate(zip(results['documents'], results['metadatas'])):
                    print(f"\n--- Document {i+1} ---")
                    print("Metadata:", json.dumps(meta, indent=2))
                    print("First 200 chars of document:", doc[:200])
                    if i >= 2:  # Show just first 3 documents
                        break
                        
                # Try a simple query
                test_query = "video"
                print(f"\nTesting simple query: '{test_query}'")
                query_results = self.collection.query(
                    query_texts=[test_query],
                    n_results=1
                )
                print("Query results:", json.dumps(query_results, indent=2))
                
        except Exception as e:
            print(f"Debug error: {e}")

In [5]:
def parse_text_blocks(text_blocks):
    for block in text_blocks:
        if hasattr(block, 'text'):
            text = block.text
            # Split text into steps based on the pattern "\n\n<number>."
            steps = re.split(r'\n\n(\d+)\.', text)
            
            if steps[0].strip():  # Print intro if any
                print("\nIntroduction:")
                print(steps[0].strip())

            for i in range(1, len(steps), 2):
                step_number = steps[i]
                step_text = steps[i+1].strip() if i+1 < len(steps) else ""
                print(f"\nStep {step_number}:")
                print(step_text)

In [6]:
def load_env_from_json(file_path):
    """Loads security keys from a JSON file and sets them as environment variables."""
    try:
        with open(file_path, 'r') as file:
            secrets = json.load(file)
            for key, value in secrets.items():
                os.environ[key] = value
                print(f"Loaded {key} into environment variables")  # Optional, for debugging
    except FileNotFoundError:
        print(f"Error: The file {file_path} was not found.")
    except json.JSONDecodeError:
        print(f"Error: Could not parse {file_path}. Ensure it is valid JSON.")

In [7]:
load_env_from_json('secrets.json')

Loaded ANTHROPIC_API_KEY into environment variables


In [8]:
rag = AmplitudeAnalyticsRAG()

Created new collection: amplitude_docs


In [9]:
documentation_path = "MT-DataSchema.md"  # Update this path
num_events = rag.process_markdown_documentation(documentation_path)
print(f"Processed {num_events} events")

Processed 41 events


In [10]:
stats = rag.get_collection_stats()
print("\nCollection Statistics:")
print(json.dumps(stats, indent=2))


Collection Statistics:
{
  "total_events": 41,
  "sample_events": [
    {
      "event": "Event: Video 75% Complete",
      "attribute_count": 10,
      "attributes": {
        "video_session_id": {
          "type": "text",
          "description": "Video session unique for each video"
        },
        "video_casted": {
          "type": "text",
          "description": "A boolean representing whether the video is currently casted"
        },
        "video_duration": {
          "type": "decimal",
          "description": "The duration of the video in seconds"
        },
        "video_title": {
          "type": "text",
          "description": "The title of the video"
        },
        "video_source": {
          "type": "text",
          "description": "The source of the video like \"kaltura\", \"webiny\", etc"
        },
        "video_id": {
          "type": "text",
          "description": "The ID of the video asset from the platform source"
        },
        "video_publi

In [11]:
event_name = "Video 75% Complete"  # Replace with an actual event name
event_info = rag.get_event_attributes(event_name)
print(f"\nAttributes for {event_name}:")
print(json.dumps(event_info, indent=2))

Searching for event: Video 75% Complete

Attributes for Video 75% Complete:
{
  "event_name": "Video 75% Complete",
  "attributes": {
    "video_session_id": {
      "type": "text",
      "description": "Video session unique for each video"
    },
    "video_casted": {
      "type": "text",
      "description": "A boolean representing whether the video is currently casted"
    },
    "video_duration": {
      "type": "decimal",
      "description": "The duration of the video in seconds"
    },
    "video_title": {
      "type": "text",
      "description": "The title of the video"
    },
    "video_source": {
      "type": "text",
      "description": "The source of the video like \"kaltura\", \"webiny\", etc"
    },
    "video_id": {
      "type": "text",
      "description": "The ID of the video asset from the platform source"
    },
    "video_publication_date": {
      "type": "text",
      "description": "The publication date for the video in ISO standard format."
    },
    "vide

In [12]:
start_time = time.time()
query = "How can I get the top 10 articles visited in the last 30 days?"
guidance = rag.get_amplitude_guidance(query)
parse_text_blocks(guidance)
end_time = time.time()
elapsed_time = end_time - start_time
print(f"Elapsed time: {elapsed_time:.8f} seconds")


Introduction:
Based on the provided event documentation and attributes, the most relevant event for tracking article visits appears to be the "Viewed Page/Screen" event. This event contains attributes like content_title, content_type, and content_publication_date that can help identify the top articles.

Here are the step-by-step instructions to get the top 10 articles visited in the last 30 days using Amplitude:

Step 1:
Chart Type:
   - Choose the "Bar Chart" visualization.
   - This allows comparing the total visits for each article in a clear, visual way.

Step 2:
Event Selection: 
   - Select the "Viewed Page/Screen" event.
   - This event represents a page view and contains attributes relevant to articles.

Step 3:
Filters:
   - Add a filter for "content_type" = "article" to only include article page views.
   - Set a filter for the last 30 days using the date picker or by specifying "Last 30 days" in the time range selector.
   - This focuses the analysis on recent article visi

In [13]:
start_time = time.time()
query = "Show me how to track video engagement"
guidance = rag.get_amplitude_guidance(query)
parse_text_blocks(guidance)
end_time = time.time()
elapsed_time = end_time - start_time
print(f"Elapsed time: {elapsed_time:.8f} seconds")


Introduction:
To track video engagement in Amplitude using the provided event documentation and attributes, follow these steps:

Step 1:
Chart Type: 
   - Use a Funnel chart to visualize the progression of users through key video milestones (25%, 50%, 75%, 100% completion).
   - This chart type allows you to identify drop-off points and understand overall video engagement.

Step 2:
Events to Use:
   - Video 25% Complete
   - Video 50% Complete
   - Video 75% Complete 
   - Video 100% Complete

Step 3:
Filters:
   - Apply filters based on relevant attributes to focus on specific segments or video types.
   - Example: Filter by "video_type" to analyze engagement for different video categories (e.g., long form, short form, fast tv).
   - Use the "video_source" attribute to filter by video platform (e.g., kaltura, webiny) if needed.

Step 4:
Grouping:
   - Group the funnel chart by the "video_title" attribute to compare engagement across different videos.
   - This will help identify top-

In [14]:
start_time = time.time()
query = "Show me how to track video engagement"
guidance = rag.get_amplitude_guidance(query)
parse_text_blocks(guidance)
end_time = time.time()
elapsed_time = end_time - start_time
print(f"Elapsed time: {elapsed_time:.8f} seconds")


Introduction:
To track video engagement in Amplitude using the provided event documentation and attributes, follow these steps:

Step 1:
Chart Type:
   - Use a "Funnel" chart to visualize the drop-off in user engagement at different video completion milestones.
   - A funnel chart effectively shows the percentage of users who progress through each stage, helping identify where engagement declines.

Step 2:
Events to Use:
   - Video View Started
   - Video 25% Complete
   - Video 50% Complete
   - Video 75% Complete
   - Video 100% Complete
   These events capture key milestones in video playback and provide insights into engagement levels.

Step 3:
Filters:
   - Apply filters to focus on specific video attributes:
     - video_type: Filter by the type of video (e.g., "long form", "short form") to compare engagement across different formats.
     - video_source: Filter by the source of the video (e.g., "kaltura", "webiny") to analyze performance across different platforms.
     - video

In [15]:
start_time = time.time()
query = "What metrics are available for video analysis?"
guidance = rag.get_amplitude_guidance(query)
parse_text_blocks(guidance)
end_time = time.time()
elapsed_time = end_time - start_time
print(f"Elapsed time: {elapsed_time:.8f} seconds")


Introduction:
Based on the provided event documentation and attribute details, there are several metrics available for comprehensive video analysis in Amplitude. Here are the implementation steps to set up video analysis:

Step 1:
Chart Type: 
   - Use a funnel chart to analyze video completion rates at different milestones (25%, 50%, 75%, 100%).
   - Funnel charts effectively visualize user progression through sequential steps, making it ideal for tracking video completion.

Step 2:
Events to Use:
   - "Video 25% Complete" 
   - "Video 50% Complete"
   - "Video 75% Complete"
   - "Video 100% Complete"
   These events track when users reach specific milestones in the video playback.

Step 3:
Filters:
   - Apply filters based on relevant attributes to narrow down the analysis:
     - "video_type" to compare completion rates across different video types (e.g., long form, short form, fast tv)
     - "video_source" to analyze performance by video source (e.g., kaltura, webiny)
     - "vid