Krzysztof Swędzioł - Ray, File Management System

Instalacja wszystkiego, co potrzebne:

In [5]:
!pip install -q "ray[default]"

Wszystkie potrzebne importy:

In [18]:
import ray
import math
import random
import asyncio
# Tear down any Ray session left over from an earlier run of this cell
# (presumably so that ray.init() below does not fail on re-execution - TODO confirm),
# then start a fresh local Ray instance.
ray.shutdown()
ray.init()

2025-05-07 08:02:15,123	INFO worker.py:1879 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8266


Python version: 3.11.12
Ray version: 2.45.0
Dashboard: http://127.0.0.1:8266


Konfiguracja:

In [19]:
# Maximum number of characters stored in a single chunk; NameNode.add_artefact
# slices every artefact into pieces of this length.
CHUNK_SIZE = 5
# Intended number of storage nodes that hold a copy of every chunk.
# NOTE(review): confirm NameNode reads this constant rather than assuming 2.
REPLICATION_FACTOR = 2

Wszystkie potrzebne klasy:

In [20]:
@ray.remote(max_restarts=-1, max_task_retries=-1, max_concurrency=8)
class NameNode:
    """Coordinator actor that tracks storage nodes and where artefacts live.

    An artefact is a string that is cut into pieces of ``CHUNK_SIZE``
    characters; every piece is written to ``REPLICATION_FACTOR`` distinct
    healthy storage nodes.

    Attributes:
        storage_nodes: handles of the registered ``StorageNode`` actors.
        artefacts: ``{name: [holders, chunks_amount]}`` where ``holders`` is
            the list of distinct node handles that received at least one
            chunk of the artefact and ``chunks_amount`` is the number of
            chunks the artefact was split into.
    """

    def __init__(self, poll_interval: float = 1.0):
        """Create an empty name node and start the background health monitor.

        Args:
            poll_interval: seconds to sleep between two health sweeps.
        """
        self._poll_interval = poll_interval
        self.storage_nodes = []
        self.artefacts = {}  # {name: [[storage_nodes], chunks_amount]}
        # The event loop keeps only a weak reference to tasks, so hold a
        # strong one; otherwise the monitor may be garbage-collected mid-run.
        self._monitor_task = asyncio.get_event_loop().create_task(
            self._monitor_loop()
        )

    async def _monitor_loop(self):
        """Run ``monitor_storage_nodes`` forever, once per poll interval."""
        while True:
            await asyncio.sleep(self._poll_interval)
            self.monitor_storage_nodes()

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    def _is_healthy(self, node, error_message=None):
        """Return True when ``node`` answers and does not report itself broken.

        A node whose status call fails is treated as unhealthy; when
        ``error_message`` is given it is printed in that case.
        """
        try:
            return not ray.get(node.is_broken_node.remote())
        except Exception:
            if error_message is not None:
                print(error_message)
            return False

    def _healthy_nodes(self):
        """Return the registered storage nodes that currently look healthy."""
        return [node for node in self.storage_nodes if self._is_healthy(node)]

    def _pick_healthy_nodes(self, count):
        """Return ``count`` distinct random healthy nodes, or None if too few."""
        healthy = self._healthy_nodes()
        if len(healthy) < count:
            return None
        return random.sample(healthy, count)

    # ------------------------------------------------------------------
    # Storage node bookkeeping
    # ------------------------------------------------------------------
    def add_storage_node(self, storage_node):
        """Register ``storage_node``; a node that is already known is rejected."""
        if storage_node in self.storage_nodes:
            print("The storage node is already present, you can't add it again")
        else:
            self.storage_nodes.append(storage_node)

    def remove_storage_node(self, storage_node):
        """Unregister ``storage_node`` and drop it from all artefact metadata."""
        if storage_node in self.storage_nodes:
            self.storage_nodes.remove(storage_node)
            self.clear_after_storage_node(storage_node)
        else:
            print("The storage node is not present, you can't remove it")

    def clear_after_storage_node(self, storage_node):
        """Remove every reference to ``storage_node`` from the artefact table."""
        for entry in self.artefacts.values():
            # Filter instead of list.remove(): remove() drops only the first
            # occurrence and would leave stale duplicates behind.
            entry[0] = [node for node in entry[0] if node != storage_node]

    def get_two_healthy_nodes(self):
        """Return two distinct random healthy nodes, or ``(None, None)``."""
        chosen = self._pick_healthy_nodes(2)
        if chosen is None:
            return None, None
        return chosen[0], chosen[1]

    # ------------------------------------------------------------------
    # Artefact operations
    # ------------------------------------------------------------------
    def get_artefact(self, name):
        """Reassemble and return the artefact ``name``.

        Chunks are requested from every healthy node that holds part of the
        artefact. A node that fails to answer is skipped so that the
        remaining replicas can still satisfy the request.

        Returns:
            The artefact string, or None when the artefact is unknown, no
            holder is available, or at least one chunk cannot be found.
        """
        if name not in self.artefacts:
            print("There is no artefact with such name")
            return None

        holders, chunks_amount = self.artefacts[name]
        if chunks_amount == 0:
            # An empty artefact has no chunks and therefore no holders.
            return ""

        valid_nodes = []
        for node in holders:
            if node in valid_nodes:
                continue  # ask every node only once
            if self._is_healthy(node, "Error checking node status"):
                valid_nodes.append(node)

        if not valid_nodes:
            print("No available nodes to reconstruct artefact.")
            return None

        # Submit all requests first so the nodes work in parallel, then
        # collect the answers one by one to tolerate individual failures.
        refs = [node.get_chunks.remote(name) for node in valid_nodes]
        chunk_map = {}
        for ref in refs:
            try:
                chunk_list = ray.get(ref)
            except Exception:
                print("Error retrieving chunks from storage nodes.")
                continue
            for chunk_data, chunk_id in chunk_list:
                chunk_map.setdefault(chunk_id, chunk_data)

        pieces = []
        for chunk_id in range(chunks_amount):
            if chunk_id not in chunk_map:
                print(f"Missing chunk {chunk_id}!")
                return None
            pieces.append(chunk_map[chunk_id])

        return "".join(pieces)

    def add_artefact(self, name, artefact):
        """Split ``artefact`` into chunks and replicate each one.

        Every chunk is sent to ``REPLICATION_FACTOR`` distinct healthy nodes.
        If there are not enough healthy nodes for some chunk, everything
        written so far is rolled back so that no half-stored artefact keeps
        the name occupied.
        """
        if name in self.artefacts:
            print("There is already an artefact with such name")
            return

        if len(self.storage_nodes) < REPLICATION_FACTOR:
            print("Not enough storage nodes to replicate artefact")
            return

        chunks_amount = math.ceil(len(artefact) / CHUNK_SIZE)
        holders = []
        self.artefacts[name] = [holders, chunks_amount]

        pending_writes = []
        for chunk_id in range(chunks_amount):
            start = chunk_id * CHUNK_SIZE
            chunk = artefact[start:start + CHUNK_SIZE]
            replicas = self._pick_healthy_nodes(REPLICATION_FACTOR)
            if replicas is None:
                print("Not enough healthy nodes to replicate chunk.")
                # Undo the partial upload; remove_artefact also deletes the
                # metadata entry created above.
                self.remove_artefact(name)
                return
            for node in replicas:
                pending_writes.append(
                    node.add_chunk.remote(name, [chunk, chunk_id])
                )
                if node not in holders:
                    holders.append(node)

        # .remote() never raises for a failing actor; errors only surface
        # when the result is fetched. The artefact is kept either way (best
        # effort), since the other replica may have succeeded.
        try:
            ray.get(pending_writes)
        except Exception:
            print("Error sending chunk to storage node")

    def remove_artefact(self, name):
        """Ask every holder to drop its chunks of ``name`` and forget it."""
        if name not in self.artefacts:
            print("There is no artefact with such name")
            return

        for storage in self.artefacts[name][0]:
            try:
                storage.remove_chunks.remote(name)
            except Exception:
                print("Failed to notify node to remove chunk")
        del self.artefacts[name]

    def monitor_storage_nodes(self):
        """Unregister every storage node that reports itself as broken."""
        # Iterate over a snapshot: remove_storage_node mutates the list and
        # would otherwise make the loop skip the node after a removed one.
        for storage_node in list(self.storage_nodes):
            try:
                if ray.get(storage_node.is_broken_node.remote()):
                    self.remove_storage_node(storage_node)
            except Exception:
                print("Failed to check or remove broken storage node")

    def listen_for_commands(self):
        """Print the list of commands understood by the client."""
        print("Available commands: add_artefact, remove_artefact, get_artefact, break_node, list_chunks")

    def list_chunks(self):
        """Print, for each registered node, the chunk ids it stores per artefact."""
        for i, node in enumerate(self.storage_nodes):
            try:
                chunks = ray.get(node.get_all_chunks.remote())
                print(f"Node {i+1} ({ray.get(node.get_name.remote())}):")
                if chunks:
                    for artefact, chunk_list in chunks.items():
                        print(f"  {artefact}: {[chunk[1] for chunk in chunk_list]}")
                else:
                    print("  brak chunków")
            except Exception:
                print(f"Node {i+1}: błąd podczas pobierania danych")

In [21]:
@ray.remote
class StorageNode:
    """Actor that keeps artefact chunks in memory and can be flagged as broken."""

    def __init__(self, name):
        """Create an empty, healthy storage node labelled ``name``."""
        self.is_broken = False
        self.name = name
        # {artefact name: [[chunk_data, chunk_id], ...]}
        self.chunks = {}

    def add_chunk(self, name, chunk):
        """Append ``chunk`` to the chunks stored for artefact ``name``."""
        self.chunks.setdefault(name, []).append(chunk)

    def remove_chunks(self, name):
        """Forget all chunks of artefact ``name``; unknown names are ignored."""
        self.chunks.pop(name, None)

    def get_chunks(self, name):
        """Return the chunks stored for ``name`` (an empty list if none)."""
        if name in self.chunks:
            return self.chunks[name]
        return []

    def fill_artefact_gaps(self, artefact_name, resulting_array):
        """Fill the ``None`` slots of ``resulting_array`` from local chunks.

        Each stored chunk is written to the index given by its chunk id, but
        only when that slot is still ``None``. The same array is returned.
        """
        for stored in self.chunks.get(artefact_name, []):
            slot = stored[1]
            if resulting_array[slot] is None:
                resulting_array[slot] = stored[0]
        return resulting_array

    def is_broken_node(self):
        """Return the broken flag of this node."""
        return self.is_broken

    def break_node(self):
        """Mark this node as broken."""
        self.is_broken = True

    def get_all_chunks(self):
        """Return the whole ``{artefact name: chunks}`` mapping."""
        return self.chunks

    def get_name(self):
        """Return the label given to this node at construction."""
        return self.name


Inicjalizacja wszystkiego:

In [22]:
# One coordinator plus four storage actors named storageNode1..storageNode4.
nameNode = NameNode.remote()
storage_nodes = [StorageNode.remote(f"storageNode{number}") for number in (1, 2, 3, 4)]
storageNode1, storageNode2, storageNode3, storageNode4 = storage_nodes

# Register the nodes one at a time, waiting for each registration to finish.
for storage_node in storage_nodes:
    ray.get(nameNode.add_storage_node.remote(storage_node))


Klient i prezentacja działania programu:

In [23]:
def client_interface(name_node, storage_nodes):
    print("Dostępne komendy:")
    print(" - add_artefact <nazwa> <dane>")
    print(" - remove_artefact <nazwa>")
    print(" - get_artefact <nazwa>")
    print(" - list_chunks")
    print(" - break_node <numer_od_1_do_n>")
    print(" - exit")
    print("")

    while True:
        cmd = input(">>> ").strip()
        parts = cmd.split()

        if not parts:
            continue

        match parts[0]:
            case "add_artefact":
                if len(parts) < 3:
                    print("Użycie: add_artefact <nazwa> <dane>")
                else:
                    name = parts[1]
                    data = " ".join(parts[2:])
                    ray.get(name_node.add_artefact.remote(name, data))

            case "remove_artefact":
                if len(parts) != 2:
                    print("Użycie: remove_artefact <nazwa>")
                else:
                    ray.get(name_node.remove_artefact.remote(parts[1]))

            case "get_artefact":
                if len(parts) != 2:
                    print("Użycie: get_artefact <nazwa>")
                else:
                    result = ray.get(name_node.get_artefact.remote(parts[1]))
                    if result:
                        print("Zawartość:", result)

            case "list_chunks":
                ray.get(name_node.list_chunks.remote())

            case "break_node":
                if len(parts) != 2:
                    print("Użycie: break_node <numer_od_1_do_n>")
                else:
                    try:
                        index = int(parts[1]) - 1
                        node = storage_nodes[index]
                        ray.get(node.break_node.remote())
                        print(f" Node {index+1} został uszkodzony.")
                    except:
                        print("Nieprawidłowy numer node'a.")

            case "exit":
                print("Zamykanie klienta.")
                break

            case _:
                print("Nieznana komenda.")

# Start the interactive session; the call returns once the user enters "exit".
client_interface(nameNode, storage_nodes)

Dostępne komendy:
 - add_artefact <nazwa> <dane>
 - remove_artefact <nazwa>
 - get_artefact <nazwa>
 - list_chunks
 - break_node <numer_od_1_do_n>
 - exit

>>> add_artefact cos skibidibidada
>>> get_artefact cos
Zawartość: skibidibidada
>>> list_chunks
[36m(NameNode pid=26480)[0m Node 1 (storageNode1):
[36m(NameNode pid=26480)[0m   cos: [0, 2]
[36m(NameNode pid=26480)[0m Node 2 (storageNode2):
[36m(NameNode pid=26480)[0m   cos: [1]
[36m(NameNode pid=26480)[0m Node 3 (storageNode3):
[36m(NameNode pid=26480)[0m   cos: [0, 2]
[36m(NameNode pid=26480)[0m Node 4 (storageNode4):
[36m(NameNode pid=26480)[0m   cos: [1]
>>> break_node 1
 Node 1 został uszkodzony.
>>> list_chunks
[36m(NameNode pid=26480)[0m Node 1 (storageNode2):
[36m(NameNode pid=26480)[0m   cos: [1]
[36m(NameNode pid=26480)[0m Node 2 (storageNode3):
[36m(NameNode pid=26480)[0m   cos: [0, 2]
[36m(NameNode pid=26480)[0m Node 3 (storageNode4):
[36m(NameNode pid=26480)[0m   cos: [1]
>>> get_artefact cos