# Experiments

> How to run experiments

In [1]:
# | default_exp project.experiments

In [2]:
# | export
from functools import wraps
import asyncio
import typing as t
import os

from fastcore.utils import patch
from tqdm import tqdm

from ragas_experimental.project.core import Project
from ragas_experimental.model.pydantic_model import ExtendedPydanticBaseModel as BaseModel
from ragas_experimental.utils import async_to_sync, create_nano_id
from ragas_experimental.dataset import Dataset, BaseModelType
from ragas_experimental.experiment import Experiment
import ragas_experimental.typing as rt

  from .autonotebook import tqdm as notebook_tqdm


## Basics

In [3]:
#| export
async def create_experiment_columns(project_id, experiment_id, columns, create_experiment_column_func):
    """Create every column of an experiment concurrently.

    Args:
        project_id: ID of the project that owns the experiment.
        experiment_id: ID of the experiment the columns belong to.
        columns: Iterable of mappings, each providing the keys ``"name"``,
            ``"type"`` and ``"settings"``.
        create_experiment_column_func: Callable invoked once per column with
            the keyword arguments ``project_id``, ``experiment_id``, ``id``,
            ``name``, ``type`` and ``settings``. Its results are awaited
            together.

    Returns:
        The list produced by ``asyncio.gather``: one result per column, in
        the order of ``columns``.
    """
    pending = [
        create_experiment_column_func(
            project_id=project_id,
            experiment_id=experiment_id,
            # Every column gets its own freshly generated nano ID
            id=create_nano_id(),
            name=spec["name"],
            type=spec["type"],
            settings=spec["settings"],
        )
        for spec in columns
    ]
    return await asyncio.gather(*pending)

In [4]:
# | export
def get_experiment_from_local(
    self: Project,
    name: str,
    model: t.Type[BaseModel]
) -> Experiment:
    """Build an ``Experiment`` that uses the local filesystem backend.

    Args:
        self: Project the experiment belongs to
        name: Name of the experiment
        model: Model class defining the experiment structure

    Returns:
        Experiment: A new experiment configured to use the local backend
    """
    # The local backend is rooted at the directory holding all projects,
    # which is the parent of this project's own root directory.
    projects_root = os.path.dirname(self._root_dir)

    return Experiment(
        name=name,
        model=model,
        project_id=self.project_id,
        # Freshly generated nano ID for this experiment instance
        experiment_id=create_nano_id(),
        backend="local",
        local_root_dir=projects_root,
    )

In [5]:
# | export
def get_experiment_from_ragas_app(
    self: Project,
    name: str,
    model: t.Type[BaseModel]
) -> Experiment:
    """Register an experiment with the Ragas App and wrap it in an ``Experiment``.

    The experiment is created through the project's API client first, then
    one column per entry of ``rt.ModelConverter.model_to_columns(model)`` is
    created for it.

    Args:
        self: Project the experiment belongs to
        name: Name of the experiment
        model: Model class defining the experiment structure

    Returns:
        Experiment: A new experiment configured to use the ragas_app backend
    """
    api_client = self._ragas_api_client

    # Step 1: create the experiment remotely; the response carries its ID
    experiment_info = async_to_sync(api_client.create_experiment)(
        project_id=self.project_id,
        name=name,
    )
    experiment_id = experiment_info["id"]

    # Step 2: derive the column definitions from the model and create them
    columns = rt.ModelConverter.model_to_columns(model)
    async_to_sync(create_experiment_columns)(
        project_id=self.project_id,
        experiment_id=experiment_id,
        columns=columns,
        create_experiment_column_func=api_client.create_experiment_column,
    )

    # Step 3: hand back a client-side handle bound to the ragas_app backend
    return Experiment(
        name=name,
        model=model,
        project_id=self.project_id,
        experiment_id=experiment_id,
        ragas_api_client=api_client,
        backend="ragas_app",
    )

In [6]:
from ragas_experimental.utils import get_test_directory

In [7]:
# | export
@patch
def create_experiment(
    self: Project,
    name: str,
    model: t.Type[BaseModel],
    backend: t.Optional[rt.SUPPORTED_BACKENDS] = None
) -> Experiment:
    """Create a new experiment on the requested backend.

    Args:
        name: Name of the experiment
        model: Model class defining the experiment structure
        backend: The backend to use (defaults to project's backend if not specified)

    Returns:
        Experiment: An experiment object for managing results

    Raises:
        ValueError: If the resolved backend is neither "local" nor "ragas_app".
    """
    # An explicit argument wins; otherwise fall back to the project's backend
    selected_backend = self.backend if backend is None else backend

    # Pick the factory that matches the backend, then delegate to it
    if selected_backend == "local":
        factory = get_experiment_from_local
    elif selected_backend == "ragas_app":
        factory = get_experiment_from_ragas_app
    else:
        raise ValueError(f"Unsupported backend: {selected_backend}")

    return factory(self, name, model)

In [8]:
# | export
@patch
def get_experiment_path(self: Project, experiment_name: str) -> str:
    """Return the CSV file path used for an experiment.

    Args:
        experiment_name: The name of the experiment

    Returns:
        str: ``<project root>/experiments/<experiment_name>.csv``, built by
        joining onto the project's root directory
    """
    csv_filename = f"{experiment_name}.csv"
    return os.path.join(self._root_dir, "experiments", csv_filename)

In [9]:
class TestModel(BaseModel):
    """Sample model used by the notebook cells below to exercise experiment creation."""
    name: str
    description: str
    price: float
    # String field annotated with rt.Url(); has a default value
    url: t.Annotated[str, rt.Url()] = "https://www.google.com"
    # Literal field ("test" or "test2") annotated with rt.Select(colors=...); defaults to "test"
    tags: t.Annotated[t.Literal["test", "test2"], rt.Select(colors=["red", "blue"])] = "test"


In [10]:
# Create a project rooted in the test directory and add one experiment to it
local_root_dir = get_test_directory()
p = Project(project_id="test", root_dir=local_root_dir)
exp = p.create_experiment(name="test experiment", model=TestModel)

# Creating the experiment is expected to leave its CSV file on disk
assert os.path.exists(p.get_experiment_path(exp.name))

In [13]:
# | export
@patch
def get_experiment_by_id(
    self: Project,
    experiment_id: str,
    model: t.Type[BaseModel],
    backend: t.Optional[rt.SUPPORTED_BACKENDS] = None
) -> Experiment:
    """Look up an existing experiment by its ID.

    Args:
        experiment_id: The ID of the experiment to retrieve
        model: The model class to use for the experiment results
        backend: The backend to use (defaults to project's backend)

    Returns:
        Experiment: The retrieved experiment

    Raises:
        NotImplementedError: For the "local" backend, which addresses
            experiments by name rather than by ID.
        ValueError: For any backend other than "local" or "ragas_app".
    """
    resolved_backend = self.backend if backend is None else backend

    # Reject the backends that cannot serve this request up front
    if resolved_backend == "local":
        raise NotImplementedError(
            "get_experiment_by_id is not implemented for local backend. "
            "Use get_experiment with the experiment name instead."
        )
    if resolved_backend != "ragas_app":
        raise ValueError(f"Unsupported backend: {resolved_backend}")

    # ragas_app: fetch the experiment record so its name can be reused
    fetch_experiment = async_to_sync(self._ragas_api_client.get_experiment)
    experiment_info = fetch_experiment(
        project_id=self.project_id,
        experiment_id=experiment_id,
    )

    return Experiment(
        name=experiment_info["name"],
        model=model,
        project_id=self.project_id,
        experiment_id=experiment_id,
        ragas_api_client=self._ragas_api_client,
        backend="ragas_app",
    )

In [14]:
# | export
@patch
def list_experiment_names(
    self: Project,
    backend: t.Optional[rt.SUPPORTED_BACKENDS] = None
) -> t.List[str]:
    """Return the names of all experiments in the project.

    Args:
        backend: The backend to use (defaults to project's backend)

    Returns:
        List[str]: Names of all experiments in the project. For the local
        backend these are the ``.csv`` file names (without extension) found
        in the project's ``experiments`` directory, in ``os.listdir`` order.

    Raises:
        ValueError: For any backend other than "local" or "ragas_app".
    """
    resolved_backend = self.backend if backend is None else backend

    if resolved_backend == "ragas_app":
        # Ask the API for every experiment and keep only the names
        fetch_all = async_to_sync(self._ragas_api_client.list_experiments)
        return [entry["name"] for entry in fetch_all(project_id=self.project_id)]

    if resolved_backend == "local":
        experiments_dir = os.path.join(self._root_dir, "experiments")
        # A project without an experiments directory simply has no experiments
        if not os.path.exists(experiments_dir):
            return []

        names = []
        for filename in os.listdir(experiments_dir):
            if filename.endswith('.csv'):
                names.append(os.path.splitext(filename)[0])
        return names

    raise ValueError(f"Unsupported backend: {resolved_backend}")

In [16]:
from fastcore.test import *

In [18]:
test_eq(p.list_experiment_names(), ["test experiment"])

In [19]:
# | export
@patch
def get_experiment(
    self: Project,
    experiment_name: str,
    model: t.Type[BaseModel],
    backend: t.Optional[rt.SUPPORTED_BACKENDS] = None
) -> Experiment:
    """Look up an existing experiment by its name.

    Args:
        experiment_name: The name of the experiment to retrieve
        model: The model class to use for the experiment results
        backend: The backend to use (defaults to project's backend if not specified)

    Returns:
        Experiment: The retrieved experiment

    Raises:
        ValueError: If the local experiment file does not exist, or if the
            backend is neither "local" nor "ragas_app".
    """
    resolved_backend = self.backend if backend is None else backend

    if resolved_backend == "ragas_app":
        # Resolve the name to the experiment record held by the API
        find_by_name = async_to_sync(self._ragas_api_client.get_experiment_by_name)
        experiment_info = find_by_name(
            project_id=self.project_id,
            experiment_name=experiment_name,
        )
        return Experiment(
            name=experiment_info["name"],
            model=model,
            project_id=self.project_id,
            experiment_id=experiment_info["id"],
            ragas_api_client=self._ragas_api_client,
            backend="ragas_app",
        )

    if resolved_backend == "local":
        # Local experiments are identified by their CSV file on disk
        if not os.path.exists(self.get_experiment_path(experiment_name)):
            raise ValueError(f"Experiment '{experiment_name}' does not exist")

        return Experiment(
            name=experiment_name,
            model=model,
            project_id=self.project_id,
            # A new random ID is generated for the returned instance
            experiment_id=create_nano_id(),
            backend="local",
            local_root_dir=os.path.dirname(self._root_dir),  # Root dir for all projects
        )

    raise ValueError(f"Unsupported backend: {resolved_backend}")

In [27]:
test_eq(p.get_experiment("test experiment", TestModel), exp)

## Git Versioning for Experiments

In [28]:
# | export
import git
from pathlib import Path

In [29]:
# | export
def find_git_root(
        start_path: t.Union[str, Path, None] = None # starting path to search from
    ) -> Path:
    """Locate the root of the enclosing git repository.

    The search starts at ``start_path`` (resolved to an absolute path), or at
    the current working directory when no path is given, and walks upwards
    until a directory containing a ``.git`` directory is found.

    Returns:
        Path: The first directory, going upwards, that contains a ``.git`` directory.

    Raises:
        ValueError: If no such directory exists up to the filesystem root.
    """
    origin = Path.cwd() if start_path is None else Path(start_path).resolve()

    # Visit the starting directory first, then each ancestor up to the root
    for candidate in (origin, *origin.parents):
        git_dir = candidate / '.git'
        if git_dir.exists() and git_dir.is_dir():
            return candidate

    raise ValueError(f"No git repository found in or above {origin}")

In [30]:
find_git_root()

Path('/Users/jjmachan/workspace/eglabs/ragas')

In [31]:
git.Repo(find_git_root())

<git.repo.base.Repo '/Users/jjmachan/workspace/eglabs/ragas/.git'>

In [18]:
# | export

def version_experiment(
    experiment_name: str,
    commit_message: t.Optional[str] = None,
    repo_path: t.Union[str, Path, None] = None,
    create_branch: bool = True,
    stage_all: bool = False,
) -> str:
    """Record the current state of the codebase in git for an experiment.

    Pending changes are staged and committed; if nothing is staged the
    current HEAD commit is used instead. Optionally a branch named
    ``ragas/<experiment_name>`` is created at that commit.

    Args:
        experiment_name: Name of the experiment; used in the default commit
            message and in the branch name.
        commit_message: Commit message to use; defaults to
            ``"Experiment: <experiment_name>"``.
        repo_path: Repository location; discovered with ``find_git_root``
            when omitted.
        create_branch: Whether to create the ``ragas/<experiment_name>`` branch.
        stage_all: Stage everything (``git add .``) when the repository is
            dirty including untracked files; otherwise only changes to
            tracked files are staged (``git add -u``).

    Returns:
        str: Full hex SHA of the commit that represents the experiment.
    """
    repo = git.Repo(find_git_root() if repo_path is None else repo_path)

    # Stage whatever the chosen mode considers a change
    if stage_all and repo.is_dirty(untracked_files=True):
        print("Staging all changes")
        repo.git.add('.')
        staged = True
    elif repo.is_dirty(untracked_files=False):
        print("Staging changes to tracked files")
        repo.git.add('-u')
        staged = True
    else:
        staged = False

    if not staged:
        # Nothing new to record: the experiment maps onto the current HEAD
        commit_hash = repo.head.commit.hexsha
        print("No changes detected, nothing to commit")
    else:
        message = commit_message
        if message is None:
            message = f"Experiment: {experiment_name}"
        commit_hash = repo.index.commit(message).hexsha
        print(f"Changes committed with hash: {commit_hash[:8]}")

    if create_branch:
        version_name = f"ragas/{experiment_name}"
        repo.create_head(version_name, commit_hash)
        print(f"Created branch: {version_name}")

    return commit_hash

In [19]:
# | export
def cleanup_experiment_branches(
    prefix: str = "ragas/", 
    repo_path: t.Union[str, Path, None] = None,
    interactive: bool = True,
    dry_run: bool = False
) -> t.List[str]:
    """Clean up git branches with the specified prefix.

    The currently checked-out branch is never deleted. When HEAD is detached
    there is no current branch, so every matching branch is a candidate.

    Args:
        prefix: Only branches whose name starts with this prefix are considered.
        repo_path: Repository location; discovered with ``find_git_root``
            when omitted.
        interactive: Ask for confirmation on stdin before deleting. Ignored
            for dry runs.
        dry_run: Only report which branches would be deleted.

    Returns:
        List[str]: Names of the branches that were deleted (or, for a dry
        run, that would have been deleted). Empty if nothing matched or the
        operation was cancelled.

    Raises:
        ValueError: If ``repo_path`` is omitted and no git repository is found.
    """
    # Find the git repository root if not provided
    if repo_path is None:
        try:
            repo_path = find_git_root()
        except ValueError as e:
            # Chain the cause so the original search failure stays visible
            raise ValueError(f"Cannot cleanup branches: {str(e)}") from e

    # Initialize git repo object
    repo = git.Repo(repo_path)

    # ``repo.active_branch`` raises TypeError on a detached HEAD; in that
    # state no branch is checked out, so there is nothing to protect.
    current_branch = None if repo.head.is_detached else repo.active_branch.name

    # Get all branches matching the prefix
    matching_branches = [
        branch.name for branch in repo.branches if branch.name.startswith(prefix)
    ]

    if not matching_branches:
        print(f"No branches found with prefix '{prefix}'")
        return []

    # Remove current branch from the list if present
    if current_branch in matching_branches:
        print(f"Note: Current branch '{current_branch}' will be excluded from deletion")
        matching_branches.remove(current_branch)

    if not matching_branches:
        print("No branches available for deletion after excluding current branch")
        return []

    # Show branches to the user
    print(f"Found {len(matching_branches)} branches with prefix '{prefix}':")
    for branch_name in matching_branches:
        print(f"- {branch_name}")

    # Handle confirmation in interactive mode (a dry run deletes nothing, so no prompt)
    proceed = True
    if interactive and not dry_run:
        confirm = input(f"\nDelete these {len(matching_branches)} branches? (y/n): ").strip().lower()
        proceed = (confirm == 'y')

    if not proceed:
        print("Operation cancelled")
        return []

    # Perform deletion
    deleted_branches = []
    for branch_name in matching_branches:
        if dry_run:
            print(f"Would delete branch: {branch_name}")
            deleted_branches.append(branch_name)
        else:
            try:
                # Force-delete the branch, merged or not
                repo.git.branch('-D', branch_name)
                print(f"Deleted branch: {branch_name}")
                deleted_branches.append(branch_name)
            except git.GitCommandError as e:
                # Best effort: report the failure and continue with the rest
                print(f"Error deleting branch '{branch_name}': {str(e)}")

    if dry_run:
        print(f"\nDry run complete. {len(deleted_branches)} branches would be deleted.")
    else:
        print(f"\nCleanup complete. {len(deleted_branches)} branches deleted.")

    return deleted_branches

In [20]:
cleanup_experiment_branches(dry_run=True)

No branches found with prefix 'ragas/'


[]

## Experiment Wrapper

In [30]:
#| export 
@t.runtime_checkable
class ExperimentProtocol(t.Protocol):
    """Structural type for the callables produced by the experiment decorators.

    An object matches when it is an async callable that also exposes an
    async ``run_async`` method.
    """
    # Invoke the wrapped experiment function directly.
    async def __call__(self, *args, **kwargs): ...
    # NOTE(review): declared here as (name, dataset), while the ``run_async``
    # attached by ``Project.experiment`` takes ``dataset`` first and an optional
    # ``name`` — confirm which signature is intended.
    async def run_async(self, name: str, dataset: Dataset): ...

In [22]:
# | export
from ragas_experimental.project.naming import MemorableNames

In [31]:
#| export
# Module-level name generator; run_async uses it when no experiment name is supplied
memorable_names = MemorableNames()

In [32]:
# | export
@patch
def experiment(
    self: Project, 
    experiment_model, 
    name_prefix: str = "", 
    save_to_git: bool = False, 
    stage_all: bool = False,
    backend: t.Optional[rt.SUPPORTED_BACKENDS] = None
):
    """Decorator for creating experiment functions.

    The decorated coroutine function stays callable as before and
    additionally gains a ``run_async`` attribute that runs it over every
    item of a dataset and stores the non-``None`` results in a new
    experiment.

    Args:
        experiment_model: The model type to use for experiment results
        name_prefix: Optional prefix for experiment names
        save_to_git: Whether to save experiment state to git
        stage_all: Whether to stage all files when saving to git
        backend: Backend to use for this experiment (overrides project's backend)

    Returns:
        Decorator function that wraps experiment functions
    """

    def decorator(func: t.Callable) -> ExperimentProtocol:
        @wraps(func)
        async def wrapped_experiment(*args, **kwargs):
            # Simply call the function
            return await func(*args, **kwargs)

        # Add run method to the wrapped function
        async def run_async(
            dataset: Dataset, 
            name: t.Optional[str] = None, 
            save_to_git: bool = save_to_git, 
            stage_all: bool = stage_all,
            backend: t.Optional[rt.SUPPORTED_BACKENDS] = backend
        ):
            """Run the experiment over ``dataset`` and return the populated experiment.

            Args:
                dataset: Items to process; each one is passed to the wrapped function.
                name: Experiment name; a memorable name is generated when omitted.
                    ``name_prefix`` from the decorator is prepended either way.
                save_to_git: Whether to version the codebase after a successful run.
                stage_all: Whether to stage all files when saving to git.
                backend: Backend for this run; falls back to the project's backend.

            Returns:
                The experiment created for this run.

            Raises:
                Exception: Whatever the run raised, re-raised after a
                    best-effort removal of the partially created experiment.
            """
            # If name is not provided, generate a memorable name
            if name is None:
                name = memorable_names.generate_unique_name()
            if name_prefix:
                name = f"{name_prefix}-{name}"

            # Determine which backend to use (parameter > decorator > project default)
            effective_backend = backend if backend is not None else self.backend

            experiment_view = None
            try:
                # Create the experiment view using the specified backend
                experiment_view = self.create_experiment(
                    name=name, 
                    model=experiment_model,
                    backend=effective_backend
                )

                # One coroutine per dataset item
                tasks = [wrapped_experiment(item) for item in dataset]

                # Calculate total operations (processing + appending)
                total_operations = len(tasks) * 2  # Each item requires processing and appending

                results = []
                # The context manager closes the bar even when an item raises,
                # so a failed run does not leave a dangling progress bar behind.
                with tqdm(total=total_operations, desc="Running experiment") as progress_bar:
                    # Process all items, collecting results as they finish
                    for future in asyncio.as_completed(tasks):
                        result = await future
                        if result is not None:
                            results.append(result)
                        progress_bar.update(1)  # Update for task completion

                    # Append results to experiment view
                    for result in results:
                        experiment_view.append(result)
                        progress_bar.update(1)  # Update for append operation

            except Exception:
                # Clean up the experiment if there was an error and it was created
                if experiment_view is not None:
                    try:
                        if effective_backend == "ragas_app" and hasattr(self, "_ragas_api_client"):
                            # Delete the experiment in Ragas App
                            sync_version = async_to_sync(self._ragas_api_client.delete_experiment)
                            sync_version(project_id=self.project_id, experiment_id=experiment_view.experiment_id)
                        elif effective_backend == "local":
                            # Delete the local file
                            experiment_path = self.get_experiment_path(experiment_view.name)
                            if os.path.exists(experiment_path):
                                os.remove(experiment_path)
                        # Could add more backend-specific cleanup here
                    except Exception as cleanup_error:
                        # Cleanup is best effort; the original error below matters more
                        print(f"Failed to clean up experiment after error: {cleanup_error}")

                # Bare raise re-raises the original exception with its traceback intact
                raise

            # save to git if requested
            if save_to_git:
                repo_path = find_git_root()
                version_experiment(experiment_name=name, repo_path=repo_path, stage_all=stage_all)

            return experiment_view

        setattr(wrapped_experiment, "run_async", run_async)
        return t.cast(ExperimentProtocol, wrapped_experiment)

    return decorator

In [26]:
# | export
@patch
def langfuse_experiment(
    self: Project, experiment_model, name_prefix: str = "", 
    save_to_git: bool = True, stage_all: bool = True
):
    """Decorator for creating experiment functions with Langfuse integration.

    NOTE(review): a later cell defines ``langfuse_experiment`` again with an
    additional ``backend`` parameter; since both are patched onto ``Project``,
    the later definition is the one left in place once both cells have run.

    Args:
        experiment_model: The model type to use for experiment results
        name_prefix: Optional prefix for experiment names
        save_to_git: Whether to save the experiment state to git
        stage_all: Whether to stage all files when saving to git

    Returns:
        Decorator function that wraps experiment functions with Langfuse observation
    """
    # No backend is passed on: ``self.experiment`` then falls back to the
    # project's own backend at run time.

    def decorator(func: t.Callable) -> ExperimentProtocol:
        @wraps(func)
        async def langfuse_wrapped_func(*args, **kwargs):
            # Apply langfuse observation directly here
            trace_name = f"{name_prefix}-{func.__name__}" if name_prefix else func.__name__
            observed_func = observe(name=trace_name)(func)
            return await observed_func(*args, **kwargs)

        # Now create the experiment wrapper with our already-observed function
        experiment_wrapper = self.experiment(experiment_model, name_prefix, save_to_git, stage_all)(langfuse_wrapped_func)

        return t.cast(ExperimentProtocol, experiment_wrapper)

    return decorator

In [27]:
# | export

# this one we have to clean up
from langfuse.decorators import observe

In [None]:
# | export
@patch
def langfuse_experiment(
    self: Project,
    experiment_model,
    name_prefix: str = "",
    save_to_git: bool = True,
    stage_all: bool = True,
    backend: t.Optional[rt.SUPPORTED_BACKENDS] = None
):
    """Build an experiment decorator whose function calls are observed by Langfuse.

    Args:
        experiment_model: The model type to use for experiment results
        name_prefix: Optional prefix for experiment names; also prefixes the trace name
        save_to_git: Whether to save experiment state to git
        stage_all: Whether to stage all files when saving to git
        backend: Backend to use for this experiment (overrides project's backend)

    Returns:
        Decorator function that wraps experiment functions with Langfuse observation
    """

    def decorator(func: t.Callable) -> ExperimentProtocol:
        @wraps(func)
        async def langfuse_wrapped_func(*args, **kwargs):
            # Trace under "<prefix>-<function name>", or just the function name
            if name_prefix:
                trace_name = f"{name_prefix}-{func.__name__}"
            else:
                trace_name = func.__name__
            traced = observe(name=trace_name)(func)
            return await traced(*args, **kwargs)

        # Layer the regular experiment decorator on top of the observed function
        as_experiment = self.experiment(
            experiment_model,
            name_prefix,
            save_to_git,
            stage_all,
            backend=backend,
        )
        return t.cast(ExperimentProtocol, as_experiment(langfuse_wrapped_func))

    return decorator

In [None]:
import os
# import langfuse
from langfuse import Langfuse

In [None]:

# Langfuse client for the example cells: keys are read from the environment,
# while the host is hard-coded.
langfuse = Langfuse(
  secret_key=os.getenv("LANGFUSE_SECRET_KEY"),
  public_key=os.getenv("LANGFUSE_PUBLIC_KEY"),
  host="https://us.cloud.langfuse.com"
)

In [None]:
# NOTE(review): TextExperimentModel is not defined in the visible cells — confirm
# where it comes from before running this cell.
@p.langfuse_experiment(TextExperimentModel)
async def test_experiment(item: TestModel):
    """Example experiment: copy the item's fields and attach a fixed response and verdict."""
    return TextExperimentModel(**item.model_dump(), response="test response", is_correct="yes")

In [None]:
# | export
@patch
def mlflow_experiment(
    self: Project, experiment_model, name_prefix: str = "",
    save_to_git: bool = True, stage_all: bool = True
):
    """Decorator for creating experiment functions with mlflow integration.

    NOTE(review): a later cell defines ``mlflow_experiment`` again with an
    additional ``backend`` parameter; since both are patched onto ``Project``,
    the later definition is the one left in place once both cells have run.

    Args:
        experiment_model: The model type to use for experiment results
        name_prefix: Optional prefix for experiment names
        save_to_git: Whether to save the experiment state to git
        stage_all: Whether to stage all files when saving to git

    Returns:
        Decorator function that wraps experiment functions with mlflow observation
    """
    # No backend is passed on: ``self.experiment`` then falls back to the
    # project's own backend at run time.

    def decorator(func: t.Callable) -> ExperimentProtocol:

        @wraps(func)
        async def mlflow_wrapped_func(*args, **kwargs):
            # Apply mlflow observation directly here
            trace_name = f"{name_prefix}-{func.__name__}" if name_prefix else func.__name__
            observed_func = trace(name=trace_name)(func)
            return await observed_func(*args, **kwargs)

        # Now create the experiment wrapper with our already-observed function
        experiment_wrapper = self.experiment(
            experiment_model, name_prefix, save_to_git, stage_all
        )(mlflow_wrapped_func)

        return t.cast(ExperimentProtocol, experiment_wrapper)

    return decorator

In [None]:
await test_experiment.run_async(test_dataset)

Running experiment: 100%|██████████| 6/6 [00:01<00:00,  4.01it/s]


Experiment(name=cool_matsumoto, model=TextExperimentModel)

## Compare and Plot

In [29]:
# Example of using experiments with a local backend
import tempfile
import os
from pydantic import BaseModel

# Define a test model for our example
class LocalExperimentModel(BaseModel):
    """Result row of the local-backend example: the dataset fields plus a score and a result string."""
    id: int
    name: str
    description: str
    score: float
    experiment_result: str

# The function we want to run as an experiment
async def process_item(item):
    """Turn one dataset item into a ``LocalExperimentModel`` result row.

    The score is the item's ID scaled by 0.1, and the result string embeds
    the item's name. This is a placeholder for real experiment logic.
    """
    fields = {
        "id": item.id,
        "name": item.name,
        "description": item.description,
        "score": float(item.id) * 0.1,
        "experiment_result": f"Result for {item.name}",
    }
    return LocalExperimentModel(**fields)

# Example code using local backend (will be skipped during docs build)
# End-to-end walkthrough of the local backend: project -> dataset -> experiment
# run -> lookup -> DataFrame. Uses top-level ``await``, so it is meant to run in
# a notebook cell.
# Example code using local backend (will be skipped during docs build)
try:
    # Obtain the test directory used for this demonstration
    test_dir = get_test_directory()
    
    # Create a new project with local backend
    local_project = Project.create(
        name="test_local_experiment_project",
        description="A test project using local backend for experiments",
        backend="local",
        root_dir=test_dir
    )
    
    # Define a test model for the dataset
    class LocalDatasetModel(BaseModel):
        id: int
        name: str
        description: str
    
    # Create a dataset with local backend
    local_dataset = local_project.create_dataset(
        model=LocalDatasetModel,
        name="test_experiment_dataset"
    )
    
    # Add some entries to the dataset
    for i in range(3):
        entry = LocalDatasetModel(
            id=i,
            name=f"Test Item {i}",
            description=f"Description for test item {i}"
        )
        local_dataset.append(entry)
    
    # Create an experiment function
    @local_project.experiment(LocalExperimentModel)
    async def test_local_experiment(item):
        return await process_item(item)
    
    # Run the experiment
    experiment = await test_local_experiment.run_async(local_dataset)
    
    # Check that the experiment file exists
    experiment_path = local_project.get_experiment_path(experiment.name)
    print(f"Experiment file exists: {os.path.exists(experiment_path)}")
    
    # List experiments
    experiments = local_project.list_experiment_names()
    print(f"Experiments in project: {experiments}")
    
    # Get the experiment
    retrieved_experiment = local_project.get_experiment(
        experiment_name=experiment.name,
        model=LocalExperimentModel
    )
    
    # Load the experiment results
    retrieved_experiment.load()
    print(f"Retrieved experiment: {retrieved_experiment}")
    
    # Convert to DataFrame
    df = retrieved_experiment.to_pandas()
    print("\nExperiment results as DataFrame:")
    print(df)

# Deliberately broad: any failure in the example is reported, not raised
except Exception as e:
    print(f"Note: Example code failed - this is expected during docs building: {e}")

Note: Example code failed - this is expected during docs building: name 'get_test_directory' is not defined


In [None]:
# | export
from mlflow import trace

@patch
def mlflow_experiment(
    self: Project,
    experiment_model,
    name_prefix: str = "",
    save_to_git: bool = True,
    stage_all: bool = True,
    backend: t.Optional[rt.SUPPORTED_BACKENDS] = None
):
    """Build an experiment decorator whose function calls are traced by mlflow.

    Args:
        experiment_model: The model type to use for experiment results
        name_prefix: Optional prefix for experiment names; also prefixes the trace name
        save_to_git: Whether to save experiment state to git
        stage_all: Whether to stage all files when saving to git
        backend: Backend to use for this experiment (overrides project's backend)

    Returns:
        Decorator function that wraps experiment functions with mlflow observation
    """

    def decorator(func: t.Callable) -> ExperimentProtocol:

        @wraps(func)
        async def mlflow_wrapped_func(*args, **kwargs):
            # Trace under "<prefix>-<function name>", or just the function name
            if name_prefix:
                trace_name = f"{name_prefix}-{func.__name__}"
            else:
                trace_name = func.__name__
            traced = trace(name=trace_name)(func)
            return await traced(*args, **kwargs)

        # Layer the regular experiment decorator on top of the traced function
        as_experiment = self.experiment(
            experiment_model,
            name_prefix,
            save_to_git,
            stage_all,
            backend=backend,
        )
        return t.cast(ExperimentProtocol, as_experiment(mlflow_wrapped_func))

    return decorator

In [None]:
# | export

import logging
from ragas_experimental.utils import plot_experiments_as_subplots

@patch
def compare_and_plot(self: Project, experiment_names: t.List[str], model: t.Type[BaseModel], metric_names: t.List[str]):
    """Compare multiple experiments and generate a plot.

    Each experiment is loaded by name and, for every result row, the value
    of each requested metric is collected. Rows that lack a metric
    contribute ``None`` for it and a warning is logged.

    Args:
        experiment_names: Names of the experiments to compare
        model: Model class defining the experiment structure
        metric_names: Attribute names to read from every result row
    """
    results = {}
    for experiment_name in tqdm(experiment_names, desc="Fetching experiments"):
        experiment = self.get_experiment(experiment_name, model)
        experiment.load()

        # metric name -> list of values, one entry per row of this experiment
        per_metric = {}
        for row in experiment:
            for metric in metric_names:
                values = per_metric.setdefault(metric, [])
                if hasattr(row, metric):
                    values.append(getattr(row, metric))
                else:
                    # Keep the lists aligned with the rows by recording a gap
                    values.append(None)
                    logging.warning(f"Metric {metric} not found in row: {row}")
        results[experiment_name] = per_metric

    fig = plot_experiments_as_subplots(results, experiment_ids=experiment_names)
    fig.show()
        
        
        
        
    

In [None]:
from ragas_experimental import BaseModel

class TestDataset(BaseModel):
    """Example dataset row: a question with its citations and grading notes."""
    question: str
    citations: list[str]
    grading_notes: str
    

class ExperimentModel(TestDataset):
    """Example experiment row: the ``TestDataset`` fields plus a response, a score and its reason."""
    response: str
    score: str
    score_reason: str


In [None]:
# Plot the "score" metric of two named experiments side by side
p.compare_and_plot(
    experiment_names=["xenodochial_hoare","confident_liskov"],
    model=ExperimentModel,
    metric_names=["score"]
)

Fetching experiments: 100%|██████████| 2/2 [00:05<00:00,  2.60s/it]
