In [None]:
from langfuse import Langfuse
from datetime import datetime, timedelta
import pandas as pd
from tqdm import tqdm

# Initialize Langfuse client
langfuse = Langfuse(
    public_key="pk-lf-3ebaa19e-3978-4f96-a366-03f98697c5e4",
    secret_key="sk-lf-3e648242-1c6a-414b-91aa-cf46c91190f2",
    host="https://langfuse.kavida.ai"  # or your self-hosted URL
)

# Date range: 26 January to 30 January (inclusive)
start_date = datetime(2025, 1, 26)   # 26 Jan 2025 00:00:00
end_date = datetime(2025, 1, 30, 23, 59, 59)   # 30 Jan 2025 end of day

# # Fetch traces for the date range (Langfuse SDK v3 uses api.trace.list)
# first_traces = langfuse.api.trace.list(
#     from_timestamp=start_date,
#     to_timestamp=end_date,
#     limit=50
# )
# total_pages = first_traces.meta.total_pages
# for page_num in tqdm(range(2, total_pages + 1)):
#     traces = langfuse.api.trace.list(
#         page=page_num,
#         from_timestamp=start_date,
#         to_timestamp=end_date,
#         limit=50
#     )
#     first_traces.data.extend(traces.data)

In [None]:
import concurrent.futures
from tqdm import tqdm
import time
import random

PAGE_LIMIT = 50  # items per page (Langfuse API)

def fetch_page_with_retry(page_num, start_date, end_date, langfuse, max_retries=3, base_delay=1):
    """Fetch a page with exponential backoff retry logic (uses Langfuse SDK v3 api.trace.list)"""
    retries = 0
    while retries <= max_retries:
        try:
            return langfuse.api.trace.list(
                page=page_num,
                limit=PAGE_LIMIT,
                from_timestamp=start_date,
                to_timestamp=end_date
            ).data
        except Exception as e:
            retries += 1
            if retries > max_retries:
                raise
            
            # Exponential backoff with jitter
            delay = base_delay * (2 ** (retries - 1)) + random.uniform(0, 0.5)
            print(f"Error fetching page {page_num}, retrying in {delay:.2f}s... ({retries}/{max_retries})")
            print(f"Error: {str(e)}")
            time.sleep(delay)

# Get first page to determine total pages (Langfuse SDK v3: api.trace.list)
first_traces = langfuse.api.trace.list(
    from_timestamp=start_date,
    to_timestamp=end_date,
    limit=PAGE_LIMIT
)

total_pages = first_traces.meta.total_pages
all_traces = first_traces.data.copy()  # Start with the first page data

# Use ThreadPoolExecutor to fetch the remaining pages in parallel
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    # Create future tasks for each page (skip page 1 as we already have it)
    future_to_page = {
        executor.submit(fetch_page_with_retry, page_num, start_date, end_date, langfuse): page_num
        for page_num in range(2, total_pages + 1)
    }
    
    # Process results as they complete
    for future in tqdm(concurrent.futures.as_completed(future_to_page), total=len(future_to_page)):
        page_num = future_to_page[future]
        try:
            page_data = future.result()
            all_traces.extend(page_data)
        except Exception as exc:
            print(f"Page {page_num} failed after all retry attempts: {exc}")

# Now all_traces contains all the data from all pages

In [6]:
data = []
# for trace in tqdm(first_traces.data):
for trace in tqdm(all_traces):
    # for span in trace.spans:
    print(trace)
    print(trace.name)
    break
    # if trace.metadata is not None and hasattr(trace, 'total_cost'):
    #     module = trace.metadata.get('module', 'unknown')
    #     cost = trace.total_cost or 0
    #     data.append({
    #         'module': module,
    #         'cost': cost,
    #         'timestamp': trace.timestamp,
    #         'user_id': trace.user_id if hasattr(trace, 'user_id') else trace.metadata.get('user_id', 'unknown'),
    #     })


  0%|          | 0/51388 [00:00<?, ?it/s]

id='3b133c36-4955-42e0-9dbc-1a2c93fec3c8' timestamp=datetime.datetime(2025, 7, 4, 9, 11, 4, 81000, tzinfo=datetime.timezone.utc) name='ActionItemsSummarizer' input={'messages': [{'role': 'system', 'content': '\nYou are a helpful assistant that summarizes alerts for a user on the Kavida platform. Kavida helps manage purchase orders, procurement, and the overall supply chain.\n\nYour primary goal is to provide clear, actionable summary of alerts, Seamlessly stitching information in sentences while making it readable to human by group sentences in paragraphs/bullet points of line items that are affected, prioritizing the most critical ones based on business impact.\n\n**Key details to extract and present for each critical alert:**\n1.  **Reason for the alert:** Identify this from the *agentpo_reference_text* excluding definition & explanation of terms and present it in **bold** \n2.  **Source of the information:**\n    *   Examine *agentpo_reference_text*.\n    *   If a clear document fil




In [None]:
#!/usr/bin/env python3
"""
Cost Estimation Function for Document and Purchase Order Processing
===================================================================

This script provides a comprehensive cost estimation function that:
1. Processes langfuse traces from the last 7 days
2. Calculates costs for Gemini 2.5 Pro and Flash models
3. Segregates traces into DAES and PO updater sections
4. Accounts for document processing (attachment_id) and PO processing (po_number)
5. Handles litellm_completion traces without user_id
"""

from langfuse import Langfuse
from datetime import datetime, timedelta
import pandas as pd
from tqdm import tqdm
import concurrent.futures
import time
import random
from typing import Dict, List, Any, Tuple
from collections import defaultdict
import json


class CostEstimator:
    """
    A comprehensive cost estimator for processing documents and purchase orders
    using langfuse traces and various AI models.
    """
    
    def __init__(self, langfuse_client: Langfuse):
        """
        Initialize the cost estimator with langfuse client.
        
        Args:
            langfuse_client: Initialized Langfuse client
        """
        self.langfuse = langfuse_client
        
        # Model pricing (per 1M tokens)
        self.pricing = {
            'gemini_2_5_pro': {
                'input_cost': 1.25,  # $1.25 per 1M tokens
                'output_cost': 2.50  # $2.50 per 1M tokens
            },
            'gemini_2_5_flash': {
                'input_cost': 0.30,  # $0.30 per 1M tokens
                'output_cost': 1.20  # $1.20 per 1M tokens (corrected from 10 which seemed too high)
            }
        }
        
        # DAES trace patterns
        self.daes_flash_patterns = [
            "DocExtractor", 
            "TabularContentCommentContextExtractor", 
            "GPTExcelColumnMapper"
        ]
        
        self.daes_pro_patterns = [
            "GPTCVAttachmentClassification", 
            "GPTCVExcelAttachmentClassification"
        ]
        
        # PO updater patterns
        self.po_updater_patterns = [
            "AgenticPOUpdater", 
            "litellm-completion"
        ]
    
    def fetch_traces_with_retry(self, page_num: int, start_date: datetime, 
                               end_date: datetime, max_retries: int = 3, 
                               base_delay: float = 1.0) -> List[Any]:
        """
        Fetch traces with exponential backoff retry logic.
        
        Args:
            page_num: Page number to fetch
            start_date: Start date for traces
            end_date: End date for traces
            max_retries: Maximum number of retry attempts
            base_delay: Base delay for exponential backoff
            
        Returns:
            List of traces for the page
        """
        retries = 0
        while retries <= max_retries:
            try:
                return self.langfuse.api.trace.list(
                    page=page_num,
                    limit=50,
                    from_timestamp=start_date,
                    to_timestamp=end_date
                ).data
            except Exception as e:
                retries += 1
                if retries > max_retries:
                    print(f"Failed to fetch page {page_num} after {max_retries} retries: {e}")
                    return []
                
                # Exponential backoff with jitter
                delay = base_delay * (2 ** (retries - 1)) + random.uniform(0, 0.5)
                print(f"Error fetching page {page_num}, retrying in {delay:.2f}s... ({retries}/{max_retries})")
                time.sleep(delay)
        
        return []
    
    def fetch_all_traces(self, days_back: int = 7) -> List[Any]:
        """
        Fetch all traces from the last N days using parallel processing.
        
        Args:
            days_back: Number of days to look back
            
        Returns:
            List of all traces
        """
        # Calculate date range
        end_date = datetime.now()
        start_date = end_date - timedelta(days=days_back)
        
        print(f"Fetching traces from {start_date} to {end_date}")
        
        # Get first page to determine total pages (Langfuse SDK v3: api.trace.list)
        try:
            first_traces = self.langfuse.api.trace.list(
                from_timestamp=start_date,
                to_timestamp=end_date,
                limit=50
            )
            total_pages = first_traces.meta.total_pages
            all_traces = first_traces.data.copy()
        except Exception as e:
            print(f"Error fetching first page: {e}")
            return []
        
        if total_pages <= 1:
            return all_traces
        
        print(f"Total pages to fetch: {total_pages}")
        
        # Use ThreadPoolExecutor to fetch remaining pages in parallel
        with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
            future_to_page = {
                executor.submit(self.fetch_traces_with_retry, page_num, start_date, end_date): page_num
                for page_num in range(2, total_pages + 1)
            }
            
            # Process results as they complete
            for future in tqdm(concurrent.futures.as_completed(future_to_page), 
                             total=len(future_to_page), desc="Fetching traces"):
                page_num = future_to_page[future]
                try:
                    page_data = future.result()
                    all_traces.extend(page_data)
                except Exception as exc:
                    print(f"Page {page_num} failed: {exc}")
        
        print(f"Total traces fetched: {len(all_traces)}")
        return all_traces
    
    def extract_tokens_from_trace(self, trace: Any) -> Tuple[int, int]:
        """
        Extract input and output tokens from a trace.
        
        Args:
            trace: Langfuse trace object
            
        Returns:
            Tuple of (input_tokens, output_tokens)
        """
        input_tokens = 0
        output_tokens = 0
        
        # Try to get tokens from observations (Langfuse SDK v3: api.observations.get)
        if hasattr(trace, 'observations'):
            for obs_id in trace.observations:
                try:
                    observation = self.langfuse.api.observations.get(observation_id=obs_id)
                    if hasattr(observation, 'usage') and observation.usage:
                        usage = observation.usage
                        input_tokens += getattr(usage, 'input', 0)
                        output_tokens += getattr(usage, 'output', 0)
                except Exception as e:
                    # Continue processing other observations
                    continue
        
        # Fallback: try to estimate from total_cost if available
        if input_tokens == 0 and output_tokens == 0 and hasattr(trace, 'total_cost') and trace.total_cost:
            # Use a rough estimation based on average token costs
            # This is a fallback estimation
            estimated_total_tokens = int(trace.total_cost / 0.002 * 1000000)  # Rough estimation
            input_tokens = estimated_total_tokens // 2
            output_tokens = estimated_total_tokens // 2
        
        return input_tokens, output_tokens
    
    def categorize_trace(self, trace: Any) -> Tuple[str, str]:
        """
        Categorize a trace into DAES or PO updater and determine model type.
        
        Args:
            trace: Langfuse trace object
            
        Returns:
            Tuple of (category, model_type) where:
            - category: 'daes' or 'po_updater'
            - model_type: 'gemini_2_5_pro' or 'gemini_2_5_flash'
        """
        trace_name = trace.name if hasattr(trace, 'name') else ''
        
        # Check DAES patterns
        for pattern in self.daes_flash_patterns:
            if trace_name.endswith(pattern):
                return 'daes', 'gemini_2_5_flash'
        
        for pattern in self.daes_pro_patterns:
            if trace_name.endswith(pattern):
                return 'daes', 'gemini_2_5_pro'
        
        # Check PO updater patterns
        for pattern in self.po_updater_patterns:
            if pattern in trace_name:
                return 'po_updater', 'gemini_2_5_pro'
        
        # Default categorization
        return 'unknown', 'gemini_2_5_pro'
    
    def calculate_cost(self, input_tokens: int, output_tokens: int, model_type: str) -> float:
        """
        Calculate cost based on tokens and model type.
        
        Args:
            input_tokens: Number of input tokens
            output_tokens: Number of output tokens
            model_type: Type of model ('gemini_2_5_pro' or 'gemini_2_5_flash')
            
        Returns:
            Total cost in dollars
        """
        if model_type not in self.pricing:
            model_type = 'gemini_2_5_pro'  # Default fallback
        
        pricing = self.pricing[model_type]
        
        input_cost = (input_tokens / 1_000_000) * pricing['input_cost']
        output_cost = (output_tokens / 1_000_000) * pricing['output_cost']
        
        return input_cost + output_cost
    
    def process_traces(self, traces: List[Any]) -> Dict[str, Any]:
        """
        Process all traces and calculate costs.
        
        Args:
            traces: List of langfuse traces
            
        Returns:
            Dictionary containing processed cost data
        """
        processed_data = {
            'daes': defaultdict(list),
            'po_updater': defaultdict(list),
            'unknown': defaultdict(list)
        }
        
        # Track unique documents and POs
        unique_attachments = set()
        unique_pos = set()
        
        # Store litellm traces for later distribution
        litellm_traces = []
        agentic_po_traces = []
        
        print("Processing traces...")
        for trace in tqdm(traces, desc="Processing traces"):
            try:
                # Extract basic information
                user_id = getattr(trace, 'user_id', None)
                if not user_id and hasattr(trace, 'metadata') and trace.metadata:
                    user_id = trace.metadata.get('user_id', 'unknown')
                
                # Extract attachment_id and po_number from metadata if available
                attachment_id = None
                po_number = None
                
                if hasattr(trace, 'metadata') and trace.metadata:
                    attachment_id = trace.metadata.get('attachment_id')
                    po_number = trace.metadata.get('po_number')
                
                # Track unique values
                if attachment_id:
                    unique_attachments.add(attachment_id)
                if po_number:
                    unique_pos.add(po_number)
                
                # Get tokens
                input_tokens, output_tokens = self.extract_tokens_from_trace(trace)
                
                # Categorize trace
                category, model_type = self.categorize_trace(trace)
                
                # Calculate cost
                cost = self.calculate_cost(input_tokens, output_tokens, model_type)
                
                # Store trace data
                trace_data = {
                    'trace_id': getattr(trace, 'id', 'unknown'),
                    'name': getattr(trace, 'name', 'unknown'),
                    'user_id': user_id,
                    'timestamp': getattr(trace, 'timestamp', None),
                    'input_tokens': input_tokens,
                    'output_tokens': output_tokens,
                    'model_type': model_type,
                    'cost': cost,
                    'attachment_id': attachment_id,
                    'po_number': po_number
                }
                
                # Handle litellm traces separately
                if 'litellm-completion' in trace_data['name']:
                    litellm_traces.append(trace_data)
                elif 'AgenticPOUpdater' in trace_data['name']:
                    agentic_po_traces.append(trace_data)
                else:
                    processed_data[category][user_id or 'unknown'].append(trace_data)
                
            except Exception as e:
                print(f"Error processing trace: {e}")
                continue
        
        # Distribute litellm traces equally among AgenticPOUpdater calls (when both exist)
        if litellm_traces and agentic_po_traces:
            self.distribute_litellm_traces(litellm_traces, agentic_po_traces, processed_data)
        elif agentic_po_traces:
            # AgenticPOUpdater traces without litellm: add them to processed_data as-is
            for trace_data in agentic_po_traces:
                processed_data['po_updater'][trace_data['user_id'] or 'unknown'].append(trace_data)
        
        # Add summary statistics
        processed_data['summary'] = {
            'total_traces': len(traces),
            'unique_attachments': len(unique_attachments),
            'unique_pos': len(unique_pos),
            'unique_users': len(set(user_id for category_data in processed_data.values() 
                                   if isinstance(category_data, dict)
                                   for user_id in category_data.keys())),
            'attachment_list': list(unique_attachments),
            'po_list': list(unique_pos)
        }
        
        return processed_data
    
    def distribute_litellm_traces(self, litellm_traces: List[Dict], 
                                 agentic_po_traces: List[Dict], 
                                 processed_data: Dict[str, Any]):
        """
        Distribute litellm traces equally among AgenticPOUpdater calls.
        
        Args:
            litellm_traces: List of litellm trace data
            agentic_po_traces: List of AgenticPOUpdater trace data
            processed_data: Main processed data dictionary to update
        """
        if not agentic_po_traces:
            # If no AgenticPOUpdater traces, add litellm traces as unknown
            for trace_data in litellm_traces:
                processed_data['unknown'][trace_data['user_id'] or 'unknown'].append(trace_data)
            return
        
        # Calculate total tokens from litellm traces
        total_input_tokens = sum(trace['input_tokens'] for trace in litellm_traces)
        total_output_tokens = sum(trace['output_tokens'] for trace in litellm_traces)
        
        # Distribute equally among AgenticPOUpdater traces
        tokens_per_trace = {
            'input': total_input_tokens // len(agentic_po_traces),
            'output': total_output_tokens // len(agentic_po_traces)
        }
        
        # Add distributed tokens to each AgenticPOUpdater trace
        for trace_data in agentic_po_traces:
            trace_data['input_tokens'] += tokens_per_trace['input']
            trace_data['output_tokens'] += tokens_per_trace['output']
            trace_data['cost'] = self.calculate_cost(
                trace_data['input_tokens'], 
                trace_data['output_tokens'], 
                trace_data['model_type']
            )
            
            # Add to processed data
            processed_data['po_updater'][trace_data['user_id'] or 'unknown'].append(trace_data)
    
    def generate_cost_report(self, processed_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Generate a comprehensive cost report.
        
        Args:
            processed_data: Processed trace data
            
        Returns:
            Dictionary containing cost report
        """
        report = {
            'summary': processed_data['summary'],
            'costs_by_category': {},
            'costs_by_user': {},
            'costs_by_model': defaultdict(float),
            'document_processing_costs': {},
            'po_processing_costs': {},
            'total_cost': 0.0
        }
        
        # Calculate costs by category
        for category in ['daes', 'po_updater', 'unknown']:
            category_data = processed_data[category]
            if not isinstance(category_data, dict):
                continue
                
            category_cost = 0.0
            category_tokens = {'input': 0, 'output': 0}
            
            for user_id, traces in category_data.items():
                user_cost = sum(trace['cost'] for trace in traces)
                category_cost += user_cost
                
                # Track tokens
                for trace in traces:
                    category_tokens['input'] += trace['input_tokens']
                    category_tokens['output'] += trace['output_tokens']
                    
                    # Track model costs
                    report['costs_by_model'][trace['model_type']] += trace['cost']
            
            report['costs_by_category'][category] = {
                'total_cost': category_cost,
                'total_input_tokens': category_tokens['input'],
                'total_output_tokens': category_tokens['output'],
                'trace_count': sum(len(traces) for traces in category_data.values())
            }
            
            report['total_cost'] += category_cost
        
        # Calculate costs by user
        for category in ['daes', 'po_updater', 'unknown']:
            category_data = processed_data[category]
            if not isinstance(category_data, dict):
                continue
                
            for user_id, traces in category_data.items():
                if user_id not in report['costs_by_user']:
                    report['costs_by_user'][user_id] = {
                        'daes': 0.0,
                        'po_updater': 0.0,
                        'unknown': 0.0,
                        'total': 0.0
                    }
                
                user_cost = sum(trace['cost'] for trace in traces)
                report['costs_by_user'][user_id][category] = user_cost
                report['costs_by_user'][user_id]['total'] += user_cost
        
        # Calculate per-document and per-PO costs (guard against missing category keys)
        if report['summary']['unique_attachments'] > 0 and 'daes' in report['costs_by_category']:
            daes_cost = report['costs_by_category']['daes']['total_cost']
            report['document_processing_costs'] = {
                'total_documents': report['summary']['unique_attachments'],
                'total_daes_cost': daes_cost,
                'cost_per_document': daes_cost / report['summary']['unique_attachments']
            }
        
        if report['summary']['unique_pos'] > 0 and 'po_updater' in report['costs_by_category']:
            po_cost = report['costs_by_category']['po_updater']['total_cost']
            report['po_processing_costs'] = {
                'total_pos': report['summary']['unique_pos'],
                'total_po_cost': po_cost,
                'cost_per_po': po_cost / report['summary']['unique_pos']
            }
        
        return report
    
    def estimate_future_costs(self, processed_data: Dict[str, Any], 
                             future_documents: int = 0, 
                             future_pos: int = 0) -> Dict[str, Any]:
        """
        Estimate future costs based on historical data.
        
        Args:
            processed_data: Historical processed data
            future_documents: Number of future documents to process
            future_pos: Number of future POs to process
            
        Returns:
            Dictionary containing future cost estimates
        """
        report = self.generate_cost_report(processed_data)
        
        estimates = {
            'historical_analysis': {
                'documents_processed': report['summary']['unique_attachments'],
                'pos_processed': report['summary']['unique_pos'],
                'cost_per_document': report.get('document_processing_costs', {}).get('cost_per_document', 0),
                'cost_per_po': report.get('po_processing_costs', {}).get('cost_per_po', 0)
            },
            'future_estimates': {}
        }
        
        if future_documents > 0 and 'document_processing_costs' in report:
            doc_cost = report['document_processing_costs']['cost_per_document'] * future_documents
            estimates['future_estimates']['document_processing'] = {
                'documents': future_documents,
                'estimated_cost': doc_cost
            }
        
        if future_pos > 0 and 'po_processing_costs' in report:
            po_cost = report['po_processing_costs']['cost_per_po'] * future_pos
            estimates['future_estimates']['po_processing'] = {
                'pos': future_pos,
                'estimated_cost': po_cost
            }
        
        total_future_cost = sum(
            est.get('estimated_cost', 0) 
            for est in estimates['future_estimates'].values()
        )
        estimates['future_estimates']['total_estimated_cost'] = total_future_cost
        
        return estimates


def main():
    """
    Main function to demonstrate the cost estimation functionality.
    """
    # Initialize Langfuse client
    langfuse = Langfuse(
        public_key="pk-lf-3ebaa19e-3978-4f96-a366-03f98697c5e4",
        secret_key="sk-lf-3e648242-1c6a-414b-91aa-cf46c91190f2",
        host="https://langfuse.kavida.ai"
    )
    
    # Create cost estimator
    estimator = CostEstimator(langfuse)
    
    # Fetch traces from last 7 days
    traces = estimator.fetch_all_traces(days_back=7)
    
    if not traces:
        print("No traces found!")
        return
    
    # Process traces
    processed_data = estimator.process_traces(traces)
    
    # Generate cost report
    report = estimator.generate_cost_report(processed_data)
    
    # Print summary
    print("\n" + "="*60)
    print("COST ESTIMATION REPORT")
    print("="*60)
    
    print(f"\nSummary:")
    print(f"- Total traces processed: {report['summary']['total_traces']}")
    print(f"- Unique documents: {report['summary']['unique_attachments']}")
    print(f"- Unique POs: {report['summary']['unique_pos']}")
    print(f"- Unique users: {report['summary']['unique_users']}")
    print(f"- Total cost: ${report['total_cost']:.4f}")
    
    print(f"\nCosts by Category:")
    for category, data in report['costs_by_category'].items():
        print(f"- {category.upper()}: ${data['total_cost']:.4f} "
              f"({data['trace_count']} traces)")
    
    print(f"\nCosts by Model:")
    for model, cost in report['costs_by_model'].items():
        print(f"- {model}: ${cost:.4f}")
    
    if 'document_processing_costs' in report:
        doc_costs = report['document_processing_costs']
        print(f"\nDocument Processing:")
        print(f"- Cost per document: ${doc_costs['cost_per_document']:.4f}")
        print(f"- Total documents: {doc_costs['total_documents']}")
    
    if 'po_processing_costs' in report:
        po_costs = report['po_processing_costs']
        print(f"\nPO Processing:")
        print(f"- Cost per PO: ${po_costs['cost_per_po']:.4f}")
        print(f"- Total POs: {po_costs['total_pos']}")
    
    # Example future cost estimation
    future_estimates = estimator.estimate_future_costs(
        processed_data, 
        future_documents=100, 
        future_pos=50
    )
    
    print(f"\nFuture Cost Estimates (100 docs, 50 POs):")
    if 'future_estimates' in future_estimates:
        for key, estimate in future_estimates['future_estimates'].items():
            if key != 'total_estimated_cost':
                print(f"- {key}: ${estimate['estimated_cost']:.4f}")
        print(f"- Total estimated cost: ${future_estimates['future_estimates']['total_estimated_cost']:.4f}")
    
    # Save detailed report to JSON
    with open('cost_report.json', 'w') as f:
        # Convert datetime objects to strings for JSON serialization
        json_report = json.dumps(report, indent=2, default=str)
        f.write(json_report)
    
    print(f"\nDetailed report saved to 'cost_report.json'")


if __name__ == "__main__":
    main()

In [1]:
class GeminiFlash:
    input_cost_per_million = 0.3 
    output_cost_per_million = 2.5

    @classmethod
    def input_cost(cls, tokens):
        return (tokens / 1_000_000) * cls.input_cost_per_million
    
    @classmethod
    def output_cost(cls, tokens):
        return (tokens / 1_000_000) * cls.output_cost_per_million

class GeminiPro:
    input_cost_per_million = 1.25 
    output_cost_per_million = 10

    @classmethod
    def input_cost(cls, tokens):
        return (tokens / 1_000_000) * cls.input_cost_per_million
    
    @classmethod
    def output_cost(cls, tokens):
        return (tokens / 1_000_000) * cls.output_cost_per_million

# attachment
# classification
clf_model = GeminiPro
cost_per_clf = clf_model.input_cost(2000) + clf_model.output_cost(80)

extr_model = GeminiFlash
cost_per_extr = extr_model.input_cost(7000) + extr_model.output_cost(900)

cost_per_attachment = cost_per_clf + cost_per_extr
cost_per_attachment  # 0.01 USD

0.00765

In [3]:
hist_ext_model = GeminiFlash
_total_cost = hist_ext_model.input_cost(419686) + hist_ext_model.output_cost(136361)
_total_cost

0.46680829999999995

In [3]:
# PO
n_pre_update_steps = 4
pre_update_model = GeminiPro
pre_update_steps_cost = n_pre_update_steps * (pre_update_model.input_cost(9500) + pre_update_model.output_cost(7000))

# PO updater
n_po_updater_steps = 7
po_updater_model = GeminiPro
po_updater_steps_cost = n_po_updater_steps * (po_updater_model.input_cost(1900) + po_updater_model.output_cost(2000))

# Total cost
num_of_emails = 1.2
total_cost = (pre_update_steps_cost + po_updater_steps_cost) * num_of_emails
total_cost

0.58095

In [27]:
(n_attachments_per_day * cost_per_attachment)

0.022949999999999998

In [28]:
(n_pos_per_day * cost_per_po)

5.8095

In [25]:
cost_per_attachment = cost_per_attachment
cost_per_po = total_cost

n_attachments_per_day = 3
n_pos_per_day = 10

cost_per_day = (n_attachments_per_day * cost_per_attachment) + (n_pos_per_day * cost_per_po)
cost_per_day

5.83245

In [26]:
cost_per_day * 30

174.9735

In [17]:
# Create a list to store relevant data
data = []
# for trace in tqdm(first_traces.data):
for trace in tqdm(all_traces):
    # for span in trace.spans:
    if trace.metadata is not None and hasattr(trace, 'total_cost'):
        module = trace.metadata.get('module', 'unknown')
        cost = trace.total_cost or 0
        data.append({
            'module': module,
            'cost': cost,
            'timestamp': trace.timestamp,
            'user_id': trace.user_id if hasattr(trace, 'user_id') else trace.metadata.get('user_id', 'unknown'),
        })

# Convert to pandas DataFrame
df = pd.DataFrame(data)

# Group by module and calculate total cost
cost_by_module = df.groupby('module')['cost'].sum()

# Print results
print("\nTotal cost by module (for the fetched date range):")
print("==========================================")
cost_by_module = cost_by_module.sort_values(ascending=False)
for module, cost in cost_by_module.items():
    print(f"{module}: ${cost:.4f}")

print("\nTotal overall cost: ${:.4f}".format(cost_by_module.sum()))

# Optional: Create a more detailed analysis
detailed_analysis = {
    'total_cost': cost_by_module.sum(),
    'cost_by_module': cost_by_module.to_dict(),
    'number_of_calls': df.groupby('module').size().sort_values(ascending=False).to_dict(),
    'average_cost_per_call': (df.groupby('module')['cost'].mean().sort_values(ascending=False)).to_dict()
}

print("\nDetailed Analysis:")
print("=================")
print(f"Total number of API calls: {len(df)}")
print("\nCalls per module:")
for module, calls in detailed_analysis['number_of_calls'].items():
    print(f"{module}: {calls} calls")

print("\nAverage cost per call by module:")
for module, avg_cost in detailed_analysis['average_cost_per_call'].items():
    print(f"{module}: ${avg_cost:.4f}")

# # Optional: Save to CSV
# df.to_csv('langfuse_cost_analysis.csv', index=False)

  0%|          | 0/139229 [00:00<?, ?it/s]

100%|██████████| 139229/139229 [00:00<00:00, 427653.02it/s]



Total cost by module for the last 30 days:
GPTOADocExtractor: $688.6593
GPTPODocExtractor: $434.7120
GPTInvoiceDocExtractor: $267.6328
PeriodicPOLatestStateGenerator: $218.3044
GPTDeliveryNoteDocExtractor: $71.0199
POUpdater: $63.8104
GPTCVAttachmentClassificationV2: $59.2851
GPTCVAttachmentClassification: $38.4690
GPTCVExcelAttachmentClassification: $29.8586
TabularContentCommentContextExtractor: $17.3436
GPTMaterialCertDocExtractor: $12.5434
GPTPackingListDocExtractor: $11.7631
GPTDocIsLinkedToPO: $11.3566
GPTDocIsPurchaseOrder: $9.2655
GPTCVAttachmentPOExtractionEnhanced: $6.9221
UserOnboardingPOLatestStateGenerator: $6.8183
GPTBillOfLadingDocExtractor: $6.5602
GPTAirwayBillDocExtractor: $6.1257
GPTWayBillDocExtractor: $2.7506
GPTExcelColumnMapper: $0.0525

Total overall cost: $1963.2530

Detailed Analysis:
Total number of API calls: 129215

Calls per module:
GPTPODocExtractor: 35933 calls
GPTOADocExtractor: 33765 calls
GPTInvoiceDocExtractor: 16511 calls
GPTCVAttachmentClassificat

In [18]:
cost_by_module

module
GPTOADocExtractor                        688.659255
GPTPODocExtractor                        434.712050
GPTInvoiceDocExtractor                   267.632752
PeriodicPOLatestStateGenerator           218.304365
GPTDeliveryNoteDocExtractor               71.019870
POUpdater                                 63.810372
GPTCVAttachmentClassificationV2           59.285058
GPTCVAttachmentClassification             38.469003
GPTCVExcelAttachmentClassification        29.858602
TabularContentCommentContextExtractor     17.343602
GPTMaterialCertDocExtractor               12.543408
GPTPackingListDocExtractor                11.763053
GPTDocIsLinkedToPO                        11.356560
GPTDocIsPurchaseOrder                      9.265545
GPTCVAttachmentPOExtractionEnhanced        6.922133
UserOnboardingPOLatestStateGenerator       6.818268
GPTBillOfLadingDocExtractor                6.560225
GPTAirwayBillDocExtractor                  6.125667
GPTWayBillDocExtractor                     2.750637
GPTEx

# Langfuse cost per module per user.

In [19]:
import pandas as pd
from tqdm import tqdm
import ipywidgets as widgets
from IPython.display import display, clear_output

# Convert to pandas DataFrame
df = pd.DataFrame(data)

# Create overall summary
overall_cost_by_module = df.groupby('module')['cost'].sum().sort_values(ascending=False)
total_overall_cost = overall_cost_by_module.sum()

print("\nOverall Total cost by module:")
print("============================")
for module, cost in overall_cost_by_module.items():
    print(f"{module}: ${cost:.4f}")
print(f"\nTotal overall cost: ${total_overall_cost:.4f}")

# Get unique users for dropdown
unique_users = sorted(df['user_id'].unique())

# Create interactive widget
def show_user_analysis(user):
    clear_output(wait=True)
    
    # Display the dropdown again to maintain UI
    display(user_dropdown)
    
    # Filter data for selected user
    user_data = df[df['user_id'] == user]
    
    if user_data.empty:
        print(f"No data available for user: {user}")
        return
    
    # Calculate costs by module for this user
    user_cost_by_module = user_data.groupby('module')['cost'].sum().sort_values(ascending=False)
    total_user_cost = user_cost_by_module.sum()
    
    print(f"\nCost Analysis for User: {user}")
    print("=" * 50)
    
    # Create and display a formatted table
    module_data = []
    for module, cost in user_cost_by_module.items():
        percent = (cost / total_user_cost) * 100 if total_user_cost > 0 else 0
        calls = user_data[user_data['module'] == module].shape[0]
        avg_cost = cost / calls if calls > 0 else 0
        module_data.append([module, cost, percent, calls, avg_cost])
    
    # Create DataFrame for better display
    result_df = pd.DataFrame(
        module_data, 
        columns=['Module', 'Total Cost ($)', 'Percentage (%)', 'Number of Calls', 'Avg Cost/Call ($)']
    )
    
    # Format the numeric columns
    result_df['Total Cost ($)'] = result_df['Total Cost ($)'].map('${:.4f}'.format)
    result_df['Percentage (%)'] = result_df['Percentage (%)'].map('{:.2f}%'.format)
    result_df['Avg Cost/Call ($)'] = result_df['Avg Cost/Call ($)'].map('${:.4f}'.format)
    
    display(result_df)
    
    print(f"\nTotal cost for user {user}: ${total_user_cost:.4f}")
    print(f"Total calls made by user: {user_data.shape[0]}")
    
    # Optional: Add visualization
    try:
        import matplotlib.pyplot as plt
        plt.figure(figsize=(10, 6))
        user_cost_by_module.plot(kind='bar', color='skyblue')
        plt.title(f'Cost by Module for User: {user}')
        plt.ylabel('Cost ($)')
        plt.xlabel('Module')
        plt.xticks(rotation=45, ha='right')
        plt.tight_layout()
        plt.show()
    except ImportError:
        print("Matplotlib not available for visualization.")

# Create the dropdown widget
user_dropdown = widgets.Dropdown(
    options=[('All Users', 'all')] + [(user, user) for user in unique_users],
    description='Select User:',
    style={'description_width': 'initial'},
    layout=widgets.Layout(width='50%')
)

# Define what happens when dropdown value changes
def on_change(change):
    if change['type'] == 'change' and change['name'] == 'value':
        selected_user = change['new']
        if selected_user == 'all':
            clear_output(wait=True)
            display(user_dropdown)
            print("\nOverall Total cost by module:")
            print("============================")
            for module, cost in overall_cost_by_module.items():
                print(f"{module}: ${cost:.4f}")
            print(f"\nTotal overall cost: ${total_overall_cost:.4f}")
        else:
            show_user_analysis(selected_user)

user_dropdown.observe(on_change)

# Display the dropdown
display(user_dropdown)

Dropdown(description='Select User:', index=28, layout=Layout(width='50%'), options=(('All Users', 'all'), ('62…

Unnamed: 0,Module,Total Cost ($),Percentage (%),Number of Calls,Avg Cost/Call ($)
0,GPTOADocExtractor,$588.1114,67.94%,25535,$0.0230
1,GPTInvoiceDocExtractor,$195.0551,22.53%,12584,$0.0155
2,POUpdater,$32.3232,3.73%,1335,$0.0242
3,GPTPODocExtractor,$17.7234,2.05%,1676,$0.0106
4,TabularContentCommentContextExtractor,$10.8606,1.25%,590,$0.0184
5,GPTCVExcelAttachmentClassification,$7.7939,0.90%,1332,$0.0059
6,GPTCVAttachmentPOExtractionEnhanced,$3.6294,0.42%,116,$0.0313
7,GPTCVAttachmentClassification,$2.9925,0.35%,550,$0.0054
8,GPTPackingListDocExtractor,$2.6413,0.31%,367,$0.0072
9,GPTDocIsPurchaseOrder,$2.4138,0.28%,637,$0.0038


# Errata

In [20]:
# Errata

In [None]:
# Pagination metadata from first fetch (use first_traces, not traces)
first_traces.meta

In [23]:
# Total traces fetched (all pages); first_traces.data has only first page (e.g. 50)
len(all_traces)

50

In [21]:
# Use all_traces (full list); first_traces.data has only PAGE_LIMIT items (e.g. 50)
# Ensure index < len(all_traces), e.g. all_traces[0] or all_traces[min(3410, len(all_traces)-1)]
all_traces[min(3410, len(all_traces) - 1)].metadata if all_traces else None

IndexError: list index out of range