In [None]:
!uv pip install pandas python-dotenv "arize[Tracing]" numpy

In [None]:
import time
import os
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import uuid
import random
import string
from contextlib import contextmanager
from dotenv import load_dotenv

# Load variables from a local .env file (if one exists) into the process
# environment so the os.getenv() lookups for the Arize credentials can find them.
load_dotenv()

# Benchmarking functions
@contextmanager
def timer():
    """Context manager that prints how long the wrapped block took.

    The elapsed time is printed when the ``with`` block exits, whether it
    finishes normally or raises. An exception raised inside the block still
    propagates to the caller after the timing line has been printed.
    """
    # perf_counter() is monotonic, so the measurement cannot be skewed by
    # system clock adjustments the way time.time() can.
    start_time = time.perf_counter()
    try:
        yield
    finally:
        # Report in `finally` so failed or interrupted steps are still timed.
        execution_time = time.perf_counter() - start_time
        print(f"Execution time: {execution_time:.2f} seconds")
    
# Configuration
# Number of rows the prepared dataset is grown to by duplicating the export
# (e.g., 1_000_000, 10_000_000, 20_000_000).
TARGET_ROWS = 10_000_000
# Length, in characters, of the generated input text written to each row.
# NOTE(review): total generated text is TARGET_ROWS x TEXT_LENGTH characters;
# confirm the upload target can absorb that volume before a full-size run.
TEXT_LENGTH = 25_000
# Number of randomly chosen rows whose text is tagged with one of UNIQUE_KEYWORDS.
UNIQUE_KEYWORD_ROWS = 10_000
# Width, in days, of the window (ending now) over which timestamps are spread.
TIMESTAMP_SPREAD_DAYS = 90

# Marker strings embedded in the tagged rows; they are assigned round-robin,
# so each keyword appears in roughly UNIQUE_KEYWORD_ROWS / len(UNIQUE_KEYWORDS) rows.
UNIQUE_KEYWORDS = [
    "BENCHMARK_ALPHA",
    "BENCHMARK_BETA", 
    "BENCHMARK_GAMMA",
    "BENCHMARK_DELTA",
    "BENCHMARK_EPSILON",
    "BENCHMARK_ZETA",
    "BENCHMARK_ETA",
    "BENCHMARK_THETA",
    "BENCHMARK_IOTA",
    "BENCHMARK_KAPPA"
]

print(f"Target dataset size: {TARGET_ROWS:,} rows")


## Step 1: Load Downloaded Data

First, download the span data from the Arize UI and save it as a CSV file. Then update `DATA_FILE_PATH` in the cell below so that it points to the downloaded file.


In [None]:
# Load the downloaded span data into `df_original`, the seed frame that the
# preparation step duplicates up to the target row count.
# UPDATE THIS PATH to your downloaded CSV file
DATA_FILE_PATH = "tracing_export.csv"  # relative paths resolve against the kernel's working directory

try:
    df_original = pd.read_csv(DATA_FILE_PATH)
    print(f"Loaded {len(df_original)} rows from {DATA_FILE_PATH}")
    print(f"Columns: {list(df_original.columns)}")
except FileNotFoundError:
    # Print actionable guidance, then re-raise so the notebook stops here
    # instead of failing later on an undefined `df_original`.
    print(f"ERROR: File not found at {DATA_FILE_PATH}")
    print("Please download span data from Arize UI and update the DATA_FILE_PATH variable")
    raise


In [None]:
# =============================================================================
# DATA PREPARATION FUNCTIONS
# =============================================================================

def generate_large_text(base_text, target_length, unique_keyword=None):
    """Build a filler string of exactly ``target_length`` characters.

    The text is assembled in this order: an optional
    ``SEARCHABLE_CONTENT: <keyword>`` header, then ``base_text``, then
    randomised lorem-ipsum sentences until the target length is reached.
    The result is then truncated to ``target_length``.

    Args:
        base_text: Seed text placed after the header. Falsy values are treated
            as empty; anything else is converted with ``str()``.
        target_length: Number of characters to return. Values <= 0 yield "".
        unique_keyword: Optional marker placed at the very start of the text.
            Falsy values (including "") add no header. The header itself is
            truncated if ``target_length`` is shorter than it.

    Returns:
        A string of length ``target_length`` (or "" when it is not positive).
    """
    # Guard explicitly: a negative length would otherwise slice from the end
    # and return a non-empty string.
    if target_length <= 0:
        return ""

    # Start with unique keyword if provided
    parts = [f"SEARCHABLE_CONTENT: {unique_keyword}\n\n"] if unique_keyword else []
    parts.append(str(base_text) if base_text else "")

    # Track the running length instead of re-summing every part per iteration.
    current_length = sum(len(part) for part in parts)

    # Fill remaining space with lorem ipsum variations
    lorem_base = "Lorem ipsum dolor sit amet, consectetur adipiscing elit. "
    while current_length < target_length:
        # Draw the suffix length first, then the letters (same RNG call order
        # as before, so seeded runs produce the same text).
        suffix_length = random.randint(5, 15)
        random_word = ''.join(random.choices(string.ascii_lowercase, k=suffix_length))
        sentence = f"{lorem_base}Random_{random_word}. "
        parts.append(sentence)
        current_length += len(sentence)

    return ''.join(parts)[:target_length]


def duplicate_rows(df, target_rows):
    """Repeat the rows of ``df`` cyclically until there are ``target_rows`` rows.

    Args:
        df: Source DataFrame. It is never modified.
        target_rows: Desired number of rows in the result.

    Returns:
        A new DataFrame with a fresh 0..N-1 index.
        * If ``df`` already has at least ``target_rows`` rows, an unmodified
          copy of ``df`` is returned (not truncated, no ``unique_id`` column).
        * Otherwise the rows are repeated in their original order up to
          exactly ``target_rows`` rows and a ``unique_id`` column holding a
          random UUID4 string per row is added.

    Raises:
        ValueError: If ``df`` is empty and ``target_rows`` is positive, since
            there is nothing to duplicate.
    """
    current_rows = len(df)
    if current_rows >= target_rows:
        # Return a copy so later in-place steps cannot alter the caller's frame.
        return df.copy()

    if current_rows == 0:
        raise ValueError(
            f"Cannot duplicate an empty DataFrame to reach {target_rows:,} rows"
        )

    # Ceiling division: the number of passes over the source actually needed.
    multiplier = -(-target_rows // current_rows)
    print(f"   Duplicating {current_rows} rows {multiplier}x to reach {target_rows:,}")

    # Cycle through the source row positions (0, 1, ..., n-1, 0, 1, ...) and
    # materialise the result in a single take, avoiding intermediate copies.
    row_positions = np.arange(target_rows) % current_rows
    df_final = df.iloc[row_positions].reset_index(drop=True)
    df_final['unique_id'] = [str(uuid.uuid4()) for _ in range(target_rows)]

    return df_final


def spread_timestamps(df, days_back=90):
    """Overwrite the timestamp column with shuffled, evenly spaced timestamps.

    The window runs from ``days_back`` days ago up to (but excluding) now, in
    local naive time as returned by ``datetime.now()``. One timestamp is
    generated per row at a fixed spacing, and the assignment of timestamps to
    rows is randomised with NumPy's global RNG.

    Args:
        df: DataFrame to update. It is modified IN PLACE.
        days_back: Width of the time window in days.

    Returns:
        The same ``df`` object, for call chaining. The first of the columns
        ``start_time`` / ``timestamp`` / ``time`` that exists is overwritten;
        if none exists, a ``start_time`` column is created. An empty frame is
        returned unchanged.
    """
    num_rows = len(df)
    end_time = datetime.now()
    start_time = end_time - timedelta(days=days_back)

    print(f"   Spreading {num_rows:,} timestamps from {start_time.strftime('%Y-%m-%d')} to {end_time.strftime('%Y-%m-%d')}")

    # Nothing to assign, and the spacing below would divide by zero.
    if num_rows == 0:
        return df

    # Evenly spaced offsets from the window start, handed out in random order.
    # Vectorised so that multi-million-row frames do not build Python lists
    # of datetime objects.
    step_seconds = timedelta(days=days_back).total_seconds() / num_rows
    shuffled_slots = np.random.permutation(num_rows)
    offsets = pd.to_timedelta(shuffled_slots * step_seconds, unit='s')

    # Update timestamp column (prefer start_time for log_spans compatibility)
    timestamp_col = next((col for col in ['start_time', 'timestamp', 'time'] if col in df.columns), 'start_time')
    # to_numpy() makes the assignment positional, independent of df's index.
    df[timestamp_col] = (pd.Timestamp(start_time) + offsets).to_numpy()

    return df


def prepare_data(df_original, target_rows, text_length=TEXT_LENGTH, timestamp_spread_days=TIMESTAMP_SPREAD_DAYS):
    """Build the benchmark dataset from a seed DataFrame.

    Runs three timed steps:
      1. Duplicate rows up to ``target_rows`` (``duplicate_rows``).
      2. Spread timestamps over the past ``timestamp_spread_days`` days
         (``spread_timestamps``).
      3. Overwrite the input text column with ``text_length``-character filler.
         ``UNIQUE_KEYWORD_ROWS`` randomly chosen rows get a text tagged with one
         of ``UNIQUE_KEYWORDS`` (assigned round-robin in row order); all other
         rows share one untagged text.

    Args:
        df_original: Seed DataFrame (e.g. the loaded span export).
        target_rows: Number of rows to grow the dataset to.
        text_length: Length in characters of the generated input text.
        timestamp_spread_days: Width in days of the timestamp window.

    Returns:
        The prepared DataFrame. The input text is written to the first of
        ``attributes.input.value`` / ``input`` / ``prompt`` that exists, or to
        a new ``attributes.input.value`` column if none exists.
    """
    print(f"\n=== Preparing {target_rows:,} rows ===")

    # Step 1: Duplicate to target size
    print("1. Duplicating rows...")
    with timer():
        df = duplicate_rows(df_original, target_rows)

    # Step 2: Spread timestamps over the configured window
    print("2. Spreading timestamps...")
    with timer():
        df = spread_timestamps(df, timestamp_spread_days)

    # Step 3: Generate large text with keywords
    print(f"3. Generating {text_length:,}-char text...")
    with timer():
        input_col = next((col for col in ['attributes.input.value', 'input', 'prompt'] if col in df.columns), 'attributes.input.value')
        total_rows = len(df)

        # Random keyword distribution
        keyword_rows = min(UNIQUE_KEYWORD_ROWS, total_rows)
        print(f"   Distributing {keyword_rows} keywords across {total_rows:,} rows...")

        # Pick the rows to tag; NumPy sampling is used for very large frames.
        if total_rows > 1_000_000:
            random_indices = np.random.choice(total_rows, size=keyword_rows, replace=False)
            random_keyword_indices = set(random_indices)
        else:
            random_keyword_indices = set(random.sample(range(total_rows), keyword_rows))

        # Generate each distinct text exactly once; rows then share references
        # to these few strings instead of holding their own copies.
        print("   Pre-generating text templates...")
        base_text = generate_large_text("", text_length, None)
        keyword_texts = {kw: generate_large_text("", text_length, kw) for kw in UNIQUE_KEYWORDS}

        # Round-robin the keywords over the chosen rows, in ascending row order.
        keyword_assignments = {
            row_index: UNIQUE_KEYWORDS[idx % len(UNIQUE_KEYWORDS)]
            for idx, row_index in enumerate(sorted(random_keyword_indices))
        }

        # Show sample assignments
        for row_idx, keyword in list(keyword_assignments.items())[:10]:
            print(f"   Row {row_idx}: '{keyword}'")

        # Single assignment path for every dataset size: start from the shared
        # untagged text and patch only the tagged positions. Assigning the
        # whole column also creates it when it does not exist yet, which the
        # previous positional chunked path could not do (KeyError).
        text_values = [base_text] * total_rows
        for row_index, keyword in keyword_assignments.items():
            text_values[row_index] = keyword_texts[keyword]
        df[input_col] = text_values

        print(f"   ✅ Keywords distributed across {keyword_rows} random rows")

    print(f"✅ Data preparation complete: {len(df):,} rows")
    return df


In [None]:
# Build the benchmark dataset from the loaded export: duplicate rows up to
# TARGET_ROWS, re-spread the timestamps, and overwrite the input text column.
df_prepared = prepare_data(df_original, TARGET_ROWS)


## Step 2: Upload Spans to Arize


In [None]:
from arize.pandas.logger import Client

# Credentials for span logging, read from the process environment.
ARIZE_SPACE_ID = os.getenv("ARIZE_SPACE_ID")
ARIZE_API_KEY = os.getenv("ARIZE_API_KEY")

# Fail fast with an actionable message rather than passing None to the client
# and discovering the problem during the upload.
_missing_credentials = [
    name
    for name, value in (("ARIZE_SPACE_ID", ARIZE_SPACE_ID), ("ARIZE_API_KEY", ARIZE_API_KEY))
    if not value
]
if _missing_credentials:
    raise RuntimeError(
        "Missing required environment variable(s): "
        + ", ".join(_missing_credentials)
        + ". Set them in your environment or in a .env file."
    )

# Create a unique project name for this benchmark. The full date and seconds
# are included so that runs on different days, or within the same minute, do
# not share a project.
PROJECT_NAME = f"TextSearchBench-{TARGET_ROWS}-{datetime.now().strftime('%Y%m%d-%H%M%S')}"

# Setup Arize client for logging spans
arize_client = Client(
    space_id=ARIZE_SPACE_ID,
    api_key=ARIZE_API_KEY,
)

print(f"Arize project name: {PROJECT_NAME}")
print("✅ Arize client setup complete!")


In [None]:
def upload_spans_to_arize(df, batch_size=50_000):
    """Upload the rows of ``df`` to Arize as spans, one batch at a time.

    The frame is first converted with ``prepare_spans_dataframe``. Each batch
    is then sent with ``arize_client.log_spans`` under the project
    ``PROJECT_NAME``. A batch counts as successful only when the response
    status code is 200. A failed batch (non-200 response or exception) is
    reported and skipped, and the remaining batches are still attempted.
    There is no retry.

    Args:
        df: Prepared DataFrame whose rows are uploaded as spans.
        batch_size: Maximum number of rows per ``log_spans`` call.

    Returns:
        True if every row was uploaded successfully (trivially True for an
        empty frame), False if at least one batch failed.

    Raises:
        ValueError: If ``batch_size`` is not positive.
    """
    if batch_size <= 0:
        raise ValueError(f"batch_size must be a positive integer, got {batch_size!r}")

    total_rows = len(df)
    # Ceiling division; 0 when there is nothing to upload.
    total_batches = (total_rows + batch_size - 1) // batch_size

    print(f"\n=== Uploading {total_rows:,} spans to Arize (Optimized with Batching) ===")
    print(f"Project: {PROJECT_NAME}")
    print(f"Batch size: {batch_size:,} rows per batch")

    # Prepare spans DataFrame efficiently
    print("\nPreparing spans DataFrame...")
    with timer():
        spans_df = prepare_spans_dataframe(df)
        print(f"   ✅ Prepared {len(spans_df):,} spans with {len(spans_df.columns)} columns")

    # Batch upload with progress tracking and error handling
    print(f"\n🚀 Uploading in {total_batches} batches...")

    successful_batches = 0
    failed_batches = 0
    total_uploaded = 0

    with timer():
        for batch_num, batch_start in enumerate(range(0, total_rows, batch_size), start=1):
            batch_end = min(batch_start + batch_size, total_rows)

            print(f"\n📦 Batch {batch_num}/{total_batches}: Uploading rows {batch_start:,}-{batch_end:,}...")

            # Extract the batch as an independent frame with a fresh 0..n-1 index.
            batch_df = spans_df.iloc[batch_start:batch_end].reset_index(drop=True)

            try:
                # Single attempt per batch; failures are counted, not retried.
                response = arize_client.log_spans(
                    dataframe=batch_df,
                    model_id=PROJECT_NAME,
                    model_version="1.0",
                    validate=True,
                    verbose=False  # Reduce noise for batched uploads
                )

                if response.status_code == 200:
                    successful_batches += 1
                    total_uploaded += len(batch_df)
                    print(f"   ✅ Batch {batch_num} uploaded successfully ({len(batch_df):,} spans)")
                else:
                    failed_batches += 1
                    print(f"   ❌ Batch {batch_num} failed: {response.status_code} - {response.text[:200]}...")

            except Exception as e:
                # Deliberately broad: one bad batch must not abort a long
                # multi-batch upload. The failure is reported and counted.
                failed_batches += 1
                print(f"   ❌ Batch {batch_num} failed with exception: {str(e)[:200]}...")

            # Progress summary every 10 batches or at the end
            if batch_num % 10 == 0 or batch_num == total_batches:
                success_rate = (successful_batches / batch_num) * 100
                print(f"   📊 Progress: {total_uploaded:,}/{total_rows:,} spans uploaded ({success_rate:.1f}% success rate)")

    # Final summary. With zero batches there is nothing that could have failed,
    # so report 100% instead of dividing by zero.
    overall_success_rate = (successful_batches / total_batches) * 100 if total_batches else 100.0
    print(f"\n📈 Upload Summary:")
    print(f"   Total spans: {total_rows:,}")
    print(f"   Successfully uploaded: {total_uploaded:,}")
    print(f"   Failed batches: {failed_batches}")
    print(f"   Success rate: {overall_success_rate:.1f}%")

    if total_uploaded == total_rows:
        print(f"   ✅ All spans uploaded successfully to project: {PROJECT_NAME}")
        return True
    else:
        print(f"   ⚠️  Partial upload completed. Check failed batches above.")
        return False


def prepare_spans_dataframe(df):
    """Build a clean spans DataFrame containing only the columns to upload.

    Column handling:
      * ``context.trace_id``, ``context.span_id``, ``name`` are required and
        converted to strings (missing names become ``LLM_span``).
      * ``start_time`` is parsed from the first of ``start_time`` /
        ``timestamp`` / ``time`` present in ``df``; unparseable values
        become NaT.
      * ``end_time`` is ``start_time`` plus ``latency_ms`` (missing or invalid
        latencies count as 1000 ms), or plus a random 100-2000 ms latency when
        ``df`` has no ``latency_ms`` column.
      * ``attributes.input.value`` is taken from the first of
        ``attributes.input.value`` / ``input`` / ``prompt`` present in ``df``.
      * ``attributes.output.value``, ``parent_id`` and the token-count columns
        are copied only when present; ``status_code`` and the span kind
        default to ``OK`` and ``LLM``.

    Args:
        df: Source DataFrame (e.g. the prepared benchmark dataset).

    Returns:
        A new DataFrame sharing ``df``'s index; ``df`` is not modified.

    Raises:
        KeyError: If a required ID/name column or every timestamp candidate
            column is missing.
    """
    # NOTE(review): rows created by duplicating an export keep the export's
    # original trace/span IDs, so IDs may repeat across rows. Confirm the
    # ingestion side accepts repeated span IDs.

    # Create clean spans DataFrame with only the required columns
    spans_df = pd.DataFrame()

    # Required columns - ensure proper data types
    spans_df['context.trace_id'] = df['context.trace_id'].fillna('').astype(str)
    spans_df['context.span_id'] = df['context.span_id'].fillna('').astype(str)
    spans_df['name'] = df['name'].fillna('LLM_span').astype(str)

    # Handle timestamps. Accept the same candidate column names that the
    # timestamp-spreading step may have written to.
    timestamp_candidates = ['start_time', 'timestamp', 'time']
    timestamp_col = next((col for col in timestamp_candidates if col in df.columns), None)
    if timestamp_col is None:
        raise KeyError(f"No timestamp column found; expected one of {timestamp_candidates}")
    spans_df['start_time'] = pd.to_datetime(df[timestamp_col], errors='coerce')

    # Calculate end times using latency_ms
    if 'latency_ms' in df.columns:
        latency_ms = pd.to_numeric(df['latency_ms'], errors='coerce').fillna(1000.0)
    else:
        # Generate random latency if not available (one vectorised draw).
        latency_ms = np.random.uniform(100, 2000, size=len(df))
    spans_df['end_time'] = spans_df['start_time'] + pd.to_timedelta(latency_ms, unit='ms')

    # Handle input/output attributes - ensure they are strings. The input text
    # may live under any of the candidate names the preparation step writes to.
    input_col = next((col for col in ['attributes.input.value', 'input', 'prompt'] if col in df.columns), None)
    if input_col is not None:
        spans_df['attributes.input.value'] = df[input_col].fillna('').astype(str)

    if 'attributes.output.value' in df.columns:
        spans_df['attributes.output.value'] = df['attributes.output.value'].fillna('').astype(str)

    # Handle status code
    if 'status_code' in df.columns:
        spans_df['status_code'] = df['status_code'].fillna('OK').astype(str)
    else:
        spans_df['status_code'] = 'OK'

    # Handle span kind
    if 'attributes.openinference.span.kind' in df.columns:
        spans_df['attributes.openinference.span.kind'] = df['attributes.openinference.span.kind'].fillna('LLM').astype(str)
    else:
        spans_df['attributes.openinference.span.kind'] = 'LLM'

    # Handle parent_id if present; root spans (no parent) end up as missing
    # values rather than empty strings.
    if 'parent_id' in df.columns:
        spans_df['parent_id'] = df['parent_id'].fillna('').astype(str)
        spans_df.loc[spans_df['parent_id'] == '', 'parent_id'] = None

    # Handle token counts if present: (source column, destination column).
    token_count_columns = [
        ('totalTokenCount', 'attributes.llm.token_count.total'),
        ('attributes.llm.token_count.prompt', 'attributes.llm.token_count.prompt'),
        ('attributes.llm.token_count.completion', 'attributes.llm.token_count.completion'),
    ]
    for source_col, target_col in token_count_columns:
        if source_col in df.columns:
            spans_df[target_col] = pd.to_numeric(df[source_col], errors='coerce').fillna(0).astype(int)

    return spans_df


In [None]:
# Upload the prepared dataset to Arize in batches; the call returns True only
# if every batch succeeded.
# NOTE(review): each batch carries about batch_size x TEXT_LENGTH characters of
# input text, so lower batch_size if batches fail or time out.
upload_spans_to_arize(df_prepared, batch_size=50_000)
