Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 0 additions & 111 deletions examples/basic_coordinator_node.py

This file was deleted.

76 changes: 76 additions & 0 deletions examples/coordinator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import logging
from rich.logging import RichHandler
from pydantic import Field
from rid_lib.types import KoiNetNode, KoiNetEdge
from koi_net.config import NodeConfig, KoiNetConfig
from koi_net.protocol.node import NodeProfile, NodeProvides, NodeType
from koi_net import NodeInterface
from koi_net.context import HandlerContext
from koi_net.processor.handler import HandlerType
from koi_net.processor.knowledge_object import KnowledgeObject
from koi_net.protocol.event import Event, EventType
from koi_net.protocol.edge import EdgeType, generate_edge_bundle

logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
handlers=[RichHandler()]
)

logging.getLogger("koi_net").setLevel(logging.DEBUG)
logger = logging.getLogger(__name__)

class CoordinatorConfig(NodeConfig):
koi_net: KoiNetConfig = Field(default_factory = lambda:
KoiNetConfig(
node_name="coordinator",
node_profile=NodeProfile(
node_type=NodeType.FULL,
provides=NodeProvides(
event=[KoiNetNode, KoiNetEdge],
state=[KoiNetNode, KoiNetEdge]
)
),
cache_directory_path=".coordinator_rid_cache",
event_queues_path="coordinator_event_queues.json",
private_key_pem_path="coordinator_priv_key.pem"
)
)

node = NodeInterface(
config=CoordinatorConfig.load_from_yaml("coordinator_config.yaml"),
use_kobj_processor_thread=True
)

@node.processor.pipeline.register_handler(HandlerType.Network, rid_types=[KoiNetNode])
def handshake_handler(ctx: HandlerContext, kobj: KnowledgeObject):
logger.info("Handling node handshake")

# only respond if node declares itself as NEW
if kobj.event_type != EventType.NEW:
return

logger.info("Sharing this node's bundle with peer")
identity_bundle = ctx.effector.deref(ctx.identity.rid)
ctx.event_queue.push_event_to(
event=Event.from_bundle(EventType.NEW, identity_bundle),
node=kobj.rid,
flush=True
)

logger.info("Proposing new edge")
# defer handling of proposed edge

edge_bundle = generate_edge_bundle(
source=kobj.rid,
target=ctx.identity.rid,
edge_type=EdgeType.WEBHOOK,
rid_types=[KoiNetNode, KoiNetEdge]
)

ctx.handle(rid=edge_bundle.rid, event_type=EventType.FORGET)
ctx.handle(bundle=edge_bundle)

if __name__ == "__main__":
node.server.run()
25 changes: 7 additions & 18 deletions examples/basic_partial_node.py → examples/partial.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import time
import logging
from pydantic import Field
from rich.logging import RichHandler
from koi_net import NodeInterface
from koi_net.protocol.node import NodeProfile, NodeType
from koi_net.config import NodeConfig, KoiNetConfig, NodeContact
from koi_net.config import NodeConfig, KoiNetConfig

logging.basicConfig(
level=logging.INFO,
Expand All @@ -24,25 +23,15 @@ class PartialNodeConfig(NodeConfig):
node_profile=NodeProfile(
node_type=NodeType.PARTIAL
),
cache_directory_path=".basic_partial_rid_cache",
event_queues_path="basic_partial_event_queues.json"
cache_directory_path=".partial_rid_cache",
event_queues_path="partial_event_queues.json",
private_key_pem_path="partial_priv_key.pem"
)
)


node = NodeInterface(
config=PartialNodeConfig.load_from_yaml("basic_partial_config.yaml")
config=PartialNodeConfig.load_from_yaml("partial_config.yaml")
)


node.start()

while True:
neighbors = node.resolver.poll_neighbors()
for node_rid in neighbors:
events = neighbors[node_rid]
for event in events:
node.processor.handle(event=event, source=node_rid)
node.processor.flush_kobj_queue()

time.sleep(5)
if __name__ == "__main__":
node.poller.run()
8 changes: 4 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "koi-net"
version = "1.1.0-beta.1"
version = "1.1.0-beta.2"
description = "Implementation of KOI-net protocol in Python"
authors = [
{name = "Luke Miller", email = "luke@block.science"}
Expand All @@ -19,15 +19,15 @@ dependencies = [
"pydantic>=2.10.6",
"ruamel.yaml>=0.18.10",
"python-dotenv>=1.1.0",
"cryptography>=45.0.3"
"cryptography>=45.0.3",
"fastapi>=0.115.12",
"uvicorn>=0.34.2"
]

[project.optional-dependencies]
dev = ["twine>=6.0", "build"]
examples = [
"rich",
"fastapi",
"uvicorn"
]

[project.urls]
Expand Down
1 change: 1 addition & 0 deletions src/koi_net/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class KoiNetConfig(BaseModel):
cache_directory_path: str = ".rid_cache"
event_queues_path: str = "event_queues.json"
private_key_pem_path: str = "priv_key.pem"
polling_interval: int = 5

first_contact: NodeContact = Field(default_factory=NodeContact)

Expand Down
84 changes: 34 additions & 50 deletions src/koi_net/core.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import logging
import httpx
from koi_net.protocol.node import NodeType
from rid_lib.ext import Cache
from .network.resolver import NetworkResolver
from .network.event_queue import NetworkEventQueue
Expand All @@ -17,6 +17,9 @@
from .config import NodeConfig
from .context import HandlerContext, ActionContext
from .effector import Effector
from .server import NodeServer
from .lifecycle import NodeLifecycle
from .poller import NodePoller
from . import default_actions

logger = logging.getLogger(__name__)
Expand All @@ -32,26 +35,24 @@ class NodeInterface:
graph: NetworkGraph
processor: ProcessorInterface
secure: Secure
server: NodeServer

use_kobj_processor_thread: bool

def __init__(
self,
self,
config: NodeConfig,
use_kobj_processor_thread: bool = False,

handlers: list[KnowledgeHandler] | None = None,

cache: Cache | None = None,
processor: ProcessorInterface | None = None
):
self.config = config
self.cache = cache or Cache(
directory_path=self.config.koi_net.cache_directory_path
)

self.identity = NodeIdentity(config=self.config)

self.effector = Effector(cache=self.cache)

self.graph = NetworkGraph(
Expand Down Expand Up @@ -146,47 +147,30 @@ def __init__(
self.effector.set_processor(self.processor)
self.effector.set_resolver(self.resolver)
self.effector.set_action_context(self.action_context)


def start(self) -> None:
"""Starts a node, call this method first.

Starts the processor thread (if enabled). Loads event queues into memory. Generates network graph from nodes and edges in cache. Processes any state changes of node bundle. Initiates handshake with first contact (if provided) if node doesn't have any neighbors.
"""
if self.use_kobj_processor_thread:
logger.info("Starting processor worker thread")
self.processor.worker_thread.start()

# self.network._load_event_queues()
self.graph.generate()

# refresh to reflect changes (if any) in config.yaml
self.effector.deref(self.identity.rid, refresh_cache=True)

logger.debug("Waiting for kobj queue to empty")
if self.use_kobj_processor_thread:
self.processor.kobj_queue.join()
else:
self.processor.flush_kobj_queue()
logger.debug("Done")

if not self.graph.get_neighbors() and self.config.koi_net.first_contact.rid:
logger.debug(f"I don't have any neighbors, reaching out to first contact {self.config.koi_net.first_contact.rid!r}")

self.actor.handshake_with(self.config.koi_net.first_contact.rid)


def stop(self):
"""Stops a node, call this method last.

Finishes processing knowledge object queue. Saves event queues to storage.
"""
logger.info("Stopping node...")

if self.use_kobj_processor_thread:
logger.info(f"Waiting for kobj queue to empty ({self.processor.kobj_queue.unfinished_tasks} tasks remaining)")
self.processor.kobj_queue.join()
else:
self.processor.flush_kobj_queue()

# self.network._save_event_queues()

self.lifecycle = NodeLifecycle(
config=self.config,
identity=self.identity,
graph=self.graph,
processor=self.processor,
effector=self.effector,
actor=self.actor,
use_kobj_processor_thread=use_kobj_processor_thread
)

# if self.config.koi_net.node_profile.node_type == NodeType.FULL:
self.server = NodeServer(
config=self.config,
lifecycle=self.lifecycle,
secure=self.secure,
processor=self.processor,
event_queue=self.event_queue,
response_handler=self.response_handler
)

self.poller = NodePoller(
processor=self.processor,
lifecycle=self.lifecycle,
resolver=self.resolver,
config=self.config
)
Loading