In [None]:
import ray

if ray.is_initialized:
    ray.shutdown()
ray.init(ignore_reinit_error=True)

2025-05-11 19:29:07,691	INFO worker.py:1843 -- Started a local Ray instance. View the dashboard at [1m[32mhttp://127.0.0.1:8265 [39m[22m


0,1
Python version:,3.11.12
Ray version:,2.44.1
Dashboard:,http://127.0.0.1:8265


[36m(StorageNode pid=98608)[0m Node 4422182032 get chunk_ala_5
[36m(StorageNode pid=98608)[0m Node 4422182032 put chunk_ala_5 -> ['a']
[36m(StorageNode pid=98608)[0m Node 4422182032 get chunk_ala_2
[36m(StorageNode pid=98608)[0m Node 4422182032 put chunk_ala_2 -> ['m', 'a']
[36m(StorageNode pid=98612)[0m Node 4429582608 put chunk_ala_0 -> ['A', 'l']
[36m(StorageNode pid=98612)[0m Node 4429582608 put chunk_ala_4 -> ['o', 't']
[36m(StorageNode pid=98610)[0m Node 4442247056 put chunk_ala_3 -> [' ', 'k']
[36m(StorageNode pid=98611)[0m Node 4354000592 put chunk_ala_1 -> ['a', ' ']
[36m(NameNode pid=98607)[0m NameNode state:
[36m(NameNode pid=98607)[0m 	Artifact ala:
[36m(NameNode pid=98607)[0m 	Chunk chunk_ala_0 -> Node 4429582608
[36m(NameNode pid=98607)[0m 	Chunk chunk_ala_1 -> Node 4354000592
[36m(NameNode pid=98607)[0m 	Chunk chunk_ala_2 -> Node 4422182032
[36m(NameNode pid=98607)[0m 	Chunk chunk_ala_3 -> Node 4442247056
[36m(NameNode pid=98607)[0m 	Chunk c

In [2]:
from dataclasses import dataclass
from typing import Any
import random
from copy import deepcopy


@dataclass
class Artefact:
    name: str
    data: str

In [3]:


@ray.remote
class StorageNode:
    def __init__(self, max_size: int):
        self.size = max_size
        self.data = {}
        self.id = id(self)

    def put(self, name: str, value: str) -> bool:
        print(f'Node {self.id} put {name} -> {value}')
        if len(self.data) < self.size:
            self.data[name] = value
            return True
        else:
            return False

    def get(self, name: str) -> Any | None:
        print(f'Node {self.id} get {name}')
        return self.data.get(name, None)

    def drop(self, name: str) -> bool:
        print(f'Node {self.id} drop {name}')
        if name in self.data:
            del self.data[name]
            return True
        else:
            return False

    def is_full(self) -> bool:
        return len(self.data) >= self.size

    def list_state(self) -> None:
        for key, value in self.data.items():
            print(f'Node {self.id} state: {key} -> {value}')
        print("--------------------------")

    def id(self) -> str:
        return str(self.id)

In [4]:
from enum import StrEnum, auto


class NodeState(StrEnum):
    healthy = auto()
    failing = auto()


@ray.remote
class NodeMonitor():
    def __init__(self, nodes: list[StorageNode]):
        self.nodes = {ray.get(node.id.remote()): node for node in nodes}
        self.node_states = {
            ray.get(node.id.remote()): NodeState.healthy for node in nodes}

    def get_node_state(self, node_id: str) -> NodeState:
        if node_id not in self.nodes:
            raise ValueError(f"Node {node_id} not found")
        return self.node_states.get(node_id, NodeState.healthy)

    def is_node_healthy(self, node: StorageNode) -> bool:
        node_id = ray.get(node.id.remote())
        if node_id not in self.nodes:
            raise ValueError(f"Node {node_id} not found")
        return self.get_node_state(node_id) == NodeState.healthy

    def set_node_state(self, node_id: str, state: NodeState) -> None:
        if node_id not in self.nodes:
            raise ValueError(f"Node {node_id} not found")
        self.node_states[node_id] = state

In [5]:


@ray.remote
class NameNode:
    def __init__(self, nodes_count=5, max_size=20):
        self.nodes = [StorageNode.remote(max_size) for _ in range(nodes_count)]
        self.artefacts = {}
        self.monitor = NodeMonitor.remote(self.nodes)

    def save(self, artefact: Artefact, chunk_count: int = None, replicas: int = 1) -> dict:
        """
        Save data to the storage nodes.
        :param data: List of data to be saved
        :param chunk_count: Number of chunks to split the data into (default: None - split into free nodes)
        :param replicas: Number of replicas to create (default: 1) 
        """

        if artefact.name in self.artefacts:
            self.delete(artefact.name)

        free_nodes = [node for node in self.nodes if not ray.get(
            node.is_full.remote())]

        if len(free_nodes) == 0:
            return {'status': 'error', 'message': 'No free nodes available'}

        if chunk_count is not None and chunk_count > len(free_nodes):
            return {'status': 'error', 'message': f'Chunk count {chunk_count} exceeds available nodes {len(free_nodes)}'}

        # Split data into chunks
        data = list(artefact.data)
        chunks = []
        chunk_size = len(data) // (chunk_count or len(free_nodes))
        for i in range(0, len(data), chunk_size):
            chunks.append(data[i:i + chunk_size])

        artifact_id = artefact.name
        self.artefacts[artifact_id] = [[] for _ in range(len(chunks))]
        for _ in range(replicas):
            result = self.__load_chunks_on_nodes(
                artifact_id, chunks, free_nodes, self.artefacts[artifact_id])
            if result.get('status') != 'ok':
                print('Error occurred when inserting artefact',
                      result.get('message', ""))
            self.artefacts[artifact_id] = result['state']

        return {'status': 'ok', 'message': f'Artifact {artifact_id} saved', 'id': artifact_id}

    def delete(self, artefact_id: str) -> dict:
        """
        Delete artefact from the storage nodes.
        :param artefact_id: ID of the artefact to be deleted
        """
        if artefact_id not in self.artefacts:
            return {'status': 'error', 'message': f'Artifact {artefact_id} not found'}

        state = self.artefacts[artefact_id]
        for chunk_id, nodes in enumerate(state):
            for node in nodes:
                key = self.__chunk_key(artefact_id, chunk_id)
                if not ray.get(node.drop.remote(key)):
                    raise Exception(
                        f"Failed to drop chunk {key} from node {node}")
        del self.artefacts[artefact_id]
        return {'status': 'ok', 'message': f'Artifact {artefact_id} deleted'}

    def __load_chunks_on_nodes(self, artefact_id: str, chunks: list, nodes: list, state: list) -> dict:
        """
        Load chunks on free nodes.
        :param chunks: List of chunks to be loaded
        :param free_nodes: List of free nodes to load the chunks on
        """
        chunks_with_index = list(enumerate(chunks))
        nodes_copy = deepcopy(nodes)

        random.shuffle(chunks_with_index)

        for chunk_id, chunk in chunks_with_index:
            key = self.__chunk_key(artefact_id, chunk_id)
            random.shuffle(nodes_copy)
            for node in nodes_copy:
                is_full = ray.get(node.is_full.remote())
                contains = ray.get(node.get.remote(key))
                if not is_full and not contains:
                    if not node.put.remote(key, chunk):
                        raise Exception(
                            f"Failed to put chunk {key} into node {node}")
                    state[chunk_id].append(node)
                    break
            else:
                return {
                    'status': 'full',
                    'message': f'Failed to find a node for chunk {chunk_id} - all nodes full',
                    'state': state
                }

        return {'status': 'ok', 'message': f'{len(chunks)} chunks loaded', 'state': state}

    def load_chunk(self, artifact_id: str, chunk_id: int) -> dict:
        if artifact_id not in self.artefacts:
            return {'status': 'error', 'message': f'Artifact {artifact_id} not found'}
        state = self.artefacts[artifact_id]
        if chunk_id not in state:
            return {'status': 'error', 'message': f'Chunk {chunk_id} not found in artifact {artifact_id}'}
        nodes = state[chunk_id]
        node = next(n for n in nodes if ray.get(
            self.monitor.is_node_healthy.remote(n)))
        key = self.__chunk_key(artifact_id, chunk_id)
        chunk = ray.get(node.get.remote(key))
        return {'status': 'ok', 'message': f'Chunk {chunk_id} loaded', 'data': chunk}

    def load(self, artifact_id: str) -> dict:
        if artifact_id not in self.artefacts:
            return {'status': 'error', 'message': f'Artifact {artifact_id} not found'}

        state = self.artefacts[artifact_id]
        result = []
        for chunk_id, nodes in enumerate(state):
            node = next(
                n for n in nodes if ray.get(self.monitor.is_node_healthy.remote(n)))
            print(
                f'Loading chunk {chunk_id} from node {node}')
            key = self.__chunk_key(artifact_id, chunk_id)
            chunk = ray.get(node.get.remote(key))
            if chunk is None:
                return {'status': 'error', 'message': f'Chunk {chunk_id} not found in artifact {artifact_id}'}
            result.extend(chunk)
        return {'status': 'ok', 'message': f'Artifact {artifact_id} loaded', 'data': "".join(result)}

    # UTILS
    def pprint_state(self):
        print(f'NameNode state:')
        for id, state in self.artefacts.items():
            print(f'\tArtifact {id}:')
            for chunk_id, nodes in enumerate(state):
                for node in nodes:
                    print(
                        f'\tChunk {self.__chunk_key(id, chunk_id)} -> Node {ray.get(node.id.remote())}')
                    ray.get(node.list_state.remote())
        print("--------------------------")
        return True

    def get_monitor(self):
        return self.monitor

    def get_nodes(self):
        return self.nodes

    def __chunk_key(self, artifact_id: str, chunk_id: int) -> str:
        return f'chunk_{artifact_id}_{chunk_id}'

In [6]:
core = NameNode.remote()

In [7]:
# Example usage
data = "Ala ma kota"
data = Artefact(name="ala", data=data)
res = core.save.remote(data)
print(ray.get(res))

{'status': 'ok', 'message': 'Artifact ala saved', 'id': 'ala'}


In [8]:
ray.get(core.pprint_state.remote())

True

In [9]:
ray.get(core.load.remote("ala"))

{'status': 'ok', 'message': 'Artifact ala loaded', 'data': 'Ala ma kota'}

In [10]:
data = Artefact(name='kotek', data='Wlazł kotek na płotek')
res = core.save.remote(data, replicas=2)
print(ray.get(res))

{'status': 'ok', 'message': 'Artifact kotek saved', 'id': 'kotek'}


In [11]:
ray.get(core.pprint_state.remote())
ray.get(core.load.remote("kotek"))

{'status': 'ok',
 'message': 'Artifact kotek loaded',
 'data': 'Wlazł kotek na płotek'}

In [12]:
ray.get(core.delete.remote("ala"))
ray.get(core.pprint_state.remote())

True

In [13]:
nodes = ray.get(core.get_nodes.remote())
ray.get(nodes[1].list_state.remote())

In [14]:
data = Artefact(name='kotek', data='I mruga')
res = core.save.remote(data)
print(ray.get(res))
ray.get(core.load.remote("kotek"))

{'status': 'ok', 'message': 'Artifact kotek saved', 'id': 'kotek'}


{'status': 'ok', 'message': 'Artifact kotek loaded', 'data': 'I mruga'}

In [15]:
ray.get(core.pprint_state.remote())

True

In [16]:
ray.shutdown()