In [16]:
from csv import DictReader
from functools import cached_property, cache


class logger:  # noqa
    """Minimal stand-in for a logger: `logger.log(...)` prints to standard output."""

    @staticmethod
    def log(*values, **options):  # noqa
        """Forward every positional and keyword argument to the built-in `print`."""
        return print(*values, **options)

In [17]:
import doctest
from abc import ABC, abstractmethod
from collections import defaultdict

UNDEFINED = object()


class AbstractSerialMapper(ABC):
    """
    Base class of the serial mappers.

    It owns the backing list `_data` and reports its length; concrete
    subclasses decide what each slot of the list holds by implementing `add`.
    """

    def __init__(self):
        # Backing storage, filled by the subclass implementation of `add`.
        self._data = list()

    def __len__(self):
        """Return the number of slots currently stored in the backing list."""
        return self._data.__len__()

    @abstractmethod
    def add(self, value, *args, **kwargs):
        """Store `value` in the mapper (to be implemented by subclasses)."""
        ...


class SerialUnidirectionalMapper(AbstractSerialMapper):
    """
    One-to-many serial mapper: each slot of `_data` is a list (a "bucket") of
    data entries, and the position of the bucket in `_data` is its serial ID.
    """

    def __init__(self):
        super().__init__()

    def add(self, data, *args, key=None, at=None, **kwargs):
        """
        Store `data`, either in a new bucket or in an existing one.

        Args:
            data: The entry to store.
            key: Position of the bucket to append to. When it is `None` or
                `UNDEFINED`, a new bucket containing only `data` is appended
                at the end of the storage.
            at: Alias of `key`, only consulted when `key` is `None` or
                `UNDEFINED`. Without this explicit parameter an `at=...`
                argument would be silently swallowed by `**kwargs` and every
                call would open a new bucket.

        Raises:
            IndexError: If the resolved position does not designate an
                existing bucket (note that a negative position counts from
                the end, as for any Python list).
        """
        position = at if (key is None or key is UNDEFINED) else key

        if position is None or position is UNDEFINED:
            # No target bucket: open a new one at the end
            self._data.append([data])
        else:
            self._data[position].append(data)


class SerialBidirectionalMapper(AbstractSerialMapper):
    """
    Bijective serial mapper between values and integer IDs.

    The forward direction (ID -> value) is the list `_data`, read with
    `mapper[id]`; the reverse direction (value -> ID) is the mapping returned
    by `inverse`, which yields `UNDEFINED` for a value that was never added.
    """

    def __init__(self):
        # The parent initializer already creates the forward list `self._data`
        super().__init__()
        self._inverse_data = defaultdict(lambda: UNDEFINED)

    def __getitem__(self, key):
        """Return the value stored under the ID `key`."""
        return self._data[key]

    def add(self, value, *args, **kwargs):
        """
        Register `value` under the next free ID (the current length).

        Adding a value that already has an ID is a no-op: appending it a second
        time would store the same value under two IDs and break the bijection.
        """
        # `.get` is used on purpose: unlike `[]`, it does not trigger the
        # defaultdict factory, so the lookup does not insert a placeholder.
        if self._inverse_data.get(value, UNDEFINED) is not UNDEFINED:
            return

        self._inverse_data[value] = len(self._data)
        self._data.append(value)

    @property
    def inverse(self):
        """
        The value -> ID mapping.

        NOTE: this is a `defaultdict`; reading a missing value with `[]`
        returns `UNDEFINED` and also stores that placeholder under the value.
        """
        return self._inverse_data


class InMemory2DIndexer(object):
    """
    An in-memory 2D indexer for efficiently mapping rows and columns from CSV data.

    Every distinct value read under the row header (resp. the column header) is
    registered in a bijective mapper between that value and a serial ID, and the
    data of each CSV line is stored both under its row ID and under its column ID.

    Attrs:
        LIMIT_TO_LOAD_IN_MEMORY (int): Maximum number of lines to load into memory
            when no `limit` is given to `index_from_csv`.
        DEFAULT_CSV_DATA_INSTANTIATOR (callable): Default instantiator; it builds a
            tuple with the values of the requested columns of a CSV line (all the
            columns of the line when no column is requested).

    Args:
        row_name (str): The name representing the rows.
        column_name (str): The name representing the columns.

    Methods:
        index_from_csv(file_path, row_header, column_header, data_columns, instantiator, limit, verbose):
            Indexes data from a CSV file, mapping rows and columns by their headers.

    Example usage (the examples reading a file need `./ml-32m/ratings.csv` on disk):
        >>> indexer1 = InMemory2DIndexer(row_name="ID", column_name="Attribute")
        >>> indexer1._row_name
        'ID'
        >>> indexer1._column_name
        'Attribute'

        >>> indexer2 = InMemory2DIndexer(row_name="userId", column_name="movieId")
        >>> # Testing with correct row and column headers
        >>> indexer2.index_from_csv(
        ...     file_path="./ml-32m/ratings.csv",
        ...     row_header="userId",
        ...     column_header="movieId",
        ...     limit=10
        ... )  # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
        Limit of entries (.i.e 10) to load has been reached. Exiting without loading the rest...
        <__main__.InMemory2DIndexer object at ...>
        >>> # Testing with incorrect row header
        >>> indexer2.index_from_csv(
        ...     file_path="./ml-32m/ratings.csv",
        ...     row_header="WrongHeader",
        ...     column_header="movieId"
        ... )
        Traceback (most recent call last):
            ...
        AssertionError: Invalid row header provided: 'WrongHeader'. The row header must be either None or match the previously defined row header 'userId'.

        >>> # Testing with correct row and column headers
        >>> indexed_data2 = InMemory2DIndexer(row_name="userId", column_name="movieId")
        >>> indexed_data2 = indexed_data2.index_from_csv(
        ...     file_path="./ml-32m/ratings.csv",
        ...     row_header="userId",
        ...     column_header="movieId",
        ...     limit=400
        ... )  # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
        Limit of entries (.i.e 400) to load has been reached. Exiting without loading the rest...
        >>> # Testing the content of the indexer
        >>> len(indexed_data2._row_to_id_bmap._data) == 5
        True
    """

    LIMIT_TO_LOAD_IN_MEMORY = 1_000_000_000_000

    # NOTE: `index_from_csv` reads this attribute through the instance, which
    # binds the lambda like a method: `_cls` then receives the instance and the
    # callable is invoked as `instantiator(line, data_columns)`.
    DEFAULT_CSV_DATA_INSTANTIATOR = lambda _cls, data, columns: tuple(
        data[k] for k in (columns if columns else data)
    )

    def __init__(self, row_name: str, column_name: str):  # typing
        self._row_name = row_name
        self._column_name = column_name
        self._column_to_id_bmap = SerialBidirectionalMapper()  # bijective mapping
        self._row_to_id_bmap = SerialBidirectionalMapper()
        self._data_by_row_id = SerialUnidirectionalMapper()  # one-to-many mapping
        self._data_by_column_id = SerialUnidirectionalMapper()

        # SOME PRIVATE SAFETY CHECK PROPERTIES
        self.__is_indexed = False

    def index_from_csv(
        self,
        *,
        file_path: str,
        row_header: str,
        column_header: str,
        data_columns=None,
        instantiator=None,
        limit=None,
        verbose=False,
    ):  # typing
        """
        Index the lines of a CSV file by row and by column.

        Args:
            file_path: Path of the CSV file; its first line must hold the headers.
            row_header: Header of the CSV column holding the row values. Must be
                `None` (meaning "use the row name given at construction") or
                equal to that row name.
            column_header: Same as `row_header`, for the column values.
            data_columns: Headers of the CSV columns handed to the instantiator;
                the default instantiator keeps every column when this is falsy.
            instantiator: Callable `(line, data_columns) -> data` building the
                stored entry from a CSV line (a dict); defaults to
                `DEFAULT_CSV_DATA_INSTANTIATOR`.
            limit: Maximum number of lines to index; a falsy value selects
                `LIMIT_TO_LOAD_IN_MEMORY`.
            verbose: When true, log every indexed line.

        Returns:
            The indexer itself, to enable method chaining.

        Raises:
            AssertionError: If `row_header` or `column_header` is neither `None`
                nor the name defined at construction.
            KeyError: If a CSV line has no value under one of the headers.
        """
        # Raised explicitly (rather than with `assert`) so that the validation
        # is not stripped when Python runs with `-O`.
        if not (row_header is None or row_header == self._row_name):
            raise AssertionError(
                f"Invalid row header provided: '{row_header}'. The row header must be either None or match the previously defined row header '{self._row_name}'."
            )
        if not (column_header is None or column_header == self._column_name):
            raise AssertionError(
                f"Invalid column header provided: '{column_header}'. The column header must be either None or match the previously defined column header '{self._column_name}'."
            )

        # `None` passed the checks above: resolve it to the stored name, otherwise
        # the lookup `line[None]` below would fail on the first CSV line.
        row_header = self._row_name if row_header is None else row_header
        column_header = self._column_name if column_header is None else column_header

        instantiator = instantiator or self.DEFAULT_CSV_DATA_INSTANTIATOR
        limit_to_load = limit or self.LIMIT_TO_LOAD_IN_MEMORY
        indexed_count = 0

        with open(file_path, mode="r", newline="") as csvfile:
            for line in DictReader(csvfile):
                indexed_count += 1
                row, column = line[row_header], line[column_header]

                # The inverse mappers yield UNDEFINED for a value never seen before
                row_id = self._row_to_id_bmap.inverse[row]
                column_id = self._column_to_id_bmap.inverse[column]

                if row_id is UNDEFINED:
                    # This row is a new one, so add it
                    self._row_to_id_bmap.add(row)

                if column_id is UNDEFINED:
                    # This column is a new one, so add it
                    self._column_to_id_bmap.add(column)

                # Add the data without the useless columns for indexing
                data = instantiator(line, data_columns)

                # The target bucket must be passed as `key`, the keyword the
                # mapper reads. An UNDEFINED key makes the mapper open a new
                # bucket at the end of its storage; as the ID mapper and the data
                # mapper grow together, that bucket sits at the ID just assigned.
                self._data_by_row_id.add(data=data, key=row_id)
                self._data_by_column_id.add(data=data, key=column_id)

                if verbose:
                    logger.log(
                        f"Indexed the line {indexed_count} of {file_path} successfully"
                    )

                if indexed_count == limit_to_load:
                    logger.log(
                        f"Limit of entries (.i.e {limit_to_load}) to load has been reached. Exiting without loading the rest... "
                    )
                    break
        self.__is_indexed = True
        return self  # To enable method chaining (Fluent pattern)

    @cached_property
    def matrix(self):  # typing
        """
        A `rows_count` x `columns_count` list of lists filled with 1.

        The value is computed from this indexer (not from any module-level
        variable) and cached on first successful access, so it does not reflect
        lines indexed afterwards.

        Raises:
            AssertionError: If `index_from_csv()` has not been called yet.
        """
        if not self.__is_indexed:
            raise AssertionError(
                "Cannot define matrix if the indexer is not indexed, ensure you called `index_from_csv()`"
            )
        columns_count = self.columns_count
        # One fresh inner list per row, so that rows never alias each other
        return [[1] * columns_count for _ in range(self.rows_count)]

    @cached_property
    def numpy_matrix(self):  # typing
        """The content of `matrix` as a NumPy array (NumPy is imported lazily)."""
        import numpy as np

        return np.array(self.matrix)

    @property
    def rows_count(self) -> int:
        """Number of distinct rows indexed so far."""
        return len(self._row_to_id_bmap)

    @property
    def columns_count(self) -> int:
        """Number of distinct columns indexed so far."""
        return len(self._column_to_id_bmap)


# FIXME: Replace these doctests with proper unit tests at the end
# doctest.testmod()  # Skip these tests for the moment
# Index ./ml-32m/ratings.csv by userId (rows) and movieId (columns),
# stopping once 400 CSV lines have been indexed.
indexed_data = InMemory2DIndexer(row_name="userId", column_name="movieId").index_from_csv(
    file_path="./ml-32m/ratings.csv",
    row_header="userId",
    column_header="movieId",
    limit=400,
)

# doctest.testmod()
# indexed_data.find_row_by_row_id()
# indexed_data.find_column_by_colum_id()
# indexed_data.find_column_by_id()

Limit of entries (.i.e 400) to load has been reached. Exiting without loading the rest... 


In [19]:
# From here on, we use the domain vocabulary.
# The list-of-lists matrix is evaluated first, then its NumPy counterpart.
data_matrix, data_np_matrix = indexed_data.matrix, indexed_data.numpy_matrix
# Last expression of the cell: the (rows, columns) pair it evaluates to.
indexed_data.rows_count, indexed_data.columns_count


(5, 327)