<a href="https://colab.research.google.com/github/devansh-29-glitch/mindos/blob/main/MindOS_Colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install rich tqdm
import os, sys, time, threading, queue, random, json

In [None]:
class Message:
    def __init__(self, topic, payload=None, source=None):
        self.topic = topic
        self.payload = payload
        self.source = source

class NeuralBus:
    def __init__(self):
        self.subscribers = {}

    def subscribe(self, topic):
        q = queue.Queue()
        self.subscribers.setdefault(topic, []).append(q)
        return q

    def publish(self, msg):
        for q in self.subscribers.get(msg.topic, []):
            q.put(msg)

# Shared topics
class topics:
    TELEMETRY_HEARTBEAT = "telemetry.heartbeat"
    INTERNAL_THOUGHT = "internal.thought"

In [None]:
from datetime import datetime

class BaseModule(threading.Thread):
    HEARTBEAT_EVERY = 2.0

    def __init__(self, bus, name):
        super().__init__(daemon=True)
        self.name = name
        self.bus = bus
        # ✅ IMPORTANT: Call Event() with parentheses
        self._should_stop = threading.Event()
        self._inbox = bus.subscribe(topics.INTERNAL_THOUGHT)

    def publish(self, topic, payload):
        self.bus.publish(Message(topic=topic, payload=payload, source=self.name))

    def run(self):
        print(f"[{datetime.now().strftime('%H:%M:%S')}] {self.name} starting...")
        # ✅ use .is_set() to check the event status
        while not self._should_stop.is_set():
            self.publish(topics.TELEMETRY_HEARTBEAT, {"module": self.name, "ts": time.time()})
            time.sleep(self.HEARTBEAT_EVERY)
        print(f"[{datetime.now().strftime('%H:%M:%S')}] {self.name} stopped.")

    def stop(self):
        # ✅ Trigger the event to stop the loop
        self._should_stop.set()


In [None]:
class Perception(BaseModule):
    def run(self):
        print(f"{self.name} sensing environment...")
        time.sleep(1.5)
        self.publish(topics.INTERNAL_THOUGHT, "Perceived: external stimulus")
        super().run()

class Memory(BaseModule):
    def run(self):
        q = self.bus.subscribe(topics.INTERNAL_THOUGHT)
        while not self._should_stop.is_set():
            try:
                msg = q.get(timeout=1)
                print(f"{self.name} encoding memory →", msg.payload)
            except queue.Empty:
                pass
        super().run()

class Reasoning(BaseModule):
    def run(self):
        q = self.bus.subscribe(topics.INTERNAL_THOUGHT)
        while not self._should_stop.is_set():
            try:
                msg = q.get(timeout=1)
                idea = f"Inferred meaning of '{msg.payload}'"
                print(f"{self.name} thinking → {idea}")
            except queue.Empty:
                pass
        super().run()

class Emotion(BaseModule):
    def run(self):
        while not self._should_stop.is_set():
            mood = random.choice(["calm", "curious", "focused", "tense"])
            print(f"{self.name} feels {mood}")
            time.sleep(2.5)
        super().run()

class Action(BaseModule):
    def run(self):
        while not self._should_stop.is_set():
            act = random.choice(["observe", "store memory", "analyze", "react"])
            print(f"{self.name} executes: {act}")
            time.sleep(2.0)
        super().run()


In [None]:
class Kernel:
    def __init__(self):
        self.bus = NeuralBus()
        self.modules = [
            Perception(self.bus, "Perception"),
            Memory(self.bus, "Memory"),
            Reasoning(self.bus, "Reasoning"),
            Emotion(self.bus, "Emotion"),
            Action(self.bus, "Action")
        ]

    def boot(self):
        print("────────────────────────────────────────── MindOS Booting ──────────────────────────────────────────")
        for m in self.modules:
            m.start()
        print("All modules initialized.\n")

    def shutdown(self):
        print("\n────────────────────────────────────────── MindOS Shutdown ──────────────────────────────────────────")
        for m in self.modules: m.stop()
        for m in self.modules: m.join()
        print("All modules stopped. MindOS offline.")


In [None]:

kernel = Kernel()
kernel.boot()

print("\n🧠 Synthetic cognition running...\n")
time.sleep(10)  # run for 10 seconds

kernel.shutdown()

In [None]:
print("\n✅ MindOS demo completed.")
print("This lightweight kernel simulated perception → memory → reasoning → emotion → action loops in real time.")
print("Each module ran as a concurrent thread communicating through the neural bus —")
print("an architectural model of predictive coding and distributed cognition.")
