In [1]:
import json
import weakref
import json
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Optional, cast, List

import redis


import cst_python as cst
from cst_python.core.entities import Memory, Mind

In [2]:
client = redis.Redis(decode_responses=True)
client.flushall()

True

In [3]:
class MemoryEncoder(json.JSONEncoder):
    def default(self, memory:cst.core.entities.Memory):
        return MemoryEncoder.to_dict(memory)
    
    @staticmethod
    def to_dict(memory:cst.core.entities.Memory):
        data = {
            "timestamp": memory.get_timestamp(),
            "evaluation": memory.get_evaluation(),
            "I": memory.get_info(),
            "name": memory.get_name(),
            "id": memory.get_id()
        }

        return data

In [4]:
class MemoryStorageCodelet:
    def __init__(self, mind:Mind, node_name:Optional[str]=None, mind_name:Optional[str]=None, request_timeout:float=500e-3) -> None:
        self._mind = mind
        self._request_timeout = request_timeout
        
        if mind_name is None:
            mind_name = "default_mind"
        self._mind_name = cast(str, mind_name)
        
        self._memories : weakref.WeakValueDictionary[str, Memory] = weakref.WeakValueDictionary()
        
        self._client = redis.Redis(decode_responses=True)
        self._pubsub = self._client.pubsub()
        self._pubsub_thread : redis.client.PubSubWorkerThread = self._pubsub.run_in_thread()

        if node_name is None:
            node_number = self._client.scard(f"{mind_name}:nodes")

            node_name = f"node{node_number}"
            while self._client.sismember(f"{mind_name}:nodes", node_name):
                node_number += 1
                node_name = f"node{node_number}"

        self._node_name = cast(str, node_name)

        self._client.sadd(f"{mind_name}:nodes", node_name)

        transfer_service_addr = f"{self._mind_name}:nodes:{node_name}:transfer_memory"
        self._pubsub.subscribe(**{transfer_service_addr:self.transfer_memory})

        transfer_done_addr = f"{self._mind_name}:nodes:{node_name}:transfer_done"
        self._pubsub.subscribe(**{transfer_done_addr:self.notify_transfer})

        self._last_update : dict[str, float] = {}
        self._waiting_retrieve : set[str] = set()
        
        self._retrieve_executor = ThreadPoolExecutor(3)

        self._waiting_request_events : dict[str, threading.Event] = {}

        self._request = None

    def proc(self) -> None:
        
        #Check new memories

        mind_memories = {}
        for memory in self._mind.raw_memory.all_memories:
            if memory.get_name() == "": #No name -> No MS
                continue

            mind_memories[memory.get_name()] = memory

        mind_memories_names = set(mind_memories.keys())
        memories_names = set(self._memories.keys())

        #Check only not here (memories_names not in mind should be garbage collected)
        difference = mind_memories_names - memories_names
        for memory_name in difference:
            memory : Memory = mind_memories[memory_name]
            self._memories[memory_name] = memory

            if self._client.exists(f"{self._mind_name}:memories:{memory_name}"):
                self._retrieve_executor.submit(self.retrieve_memory, memory)
                
            else: #Send impostor with owner
                memory_impostor = {"name":memory.get_name(),
                                   "evalution" : 0.0,
                                   "I": "",
                                   "id" : "0.0",
                                   "owner": self._node_name}
                
                self._client.hset(f"{self._mind_name}:memories:{memory_name}", mapping=memory_impostor)

            subscribe_func = lambda message : self.update_memory(memory_name)
            self._pubsub.subscribe(**{f"{self._mind_name}:memories:{memory_name}:update":subscribe_func})

        #Update memories
        to_update = self._last_update.keys()
        for memory_name in to_update:
            if memory_name not in self._memories:
                del self._last_update[memory_name]
                continue

            memory = self._memories[memory_name]
            if memory.get_timestamp() > self._last_update[memory_name]:
                self.update_memory(memory_name)

    def transfer_memory(self, message) -> None:
        request = json.loads(message["data"])
        
        memory_name = request["memory_name"]
        requesting_node = request["node"]

        print(self._node_name, "Tranfering", memory_name)

        if memory_name in self._memories:
            memory = self._memories[memory_name]
        else:
            memory = cst.MemoryObject()
            memory.set_name(memory_name)
        
        self.send_memory(memory)

        response_addr = f"{self._mind_name}:nodes:{requesting_node}:transfer_done"
        self._client.publish(response_addr, memory_name)


    def send_memory(self, memory:Memory) -> None:
        memory_name = memory.get_name()
        print(self._node_name, "Send memory", memory_name)
        
        memory_dict = MemoryEncoder.to_dict(memory)
        memory_dict["I"] = json.dumps(memory_dict["I"])
        memory_dict["owner"] = ""


        self._client.hset(f"{self._mind_name}:memories:{memory_name}", mapping=memory_dict)
        self._client.publish(f"{self._mind_name}:memories:{memory_name}:update", "")

        self._last_update[memory_name] = memory.get_timestamp()
        
    def update_memory(self, memory_name:str) -> None:
        print(self._node_name, "Updating memory", memory_name)

        timestamp = float(self._client.hget(f"{self._mind_name}:memories:{memory_name}", "timestamp"))
        memory = self._memories[memory_name]
        memory_timestamp = memory.get_timestamp()
        
        if memory_timestamp < timestamp:
            self._retrieve_executor.submit(self.retrieve_memory, memory)

        elif memory_timestamp> timestamp:
            self.send_memory(memory)

        self._last_update[memory_name] = memory.get_timestamp()

    def retrieve_memory(self, memory:Memory) -> None:
        memory_name = memory.get_name()

        print(self._node_name, "Retrieve", memory_name)

        if memory_name in self._waiting_retrieve:
            return
        self._waiting_retrieve.add(memory_name)

        memory_dict = self._client.hgetall(f"{self._mind_name}:memories:{memory_name}")

        if memory_dict["owner"] != "":
            event = threading.Event()
            self._waiting_request_events[memory_name] = event
            self.request_memory(memory_name, memory_dict["owner"])

            if not event.wait(timeout=self._request_timeout):
                print(self._node_name, "Request failed", memory_name)
                #Request failed
                self.send_memory(memory)
                return 
            
            memory_dict = self._client.hgetall(f"{self._mind_name}:memories:{memory_name}")

        memory.set_evaluation(float(memory_dict["evaluation"]))
        memory.set_id(float(memory_dict["id"]))

        info_json = memory_dict["I"]
        info = json.loads(info_json)

        print(self._node_name, "INFO", info, info_json)

        memory.set_info(info)

        self._last_update[memory_name] = memory.get_timestamp()

        self._waiting_retrieve.remove(memory_name)

    def request_memory(self, memory_name:str, owner_name:str):
        print(self._node_name, "Requesting", memory_name)

        request_addr = f"{self._mind_name}:nodes:{owner_name}:transfer_memory"
        
        request_dict = {"memory_name":memory_name, "node":self._node_name}
        request = json.dumps(request_dict)
        self._client.publish(request_addr, request)

    def notify_transfer(self, message:str) -> None:
        memory_name = message["data"]
        if memory_name in self._waiting_request_events:
            event = self._waiting_request_events[memory_name]
            event.set()
            del self._waiting_request_events[memory_name]

    def __del__(self) -> None:
        self._pubsub_thread.stop()

In [5]:
mind = cst.Mind()
memory1 = mind.create_memory_object("Memory1", "")

In [6]:
ms_codelet = MemoryStorageCodelet(mind, "node0")

In [7]:
ms_codelet.proc()

In [8]:
client.smembers("default_mind:nodes")

{'node0'}

In [9]:
client.hgetall("default_mind:memories:Memory1")

{'name': 'Memory1', 'evalution': '0.0', 'I': '', 'id': '0.0', 'owner': 'node0'}

In [10]:
mind2 = cst.Mind()
mind2_memory1 = mind2.create_memory_object("Memory1", "")
mind2_ms_codelet = MemoryStorageCodelet(mind2)

In [11]:
mind2_memory1

MemoryObject [idmemoryobject=0, timestamp=1725654264.298408, evaluation=0.0, I=, name=Memory1]

In [12]:
mind2_ms_codelet._node_name

'node1'

In [13]:
client.smembers("default_mind:nodes")

{'node0', 'node1'}

In [14]:
mind2_ms_codelet.proc()

node1 Retrieve Memory1
node1 Requesting Memory1
node0 Tranfering Memory1
node0 Send memory Memory1


In [15]:
mind2_memory1

node1 Updating memory Memory1
node0 Updating memory Memory1
node1 Send memory Memory1
node0 Updating memory Memory1
node1 Updating memory Memory1
node0 Retrieve Memory1
node0 INFO  ""
node1 INFO  ""


MemoryObject [idmemoryobject=0.0, timestamp=1725654264.7690487, evaluation=0.0, I=, name=Memory1]

In [16]:
client.hgetall("default_mind:memories:Memory1")

{'name': 'Memory1',
 'evalution': '0.0',
 'I': '""',
 'id': '0',
 'owner': '',
 'timestamp': '1725654264.298408',
 'evaluation': '0.0'}

In [17]:
memory1.set_info("INFO")

-1

In [18]:
ms_codelet.proc()

node0 Updating memory Memory1
node0 Send memory Memory1
node1 Updating memory Memory1
node0 Updating memory Memory1
node1 Retrieve Memory1


In [19]:
client.hgetall("default_mind:memories:Memory1")

node1 INFO INFO "INFO"


{'name': 'Memory1',
 'evalution': '0.0',
 'I': '"INFO"',
 'id': '0.0',
 'owner': '',
 'timestamp': '1725654264.794752',
 'evaluation': '0.0'}

In [20]:
mind2_memory1

MemoryObject [idmemoryobject=0.0, timestamp=1725654264.8138657, evaluation=0.0, I=INFO, name=Memory1]

In [21]:
mind2_ms_codelet.proc()

In [22]:
client.hgetall("default_mind:memories:Memory1")

{'name': 'Memory1',
 'evalution': '0.0',
 'I': '"INFO"',
 'id': '0.0',
 'owner': '',
 'timestamp': '1725654264.794752',
 'evaluation': '0.0'}

In [23]:
mind2_memory1.set_info("INFO2")
mind2_ms_codelet.proc()

node1 Updating memory Memory1
node1 Send memory Memory1
node0 Updating memory Memory1
node1 Updating memory Memory1
node0 Retrieve Memory1
node0 INFO INFO2 "INFO2"


In [24]:
client.hgetall("default_mind:memories:Memory1")

{'name': 'Memory1',
 'evalution': '0.0',
 'I': '"INFO2"',
 'id': '0.0',
 'owner': '',
 'timestamp': '1725654302.9360394',
 'evaluation': '0.0'}

In [25]:
memory1

MemoryObject [idmemoryobject=0.0, timestamp=1725654302.943039, evaluation=0.0, I=INFO2, name=Memory1]