### Potrzebne importy

In [1]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import ray
import time
import numpy as np
from numpy import random
import os
import pickle

2025-05-13 00:05:38,080	INFO util.py:154 -- Missing packages: ['ipywidgets']. Run `pip install -U ipywidgets`, then restart the notebook server for rich notebook output.


### Ray init


In [2]:
if ray.is_initialized:
    ray.shutdown()
ray.init(ignore_reinit_error=True)

2025-05-13 00:05:46,391	INFO worker.py:1843 -- Started a local Ray instance. View the dashboard at [1m[32mhttp://127.0.0.1:8265 [39m[22m


0,1
Python version:,3.11.9
Ray version:,2.44.1
Dashboard:,http://127.0.0.1:8265


[36m(NameNode pid=33068)[0m [36mray::DataNode.read()[39m (pid=1600, ip=127.0.0.1, actor_id=ead0b09829635e0ccc57cc4601000000, repr=<__main__.DataNode object at 0x0000020F01227250>)
[36m(NameNode pid=33068)[0m   File "python\\ray\\_raylet.pyx", line 1895, in ray._raylet.execute_task
[36m(NameNode pid=33068)[0m   File "python\\ray\\_raylet.pyx", line 1835, in ray._raylet.execute_task.function_executor
[36m(NameNode pid=33068)[0m   File "c:\Users\stani\AppData\Local\Programs\Python\Python311\Lib\site-packages\ray\_private\function_manager.py", line 689, in actor_method_executor
[36m(NameNode pid=33068)[0m     return method(__ray_actor, *args, **kwargs)
[36m(NameNode pid=33068)[0m            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
[36m(NameNode pid=33068)[0m   File "c:\Users\stani\AppData\Local\Programs\Python\Python311\Lib\site-packages\ray\util\tracing\tracing_helper.py", line 463, in _resume_span
[36m(NameNode pid=33068)[0m     return method(self, *_args, **_kwargs)
[36m(

### DataNode

In [3]:
class NodeUnactiveException(Exception):
    def __init__(self, node_id: str):
        super().__init__(f"DataNode '{node_id}' is inactive")
        self.node_id = node_id

In [4]:
@ray.remote
class DataNode:
    def __init__(self, node_id):
        self.node_id = node_id
        self.chunks = {}
        self.ready = True
    def disable(self):
        self.ready = False
    def enable(self):
        self.ready = True
    def store(self, chunk_id, data):
        self.chunks[chunk_id] = data
    def read (self, chunk_id):
        if not self.ready:
            raise NodeUnactiveException(self.node_id)
        return self.chunks[chunk_id]
    def get_chunks(self):
        if not self.ready:
            raise NodeUnactiveException(self.node_id)
        return dict(self.chunks)
    def remove_chunk(self, chunk_id):
        if not chunk_id in self.chunks: raise ValueError(f"There is not such chunk_id")
        self.chunks.pop(chunk_id)

### NameNode

In [5]:
@ray.remote
class NameNode:
    def __init__(self, replication=2, chunk_size = 10):
        self.chunk_size = chunk_size
        self.replication = replication
        self.node_map = {}
        self.block_locations = {}
        self.artifacts = {}
    def register (self, node_id, node):
        if node_id in self.node_map: raise ValueError(f"{node_id} already taken")
        self.node_map[node_id] = node
    
    def _store(self, name, artifact):
        node_ids = list(self.node_map.keys())
        count = len(node_ids)
        if count == 0:
            raise RuntimeError("Register Storage Nodes")
        chunks = [artifact[i:i+self.chunk_size] for i in range(0, len(artifact), self.chunk_size)]
        chunk_ids = []
        for idx, chunk in enumerate(chunks):
            chunk_id = f"{name}_chunk_{idx}"
            chunk_ids.append(chunk_id)
            start = idx % count
            locs = []
            for r in range(self.replication):
                nid = node_ids[(start + r) % count]
                self.node_map[nid].store.remote(chunk_id, chunk)
                locs.append(nid)
            self.block_locations[chunk_id] = locs
        self.artifacts[name] = chunk_ids
    def read_artifact(self, name):
        if not name in self.artifacts: raise ValueError(f"There is not such artifact {name}")
        chunk_ids = self.artifacts[name]
        parts = []
        for chunk_id in chunk_ids:
            chunk = None
            for node_id in self.block_locations[chunk_id]:
                node = self.node_map[node_id]
                try:
                    data = ray.get(node.read.remote(chunk_id))
                    break
                except NodeUnactiveException as e:
                    print(e)
            if data is None:
                raise RuntimeError(f"Failed to read chunk {chunk_id}")
            parts.append(data)
        return "".join(parts)

    def delete_artifact(self, name):
        if name not in self.artifacts:
            raise ValueError(f"Artifact {name} not found")
        for chunk_id in self.artifacts[name]:
            for node_id in self.block_locations[chunk_id]:
                self.node_map[node_id].remove_chunk.remote(chunk_id)
            del self.block_locations[chunk_id]
        del self.artifacts[name]
    def reupload_artifact(self, name , artifact):
        self.delete_artifact(name)
        self._store(name, artifact)

    def upload_artifact(self,name, artifact):
        if name in self.artifacts: raise ValueError(f"Artifact {name} exists")
        self._store(name, artifact)
    def list_artifacts(self):
        return list(self.artifacts.keys())

    def list_nodes(self):
        return list(self.node_map.keys())

    def locate(self, name):
        if name not in self.artifacts:
            raise ValueError(f"Artifact {name} not found")
        return { chunk_id: self.block_locations[chunk_id]
                 for chunk_id in self.artifacts[name] }

    def list_node(self, node_id):
        if node_id not in self.node_map:
            raise ValueError(f"Node {node_id} not found")
        return ray.get(self.node_map[node_id].get_chunks.remote())



### Testing

adding first artifact

In [6]:
nn = NameNode.remote(replication=2)
dn1 = DataNode.remote("node1")
dn2 = DataNode.remote("node2")
dn3 = DataNode.remote("node3")

ray.get(nn.register.remote("node1", dn1))
ray.get(nn.register.remote("node2", dn2))
ray.get(nn.register.remote("node3", dn3))

ray.get(nn.upload_artifact.remote("artifact1", "ABCDEFGHIJKLMNOPQRSTUVWXYZ"))

print("read artifact:", ray.get(nn.read_artifact.remote("artifact1")))
print("all artifacts:", ray.get(nn.list_artifacts.remote()))
print("all nodes:", ray.get(nn.list_nodes.remote()))
print("locations:", ray.get(nn.locate.remote("artifact1")))

print("node1 chunks:", ray.get(nn.list_node.remote("node1")))
print("node2 chunks:", ray.get(nn.list_node.remote("node2")))
print("node3 chunks:", ray.get(nn.list_node.remote("node3")))

read artifact: ABCDEFGHIJKLMNOPQRSTUVWXYZ
all artifacts: ['artifact1']
all nodes: ['node1', 'node2', 'node3']
locations: {'artifact1_chunk_0': ['node1', 'node2'], 'artifact1_chunk_1': ['node2', 'node3'], 'artifact1_chunk_2': ['node3', 'node1']}
node1 chunks: {'artifact1_chunk_0': 'ABCDEFGHIJ', 'artifact1_chunk_2': 'UVWXYZ'}
node2 chunks: {'artifact1_chunk_0': 'ABCDEFGHIJ', 'artifact1_chunk_1': 'KLMNOPQRST'}
node3 chunks: {'artifact1_chunk_1': 'KLMNOPQRST', 'artifact1_chunk_2': 'UVWXYZ'}


In [7]:
ray.get(nn.upload_artifact.remote("artifact2", "1234567890"*5))

print("read artifact2:", ray.get(nn.read_artifact.remote("artifact2")))
print("all artifacts:", ray.get(nn.list_artifacts.remote()))
print("all nodes:", ray.get(nn.list_nodes.remote()))
print("locations artifact2:", ray.get(nn.locate.remote("artifact2")))
print("node1 chunks:", ray.get(nn.list_node.remote("node1")))
print("node2 chunks:", ray.get(nn.list_node.remote("node2")))
print("node3 chunks:", ray.get(nn.list_node.remote("node3")))

read artifact2: 12345678901234567890123456789012345678901234567890
all artifacts: ['artifact1', 'artifact2']
all nodes: ['node1', 'node2', 'node3']
locations artifact2: {'artifact2_chunk_0': ['node1', 'node2'], 'artifact2_chunk_1': ['node2', 'node3'], 'artifact2_chunk_2': ['node3', 'node1'], 'artifact2_chunk_3': ['node1', 'node2'], 'artifact2_chunk_4': ['node2', 'node3']}
node1 chunks: {'artifact1_chunk_0': 'ABCDEFGHIJ', 'artifact1_chunk_2': 'UVWXYZ', 'artifact2_chunk_0': '1234567890', 'artifact2_chunk_2': '1234567890', 'artifact2_chunk_3': '1234567890'}
node2 chunks: {'artifact1_chunk_0': 'ABCDEFGHIJ', 'artifact1_chunk_1': 'KLMNOPQRST', 'artifact2_chunk_0': '1234567890', 'artifact2_chunk_1': '1234567890', 'artifact2_chunk_3': '1234567890', 'artifact2_chunk_4': '1234567890'}
node3 chunks: {'artifact1_chunk_1': 'KLMNOPQRST', 'artifact1_chunk_2': 'UVWXYZ', 'artifact2_chunk_1': '1234567890', 'artifact2_chunk_2': '1234567890', 'artifact2_chunk_4': '1234567890'}


In [8]:
ray.get(dn2.disable.remote())

print("read artifact1 with node2 disabled:",
      ray.get(nn.read_artifact.remote("artifact1")))


read artifact1 with node2 disabled: ABCDEFGHIJKLMNOPQRSTUVWXYZ


In [9]:
ray.get(dn2.enable.remote())

In [10]:
# Dodanie dwóch nowych storage node’ów i ich rejestracja
dn4 = DataNode.remote("node4")
dn5 = DataNode.remote("node5")

ray.get(nn.register.remote("node4", dn4))
ray.get(nn.register.remote("node5", dn5))

# Definicje dwóch nowych artefaktów
artifact3 = """Lorem ipsum dolor sit amet, consectetur adipiscing elit. Sed non risus. Suspendisse lectus tortor, dignissim sit amet, adipiscing nec, ultricies sed, dolor. 
Cras elementum ultrices diam. Maecenas ligula massa, varius a, semper congue, euismod non, mi. Proin porttitor, orci nec nonummy molestie, enim est eleifend mi, non fermentum diam nisl sit amet erat."""

artifact4 = """Sed ut perspiciatis unde omnis iste natus error sit voluptatem accusantium doloremque laudantium, totam rem aperiam, eaque ipsa quae ab illo inventore veritatis et quasi architecto beatae vitae dicta sunt explicabo. 
Nemo enim ipsam voluptatem quia voluptas sit aspernatur aut odit aut fugit."""

# Upload artefaktów jako artifact3 i artifact4
ray.get(nn.upload_artifact.remote("artifact3", artifact3))
ray.get(nn.upload_artifact.remote("artifact4", artifact4))

# Odczyt każdego artefaktu i wypisanie stanów
print("read artifact3:", ray.get(nn.read_artifact.remote("artifact3")))
print("read artifact4:", ray.get(nn.read_artifact.remote("artifact4")))

print("all artifacts:", ray.get(nn.list_artifacts.remote()))
print("all nodes:", ray.get(nn.list_nodes.remote()))

print("locations artifact3:", ray.get(nn.locate.remote("artifact3")))
print("locations artifact4:", ray.get(nn.locate.remote("artifact4")))

print("node1 chunks:", ray.get(nn.list_node.remote("node1")))
print("node2 chunks:", ray.get(nn.list_node.remote("node2")))
print("node3 chunks:", ray.get(nn.list_node.remote("node3")))
print("node4 chunks:", ray.get(nn.list_node.remote("node4")))
print("node5 chunks:", ray.get(nn.list_node.remote("node5")))


read artifact3: Lorem ipsum dolor sit amet, consectetur adipiscing elit. Sed non risus. Suspendisse lectus tortor, dignissim sit amet, adipiscing nec, ultricies sed, dolor. 
Cras elementum ultrices diam. Maecenas ligula massa, varius a, semper congue, euismod non, mi. Proin porttitor, orci nec nonummy molestie, enim est eleifend mi, non fermentum diam nisl sit amet erat.
read artifact4: Sed ut perspiciatis unde omnis iste natus error sit voluptatem accusantium doloremque laudantium, totam rem aperiam, eaque ipsa quae ab illo inventore veritatis et quasi architecto beatae vitae dicta sunt explicabo. 
Nemo enim ipsam voluptatem quia voluptas sit aspernatur aut odit aut fugit.
all artifacts: ['artifact1', 'artifact2', 'artifact3', 'artifact4']
all nodes: ['node1', 'node2', 'node3', 'node4', 'node5']
locations artifact3: {'artifact3_chunk_0': ['node1', 'node2'], 'artifact3_chunk_1': ['node2', 'node3'], 'artifact3_chunk_2': ['node3', 'node4'], 'artifact3_chunk_3': ['node4', 'node5'], 'artif

In [11]:
artifact5 = """At vero eos et accusamus et iusto odio dignissimos ducimus qui blanditiis praesentium 
voluptatum deleniti atque corrupti quos dolores et quas molestias excepturi sint occaecati cupiditate 
non provident, similique sunt in culpa qui officia deserunt mollitia animi, id est laborum et dolorum fuga."""

ray.get(nn.reupload_artifact.remote("artifact1", artifact5))

print("read artifact1 after reupload:", ray.get(nn.read_artifact.remote("artifact1")))
print("locations artifact1:", ray.get(nn.locate.remote("artifact1")))

print("all artifacts:", ray.get(nn.list_artifacts.remote()))
print("all nodes:", ray.get(nn.list_nodes.remote()))

print("node1 chunks:", ray.get(nn.list_node.remote("node1")))
print("node2 chunks:", ray.get(nn.list_node.remote("node2")))
print("node3 chunks:", ray.get(nn.list_node.remote("node3")))
print("node4 chunks:", ray.get(nn.list_node.remote("node4")))
print("node5 chunks:", ray.get(nn.list_node.remote("node5")))

read artifact1 after reupload: At vero eos et accusamus et iusto odio dignissimos ducimus qui blanditiis praesentium 
voluptatum deleniti atque corrupti quos dolores et quas molestias excepturi sint occaecati cupiditate 
non provident, similique sunt in culpa qui officia deserunt mollitia animi, id est laborum et dolorum fuga.
locations artifact1: {'artifact1_chunk_0': ['node1', 'node2'], 'artifact1_chunk_1': ['node2', 'node3'], 'artifact1_chunk_2': ['node3', 'node4'], 'artifact1_chunk_3': ['node4', 'node5'], 'artifact1_chunk_4': ['node5', 'node1'], 'artifact1_chunk_5': ['node1', 'node2'], 'artifact1_chunk_6': ['node2', 'node3'], 'artifact1_chunk_7': ['node3', 'node4'], 'artifact1_chunk_8': ['node4', 'node5'], 'artifact1_chunk_9': ['node5', 'node1'], 'artifact1_chunk_10': ['node1', 'node2'], 'artifact1_chunk_11': ['node2', 'node3'], 'artifact1_chunk_12': ['node3', 'node4'], 'artifact1_chunk_13': ['node4', 'node5'], 'artifact1_chunk_14': ['node5', 'node1'], 'artifact1_chunk_15': ['node1