IMPORTS

In [1]:
from __future__ import annotations

import asyncio
import json
import os
import random
import shutil
import string
import sys
import threading
import time
import uuid
from collections.abc import Iterable
from concurrent import futures
from pathlib import Path
from types import MappingProxyType
from typing import Final, Optional, TypeAlias, override

import orjson
from IPython.display import FileLink, display


CONSTANTS

In [None]:
# Executor class (not an instance) used for background saves; FileManager
# instantiates it with `max_workers=1`.
AUTOSAVE_EXECUTOR: Final = futures.ThreadPoolExecutor

# Readable aliases for the primitive types that make up a user record.
UserID : TypeAlias = uuid.UUID
Name : TypeAlias = str
Age : TypeAlias = int

CLASSES

In [3]:
class User:
    """
    Represents a user with a name, age, and unique ID.

    The core attributes (`name`, `age`, `ID`) are exposed through read-only
    properties. Optional dynamic data is kept in the private `_extras` dict and
    handed out to callers only as a read-only mapping view. Instances are
    hashable and compare equal when their IDs are equal.

    Attributes:
        name (str): The user's name.
        age (int): The user's age.
        ID (UUID): The user's unique identifier. Auto-generated if not provided.
        extras (Mapping): Read-only view of optional additional data.
    """

    __slots__ = ("_name", "_age", "_ID", "_extras", "_extras_proxy")

    def __init__(self, name: Name, age: Age, ID: Optional[UserID] = None) -> None:
        """
        Initialize a User instance.

        Args:
            name (str): The user's name.
            age (int): The user's age.
            ID (UUID, optional): The user's unique ID. Automatically generated if None.

        Raises:
            TypeError: If `name` is not a string, `age` is not an integer, or `ID` is not a UUID.
        """
        if not isinstance(name, str):
            raise TypeError("Name must be a string")
        if not isinstance(age, int):
            raise TypeError("Age must be an integer")
        if ID is not None and not isinstance(ID, uuid.UUID):
            raise TypeError("ID must be a UUID")

        self._name: Name = name
        self._age: Age = age
        self._ID: Final[UserID] = ID if ID is not None else uuid.uuid4()
        self._extras: dict = {}
        # Built once: the proxy is a live view, so later changes to `_extras`
        # (e.g. during deserialization) are visible through it.
        self._extras_proxy: MappingProxyType = MappingProxyType(self._extras)

    def __str__(self) -> str:
        """
        Return a human-readable representation of the user.

        Returns:
            str: A string like 'User "Alice" is 25 years old'.
        """
        return f'User "{self._name}" is {self._age} years old'

    def __repr__(self) -> str:
        """
        Return a developer-friendly representation of the user.

        Returns:
            str: A string like 'User(name='Alice', age=25, ID=UUID(...))'.
        """
        return f"User(name={self._name!r}, age={self._age!r}, ID={self._ID!r})"

    def __eq__(self, other: object) -> bool:
        """
        Determine equality with another User based on unique ID.

        Args:
            other (object): The object to compare against.

        Returns:
            bool: True if `other` is a User with the same ID, False if it is a
            User with a different ID. For any other type `NotImplemented` is
            returned so Python can try the reflected comparison; `==` then
            still evaluates to False.
        """
        if not isinstance(other, User):
            return NotImplemented
        return self._ID == other._ID

    def __hash__(self) -> int:
        """
        Compute a hash based on the user's unique ID.

        Returns:
            int: Hash value of the user's ID.
        """
        return hash(self._ID)

    def serialize_user(self) -> bytes:
        """
        Serialize this user instance into a compact JSON byte string.

        Returns:
            bytes: JSON object with the keys "name", "age", "ID" (the UUID as a
            string) and "extras".
        """
        payload = {
            "name": self._name,
            "age": self._age,
            "ID": str(self._ID),
            "extras": self._extras or {}
        }
        return orjson.dumps(payload)

    @classmethod
    def deserialize_user(cls, data: bytes) -> User:
        """
        Deserialize from JSON bytes into a User instance.

        Args:
            data (bytes): JSON produced by `serialize_user`.

        Returns:
            User: A new instance carrying the stored name, age, ID and extras.

        Raises:
            KeyError: If "name", "age" or "ID" is missing from the payload.
            TypeError: If the stored name or age has the wrong type.
            ValueError: If the stored ID is not a valid UUID string.
        """
        obj = orjson.loads(data)
        user = cls(
            name=obj["name"],
            age=obj["age"],
            ID=uuid.UUID(obj["ID"]),
        )
        user._extras.update(obj.get("extras", {}))
        return user

    # Properties
    @property
    def name(self) -> Name:
        """str: The user's name (read-only)."""
        return self._name

    @property
    def age(self) -> Age:
        """int: The user's age (read-only)."""
        return self._age

    @property
    def ID(self) -> UserID:
        """UUID: The user's unique identifier (read-only)."""
        return self._ID

    @property
    def extras(self) -> MappingProxyType:
        """
        Mapping: Read-only view of the optional dynamic attributes.

        The same view object is returned on every access. Item assignment
        through the view raises TypeError.

        Example:
            print(user.extras.get("score", 0))
        """
        return self._extras_proxy


In [4]:
class DataBase:
    """
    In-memory database for storing and managing User objects with efficient
    lookup by ID or name.

    Attributes:
        _database (dict[UserID, User]): Mapping from UserID to User instance.
        _name_index (dict[Name, set[UserID]]): Mapping from user name to a set of UserIDs
            for fast lookup by name. A name is present only while at least one
            stored user carries it.
    """

    __slots__ = ("_database", "_name_index")

    def __init__(self) -> None:
        """
        Initialize an empty database.
        """
        self._database: dict[UserID, User] = {}
        self._name_index: dict[Name, set[UserID]] = {}

    def _unindex(self, name: Name, uid: UserID) -> None:
        """Drop `uid` from the bucket for `name`, deleting the bucket once empty."""
        bucket = self._name_index.get(name)
        if bucket is None:
            return
        bucket.discard(uid)
        if not bucket:
            del self._name_index[name]

    def _discard(self, uid: UserID) -> None:
        """Remove the user stored under `uid` (if any) from both mappings."""
        stored = self._database.pop(uid, None)
        if stored is not None:
            # Use the *stored* record's name: a caller-supplied User compares
            # equal by ID only and may carry a different name.
            self._unindex(stored.name, uid)

    def add_users(self, users: list[User]) -> None:
        """
        Add multiple users to the database.

        Users whose ID is already stored are skipped.

        Args:
            users (list[User]): List of User instances to add.

        Raises:
            ValueError: If the users list is empty.
            TypeError: If any item in users is not a User instance. Items
                preceding the offending one have already been added.
        """
        if not users:
            raise ValueError("User list cannot be empty.")
        for user in users:
            if not isinstance(user, User):
                raise TypeError(f"Expected User instance, got {type(user)}")
            if user.ID not in self._database:
                self._database[user.ID] = user
                self._name_index.setdefault(user.name, set()).add(user.ID)

    def remove_users(self, users: list[User | Name | UserID]) -> None:
        """
        Remove users from the database by User object, UserID, or name.

        Removing by name removes every user with that name. Entries that are
        not present are ignored.

        Args:
            users (list[User | str | UserID]): List of users to remove.

        Raises:
            ValueError: If the users list is empty.
            TypeError: If an unsupported type is provided. Items preceding the
                offending one have already been removed.
        """
        if not users:
            raise ValueError("User list cannot be empty.")
        for u in users:
            if isinstance(u, User):
                self._discard(u.ID)
            elif isinstance(u, UserID):
                self._discard(u)
            elif isinstance(u, Name):
                for uid in self._name_index.pop(u, ()):
                    self._database.pop(uid, None)
            else:
                raise TypeError(f"Unsupported type: {type(u)}")

    def search_users(self, queries: list[User | Name | UserID]) -> list[User]:
        """
        Search for users in the database by User object, UserID, or name.

        Args:
            queries (list[User | str | UserID]): List of queries to search.

        Returns:
            list[User]: Stored User objects matching the queries, in query
            order. A name query contributes every user with that name; queries
            without a match contribute nothing.

        Raises:
            ValueError: If the queries list is empty.
            TypeError: If an unsupported type is provided.
        """
        if not queries:
            raise ValueError("Query list cannot be empty.")
        results = []
        for q in queries:
            if isinstance(q, User):
                if q.ID in self._database:
                    results.append(self._database[q.ID])
            elif isinstance(q, UserID):
                if q in self._database:
                    results.append(self._database[q])
            elif isinstance(q, Name):
                ids = self._name_index.get(q, ())
                results.extend(self._database[uid] for uid in ids)
            else:
                raise TypeError(f"Unsupported type: {type(q)}")
        return results

    def has_user(self, users: list[User | UserID]) -> bool:
        """
        Check whether all given users exist in the database.

        Args:
            users (list[User | UserID]): List of User objects or UserIDs to check.

        Returns:
            bool: True if all users exist, False otherwise.

        Raises:
            ValueError: If the users list is empty.
            TypeError: If an unsupported type is encountered before a missing
                user has been found.
        """
        if not users:
            raise ValueError("User list cannot be empty.")
        for u in users:
            if isinstance(u, User):
                if u.ID not in self._database:
                    return False
            elif isinstance(u, UserID):
                if u not in self._database:
                    return False
            else:
                raise TypeError(f"Unsupported type: {type(u)}")
        return True

    def clear_database(self) -> None:
        """
        Remove all users from the database and clear the name index.
        """
        self._database.clear()
        self._name_index.clear()

    @property
    def size(self) -> int:
        """
        Return the number of users currently stored in the database.

        Returns:
            int: Number of users in the database.
        """
        return len(self._database)


In [13]:
class FileManager:
    """
    Handles persistence of a DataBase instance to disk, including saving, loading,
    and creating backups of user data.

    The on-disk format is one JSON document per line, as produced by
    `User.serialize_user`.

    Attributes:
        _database (DataBase): The in-memory database instance to persist.
        _filepath (str): The file path used for saving and loading the database.
        _executor: Single-worker executor on which autosaves are run.
        _autosave_interval (float): Seconds to wait between two autosaves.
        _autosave_thread (threading.Thread | None): Scheduler thread, if started.
        _stop_event (threading.Event): Set to ask the scheduler thread to exit.
        _save_lock (threading.Lock): Serializes `save` calls, which all share
            one temporary file.
    """

    __slots__ = (
        "_database",
        "_filepath",
        "_executor",
        "_autosave_thread",
        "_autosave_interval",
        "_stop_event",
        "_save_lock",
    )

    def __init__(self, database: DataBase, filepath: str, autosave_interval: float = 60) -> None:
        """
        Initialize a FileManager with a DataBase and target file path.

        Args:
            database (DataBase): The in-memory database to manage.
            filepath (str): Path to the file where the database will be saved or loaded.
            autosave_interval (float): Seconds between autosaves once
                `start_autosave` has been called. Defaults to 60.
        """
        self._database = database
        self._filepath = filepath
        self._executor = AUTOSAVE_EXECUTOR(max_workers=1)
        self._autosave_interval = autosave_interval
        self._stop_event = threading.Event()
        self._autosave_thread = None
        self._save_lock = threading.Lock()

    def ensure_directory(self, path: str | Path) -> None:
        """
        Create the parent directory of `path` (and any missing ancestors).

        Args:
            path (str | Path): A file path whose containing directory must exist.
        """
        Path(path).parent.mkdir(parents=True, exist_ok=True)

    def save(self) -> None:
        """
        Serialize all users in the database and save them to the file specified
        by `_filepath`. If the file does not exist, it will be created.

        A non-empty existing file is first copied to `<file>.bak`. The new
        content is written to `<file>.tmp` and then moved over the target, so
        readers never observe a half-written database file. The temporary file
        is removed if writing fails.
        """
        filepath = Path(self._filepath)
        self.ensure_directory(filepath)
        temp_path = filepath.with_suffix(filepath.suffix + ".tmp")

        # A manual save() and an autosave would otherwise write the same
        # temporary file at the same time.
        with self._save_lock:
            if filepath.exists() and filepath.stat().st_size > 0:
                backup_path = filepath.with_suffix(filepath.suffix + ".bak")
                shutil.copy(filepath, backup_path)

            # Copy the values first so that users added or removed by another
            # thread while the file is being written cannot invalidate the
            # iteration.
            snapshot = list(self._database._database.values())
            try:
                with open(temp_path, "wb") as f:
                    f.writelines(user.serialize_user() + b"\n" for user in snapshot)
                temp_path.replace(filepath)
            finally:
                # No-op after a successful replace; cleans up after a failure.
                temp_path.unlink(missing_ok=True)

    def load(self) -> None:
        """
        Load users from the file specified by `_filepath` and populate the database.
        Existing users in the database will be cleared before loading.

        If the file does not exist, this method does nothing. A file without
        any records leaves the database empty.
        """
        self.ensure_directory(self._filepath)
        filepath = Path(self._filepath)
        if not filepath.exists():
            return

        users = []
        with open(filepath, "rb") as f:
            for line in f:
                record = line.strip()
                if record:
                    users.append(User.deserialize_user(record))
        self._database.clear_database()
        # add_users() rejects an empty list, which is what an empty file yields.
        if users:
            self._database.add_users(users)

    def backup(self, backup_path: str) -> None:
        """
        Create a backup copy of the database file.

        If the database file does not exist, this method does nothing. The
        destination directory is created when missing.

        Args:
            backup_path (str): The destination path for the backup copy.
        """
        if not Path(self._filepath).exists():
            return

        self.ensure_directory(backup_path)
        shutil.copy(self._filepath, backup_path)

    def start_autosave(self) -> None:
        """
        Start a background thread that autosaves the database every `_autosave_interval` seconds.

        The first save is triggered immediately. Calling this while autosave is
        already running does nothing; calling it after `stop_autosave` starts a
        fresh thread. A failing save is reported on stderr and does not end the
        autosave loop.
        """
        if self._autosave_thread and self._autosave_thread.is_alive():
            return  # Already running

        # Forget a stop request left over from a previous run.
        self._stop_event.clear()

        def autosave_loop() -> None:
            while not self._stop_event.is_set():
                try:
                    # Waiting for the result surfaces exceptions that would
                    # otherwise stay hidden inside the Future.
                    self._executor.submit(self.save).result()
                except Exception as exc:
                    print(f"Autosave failed: {exc!r}", file=sys.stderr)
                # Unlike time.sleep(), this returns as soon as a stop is requested.
                self._stop_event.wait(self._autosave_interval)

        self._autosave_thread = threading.Thread(target=autosave_loop, daemon=True)
        self._autosave_thread.start()

    def stop_autosave(self) -> None:
        """
        Stop the background autosave thread.

        Blocks until the thread has exited, which includes waiting for a save
        that is currently in progress.
        """
        self._stop_event.set()
        if self._autosave_thread:
            self._autosave_thread.join()
            self._autosave_thread = None

    def convert_file(self, from_file: str, to_file: str) -> None:
        """
        Convert binary file to utf-8 encoded readable file

        Each non-blank line of `from_file` is parsed as JSON and written to
        `to_file` as one human-readable sentence. Missing "name", "age" or "ID"
        keys are rendered as "Unknown".

        Args:
            from_file (str): Path of the line-delimited JSON file to read.
            to_file (str): Path of the text file to write; its directory is
                created when missing.

        Raises:
            FileNotFoundError: If `from_file` does not exist.
        """
        self.ensure_directory(to_file)
        with open(from_file, "rb") as bf, open(to_file, "w", encoding="utf-8") as ef:
            for line in bf:
                record = line.strip()
                if not record:
                    continue
                user_data = orjson.loads(record)
                name = user_data.get("name", "Unknown")
                age = user_data.get("age", "Unknown")
                uid = user_data.get("ID", "Unknown")
                ef.write(f'User "{name}" is {age} years old (ID: {uid})\n')


In [6]:
# --- Helper functions ---
def random_name(length=15) -> str:
    """Build a random name consisting of `length` ASCII letters of either case."""
    alphabet = string.ascii_letters
    picked = random.choices(alphabet, k=length)
    return "".join(picked)

def random_age() -> int:
    """Draw a random age from 1 to 100, both ends included."""
    youngest, oldest = 1, 100
    # randrange excludes its upper bound, hence the +1.
    return random.randrange(youngest, oldest + 1)

def generate_users(n: int) -> list[User]:
    """Create `n` users, each with a freshly drawn random name and age."""
    batch = []
    for _ in range(n):
        batch.append(User(name=random_name(), age=random_age()))
    return batch

TESTING

In [None]:
# Benchmark / smoke-test cell: fills a DataBase with random users, times the
# main operations (add, search, existence check, removal, save, backup, load),
# runs the autosave thread briefly and finally publishes download links.

# --- Test parameters ---
AUTOSAVE_INTERVAL: Final[int] = 30  # seconds; passed to the FileManager constructor
NUM_USERS : Final[int] = 1_000_000 
SEARCH_SIZE : Final[int] = 10_000
ID_SIZE : Final[int] = 10_000
REMOVE_ID_SIZE : Final[int] = 10_000
REMOVE_NAME_SIZE : Final[int] = 10_000
DB_FILE : Final[str] = "test_db.json"
BACKUP_FILE : Final[str] = "test_db_backup.json"
READABLE_FILE : Final[str] = "readable_db.json"

# -----------------------------
# Initialize database and FileManager
db = DataBase()
file_manager = FileManager(db, DB_FILE,  AUTOSAVE_INTERVAL)

# --- Generate users ---
print("Generating users...")
users = generate_users(NUM_USERS)

# --- Add users ---
start = time.perf_counter()
db.add_users(users)
end = time.perf_counter()
print(f"Added {NUM_USERS} users in {end - start:.4f} seconds")

# --- Search users by random names ---
# random.choice samples with replacement, so the same name can be queried
# more than once.
search_names = [random.choice(users).name for _ in range(SEARCH_SIZE)]
start = time.perf_counter()
results = db.search_users(search_names)
end = time.perf_counter()
print(f"Searched {SEARCH_SIZE} random names in {end - start:.4f} seconds, found {len(results)} users")

# --- Check users by random IDs ---
check_ids = [random.choice(users).ID for _ in range(ID_SIZE)]
start = time.perf_counter()
exists = db.has_user(check_ids)
end = time.perf_counter()
print(f"Checked {ID_SIZE} random users in {end - start:.4f} seconds, all exist? {exists}")

# --- Remove users by random IDs ---
# The sample may contain duplicate IDs, so fewer than REMOVE_ID_SIZE users can
# actually be removed; the printed DB size is the reliable figure.
remove_ids = [random.choice(users).ID for _ in range(REMOVE_ID_SIZE)]
start = time.perf_counter()
db.remove_users(remove_ids)
end = time.perf_counter()
print(f"Removed {REMOVE_ID_SIZE} users by ID in {end - start:.4f} seconds, new DB size: {db.size}")

# --- Remove users by random names ---
# Names are drawn from the original `users` list, so some may belong to users
# that were already removed by ID above; removing by name drops every stored
# user with that name.
remove_names = [random.choice(users).name for _ in range(REMOVE_NAME_SIZE)]
start = time.perf_counter()
db.remove_users(remove_names)
end = time.perf_counter()
print(f"Removed {REMOVE_NAME_SIZE} users by name in {end - start:.4f} seconds, new DB size: {db.size}")

# --- Save the database to file ---
start = time.perf_counter()
file_manager.save()
end = time.perf_counter()
print(f"Saved database to {DB_FILE} in {end - start:.4f} seconds")

# --- Backup the database ---
start = time.perf_counter()
file_manager.backup(BACKUP_FILE)
end = time.perf_counter()
print(f"Backup database to {BACKUP_FILE} in {end - start:.4f} seconds")

# --- Clear database and reload from file ---
db.clear_database()
print(f"Cleared database, size now: {db.size}")
start = time.perf_counter()
file_manager.load()
end = time.perf_counter()
print(f"Loaded database from {DB_FILE} in {end - start:.4f} seconds, size now: {db.size}")

# --- Optional: Start autosave in background ---
print("Starting autosave (runs every 5 seconds)...")
# Overrides the interval given to the constructor (AUTOSAVE_INTERVAL) by
# writing the private attribute directly.
file_manager._autosave_interval = 5  # change interval to 5 seconds
file_manager.start_autosave()

# Let autosave run a couple of times
time.sleep(12)

# Stop autosave
file_manager.stop_autosave()
print("Autosave stopped.")

# Write a human-readable rendering of the saved database.
file_manager.convert_file(DB_FILE, READABLE_FILE)

# Offer the three generated files as download links in the notebook.
display(FileLink(DB_FILE, result_html_prefix="Download DB: "))
display(FileLink(BACKUP_FILE, result_html_prefix="Download Backup: "))
display(FileLink(READABLE_FILE, result_html_prefix="Download Readable: "))


Generating users...
Added 1000000 users in 1.4819 seconds
Searched 10000 random names in 0.0262 seconds, found 10000 users
Checked 10000 random users in 0.0066 seconds, all exist? True
Removed 10000 users by ID in 0.0186 seconds, new DB size: 990043
Removed 10000 users by name in 0.0226 seconds, new DB size: 980191
Saved database to test_db.json in 1.3856 seconds
Backup database to test_db_backup.json in 0.0482 seconds
Cleared database, size now: 0
Loaded database from test_db.json in 5.0377 seconds, size now: 980191
Starting autosave (runs every 5 seconds)...
Autosave stopped.


In [None]:
# Empty the in-memory database, persist that empty state, then show the
# download links for the three files again.
db.clear_database()
file_manager.save()
for _target, _label in (
    (DB_FILE, "Download DB: "),
    (BACKUP_FILE, "Download Backup: "),
    (READABLE_FILE, "Download Readable: "),
):
    display(FileLink(_target, result_html_prefix=_label))

