In [9]:
import ray
if ray.is_initialized:
    ray.shutdown()
ray.init(address='ray://localhost:10001', ignore_reinit_error=True)
CHUNK_SIZE = 1024
REPLICATION_FACTOR = 3

2025-05-11 16:04:38,103	INFO client_builder.py:244 -- Passing the following kwargs to ray.init() on the server: ignore_reinit_error, log_to_driver
    Ray: 2.44.1
    Python: 3.11.11
This process on Ray Client was started with:
    Ray: 2.44.1
    Python: 3.11.5



In [10]:
@ray.remote
class StorageNode:
    def __init__(self, node_id):
        self.node_id = node_id
        self.chunks = {}
        self.alive = True

    def store_chunk(self, artifact_id, chunk_idx, data):
        if not self.alive:
            raise RuntimeError(f"Node {self.node_id} is down")
        self.chunks.setdefault(artifact_id, {})[chunk_idx] = data
        return True

    def get_chunk(self, artifact_id, chunk_idx):
        return self.chunks.get(artifact_id, {}).get(chunk_idx)

    def delete_artifact(self, artifact_id):
        self.chunks.pop(artifact_id, None)
        return True

    def list_chunks(self):
        return {aid: list(m.keys()) for aid, m in self.chunks.items()}

    def set_status(self, alive: bool):
        self.alive = alive
        return self.alive

    def is_alive(self):
        return self.alive




In [11]:
@ray.remote
class NameNode:
    def __init__(self):
        self.metadata = {}
        self.locations = {}
        self.nodes = {}

    def register_node(self, node_id, actor_handle):
        self.nodes[node_id] = actor_handle
        return True

    def unregister_node(self, node_id):
        self.nodes.pop(node_id, None)
        return True

    def put_artifact(self, artifact_id, name, data):
        chunks = [data[i:i+CHUNK_SIZE] for i in range(0, len(data), CHUNK_SIZE)]
        self.metadata[artifact_id] = {'name': name, 'chunks': len(chunks)}
        self.locations[artifact_id] = {}
        node_ids = list(self.nodes.keys())
        for idx, chunk in enumerate(chunks):
            replicas = [node_ids[(idx + r) % len(node_ids)] for r in range(REPLICATION_FACTOR)]
            self.locations[artifact_id][idx] = replicas
            for nid in replicas:
                self.nodes[nid].store_chunk.remote(artifact_id, idx, chunk)
        return True

    def get_artifact(self, artifact_id):
        if artifact_id not in self.metadata:
            raise KeyError("Artifact not found")
        total = self.metadata[artifact_id]['chunks']
        result = []
        for idx in range(total):
            for nid in self.locations[artifact_id][idx]:
                node = self.nodes[nid]
                if ray.get(node.is_alive.remote()):
                    data = ray.get(node.get_chunk.remote(artifact_id, idx))
                    result.append(data)
                    break
            else:
                raise RuntimeError(f"No alive replica for chunk {idx}")
        return self.metadata[artifact_id]['name'], ''.join(result)

    def update_artifact(self, artifact_id, new_data):
        name = self.metadata[artifact_id]['name']
        self.delete_artifact(artifact_id)
        return self.put_artifact(artifact_id, name, new_data)

    def delete_artifact(self, artifact_id):
        for idx, nodes in self.locations.get(artifact_id, {}).items():
            for nid in nodes:
                if nid in self.nodes:
                    self.nodes[nid].delete_artifact.remote(artifact_id)
        self.metadata.pop(artifact_id, None)
        self.locations.pop(artifact_id, None)
        return True

    def list_metadata(self):
        return self.metadata

    def list_locations(self):
        return self.locations


In [13]:
name_node = NameNode.remote()
nodes = {}
for i in range(5):
    nid = f"node-{i}"
    node = StorageNode.remote(nid)
    nodes[nid] = node
    ray.get(name_node.register_node.remote(nid, node))

node_with_options = StorageNode.options(name="node-special", num_cpus=0.5).remote("node-special")
nodes['node-special'] = node_with_options
ray.get(name_node.register_node.remote("node-special", node_with_options))
# PUT
long_data = 'A' * 5000
ray.get(name_node.put_artifact.remote('art1', 'TestArtifact', long_data))
print('Metadata after PUT:', ray.get(name_node.list_metadata.remote()))
print('Locations after PUT:', ray.get(name_node.list_locations.remote()))

# GET
name, content = ray.get(name_node.get_artifact.remote('art1'))
print(f"GET: {name}, length={len(content)}")

# UPDATE
new_data = 'B' * 3000
ray.get(name_node.update_artifact.remote('art1', new_data))
print('Metadata after UPDATE:', ray.get(name_node.list_metadata.remote()))

# DELETE
ray.get(name_node.delete_artifact.remote('art1'))
print('Metadata after DELETE:', ray.get(name_node.list_metadata.remote()))

# Failover
ray.get(nodes['node-2'].set_status.remote(False))
ray.get(name_node.put_artifact.remote('art2', 'Artifact2', 'C'*2048))
print('Locations with failure:', ray.get(name_node.list_locations.remote()))
name2, content2 = ray.get(name_node.get_artifact.remote('art2'))
print(f"GET after failure: {name2}, length={len(content2)}")



Metadata after PUT: {'art1': {'name': 'TestArtifact', 'chunks': 5}}
Locations after PUT: {'art1': {0: ['node-0', 'node-1', 'node-2'], 1: ['node-1', 'node-2', 'node-3'], 2: ['node-2', 'node-3', 'node-4'], 3: ['node-3', 'node-4', 'node-special'], 4: ['node-4', 'node-special', 'node-0']}}
GET: TestArtifact, length=5000
Metadata after UPDATE: {'art1': {'name': 'TestArtifact', 'chunks': 3}}
Metadata after DELETE: {}
Locations with failure: {'art2': {0: ['node-0', 'node-1', 'node-2'], 1: ['node-1', 'node-2', 'node-3']}}
GET after failure: Artifact2, length=2048
