# Dataset

> A Python list-like object that contains your evaluation data.

In [None]:
# | default_exp dataset

In [None]:
# | hide

from unittest.mock import MagicMock
from fastcore.test import *

In [None]:
# | export
import typing as t

from fastcore.utils import patch
import pandas as pd

from ragas_experimental.model.pydantic_model import ExtendedPydanticBaseModel as BaseModel
from ragas_experimental.utils import create_nano_id, async_to_sync
from ragas_experimental.backends.ragas_api_client import RagasApiClient

In [None]:
# | export
# Type variable for the entry type stored in a Dataset; bound to BaseModel so
# only BaseModel subclasses can parameterize the generic.
BaseModelType = t.TypeVar("BaseModelType", bound=BaseModel)

class Dataset(t.Generic[BaseModelType]):
    """A list-like interface for managing dataset entries with backend synchronization.

    This class behaves like a Python list while synchronizing operations with the
    Ragas backend API. Entries are kept in a local cache (``_entries``); indexing,
    iteration and ``len()`` operate on that cache only.
    """

    def __init__(
        self,
        name: str,
        model: t.Type[BaseModel],
        project_id: str,
        dataset_id: str,
        ragas_api_client: RagasApiClient,
    ):
        """Create a dataset handle and fetch its column ids from the backend.

        Args:
            name: Display name of the dataset.
            model: Model class that every entry must be an instance of.
            project_id: Id of the project that owns the dataset.
            dataset_id: Id of the dataset in the backend.
            ragas_api_client: Client used for all backend calls.
        """
        self.name = name
        self.model = model
        self.project_id = project_id
        self.dataset_id = dataset_id
        self._ragas_api_client = ragas_api_client
        # Local cache of entries; starts empty and is filled by append()/load().
        self._entries: t.List[BaseModelType] = []

        # Initialize column mapping if it doesn't exist yet
        if not hasattr(self.model, "__column_mapping__"):
            self.model.__column_mapping__ = {}

        # Merge every column reported by the API into the model's mapping
        # (field name -> column id), adding names that were not mapped before.
        column_id_map = self._get_column_id_map(dataset_id=dataset_id)
        self.model.__column_mapping__.update(column_id_map)

    def _get_column_id_map(self: "Dataset", dataset_id: str) -> dict:
        """Get a map of column name to column id.

        Queries the backend for the dataset's columns. As a side effect, any
        field already present in the model's ``__column_mapping__`` is
        overwritten with the id reported by the backend.

        Args:
            dataset_id: Id of the dataset whose columns are listed.

        Returns:
            Dict mapping each column's ``name`` to its ``id``.
        """
        sync_func = async_to_sync(self._ragas_api_client.list_dataset_columns)
        columns = sync_func(project_id=self.project_id, dataset_id=dataset_id)
        column_id_map = {column["name"]: column["id"] for column in columns["items"]}

        # Selectively overwrite existing mapping entries; only existing keys are
        # assigned, so the dict does not change size while it is iterated.
        mapping = self.model.__column_mapping__
        for field in mapping:
            if field in column_id_map:
                mapping[field] = column_id_map[field]
        return column_id_map

    def __getitem__(
        self, key: t.Union[int, slice]
    ) -> t.Union[BaseModelType, "Dataset[BaseModelType]"]:
        """Get an entry by index or a new dataset by slice.

        For a slice, a new instance of the same class is returned that shares
        this dataset's identity (name, model, ids, client) and holds the sliced
        entries. Constructing it runs ``__init__``, which queries the backend
        for column ids.
        """
        if isinstance(key, slice):
            new_dataset = type(self)(
                name=self.name,
                model=self.model,
                project_id=self.project_id,
                dataset_id=self.dataset_id,
                ragas_api_client=self._ragas_api_client,
            )
            new_dataset._entries = self._entries[key]
            return new_dataset
        else:
            return self._entries[key]

    def __setitem__(self, index: int, entry: BaseModelType) -> None:
        """Update an entry at the given index and sync to backend.

        If ``entry`` has no row id of its own, it inherits the row id of the
        entry it replaces, so the backend row at that position is updated.

        Raises:
            TypeError: If ``entry`` is not an instance of the dataset's model.
            IndexError: If ``index`` is out of range for the local cache.
            ValueError: Raised by ``save`` when no row id is available.
        """
        if not isinstance(entry, self.model):
            raise TypeError(f"Entry must be an instance of {self.model.__name__}")

        # Get existing entry to get its ID
        existing = self._entries[index]

        # A freshly built entry carries no row id, and save() refuses to sync
        # without one, so reuse the id of the row being replaced.
        if not getattr(entry, "_row_id", None):
            existing_row_id = getattr(existing, "_row_id", None)
            if existing_row_id:
                entry._row_id = existing_row_id

        # Update in backend
        self.save(entry)

        # Update local cache
        self._entries[index] = entry

    def __repr__(self) -> str:
        """Return a short summary with the name, model class name and entry count."""
        return f"Dataset(name={self.name}, model={self.model.__name__}, len={len(self)})"

    def __len__(self) -> int:
        """Return the number of entries in the local cache."""
        return len(self._entries)

    def __iter__(self) -> t.Iterator[BaseModelType]:
        """Iterate over the entries in the local cache."""
        return iter(self._entries)

In [None]:
# | hide
import ragas_experimental.typing as rt
from ragas_experimental.backends.factory import RagasApiClientFactory
from ragas_experimental.metric.result import MetricResult

In [None]:
# test model
class TestModel(BaseModel):
    """Sample entry model used to exercise ``Dataset`` in this notebook.

    Mixes plain scalar fields, a ``Literal``-constrained field and a
    ``MetricResult`` field.
    """

    id: int
    name: str
    description: str
    tags: t.Literal["tag1", "tag2", "tag3"]
    result: MetricResult

In [None]:
TestModel.__column_mapping__ = {}

In [None]:
test_model = TestModel(
    id=0, 
    name="test", 
    description="test description", 
    result=MetricResult(result=0.5, reason="test reason"), 
    tags="tag1"
)
test_model

TestModel(id=0, name='test', description='test description', tags='tag1', result=0.5)

In [None]:
test_model.__column_mapping__

{'id': 'id',
 'name': 'name',
 'description': 'description',
 'tags': 'tags',
 'result': 'result',
 'result_reason': 'result_reason'}

In [None]:
import os
from ragas_experimental import Project

In [None]:
RAGAS_APP_TOKEN = "api_key"
RAGAS_API_BASE_URL = "https://api.dev.app.ragas.io"

os.environ["RAGAS_APP_TOKEN"] = RAGAS_APP_TOKEN
os.environ["RAGAS_API_BASE_URL"] = RAGAS_API_BASE_URL

In [None]:
ragas_api_client = RagasApiClientFactory.create()

In [None]:
p = Project(project_id="3d9b529b-c23f-4e87-8a26-dd1923749aa7", ragas_app_client=ragas_api_client)
test_dataset = p.create_dataset(name="TestModel_with_long_text", model=TestModel)
test_dataset

Dataset(name=TestModel_with_long_text, model=TestModel, len=0)

In [None]:
# https://dev.app.ragas.io/dashboard/projects/0a7c4ecb-b313-4bb0-81c0-852c9634ce03/datasets/a4f0d169-ebce-4a2b-b758-0ff49c0c4312
TEST_PROJECT_ID = p.project_id
TEST_DATASET_ID = test_dataset.dataset_id
test_project = await ragas_api_client.get_project(project_id=TEST_PROJECT_ID)
test_dataset = await ragas_api_client.get_dataset(project_id=TEST_PROJECT_ID, dataset_id=TEST_DATASET_ID)
test_dataset

{'id': 'aa1fb420-4820-45a6-9502-6cfb7938b7a3',
 'name': 'TestModel_with_long_text',
 'description': None,
 'created_at': '2025-04-16T18:54:04.355883+00:00',
 'updated_at': '2025-04-16T18:54:04.355883+00:00',
 'version_counter': 0,
 'project_id': '3d9b529b-c23f-4e87-8a26-dd1923749aa7'}

In [None]:
dataset = Dataset(
    name="TestModel", model=TestModel, project_id=TEST_PROJECT_ID, dataset_id=TEST_DATASET_ID, ragas_api_client=ragas_api_client
)

In [None]:
#| export
import ragas_experimental.typing as rt

In [None]:
# | export
@patch
def append(self: Dataset, entry: BaseModelType) -> None:
    """Add a new entry to the dataset and sync it to the backend.

    The entry is converted to row form, its fields are re-keyed by the
    model's column mapping, and a row is created through the API client.
    The id returned by the backend is stored on the entry as ``_row_id``
    before the entry is added to the local cache.

    Args:
        entry: The model instance to add.
    """
    # field name -> backend column id
    field_to_column = self.model.__column_mapping__

    converted = rt.ModelConverter.instance_to_row(entry)
    new_row_id = create_nano_id()

    # Keep only the cells whose field has a known backend column.
    payload = {
        field_to_column[cell["column_id"]]: cell["data"]
        for cell in converted["data"]
        if cell["column_id"] in field_to_column
    }

    create_row = async_to_sync(self._ragas_api_client.create_dataset_row)
    created = create_row(
        project_id=self.project_id,
        dataset_id=self.dataset_id,
        id=new_row_id,
        data=payload,
    )

    # Remember the backend row id so the entry can be updated or deleted later.
    entry._row_id = created["id"]
    self._entries.append(entry)

In [None]:
dataset.append(test_model)
len(dataset)

1

In [None]:
# | hide
test_eq(len(dataset), 1)

In [None]:
# | export
@patch
def pop(self: Dataset, index: int = -1) -> BaseModelType:
    """Remove and return the entry at ``index``, deleting its row in the backend.

    Args:
        index: Position in the local cache; defaults to the last entry.

    Returns:
        The entry that was removed.

    Raises:
        IndexError: If ``index`` is out of range for the local cache.
        ValueError: If the entry has no row id.
    """
    target = self._entries[index]
    target_row_id = target._row_id
    if target_row_id is None:
        raise ValueError("Entry has no row id. This likely means it was not added or synced to the dataset.")

    # Delete in the backend first; the local cache is only touched afterwards.
    delete_row = async_to_sync(self._ragas_api_client.delete_dataset_row)
    delete_row(
        project_id=self.project_id,
        dataset_id=self.dataset_id,
        row_id=target_row_id,
    )

    return self._entries.pop(index)

In [None]:
dataset.pop()
len(dataset)

0

In [None]:
# | hide
test_eq(len(dataset), 0)

In [None]:
# now add some more entries
for i in range(10):
    dataset.append(test_model)
len(dataset)

10

In [None]:
# | export
@patch
def load(self: Dataset) -> None:
    """Load all entries from the backend API into the local cache.

    Rows are fetched, converted into model instances, and only then swapped
    into the cache. If fetching or constructing any row fails, the previously
    cached entries are left untouched instead of being cleared or partially
    replaced.
    """
    # Get all rows
    sync_func = async_to_sync(self._ragas_api_client.list_dataset_rows)
    response = sync_func(
        project_id=self.project_id,
        dataset_id=self.dataset_id,
    )

    # Invert the model mapping: column ID -> field name
    column_map = {v: k for k, v in self.model.__column_mapping__.items()}

    loaded = []
    for row in response.get("items", []):
        # Convert from API data format to model fields, dropping unknown columns
        model_data = {
            column_map[col_id]: value
            for col_id, value in row.get("data", {}).items()
            if col_id in column_map
        }

        # Create model instance
        entry = self.model(**model_data)

        # Store row ID for future operations
        entry._row_id = row.get("id")

        loaded.append(entry)

    # Replace the contents in place so the same list object is kept.
    self._entries[:] = loaded

In [None]:
dataset.load()

In [None]:
# | export
@patch
def load_as_dicts(self: Dataset) -> t.List[t.Dict]:
    """Fetch all rows from the backend and return them as plain dictionaries.

    Each returned dict is keyed by model field name. Columns that are not in
    the model's column mapping are dropped. The local cache is not modified.

    Returns:
        One dict per row.
    """
    list_rows = async_to_sync(self._ragas_api_client.list_dataset_rows)
    payload = list_rows(
        project_id=self.project_id,
        dataset_id=self.dataset_id
    )

    # column ID -> field name
    id_to_field = {
        column_id: field for field, column_id in self.model.__column_mapping__.items()
    }

    return [
        {
            id_to_field[column_id]: cell
            for column_id, cell in record.get("data", {}).items()
            if column_id in id_to_field
        }
        for record in payload.get("items", [])
    ]

In [None]:
dataset.load_as_dicts()

[{'description': 'test description',
  'result': 0.5,
  'result_reason': 'test reason',
  'name': 'test',
  'id': 0},
 {'description': 'test description',
  'result': 0.5,
  'result_reason': 'test reason',
  'name': 'test',
  'id': 0},
 {'description': 'test description',
  'result': 0.5,
  'result_reason': 'test reason',
  'name': 'test',
  'id': 0},
 {'description': 'test description',
  'result': 0.5,
  'result_reason': 'test reason',
  'name': 'test',
  'id': 0},
 {'description': 'test description',
  'result': 0.5,
  'result_reason': 'test reason',
  'name': 'test',
  'id': 0},
 {'description': 'test description',
  'result': 0.5,
  'result_reason': 'test reason',
  'name': 'test',
  'id': 0},
 {'description': 'test description',
  'result': 0.5,
  'result_reason': 'test reason',
  'name': 'test',
  'id': 0},
 {'description': 'test description',
  'result': 0.5,
  'result_reason': 'test reason',
  'name': 'test',
  'id': 0},
 {'description': 'test description',
  'result': 0.5,
  

In [None]:
# | export
@patch
def to_pandas(self: Dataset) -> "pd.DataFrame":
    """Return the dataset's entries as a pandas DataFrame.

    When the local cache is empty, entries are first loaded from the backend.
    Each entry contributes one record, produced by its ``model_dump()``.
    """
    if not self._entries:
        # Nothing cached yet: pull the rows from the backend.
        self.load()

    return pd.DataFrame([item.model_dump() for item in self._entries])

In [None]:
dataset.to_pandas()

Unnamed: 0,id,name,description,result
0,0,test,test description,0.5
1,0,test,test description,0.5
2,0,test,test description,0.5
3,0,test,test description,0.5
4,0,test,test description,0.5
5,0,test,test description,0.5
6,0,test,test description,0.5
7,0,test,test description,0.5
8,0,test,test description,0.5
9,0,test,test description,0.5


In [None]:
# | export
@patch
def save(self: Dataset, item: BaseModelType) -> None:
    """Save changes to an item to the backend.

    The item's row is updated through the API client. If the local cache
    holds a different object for the same row id, that cache slot is
    replaced with ``item``.

    Args:
        item: The model instance to persist; it must carry a ``_row_id``.

    Raises:
        TypeError: If ``item`` is not an instance of the dataset's model.
        ValueError: If ``item`` has no row id.
    """
    if not isinstance(item, self.model):
        raise TypeError(f"Item must be an instance of {self.model.__name__}")

    # The row id lives on the item itself. Looking the same object up in the
    # cache cannot yield a different answer, so no fallback search is done.
    row_id = getattr(item, "_row_id", None)
    if not row_id:
        raise ValueError("Cannot save: item is not from this dataset or was not properly synced")

    # Re-key the converted row by backend column id, dropping unmapped fields.
    column_id_map = self.model.__column_mapping__
    row_data = {
        column_id_map[column["column_id"]]: column["data"]
        for column in rt.ModelConverter.instance_to_row(item)["data"]
        if column["column_id"] in column_id_map
    }

    # Update in backend
    sync_func = async_to_sync(self._ragas_api_client.update_dataset_row)
    sync_func(
        project_id=self.project_id,
        dataset_id=self.dataset_id,
        row_id=row_id,
        data=row_data,
    )

    # Keep the local cache pointing at the saved object for this row.
    for i, entry in enumerate(self._entries):
        if getattr(entry, "_row_id", None) == row_id:
            if entry is not item:
                self._entries[i] = item
            break

In [None]:
d = dataset[0]
d

TestModel(id=0, name='updated name', description='test description', result=0.5)

In [None]:
d.name = "updated name"
dataset.save(d)
dataset[0].name

'updated name'

In [None]:
dataset.load_as_dicts()

[{'description': 'test description',
  'result': 0.5,
  'result_reason': 'test reason',
  'name': 'test',
  'id': 0},
 {'description': 'test description',
  'result': 0.5,
  'result_reason': 'test reason',
  'name': 'test',
  'id': 0},
 {'description': 'test description',
  'result': 0.5,
  'result_reason': 'test reason',
  'name': 'test',
  'id': 0},
 {'description': 'test description',
  'result': 0.5,
  'result_reason': 'test reason',
  'name': 'test',
  'id': 0},
 {'description': 'test description',
  'result': 0.5,
  'result_reason': 'test reason',
  'name': 'test',
  'id': 0},
 {'description': 'test description',
  'result': 0.5,
  'result_reason': 'test reason',
  'name': 'test',
  'id': 0},
 {'description': 'test description',
  'result': 0.5,
  'result_reason': 'test reason',
  'name': 'test',
  'id': 0},
 {'description': 'test description',
  'result': 0.5,
  'result_reason': 'test reason',
  'name': 'test',
  'id': 0},
 {'description': 'test description',
  'result': 0.5,
  

In [None]:
# | export
@patch
def get(self: Dataset, field_value: str, field_name: str = "_row_id") -> t.Optional[BaseModelType]:
    """Get an entry by field value.

    Only the local cache is searched. If the cache is empty, it is first
    populated from the backend via ``load()``.

    Args:
        field_value: The value to match.
        field_name: The attribute to match against (default: "_row_id").

    Returns:
        The first matching model instance, or None if not found.
    """
    # Check if we need to load entries
    if not self._entries:
        self.load()

    for entry in self._entries:
        if hasattr(entry, field_name) and getattr(entry, field_name) == field_value:
            return entry

    # NOTE(review): no backend lookup is attempted on a miss. The row-listing
    # call is used without filter arguments elsewhere in this file, so a
    # server-side search presumably needs filter support in the API client first.
    return None

In [None]:
d._row_id

'eVpgxsmPGwa8'

In [None]:
test_model = dataset.get(d._row_id)
test_model

TestModel(id=0, name='updated name', description='test description', result=0.5)